チュートリアル: Azure Data Lake Storage Gen2 に格納されている Delta テーブルに書き込む

このチュートリアルでは、Stream Analytics ジョブを作成して、Azure Data Lake Storage Gen2 の Delta テーブルに書き込む方法について説明します。 このチュートリアルでは、次の作業を行う方法について説明します。

  • イベント ハブにサンプル データを送信するイベント ジェネレーターをデプロイする
  • Stream Analytics のジョブの作成
  • Delta テーブルを使って Azure Data Lake Storage Gen2 を構成する
  • Stream Analytics ジョブの実行

前提条件

始める前に、以下の手順を完了します。

Stream Analytics のジョブの作成

  1. Azure portal にサインインします。

  2. 左側のメニューから、 [すべてのサービス] を選択します。

  3. [分析] セクションの [Stream Analytics ジョブ] にマウスを移動し、[+] (プラス) を選びます。

    [すべてのサービス] ページでの Stream Analytics ジョブの選択を示すスクリーンショット。

  4. Azure Portal の左上隅にある [リソースの作成] を選択します。

  5. 結果の一覧で、 [Analytics]>[Stream Analytics ジョブ] の順に選択します。

  6. [新しい Stream Analytics ジョブ] ページで、次の手順のようにします。

    1. [サブスクリプション] で、Azure サブスクリプションを選択します。
    2. [リソース グループ] で、TollApp のデプロイで前に使ったものと同じリソースを選びます。
    3. [名前] に、ジョブの名前を入力します。 Stream Analytics ジョブ名には、英数字、ハイフン、アンダースコアのみを使用することができます。長さは 3 文字以上 63 文字以下でなければなりません。
    4. [ホスティング環境] で、[クラウド] が選ばれていることを確認します。
    5. [ストリーミング ユニット] で、[1] を選びます。 ストリーミング ユニットとは、ジョブの実行に必要なコンピューティング リソースのことです。 ストリーミング ユニットのスケーリングについては、ストリーミング ユニットの理解と調整に関する記事を参照してください。

    Stream Analytics ジョブの作成ページを示すスクリーンショット。

  7. ページ下部にある [確認と作成] を選択します。

  8. [確認および作成] ページで設定を確認し、[作成] を選んで Stream Analytics ページを作成します。

  9. デプロイのページで、[リソースに移動] を選んで [Stream Analytics ジョブ] ページに移動します。

ジョブの入力を構成する

次の手順では、TollApp のデプロイで作成したイベント ハブを使用してデータを読み取るためにジョブの入力ソースを定義します。

  1. 前のセクションで作成した Stream Analytics ジョブを見つけます。

  2. [Stream Analytics ジョブ] の [ジョブ トポロジ] セクションで、 [入力] を選択します。

  3. [+ 入力の追加][イベント ハブ] を選びます。

    [入力] ページを示すスクリーンショット。

  4. TollApp Azure Template を使用して作成された次の値を入力フォームに入力します。

    1. [入力のエイリアス] に「entrystream」と入力します。

    2. [サブスクリプションから Event Hub を選択する] を選びます。

    3. [サブスクリプション] で、Azure サブスクリプションを選択します。

    4. [イベント ハブの名前空間] には、前のセクションで作成したイベント ハブの名前空間を選択します。

    5. 残りの設定では既定のオプションを使用し、[保存] を選択します。

      入力イベント ハブの選択を示すスクリーンショット。

ジョブの出力を構成する

次の手順では、ジョブがデータを書き込むことができる場所として、出力シンクを定義します。 このチュートリアルでは、Azure Data Lake Storage Gen2 の Delta テーブルに出力を書き込みます。

  1. [Stream Analytics ジョブ] の [ジョブ トポロジ] セクションで、 [出力] オプションを選択します。

  2. [+ 出力の追加]>[BLOB ストレージ/ADLS Gen2] を選びます。

    [出力] ページを示すスクリーンショット。

  3. 出力フォームに次の詳細を入力して、 [保存] を選択します。

    1. [出力のエイリアス] に「DeltaOutput」と入力します。

    2. [サブスクリプションから Blob Storage または ADLS Gen2 を選択する] を選びます。

    3. [サブスクリプション] で、Azure サブスクリプションを選択します。

    4. [ストレージ アカウント] で、作成した ADLS Gen2 アカウント (tollapp で始まるもの) を選びます。

    5. [コンテナー] で、[新規作成] を選び、一意のコンテナー名を指定します。

    6. [イベントのシリアル化の形式] で、[Delta Lake] を選択します。 Delta Lake はここでオプションの 1 つとしてリストに表示されていますが、データ形式ではありません。 Delta Lake では、バージョン管理された Parquet ファイルを使用してデータを格納します。 Delta lake の詳細については、以下を参照してください。

    7. [Delta テーブルのパス] には、「tutorial folder/delta table」と入力します。

    8. 残りの設定では既定のオプションを使用し、[保存] を選択します。

      出力の構成を示すスクリーンショット。

クエリを作成する

ここまでで、着信データ ストリームを読み取る Stream Analytics ジョブを設定しました。 次に、リアルタイムでデータを分析するクエリを作成します。 このクエリは、Stream Analytics に固有のいくつかの拡張を含む SQL に似た言語を使います。

  1. 次に、左側のメニューで、[ジョブ トポロジ] の下にある [クエリ] を選びます。

  2. クエリ ウィンドウに次のクエリを入力します。 この例では、クエリは Event Hubs からデータを読み取り、選択した値を ADLS Gen2 の Delta テーブルにコピーします。

     SELECT State, CarModel.Make, TollAmount
     INTO DeltaOutput
     FROM EntryStream TIMESTAMP BY EntryTime
    
  3. ツール バーの [クエリの保存] を選びます。

    ジョブのクエリを示すスクリーンショット。

Stream Analytics ジョブを開始して出力をチェックする

  1. Azure portal でジョブの概要ページに戻り、[開始] を選びます。

    [概要] ページでの [ジョブの開始] ボタンの選択を示すスクリーンショット。

  2. [ジョブの開始] ページで、[ジョブ出力の開始時刻] に [今すぐ] が選ばれていることを確認し、ページの下部にある [開始] を選択します。

    [ジョブの開始] ページでの選択を示すスクリーンショット。

  3. 数分経ったら、ポータルで、ジョブの出力として構成したストレージ アカウントとコンテナーを見つけます。 コンテナーで指定されたフォルダーに Delta テーブルが表示されるようになりました。 ジョブは初めて開始するときに数分かかり、開始後はデータが到着すると実行され続けます。

    コンテナー内の出力データ ファイルを示すスクリーンショット。

リソースをクリーンアップする

リソース グループ、Stream Analytics ジョブ、およびすべての関連するリソースは、不要になったら削除します。 ジョブを削除すると、ジョブによって消費されるストリーミング ユニットに対する課金を回避することができます。 ジョブを後で使用する計画がある場合は、ジョブを停止し、必要なときに再起動することができます。 このジョブを継続して使用しない場合は、以下の手順を使用して、このチュートリアルで作成したすべてのリソースを削除してください。

  1. Azure Portal の左側のメニューで [リソース グループ] を選択し、作成したリソースの名前を選択します。
  2. リソース グループのページで [削除] を選択し、削除するリソースの名前をテキスト ボックスに入力してから [削除] を選択します。

次の手順

このチュートリアルでは、単純な Stream Analytics ジョブを作成し、受信データをフィルター処理し、結果を ADLS Gen2 アカウントの Delta テーブルに書き込みました。 Stream Analytics の詳細については、以下を参照してください。