Partage via


Ingérer des données avec Apache Flink dans Azure Data Explorer

Apache Flink est une infrastructure et un moteur de traitement distribué pour les calculs avec état sur des flux de données non liés et limités.

Le connecteur Flink est projet open source qui peut s'exécuter sur n'importe quel cluster Flink. Il implémente le récepteur de données pour déplacer des données à partir d'un cluster Flink. À l'aide du connecteur pour Apache Flink, vous pouvez rapidement créer des applications évolutives ciblant des scénarios axés sur les données, comme l'apprentissage automatique (ML), l'extraction, la transformation et le chargement (ETL) et l'analytique des journaux d'activité.

Dans cet article, vous découvrirez comment utiliser le connecteur Flink pour envoyer des données de Flink vers votre table. Vous créez une table et un mappage de données, puis vous aller demander à Flink d'envoyer des données dans la table, et enfin vous validez les résultats.

Prérequis

Pour les projets Flink qui utilisent Maven pour gérer les dépendances, intégrez le Flink Connector Core Sink For Azure Data Explorer en l'ajoutant comme dépendance :

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Pour les projets qui n'utilisent pas Maven pour gérer les dépendances, clonez le référentiel pour Azure Data Explorer Connecter or pour Apache Flink et générez-le localement. Cette approche vous permet d'ajouter manuellement le connecteur à votre référentiel Maven local à l'aide de la commande mvn clean install -DskipTests.

Vous pouvez vous authentifier à partir de Flink à l’aide d’une application Microsoft Entra ID ou d’une identité managée.

Ce principal de service sera l’identité utilisée par le connecteur pour écrire des données dans votre table dans Kusto. Vous accorderez ultérieurement des autorisations pour ce principal de service afin d’accéder à des ressources Kusto

  1. Connectez-vous à votre abonnement Azure via Azure CLI. Authentifiez-vous ensuite dans le navigateur.

    az login
    
  2. Choisissez l’abonnement pour héberger le principal. Cette étape est nécessaire quand vous avez plusieurs abonnements.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Créez le principal de service. Dans cet exemple, le principal de service est appelé my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. À partir des données JSON retournées, copiez le appId, password et tenant pour une utilisation ultérieure.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Vous avez créé votre application Microsoft Entra et votre principal de service.

  1. Accordez à l'utilisateur de l'application des autorisations sur la base de données :

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Accordez à l'ingestion ou à l'administrateur de l'application des autorisations sur la table. Les autorisations requises dépendent de la méthode d'écriture de données choisie. Les autorisations d'ingestion sont suffisantes pour SinkV2, tandis que WriteAndSink requiert des autorisations d'administrateur.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Pour plus d'informations sur les autorisations, consultez Contrôle d'accès basé sur les rôles de Kusto.

Pour écrire des données à partir de Flink :

  1. Importez les fonctions requises :

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Utilisez votre application ou identité managée pour l’authentification.

    Pour l’authentification de l’application :

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Pour l’authentification de l’identité managée :

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configurez les paramètres récepteurs comme la base de données et la table :

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Vous pouvez ajouter d'autres options, comme décrit dans le tableau suivant :

    Option Description Valeur par défaut
    IngestionMappingRef Fait référence à un mappage d'ingestion existant.
    Vider immédiatement Vide immédiatement les données et peut entraîner des problèmes de performances. Cette méthode n'est pas recommandée.
    BatchIntervalMs Contrôle la fréquence à laquelle les données sont vidées. 30 secondes
    BatchSize Définit la taille du lot pour les enregistrements de mise en mémoire tampon avant de vider. 1 000 enregistrements
    ClientBatchSizeLimit Spécifie la taille dans Mo de données agrégées avant l'ingestion. 300 Mo
    PollForIngestionStatus Si c'est le cas, le connecteur interroge l'état de l'ingestion après l'extraction des données. false
    DeliveryGuarantee Détermine la sémantique de garantie de remise. Pour obtenir exactement une fois la sémantique, utilisez WriteAheadSink. AT_LEAST_ONCE
  2. Écrivez des données de diffusion en continu à l'aide de l'une des méthodes suivantes :

    • SinkV2 : il s'agit d'une option sans état qui vide les données sur case activée point, garantissant au moins une fois la cohérence. Nous vous recommandons cette option pour l'ingestion de données volumineuses.
    • WriteAheadSink : cette méthode émet des données à un KustoSink. Elle est intégrée au système de case activée pointage de Flink et offre exactement une fois les garanties. Les données sont stockées dans un AbstractStateBackend et ne sont validées que lorsqu'un point de contrôle est terminé.

    L'exemple suivant utilise SinkV2. Pour utiliser WriteAheadSink, utilisez la méthode buildWriteAheadSink au lieu de build :

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Le code complet doit se présenter comme ceci :

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Vérifiez que les données sont ingérées

Une fois la connexion configurée, les données sont envoyées à votre table. Vous pouvez vérifier que les données sont ingérées en exécutant une requête KQL.

  1. Exécutez la requête suivante pour vérifier que les données sont ingérées dans la table :

    <MyTable>
    | count
    
  2. Exécutez la requête suivante pour afficher les données :

    <MyTable>
    | take 100