Azure Data Factory コマンド アクティビティを使用して Azure Data Explorer 管理コマンドを実行する

Azure Data Factory (ADF) は、データに対してあらゆるアクティビティを組み合わせて実行できるクラウドベースのデータ統合サービスです。 ADF を使用して、データ移動とデータ変換を調整し、自動化するためのデータ駆動型ワークフローを作成します。 Azure Data Factory の Azure Data Explorer コマンド アクティビティを使用すると、ADF ワークフロー内で Azure Data Explorer管理コマンドを実行できます。 この記事では、Azure Data Explorer のコマンド アクティビティを含む、ルックアップ アクティビティと ForEach アクティビティを使用してパイプラインを作成する方法について説明します。

前提条件

新しいパイプラインを作成する

  1. [作成者] 鉛筆ツールを選びます。

  2. +を選択して新しいパイプラインを作成し、ドロップダウンから [パイプライン] を選択します。

    新しいパイプラインを作成する。

ルックアップ アクティビティを作成する

ルックアップ アクティビティでは、Azure Data Factory でサポートするすべてのデータ ソースからデータセットを取得できます。 ルックアップ アクティビティからの出力は、ForEach またはその他のアクティビティで使用できます。

  1. [アクティビティ] ウィンドウの [全般] で、 [ルックアップ] アクティビティを選択します。 右のメイン キャンバスにドラッグ アンド ドロップします。

    ルックアップ アクティビティを選択する。

  2. これで、キャンバスには作成したルックアップ アクティビティが含まれます。 キャンバスの下にあるタブを使用して、関連するパラメーターを変更します。 [全般] で、アクティビティの名前を変更します。

    ルックアップ アクティビティを編集する。

    ヒント

    空のキャンバス領域をクリックして、パイプラインのプロパティを表示します。 パイプラインの名前を変更するには、 [全般] タブを使用します。 パイプラインには pipeline-4-docs という名前が付けられています。

ルックアップ アクティビティで Azure Data Explorer データセットを作成する

  1. [設定] で、事前に作成した Azure Data Explorer のソース データセットを選択するか、 [ + 新規] を選択して新しいデータセットを作成します。

    ルックアップ設定でデータセットを追加する。

  2. [新しいデータセット] ウィンドウから Azure Data Explorer (Kusto) データセットを選択します。 [続行] を選択して新しいデータセットを追加します。

    新しいデータセットを選択する。

  3. 新しい Azure Data Explorer データセット パラメーターは、 [設定] で表示できます。 パラメーターを更新するには、 [編集] を選択します。

    Azure Data Explorer データセットを使用したルックアップの設定。

  4. メインキャンバスで AzureDataExplorerTable のタブが新しく開きます。

    • [全般] を選択し、データセット名を編集します。
    • データセットのプロパティを編集するには、 [接続] を選択します。
    • ドロップダウンから [リンクされたサービス] を選択するか、 [+ 新規] を選択してリンクされたサービスを新規作成します。

    Azure Data Explorer データセットのプロパティを編集する。

  5. 新しいリンクされたサービスを作成する場合、 [New Linked Service (Azure Data Explorer)](新しいリンクされたサービス (Azure Data Explorer)) ページが開きます。

    Azure Data Explorer新しいリンクされたサービスです。

    • Azure Data Explorer のリンクされたサービスの [名前] を選択します。 必要に応じて [説明] を追加します。
    • 必要に応じて、 [Connect via integration runtime](統合ランタイム経由で接続) で現在の設定を変更します。
    • [Account selection method](アカウントの選択方法) で、次の 2 つの方法のいずれかを使用してクラスターを選択します。
      • [From Azure subscription](Azure サブスクリプションから) ラジオ ボタンを選択し、 [Azure サブスクリプション] アカウントを選択します。 次に、自分の [クラスター] を選択します。 ドロップダウンには、ユーザーに属するクラスターのみが一覧表示されることに注意してください。
      • または、 [手動で入力] ラジオ ボタンを選択し、自分の [エンドポイント] (クラスタの URL) を入力します。
    • [テナント] を指定します。
    • [サービス プリンシパル ID] を入力します。 この値は、アプリ登録>の概要>アプリケーション (クライアント) IDAzure portalにあります。 プリンシパルには、使用されているコマンドに必要なアクセス許可レベルに従って、適切なアクセス許可が必要です。
    • [サービス プリンシパル キー] ボタンを選択し、 [サービス プリンシパル キー] を入力します。
    • ドロップダウン メニューから [データベース] を選択します。 または、 [編集] チェックボックスをオンにし、データベース名を入力します。
    • [Test Connection](接続のテスト) を選択して、作成したリンクされたサービスの接続をテストします。 セットアップに接続できる場合は、緑色のチェックマーク (接続成功) が表示されます。
    • [完了] を選択して、リンクされたサービスの作成を完了します。
  6. リンクされたサービスの設定が完了したら、 [AzureDataExplorerTable]>[接続] で、 [テーブル] 名を追加します。 [データのプレビュー] を選択して、データが正しく表示されていることを確認します。

    データセットの準備ができました。パイプラインの編集を続行できます。

ルックアップ アクティビティにクエリを追加する

  1. [pipeline-4-docs]>[Settings] で、 [クエリ] テキストボックスにクエリを追加します。次に例を示します。

    ClusterQueries
    | where Database !in ("KustoMonitoringPersistentDatabase", "$systemdb")
    | summarize count() by Database
    
  2. 必要に応じて、クエリ タイムアウト または [No truncation](切り捨てしない) および [First row only](先頭行のみ) プロパティを変更します。 このフローでは、既定の [クエリ タイムアウト] を保持し、チェックボックスをオフにします。

    ルックアップ アクティビティの最終設定。

