Stream Analytics での参照に参照データを使用する

参照データは、静的であるか緩やかに変化する性質を持った有限のデータセットです。 参照を実行したり、データ ストリームを拡張したりするために使用されます。 参照データはルックアップ テーブルとも呼ばれます。

例として IoT のシナリオを考えてみましょう。 頻繁に変更されることがない、センサーに関するメタデータを参照データに保存できます。 その後、それをリアルタイムの IoT データ ストリームと結合できます。

Azure Stream Analytics は、参照データをメモリに読み込んで、低遅延のストリーム処理を実現します。 Stream Analytics ジョブで参照データを使用するには、一般にクエリで参照データの結合を使用します。

自動車が料金所を通過したときに生成されるイベントのリアルタイム ストリームを作成できます。 料金所では、ナンバー プレートをリアルタイムでキャプチャできます。 そのデータを、登録の詳細を含む静的データセットと結合して、有効期限が切れたナンバー プレートを識別できます。

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

Stream Analytics は、参照データの格納レイヤーとして Azure BLOB ストレージ、Azure Data Lake Storage Gen2 および Azure SQL Database をサポートします。 他のデータ ストアに参照データがある場合は、Azure Data Factory を使用して、サポートされているいずれかのデータ ストアにデータを抽出、変換、読み込みしてみてください。 詳細については、「Azure Data Factory のコピー アクティビティの概要」を参照してください。

Azure Blob Storage または Azure Data Lake Storage Gen 2

参照データは、BLOB のシーケンスとしてモデル化され、BLOB の名前内で指定された日付/時刻の昇順で並べられます。 BLOB は、シーケンス内の最後の BLOB で指定された日付/時刻より "新しい" 日付/時刻を使用してシーケンスの末尾に追加することだけがサポートされています。 BLOB は入力構成で定義されます。

詳細については、Blob Storage からの参照データの Stream Analytics ジョブでの使用に関するページを参照してください。

BLOB 参照データを構成する

参照データを構成するには、まず、タイプが "参照データ" の入力を作成する必要があります。 次の表は、参照データ入力の作成時に指定する必要がある各プロパティとその説明を示しています。

プロパティ名 説明
入力のエイリアス この入力を参照するためにジョブ クエリで使用されるわかりやすい名前。
ストレージ アカウント BLOB が配置されるストレージ アカウントの名前。 Stream Analytics のジョブと同じサブスクリプションにある場合は、ドロップダウン リストから選択します。
ストレージ アカウント キー ストレージ アカウントに関連付けられている秘密キー。 このキーは、ストレージ アカウントが Stream Analytics のジョブと同じサブスクリプションにある場合は、自動的に設定されます。
ストレージ コンテナー コンテナーは、Blob Storage に保存された BLOB を論理的にグループ化したものです。 BLOB を Blob Storage にアップロードするとき、その BLOB のコンテナーを指定する必要があります。
パスのパターン この必須プロパティは、指定したコンテナー内で BLOB を見つけるために使用します。 パス内に、変数 {date} と {time} のいずれかまたは両方のインスタンスを指定できます。
例 1: products/{date}/{time}/product-list.csv
例 2: products/{date}/product-list.csv
例 3: product-list.csv

指定されたパスに BLOB が存在しない場合、BLOB が使用可能になるまで、Stream Analytics ジョブは無期限に待機します。
日付形式 [省略可能] 指定したパス パターン内で {date} を使用した場合は、サポートされている形式のドロップダウン リストから、BLOB を編成する日付形式を選択できます。
例: YYYY/MM/DD または MM/DD/YYYY
時刻形式 [省略可能] 指定したパス パターン内で {time} を使用した場合は、サポートされている形式のドロップダウン リストから、BLOB を編成する時刻形式を選択できます。
例: HH、HH/mm、HH-mm
イベントのシリアル化の形式 クエリを予想どおりに動作させるには、入ってくるデータ ストリームに使用しているシリアル化形式が Stream Analytics で認識される必要があります。 参照データの場合、サポートされている形式は CSV と JSON です。
エンコード 現時点でサポートされているエンコード形式は UTF-8 だけです。

静的参照データ

参照データは変更されない可能性があります。 静的参照データのサポートを有効にするには、入力構成で静的パスを指定します。

Stream Analytics は、指定されたパスから BLOB を取得します。 置換トークン ({date} と {time}) は必要ありません。 参照データは Stream Analytics 内で不変であるため、静的参照データの BLOB を上書きすることはお勧めしません。

スケジュールに従って参照データを生成する

参照データは、緩やかに変化するデータセットである可能性があります。 参照データを更新するには、入力構成で置換トークン {date} および {time} を使用して、パス パターンを指定します。 Stream Analytics は、このパス パターンに基づいて、更新された参照データ定義を取得します。

たとえば、日付形式が YYYY-MM-DD で、時刻形式が HH-mm の sample/{date}/{time}/products.csv というパターンは、更新された BLOB sample/2015-04-16/17-30/products.csv を UTC の 2015 年 4 月 16 日午後 5 時 30 分に取得するように Stream Analytics に指示します。

