Ingérer des données Apache Kafka dans Azure Data Explorer

Apache Kafka est une plateforme de streaming distribuée qui permet de créer des pipelines de données de streaming en temps réel qui déplacent de manière fiable des données entre des systèmes ou des applications. Kafka Connect est un outil pour le streaming de données scalable et fiable entre Apache Kafka et d’autres systèmes de données. Le récepteur Kusto Kafka sert de connecteur à partir de Kafka et ne nécessite pas d’utiliser de code. Téléchargez le fichier jar du connecteur récepteur à partir du référentiel Git ou du hub de connecteur Confluent.

Cet article explique comment ingérer des données avec Kafka, à l’aide d’une configuration Docker autonome pour simplifier la configuration du cluster Kafka et du cluster du connecteur Kafka.

Pour plus d’informations, consultez le dépôt Git et les spécificités des versions du connecteur.

Prérequis

Créer un principal de service Microsoft Entra

Le principal de service Microsoft Entra peut être créé via le Portail Azure ou de manière programmatique, comme dans l’exemple suivant.

Ce principal de service sera l’identité utilisée par le connecteur pour écrire les données de votre table dans Kusto. Vous accorderez ultérieurement des autorisations pour que ce principal de service accède aux ressources Kusto.

  1. Connectez-vous à votre abonnement Azure via Azure CLI. Authentifiez-vous ensuite dans le navigateur.

    az login
    
  2. Choisissez l’abonnement pour héberger le principal. Cette étape est nécessaire quand vous avez plusieurs abonnements.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Créez le principal de service. Dans cet exemple, le principal de service est appelé my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. À partir des données JSON retournées, copiez , appIdpasswordet tenant pour une utilisation ultérieure.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Vous avez créé votre application Microsoft Entra et votre principal de service.

Créer une table cible

  1. À partir de votre environnement de requête, créez une table appelée Storms à l’aide de la commande suivante :

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Créez le mappage de table correspondant Storms_CSV_Mapping pour les données ingérées en utilisant la commande suivante :

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Créez une stratégie de traitement par lots d’ingestion sur la table pour la latence d’ingestion en file d’attente configurable.

    Conseil

    La stratégie d’ingestion par lot est un optimiseur de performances et comprend trois paramètres. La premier conditions satisfaite déclenche l’ingestion dans la table Azure Data Explorer.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Utilisez le principal de service de Créer un principal de service Microsoft Entra pour accorder l’autorisation d’utiliser la base de données.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Exécuter le labo

Le labo suivant est conçu pour vous permettre de commencer à créer des données, à configurer le connecteur Kafka et à diffuser en streaming ces données vers Azure Data Explorer avec le connecteur. Vous pouvez ensuite examiner les données ingérées.

Cloner le dépôt Git

Clonez le dépôt Git du labo.

  1. Créez un répertoire local sur votre machine.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clonez le dépôt.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Contenu du dépôt cloné

Exécutez la commande suivante pour lister le contenu du dépôt cloné :

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Le résultat de cette recherche est le suivant :

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Examiner les fichiers dans le dépôt cloné

Les sections suivantes décrivent les parties importantes des fichiers de l’arborescence de fichiers ci-dessus.

adx-sink-config.json

Ce fichier contient le fichier de propriétés du récepteur Kusto dans lequel vous allez mettre à jour des détails de configuration spécifiques :

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Remplacez les valeurs des attributs suivants par celles de votre configuration Azure Data Explorer : aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (nom de la base de données), kusto.ingestion.url et kusto.query.url.

Connector - Dockerfile

Ce fichier contient les commandes permettant de générer l’image Docker pour l’instance du connecteur. Il comprend le téléchargement du connecteur à partir du répertoire de publication des dépôts git.

Répertoire Storm-events-producer

