Erfassen von Daten aus Apache Kafka in Azure Data Explorer

Apache Kafka ist eine verteilte Streamingplattform zum Erstellen von Echtzeitstreamingdatenpipelines, die Daten zuverlässig zwischen Systemen oder Anwendungen verschieben. Kafka Connect ist ein Tool zum skalierbaren und zuverlässigen Streamen von Daten zwischen Apache Kafka und anderen Datensystemen. Die Kusto Kafka-Senke dient als Connector von Kafka und erfordert keinen Code. Laden Sie die Jar-Datei des Senkenconnectors aus dem Git-Repository oder confluent Connector Hub herunter.

In diesem Artikel wird gezeigt, wie Sie Daten mit Kafka erfassen, indem Sie ein eigenständiges Docker-Setup verwenden, um die Einrichtung des Kafka-Clusters und des Kafka-Connectorclusters zu vereinfachen.

Weitere Informationen finden Sie im Git-Repository und in den Versionsangaben zum Connector.

Voraussetzungen

Erstellen eines Microsoft Entra Dienstprinzipals

Der Microsoft Entra Dienstprinzipal kann wie im folgenden Beispiel über die Azure-Portal oder programmgesteuert erstellt werden.

Dieser Dienstprinzipal ist die Identität, die vom Connector zum Schreiben von Daten in Ihrer Tabelle in Kusto verwendet wird. Später erteilen Sie diesem Dienstprinzipal Berechtigungen für den Zugriff auf Kusto-Ressourcen.

  1. Melden Sie sich über die Azure CLI bei Ihrem Azure-Abonnement an. Führen Sie anschließend im Browser die Authentifizierung durch.

    az login
    
  2. Wählen Sie das Abonnement aus, das den Prinzipal hosten soll. Dieser Schritt ist erforderlich, wenn Sie über mehrere Abonnements verfügen.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Erstellen Sie den Dienstprinzipal. In diesem Beispiel wird der Dienstprinzipal als my-service-principal bezeichnet.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopieren Sie aus den zurückgegebenen JSON-Daten die appId, passwordund tenant zur zukünftigen Verwendung.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Sie haben Ihre Microsoft Entra-Anwendung und den Dienstprinzipal erstellt.

Erstellen einer Zieltabelle

  1. Erstellen Sie in Ihrer Abfrageumgebung mit dem folgenden Befehl eine Tabelle namens Storms :

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Erstellen Sie mit dem folgenden Befehl die entsprechende Tabellenzuordnung Storms_CSV_Mapping für erfasste Daten:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Erstellen Sie eine Erfassungsbatchesrichtlinie für die Tabelle für konfigurierbare Wartezeiten für die Erfassung in der Warteschlange.

    Tipp

    Bei der Richtlinie für die Batcherfassung handelt es sich um eine Leistungsoptimierung mit drei Parametern. Die erste erfüllte Bedingung löst die Erfassung in der Azure Data Explorer-Tabelle aus.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Verwenden Sie den Dienstprinzipal aus Erstellen eines Microsoft Entra Dienstprinzipals, um die Berechtigung zum Arbeiten mit der Datenbank zu erteilen.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Ausführen des Labs

Im folgenden Lab können Sie ausprobieren, wie Sie mit der Erstellung von Daten beginnen, den Kafka-Connector einrichten und diese Daten über den Connector an Azure Data Explorer streamen. Sie können die erfassten Daten dann anzeigen.

Klonen des Git-Repositorys

Klonen Sie das Git-Repository des Labs.

  1. Erstellen Sie auf Ihrem Computer ein lokales Verzeichnis.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Klonen Sie das Repository.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Inhalt des geklonten Repositorys

Führen Sie den folgenden Befehl aus, um den Inhalt des geklonten Repositorys aufzulisten:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Das Ergebnis dieser Suche lautet wie folgt:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Überprüfen der Dateien im geklonten Repository

In den folgenden Abschnitten werden die wichtigen Teile der Dateien in der obigen Dateistruktur beschrieben.

adx-sink-config.json

Diese Datei enthält die Eigenschaftendatei der Kusto-Senke, in der Sie bestimmte Konfigurationsdetails aktualisieren:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Ersetzen Sie die Werte für die folgenden Attribute gemäß Ihrer Azure Data Explorer-Einrichtung: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (Datenbankname), kusto.ingestion.url und kusto.query.url.

Connector: Dockerfile

Diese Datei enthält die Befehle zum Generieren des Docker-Images für die Connectorinstanz. Sie enthält den Download des Connectors aus dem Releaseverzeichnis des Git-Repositorys.

Verzeichnis „Storm-events-producer“

