Pozyskiwanie danych z platformy Apache Kafka do usługi Azure Data Explorer

Apache Kafka to rozproszona platforma przesyłania strumieniowego do tworzenia potoków danych przesyłanych strumieniowo w czasie rzeczywistym, które niezawodnie przenoszą dane między systemami lub aplikacjami. Kafka Connect to narzędzie do skalowalnego i niezawodnego przesyłania strumieniowego danych między platformą Apache Kafka a innymi systemami danych. Ujście platformy Kafka Kusto służy jako łącznik z platformy Kafka i nie wymaga użycia kodu. Pobierz plik jar łącznika ujścia z repozytorium Git lub centrum łącznika Confluent.

W tym artykule pokazano, jak pozyskiwać dane za pomocą platformy Kafka przy użyciu samodzielnej konfiguracji platformy Docker w celu uproszczenia konfiguracji klastra platformy Kafka i klastra łącznika platformy Kafka.

Aby uzyskać więcej informacji, zobacz repozytorium Git łącznika i specyfikę wersji.

Wymagania wstępne

Tworzenie jednostki usługi Microsoft Entra

Jednostkę usługi Microsoft Entra można utworzyć za pomocą Azure Portal lub programowo, jak w poniższym przykładzie.

Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.

  1. Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.

    az login
    
  2. Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z zwróconych danych JSON skopiuj wartości appId, passwordi tenant do użycia w przyszłości.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Utworzono aplikację Microsoft Entra i jednostkę usługi.

Tworzenie tabeli docelowej

  1. W środowisku zapytania utwórz tabelę o nazwie Storms przy użyciu następującego polecenia:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Utwórz odpowiednie mapowanie Storms_CSV_Mapping tabeli dla pozyskanych danych przy użyciu następującego polecenia:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Utwórz zasady przetwarzania wsadowego pozyskiwania danych w tabeli w celu skonfigurowania opóźnienia pozyskiwania w kolejce.

    Porada

    Zasady przetwarzania wsadowego pozyskiwania są optymalizatorem wydajności i zawierają trzy parametry. Pierwszy warunek spełnia wymagania dotyczące pozyskiwania danych do tabeli azure Data Explorer.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Użyj jednostki usługi w sekcji Tworzenie jednostki usługi Microsoft Entra, aby udzielić uprawnień do pracy z bazą danych.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Uruchamianie laboratorium

Poniższe laboratorium zostało zaprojektowane tak, aby umożliwić rozpoczęcie tworzenia danych, konfigurowania łącznika platformy Kafka i przesyłania strumieniowego tych danych do usługi Azure Data Explorer za pomocą łącznika. Następnie możesz przyjrzeć się pozyskanym danym.

Klonowanie repozytorium git

Sklonuj repozytorium git laboratorium.

  1. Utwórz katalog lokalny na maszynie.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Sklonuj repozytorium.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Zawartość sklonowanego repozytorium

Uruchom następujące polecenie, aby wyświetlić listę zawartości sklonowanego repozytorium:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Ten wynik wyszukiwania to:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Przejrzyj pliki w sklonowanym repozytorium

W poniższych sekcjach opisano ważne części plików w drzewie plików powyżej.

adx-sink-config.json

Ten plik zawiera plik właściwości ujścia Kusto, w którym zostaną zaktualizowane szczegółowe informacje o konfiguracji:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Zastąp wartości następujących atrybutów zgodnie z konfiguracją usługi Azure Data Explorer: , , , (nazwa bazy danych), kusto.ingestion.urli kusto.query.url. kusto.tables.topics.mappingaad.auth.appkeyaad.auth.appidaad.auth.authority

Łącznik — Dockerfile

Ten plik zawiera polecenia służące do generowania obrazu platformy Docker dla wystąpienia łącznika. Obejmuje on pobieranie łącznika z katalogu wydania repozytorium Git.

Katalog storm-events-producer

Ten katalog zawiera program Go, który odczytuje lokalny plik "StormEvents.csv" i publikuje dane w temacie platformy Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Uruchamianie kontenerów

  1. W terminalu uruchom kontenery:

    docker-compose up
    

    Aplikacja producenta rozpocznie wysyłanie zdarzeń do tematu storm-events . Powinny zostać wyświetlone dzienniki podobne do następujących dzienników:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Aby sprawdzić dzienniki, uruchom następujące polecenie w osobnym terminalu:

    docker-compose logs -f | grep kusto-connect
    

