このページでは、透かしの概念について説明し、一般的なステートフル ストリーミング操作で透かしを使用するための推奨事項を示します。
ストリーミング クエリは、時間の経過と同時に状態データを蓄積します。 ウォーターマークは、メモリ エラーや処理待ち時間の増加を防ぐために、古い状態データを自動的に削除します。
透かしとは?
処理中、構造化ストリーミングはマイクロバッチ間で状態を保持します。 ストリーミング クエリでは、マイクロバッチごとにすべてを再計算するのではなく、状態を使用して結果を増分更新します。 ウォーターマークは、クエリが状態エンティティの処理を停止したときのしきい値を制御します。
状態エンティティの一般的な例を次に示します:
- 時間ウィンドウでの集計。
- ストリーム同士の結合における一意のキー。
ストリーミング DataFrame でウォーターマークを宣言するには、タイムスタンプ フィールドと遅延しきい値を指定します。 新しいデータが到着すると、状態マネージャーは指定されたフィールドの最新のタイムスタンプを追跡し、遅延しきい値内のレコードのみを処理します。
クエリは常に、しきい値内に到着したレコードを処理します。 クエリでは、しきい値を超えて到着したレコードが処理される可能性がありますが、これは保証されません。
次の例では、ウィンドウの数に 10 分の透かししきい値を適用します。
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
この例では:
-
event_time列は、10分間のウォーターマークと5分間のタンブリングウィンドウを定義するために使用されます。 - 重複しない5分間の各ウィンドウについて、各
idのカウントが収集されます。 - 各カウントの状態情報は、ウィンドウの終了時刻が最新に観測された
event_timeより 10 分古くなるまで保持されます。
重要
groupBy()操作とwindow()操作では、名前、"<colName>"、またはcol("<colName>")で列を参照して、イベント時間マーカーが保持されるようにします。 Scala では、 $colNameを使用することもできます。
透かしは処理時間とスループットにどのように影響しますか?
出力モードは、ウォーターマークを持つクエリがシンクにデータを書き込むタイミングを制御します。 ウォーターマークは、メモリ内の状態情報の総量を減らすので、ステートフル ストリーミングのスループット制御に不可欠です。 ステートフル操作すべてにおいて、すべての出力モードがサポートされているわけではありません。 ウィンドウ集計のウォーターマークおよび出力モードを参照してください。
ウォーターマークの表示期間を選択する際には、一長一短があります。
- ウォーターマークが短いほど、クエリの待機時間が短くなります。クエリでは、各ウォーターマーク期間が完了した後に状態情報が格納され、結果が書き込まれるためです。 ただし、短いウォーターマークは、遅れて到着するデータに対する許容度が低いです。
- 長めのウォーターマークは、遅延データを高い程度まで許容します。 ただし、長い透かしはクエリの待機時間を長くします。クエリは、より多くの状態情報を格納し、長い透かし期間が経過した後に結果を書き込むのを待つ必要があるためです。
ウィンドウ集計のウォーターマークと出力モード
次の表は、タイムスタンプと透かしの集計を含むクエリの処理動作を示しています。
| 出力モード | 動作 |
|---|---|
| 追加 | クエリは、基準値のしきい値を超えた後、ターゲット テーブルに行を書き込みます。 すべての書き込みは、遅延しきい値に基づいて遅延されます。 しきい値に達すると、古い集計状態が削除されます。 |
| アップデート | クエリは、結果が計算されるとターゲット テーブルに行を書き込み、新しいデータが到着すると、クエリで行を更新および上書きできます。 しきい値に達すると、古い集計状態が削除されます。 |
| 完了 | 集計状態は削除されません。 クエリは、トリガーごとにターゲット テーブルを書き換えます。 |
ストリーム間結合のウォーターマークと出力モード
複数のストリーム間の結合では、追加モードのみがサポートされます。 クエリは、バッチごとに一致したレコードを書き込みます。
内部結合の場合、Databricks では、クエリが古いレコードの状態情報を破棄できるように、各ストリーミング データ ソースに基準値のしきい値を設定することをお勧めします。 ウォーターマークがない場合、Structured Streaming は、各トリガーで結合の両側からすべてのキーを結合しようとします。これはパフォーマンスに影響する可能性があります。
外部結合では、ウォーターマークが必須となります。 レコードが一致しない場合、クエリはそのキーの null 値を書き込みます。 結合では追加モードのみがサポートされるため、不一致のレコードは遅延しきい値に達するまで書き込まれません。
複数ウォーターマーク ポリシーで遅延データのしきい値を制御する
複数の構造化ストリーミング入力の場合は、複数の基準値を設定して、遅延データの許容しきい値を制御できます。 透かしを使用すると、状態情報と待機時間を制御できます。
ストリーミング クエリには、結合または結合された複数の入力ストリームを含めできます。 ステートフル操作の場合、各入力ストリームでは、遅延データ許容度に対して異なるしきい値が必要になる場合があります。 各入力ストリームで withWatermark("eventTime", delay) を使用して、これらのしきい値を指定します。
ストリーム同士の結合を使用したクエリの例を次に示します。
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
ステートフル操作でクエリを実行している間、Structured Streaming は各入力ストリームの最大イベント時間を個別に追跡し、対応する遅延に基づいて透かしを計算し、1 つのグローバル基準値を決定します。 既定では、構造化ストリーミングでは、グローバル基準値として最小値が使用されます。 あるストリームが他のストリームより遅れる場合でも、最小のグローバル ウォーターマークにより、クエリが誤ってデータを遅延データとしてマークするのを防ぐことができます。 たとえば、アップストリームの障害が原因で、ストリームの 1 つがデータの受信を停止した場合に発生する可能性があります。 グローバル ウォーターマークは、最も遅いストリームのペースで確実に進み、必要に応じてクエリの出力を遅らせます。
待機時間を短縮するには、 spark.sql.streaming.multipleWatermarkPolicy を max (既定値は min) に設定して、最速のストリームの透かしをグローバルウォーターマークとして使用します。 ただし、この構成を使用すると、最も低速なストリームからデータが削除されます。 Databricks では、この構成を慎重に適用することをお勧めします。
個別の操作に透かしを適用する
distinct 操作は、ステート内の一意の各レコードを追跡します。 ウォーターマークがないと、状態が無期限に増加し、メモリの問題が発生する可能性があります。 タイムスタンプ フィールドに基準値を指定して状態をバインドし、しきい値に達した後で古いレコードを削除します。
次の例では、 distinct 操作に透かしを適用します。
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
この例では、ストリーミング クエリは、観察された最新の eventTimeから 1 時間以内に到着した重複レコードを削除します。 クエリは、しきい値に達した後に重複除去の状態情報を削除します。
重要
すべての列ではなく特定の列を重複除去するには、dropDuplicates()ではなくdropDuplicatesWithinWatermark()またはdistinctを使用します。 「ウォーターマーク内の重複をドロップする」を参照してください。
ウォーターマーク内の重複をドロップする
Databricks Runtime 13.3 LTS 以降では、一意の識別子を使用して、基準値のしきい値内のレコードを重複除去できます。
構造化ストリーミングでは、1 回だけ処理が保証されますが、データ ソースからレコードが重複除去されることはありません。 イベント時刻や到着時刻など、重複するレコード間でフィールドが異なる場合でも、 dropDuplicatesWithinWatermark を使用して任意のフィールドの重複を削除します。
dropDuplicatesWithinWatermarkでは、クエリは常に基準値のしきい値内に到着したレコードを重複除去します。 クエリによっては、しきい値の範囲外で到着したレコードも重複排除する場合がありますが、これは保証されません。 クエリがすべての重複を削除することを保証するには、基準値のしきい値を重複イベント間のタイムスタンプの最大差より大きく設定します。
dropDuplicatesWithinWatermarkメソッドを使用するには、透かしを指定する必要があります。
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
ユースケースの例
次の例は、高度なウィンドウ作成のユース ケースを示しています。
タンブリングウィンドウを使用して1時間ごとの売上合計を計算する
タンブリング ウィンドウは、重複しない間隔で固定サイズです。 各入力行は、1 つのウィンドウに属します。 タンブリング ウィンドウを使用して、時間単位の売上合計などの個別の期間の集計を計算します。
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
この例では:
-
window("timestamp", "1 hour")は、5 ~ 6 AM、6 ~ 7 AM など、重複しない 1 時間間隔に注文をグループ化します。 -
withWatermark("timestamp", "1 hour")は、ウィンドウの終了タイムスタンプが最大注文タイムスタンプより 1 時間前になるまで、各ウィンドウの集計を状態に保ちます。
スライディング ウィンドウを使用してローリング 集計を計算する
スライディング ウィンドウは、間隔が重なる可能性がある固定サイズです。 1 つの行を複数のウィンドウに属させることができます。 スライディング ウィンドウを使用して、ローリング 集計 (ローリング 6 時間の売上など) を計算します。
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
この例では:
-
window("timestamp", "6 hours", slideDuration="1 hour")は、注文を 1 時間進む 6 時間間隔 (たとえば、午前 5 時から午前 11 時、午前 6 時から午後 12 時) にグループ化します。 -
withWatermark("timestamp", "1 hour")は、ウィンドウの終了タイムスタンプが最大注文タイムスタンプより 1 時間前になるまで、各ウィンドウの集計を状態に保ちます。 -
slideDurationは、windowDuration以下である必要があります。
セッション ウィンドウを使用してユーザー アクティビティを確認する
セッション ウィンドウのサイズは固定されません。 行が到着するとウィンドウが開き、新しい行を含まないギャップ期間が経過すると閉じます。 セッション ウィンドウを使用して、30 分以内のユーザーのページ ビューなど、長時間のアイドル期間中のアクティビティ バーストを集計します。
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
この例では:
-
session_window("timestamp", gapDuration="30 minutes")は、最初のページ ビューが到着したときにウィンドウを開きます。 30 分以内に到着する後続の各ページ ビューは、ウィンドウを拡張します。 30 分以内にページ ビューが到着しない場合は、ウィンドウが閉じられ、次のページ ビューによって新しいウィンドウが開始されます。 -
withWatermark("timestamp", "1 hour")は、ウィンドウの終了タイムスタンプがページビューの最大タイムスタンプより1時間古くなるまで、各セッションの集計結果を状態として保持します。 -
timeColumnとwindow()のsession_window()引数は、TimestampTypeまたはTimestampNTZTypeである必要があります。 - イベント時間ではなく処理時間に基づいてウィンドウを定義するには、
current_timestamp()を使用します。 - ウィンドウの期間は、マイクロ秒単位から数日まで設定できます。 月の期間以上はサポートされていません。
- ウィンドウ集計
complete出力モードを使用して、すべてのウィンドウの状態を無期限に維持します。 適切なウォーターマークを使用したappend出力モードを使用して、状態の増大を抑制し、大規模なデータセットでのメモリの問題を回避します。 出力モードの動作の詳細については、ウィンドウ集計のウォーターマークと出力モードを参照してください。