Azure Stream Analytics での一般的なクエリ パターン

Azure Stream Analytics のクエリは SQL に類似したクエリ言語で表現されます。 言語の構造については、「Stream Analytics query language reference」(Stream Analytics クエリ言語リファレンス) ガイドで確認できます。

クエリのデザインでは、イベント データを 1 つの入力ストリームから出力データ ストアに移動する、単純なパススルー ロジックを表すことができます。また、「Stream Analytics を使って IoT ソリューションを構築する」のガイドで説明しているように、豊富なパターン マッチングとテンポラル解析を行って、さまざまな時間枠にわたって集計を計算することもできます。 複数の入力からのデータを結合してストリーミング イベントを結合し、静的な参照データに対する参照を行ってイベントの値を多様化することができます。 複数の出力にデータを書き込むこともできます。

この記事では、実際のシナリオに基づいて、いくつかの一般的なクエリ パターンの対処方法について説明します。

サポートされるデータ形式

Azure Stream Analytics では、CSV、JSON、および Avro データ形式のイベントの処理をサポートしています。

JSON と Avro のどちらも、入れ子になったオブジェクト (レコード) や配列などの複合型を含むことができます。 これらの複雑なデータ型の操作については、JSON および AVRO データの解析に関する記事をご覧ください。

複数の出力にデータを送信する

複数の SELECT ステートメントを使用して、異なる出力シンクにデータを出力できます。 たとえば、一方の SELECT でしきい値に基づくアラートを出力し、もう一方の SELECT で Blob Storage にイベントを出力することができます。

入力:

Make Time
Make1 2015-01-01T00:00:01Z
Make1 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:01Z
Make2 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:03Z

出力 ArchiveOutput:

Make Time
Make1 2015-01-01T00:00:01Z
Make1 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:01Z
Make2 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:03Z

出力 AlertOutput:

Make Time Count
Make2 2015-01-01T00:00:10Z 3

Query:

SELECT
	*
INTO
	ArchiveOutput
FROM
	Input TIMESTAMP BY Time

SELECT
	Make,
	System.TimeStamp() AS Time,
	COUNT(*) AS [Count]
INTO
	AlertOutput
FROM
	Input TIMESTAMP BY Time
GROUP BY
	Make,
	TumblingWindow(second, 10)
HAVING
	[Count] >= 3

INTO 句を使用して、Stream Analytics に対して、データを書き込む出力先を指定します。 最初の SELECT では、入力からデータを受信して ArchiveOutput という名前の出力に送信するパススルー クエリを定義します。 2 番目のクエリでは、簡単な集計とフィルター処理を行い、その結果を下流のアラート システム出力である AlertOutput に送信します。

WITH 句を使用して、複数のサブクエリ ブロックを定義できることに注意してください。 このオプションには、入力ソースに対して開くリーダーが少なくて済むという利点があります。

Query:

WITH ReaderQuery AS (
	SELECT
		*
	FROM
		Input TIMESTAMP BY Time
)

SELECT * INTO ArchiveOutput FROM ReaderQuery

SELECT 
	Make,
	System.TimeStamp() AS Time,
	COUNT(*) AS [Count] 
INTO AlertOutput 
FROM ReaderQuery
GROUP BY
	Make,
	TumblingWindow(second, 10)
HAVING [Count] >= 3

詳細については、「WITH」を参照してください。

単純なパススルー クエリ

単純なパススルー クエリを使用すると、入力ストリーム データを出力にコピーできます。 たとえば、リアルタイムの車両情報を含むデータ ストリームを SQL データベースに保存して、後で分析を行う必要がある場合、単純なパススルー クエリを使用してジョブを実行します。

入力:

Make Time Weight
Make1 2015-01-01T00:00:01Z "1000"
Make1 2015-01-01T00:00:02Z "2000"

出力:

Make Time Weight
Make1 2015-01-01T00:00:01Z "1000"
Make1 2015-01-01T00:00:02Z "2000"

Query:

SELECT
	*
INTO Output
FROM Input

