Inserire dati da Apache Kafka in Azure Esplora dati
Apache Kafka è una piattaforma di streaming distribuita per la creazione di pipeline di dati di streaming in tempo reale che spostano in modo affidabile i dati tra sistemi o applicazioni. Kafka Connect è uno strumento per lo streaming scalabile e affidabile dei dati tra Apache Kafka e altri sistemi di dati. Il sink Kusto Kafka funge da connettore da Kafka e non richiede l'uso del codice. Scaricare il file jar del connettore sink dal repository Git o dall'hub del connettore Confluent.
Questo articolo illustra come inserire dati con Kafka usando una configurazione docker autonoma per semplificare la configurazione del cluster Kafka e del cluster Kafka.
Per altre informazioni, vedere il repository Git del connettore e le specifiche della versione.
Prerequisiti
- Una sottoscrizione di Azure. Creare un account Azure gratuito.
- Un cluster e un database di Azure Esplora dati con i criteri di memorizzazione e cache predefiniti o un database KQL in Microsoft Fabric.
- Interfaccia della riga di comando di Azure.
- Docker e Docker Compose.
Creare un'entità servizio Microsoft Entra
L'Microsoft Entra'entità servizio può essere creata tramite il portale di Azure o a livello di codice, come nell'esempio seguente.
Questa entità servizio sarà l'identità usata dal connettore per scrivere dati nella tabella in Kusto. In seguito si concedono le autorizzazioni per questa entità servizio per accedere alle risorse Kusto.
Accedere alla sottoscrizione di Azure tramite l'interfaccia della riga di comando di Azure. Eseguire quindi l'autenticazione nel browser.
az login
Scegliere la sottoscrizione per ospitare l'entità. Questo passaggio è necessario quando si hanno più sottoscrizioni.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Creare l'entità servizio. In questo esempio l'entità servizio è denominata
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Dai dati JSON restituiti copiare ,
appId
password
etenant
per un uso futuro.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
È stata creata l'applicazione Microsoft Entra e l'entità servizio.
Creare una tabella di destinazione
Dall'ambiente di query creare una tabella denominata
Storms
usando il comando seguente:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Creare il mapping
Storms_CSV_Mapping
della tabella corrispondente per i dati inseriti usando il comando seguente:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Creare un criterio di inserimento in batch nella tabella per la latenza di inserimento in coda configurabile.
Suggerimento
I criteri di inserimento in batch sono un'utilità di ottimizzazione delle prestazioni e includono tre parametri. La prima condizione soddisfatta attiva l'inserimento nella tabella Esplora dati di Azure.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Usare l'entità servizio da Creare un'entità servizio Microsoft Entra per concedere l'autorizzazione per lavorare con il database.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Eseguire il lab
Il lab seguente è progettato per offrire l'esperienza di iniziare a creare dati, configurare il connettore Kafka e trasmettere questi dati in Azure Esplora dati con il connettore. È quindi possibile esaminare i dati inseriti.
Clonare il repository Git
Clonare il repository Git del lab.
Creare una directory locale nel computer.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Clonare il repository.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Contenuto del repository clonato
Eseguire il comando seguente per elencare il contenuto del repository clonato:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Questo risultato della ricerca è:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Esaminare i file nel repository clonato
Le sezioni seguenti illustrano le parti importanti dei file nell'albero dei file precedente.
adx-sink-config.json
Questo file contiene il file delle proprietà del sink Kusto in cui verranno aggiornati dettagli di configurazione specifici:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Sostituire i valori per gli attributi seguenti in base all'installazione di Azure Esplora dati: , , (il nome del database), kusto.ingestion.url
e kusto.query.url
. kusto.tables.topics.mapping
aad.auth.appkey
aad.auth.appid
aad.auth.authority
Connettore - Dockerfile
Questo file include i comandi per generare l'immagine Docker per l'istanza del connettore. Include il download del connettore dalla directory della versione del repository Git.
Directory Storm-events-producer
Questa directory include un programma Go che legge un file "StormEvents.csv" locale e pubblica i dati in un argomento Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Avviare i contenitori
In un terminale avviare i contenitori:
docker-compose up
L'applicazione producer inizierà a inviare eventi all'argomento
storm-events
. Verranno visualizzati log simili ai log seguenti:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Per controllare i log, eseguire il comando seguente in un terminale separato:
docker-compose logs -f | grep kusto-connect
Avviare il connettore
Usare una chiamata REST Kafka Connect per avviare il connettore.
In un terminale separato avviare l'attività sink con il comando seguente:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Per controllare lo stato, eseguire il comando seguente in un terminale separato:
curl http://localhost:8083/connectors/storm/status
Il connettore avvierà l'accodamento dei processi di inserimento in Azure Esplora dati.
Nota
In caso di problemi del connettore di log, creare un problema.
Eseguire query ed esaminare i dati
Confermare l'inserimento dati
Attendere che i dati arrivino nella
Storms
tabella. Per confermare il trasferimento dei dati, controllare il numero di righe:Storms | count
Verificare che non siano presenti errori nel processo di inserimento:
.show ingestion failures
Dopo aver visualizzato i dati, provare alcune query.
Eseguire una query sui dati
Per visualizzare tutti i record, eseguire la query seguente:
Storms
Usare
where
eproject
per filtrare dati specifici:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Usare l'operatore
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Per altri esempi di query e linee guida, vedere Scrivere query in KQL e Linguaggio di query Kusto documentazione.
Reset
Per reimpostare, seguire questa procedura:
- Arrestare i contenitori (
docker-compose down -v
) - Elimina (
drop table Storms
) - Ricreare la
Storms
tabella - Ricreare il mapping delle tabelle
- Riavviare i contenitori (
docker-compose up
)
Pulire le risorse
Per eliminare le risorse di Azure Esplora dati, usare az cluster delete o az Kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Ottimizzazione del connettore Sink Kafka
Ottimizzare il connettore Sink Kafka per usare i criteri di invio in batch:
- Ottimizzare il limite di dimensioni sink Kafka
flush.size.bytes
a partire da 1 MB, aumentando con incrementi di 10 MB o 100 MB. - Quando si usa il sink Kafka, i dati vengono aggregati due volte. I dati sul lato connettore vengono aggregati in base alle impostazioni di scaricamento e sul lato del servizio Esplora dati di Azure in base ai criteri di invio in batch. Se il tempo di invio in batch è troppo breve e non è possibile inserire dati sia dal connettore che dal servizio, è necessario aumentare il tempo di invio in batch. Impostare le dimensioni di invio in batch a 1 GB e aumentare o diminuire di 100 MB incrementi in base alle esigenze. Ad esempio, se le dimensioni dello scaricamento sono pari a 1 MB e le dimensioni dei criteri di invio in batch sono pari a 100 MB, dopo che un batch di 100 MB viene aggregato dal connettore Sink Kafka, un batch di 100 MB verrà inserito dal servizio Azure Esplora dati. Se il tempo dei criteri di invio in batch è di 20 secondi e il connettore Sink Kafka scarica 50 MB in un periodo di 20 secondi, il servizio inserisce un batch di 50 MB.
- È possibile ridimensionare aggiungendo istanze e partizioni Kafka. Aumentare
tasks.max
al numero di partizioni. Creare una partizione se sono presenti dati sufficienti per produrre un BLOB le dimensioni dell'impostazioneflush.size.bytes
. Se il BLOB è più piccolo, il batch viene elaborato quando raggiunge il limite di tempo, quindi la partizione non riceverà una velocità effettiva sufficiente. Un numero elevato di partizioni significa un sovraccarico di elaborazione maggiore.
Contenuti correlati
- Altre informazioni sull'architettura big data.
- Informazioni su come inserire dati di esempio formattati JSON in Azure Esplora dati.
- Per altri lab Kafka: