Azure Data Factory コマンド アクティビティを使用して Azure Data Explorer 管理コマンドを実行する
Azure Data Factory (ADF) は、データに対してあらゆるアクティビティを組み合わせて実行できるクラウドベースのデータ統合サービスです。 ADF を使用して、データ移動とデータ変換を調整し、自動化するためのデータ駆動型ワークフローを作成します。 Azure Data Factory の Azure Data Explorer コマンド アクティビティを使用すると、ADF ワークフロー内で Azure Data Explorer管理コマンドを実行できます。 この記事では、Azure Data Explorer のコマンド アクティビティを含む、ルックアップ アクティビティと ForEach アクティビティを使用してパイプラインを作成する方法について説明します。
前提条件
- Azure サブスクリプション。 無料の Azure アカウントを作成します。
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します。
- データのソース。
- データ ファクトリ。 データ ファクトリを作成します。
新しいパイプラインを作成する
[作成者] 鉛筆ツールを選びます。
+を選択して新しいパイプラインを作成し、ドロップダウンから [パイプライン] を選択します。
ルックアップ アクティビティを作成する
ルックアップ アクティビティでは、Azure Data Factory でサポートするすべてのデータ ソースからデータセットを取得できます。 ルックアップ アクティビティからの出力は、ForEach またはその他のアクティビティで使用できます。
[アクティビティ] ウィンドウの [全般] で、 [ルックアップ] アクティビティを選択します。 右のメイン キャンバスにドラッグ アンド ドロップします。
これで、キャンバスには作成したルックアップ アクティビティが含まれます。 キャンバスの下にあるタブを使用して、関連するパラメーターを変更します。 [全般] で、アクティビティの名前を変更します。
ヒント
空のキャンバス領域をクリックして、パイプラインのプロパティを表示します。 パイプラインの名前を変更するには、 [全般] タブを使用します。 パイプラインには pipeline-4-docs という名前が付けられています。
ルックアップ アクティビティで Azure Data Explorer データセットを作成する
[設定] で、事前に作成した Azure Data Explorer のソース データセットを選択するか、 [ + 新規] を選択して新しいデータセットを作成します。
[新しいデータセット] ウィンドウから Azure Data Explorer (Kusto) データセットを選択します。 [続行] を選択して新しいデータセットを追加します。
新しい Azure Data Explorer データセット パラメーターは、 [設定] で表示できます。 パラメーターを更新するには、 [編集] を選択します。
メインキャンバスで AzureDataExplorerTable のタブが新しく開きます。
- [全般] を選択し、データセット名を編集します。
- データセットのプロパティを編集するには、 [接続] を選択します。
- ドロップダウンから [リンクされたサービス] を選択するか、 [+ 新規] を選択してリンクされたサービスを新規作成します。
新しいリンクされたサービスを作成する場合、 [New Linked Service (Azure Data Explorer)](新しいリンクされたサービス (Azure Data Explorer)) ページが開きます。
- Azure Data Explorer のリンクされたサービスの [名前] を選択します。 必要に応じて [説明] を追加します。
- 必要に応じて、 [Connect via integration runtime](統合ランタイム経由で接続) で現在の設定を変更します。
-
[Account selection method](アカウントの選択方法) で、次の 2 つの方法のいずれかを使用してクラスターを選択します。
- [From Azure subscription](Azure サブスクリプションから) ラジオ ボタンを選択し、 [Azure サブスクリプション] アカウントを選択します。 次に、自分の [クラスター] を選択します。 ドロップダウンには、ユーザーに属するクラスターのみが一覧表示されることに注意してください。
- または、 [手動で入力] ラジオ ボタンを選択し、自分の [エンドポイント] (クラスタの URL) を入力します。
- [テナント] を指定します。
- [サービス プリンシパル ID] を入力します。 この値は、アプリ登録>の概要>アプリケーション (クライアント) ID のAzure portalにあります。 プリンシパルには、使用されているコマンドに必要なアクセス許可レベルに従って、適切なアクセス許可が必要です。
- [サービス プリンシパル キー] ボタンを選択し、 [サービス プリンシパル キー] を入力します。
- ドロップダウン メニューから [データベース] を選択します。 または、 [編集] チェックボックスをオンにし、データベース名を入力します。
- [Test Connection](接続のテスト) を選択して、作成したリンクされたサービスの接続をテストします。 セットアップに接続できる場合は、緑色のチェックマーク (接続成功) が表示されます。
- [完了] を選択して、リンクされたサービスの作成を完了します。
リンクされたサービスの設定が完了したら、 [AzureDataExplorerTable]>[接続] で、 [テーブル] 名を追加します。 [データのプレビュー] を選択して、データが正しく表示されていることを確認します。
データセットの準備ができました。パイプラインの編集を続行できます。
ルックアップ アクティビティにクエリを追加する
[pipeline-4-docs]>[Settings] で、 [クエリ] テキストボックスにクエリを追加します。次に例を示します。
ClusterQueries | where Database !in ("KustoMonitoringPersistentDatabase", "$systemdb") | summarize count() by Database
必要に応じて、クエリ タイムアウト または [No truncation](切り捨てしない) および [First row only](先頭行のみ) プロパティを変更します。 このフローでは、既定の [クエリ タイムアウト] を保持し、チェックボックスをオフにします。
For-Each アクティビティを作成する
For-Each アクティビティは、コレクションを反復処理するために使用され、指定されたアクティビティをループで実行します。
次に、For-Each アクティビティをパイプラインに追加します。 このアクティビティは、ルックアップ アクティビティから返されたデータを処理します。
[アクティビティ] ウィンドウで、 [Iteration & Conditions]\(繰り返しと条件\) から [ForEach] アクティビティを選択し、キャンバスにドラッグ アンド ドロップします。
ルックアップ アクティビティの出力と、キャンバス内の ForEach アクティビティの入力を線で繋いで接続します。
キャンバスで [ForEach] アクティビティを選択します。 以下の [設定] タブで次を行います。
[シーケンシャル] チェックボックスをオンにしてルックアップ結果のシーケンシャル処理を行うか、オフのままにして並列処理を作成する。
バッチ カウント を設定する
[項目] で、出力値 @activity('Lookup1').output.value への参照を指定する
ForEach アクティビティ内に Azure Data Explorer コマンド アクティビティを作成する
キャンバスの ForEach アクティビティをダブルクリックして新しいキャンバスで開き、ForEach 内のアクティビティを指定します。
[アクティビティ] ウィンドウの [Azure Data Explorer] で [Azure Data Explorer コマンド] アクティビティを選択し、キャンバスにドラッグ アンド ドロップします。
[接続] タブで、以前作成したリンクされたサービスを選択します。
[コマンド] タブで、次のコマンドを指定します。
.export async compressed into csv h"http://<storageName>.blob.core.windows.net/data/ClusterQueries;<storageKey>" with ( sizeLimit=100000, namePrefix=export ) <| ClusterQueries | where Database == "@{item().Database}"
[コマンド] は、指定されたクエリの結果を圧縮形式で BLOB ストレージにエクスポートするよう Azure Data Explorer に指示します。 これは、非同期で実行されます (非同期修飾子を使用)。 このクエリは、ルックアップ アクティビティの結果の各行のデータベース列に対応します。 [コマンド タイムアウト] は必ずしも変更する必要はありません。
注意
コマンド アクティビティには次の制限があります。
- サイズの制限:1 MB の応答サイズ
- 制限時間:20分 (既定値)、1時間 (最大)。
- 必要に応じて、AdminThenQuery を使用して結果にクエリを追加し、結果のサイズ/時間を減らすことができます。
パイプラインの準備ができました。 パイプライン名をクリックすると、メインのパイプライン ビューに戻ることができます。
パイプラインを公開する前に、 [デバッグ] を選択します。 パイプラインの進行状況は、 [出力] タブで確認できます。
必要に応じて [すべて公開] 、 [トリガーの追加] の順に選択してパイプラインを実行します。
管理コマンドの出力
コマンド アクティビティの出力の構造の詳細を以下に示します。 この出力は、パイプラインの次のアクティビティで使用できます。
非同期以外の管理コマンドの戻り値
非同期管理コマンドでは、返される値の構造は Lookup アクティビティの結果の構造に似ています。
count
フィールドは、返されたレコードの数を示します。 固定配列フィールド value
にはレコードの一覧が含まれています。
{
"count": "2",
"value": [
{
"ExtentId": "1b9977fe-e6cf-4cda-84f3-4a7c61f28ecd",
"ExtentSize": 1214.0,
"CompressedSize": 520.0
},
{
"ExtentId": "b897f5a3-62b0-441d-95ca-bf7a88952974",
"ExtentSize": 1114.0,
"CompressedSize": 504.0
}
]
}
非同期管理コマンドの戻り値
非同期管理コマンドでは、非同期操作が完了するかタイムアウトになるまで、アクティビティは操作テーブルをバックグラウンドでポーリングします。したがって、返される値には、指定された OperationId プロパティの .show operations OperationId
の結果が含まれます。 操作が正常に完了したことを確認するには、State および Status の各プロパティの値を確認します。
{
"count": "1",
"value": [
{
"OperationId": "910deeae-dd79-44a4-a3a2-087a90d4bb42",
"Operation": "TableSetOrAppend",
"NodeId": "",
"StartedOn": "2019-06-23T10:12:44.0371419Z",
"LastUpdatedOn": "2019-06-23T10:12:46.7871468Z",
"Duration": "00:00:02.7500049",
"State": "Completed",
"Status": "",
"RootActivityId": "f7c5aaaf-197b-4593-8ba0-e864c94c3c6f",
"ShouldRetry": false,
"Database": "MyDatabase",
"Principal": "<some principal id>",
"User": "<some User id>"
}
]
}