For-Each アクティビティを作成する

For-Each アクティビティは、コレクションを反復処理するために使用され、指定されたアクティビティをループで実行します。

  1. 次に、For-Each アクティビティをパイプラインに追加します。 このアクティビティは、ルックアップ アクティビティから返されたデータを処理します。

    • [アクティビティ] ウィンドウで、 [Iteration & Conditions]\(繰り返しと条件\) から [ForEach] アクティビティを選択し、キャンバスにドラッグ アンド ドロップします。

    • ルックアップ アクティビティの出力と、キャンバス内の ForEach アクティビティの入力を線で繋いで接続します。

      ForEach アクティビティ。

  2. キャンバスで [ForEach] アクティビティを選択します。 以下の [設定] タブで次を行います。

    • [シーケンシャル] チェックボックスをオンにしてルックアップ結果のシーケンシャル処理を行うか、オフのままにして並列処理を作成する。

    • バッチ カウント を設定する

    • [項目] で、出力値 @activity('Lookup1').output.value への参照を指定する

      ForEach アクティビティの設定。

ForEach アクティビティ内に Azure Data Explorer コマンド アクティビティを作成する

  1. キャンバスの ForEach アクティビティをダブルクリックして新しいキャンバスで開き、ForEach 内のアクティビティを指定します。

  2. [アクティビティ] ウィンドウの [Azure Data Explorer][Azure Data Explorer コマンド] アクティビティを選択し、キャンバスにドラッグ アンド ドロップします。

    Azure Data Explorer のコマンド アクティビティ。

  3. [接続] タブで、以前作成したリンクされたサービスを選択します。

    Azure Data Explorer の [コマンド アクティビティの接続] タブ。

  4. [コマンド] タブで、次のコマンドを指定します。

    .export
    async compressed
    into csv h"http://<storageName>.blob.core.windows.net/data/ClusterQueries;<storageKey>" with (
    sizeLimit=100000,
    namePrefix=export
    )
    <| ClusterQueries | where Database == "@{item().Database}"
    

    [コマンド] は、指定されたクエリの結果を圧縮形式で BLOB ストレージにエクスポートするよう Azure Data Explorer に指示します。 これは、非同期で実行されます (非同期修飾子を使用)。 このクエリは、ルックアップ アクティビティの結果の各行のデータベース列に対応します。 [コマンド タイムアウト] は必ずしも変更する必要はありません。

    コマンド アクティビティ。

    注意

    コマンド アクティビティには次の制限があります。

    • サイズの制限:1 MB の応答サイズ
    • 制限時間:20分 (既定値)、1時間 (最大)。
    • 必要に応じて、AdminThenQuery を使用して結果にクエリを追加し、結果のサイズ/時間を減らすことができます。
  5. パイプラインの準備ができました。 パイプライン名をクリックすると、メインのパイプライン ビューに戻ることができます。

    Azure Data Explorer のコマンド パイプライン。

  6. パイプラインを公開する前に、 [デバッグ] を選択します。 パイプラインの進行状況は、 [出力] タブで確認できます。

    Azure Data Explorer のコマンド アクティビティの出力。

  7. 必要に応じて [すべて公開][トリガーの追加] の順に選択してパイプラインを実行します。

管理コマンドの出力

コマンド アクティビティの出力の構造の詳細を以下に示します。 この出力は、パイプラインの次のアクティビティで使用できます。

非同期以外の管理コマンドの戻り値

非同期管理コマンドでは、返される値の構造は Lookup アクティビティの結果の構造に似ています。 count フィールドは、返されたレコードの数を示します。 固定配列フィールド value にはレコードの一覧が含まれています。

{ 
    "count": "2", 
    "value": [ 
        { 
            "ExtentId": "1b9977fe-e6cf-4cda-84f3-4a7c61f28ecd", 
            "ExtentSize": 1214.0, 
            "CompressedSize": 520.0 
        }, 
        { 
            "ExtentId": "b897f5a3-62b0-441d-95ca-bf7a88952974", 
            "ExtentSize": 1114.0, 
            "CompressedSize": 504.0 
        } 
    ] 
} 

非同期管理コマンドの戻り値

非同期管理コマンドでは、非同期操作が完了するかタイムアウトになるまで、アクティビティは操作テーブルをバックグラウンドでポーリングします。したがって、返される値には、指定された OperationId プロパティの .show operations OperationId の結果が含まれます。 操作が正常に完了したことを確認するには、State および Status の各プロパティの値を確認します。

{ 
    "count": "1", 
    "value": [ 
        { 
            "OperationId": "910deeae-dd79-44a4-a3a2-087a90d4bb42", 
            "Operation": "TableSetOrAppend", 
            "NodeId": "", 
            "StartedOn": "2019-06-23T10:12:44.0371419Z", 
            "LastUpdatedOn": "2019-06-23T10:12:46.7871468Z", 
            "Duration": "00:00:02.7500049", 
            "State": "Completed", 
            "Status": "", 
            "RootActivityId": "f7c5aaaf-197b-4593-8ba0-e864c94c3c6f", 
            "ShouldRetry": false, 
            "Database": "MyDatabase", 
            "Principal": "<some principal id>", 
            "User": "<some User id>" 
        } 
    ] 
}