Stream Analytics では、更新された参照データ BLOB が 1 分間隔で自動的にスキャンされます。 10:30:00 のタイムスタンプを持つ BLOB が、たとえば 10:30:30 のように、少し遅れてアップロードされる場合があります。 この BLOB を参照する Stream Analytics ジョブに、わずかな遅れがあることがわかります。

このようなシナリオを回避するには、ターゲットの有効時間 (この例では 10:30:00) より前に BLOB をアップロードします。 そうすれば、Stream Analytics ジョブで BLOB を検出してメモリに読み込み、操作を実行するのに十分な時間ができます。

注意

現在、Stream Analytics のジョブは、コンピューター時間が、BLOB の名前でエンコードされた時刻まで進んだ場合にのみ、BLOB の更新を検索します。 たとえば、ジョブは、sample/2015-04-16/17-30/products.csv を、できるだけ早く、ただし、UTC の 2015 年 4 月 16 日午後 5 時 30 分以降に検索します。 BLOB のエンコードされた時刻が、検出された最新時刻よりも前の場合、その BLOB は "決して" 検索されません。

たとえば、ジョブで BLOB sample/2015-04-16/17-30/products.csv が見つかった後、エンコードされた日付が 2015 年 4 月 16 日午後 5 時 30 分より前のファイルは無視されます。 後から BLOB sample/2015-04-16/17-25/products.csv が同じコンテナーに作成された場合、それはジョブで使用されません。

別の例として、sample/2015-04-16/17-30/products.csv は、2015 年 4 月 16 日午後 10 時 03 分にのみ生成されますが、コンテナーにはそれより前の日付の BLOB はありません。 すると、ジョブは 2015 年 4 月 16 日午後 10 時 03 分以降にこのファイルを使用し、それまでは以前の参照データを使用します。

この動作の例外は、ジョブで過去のデータを再処理する必要がある場合や、ジョブが初めて開始された場合です。

開始時点で、ジョブは、指定されたジョブ開始時刻より前に生成された最新の BLOB を探します。 この動作により、ジョブの開始時に "空でない" 参照データセットが確実に存在することになります。 見つからない場合は、ジョブによって次の診断が表示されます: Initializing input without a valid reference data blob for UTC time <start time>

参照データセットが更新されると、診断ログが生成されます: Loaded new reference data from <blob path>。 多くの理由から、ジョブで以前の参照データセットを再読み込みすることが必要になる場合があります。 ほとんどの場合、その理由は過去のデータを再処理することです。 その時点で同じ診断ログが生成されます。 この動作は、現在のストリーム データから過去の参照データを使用することを意味するものではありません。

Azure Data Factory を使用して Stream Analytics で必要な更新された BLOB を作成するタスクを調整し、参照データ定義を更新することができます。

Data Factory は、データの移動や変換を調整し自動化するクラウドベースのデータ統合サービスです。 Data Factory は、クラウドベースとオンプレミスの多数のデータ ストアへの接続をサポートします。 指定した定期的なスケジュールに基づいてデータを簡単に移動できます。

事前に定義されたスケジュールで更新される Stream Analytics の参照データを生成するために Data Factory パイプラインを設定する方法の詳細については、この GitHub のサンプルを参照してください。

BLOB 参照データの更新に関するヒント

  • 参照データ BLOB は変更不可であるため、上書きしないでください。
  • 参照データを更新するための推奨方法は次のとおりです。
    • パス パターンで {date}/{time} を使用する。
    • ジョブ入力に定義されている同じコンテナーとパス パターンを使用して、新しい BLOB を追加する。
    • シーケンスの最後の BLOB で指定されている日付/時刻よりも後の値を使用する
  • 参照データの BLOB は、BLOB の最終更新日時の順に並べられていません。 BLOB 名で {date} と {time} の置換を使用して指定された日付と時刻の順に並んでいるだけです。
  • 多数の BLOB を列挙する必要がないように、今後処理を行う予定がない古い BLOB は、削除してください。 Stream Analytics では、再起動などの一部のシナリオで少量の再処理が必要になる場合があります。

Azure SQL Database

Stream Analytics ジョブは SQL Database 参照データを取得し、それを処理するためにスナップショットとしてメモリに格納します。 参照データのスナップショットは、ストレージ アカウントのコンテナーにも格納されます。 ストレージ アカウントは、構成設定で指定します。

ジョブを開始すると、コンテナーは自動作成されます。 ジョブが停止するか失敗状態になると、自動作成されたコンテナーはジョブの再開時に削除されます。

参照データが緩やかに変化するデータセットである場合、ジョブで使用されるスナップショットを定期的に更新する必要があります。

Stream Analytics を使用すると、Azure SQL Database 入力接続を構成するときに更新頻度を設定できます。 Stream Analytics ランタイムは、更新頻度によって指定された間隔で Azure SQL Database に対してクエリを実行します。 サポートされている最速のリフレッシュ レートは 1 分に 1 回です。 更新のたびに、Stream Analytics は指定されたストレージ アカウントに新しいスナップショットを格納します。

