Ingest data from Apache Kafka into Azure Data Explorer
Azure Data Explorer supports data ingestion from Apache Kafka. Apache Kafka is a distributed streaming platform for building real-time streaming data pipelines that reliably move data between systems or applications. Kafka Connect is a tool for scalable and reliable streaming of data between Apache Kafka and other data systems. The Azure Data Explorer Kafka Sink serves as the connector from Kafka and doesn't require using code. Download the sink connector jar from this Git repo or Confluent Connector Hub.
This article shows how to ingest data with Kafka into Azure Data Explorer, using a self-contained Docker setup to simplify the Kafka cluster and Kafka connector cluster setup.
For more information, see the connector Git repo and version specifics.
Prerequisites
- An Azure subscription. Create a free Azure account.
- An Azure Data Explorer cluster and database with the default cache and retention policies. Create a cluster and database.
- Azure CLI.
- Docker and Docker Compose.
Create an Azure Active Directory service principal
The Azure Active Directory service principal can be created through the Azure portal or programmatically, as in the following example.
This service principal is the identity the connector uses to write to the Azure Data Explorer table. You'll later grant this service principal permissions to access Azure Data Explorer.
Log in to your Azure subscription via Azure CLI. Then authenticate in the browser.
az login
Choose the subscription you want to use to run the lab. This step is needed when you have multiple subscriptions.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Create the service principal. In this example, the service principal is called kusto-kafka-spn.
az ad sp create-for-rbac -n "kusto-kafka-spn" --role Contributor --scopes /subscriptions/{SubID}
You'll get a JSON response as shown below. Copy the appId, password, and tenant, as you'll need them in later steps.
{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-kafka-spn",
  "name": "http://kusto-kafka-spn",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}
Create a target table in Azure Data Explorer
Sign in to the Azure portal and go to your Azure Data Explorer cluster.
Create a table called Storms using the following command:
.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Create the corresponding table mapping Storms_CSV_Mapping for ingested data using the following command:
.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
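You can optionally confirm that the mapping was created:
.show table Storms ingestion csv mappings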
Create an ingestion batching policy on the table for configurable ingestion latency.
Tip
The ingestion batching policy is a performance optimizer and includes three parameters. The first condition satisfied triggers ingestion into the Azure Data Explorer table.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
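To review the policy that's now set on the table, run:
.show table Storms policy ingestionbatching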
Use the service principal from Create an Azure Active Directory service principal to grant permission to work with the database.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
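To verify the assignment, you can list the database principals:
.show database YOUR_DATABASE_NAME principals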
Run the lab
The following lab gives you hands-on experience with generating data, setting up the Kafka connector, and streaming the data to Azure Data Explorer with the connector. You can then look at the ingested data.
Clone the git repo
Clone the lab's git repo.
Create a local directory on your machine.
mkdir ~/kafka-kusto-hol
cd ~/kafka-kusto-hol
Clone the repo.
cd ~/kafka-kusto-hol
git clone https://github.com/Azure/azure-kusto-labs
cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Contents of the cloned repo
Run the following command to list the contents of the cloned repo:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
The result of this command is:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go
Review the files in the cloned repo
The following sections explain the important parts of the files in the file tree above.
adx-sink-config.json
This file contains the Kusto sink properties, where you'll update specific configuration details:
{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}
Replace the values for the following attributes as per your Azure Data Explorer setup: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name), kusto.ingestion.url, and kusto.query.url.
Connector - Dockerfile
This file has the commands to generate the Docker image for the connector instance. It includes the connector download from the Git repo release directory.
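As a rough sketch only (not the exact contents of the file in the repo; the base image and the download URL pattern below are assumptions), such a Dockerfile typically looks like this:
FROM debezium/connect:1.2
ARG KUSTO_KAFKA_SINK_VERSION
# Illustrative placeholder URL: download the released sink connector jar into the Kafka Connect plugin path
RUN curl -L -o /kafka/connect/kafka-sink-azure-kusto.jar \
    https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v${KUSTO_KAFKA_SINK_VERSION}/<connector-jar-name>.jar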
Storm-events-producer directory
This directory has a Go program that reads a local "StormEvents.csv" file and publishes the data to a Kafka topic.
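Purely as a minimal sketch of the idea (using the segmentio/kafka-go client, which may differ from the client the lab's program actually uses, and the environment variables defined in docker-compose.yaml), such a producer could look like this:
package main

import (
	"bufio"
	"context"
	"log"
	"os"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Broker, topic, and source file come from the environment, as set in docker-compose.yaml.
	writer := &kafka.Writer{
		Addr:  kafka.TCP(os.Getenv("KAFKA_BOOTSTRAP_SERVER")),
		Topic: os.Getenv("KAFKA_TOPIC"),
	}
	defer writer.Close()

	file, err := os.Open(os.Getenv("SOURCE_FILE"))
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Publish each CSV line from StormEvents.csv as one Kafka message.
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		msg := kafka.Message{Value: []byte(scanner.Text())}
		if err := writer.WriteMessages(context.Background(), msg); err != nil {
			log.Fatal(err)
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}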
docker-compose.yaml
version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv
Start the containers
In a terminal, start the containers:
docker-compose up
The producer application will start sending events to the storm-events topic. You should see logs similar to the following:
....
events-producer_1 | sent message to partition 0 offset 0
events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
events-producer_1 |
events-producer_1 | sent message to partition 0 offset 1
events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
....
To check the logs, run the following command in a separate terminal:
docker-compose logs -f | grep kusto-connect
Start the connector
Use a Kafka Connect REST call to start the connector.
In a separate terminal, launch the sink task with the following command:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
To check the status, run the following command in a separate terminal:
curl http://localhost:8083/connectors/storm/status
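If the connector started successfully, the status response should look roughly like the following. This is the standard Kafka Connect status shape, shown only as an illustration; the exact fields and the worker_id value vary by Kafka Connect version:
{"name":"storm","connector":{"state":"RUNNING","worker_id":"kusto-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kusto-connect:8083"}],"type":"sink"}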
The connector will start queueing ingestion processes to Azure Data Explorer.
Note
If you have issues with the connector, create an issue in the connector Git repo.
Query and review data
Confirm data ingestion
Wait for data to arrive in the Storms table. To confirm the transfer of data, check the row count:
Storms | count
Confirm that there are no failures in the ingestion process:
.show ingestion failures
Once you see data, try out a few queries.
Query the data
To see all the records, run the following query:
Storms
Use where and project to filter specific data:
Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId
Use the summarize operator:
Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart
For more query examples and guidance, see Write queries for Azure Data Explorer and Kusto Query Language documentation.
Reset
To reset, do the following steps (the corresponding commands are collected after this list):
- Stop the containers (docker-compose down -v).
- Delete the Storms table (.drop table Storms).
- Re-create the Storms table.
- Re-create the table mapping.
- Restart the containers (docker-compose up).
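For reference, a minimal reset sequence looks like this; the table and mapping creation commands are the same ones shown in Create a target table in Azure Data Explorer:
docker-compose down -v
In the Azure Data Explorer query pane:
.drop table Storms
Then rerun the .create table and .create table Storms ingestion csv mapping commands from earlier, and restart the lab:
docker-compose up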
Clean up resources
To delete the Azure Data Explorer resources, use az kusto cluster delete or az kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Tuning the Kafka Sink connector
Tune the Kafka Sink connector to work with the ingestion batching policy:
- Tune the Kafka Sink flush.size.bytes size limit starting from 1 MB, and increase it in increments of 10 MB or 100 MB.
- When using the Kafka Sink, data is aggregated twice: on the connector side according to the flush settings, and on the Azure Data Explorer service side according to the batching policy. If the batching time is too short and no data can be ingested by either the connector or the service, the batching time must be increased. Set the batching size at 1 GB, and increase or decrease it in 100-MB increments as needed. For example, if the flush size is 1 MB and the batching policy size is 100 MB, after a 100-MB batch is aggregated by the Kafka Sink connector, a 100-MB batch is ingested by the Azure Data Explorer service. If the batching policy time is 20 seconds and the Kafka Sink connector flushes 50 MB in a 20-second period, the service ingests a 50-MB batch. (See the example after this list.)
- You can scale by adding instances and Kafka partitions. Increase tasks.max to the number of partitions. Create a partition if you have enough data to produce a blob the size of the flush.size.bytes setting. If the blob is smaller, the batch is processed when it reaches the time limit, so the partition won't receive enough throughput. A large number of partitions means more processing overhead.
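As a purely illustrative sketch (the values below are assumptions for this lab's Storms table, not recommendations), raising the connector flush settings and the table batching policy together could look like this. In adx-sink-config.json:
"flush.size.bytes": 10485760,
"flush.interval.ms": 30000
And on the Azure Data Explorer table:
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024}'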
Next steps
- Learn more about Big data architecture.
- Learn how to ingest JSON formatted sample data into Azure Data Explorer.
- For additional Kafka labs, see the azure-kusto-labs Git repo.