Uruchamianie łącznika

Użyj wywołania REST platformy Kafka Connect, aby uruchomić łącznik.

  1. W osobnym terminalu uruchom zadanie ujścia za pomocą następującego polecenia:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Aby sprawdzić stan, uruchom następujące polecenie w osobnym terminalu:

    curl http://localhost:8083/connectors/storm/status
    

Łącznik rozpocznie kolejkowanie procesów pozyskiwania do platformy Azure Data Explorer.

Uwaga

Jeśli masz problemy z łącznikiem dzienników, utwórz problem.

Wykonywanie zapytań i przeglądanie danych

Potwierdzanie pozyskiwania danych

  1. Poczekaj na nadejście danych do Storms tabeli. Aby potwierdzić transfer danych, sprawdź liczbę wierszy:

    Storms | count
    
  2. Upewnij się, że w procesie pozyskiwania nie występują błędy:

    .show ingestion failures
    

    Po wyświetleniu danych wypróbuj kilka zapytań.

Wykonywanie zapytań na danych

  1. Aby wyświetlić wszystkie rekordy, uruchom następujące zapytanie:

    Storms
    
  2. Użyj where funkcji i project , aby filtrować określone dane:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. summarize Użyj operatora:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Zrzut ekranu przedstawiający wyniki wykresu kolumnowego zapytania platformy Kafka w usłudze Azure Data Explorer.

Aby uzyskać więcej przykładów zapytań i wskazówek, zobacz Artykuł Write query in KQL and język zapytań Kusto documentation (Pisanie zapytań w języku KQL i język zapytań Kusto dokumentacji).

Reset

Aby zresetować, wykonaj następujące czynności:

  1. Zatrzymaj kontenery (docker-compose down -v)
  2. Usuń (drop table Storms)
  3. Utwórz ponownie tabelę Storms
  4. Ponowne tworzenie mapowania tabeli
  5. Ponowne uruchamianie kontenerów (docker-compose up)

Czyszczenie zasobów

Aby usunąć zasoby usługi Azure Data Explorer, użyj polecenia az cluster delete lub az Kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Dostrajanie łącznika ujścia platformy Kafka

Dostosuj łącznik ujścia platformy Kafka do pracy z zasadami dzielenia na partie pozyskiwania:

  • Dostosuj limit rozmiaru ujścia flush.size.bytes platformy Kafka rozpoczynający się od 1 MB, zwiększając się o wzrost o 10 MB lub 100 MB.
  • W przypadku korzystania z ujścia platformy Kafka dane są agregowane dwa razy. Dane po stronie łącznika są agregowane zgodnie z ustawieniami opróżniania, a po stronie usługi Azure Data Explorer zgodnie z zasadami przetwarzania wsadowego. Jeśli czas przetwarzania wsadowego jest zbyt krótki i nie można pozyskać danych zarówno przez łącznik, jak i usługę, należy zwiększyć czas przetwarzania wsadowego. Ustaw rozmiar partii na 1 GB i zwiększ lub zmniejsz o 100 MB przyrostów zgodnie z potrzebami. Jeśli na przykład rozmiar opróżnienia wynosi 1 MB, a rozmiar zasad przetwarzania wsadowego wynosi 100 MB, po zagregowaniu partii 100 MB przez łącznik ujścia platformy Kafka zostanie pozyskany przez usługę Azure Data Explorer. Jeśli czas zasad przetwarzania wsadowego wynosi 20 sekund, a łącznik ujścia platformy Kafka opróżnia 50 MB w ciągu 20 sekund — usługa pozyskuje partię 50 MB.
  • Można skalować, dodając wystąpienia i partycje platformy Kafka. Zwiększ tasks.max liczbę partycji. Utwórz partycję, jeśli masz wystarczającą ilość danych, aby utworzyć obiekt blob rozmiar flush.size.bytes ustawienia. Jeśli obiekt blob jest mniejszy, partia jest przetwarzana po osiągnięciu limitu czasu, więc partycja nie otrzyma wystarczającej przepływności. Duża liczba partycji oznacza większe obciążenie związane z przetwarzaniem.