Ingesta de datos de Apache Kafka en Azure Data Explorer

Apache Kafka es una plataforma de streaming distribuida para crear canalizaciones de datos de streaming en tiempo real que mueven datos de forma confiable entre sistemas o aplicaciones. Kafka Connect es una herramienta para realizar streaming de datos de forma escalable y confiable entre Apache Kafka y otros sistemas de datos. El receptor de Kafka de Kusto actúa como conector de Kafka y no requiere el uso de código. Descargue el archivo jar del conector receptor desde el repositorio de Git o confluent Connector Hub.

En este artículo se muestra cómo ingerir datos con Kafka mediante una configuración independiente de Docker para simplificar la configuración del clúster de Kafka y del conector de Kafka.

Para más información, consulte el repositorio Git del conector y las especificaciones de la versión.

Requisitos previos

Creación de una entidad de servicio de Microsoft Entra

La entidad de servicio Microsoft Entra se puede crear mediante el Azure Portal o mediante programación, como en el ejemplo siguiente.

Esta entidad de servicio será la identidad que usa el conector para escribir datos de la tabla en Kusto. Más adelante concederá permisos para que esta entidad de servicio acceda a los recursos de Kusto.

  1. Inicie sesión en su suscripción de Azure a través de la CLI de Azure. A continuación, realice la autenticación en el explorador.

    az login
    
  2. Elija la suscripción para hospedar la entidad de seguridad. Este paso es necesario si tiene varias suscripciones.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Cree la entidad de servicio. En este ejemplo, la entidad de servicio se llama my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. En los datos JSON devueltos, copie , appIdpasswordy tenant para su uso futuro.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Ha creado una aplicación de Microsoft Entra y una entidad de servicio.

Creación de una tabla de destino

  1. En el entorno de consulta, cree una tabla denominada Storms con el siguiente comando:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Cree la asignación de tabla Storms_CSV_Mapping correspondiente para los datos ingeridos con el siguiente comando:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Cree una directiva de procesamiento por lotes de ingesta en la tabla para la latencia de ingesta en cola configurable.

    Sugerencia

    La directiva de procesamiento por lotes de ingesta es un optimizador de rendimiento e incluye tres parámetros. Si se cumple la primera condición, se desencadena la ingesta en la tabla de Azure Data Explorer.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Use la entidad de servicio de Creación de una entidad de servicio de Microsoft Entra para conceder permiso para trabajar con la base de datos.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Ejecución del laboratorio

El siguiente laboratorio está diseñado para ofrecerle la experiencia de empezar a crear datos, configurar el conector de Kafka y transmitir estos datos a Azure Data Explorer con el conector. Después, puede examinar los datos ingeridos.

Clonación del repositorio Git

Clone el repositorio Git del laboratorio.

  1. Cree un directorio local en el equipo.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clone el repositorio.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Contenido del repositorio clonado

Ejecute el siguiente comando para enumerar el contenido del repositorio clonado:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

El resultado de esta búsqueda es:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Revisión de los archivos del repositorio clonado

En las secciones siguientes se explican las partes importantes de los archivos del árbol de archivos anterior.

adx-sink-config.json

Este archivo contiene el archivo de propiedades del receptor de Kusto en el que se van a actualizar los detalles de configuración específicos:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Reemplace los valores de los siguientes atributos según su configuración de Azure Data Explorer: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (nombre de la base de datos), kusto.ingestion.url y kusto.query.url.

Connector/Dockerfile

Este archivo tiene los comandos para generar la imagen de Docker para la instancia del conector. Incluye la descarga del conector desde el directorio de versión del repositorio Git.

Directorio Storm-events-producer

Este directorio tiene un programa de Go que lee un archivo "StormEvents.csv" local y publica los datos en un tema de Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Inicio de los contenedores

  1. Inicie los contenedores en un terminal:

    docker-compose up
    

    La aplicación del productor comenzará a enviar eventos al tema storm-events. Debería ver registros similares a los siguientes:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Para comprobar los registros, ejecute el siguiente comando en un terminal independiente:

    docker-compose logs -f | grep kusto-connect
    

Inicio del conector

Use una llamada REST de Kafka Connect para iniciar el conector.

  1. En un terminal independiente, inicie la tarea del receptor con el siguiente comando:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Para comprobar el estado, ejecute el siguiente comando en un terminal independiente:

    curl http://localhost:8083/connectors/storm/status
    

El conector comenzará a poner en cola los procesos de ingesta en Azure Data Explorer.

Nota

Si tiene problemas con el conector de registro, cree una incidencia.

Consulta y revisión de los datos

Confirmación de la ingesta de datos

  1. Espere a que lleguen los datos a la tabla Storms. Para confirmar la transferencia de datos, compruebe el número de filas:

    Storms | count
    
  2. Confirme que no hay errores en el proceso de ingesta:

    .show ingestion failures
    

    Una vez que aparezcan los datos, pruebe a realizar algunas consultas.

Consultar los datos

  1. Para ver todos los registros, ejecute la siguiente consulta:

    Storms
    
  2. Use where y project para filtrar datos específicos:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Use el operador summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Captura de pantalla del gráfico de columnas de consulta de Kafka da como resultado Azure Data Explorer.

Para obtener más ejemplos de consultas e instrucciones, consulte Escritura de consultas en KQL y documentación de Lenguaje de consulta Kusto.

Reset

Para restablecer, realice los pasos siguientes:

  1. Detener los contenedores (docker-compose down -v)
  2. Eliminar (drop table Storms)
  3. Volver a crear la tabla Storms
  4. Volver a crear la asignación de tabla
  5. Reiniciar los contenedores (docker-compose up)

Limpieza de recursos

Para eliminar los recursos de Azure Data Explorer, use az cluster delete o az Kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Optimización del conector receptor de Kafka

Ajuste el conector receptor de Kafka para que funcione con la directiva de procesamiento por lotes de ingesta:

  • Ajuste el límite de tamaño flush.size.bytes del receptor de Kafka a partir de 1 MB, aumentando en incrementos de 10 MB o 100 MB.
  • Cuando se usa el receptor de Kafka, los datos se agregan dos veces. En el lado del conector, los datos se agregan según la configuración de vaciado y en el del servicio Azure Data Explorer según la directiva de procesamiento por lotes. Si el tiempo de procesamiento por lotes es demasiado corto y el conector y el servicio no pueden ingerir datos, se debe aumentar el tiempo de procesamiento por lotes. Establezca el tamaño del procesamiento por lotes en 1 GB y aumente o disminuya en incrementos de 100 MB según sea necesario. Por ejemplo, si el tamaño de vaciado es de 1 MB y el tamaño de la directiva de procesamiento por lotes es de 100 MB, después de que el conector receptor de Kafka agregue un lote de 100 MB, el servicio Azure Data Explorer ingerirá un lote de 100 MB. Si el tiempo de la directiva de procesamiento por lotes es de 20 segundos y el conector receptor de Kafka vacía 50 MB en un período de 20 segundos, el servicio ingerirá un lote de 50 MB.
  • Puede escalar agregando instancias y particiones de Kafka. Aumente tasks.max hasta alcanzar el número de particiones. Cree una partición si tiene suficientes datos para generar un blob del tamaño de la opción flush.size.bytes. Si el blob es más pequeño, el lote se procesa cuando alcanza el límite de tiempo, por lo que la partición no recibirá suficiente rendimiento. Un gran número de particiones significa una mayor sobrecarga de procesamiento.