从 Kafka 获取数据
Apache Kafka 是一个分布式流式处理平台,可用于构建实时流式处理数据管道,在系统或应用程序之间可靠地移动数据。 Kafka Connect 是一个工具,用于在 Apache Kafka 和其他数据系统之间以可缩放且可靠的方式流式传输数据。 Kusto Kafka 接收器充当来自 Kafka 的连接器,并且不需要使用代码。 从 Git 存储库或 Confluent 连接器中心下载接收器连接器 jar。
本文演示了如何通过 Kafka 并使用自包含的 Docker 安装程序来引入数据,从而简化 Kafka 群集和 Kafka 连接器群集设置。
- Azure 订阅。 创建免费 Azure 帐户。
- Microsoft Fabric 中的 KQL 数据库。
- 要在配置 JSON 文件中使用的数据库引入 URI 和查询 URI。 有关详细信息,请参阅复制 URI。
- Azure CLI。
- Docker 和 Docker Compose。
Microsoft Entra 服务主体可以通过 Azure 门户或通过编程方式进行创建,如以下示例所示。
此服务主体是连接器用于将数据写入到 Kusto 中的表的标识。 授予此服务主体访问 Kusto 资源所需的权限。
通过 Azure CLI 登录到你的 Azure 订阅。 然后在浏览器中进行身份验证。
az login
选择要托管主体的订阅。 当你有多个订阅时,此步骤是必需的。
az account set --subscription YOUR_SUBSCRIPTION_GUID
创建服务主体。 在此示例中,服务主体名为
my-service-principal
。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
从返回的 JSON 数据中复制
appId
、password
、tenant
供将来使用。{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
现已创建了 Microsoft Entra 应用程序和服务主体。
使用以下命令从查询环境中创建一个名为
Storms
的表:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
使用以下命令为引入的数据创建对应的表映射
Storms_CSV_Mapping
:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
针对可配置的排队引入延迟在表中创建引入批处理策略。
提示
引入批处理策略是一个性能优化器,包含三个参数。 满足第一个条件将触发到表的引入。
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
使用创建 Microsoft Entra 服务主体中的服务主体来授予使用数据库的权限。
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
以下实验室旨在为你提供开始创建数据、设置 Kafka 连接器以及流式传输此数据的体验。 然后,你可以查看引入的数据。
克隆实验室的 git 存储库。
在计算机上创建一个本地目录。
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
克隆存储库。
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
运行以下命令以列出克隆的存储库的内容:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
此搜索的该结果是:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
以下各部分介绍了上述文件树中的文件的重要部分。
此文件包含 Kusto 接收器属性文件,你将在其中更新特定配置详细信息:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "<ingestion URI per prerequisites>",
"kusto.query.url": "<query URI per prerequisites>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
根据你的设置替换以下属性的值:aad.auth.authority
、aad.auth.appid
、aad.auth.appkey
、kusto.tables.topics.mapping
(数据库名称)、kusto.ingestion.url
和 kusto.query.url
。
此文件包含用于为连接器实例生成 docker 映像的命令。 它包括 git 存储库版本目录中的连接器下载。
此目录包含一个用于读取本地“StormEvents.csv”文件并将数据发布到 Kafka 主题的 Go 程序。
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
在终端启动容器:
docker-compose up
生成者应用程序开始向
storm-events
主题发送事件。 你应当会看到类似于以下日志的日志:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
若要检查日志,请在单独的终端中运行以下命令:
docker-compose logs -f | grep kusto-connect
使用 Kafka Connect REST 调用来启动连接器。
在单独的终端中,使用以下命令启动接收器任务:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
若要检查状态,请在单独的终端中运行以下命令:
curl http://localhost:8083/connectors/storm/status
连接器开始对引入进程进行列队。
备注
如果有日志连接器问题,请创建问题。
数据到达
Storms
表后,通过检查行计数来确认数据的传输:Storms | count
确认引入进程中没有失败:
.show ingestion failures
看到数据后,请尝试一些查询。
若要查看所有记录,请运行以下查询:
Storms | take 10
使用
where
和project
来筛选特定数据:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
使用
summarize
运算符:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
如需更多查询示例和指导,请参阅在 KQL 中编写查询和 Kusto 查询语言文档。
若要进行重置,请执行以下步骤:
- 停止容器 (
docker-compose down -v
) - 删除 (
drop table Storms
) - 重新创建
Storms
表 - 重新创建表映射
- 重启容器 (
docker-compose up
)
通过导航到在其中创建项的工作区来清理创建的项。
在工作区中,将鼠标悬停在数据库上,然后选择“更多菜单”>“删除”。
选择“删除”。 无法恢复已删除的项目。
- 优化 Kafka 接收器
flush.size.bytes
大小限制,从 1 MB 开始,以 10 MB 或 100 MB 的增量增加。 - 使用 Kafka 接收器时,数据将聚合两次。 在连接器端上,数据根据刷新设置进行聚合,而在服务端上,数据根据批处理策略进行聚合。 如果批处理时间太短,使连接器和服务都无法引入数据,则必须增加批处理时间。 将批处理大小设置为 1 GB,并根据需要以 100 MB 的增量增加或减少。 例如,如果刷新大小为 1 MB,批处理策略大小为 100 MB,则 Kafka 接收器连接器会将数据聚合为 100 MB 的批。 然后,服务将引入该批处理。 如果批处理策略时间为 20 秒,并且 Kafka 接收器连接器在 20 秒期间刷新 50 MB,则服务将引入 50 MB 的批。
- 可通过添加实例和 Kafka 分区进行缩放。 将
tasks.max
增加到分区的数目。 如果有足够的数据生成 blob,则创建一个分区,其大小与flush.size.bytes
设置的大小相同。 如果 blob 较小,批处理会在达到时间限制时进行处理,因此分区不会收到足够的吞吐量。 大量的分区意味着会产生更多处理开销。