分享方式:


整合 Azure 資料總管和 Apache Flink®

Azure 資料總管 (ADX) 是一個完全受控的高效能巨量資料分析平台,可讓您輕易且近乎即時地分析大量資料。

ADX 可協助使用者分析來自串流應用程式、網站、IoT 裝置等中的大量資料。將 Apache Flink 與 ADX 整合可協助您在 ADX 中處理即時資料並進行分析。

必要條件

  1. 建立 Flink 叢集

  2. 視需要建立具有資料庫和資料表的 ADX

  3. 在 Kusto 中新增受控識別的擷取器權限。

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. 執行定義 Kusto 叢集 URI (統一資源識別項)、資料庫和所使用的受控識別,以及它需要寫入其中的資料表的範例程式。

  5. 複製 flink-connector-kusto 專案:https://github.com/Azure/flink-connector-kusto.git

  6. 使用下列命令在 ADX 中建立資料表

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. 使用正確的 Kusto 叢集 URI、資料庫和所使用的受控識別來更新FlinkKustoSinkSample.java 檔案。

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    稍後會使用「mvn 全新套件」來建置該專案

  8. 在 'sample-java/target' 資料夾下找到名為 'samples-java-1.0-SNAPSHOT-shaded.jar' 的 JAR 檔,然後在 Flink UI 中上傳此 JAR 檔並提交作業。

  9. 查詢 Kusto 資料表以確認輸出

    screenshot shows query the Kusto table to verify the output.

    從 Flink 中將資料寫入 Kusto 資料表不會有任何延遲。

參考