Получение данных из Apache Kafka в Azure Data Explorer

Apache Kafka — это распределенная платформа потоковой передачи для создания конвейеров потоковой передачи данных в режиме реального времени, которые надежно перемещают данные между системами или приложениями. Kafka Connect — это инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Kusto Kafka Sink служит соединителем из Kafka и не требует использования кода. Скачайте JAR-файл соединителя приемника из репозитория Git или концентратора соединителя Confluent.

В этой статье показано, как принимать данные с помощью Kafka с помощью автономной установки Docker, чтобы упростить настройку кластера Kafka и кластера соединителя Kafka.

Дополнительные сведения см. в разделе Репозиторий Git и Сведения о версиях для соединителя.

Предварительные требования

Создание субъекта-службы Microsoft Entra

Субъект-службу Microsoft Entra можно создать с помощью портал Azure или программно, как показано в следующем примере.

Этот субъект-служба будет идентификатором, используемым соединителем для записи данных таблицы в Kusto. Позже вы предоставите этому субъекту-службе разрешения на доступ к ресурсам Kusto.

  1. Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.

    az login
    
  2. Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Создайте субъект-службу. В этом примере принципал службы называется my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Из возвращенных данных JSON скопируйте appId, passwordи tenant для использования в будущем.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Вы создали приложение Microsoft Entra и субъект-службу.

Создание целевой таблицы

  1. В среде запросов создайте таблицу с именем Storms с помощью следующей команды:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Создайте соответствующее сопоставление таблицы Storms_CSV_Mapping для загруженных данных с помощью следующей команды:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Создайте политику пакетной обработки приема в таблице для настраиваемой задержки приема в очереди.

    Совет

    Политика пакетной обработки приема — это оптимизатор производительности, включающий три параметра. Первое удовлетворенное условие запускает прием данных в таблицу Azure Data Explorer.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Используйте субъект-службу из раздела Создание Microsoft Entra субъекта-службы, чтобы предоставить разрешение на работу с базой данных.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Запуск лаборатории

Следующая лабораторная работа предназначена для того, чтобы дать вам опыт создания данных, настройки соединителя Kafka и потоковой передачи этих данных в Azure Data Explorer с помощью соединителя. Затем вы можете просмотреть полученные данные.

Клонировать репозиторий git

Клонировать репозиторий git лаборатории.

  1. Создайте локальный каталог на вашем компьютере.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Клонирование репозитория.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Содержание клонированного репозитория

Выполните следующую команду, чтобы вывести список содержимого клонированного репозитория:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Вот результат этого поиска:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Просмотрите файлы в клонированном репозитория

В следующих разделах объясняются важные части файлов в дереве файлов выше.

adx-sink-config.json

Этот файл содержит файл свойств приемника Kusto, в котором вы обновляете определенные детали конфигурации:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Замените значения следующих атрибутов в соответствии с настройкой Azure Data Explorer: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (имя базы данных) kusto.ingestion.url и kusto.query.url.

Соединитель — Dockerfile

В этом файле есть команды для создания образа докера для экземпляра соединителя. Он включает загрузку соединителя из каталога выпуска репозитория git.

Каталог Storm-events-producer

В этом каталоге находится программа Go, которая считывает локальный файл StormEvents.csv и публикует данные в теме Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Запуск контейнеров

  1. В терминале запустите контейнеры:

    docker-compose up
    

    Приложение-производитель начнет отправлять события в тему storm-events. Вы должны увидеть журналы, похожие на следующие журналы:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Чтобы проверить журналы, выполните следующую команду в отдельном терминале:

    docker-compose logs -f | grep kusto-connect
    

Запуск соединителя

Используйте REST-вызов Kafka Connect для запуска соединителя.

  1. В отдельном терминале запустите задачу приемника с помощью следующей команды:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Чтобы проверить статус, запустите следующую команду в отдельном терминале:

    curl http://localhost:8083/connectors/storm/status
    

Соединитель начнет ставить в очередь процессы приема в Azure Data Explorer.

Примечание

Если у вас возникли проблемы с подключением к соединителю создайте вопрос.

Запросить и просмотреть данные

Подтвердите получение данных

  1. Подождите, пока данные поступят в таблицу Storms. Чтобы подтвердить передачу данных, проверьте количество строк:

    Storms | count
    
  2. Убедитесь, что в процессе приема нет сбоев:

    .show ingestion failures
    

    Как только вы увидите данные, попробуйте выполнить несколько запросов.

Запрос данных

  1. Чтобы увидеть все записи, выполните следующий запрос:

    Storms
    
  2. Используйте where и project для фильтрации определенных данных:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Используйте оператор summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Снимок экрана: гистограмма запросов Kafka в Data Explorer Azure.

Дополнительные примеры запросов и рекомендации см. в статье Написание запросов в KQL и язык запросов Kusto документации.

Reset

Для сброса выполните следующие действия.

  1. Остановка контейнеров (docker-compose down -v)
  2. Удалить (drop table Storms)
  3. Повторное создание таблицы Storms
  4. Повторное создание сопоставления таблиц
  5. Перезапуск контейнеров (docker-compose up)

Очистка ресурсов

Чтобы удалить ресурсы Azure Data Explorer, используйте az cluster delete или az Kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Настройка соединителя Kafka Sink

Настройте соединитель Kafka Sink для работы с политикой пакетной обработки приема данных:

  • Настройте предельный размер flush.size.bytes для Kafka Sink начиная с 1 МБ с шагом приращения 10 МБ или 100 МБ.
  • При использовании Kafka Sink данные агрегируются дважды. Данные на стороне соединителя агрегируются в соответствии с параметрами сброса, а на стороне службы Azure Data Explorer — в соответствии с политикой пакетной обработки. Если время пакетной обработки слишком мало и данные не могут быть приняты ни соединителем, ни службой, необходимо увеличить время пакетной обработки. Установите размер пакетной обработки в 1 ГБ. При необходимости его можно увеличить или уменьшить с шагом 100 МБ. Например, если размер очистки равен 1 МБ, а размер политики пакетной обработки — 100 МБ, после агрегирования пакета размером 100 МБ соединителем приемника Kafka пакет размером 100 МБ будет приниматься службой Azure Data Explorer. Если время политики пакетной обработки составляет 20 секунд, а соединитель Kafka Sink сбрасывает 50 МБ за 20-секундный период, служба примет пакет размером 50 МБ.
  • Вы можете изменять масштаб, добавляя экземпляры и разделы Kafka. Увеличьте значение tasks.max до нужного числа разделов. Создайте раздел, если у вас достаточно данных для создания большого двоичного объекта, размер которого равен значению параметра flush.size.bytes. Если большой двоичный объект меньше, пакет обрабатывается по достижении предельного времени, поэтому секция не получит достаточной пропускной способности. Большое количество разделов приводит к увеличению времени на обработку.