Erfassen von Daten aus Apache Kafka in Azure Data Explorer
Apache Kafka ist eine verteilte Streamingplattform zum Erstellen von Echtzeitstreamingdatenpipelines, die Daten zuverlässig zwischen Systemen oder Anwendungen verschieben. Kafka Connect ist ein Tool zum skalierbaren und zuverlässigen Streamen von Daten zwischen Apache Kafka und anderen Datensystemen. Die Kusto Kafka-Senke dient als Connector von Kafka und erfordert keinen Code. Laden Sie die Jar-Datei des Senkenconnectors aus dem Git-Repository oder confluent Connector Hub herunter.
In diesem Artikel wird gezeigt, wie Sie Daten mit Kafka erfassen, indem Sie ein eigenständiges Docker-Setup verwenden, um die Einrichtung des Kafka-Clusters und des Kafka-Connectorclusters zu vereinfachen.
Weitere Informationen finden Sie im Git-Repository und in den Versionsangaben zum Connector.
Voraussetzungen
- Ein Azure-Abonnement. Erstellen Sie ein kostenloses Azure-Konto.
- Ein Azure Data Explorer Cluster und Eine Datenbank mit den Standardcache- und Aufbewahrungsrichtlinien oder einer KQL-Datenbank in Microsoft Fabric.
- Azure-Befehlszeilenschnittstelle.
- Docker und Docker Compose.
Erstellen eines Microsoft Entra Dienstprinzipals
Der Microsoft Entra Dienstprinzipal kann wie im folgenden Beispiel über die Azure-Portal oder programmgesteuert erstellt werden.
Dieser Dienstprinzipal ist die Identität, die vom Connector zum Schreiben von Daten in Ihrer Tabelle in Kusto verwendet wird. Später erteilen Sie diesem Dienstprinzipal Berechtigungen für den Zugriff auf Kusto-Ressourcen.
Melden Sie sich über die Azure CLI bei Ihrem Azure-Abonnement an. Führen Sie anschließend im Browser die Authentifizierung durch.
az login
Wählen Sie das Abonnement aus, das den Prinzipal hosten soll. Dieser Schritt ist erforderlich, wenn Sie über mehrere Abonnements verfügen.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Erstellen Sie den Dienstprinzipal. In diesem Beispiel wird der Dienstprinzipal als
my-service-principal
bezeichnet.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Kopieren Sie aus den zurückgegebenen JSON-Daten die
appId
,password
undtenant
zur zukünftigen Verwendung.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Sie haben Ihre Microsoft Entra-Anwendung und den Dienstprinzipal erstellt.
Erstellen einer Zieltabelle
Erstellen Sie in Ihrer Abfrageumgebung mit dem folgenden Befehl eine Tabelle namens
Storms
:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Erstellen Sie mit dem folgenden Befehl die entsprechende Tabellenzuordnung
Storms_CSV_Mapping
für erfasste Daten:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Erstellen Sie eine Erfassungsbatchesrichtlinie für die Tabelle für konfigurierbare Wartezeiten für die Erfassung in der Warteschlange.
Tipp
Bei der Richtlinie für die Batcherfassung handelt es sich um eine Leistungsoptimierung mit drei Parametern. Die erste erfüllte Bedingung löst die Erfassung in der Azure Data Explorer-Tabelle aus.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Verwenden Sie den Dienstprinzipal aus Erstellen eines Microsoft Entra Dienstprinzipals, um die Berechtigung zum Arbeiten mit der Datenbank zu erteilen.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Ausführen des Labs
Im folgenden Lab können Sie ausprobieren, wie Sie mit der Erstellung von Daten beginnen, den Kafka-Connector einrichten und diese Daten über den Connector an Azure Data Explorer streamen. Sie können die erfassten Daten dann anzeigen.
Klonen des Git-Repositorys
Klonen Sie das Git-Repository des Labs.
Erstellen Sie auf Ihrem Computer ein lokales Verzeichnis.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Klonen Sie das Repository.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Inhalt des geklonten Repositorys
Führen Sie den folgenden Befehl aus, um den Inhalt des geklonten Repositorys aufzulisten:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Das Ergebnis dieser Suche lautet wie folgt:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Überprüfen der Dateien im geklonten Repository
In den folgenden Abschnitten werden die wichtigen Teile der Dateien in der obigen Dateistruktur beschrieben.
adx-sink-config.json
Diese Datei enthält die Eigenschaftendatei der Kusto-Senke, in der Sie bestimmte Konfigurationsdetails aktualisieren:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Ersetzen Sie die Werte für die folgenden Attribute gemäß Ihrer Azure Data Explorer-Einrichtung: aad.auth.authority
, aad.auth.appid
, aad.auth.appkey
, kusto.tables.topics.mapping
(Datenbankname), kusto.ingestion.url
und kusto.query.url
.
Connector: Dockerfile
Diese Datei enthält die Befehle zum Generieren des Docker-Images für die Connectorinstanz. Sie enthält den Download des Connectors aus dem Releaseverzeichnis des Git-Repositorys.
Verzeichnis „Storm-events-producer“
Dieses Verzeichnis enthält ein Go-Programm, mit dem die lokale Datei „StormEvents.csv“ gelesen wird und die Daten in einem Kafka-Thema veröffentlicht werden.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Starten der Container
Starten Sie die Container in einem Terminal:
docker-compose up
Die Produceranwendung beginnt mit dem Senden von Ereignissen an das Thema
storm-events
. Es sollten Protokolle angezeigt werden, die den folgenden Protokollen ähneln:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Führen Sie den folgenden Befehl in einem separaten Terminal aus, um die Protokolle zu überprüfen:
docker-compose logs -f | grep kusto-connect
Starten des Connectors
Verwenden Sie einen Kafka Connect-REST-Aufruf, um den Connector zu starten.
Starten Sie den Senkentask in einem separaten Terminal, indem Sie den folgenden Befehl ausführen:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Führen Sie zum Überprüfen des Status den folgenden Befehl in einem separaten Terminal aus:
curl http://localhost:8083/connectors/storm/status
Der Connector beginnt damit, Erfassungsprozesse für Azure Data Explorer in die Warteschlange einzureihen.
Hinweis
Erstellen Sie eine Problemmeldung, falls bei Ihnen Probleme mit dem Protokollconnector auftreten.
Abfragen und Überprüfen von Daten
Bestätigen der Datenerfassung
Warten Sie, bis Daten in der Tabelle
Storms
angezeigt werden. Überprüfen Sie die Zeilenanzahl, um die Übertragung der Daten zu bestätigen:Storms | count
Vergewissern Sie sich, dass der Erfassungsprozess keine Fehler aufweist:
.show ingestion failures
Probieren Sie einige Abfragen aus, nachdem Daten angezeigt werden.
Abfragen der Daten
Führen Sie die folgende Abfrage aus, um alle Datensätze anzuzeigen:
Storms
Verwenden Sie
where
undproject
, um die spezifischen Daten zu filtern:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Verwenden Sie den Operator
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Weitere Abfragebeispiele und Anleitungen finden Sie unter Schreiben von Abfragen in KQL und Kusto-Abfragesprache Dokumentation.
Reset
Führen Sie zum Zurücksetzen die folgenden Schritte aus:
- Anhalten der Container (
docker-compose down -v
) - Löschen (
drop table Storms
) - Neuerstellen der Tabelle
Storms
- Neuerstellen der Tabellenzuordnung
- Neustarten von Containern (
docker-compose up
)
Bereinigen von Ressourcen
Verwenden Sie zum Löschen der Azure Data Explorer-Ressourcen az cluster delete oder az Kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Optimieren des Kafka-Senkenconnectors
Optimieren Sie den Connector für die Kafka-Senke, damit er mit der Batchverarbeitungsrichtlinie verwendet werden kann:
- Optimieren Sie den Grenzwert für die Größe der Kafka-Senken
flush.size.bytes
. Beginnen Sie bei 1 MB, und erhöhen Sie den Grenzwert in Schritten von 10 MB oder 100 MB. - Bei Verwendung der Kafka-Senke werden die Daten zweimal aggregiert. Auf der Connectorseite werden Daten gemäß den Leerungseinstellungen aggregiert, und aufseiten des Azure Data Explorer-Diensts gemäß der Batchverarbeitungsrichtlinie. Wenn die Batchverarbeitungszeit zu kurz ist und sowohl vom Connector als auch vom Dienst keine Daten erfasst werden können, muss die Batchverarbeitungszeit erhöht werden. Legen Sie die Batchverarbeitungsgröße auf 1 GB fest, und erhöhen oder verringern Sie sie bei Bedarf in Schritten von 100 MB. Wenn beispielsweise die Leergröße 1 MB und die Größe der Batchrichtlinien 100 MB beträgt, wird ein 100-MB-Batch vom Kafka-Senkenconnector aggregiert, ein 100-MB-Batch vom Azure Data Explorer-Dienst erfasst. Wenn die Zeit für die Batchverarbeitungsrichtlinie 20 Sekunden beträgt und der Kafka-Senke-Connector in einem Zeitraum von 20 Sekunden 50 MB leert, erfasst der Dienst einen Batch mit 50 MB.
- Sie können skalieren, indem Sie Instanzen und Kafka-Partitionen hinzufügen. Erhöhen Sie
tasks.max
auf die Anzahl von Partitionen. Erstellen Sie eine Partition, wenn Sie über genügend Daten verfügen, um ein Blob mit der Größe der Einstellungflush.size.bytes
zu erstellen. Wenn das Blob kleiner ist, wird der Batch verarbeitet, wenn er das Zeitlimit erreicht, sodass die Partition nicht genügend Durchsatz erhält. Eine große Anzahl von Partitionen bedeutet mehr Verarbeitungsaufwand.
Verwandte Inhalte
- Informieren Sie sich über die Big Data-Architektur.
- Informieren Sie sich über das Erfassen von Beispieldaten im JSON-Format in Azure Data Explorer.
- Für zusätzliche Kafka-Labs:
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Tickets als Feedbackmechanismus für Inhalte auslaufen lassen und es durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unter:Feedback senden und anzeigen für