SQL Database からの参照データを Azure Stream Analytics ジョブに使用する

Azure Stream Analytics では、参照データ入力のソースとして Azure SQL Database がサポートされています。 Stream Analytics ツールを使用して、Azure portal および Visual Studio で Stream Analytics ジョブに対する参照データとして SQL Database を使用できます。 この記事では、両方の方法の実行方法を示します。

Azure portal

Azure portal を使用して参照入力ソースとして Azure SQL Database を追加するには、次の手順のようにします。

ポータルの前提条件

  1. Stream Analytics ジョブを作成します。

  2. Stream Analytics ジョブで使用するストレージ アカウントを作成します。

    重要

    Azure Stream Analytics では、このストレージ アカウント内にスナップショットが保持されます。 アイテム保持ポリシーを構成するときは、選択した期間に Stream Analytics ジョブの目的の回復期間が効果的に含まれるようにすることが不可欠です。

  3. Stream Analytics ジョブで参照データとして使用されるデータ セットを含む Azure SQL Database を作成します。

SQL Database の参照データ入力を定義する

  1. Stream Analytics ジョブで、 [ジョブ トポロジ][入力] を選択します。 [参照入力の追加] をクリックして、 [SQL Database] を選択します。

    Inputs is selected in the left navigation pane. On Inputs, + Add reference input is selected, revealing a drop-down list that shows the values Blob storage and SQL Database.

  2. Stream Analytics の入力構成を指定します。 データベース名、サーバー名、ユーザー名、パスワードを選択します。 参照データ入力を定期的に更新する場合は、[オン] を選択し、DD:HH:MM で更新間隔を指定します。 データ セットが大きくて更新間隔が短い場合は、 デルタ クエリを使用して、開始時刻 (@deltaStartTime) から終了時間 (@deltaEndTime) までの間に挿入または削除されたすべての行を SQL Database から取得することで、参照データ内の変更を追跡できます。

デルタ クエリ」を参照してください。

When SQL Database is selected, the SQL Database New input page appears. There is a configuration form in the left pane, and a Snapshot query in the right pane.

  1. SQL クエリ エディターでスナップショット クエリをテストします。 詳しくは、「Azure portal の SQL クエリ エディターを使用した接続とデータの照会」をご覧ください

ジョブの構成でストレージ アカウントを指定する

[構成][ストレージ アカウントの設定] に移動し、 [ストレージ アカウントの追加] を選択します。

Storage account settings is selected in the left pane. There is an Add storage account button in the right pane.

ジョブの開始

他の入力、出力、およびクエリを構成した後、Stream Analytics ジョブを開始できます。

Tools for Visual Studio

Visual Studio を使用して参照入力ソースとして Azure SQL Database を追加するには、次の手順のようにします。

Visual Studio の前提条件

  1. Visual Studio 用の Stream Analytics ツールをインストールします。 次のバージョンの Visual Studio がサポートされています。

    • Visual Studio 2015
    • Visual Studio 2019
  2. Visual Studio 用 Stream Analytics ツールのクイック スタートで理解を深めます。

  3. ストレージ アカウントを作成します。

    重要

    Azure Stream Analytics では、このストレージ アカウント内にスナップショットが保持されます。 アイテム保持ポリシーを構成するときは、選択した期間に Stream Analytics ジョブの目的の回復期間が効果的に含まれるようにすることが不可欠です。

SQL Database テーブルの作成

SQL Server Management Studio を使用して、参照データを格納するためのテーブルを作成します。 詳しくは、SSMS を使用した最初の Azure SQL Database の設計に関するチュートリアルをご覧ください。

後の例で使用するテーブルの例は、次のステートメントで作成しました。

create table chemicals(Id Bigint,Name Nvarchar(max),FullName Nvarchar(max));

サブスクリプションの選択

  1. Visual Studio の [表示] メニューで [サーバー エクスプローラー] を選択します。

  2. [Azure] を右クリックし、 [Microsoft Azure サブスクリプションへの接続] を選択して、Azure アカウントでサインインします。

Stream Analytics プロジェクトを作成する

  1. [ファイル] > [新しいプロジェクト] を選びます。

  2. 左側のテンプレートの一覧で [Stream Analytics] を選択し、 [Azure Stream Analytics アプリケーション] を選択します。

  3. プロジェクトの [名前][場所][ソリューション名] を入力し、 [OK] を選択します。

    The Stream Analytics template is selected, Azure Stream Analytics Application is selected, and the Name, Location, and Solution names boxes are highlighted.

SQL Database の参照データ入力を定義する

  1. 新しい入力を作成します。

    On Add New Item, Input is selected.

  2. ソリューション エクスプローラーで、Input.json をダブルクリックします。

  3. [Stream Analytics Input Configuration](Stream Analytics の入力構成) を設定します。 データベース名、サーバー名、更新の種類、更新間隔を選択します。 DD:HH:MM の形式で更新間隔を指定します。

    In Stream Analytics Input Configuration, values are entered or selected from drop-down lists.

    [Execute only once](1 回だけ実行) または [Execute periodically](定期的に実行) を選択した場合は、プロジェクトの Input.json ファイル ノードの下に <入力エイリアス>.snapshot.sql という名前の SQL 分離コード ファイルが 1 つ生成されます。

    The SQL CodeBehind file Chemicals.snapshot.sql is highlighted.

    [Refresh Periodically with Delta](デルタで定期的に更新) を択した場合は、2 つの SQL 分離コード ファイル <入力エイリアス>.snapshot.sql<入力エイリアス>.delta.sql が生成されます。

    The SQL CodeBehind files Chemicals.delta.sql and Chemicals.snapshot.sql are highlighted.

  4. エディターで SQL ファイルを開き、SQL クエリを記述します。

  5. Visual Studio 2019 を使用していて、SQL Server Data Tools をインストールしている場合は、 [実行] をクリックしてクエリをテストできます。 SQL Database への接続を支援するウィザード ウィンドウがポップアップし、クエリの結果がウィンドウの下部に表示されます。

ストレージ アカウントを指定する

JobConfig.json を開き、SQL 参照スナップショットを格納するためのストレージ アカウントを指定します。

Stream Analytics Job Configure Configuration is shown with default values. The Global Storage Settings are highlighted.

ローカル環境でテストして Azure にデプロイする

ジョブを Azure にデプロイする前に、ローカル環境でライブ入力データに対してクエリ ロジックをテストできます。 この機能について詳しくは、「Visual Studio の Azure Stream Analytics ツールを使用してライブ データをローカルにテストする (プレビュー)」をご覧ください。 テストが完了したら、 [Azure に送信] をクリックします。 ジョブを開始する方法については、「クイック スタート: Visual Studio の Azure Stream Analytics ツールを使用した Stream Analytics ジョブの作成」をご覧ください。

デルタ クエリ

デルタ クエリを使用するときは、Azure SQL Database のテンポラル テーブルをお勧めします。

  1. Azure SQL Database にテンポラル テーブルを作成します。

       CREATE TABLE DeviceTemporal
       (
          [DeviceId] int NOT NULL PRIMARY KEY CLUSTERED
          , [GroupDeviceId] nvarchar(100) NOT NULL
          , [Description] nvarchar(100) NOT NULL
          , [ValidFrom] datetime2 (0) GENERATED ALWAYS AS ROW START
          , [ValidTo] datetime2 (0) GENERATED ALWAYS AS ROW END
          , PERIOD FOR SYSTEM_TIME (ValidFrom, ValidTo)
       )
       WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DeviceHistory));  -- DeviceHistory table will be used in Delta query
    
  2. スナップショット クエリを作成します。

    @snapshotTime パラメーターを使用して、システム時刻において有効であった SQL Database の一時テーブルから参照データ セットを取得するよう Stream Analytics ランタイムに指示します。 このパラメーターを指定しないと、クロックのずれが原因で不正確な基本参照データ セットを取得する可能性があります。 完全なスナップショット クエリの例を以下に示します。

       SELECT DeviceId, GroupDeviceId, [Description]
       FROM dbo.DeviceTemporal
       FOR SYSTEM_TIME AS OF @snapshotTime
    
  3. デルタ クエリを作成します。

    このクエリでは、開始時刻 @deltaStartTime と終了時刻 @deltaEndTime の間に挿入または削除された SQL Database のすべての行が取得されます。 デルタ クエリでは、スナップショット クエリと同じ列および列の操作を返す必要があります。 この列では、@deltaStartTime@deltaEndTime の間に行が挿入または削除されたかどうかが定義されています。 結果の行には、レコードが挿入された場合は 1、削除された場合は 2 のフラグが設定されます。 また、差分期間内のすべての更新が適切にキャプチャされるようにするために、SQL Server 側から透かしを追加する必要があります。 透かしなしでデルタ クエリを使用すると、参照データセットに誤りが生じる可能性があります。

    更新されたレコードの場合、テンポラル テーブルでは挿入と削除の操作をキャプチャすることによってブックキーピングが行われます。 その場合、Stream Analytics ランタイムでは、前のスナップショットにデルタ クエリの結果を適用することによって、参照データが最新の状態に維持されます。 デルタ クエリの例を次に示します。

       SELECT DeviceId, GroupDeviceId, Description, ValidFrom as _watermark_, 1 as _operation_
       FROM dbo.DeviceTemporal
       WHERE ValidFrom BETWEEN @deltaStartTime AND @deltaEndTime   -- records inserted
       UNION
       SELECT DeviceId, GroupDeviceId, Description, ValidTo as _watermark_, 2 as _operation_
       FROM dbo.DeviceHistory   -- table we created in step 1
       WHERE ValidTo BETWEEN @deltaStartTime AND @deltaEndTime     -- record deleted
    

    Stream Analytics ランタイムでは、デルタ クエリに加えてスナップショット クエリが定期的に実行されてチェックポイントが保存される場合があることに注意してください。

    重要

    参照データ デルタ クエリを使用する場合は、テンポラル参照データ テーブルに対して同じ更新を複数回行わないでください。 これにより、正しくない結果が生成される可能性があります。 参照データが正しくない結果を生成する可能性がある例を次に示します。

     UPDATE myTable SET VALUE=2 WHERE ID = 1;
     UPDATE myTable SET VALUE=2 WHERE ID = 1;      
    

    正しい例:

     UPDATE myTable SET VALUE = 2 WHERE ID = 1 and not exists (select * from myTable where ID = 1 and value = 2);
    

    これにより、重複する更新プログラムが実行されなくなります。