Ce répertoire contient un programme Go qui lit un fichier local « StormEvents. csv » et publie les données dans une rubrique Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Démarrer les conteneurs

  1. Sur un terminal, démarrez les conteneurs :

    docker-compose up
    

    L’application producer commence à envoyer des événements à la rubrique storm-events. Vous devriez voir des journaux similaires aux journaux suivants :

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Pour consulter les journaux, exécutez la commande suivante dans un terminal distinct :

    docker-compose logs -f | grep kusto-connect
    

Démarrer le connecteur

Utilisez un appel REST Kafka Connect pour démarrer le connecteur.

  1. Dans un autre terminal, lancez la tâche du récepteur avec la commande suivante :

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Pour vérifier l’état, exécutez la commande suivante dans un autre terminal :

    curl http://localhost:8083/connectors/storm/status
    

Le connecteur démarre la mise en file d’attente des processus d’ingestion dans Azure Data Explorer.

Notes

Si vous rencontrez des problèmes de connecteur de journaux, signalez-les.

Interroger et examiner les données

Confirmer l’ingestion de données

  1. Attendez que les données arrivent dans la table Storms. Pour confirmer le transfert de données, vérifiez le nombre de lignes :

    Storms | count
    
  2. Confirmez qu’il n’y a pas de défaillance dans le processus d’ingestion :

    .show ingestion failures
    

    Une fois que vous voyez les données, essayez quelques requêtes.

Interroger les données

  1. Pour voir tous les enregistrements, exécutez la requête suivante :

    Storms
    
  2. Utilisez where et project pour filtrer des données spécifiques :

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Utilisez l’opérateur summarize :

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Capture d’écran des résultats de l’histogramme de requête Kafka dans Azure Data Explorer.

Pour obtenir d’autres exemples de requêtes et des conseils, consultez Écrire des requêtes dans KQL et Langage de requête Kusto documentation.

Réinitialiser

Pour réinitialiser, procédez comme suit :

  1. Arrêter les conteneurs (docker-compose down -v)
  2. Supprimer (drop table Storms)
  3. Recréer la table Storms
  4. Recréer le mappage de table
  5. Redémarrer les conteneurs (docker-compose up)

Nettoyer les ressources

Pour supprimer les ressources Azure Data Explorer, utilisez az cluster delete ou az Kusto database delete :

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Réglage du connecteur Kafka Sink

Réglez le connecteur Kafka Sink pour qu’il fonctionne avec la stratégie de traitement par lot d’ingestion :

  • Réglez la taille limite flush.size.bytes de Kafka Sink en commençant à 1 Mo, puis en augmentant par incréments de 10 Mo ou 100 Mo.
  • Lors de l’utilisation de Kafka Sink, les données sont agrégées deux fois. Les données sont agrégées en fonction des paramètres de vidage côté connecteur et selon la stratégie de traitement par lot du côté du service Azure Data Explorer. Si le temps de traitement par lot est trop faible et qu’aucune donnée ne peut être ingérée par le connecteur et le service, le temps de traitement par lot doit être augmenté. Définissez la taille de lot sur 1 Go et augmentez ou diminuez la taille par incréments de 100 Mo en fonction des besoins. Par exemple, si la taille de vidage est de 1 Mo et la taille de la stratégie de traitement par lots est de 100 Mo, une fois qu’un lot de 100 Mo est agrégé par le connecteur Récepteur Kafka, un lot de 100 Mo sera ingéré par le service Azure Data Explorer. Si la durée de la stratégie de traitement par lots est de 20 secondes et que le connecteur Récepteur Kafka vide 50 Mo sur une période de 20 secondes, le service ingère un lot de 50 Mo.
  • Vous pouvez effectuer une mise à l’échelle en ajoutant des instances et des partitions Kafka. Augmentez tasks.max en le définissant sur le nombre de partitions. Créez une partition si vous avez suffisamment de données pour produire un objet blob de la taille définie par le paramètre flush.size.bytes. Si l’objet blob est plus petit, le lot est traité lorsqu’il atteint la limite de temps, de sorte que la partition ne reçoit pas suffisamment de débit. Un grand nombre de partitions signifie une charge de traitement plus importante.