Pozyskiwanie danych z platformy Apache Kafka do usługi Azure Data Explorer
Apache Kafka to rozproszona platforma przesyłania strumieniowego do tworzenia potoków danych przesyłanych strumieniowo w czasie rzeczywistym, które niezawodnie przenoszą dane między systemami lub aplikacjami. Kafka Connect to narzędzie do skalowalnego i niezawodnego przesyłania strumieniowego danych między platformą Apache Kafka a innymi systemami danych. Ujście platformy Kafka Kusto służy jako łącznik z platformy Kafka i nie wymaga użycia kodu. Pobierz plik jar łącznika ujścia z repozytorium Git lub centrum łącznika Confluent.
W tym artykule pokazano, jak pozyskiwać dane za pomocą platformy Kafka przy użyciu samodzielnej konfiguracji platformy Docker w celu uproszczenia konfiguracji klastra platformy Kafka i klastra łącznika platformy Kafka.
Aby uzyskać więcej informacji, zobacz repozytorium Git łącznika i specyfikę wersji.
Wymagania wstępne
- Subskrypcja platformy Azure. Utwórz bezpłatne konto platformy Azure.
- Klaster i baza danych platformy Azure Data Explorer z domyślnymi zasadami pamięci podręcznej i przechowywania lubbazą danych KQL w usłudze Microsoft Fabric.
- Interfejs wiersza polecenia platformy Azure.
- Docker i Docker Compose.
Tworzenie jednostki usługi Microsoft Entra
Jednostkę usługi Microsoft Entra można utworzyć za pomocą Azure Portal lub programowo, jak w poniższym przykładzie.
Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.
Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.
az login
Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z zwróconych danych JSON skopiuj wartości
appId
,password
itenant
do użycia w przyszłości.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Utworzono aplikację Microsoft Entra i jednostkę usługi.
Tworzenie tabeli docelowej
W środowisku zapytania utwórz tabelę o nazwie
Storms
przy użyciu następującego polecenia:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Utwórz odpowiednie mapowanie
Storms_CSV_Mapping
tabeli dla pozyskanych danych przy użyciu następującego polecenia:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Utwórz zasady przetwarzania wsadowego pozyskiwania danych w tabeli w celu skonfigurowania opóźnienia pozyskiwania w kolejce.
Porada
Zasady przetwarzania wsadowego pozyskiwania są optymalizatorem wydajności i zawierają trzy parametry. Pierwszy warunek spełnia wymagania dotyczące pozyskiwania danych do tabeli azure Data Explorer.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Użyj jednostki usługi w sekcji Tworzenie jednostki usługi Microsoft Entra, aby udzielić uprawnień do pracy z bazą danych.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Uruchamianie laboratorium
Poniższe laboratorium zostało zaprojektowane tak, aby umożliwić rozpoczęcie tworzenia danych, konfigurowania łącznika platformy Kafka i przesyłania strumieniowego tych danych do usługi Azure Data Explorer za pomocą łącznika. Następnie możesz przyjrzeć się pozyskanym danym.
Klonowanie repozytorium git
Sklonuj repozytorium git laboratorium.
Utwórz katalog lokalny na maszynie.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Sklonuj repozytorium.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Zawartość sklonowanego repozytorium
Uruchom następujące polecenie, aby wyświetlić listę zawartości sklonowanego repozytorium:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Ten wynik wyszukiwania to:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Przejrzyj pliki w sklonowanym repozytorium
W poniższych sekcjach opisano ważne części plików w drzewie plików powyżej.
adx-sink-config.json
Ten plik zawiera plik właściwości ujścia Kusto, w którym zostaną zaktualizowane szczegółowe informacje o konfiguracji:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Zastąp wartości następujących atrybutów zgodnie z konfiguracją usługi Azure Data Explorer: , , , (nazwa bazy danych), kusto.ingestion.url
i kusto.query.url
. kusto.tables.topics.mapping
aad.auth.appkey
aad.auth.appid
aad.auth.authority
Łącznik — Dockerfile
Ten plik zawiera polecenia służące do generowania obrazu platformy Docker dla wystąpienia łącznika. Obejmuje on pobieranie łącznika z katalogu wydania repozytorium Git.
Katalog storm-events-producer
Ten katalog zawiera program Go, który odczytuje lokalny plik "StormEvents.csv" i publikuje dane w temacie platformy Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Uruchamianie kontenerów
W terminalu uruchom kontenery:
docker-compose up
Aplikacja producenta rozpocznie wysyłanie zdarzeń do tematu
storm-events
. Powinny zostać wyświetlone dzienniki podobne do następujących dzienników:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Aby sprawdzić dzienniki, uruchom następujące polecenie w osobnym terminalu:
docker-compose logs -f | grep kusto-connect
Uruchamianie łącznika
Użyj wywołania REST platformy Kafka Connect, aby uruchomić łącznik.
W osobnym terminalu uruchom zadanie ujścia za pomocą następującego polecenia:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Aby sprawdzić stan, uruchom następujące polecenie w osobnym terminalu:
curl http://localhost:8083/connectors/storm/status
Łącznik rozpocznie kolejkowanie procesów pozyskiwania do platformy Azure Data Explorer.
Uwaga
Jeśli masz problemy z łącznikiem dzienników, utwórz problem.
Wykonywanie zapytań i przeglądanie danych
Potwierdzanie pozyskiwania danych
Poczekaj na nadejście danych do
Storms
tabeli. Aby potwierdzić transfer danych, sprawdź liczbę wierszy:Storms | count
Upewnij się, że w procesie pozyskiwania nie występują błędy:
.show ingestion failures
Po wyświetleniu danych wypróbuj kilka zapytań.
Wykonywanie zapytań na danych
Aby wyświetlić wszystkie rekordy, uruchom następujące zapytanie:
Storms
Użyj
where
funkcji iproject
, aby filtrować określone dane:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
summarize
Użyj operatora:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Aby uzyskać więcej przykładów zapytań i wskazówek, zobacz Artykuł Write query in KQL and język zapytań Kusto documentation (Pisanie zapytań w języku KQL i język zapytań Kusto dokumentacji).
Reset
Aby zresetować, wykonaj następujące czynności:
- Zatrzymaj kontenery (
docker-compose down -v
) - Usuń (
drop table Storms
) - Utwórz ponownie tabelę
Storms
- Ponowne tworzenie mapowania tabeli
- Ponowne uruchamianie kontenerów (
docker-compose up
)
Czyszczenie zasobów
Aby usunąć zasoby usługi Azure Data Explorer, użyj polecenia az cluster delete lub az Kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Dostrajanie łącznika ujścia platformy Kafka
Dostosuj łącznik ujścia platformy Kafka do pracy z zasadami dzielenia na partie pozyskiwania:
- Dostosuj limit rozmiaru ujścia
flush.size.bytes
platformy Kafka rozpoczynający się od 1 MB, zwiększając się o wzrost o 10 MB lub 100 MB. - W przypadku korzystania z ujścia platformy Kafka dane są agregowane dwa razy. Dane po stronie łącznika są agregowane zgodnie z ustawieniami opróżniania, a po stronie usługi Azure Data Explorer zgodnie z zasadami przetwarzania wsadowego. Jeśli czas przetwarzania wsadowego jest zbyt krótki i nie można pozyskać danych zarówno przez łącznik, jak i usługę, należy zwiększyć czas przetwarzania wsadowego. Ustaw rozmiar partii na 1 GB i zwiększ lub zmniejsz o 100 MB przyrostów zgodnie z potrzebami. Jeśli na przykład rozmiar opróżnienia wynosi 1 MB, a rozmiar zasad przetwarzania wsadowego wynosi 100 MB, po zagregowaniu partii 100 MB przez łącznik ujścia platformy Kafka zostanie pozyskany przez usługę Azure Data Explorer. Jeśli czas zasad przetwarzania wsadowego wynosi 20 sekund, a łącznik ujścia platformy Kafka opróżnia 50 MB w ciągu 20 sekund — usługa pozyskuje partię 50 MB.
- Można skalować, dodając wystąpienia i partycje platformy Kafka. Zwiększ
tasks.max
liczbę partycji. Utwórz partycję, jeśli masz wystarczającą ilość danych, aby utworzyć obiekt blob rozmiarflush.size.bytes
ustawienia. Jeśli obiekt blob jest mniejszy, partia jest przetwarzana po osiągnięciu limitu czasu, więc partycja nie otrzyma wystarczającej przepływności. Duża liczba partycji oznacza większe obciążenie związane z przetwarzaniem.
Zawartość pokrewna
- Dowiedz się więcej o architekturze danych big data.
- Dowiedz się, jak pozyskiwać przykładowe dane w formacie JSON do usługi Azure Data Explorer.
- Aby uzyskać dodatkowe laboratoria platformy Kafka:
- Praktyczne laboratorium na potrzeby pozyskiwania z platformy Kafka w chmurze Confluent w trybie rozproszonym
- Praktyczne laboratorium na potrzeby pozyskiwania danych z usługi HDInsight Kafka w trybie rozproszonym
- Praktyczne laboratorium na potrzeby pozyskiwania z platformy Confluent IaaS Kafka w usłudze AKS w trybie rozproszonym