Dieses Verzeichnis enthält ein Go-Programm, mit dem die lokale Datei „StormEvents.csv“ gelesen wird und die Daten in einem Kafka-Thema veröffentlicht werden.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Starten der Container

  1. Starten Sie die Container in einem Terminal:

    docker-compose up
    

    Die Produceranwendung beginnt mit dem Senden von Ereignissen an das Thema storm-events. Es sollten Protokolle angezeigt werden, die den folgenden Protokollen ähneln:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Führen Sie den folgenden Befehl in einem separaten Terminal aus, um die Protokolle zu überprüfen:

    docker-compose logs -f | grep kusto-connect
    

Starten des Connectors

Verwenden Sie einen Kafka Connect-REST-Aufruf, um den Connector zu starten.

  1. Starten Sie den Senkentask in einem separaten Terminal, indem Sie den folgenden Befehl ausführen:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Führen Sie zum Überprüfen des Status den folgenden Befehl in einem separaten Terminal aus:

    curl http://localhost:8083/connectors/storm/status
    

Der Connector beginnt damit, Erfassungsprozesse für Azure Data Explorer in die Warteschlange einzureihen.

Hinweis

Erstellen Sie eine Problemmeldung, falls bei Ihnen Probleme mit dem Protokollconnector auftreten.

Abfragen und Überprüfen von Daten

Bestätigen der Datenerfassung

  1. Warten Sie, bis Daten in der Tabelle Storms angezeigt werden. Überprüfen Sie die Zeilenanzahl, um die Übertragung der Daten zu bestätigen:

    Storms | count
    
  2. Vergewissern Sie sich, dass der Erfassungsprozess keine Fehler aufweist:

    .show ingestion failures
    

    Probieren Sie einige Abfragen aus, nachdem Daten angezeigt werden.

Abfragen der Daten

  1. Führen Sie die folgende Abfrage aus, um alle Datensätze anzuzeigen:

    Storms
    
  2. Verwenden Sie where und project, um die spezifischen Daten zu filtern:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Verwenden Sie den Operator summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Screenshot: Ergebnisse des Kafka-Abfragespaltendiagramms in Azure Data Explorer

Weitere Abfragebeispiele und Anleitungen finden Sie unter Schreiben von Abfragen in KQL und Kusto-Abfragesprache Dokumentation.

Reset

Führen Sie zum Zurücksetzen die folgenden Schritte aus:

  1. Anhalten der Container (docker-compose down -v)
  2. Löschen (drop table Storms)
  3. Neuerstellen der Tabelle Storms
  4. Neuerstellen der Tabellenzuordnung
  5. Neustarten von Containern (docker-compose up)

Bereinigen von Ressourcen

Verwenden Sie zum Löschen der Azure Data Explorer-Ressourcen az cluster delete oder az Kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Optimieren des Kafka-Senkenconnectors

Optimieren Sie den Connector für die Kafka-Senke, damit er mit der Batchverarbeitungsrichtlinie verwendet werden kann:

  • Optimieren Sie den Grenzwert für die Größe der Kafka-Senken flush.size.bytes. Beginnen Sie bei 1 MB, und erhöhen Sie den Grenzwert in Schritten von 10 MB oder 100 MB.
  • Bei Verwendung der Kafka-Senke werden die Daten zweimal aggregiert. Auf der Connectorseite werden Daten gemäß den Leerungseinstellungen aggregiert, und aufseiten des Azure Data Explorer-Diensts gemäß der Batchverarbeitungsrichtlinie. Wenn die Batchverarbeitungszeit zu kurz ist und sowohl vom Connector als auch vom Dienst keine Daten erfasst werden können, muss die Batchverarbeitungszeit erhöht werden. Legen Sie die Batchverarbeitungsgröße auf 1 GB fest, und erhöhen oder verringern Sie sie bei Bedarf in Schritten von 100 MB. Wenn beispielsweise die Leergröße 1 MB und die Größe der Batchrichtlinien 100 MB beträgt, wird ein 100-MB-Batch vom Kafka-Senkenconnector aggregiert, ein 100-MB-Batch vom Azure Data Explorer-Dienst erfasst. Wenn die Zeit für die Batchverarbeitungsrichtlinie 20 Sekunden beträgt und der Kafka-Senke-Connector in einem Zeitraum von 20 Sekunden 50 MB leert, erfasst der Dienst einen Batch mit 50 MB.
  • Sie können skalieren, indem Sie Instanzen und Kafka-Partitionen hinzufügen. Erhöhen Sie tasks.max auf die Anzahl von Partitionen. Erstellen Sie eine Partition, wenn Sie über genügend Daten verfügen, um ein Blob mit der Größe der Einstellung flush.size.bytes zu erstellen. Wenn das Blob kleiner ist, wird der Batch verarbeitet, wenn er das Zeitlimit erreicht, sodass die Partition nicht genügend Durchsatz erhält. Eine große Anzahl von Partitionen bedeutet mehr Verarbeitungsaufwand.