SELECT * クエリを実行すると、受信イベントのすべてのフィールドが射影され、出力に送信されます。 同様に、SELECT を使用して、入力から必須フィールドのみを射影することもできます。 以下の例では、車両の MakeTime のみが保存される必須フィールドである場合、これらのフィールドを SELECT ステートメントで指定できます。

入力:

Make Time Weight
Make1 2015-01-01T00:00:01Z 1000
Make1 2015-01-01T00:00:02Z 2000
Make2 2015-01-01T00:00:04Z 1500

出力:

Make Time
Make1 2015-01-01T00:00:01Z
Make1 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:04Z

Query:

SELECT
	Make, Time
INTO Output
FROM Input

LIKE と NOT LIKE を使用した文字列の照合

LIKENOT LIKE を使用して、フィールドが特定のパターンに一致するかどうかを検証することができます。 たとえば、文字 "A" で始まり、番号 9 で終わるナンバー プレートだけを返すフィルターを作成できます。

入力:

Make License_plate Time
Make1 ABC-123 2015-01-01T00:00:01Z
Make2 AAA-999 2015-01-01T00:00:02Z
Make3 ABC-369 2015-01-01T00:00:03Z

出力:

Make License_plate Time
Make2 AAA-999 2015-01-01T00:00:02Z
Make3 ABC-369 2015-01-01T00:00:03Z

Query:

SELECT
	*
FROM
	Input TIMESTAMP BY Time
WHERE
	License_plate LIKE 'A%9'

LIKE ステートメントを使用して License_plate フィールドの値を検査します。 値は、文字 "A" で始まり、0 文字以上の文字列が続き、数字 9 で終わる必要があります。

過去のイベントに対する計算

LAG 関数を使用すると、時間枠内の過去のイベントを調べて、現在のイベントと比較することができます。 たとえば、現在の自動車メーカーが料金所を通過した最後の自動車と異なる場合に、現在の自動車メーカーを出力できます。

入力:

Make Time
Make1 2015-01-01T00:00:01Z
Make2 2015-01-01T00:00:02Z

出力:

Make Time
Make2 2015-01-01T00:00:02Z

Query:

SELECT
	Make,
	Time
FROM
	Input TIMESTAMP BY Time
WHERE
	LAG(Make, 1) OVER (LIMIT DURATION(minute, 1)) <> Make

LAG を使用して入力ストリームで 1 つ前のイベントを調べて、Make の値を取得し、その値を現在のイベントの Make と比較し、イベントを出力します。

詳細については、LAG に関するページを参照してください。

期間内の最後のイベントを返す

イベントはシステムによってリアルタイムで使用されるため、イベントがその時間枠で到着する最後のイベントかどうかを判断できる関数はありません。 これを実現するには、入力ストリームを別の入力ストリームと結合する必要があります。この場合、イベントの時間はその時間枠におけるすべてのイベントの最大時間になります。

入力:

License_plate Make Time
DXE 5291 Make1 2015-07-27T00:00:05Z
YZK 5704 Make3 2015-07-27T00:02:17Z
RMV 8282 Make1 2015-07-27T00:05:01Z
YHN 6970 Make2 2015-07-27T00:06:00Z
VFE 1616 Make2 2015-07-27T00:09:31Z
QYF 9358 Make1 2015-07-27T00:12:02Z
MDR 6128 Make4 2015-07-27T00:13:45Z

出力:

License_plate Make Time
VFE 1616 Make2 2015-07-27T00:09:31Z
MDR 6128 Make4 2015-07-27T00:13:45Z

Query:

WITH LastInWindow AS
(
	SELECT 
		MAX(Time) AS LastEventTime
	FROM 
		Input TIMESTAMP BY Time
	GROUP BY 
		TumblingWindow(minute, 10)
)

SELECT 
	Input.License_plate,
	Input.Make,
	Input.Time
FROM
	Input TIMESTAMP BY Time 
	INNER JOIN LastInWindow
	ON DATEDIFF(minute, Input, LastInWindow) BETWEEN 0 AND 10
	AND Input.Time = LastInWindow.LastEventTime

