整合 Azure 資料總管和 Apache Flink®
Azure 資料總管 (ADX) 是一個完全受控的高效能巨量資料分析平台,可讓您輕易且近乎即時地分析大量資料。
ADX 可協助使用者分析來自串流應用程式、網站、IoT 裝置等中的大量資料。將 Apache Flink 與 ADX 整合可協助您在 ADX 中處理即時資料並進行分析。
必要條件
在 Flink 中使用 Azure 資料總管作為接收器 (sink) 的步驟
視需要建立具有資料庫和資料表的 ADX。
在 Kusto 中新增受控識別的擷取器權限。
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
執行定義 Kusto 叢集 URI (統一資源識別項)、資料庫和所使用的受控識別,以及它需要寫入其中的資料表的範例程式。
複製 flink-connector-kusto 專案:https://github.com/Azure/flink-connector-kusto.git
使用下列命令在 ADX 中建立資料表
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
使用正確的 Kusto 叢集 URI、資料庫和所使用的受控識別來更新FlinkKustoSinkSample.java 檔案。
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
稍後會使用「mvn 全新套件」來建置該專案
在 'sample-java/target' 資料夾下找到名為 'samples-java-1.0-SNAPSHOT-shaded.jar' 的 JAR 檔,然後在 Flink UI 中上傳此 JAR 檔並提交作業。
查詢 Kusto 資料表以確認輸出
從 Flink 中將資料寫入 Kusto 資料表不會有任何延遲。
參考
- Apache Flink 網站
- Apache、Apache Flink、Flink 和相關聯的開放原始碼專案名稱為 Apache Software Foundation (ASF) 的 商標。