你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
将数据从 Apache Kafka 引入到 Azure 数据资源管理器中
Apache Kafka 是一个分布式流式处理平台,用于构建在系统或应用程序之间可靠地移动数据的实时流数据管道。 Kafka Connect 是一个工具,用于在 Apache Kafka 和其他数据系统之间以可缩放且可靠的方式流式传输数据。 Kusto Kafka 接收器充当 Kafka 的连接器,不需要使用代码。 从 Git 存储库 或 Confluent 连接器中心下载接收器连接器 jar。
本文介绍如何使用 Kafka 引入数据,使用自包含 Docker 设置来简化 Kafka 群集和 Kafka 连接器群集设置。
先决条件
- Azure 订阅。 创建免费 Azure 帐户。
- Azure 数据资源管理器具有默认缓存和保留策略或Microsoft Fabric 中的 KQL 数据库的群集和数据库。
- Azure CLI。
- Docker 和 Docker Compose。
创建Microsoft Entra服务主体
可以通过Azure 门户或编程方式创建Microsoft Entra服务主体,如以下示例所示。
此服务主体将是连接器用于在 Kusto 中写入表数据的标识。 稍后将授予此服务主体访问 Kusto 资源的权限。
通过 Azure CLI 登录到 Azure 订阅。 然后在浏览器中进行身份验证。
az login
选择要托管主体的订阅。 当你有多个订阅时,此步骤是必需的。
az account set --subscription YOUR_SUBSCRIPTION_GUID
创建服务主体。 在此示例中,服务主体名为
my-service-principal
。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
从返回的 JSON 数据中
appId
,复制 、password
和tenant
供将来使用。{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
现已创建了 Microsoft Entra 应用程序和服务主体。
创建目标表
在查询环境中,使用以下命令创建名为 的
Storms
表:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
使用以下命令为引入的数据创建对应的表映射
Storms_CSV_Mapping
:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
针对可配置的排队引入延迟在表上创建引入 批处理策略 。
提示
引入批处理策略是一个性能优化器,包含三个参数。 满足第一个条件将触发到 Azure 数据资源管理器表的引入。
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
使用创建Microsoft Entra服务主体中的服务主体授予使用数据库的权限。
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
运行实验室
以下实验室旨在为你提供开始创建数据、设置 Kafka 连接器以及使用连接器将此数据流式传输到 Azure 数据资源管理器的体验。 然后,你可以查看引入的数据。
克隆 git 存储库
克隆实验室的 git 存储库。
在计算机上创建一个本地目录。
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
克隆存储库。
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
克隆的存储库的内容
运行以下命令以列出克隆的存储库的内容:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
此搜索的该结果是:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
查看克隆的存储库中的文件
以下各部分介绍了上述文件树中的文件的重要部分。
adx-sink-config.json
此文件包含 Kusto 接收器属性文件,你将在其中更新特定配置详细信息:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
根据你的 Azure 数据资源管理器设置替换以下属性的值:aad.auth.authority
、aad.auth.appid
、aad.auth.appkey
、kusto.tables.topics.mapping
(数据库名称)、kusto.ingestion.url
和 kusto.query.url
。
连接器 - Dockerfile
此文件包含用于为连接器实例生成 docker 映像的命令。 它包括 git 存储库版本目录中的连接器下载。
Storm-events-producer 目录
此目录包含一个用于读取本地“StormEvents.csv”文件并将数据发布到 Kafka 主题的 Go 程序。
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
启动容器
在终端启动容器:
docker-compose up
生成者应用程序将开始向
storm-events
主题发送事件。 你应当会看到类似于以下日志的日志:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
若要检查日志,请在单独的终端中运行以下命令:
docker-compose logs -f | grep kusto-connect
启动连接器
使用 Kafka Connect REST 调用来启动连接器。
在单独的终端中,使用以下命令启动接收器任务:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
若要检查状态,请在单独的终端中运行以下命令:
curl http://localhost:8083/connectors/storm/status
连接器将开始对到 Azure 数据资源管理器的引入进程排队。
注意
如果有日志连接器问题,请创建问题。
查询和查看数据
确认数据引入
等待数据到达
Storms
表。 若要确认数据的传输,请检查行计数:Storms | count
确认引入进程中没有失败:
.show ingestion failures
看到数据后,请尝试一些查询。
查询数据
若要查看所有记录,请运行以下查询:
Storms
使用
where
和project
来筛选特定数据:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
使用
summarize
运算符:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
有关更多查询示例和指南,请参阅在 KQL 中编写查询和Kusto 查询语言文档。
重置
若要进行重置,请执行以下步骤:
- 停止容器 (
docker-compose down -v
) - 删除 (
drop table Storms
) - 重新创建
Storms
表 - 重新创建表映射
- 重启容器 (
docker-compose up
)
清理资源
若要删除 Azure 数据资源管理器资源,请使用 az cluster delete 或 az Kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
优化 Kafka 接收器连接器
- 优化 Kafka 接收器
flush.size.bytes
大小限制,从 1 MB 开始,以 10 MB 或 100 MB 的增量增加。 - 使用 Kafka 接收器时,数据将聚合两次。 在连接器端上,数据根据刷新设置进行聚合,而在 Azure 数据资源管理器服务端上,数据根据批处理策略进行聚合。 如果批处理时间太短,连接器和服务都无法引入数据,则必须增加批处理时间。 将批处理大小设置为 1 GB,并根据需要以 100 MB 的增量增加或减少。 例如,如果刷新大小为 1 MB,批处理策略大小为 100 MB,则 Kafka 接收器连接器聚合 100 MB 批后,Azure 数据资源管理器 服务将引入 100 MB 的批处理。 如果批处理策略时间为 20 秒,并且 Kafka 接收器连接器在 20 秒期间刷新 50 MB,则服务将引入 50 MB 的批。
- 可通过添加实例和 Kafka 分区进行缩放。 将
tasks.max
增加到分区的数目。 如果有足够的数据生成 blob,则创建一个分区,其大小与flush.size.bytes
设置的大小相同。 如果 blob 较小,批处理会在达到时间限制时进行处理,因此分区不会收到足够的吞吐量。 大量的分区意味着会产生更多处理开销。