クエリをテストする

クエリで、Stream Analytics ジョブによって参照データとして使用される予期されるデータセットが返されることを確認することが重要です。 クエリをテストするには、ポータルの [ジョブ トポロジ] セクションの [入力] に移動します。 SQL Database 参照入力で、サンプル データを選択することができます。 サンプルが利用できるようになったら、ファイルをダウンロードし、返されるデータが予想どおりであるかどうかを確認します。 開発を最適化し、イテレーションをテストする場合、Visual Studio の Stream Analytics ツールを使用することをお勧めします。 または希望する他の任意のツールを使用して、まずクエリによって Azure SQL Database から正しい結果が返されることを確認してから、Stream Analytics ジョブでそれを使用することもできます。

Visual Studio Code を使用してクエリをテストする

Visual Studio Code に Azure Stream Analytics ToolsSQL Server (mssql) をインストールし、ASA プロジェクトを設定します。 詳細については、「クイック スタート: Visual Studio Code で Azure Stream Analytics ジョブを作成する」と SQL Server (mssql) 拡張機能のチュートリアルを参照してください。

  1. SQL 参照データの入力を構成します。

    A Visual Studio Code editor (tab) shows ReferenceSQLDatabase.json.

  2. SQL Server アイコンを選択し、 [接続の追加] をクリックします。

    + Add Connection appears in the left pane and is highlighted.

  3. 接続情報を入力します。

    The two boxes for database and server information are highlighted.

  4. 参照 SQL を右クリックし、 [クエリの実行] を選択します。

    Execute Query is highlighted in the context menu.

  5. 接続を選択します。

    The dialog box says

  6. クエリ結果の確認と検証を行います。

    The query search results are in a VS Code editor tab.

よく寄せられる質問

Azure Stream Analytics で SQL 参照データ入力を使用すると追加コストが発生しますか?

Stream Analytics ジョブではストリーミング ユニットごとにコストが追加されることはありません。 ただし、Stream Analytics ジョブには、Azure ストレージ アカウントが関連付けられている必要があります。 Stream Analytics ジョブでは、SQL DB のクエリが実行されて (ジョブの開始時と定期更新時)、参照データ セットが取得され、そのスナップショットがストレージ アカウントに格納されます。 これらのスナップショットを格納するため、Azure ストレージ アカウントの価格に関するページで詳しく説明されている追加料金が発生します。

参照データ スナップショットが SQL DB から照会され、Azure Stream Analytics ジョブで使用されていることは、どのようにすれば知ることができますか?

論理名でフィルター処理される 2 つのメトリックがあり (メトリック Azure portal の下)、それを使用して SQL Database 参照データ入力の正常性を監視できます。

  • InputEvents:このメトリックでは、SQL Database 参照データ セットから読み込まれたレコードの数が測定されます。
  • InputEventBytes:このメトリックでは、Stream Analytics ジョブのメモリに読み込まれた参照データ スナップショットのサイズが測定されます。

両方のメトリックの組み合わせを使用して、ジョブで SQL Database のクエリが実行されて参照データ セットがフェッチされ、メモリに読み込まれているかどうかを推論できます。

特殊な種類の Azure SQL Database が必要ですか

Azure Stream Analytics は、任意の種類の Azure SQL Database で動作します。 ただし、参照データ入力に対して設定されている更新間隔が、クエリの負荷に影響を与える可能性があることを理解しておく必要があります。 デルタ クエリ オプションを使用するには、Azure SQL Database のテンポラル テーブルを使用することをお勧めします。

Azure Stream Analytics で Azure ストレージ アカウントにスナップショットが格納されるのはなぜですか?

厳密には、Stream Analytics で保証されるイベントの処理は 1 回、イベントの配信は 1 回以上となります。 一時的な問題でジョブが影響を受けた場合、状態を復元するために若干の再生が必要です。 再生を有効にするには、これらのスナップショットが Azure ストレージ アカウントに格納されている必要があります。 チェックポイントの再生について詳しくは、「Azure Stream Analytics ジョブでのチェックポイントと再生の概念」をご覧ください。

次のステップ