Apache Flink は、境界なしと境界ありのデータ ストリームに対するステートフルな計算のためのフレームワークおよび分散処理エンジンです。
Flink コネクタは、あらゆる Flink クラスターで実行できるオープン ソース プロジェクトです。 Flink クラスターからデータを移動するためのデータ シンクを実装します。 Apache Flink へのコネクタを使用すると、機械学習 (ML)、抽出・変換・読み込み (ETL)、Log Analytics などのデータ ドリブン シナリオを対象とする高速でスケーラブルなアプリケーションを構築できます。
この記事では、Flink コネクタを使用して Flink からテーブルにデータを送信する方法について説明します。 テーブルとデータマッピングを作成し、Flink にテーブルにデータを送信するように指示し、結果を検証します。
前提条件
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成するか、または Microsoft Fabric のリアルタイム インテリジェンスで KQL データベースを作成します。
- データベース内のターゲット テーブル。 「Azure Data Explorer でテーブルを作成する」または「Real-Time Intelligence でテーブルを作成する」を参照してください
- Apache Flink クラスター。 クラスターを作成します。
- Maven 3.x
Flink コネクタを取得する
Maven を使用して依存関係を管理する Flink プロジェクトの場合は、依存関係として追加して、Flink Connector Core Sink for Azure Data エクスプローラーを統合します。
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
依存関係の管理に Maven を使用しないプロジェクトの場合は、Azure Data Explorer Connector for Apache Flink のリポジトリを複製し、ローカルにビルドします。 この方法では、コマンド mvn clean install -DskipTests
を使用してローカル Maven リポジトリにコネクタを手動で追加できます。
認証
You can authenticate from Flink to using a Microsoft Entra ID application.
このサービス プリンシパルは、Azure テーブルに書き込むためにコネクタによって利用される ID になります。 後で、Kusto リソースにアクセスするためのアクセス許可をこのサービス プリンシパルに付与します。
Azure CLI 経由で Azure サブスクリプションにサインインします。 次に、ブラウザーで認証します。
az login
プリンシパルをホストするサブスクリプションを選択します。 この手順は、複数のサブスクリプションがある場合に必要です。
az account set --subscription YOUR_SUBSCRIPTION_GUID
サービス プリンシパルを作成します。 この例では、サービス プリンシパルを
my-service-principal
と呼びます。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
返された JSON データから、
appId
、password
、およびtenant
を後で使用のためにコピーします。{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Microsoft Entra アプリケーションとサービス プリンシパルが作成されました。
データベースに対するアプリケーション ユーザー権限を付与します。
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
アプリケーションに、テーブルに対するインジェストまたは管理者のアクセス許可を付与します。 必要な権限は、選択したデータ書き込み方法によって異なります。 SinkV2 には取り込み権限で十分ですが、WriteAndSink には管理者アクセス許可が必要です。
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
詳細については、「Kusto ロールベースのアクセス コントロール」を参照してください。
Flink からデータを書き込む
Flink からデータを書き込むには:
必要な関数をインポートするには:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
アプリケーションを使用して認証します。
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
データベースやテーブルなどのシンク パラメーターを構成します。
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
You can add more options, as described in the following table:
オプション 説明 Default Value IngestionMappingRef 既存 のインジェスト マッピングを参照します。 FlushImmediately データをすぐにフラッシュし、パフォーマンスの問題を引き起こす可能性があります。 この方法は推奨されていません。 BatchIntervalMs データをフラッシュする頻度をコントロールします。 30 秒 BatchSize フラッシュ前にレコードをバッファリングするためのバッチ サイズを設定します。 1,000 レコード ClientBatchSizeLimit インジェスト前の集計データのMBサイズを指定します。 300 MB PollForIngestionStatus true の場合、コネクタはデータフラッシュ後にインジェスト状態をポーリングします。 false DeliveryGuarantee 配信保証のセマンティクスを決定します。 1 回だけセマンティクスを実現するには、WriteAheadSink を使用します。 AT_LEAST_ONCE 次のいずれかの方法でストリーミング データを書き込みます。
- SinkV2: これは、少なくとも一度の整合性を確保して、チェックポイントでデータをフラッシュするステートレス オプションです。 大量のデータ インジェストには、このオプションをお勧めします。
- WriteAheadSink: このメソッドは、KustoSink にデータを出力します。 Flinkのチェックポイントシステムと統合されており、一度回だけ保証を提供します。 データは AbstractStateBackend に記憶され、チェックポイントが完了した後にのみコミットされます。
次の例では、SinkV2 を使用します。 WriteAheadSink を使用するには、
buildWriteAheadSink
の代わりに方法build
を使用します。KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
完全なコードは次のようになります。
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
データが取り込まれていることを確認する
接続が構成されると、データがテーブルに送信されます。 データが取り込まれていることは、KQL クエリを実行することで確認できます。
次のクエリを実行して、データがテーブルに取り込まれていることを確認します。
<MyTable> | count
Run the following query to view the data:
<MyTable> | take 100
関連するコンテンツ
- AKS 上の Azure HDInsight で Apache Flink を使用する