Partager via


Intégration d’Azure Data Explorer et d’Apache Flink®

Azure Data Explorer est une plateforme d’analytique Big Data très performante et complètement managée, qui facilite l’analyse de grands volumes de données quasiment en temps réel.

ADX aide les utilisateurs à analyser de gros volumes de données provenant d'applications de streaming, de sites Web, d'appareils IoT, etc. L’intégration d’Apache Flink avec ADX vous aide à traiter les données en temps réel et à les analyser dans ADX.

Prérequis

  1. Créez un cluster Flink.

  2. Créez ADX avec la base de données et la table selon les besoins.

  3. Ajoutez des autorisations d’ingesteur pour l’identité gérée dans Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Exécutez un exemple de programme définissant l'URI (Uniform Resource Identifier) du cluster Kusto, la base de données et l'identité gérée utilisées, ainsi que la table dans laquelle il doit écrire.

  5. Clonez le projet flink-connector-kusto : https://github.com/Azure/flink-connector-kusto.git

  6. Créez la table dans ADX à l'aide de la commande suivante

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Mettez à jour le fichier FlinkKustoSinkSample.java avec l’URI du cluster Kusto approprié, la base de données et l’identité gérée utilisée.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Construisez plus tard le projet en utilisant « mvn clean package »

  8. Localisez le fichier JAR nommé « samples-java-1.0-SNAPSHOT-shaded.jar » dans le dossier « sample-java/target », puis téléchargez ce fichier JAR dans l'interface utilisateur Flink et soumettez le travail.

  9. Interrogez la table Kusto pour vérifier la sortie

    screenshot shows query the Kusto table to verify the output.

    Il n'y a aucun délai dans l'écriture des données dans la table Kusto depuis Flink.

Référence