使用 Apache Flink 將資料內嵌至 Azure Data Explorer
Apache Flink 是一種架構和分散式處理引擎,可針對未系結和限定的數據流進行具狀態計算。
Flink 連接器是可在任何 Flink 叢集上執行的 開放原始碼 專案。 它會實作從 Flink 叢集行動資料的數據接收。 使用連接器至 Apache Flink,您可以建置以數據驅動案例為目標的快速且可調整應用程式,例如機器學習服務 (ML) 、擷取-轉換-載入 (ETL) 和 Log Analytics。
在本文中,您將瞭解如何使用 Flink 連接器將數據從 Flink 傳送到您的數據表。 您可以建立數據表和數據對應、直接 Flink 將數據傳送至數據表,然後驗證結果。
必要條件
- Azure 資料總管叢集和資料庫。 在 Microsoft Fabric 的 Real-Time Analytics 中建立叢集和資料庫或 KQL 資料庫。
- 資料庫中的目標數據表。 請參閱在 Azure Data Explorer 中建立數據表或在 Real-Time Analytics 中建立數據表
- Apache Flink 叢集。 建立叢集。
- Maven 3.x
取得 Flink 連接器
對於使用 Maven 管理相依性的 Flink 專案,請將其新增為相依性,以整合適用於 Azure 的 Flink 連接器核心接收 Data Explorer:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
對於不使用 Maven 來管理相依性的專案,請複製適用於 Apache Flink 的 Azure Data Explorer Connector 存放庫,並在本機建置它。 此方法可讓您使用 命令 mvn clean install -DskipTests
,手動將連接器新增至本機 Maven 存放庫。
您可以從 Flink 驗證,以使用 Microsoft Entra ID 應用程式或受控識別。
此服務主體會是連接器用來在 Kusto 中寫入數據表的身分識別。 您稍後會授與此服務主體的許可權,以存取 Kusto 資源。
透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。
az login
選擇要裝載主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。
az account set --subscription YOUR_SUBSCRIPTION_GUID
建立服務主體。 在這裡範例中,服務主體稱為
my-service-principal
。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
從傳回的 JSON 數據中,複製
appId
、password
和tenant
以供日後使用。{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
您已建立 Microsoft Entra 應用程式和服務主體。
授與應用程式使用者對資料庫的許可權:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
授與應用程式在數據表上的擷取者或系統管理員許可權。 所需的許可權取決於所選的數據寫入方法。 擷取器許可權足以供 SinkV2 使用,而 WriteAndSink 則需要系統管理員許可權。
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
如需授權的詳細資訊,請參閱 Kusto 角色型訪問控制。
從 Flink 寫入數據
若要從 Flink 寫入資料:
匯入必要的選項:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
使用您的應用程式或受控識別進行驗證。
針對應用程式驗證:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
針對受控識別驗證:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
設定接收參數,例如資料庫和數據表:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
您可以新增更多選項,如下表所述:
選項 Description 預設值 IngestionMappingRef 參考現有的 擷取對應。 FlushImmediately 立即清除數據,並可能導致效能問題。 不建議使用此方法。 BatchIntervalMs 控制數據排清的頻率。 30 秒 BatchSize 在排清之前,設定緩衝記錄的批次大小。 1,000 筆記錄 ClientBatchSizeLimit 在擷取之前,以 MB 為單位指定匯總數據的大小。 300 MB PollForIngestionStatus 如果為 true,連接器會輪詢數據排清之後的擷取狀態。 false DeliveryGuarantee 判斷傳遞保證語意。 若要完全達到一次語意,請使用 WriteAheadSink。 AT_LEAST_ONCE 使用下列其中一種方法寫入串流資料:
- SinkV2:這是無狀態選項,可排清檢查點上的數據,確保至少一次一致性。 建議使用此選項來擷取大量數據。
- WriteAheadSink:此方法會將數據發出至 KustoSink。 它會與 Flink 的檢查點系統整合,並提供一次完全保證。 數據會儲存在 AbstractStateBackend 中,且只有在檢查點完成之後才會認可。
下列範例使用 SinkV2。 若要使用 WriteAheadSink,請使用
buildWriteAheadSink
方法,而不是build
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
完整的程式代碼看起來應該像這樣:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
確認已擷取數據
設定連線之後,數據就會傳送至您的數據表。 您可以執行 KQL 查詢來確認資料是否已內嵌。
執行下列查詢,以確認資料已內嵌至數據表:
<MyTable> | count
執行下列查詢以檢視資料:
<MyTable> | take 100
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應