クエリの最初の手順では、10 分の時間枠で最大のタイム スタンプを検索します。これは、その時間枠で最後のイベントのタイム スタンプです。 2 番目の手順では、最初のクエリの結果と元のストリームを結合し、各期間で最後のタイム スタンプに一致するイベントを検索します。

DATEDIFF は、2 つの DateTime フィールド間の時差を比較して返す、日付固有の関数です。詳細については、日付関数に関するページを参照してください。

ストリームの結合の詳細については、JOIN に関するページを参照してください。

時間ごとのデータ集計

時間枠で情報を計算するために、データをまとめて集計できます。 この例では、特定の自動車メーカーごとに直近 10 秒間のカウントが計算されます。

入力:

Make Time Weight
Make1 2015-01-01T00:00:01Z 1000
Make1 2015-01-01T00:00:02Z 2000
Make2 2015-01-01T00:00:04Z 1500

出力:

Make Count
Make1 2
Make2 1

Query:

SELECT
	Make,
	COUNT(*) AS Count
FROM
	Input TIMESTAMP BY Time
GROUP BY
	Make,
	TumblingWindow(second, 10)

この集計では、自動車を Make でグループ化して、10 秒ごとにカウントします。 出力には、料金所を通過した自動車の MakeCount が含まれます。

TumblingWindow は、イベントをグループ化するために使用されるウィンドウ関数です。 集計は、グループ化されたすべてのイベントに適用できます。 詳細については、ウィンドウ関数に関するページを参照してください。

集計の詳細については、集計関数に関するページを参照してください。

定期的な値の出力

不規則または見つからないイベントが発生した場合は、よりまばらなデータ入力から定期的な間隔の出力を生成できます。 たとえば、最近検出されたデータ ポイントを報告する 5 秒ごとのイベントを生成します。

入力:

Time
"2014-01-01T06:01:00" 1
"2014-01-01T06:01:05" 2
"2014-01-01T06:01:10" 3
"2014-01-01T06:01:15" 4
"2014-01-01T06:01:30" 5
"2014-01-01T06:01:35" 6

出力 (最初の 10 行) :

Window_end Last_event.Time Last_event.Value
2014-01-01T14:01:00Z 2014-01-01T14:01:00Z 1
2014-01-01T14:01:05Z 2014-01-01T14:01:05Z 2
2014-01-01T14:01:10Z 2014-01-01T14:01:10Z 3
2014-01-01T14:01:15Z 2014-01-01T14:01:15Z 4
2014-01-01T14:01:20Z 2014-01-01T14:01:15Z 4
2014-01-01T14:01:25Z 2014-01-01T14:01:15Z 4
2014-01-01T14:01:30Z 2014-01-01T14:01:30Z 5
2014-01-01T14:01:35Z 2014-01-01T14:01:35Z 6
2014-01-01T14:01:40Z 2014-01-01T14:01:35Z 6
2014-01-01T14:01:45Z 2014-01-01T14:01:35Z 6

Query:

SELECT
	System.Timestamp() AS Window_end,
	TopOne() OVER (ORDER BY Time DESC) AS Last_event
FROM
	Input TIMESTAMP BY Time
GROUP BY
	HOPPINGWINDOW(second, 300, 5)

このクエリは、5 秒ごとにイベントを生成し、それまでに受信した最後のイベントを出力します。 HOPPINGWINDOW 期間で、クエリが最新のイベントを検出するためにさかのぼる期間を指定します。

詳細については、ホッピング ウィンドウに関するページを参照してください。

ストリームでのイベントの関連付け

LAG 関数を使用して、過去のイベントを調べることにより、同じストリーム内のイベントを関連付けることができます。 たとえば、直近 90 秒間に同じ Make の 2 台の自動車が連続して料金所を通過した場合に出力を生成できます。

入力:

Make License_plate Time
Make1 ABC-123 2015-01-01T00:00:01Z
Make1 AAA-999 2015-01-01T00:00:02Z
Make2 DEF-987 2015-01-01T00:00:03Z
Make1 GHI-345 2015-01-01T00:00:04Z

