Ingérer des données Apache Kafka dans Azure Data Explorer
Apache Kafka est une plateforme de streaming distribuée qui permet de créer des pipelines de données de streaming en temps réel qui déplacent de manière fiable des données entre des systèmes ou des applications. Kafka Connect est un outil pour le streaming de données scalable et fiable entre Apache Kafka et d’autres systèmes de données. Le récepteur Kusto Kafka sert de connecteur à partir de Kafka et ne nécessite pas d’utiliser de code. Téléchargez le fichier jar du connecteur récepteur à partir du référentiel Git ou du hub de connecteur Confluent.
Cet article explique comment ingérer des données avec Kafka, à l’aide d’une configuration Docker autonome pour simplifier la configuration du cluster Kafka et du cluster du connecteur Kafka.
Pour plus d’informations, consultez le dépôt Git et les spécificités des versions du connecteur.
Prérequis
- Un abonnement Azure. Créez un compte Azure gratuit.
- Un cluster et une base de données Azure Data Explorer avec les stratégies de cache et de rétention par défaut ou une base de données KQL dans Microsoft Fabric.
- Azure CLI.
- Docker et Docker Compose.
Créer un principal de service Microsoft Entra
Le principal de service Microsoft Entra peut être créé via le Portail Azure ou de manière programmatique, comme dans l’exemple suivant.
Ce principal de service sera l’identité utilisée par le connecteur pour écrire les données de votre table dans Kusto. Vous accorderez ultérieurement des autorisations pour que ce principal de service accède aux ressources Kusto.
Connectez-vous à votre abonnement Azure via Azure CLI. Authentifiez-vous ensuite dans le navigateur.
az login
Choisissez l’abonnement pour héberger le principal. Cette étape est nécessaire quand vous avez plusieurs abonnements.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Créez le principal de service. Dans cet exemple, le principal de service est appelé
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
À partir des données JSON retournées, copiez ,
appId
password
ettenant
pour une utilisation ultérieure.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Vous avez créé votre application Microsoft Entra et votre principal de service.
Créer une table cible
À partir de votre environnement de requête, créez une table appelée
Storms
à l’aide de la commande suivante :.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Créez le mappage de table correspondant
Storms_CSV_Mapping
pour les données ingérées en utilisant la commande suivante :.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Créez une stratégie de traitement par lots d’ingestion sur la table pour la latence d’ingestion en file d’attente configurable.
Conseil
La stratégie d’ingestion par lot est un optimiseur de performances et comprend trois paramètres. La premier conditions satisfaite déclenche l’ingestion dans la table Azure Data Explorer.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Utilisez le principal de service de Créer un principal de service Microsoft Entra pour accorder l’autorisation d’utiliser la base de données.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Exécuter le labo
Le labo suivant est conçu pour vous permettre de commencer à créer des données, à configurer le connecteur Kafka et à diffuser en streaming ces données vers Azure Data Explorer avec le connecteur. Vous pouvez ensuite examiner les données ingérées.
Cloner le dépôt Git
Clonez le dépôt Git du labo.
Créez un répertoire local sur votre machine.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Clonez le dépôt.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Contenu du dépôt cloné
Exécutez la commande suivante pour lister le contenu du dépôt cloné :
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Le résultat de cette recherche est le suivant :
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Examiner les fichiers dans le dépôt cloné
Les sections suivantes décrivent les parties importantes des fichiers de l’arborescence de fichiers ci-dessus.
adx-sink-config.json
Ce fichier contient le fichier de propriétés du récepteur Kusto dans lequel vous allez mettre à jour des détails de configuration spécifiques :
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Remplacez les valeurs des attributs suivants par celles de votre configuration Azure Data Explorer : aad.auth.authority
, aad.auth.appid
, aad.auth.appkey
, kusto.tables.topics.mapping
(nom de la base de données), kusto.ingestion.url
et kusto.query.url
.
Connector - Dockerfile
Ce fichier contient les commandes permettant de générer l’image Docker pour l’instance du connecteur. Il comprend le téléchargement du connecteur à partir du répertoire de publication des dépôts git.
Répertoire Storm-events-producer
Ce répertoire contient un programme Go qui lit un fichier local « StormEvents. csv » et publie les données dans une rubrique Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Démarrer les conteneurs
Sur un terminal, démarrez les conteneurs :
docker-compose up
L’application producer commence à envoyer des événements à la rubrique
storm-events
. Vous devriez voir des journaux similaires aux journaux suivants :.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Pour consulter les journaux, exécutez la commande suivante dans un terminal distinct :
docker-compose logs -f | grep kusto-connect
Démarrer le connecteur
Utilisez un appel REST Kafka Connect pour démarrer le connecteur.
Dans un autre terminal, lancez la tâche du récepteur avec la commande suivante :
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Pour vérifier l’état, exécutez la commande suivante dans un autre terminal :
curl http://localhost:8083/connectors/storm/status
Le connecteur démarre la mise en file d’attente des processus d’ingestion dans Azure Data Explorer.
Notes
Si vous rencontrez des problèmes de connecteur de journaux, signalez-les.
Interroger et examiner les données
Confirmer l’ingestion de données
Attendez que les données arrivent dans la table
Storms
. Pour confirmer le transfert de données, vérifiez le nombre de lignes :Storms | count
Confirmez qu’il n’y a pas de défaillance dans le processus d’ingestion :
.show ingestion failures
Une fois que vous voyez les données, essayez quelques requêtes.
Interroger les données
Pour voir tous les enregistrements, exécutez la requête suivante :
Storms
Utilisez
where
etproject
pour filtrer des données spécifiques :Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Utilisez l’opérateur
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Pour obtenir d’autres exemples de requêtes et des conseils, consultez Écrire des requêtes dans KQL et Langage de requête Kusto documentation.
Réinitialiser
Pour réinitialiser, procédez comme suit :
- Arrêter les conteneurs (
docker-compose down -v
) - Supprimer (
drop table Storms
) - Recréer la table
Storms
- Recréer le mappage de table
- Redémarrer les conteneurs (
docker-compose up
)
Nettoyer les ressources
Pour supprimer les ressources Azure Data Explorer, utilisez az cluster delete ou az Kusto database delete :
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Réglage du connecteur Kafka Sink
Réglez le connecteur Kafka Sink pour qu’il fonctionne avec la stratégie de traitement par lot d’ingestion :
- Réglez la taille limite
flush.size.bytes
de Kafka Sink en commençant à 1 Mo, puis en augmentant par incréments de 10 Mo ou 100 Mo. - Lors de l’utilisation de Kafka Sink, les données sont agrégées deux fois. Les données sont agrégées en fonction des paramètres de vidage côté connecteur et selon la stratégie de traitement par lot du côté du service Azure Data Explorer. Si le temps de traitement par lot est trop faible et qu’aucune donnée ne peut être ingérée par le connecteur et le service, le temps de traitement par lot doit être augmenté. Définissez la taille de lot sur 1 Go et augmentez ou diminuez la taille par incréments de 100 Mo en fonction des besoins. Par exemple, si la taille de vidage est de 1 Mo et la taille de la stratégie de traitement par lots est de 100 Mo, une fois qu’un lot de 100 Mo est agrégé par le connecteur Récepteur Kafka, un lot de 100 Mo sera ingéré par le service Azure Data Explorer. Si la durée de la stratégie de traitement par lots est de 20 secondes et que le connecteur Récepteur Kafka vide 50 Mo sur une période de 20 secondes, le service ingère un lot de 50 Mo.
- Vous pouvez effectuer une mise à l’échelle en ajoutant des instances et des partitions Kafka. Augmentez
tasks.max
en le définissant sur le nombre de partitions. Créez une partition si vous avez suffisamment de données pour produire un objet blob de la taille définie par le paramètreflush.size.bytes
. Si l’objet blob est plus petit, le lot est traité lorsqu’il atteint la limite de temps, de sorte que la partition ne reçoit pas suffisamment de débit. Un grand nombre de partitions signifie une charge de traitement plus importante.
Contenu connexe
- Explorez en détail l’architecture de big data.
- Apprenez à Ingérer des exemples de données au format JSON dans Azure Data Explorer.
- Pour d’autres labos Kafka :