Share via


使用 Apache Flink 將資料內嵌至 Azure Data Explorer

Apache Flink 是一種架構和分散式處理引擎,可針對未系結和限定的數據流進行具狀態計算。

Flink 連接器是可在任何 Flink 叢集上執行的 開放原始碼 專案。 它會實作從 Flink 叢集行動資料的數據接收。 使用連接器至 Apache Flink,您可以建置以數據驅動案例為目標的快速且可調整應用程式,例如機器學習服務 (ML) 、擷取-轉換-載入 (ETL) 和 Log Analytics。

在本文中,您將瞭解如何使用 Flink 連接器將數據從 Flink 傳送到您的數據表。 您可以建立數據表和數據對應、直接 Flink 將數據傳送至數據表,然後驗證結果。

必要條件

對於使用 Maven 管理相依性的 Flink 專案,請將其新增為相依性,以整合適用於 Azure 的 Flink 連接器核心接收 Data Explorer

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

對於不使用 Maven 來管理相依性的專案,請複製適用於 Apache Flink 的 Azure Data Explorer Connector 存放庫,並在本機建置它。 此方法可讓您使用 命令 mvn clean install -DskipTests,手動將連接器新增至本機 Maven 存放庫。

您可以從 Flink 驗證,以使用 Microsoft Entra ID 應用程式或受控識別。

此服務主體會是連接器用來在 Kusto 中寫入數據表的身分識別。 您稍後會授與此服務主體的許可權,以存取 Kusto 資源。

  1. 透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。

    az login
    
  2. 選擇要裝載主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 建立服務主體。 在這裡範例中,服務主體稱為 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 從傳回的 JSON 數據中,複製 appIdpasswordtenant 以供日後使用。

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

您已建立 Microsoft Entra 應用程式和服務主體。

  1. 授與應用程式使用者對資料庫的許可權:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. 授與應用程式在數據表上的擷取者或系統管理員許可權。 所需的許可權取決於所選的數據寫入方法。 擷取器許可權足以供 SinkV2 使用,而 WriteAndSink 則需要系統管理員許可權。

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

如需授權的詳細資訊,請參閱 Kusto 角色型訪問控制

若要從 Flink 寫入資料:

  1. 匯入必要的選項:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. 使用您的應用程式或受控識別進行驗證。

    針對應用程式驗證:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    針對受控識別驗證:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. 設定接收參數,例如資料庫和數據表:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    您可以新增更多選項,如下表所述:

    選項 Description 預設值
    IngestionMappingRef 參考現有的 擷取對應
    FlushImmediately 立即清除數據,並可能導致效能問題。 不建議使用此方法。
    BatchIntervalMs 控制數據排清的頻率。 30 秒
    BatchSize 在排清之前,設定緩衝記錄的批次大小。 1,000 筆記錄
    ClientBatchSizeLimit 在擷取之前,以 MB 為單位指定匯總數據的大小。 300 MB
    PollForIngestionStatus 如果為 true,連接器會輪詢數據排清之後的擷取狀態。 false
    DeliveryGuarantee 判斷傳遞保證語意。 若要完全達到一次語意,請使用 WriteAheadSink。 AT_LEAST_ONCE
  2. 使用下列其中一種方法寫入串流資料:

    • SinkV2:這是無狀態選項,可排清檢查點上的數據,確保至少一次一致性。 建議使用此選項來擷取大量數據。
    • WriteAheadSink:此方法會將數據發出至 KustoSink。 它會與 Flink 的檢查點系統整合,並提供一次完全保證。 數據會儲存在 AbstractStateBackend 中,且只有在檢查點完成之後才會認可。

    下列範例使用 SinkV2。 若要使用 WriteAheadSink,請使用 buildWriteAheadSink 方法,而不是 build

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

完整的程式代碼看起來應該像這樣:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

確認已擷取數據

設定連線之後,數據就會傳送至您的數據表。 您可以執行 KQL 查詢來確認資料是否已內嵌。

  1. 執行下列查詢,以確認資料已內嵌至數據表:

    <MyTable>
    | count
    
  2. 執行下列查詢以檢視資料:

    <MyTable>
    | take 100