出力:

Make Time Current_car_license_plate First_car_license_plate First_car_time
Make1 2015-01-01T00:00:02Z AAA-999 ABC-123 2015-01-01T00:00:01Z

Query:

SELECT
	Make,
	Time,
	License_plate AS Current_car_license_plate,
	LAG(License_plate, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_license_plate,
	LAG(Time, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_time
FROM
	Input TIMESTAMP BY Time
WHERE
	LAG(Make, 1) OVER (LIMIT DURATION(second, 90)) = Make

LAG 関数を使用して、入力ストリームで 1 つ前のイベントを調べて、Make の値を取得し、現在のイベントの Make の値と比較することができます。 条件が満たされると、SELECT ステートメントで LAG を使用して、前のイベントのデータを射影できます。

詳細については、LAG に関するページを参照してください。

イベントの間隔を検出する

イベントの間隔は、終了イベントを受信したときに最後の開始イベントを調べることによって計算できます。 このクエリは、ユーザーがページまたは機能に費やした時間を判断するのに役立ちます。

入力:

User 機能 Event Time
user@location.com RightMenu [開始] 2015-01-01T00:00:01Z
user@location.com RightMenu End 2015-01-01T00:00:08Z

出力:

User 機能 Duration
user@location.com RightMenu 7

Query:

SELECT
	[user],
	feature,
	DATEDIFF(
		second,
		LAST(Time) OVER (PARTITION BY [user], feature LIMIT DURATION(hour, 1) WHEN Event = 'start'),
		Time) as duration
FROM input TIMESTAMP BY Time
WHERE
	Event = 'end'

LAST 関数を使用して、特定の条件内で最後のイベントを取得できます。 この例では、条件は Start 型のイベントで、ユーザーと機能に対して PARTITION BY を実行することで検索をパーティション分割します。 これにより、すべてのユーザーと機能が、開始イベントを検索するときに個別に処理されます。 LIMIT DURATION を使用して、終了イベントと開始イベントの間の過去 1 時間に検索を制限します。

一意の値をカウントする

COUNTDISTINCT を使用すると、ストリームに出現するフィールドの、値ごとの件数を一定間隔でカウントできます。 料金所を通過した自動車の、Make ごとの台数を 2 秒間隔で計算するクエリを作成できます。

入力:

Make Time
Make1 2015-01-01T00:00:01Z
Make1 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:01Z
Make2 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:03Z

出力:

Count_make Time
2 2015-01-01T00:00:02Z
1 2015-01-01T00:00:04Z

クエリ:

SELECT
     COUNT(DISTINCT Make) AS Count_make,
     System.TIMESTAMP() AS Time
FROM Input TIMESTAMP BY TIME
GROUP BY 
     TumblingWindow(second, 2)

COUNT(DISTINCT Make) は、ある時間枠内で、Make 列の個別の値の数を返します。 詳細については、COUNT 集計関数に関するページをご覧ください。

期間内の最初のイベントを取得する

IsFirst は、時間枠内の最初のイベントを取得するために使用できます。 たとえば、10 分間隔で最初の自動車情報を出力します。

入力:

License_plate Make Time
DXE 5291 Make1 2015-07-27T00:00:05Z
YZK 5704 Make3 2015-07-27T00:02:17Z
RMV 8282 Make1 2015-07-27T00:05:01Z
YHN 6970 Make2 2015-07-27T00:06:00Z
VFE 1616 Make2 2015-07-27T00:09:31Z
QYF 9358 Make1 2015-07-27T00:12:02Z
MDR 6128 Make4 2015-07-27T00:13:45Z

出力:

License_plate Make Time
DXE 5291 Make1 2015-07-27T00:00:05Z
QYF 9358 Make1 2015-07-27T00:12:02Z

Query:

SELECT 
	License_plate,
	Make,
	Time
FROM 
	Input TIMESTAMP BY Time
WHERE 
	IsFirst(minute, 10) = 1

IsFirst を使用して、データをパーティション分割し、10 分間隔で検出された自動車の Make ごとの最初のイベントを計算することもできます。

出力:

License_plate Make Time
DXE 5291 Make1 2015-07-27T00:00:05Z
YZK 5704 Make3 2015-07-27T00:02:17Z
YHN 6970 Make2 2015-07-27T00:06:00Z
QYF 9358 Make1 2015-07-27T00:12:02Z
MDR 6128 Make4 2015-07-27T00:13:45Z

Query:

SELECT 
	License_plate,
	Make,
	Time
FROM 
	Input TIMESTAMP BY Time
WHERE 
	IsFirst(minute, 10) OVER (PARTITION BY Make) = 1

詳しくは、IsFirst に関するページをご覧ください。

期間内の重複するイベントを削除する

特定の期間内のイベントに対する平均の計算などの操作を実行するときは、重複するイベントを除外する必要があります。 次の例では、2 番目のイベントは最初のイベントの重複です。

入力:

deviceId Time 属性
1 2018-07-27T00:00:01Z 気温 50
1 2018-07-27T00:00:01Z 気温 50
2 2018-07-27T00:00:01Z 気温 40
1 2018-07-27T00:00:05Z 気温 60
2 2018-07-27T00:00:05Z 気温 50
1 2018-07-27T00:00:10Z 気温 100

出力:

AverageValue deviceId
70 1
45 2

Query:

With Temp AS (
SELECT
	COUNT(DISTINCT Time) AS CountTime,
	Value,
	DeviceId
FROM
	Input TIMESTAMP BY Time
GROUP BY
	Value,
	DeviceId,
	SYSTEM.TIMESTAMP()
)

SELECT
    AVG(Value) AS AverageValue, DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)

COUNT(DISTINCT Time) は、特定の時間枠内での、Time 列の個別の値の数を返します。 その後、最初のステップの出力を使用し、重複を除去することによって、デバイスあたりの平均を計算できます。

詳細については、COUNT(DISTINCT Time) に関するページを参照してください。

異なるケース/値に異なるロジックを指定する (CASE ステートメント)

CASE ステートメントでは、特定の条件に基づいてさまざまなフィールドに対して異なる計算を行うことができます。 たとえば、レーン "A" を Make1 の自動車、レーン "B" をその他のメーカーに割り当てることができます。

入力:

Make Time
Make1 2015-01-01T00:00:01Z
Make2 2015-01-01T00:00:02Z
Make2 2015-01-01T00:00:03Z

出力:

Make Dispatch_to_lane Time
Make1 "A" 2015-01-01T00:00:01Z
Make2 "B" 2015-01-01T00:00:02Z

解決策:

SELECT
	Make
	CASE
		WHEN Make = "Make1" THEN "A"
		ELSE "B"
	END AS Dispatch_to_lane,
	System.TimeStamp() AS Time
FROM
	Input TIMESTAMP BY Time

CASE 式を使用して、式を一連の単純な式と比較して結果を決定します。 この例では Make1 の車両はレーン "A" にディスパッチされますが、その他のメーカーの車両にはレーン "B" が割り当てられます。

詳細については、「CASE 式」を参照してください。

データ変換

CAST メソッドを使用して、データをリアルタイムでキャストできます。 たとえば、自動車の重量を型 nvarchar (max) から型 bigint に変換し、数値計算で使用することができます。

入力:

Make Time Weight
Make1 2015-01-01T00:00:01Z "1000"
Make1 2015-01-01T00:00:02Z "2000"

出力:

Make Weight
Make1 3000

Query:

SELECT
	Make,
	SUM(CAST(Weight AS BIGINT)) AS Weight
FROM
	Input TIMESTAMP BY Time
GROUP BY
	Make,
	TumblingWindow(second, 10)

CAST ステートメントを使用してデータ型を指定します。 サポートされるデータ型の一覧については、「データ型 (Azure Stream Analytics)」をご覧ください。

詳細については、データ変換関数に関するページを参照してください。

条件の期間を検出する

複数のイベントが対象となる条件の場合は、LAG 関数を使用して、その条件の期間を特定できます。 たとえば、バグのためにすべての自動車の重量が正しくない (20,000 ポンドを超過) 結果になった場合、バグが継続した期間を計算する必要があります。

入力:

Make Time Weight
Make1 2015-01-01T00:00:01Z 2000
Make2 2015-01-01T00:00:02Z 25000
Make1 2015-01-01T00:00:03Z 26000
Make2 2015-01-01T00:00:04Z 25000
Make1 2015-01-01T00:00:05Z 26000
Make2 2015-01-01T00:00:06Z 25000
Make1 2015-01-01T00:00:07Z 26000
Make2 2015-01-01T00:00:08Z 2000

出力:

Start_fault End_fault
2015-01-01T00:00:02Z 2015-01-01T00:00:07Z

Query:

WITH SelectPreviousEvent AS
(
SELECT
	*,
	LAG([time]) OVER (LIMIT DURATION(hour, 24)) as previous_time,
	LAG([weight]) OVER (LIMIT DURATION(hour, 24)) as previous_weight
FROM input TIMESTAMP BY [time]
)

SELECT 
	LAG(time) OVER (LIMIT DURATION(hour, 24) WHEN previous_weight < 20000 ) [Start_fault],
	previous_time [End_fault]
FROM SelectPreviousEvent
WHERE
	[weight] < 20000
	AND previous_weight > 20000

最初の SELECT ステートメントでは、現在の重量測定値を前の測定値と関連付け、現在の測定値と共に射影します。 2 番目の SELECT では、previous_weight が 20000 未満であり、現在の重量が 20000 未満で、現在のイベントの previous_weight が 20000 を超えていた最後のイベントを探します。

End_fault は、前のイベントでは問題が発生し、現在のイベントでは問題が発生していないイベントです。Start_fault は、それより前で問題が発生していない最後のイベントです。

個別の時間でイベントを処理する (サブストリーム)

イベント プロデューサー間またはパーティション間のクロックのずれや、ネットワーク待機時間が原因でイベントが遅れて、あるいは順序がずれて到着することがあります。 たとえば、TollID 2 のデバイス クロックは TollID 1 より 5 秒遅れており、TollID 3 のデバイス クロックは TollID 1 より 10 秒遅れています。 計算は料金所ごとに個別に実行され、各料金所のクロック データだけがタイムスタンプとして考慮されます。

入力:

LicensePlate Make Time TollID
DXE 5291 Make1 2015-07-27T00:00:01Z 1
YHN 6970 Make2 2015-07-27T00:00:05Z 1
QYF 9358 Make1 2015-07-27T00:00:01Z 2
GXF 9462 Make3 2015-07-27T00:00:04Z 2
VFE 1616 Make2 2015-07-27T00:00:10Z 1
RMV 8282 Make1 2015-07-27T00:00:03Z 3
MDR 6128 Make3 2015-07-27T00:00:11Z 2
YZK 5704 Make4 2015-07-27T00:00:07Z 3

出力:

TollID Count
1 2
2 2
1 1
3 1
2 1
3 1

Query:

SELECT
      TollId,
      COUNT(*) AS Count
FROM input
      TIMESTAMP BY Time OVER TollId
GROUP BY TUMBLINGWINDOW(second, 5), TollId

TIMESTAMP OVER BY 句では、サブストリームを使用して各デバイスのタイムラインで個別に検索します。 各 TollID の出力イベントは、計算されるときに生成されます。つまり、すべてのデバイスが同じクロックを参照しているように順序が変更されるのではなく、各 TollID を基準にしてイベントが順序付けられます。

詳細については、TIMESTAMP BY OVER に関するページを参照してください。

セッション ウィンドウ

セッション ウィンドウとは、イベントが発生している間、拡張し続ける時間枠のことです。セッション ウィンドウは、イベントを受信しないまま一定時間が経過したり、時間枠が最大継続時間に達したりすると計算のために終了します。 この時間枠は、ユーザー操作データを計算する場合に特に役立ちます。 時間枠は、ユーザーがシステムを操作し始めると開始し、イベントがそれ以上観測されなくなると (つまり、ユーザーが操作をやめると) 終了します。 たとえば、ユーザーが Web ページを操作していて、クリック回数がログに記録される場合、セッション ウィンドウを使用して、ユーザーがサイトを操作した時間を調べることができます。

入力:

User_id Time URL
0 2017-01-26T00:00:00Z "www.example.com/a.html"
0 2017-01-26T00:00:20Z "www.example.com/b.html"
1 2017-01-26T00:00:55Z "www.example.com/c.html"
0 2017-01-26T00:01:10Z "www.example.com/d.html"
1 2017-01-26T00:01:15Z "www.example.com/e.html"

出力:

User_id StartTime EndTime Duration_in_seconds
0 2017-01-26T00:00:00Z 2017-01-26T00:01:10Z 70
1 2017-01-26T00:00:55Z 2017-01-26T00:01:15Z 20

Query:

SELECT
	user_id,
	MIN(time) as StartTime,
	MAX(time) as EndTime,
	DATEDIFF(second, MIN(time), MAX(time)) AS duration_in_seconds
FROM input TIMESTAMP BY time
GROUP BY
	user_id,
	SessionWindow(minute, 1, 60) OVER (PARTITION BY user_id)

SELECT を使用して、ユーザー操作に関連するデータを操作時間と共に射影します。 ユーザーと SessionWindow でデータをグループ化します。このセッション ウィンドウでは 1 分間操作が発生しないと終了し、最大ウィンドウ サイズは 60 分です。

SessionWindow の詳細については、セッション ウィンドウに関するページを参照してください。

JavaScript と C# でのユーザー定義関数を使用した言語拡張

Azure Stream Analytics のクエリ言語は、JavaScript または C# 言語で記述されたカスタム関数を使用して拡張できます。 ユーザー定義関数 (UDF) とは、SQL 言語を使用して簡単に表現することができないカスタムまたは複雑な計算です。 これらの UDF は一度定義すると、クエリ内で複数回使用できます。 たとえば、UDF を使用して、16 進数の nvarchar(max) 値を bigint 値に変換できます。

入力:

Device_id HexValue
1 "B4"
2 "11B"
3 "121"

出力:

Device_id Decimal
1 180
2 283
3 289
function hex2Int(hexValue){
	return parseInt(hexValue, 16);
}
public static class MyUdfClass {
	public static long Hex2Int(string hexValue){
		return int.Parse(hexValue, System.Globalization.NumberStyles.HexNumber);
	}
}
SELECT
	Device_id,
	udf.Hex2Int(HexValue) AS Decimal
From
	Input

ユーザー定義関数では、使用されるすべてのイベントに対して HexValue の bigint 値を計算します。

詳細については、JavaScriptC# に関するページを参照してください。

MATCH_RECOGNIZE を使用した高度なパターン マッチング

MATCH_RECOGNIZE は、イベントのシーケンスを適切に定義された正規表現パターンに一致させるために使用できる高度なパターン マッチング メカニズムです。 たとえば、ATM の障害をリアルタイムで監視しており、ATM の操作時に管理者への通知が必要な警告メッセージが 2 つ連続して発生したとします。

入力:

ATM_id Operation_id Return_Code Time
1 "Entering Pin" "Success" 2017-01-26T00:10:00Z
2 "Opening Money Slot" "Success" 2017-01-26T00:10:07Z
2 "Closing Money Slot" "Success" 2017-01-26T00:10:11Z
1 "Entering Withdraw Quantity" "Success" 2017-01-26T00:10:08Z
1 "Opening Money Slot" "Warning" 2017-01-26T00:10:14Z
1 "Printing Bank Balance" "Warning" 2017-01-26T00:10:19Z

出力:

ATM_id First_Warning_Operation_id Warning_Time
1 "Opening Money Slot" 2017-01-26T00:10:14Z
SELECT *
FROM input TIMESTAMP BY time OVER ATM_id
MATCH_RECOGNIZE (
	LIMIT DURATION(minute, 1)
	PARTITION BY ATM_id
	MEASURES
		First(Warning.ATM_id) AS ATM_id,
		First(Warning.Operation_Id) AS First_Warning_Operation_id,
		First(Warning.Time) AS Warning_Time
	AFTER MATCH SKIP TO NEXT ROW
	PATTERN (Success+ Warning{2,})
	DEFINE
		Success AS Succes.Return_Code = 'Success',
		Warning AS Warning.Return_Code <> 'Success'
) AS patternMatch

このクエリは、少なくとも 2 つの連続するエラー イベントに一致し、条件が満たされたときにアラームを生成します。 PATTERN では、照合に使用される正規表現を定義します。このケースでは、少なくとも 1 つの成功した操作の後、少なくとも 2 つの連続した警告が続きます。 成功と警告は、Return_Code 値を使用して定義します。条件が満たされると、ATM_id、最初の警告操作、最初の警告時刻を使用して MEASURES が射影されます。

詳細については、MATCH_RECOGNIZE に関するページを参照してください。

ジオフェンシングおよび地理空間クエリ

Azure Stream Analytics には、フリート管理、ライド シェア、コネクテッド カー、および資産追跡などのシナリオを実装するために使用できる、組み込みの地理空間関数が用意されています。 地理空間データは、イベント ストリームまたは参照データの一部として、GeoJSON または WKT 形式で取り込むことができます。 たとえば、パスポート印刷機械の製造を専門とする会社が、政府機関や領事館に機械をリースします。 その機械を置き間違えてパスポートの偽造に使われることがないようにするため、そのような機械の場所は厳重に管理されます。 各機械には GPS トラッカーが搭載され、その情報は Azure Stream Analytics ジョブにリレーされます。 製造元では、これらの機械の場所を追跡し、そのうちの 1 つが承認済みのエリアを離れたときにアラートを受け取るようにします。こうすれば、製造元は装置をリモートで使用不可能にし、関係機関に注意喚起し、装置を取り返すことができます。

入力:

Equipment_id Equipment_current_location Time
1 "POINT(-122.13288797982818 47.64082002051315)" 2017-01-26T00:10:00Z
1 "POINT(-122.13307252987875 47.64081350934929)" 2017-01-26T00:11:00Z
1 "POINT(-122.13308862313283 47.6406508603241)" 2017-01-26T00:12:00Z
1 "POINT(-122.13341048821462 47.64043760861279)" 2017-01-26T00:13:00Z

参照データ入力:

Equipment_id Equipment_lease_location
1 "POLYGON((-122.13326028450979 47.6409833866794,-122.13261655434621 47.6409833866794,-122.13261655434621 47.64061471602751,-122.13326028450979 47.64061471602751,-122.13326028450979 47.6409833866794))"

出力:

Equipment_id Equipment_alert_location Time
1 "POINT(-122.13341048821462 47.64043760861279)" 2017-01-26T00:13:00Z
SELECT
	input.Equipment_id AS Equipment_id,
	input.Equipment_current_location AS Equipment_current_location,
	input.Time AS Time
FROM input TIMESTAMP BY time
JOIN
	referenceInput 
	ON input.Equipment_id = referenceInput.Equipment_id
	WHERE 
		ST_WITHIN(input.Equipment_currenct_location, referenceInput.Equipment_lease_location) = 1

このクエリを使用すると、製造元では機械の場所を自動的に監視し、機械が許可されたジオフェンスから離れたときにアラートを受け取ることができます。 組み込みの地理空間関数を活用すれば、サードパーティのライブラリを使用せずに、クエリ内で GPS データを使用できます。

詳細については、「Azure Stream Analytics を使用したジオフェンシングおよび地理空間集計のシナリオ」を参照してください。

ヘルプの参照

詳細については、Azure Stream Analytics に関する Microsoft Q&A 質問ページを参照してください。

次のステップ