Compartilhar via


Ingerir dados com o Apache Flink para o Azure Data Explorer

O Apache Flink é uma estrutura e um mecanismo de processamento distribuído para cálculos com estado em fluxos de dados não associados e limitados.

O conector do Flink é um projeto de código aberto que pode ser executado em qualquer cluster do Flink. Ele implementa o coletor de dados para mover dados de um cluster Flink. Usando o conector para o Apache Flink, você pode criar aplicativos rápidos e escalonáveis, direcionando os dados controlados por cenários, como aprendizado de máquina (ML), extração, transformação e carregamento (ETL) e análise de logs.

Neste artigo, você aprenderá a usar o conector do Flink para enviar dados do Flink para sua tabela. Você criará um mapeamento de tabela e dados, direcionará o Flink para enviar dados para a tabela e, em seguida, validará os resultados.

Pré-requisitos

Para projetos do Flink que usam o Maven para gerenciar dependências, integre o Flink Connector Core Sink para o Azure Data Explorer adicionando-o como uma dependência:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Para projetos que não usam o Maven para gerenciar dependências, clone o repositório do Azure Data Explorer Connector para Apache Flink e compile-o localmente. Essa abordagem permite a você adicionar manualmente o conector ao repositório local do Maven usando o comando mvn clean install -DskipTests.

Você pode autenticar do Flink para usar um aplicativo do Microsoft Entra ID ou uma identidade gerenciada.

Essa entidade de serviço será a identidade utilizada pelo conector para gravar na tabela do Kusto. Posteriormente, você vai conceder permissões para essa entidade de serviço acessar recursos do Kusto.

  1. Inicie sessão na sua assinatura do Azure com a CLI do Azure. Em seguida, autentique no navegador.

    az login
    
  2. Escolha a assinatura para hospedar a entidade de segurança. Essa etapa é necessária quando você tem várias assinaturas.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Crie a entidade de serviço. Neste exemplo, a entidade de serviço é chamada my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. A partir dos dados JSON retornados, copie o appId, password e tenant para uso futuro.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Você criou o aplicativo do Microsoft Entra e a entidade de serviço.

  1. Conceda ao aplicativo permissões de usuário no banco de dados:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Conceda ao aplicativo permissões de ingestor ou administrador na tabela. As permissões necessárias dependem do método de gravação de dados escolhido. As permissões de ingestor são suficientes para o SinkV2, mas o WriteAndSink requer permissões de administrador.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Para obter mais informações sobre autorização, consulte Autorização com controle de acesso baseado em função do Kusto.

Para gravar dados do Flink:

  1. Importe as opções necessárias:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Use o aplicativo ou a identidade gerenciada para autenticação.

    Para autenticação de aplicativo:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Para autenticação de identidade gerenciada:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configure os parâmetros do coletor, como banco de dados e tabela:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    É possível adicionar mais opções, conforme descrito na seguinte tabela:

    Opção Descrição Valor Padrão
    IngestionMappingRef Faz referência a um mapeamento de ingestão existente.
    FlushImmediately Libera os dados imediatamente e pode causar problemas de desempenho. Esse método não é recomendado.
    BatchIntervalMs Controla a frequência com que os dados são liberados. 30 segundos
    BatchSize Define o tamanho do lote para armazenar em buffer os registros antes da liberação. 1.000 registros
    ClientBatchSizeLimit Especifica o tamanho em MB dos dados agregados antes da ingestão. 300 MB
    PollForIngestionStatus Se verdadeiro, o conector sonda o status de ingestão após a liberação de dados. false
    DeliveryGuarantee Determina a semântica da garantia de entrega. Para obter exatamente uma semântica, use WriteAheadSink. AT_LEAST_ONCE
  2. Grave dados de streaming com um dos seguintes métodos:

    • SinkV2: esta é uma opção sem estado que libera dados no ponto de verificação, garantindo a consistência pelo menos uma vez. Recomendamos esta opção para ingestão de dados de alto volume.
    • WriteAheadSink: este método emite dados para um KustoSink. É integrado com o sistema de ponto de verificação do Flink e oferece garantias exatamente únicas. Os dados são armazenados em um AbstractStateBackend e confirmados somente depois que um ponto de verificação é concluído.

    O exemplo a seguir usa SinkV2. Para usar WriteAheadSink, use o método buildWriteAheadSink em vez de build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

O código concluído deverá ser semelhante a:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Verifique se os dados estão sendo ingeridos

Depois que a conexão é configurada, os dados são enviados para sua tabela. Você pode verificar se os dados são ingeridos executando uma consulta KQL.

  1. Execute a seguinte consulta para verificar se os dados são ingeridos na tabela:

    <MyTable>
    | count
    
  2. Execute a consulta a seguir para exibir os dados:

    <MyTable>
    | take 100