Condividi tramite


Integrazione di Azure Esplora dati e Apache Flink®

Azure Esplora dati è una piattaforma di analisi dei Big Data completamente gestita e ad alte prestazioni che semplifica l'analisi di volumi elevati di dati quasi in tempo reale.

ADX consente agli utenti di analizzare grandi volumi di dati da applicazioni di streaming, siti Web, dispositivi IoT e così via. L'integrazione di Apache Flink con ADX consente di elaborare dati in tempo reale e analizzarli in ADX.

Prerequisiti

  1. Creare un cluster Flink.

  2. Creare ADX con database e tabella in base alle esigenze.

  3. Aggiungere autorizzazioni di ingestor per l'identità gestita in Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Eseguire un programma di esempio che definisce l'URI del cluster Kusto (Uniform Resource Identifier), il database e l'identità gestita usata e la tabella in cui deve scrivere.

  5. Clonare il progetto flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Creare la tabella in ADX usando il comando seguente

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Aggiornare il file FlinkKustoSinkSample.java con l'URI del cluster Kusto corretto, il database e l'identità gestita usata.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Successivamente compilare il progetto usando "pacchetto pulito mvn"

  8. Individuare il file JAR denominato "samples-java-1.0-SN piattaforma di strumenti analitici HOT-shaded.jar" nella cartella "sample-java/target", quindi caricare il file JAR nell'interfaccia utente Flink e inviare il processo.

  9. Eseguire una query sulla tabella Kusto per verificare l'output

    screenshot shows query the Kusto table to verify the output.

    Non c'è alcun ritardo nella scrittura dei dati nella tabella Kusto da Flink.

Riferimento