Share via


Ingérer des données avec Apache Flink dans Azure Data Explorer

Apache Flink est un framework et un moteur de traitement distribué pour les calculs avec état sur des flux de données non liés et délimités.

Le connecteur Flink est un projet open source qui peut s’exécuter sur n’importe quel cluster Flink. Il implémente le récepteur de données pour déplacer des données à partir d’un cluster Flink. À l’aide du connecteur à Apache Flink, vous pouvez créer des applications rapides et évolutives ciblant des scénarios basés sur les données, par exemple, le Machine Learning (ML), l’extraction-transformation-chargement (ETL) et Log Analytics.

Dans cet article, vous allez apprendre à utiliser le connecteur Flink pour envoyer des données de Flink à votre table. Vous créez une table et un mappage de données, dirigez Flink pour envoyer des données dans la table, puis validez les résultats.

Prérequis

Pour les projets Flink qui utilisent Maven pour gérer les dépendances, intégrez le récepteur principal du connecteur Flink pour Azure Data Explorer en l’ajoutant en tant que dépendance :

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Pour les projets qui n’utilisent pas Maven pour gérer les dépendances, clonez le référentiel pour le connecteur Azure Data Explorer pour Apache Flink et générez-le localement. Cette approche vous permet d’ajouter manuellement le connecteur à votre dépôt Maven local à l’aide de la commande mvn clean install -DskipTests.

Vous pouvez vous authentifier à partir de Flink à l’aide d’une application Microsoft Entra ID ou d’une identité managée.

Ce principal de service sera l’identité utilisée par le connecteur pour écrire les données de votre table dans Kusto. Vous accorderez ultérieurement des autorisations pour que ce principal de service accède aux ressources Kusto.

  1. Connectez-vous à votre abonnement Azure via Azure CLI. Authentifiez-vous ensuite dans le navigateur.

    az login
    
  2. Choisissez l’abonnement pour héberger le principal. Cette étape est nécessaire quand vous avez plusieurs abonnements.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Créez le principal de service. Dans cet exemple, le principal de service est appelé my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. À partir des données JSON retournées, copiez , appIdpasswordet tenant pour une utilisation ultérieure.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Vous avez créé votre application Microsoft Entra et votre principal de service.

  1. Accordez à l’utilisateur de l’application des autorisations sur la base de données :

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Accordez à l’application des autorisations d’ingestion ou d’administrateur sur la table. Les autorisations requises dépendent de la méthode d’écriture de données choisie. Les autorisations d’ingestion sont suffisantes pour SinkV2, tandis que WriteAndSink nécessite des autorisations d’administrateur.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Pour plus d’informations sur l’autorisation, consultez Contrôle d’accès en fonction du rôle Kusto.

Pour écrire des données à partir de Flink :

  1. Importez les options requises :

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Utilisez votre application ou votre identité managée pour vous authentifier.

    Pour l’authentification de l’application :

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Pour l’authentification d’identité managée :

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configurez les paramètres du récepteur tels que la base de données et la table :

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Vous pouvez ajouter d’autres options, comme décrit dans le tableau suivant :

    Option Description Valeur par défaut
    IngestionMappingRef Fait référence à un mappage d’ingestion existant.
    Vidage immédiat Vide immédiatement les données et peut entraîner des problèmes de performances. Cette méthode n’est pas recommandée.
    BatchIntervalMs Contrôle la fréquence à laquelle les données sont vidées. 30 secondes
    BatchSize Définit la taille du lot pour les enregistrements de mise en mémoire tampon avant le vidage. 1 000 enregistrements
    ClientBatchSizeLimit Spécifie la taille en Mo des données agrégées avant l’ingestion. 300 Mo
    PollForIngestionStatus Si la valeur est true, le connecteur interroge l’ingestion status après le vidage des données. false
    DeliveryGuarantee Détermine la sémantique de la garantie de remise. Pour obtenir exactement une seule sémantique, utilisez WriteAheadSink. AT_LEAST_ONCE
  2. Écrire des données de streaming avec l’une des méthodes suivantes :

    • SinkV2 : il s’agit d’une option sans état qui vide les données sur le point de contrôle, garantissant au moins une fois la cohérence. Nous vous recommandons cette option pour l’ingestion de données à volume élevé.
    • WriteAheadSink : cette méthode émet des données vers un KustoSink. Il est intégré au système de contrôle de Flink et offre exactement des garanties une seule fois. Les données sont stockées dans un AbstractStateBackend et validées uniquement une fois qu’un point de contrôle est terminé.

    L’exemple suivant utilise SinkV2. Pour utiliser WriteAheadSink, utilisez la buildWriteAheadSink méthode au lieu de build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Le code complet doit ressembler à ceci :

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Vérifier que les données sont ingérées

Une fois la connexion configurée, les données sont envoyées à votre table. Vous pouvez vérifier que les données sont ingérées en exécutant une requête KQL.

  1. Exécutez la requête suivante pour vérifier que les données sont ingérées dans la table :

    <MyTable>
    | count
    
  2. Exécutez la requête suivante pour afficher les données :

    <MyTable>
    | take 100