Stream Analytics には、SQL Database へのクエリ実行に関するオプションが 2 つあります。 スナップショット クエリは必須であり、各ジョブに含まれている必要があります。 Stream Analytics は、更新間隔に基づいてスナップショット クエリを定期的に実行します。 クエリの結果 (スナップショット) を参照データセットとして使用します。

スナップショット クエリはほとんどのシナリオに適しています。 データセットが大きく更新頻度が頻繁なためにパフォーマンスの問題が発生した場合は、デルタ クエリ オプションを使用します。 クエリは、60 秒以内に参照データセットを返せないと、タイムアウトになります。

デルタ クエリ オプションでは、Stream Analytics はスナップショット クエリを最初に実行してベースライン参照データセットを取得します。 その後 Stream Analytics は、更新間隔に基づいてデルタ クエリを定期的に実行し、増分の変更を取得します。 これらの増分変更が参照データセットに継続的に適用されることで、更新された状態が維持されます。 デルタ クエリ オプションを使用すると、ストレージ コストおよびネットワーク I/O 操作を削減できる場合があります。

SQL Database 参照データを構成する

SQL Database 参照データを構成するには、まず参照データ入力を作成する必要があります。 次の表は、参照データ入力の作成時に指定する必要がある各プロパティとその説明を示しています。 詳細については、SQL Database からの参照データの Stream Analytics ジョブでの使用に関するページを参照してください。

参照データ入力として Azure SQL Managed Instance を使用できます。 Azure SQL Managed Instance でパブリック エンドポイントを構成する必要があります。 次に、Stream Analytics で次の設定を手動で構成します。 データベースがアタッチされた SQL Server が実行されている Azure 仮想マシンも、これらの設定を手動で構成することによりサポートされます。

プロパティ名 説明
入力のエイリアス この入力を参照するためにジョブ クエリで使用されるわかりやすい名前。
サブスクリプション 該当するサブスクリプション。
データベース 参照データを含む SQL Database インスタンス。 SQL Managed Instance の場合は、ポート 3342 を指定する必要があります。 たとえば sampleserver.public.database.windows.net,3342 です。
ユーザー名 SQL Database インスタンスに関連付けられているユーザー名。
Password SQL Database インスタンスに関連付けられているパスワード。
定期的に更新 このオプションを使用して更新頻度を選択できます。 オンを選択すると、更新頻度を DD:HH:MM で指定できます。
スナップショット クエリ この既定のクエリ オプションは、SQL Database インスタンスから参照データを取得します。
デルタ クエリ データセットが大きく更新頻度が短い高度なシナリオでは、デルタ クエリを追加します。

サイズ制限

最適なパフォーマンスを得るには、300 MB 未満の参照データセットを使用します。 5 GB 以下の参照データセットは、6 個以上のストリーミング ユニットがあるジョブでサポートされます。 大きな参照データセットを使用すると、ジョブのエンドツーエンドの待ち時間に影響する可能性があります。

ウィンドウ集計、一時的な結合、一時的な分析関数などのステートフルな処理を含めると、クエリの複雑さが増す可能性があります。 複雑さが増すと、サポートされている参照データの最大サイズが減少します。

Stream Analytics が参照データを読み込むことができないときに、複雑な操作が実行された場合、ジョブはメモリ不足になって失敗します。 このような場合、ストリーミング ユニットの使用率メトリックは 100% に達します。

ストリーミング ユニットの数 推奨サイズ
1 50 MB 以下
3 150 MB 以下
6 以上 5 GB 以下

参照データの圧縮はサポートされていません。 300 MB を超える参照データセットの場合、最適なパフォーマンスを実現するためにデルタ クエリ オプションを使用して、SQL Database をソースとして使用することをお勧めします。 このようなシナリオでデルタ クエリを使用しない場合、参照データセットが更新されるたびにウォーターマークの遅延メトリックが急増します。

ジョブ内の複数の参照データセットを結合する

参照データ入力はストリーミング入力にのみ結合できます。 このため、複数の参照データセットを結合するには、クエリを複数のステップに分割します。 次に例を示します。

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

IoT Edge ジョブ

Stream Analytics エッジ ジョブの場合、ローカル参照データのみがサポートされます。 ジョブが IoT Edge デバイスに展開されると、ユーザー定義のファイル パスから参照データが読み込まれます。 デバイス上で参照データ ファイルを準備します。

Windows コンテナーの場合は、ローカル ドライブ上に参照データ ファイルを配置し、ローカル ドライブを Docker コンテナーと共有します。 Linux コンテナーの場合は、Docker ボリュームを作成し、ボリュームにデータ ファイルを設定します。

IoT Edge の更新プログラム上の参照データは、デプロイによってトリガーされます。 トリガーされた後、実行中のジョブを停止することなく、更新されたデータが Stream Analytics モジュールによって取得されます。

参照データは 2 つの方法で更新できます。

  • Azure portal から Stream Analytics ジョブ内の参照データ パスを更新します。
  • IoT Edge デプロイを更新します。

次のステップ