次の方法で共有


透かしを適用してデータ処理のしきい値を制御する

この記事では、透かしの基本的な概念について説明し、一般的なステートフル ストリーミング操作で透かしを使用するためのレコメンデーションを示します。 ステートフル ストリーミング操作に透かしを適用して、状態に保持されているデータの量が無限に拡大されないようにする必要があります。これにより、メモリの問題が発生し、長時間のストリーミング操作中に処理の待機時間が長くなる可能性があります。

透かしとは

構造化ストリーミングでは、透かしを使用して、特定の状態エンティティの更新の処理を継続する期間のしきい値を制御します。 状態エンティティの一般的な例を次に示します:

  • timeウィンドウでの集計。
  • 2 つのストリーム間の結合内の一意のキー。

透かしを宣言するときは、ストリーミング DataFrame にタイムスタンプ フィールドと透かしのしきい値を指定します。 新しいデータが到着すると、状態マネージャーは、指定されたフィールドの最新のタイムスタンプを追跡し、遅延しきい値内のすべてのレコードを処理します。

次の例では、ウィンドウの数に 10 分の透かししきい値を適用します。

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

次の点に注意してください。

  • event_time 列は、10 分間の透かしと 5 分間のタンブリング ウィンドウを定義するために使用されます。
  • 重複していない 5 分間のウィンドウごとに、観察されたそれぞれの idの カウントが収集されます。
  • 状態情報は、ウィンドウの終わりが最新の観測 event_time値より 10 分前になるまで、カウントごとに保持されます。

重要

透かしのしきい値は、指定されたしきい値内にレコードが到着した場合に、定義されたクエリのセマンティクスに従って処理されることを保証します。 指定されたしきい値外に到着した遅延到着レコードは、クエリ メトリックを使用して処理される可能性がありますが、これは保証されません。

透かしは処理時間とスループットにどのような影響を与えますか?

透かしは出力モードと対話して、データがシンクに書き込まれるタイミングを制御します。 透かしは処理する状態情報の合計量を減らすので、効率的なステートフル ストリーミング スループットを実現するには、透かしの効果的な使用が不可欠です。

注意

すべてのステートフル操作で、すべての出力モードがサポートされているわけではありません。

ウィンドウ集計の透かしと出力モード

次の表では、透かしが定義されたタイムスタンプに対する集計を含むクエリの処理について詳しく説明します:

出力モード 動作
Append 透かしのしきい値を超えると、ターゲット テーブルに行が書き込まれます。 すべての書き込みは、遅延しきい値に基づいて遅延されます。 古い集計状態は、しきい値に達するとドロップされます。
アップデート 結果が計算されると、行がターゲット テーブルに書き込まれ、新しいデータが到着すると更新および上書きできます。 古い集計状態は、しきい値に達するとドロップされます。
完了 集計状態はドロップされません。 ターゲット テーブルは、各トリガーで書き換えられます。

ストリーム ストリーム結合の透かしと出力

複数のストリーム間の結合は追加モードのみをサポートし、一致したレコードは検出される各バッチに書き込まれます。 内部結合の場合、Databricks では、各ストリーミング データ ソースに透かししきい値を設定することをお勧めします。 これにより、古いレコードの状態情報を破棄できます。 透かしがない場合、構造化ストリーミングでは、結合の両側のすべてのキーを各トリガーと結合しようとします。

構造化ストリーミングには、外部結合をサポートするための特別なセマンティクスがあります。 外部結合には透かしが必須です。これは、一致しない場合に null 値でキーを書き込む必要がある場合を示します。 外部結合は、データ処理中に一致しないレコードを記録する場合に役立ちますが、結合は追加操作としてテーブルに書き込むだけなので、遅延しきい値を超えるまで、この欠落データは記録されません。

構造化ストリーミングで複数の透かしポリシーを使用して遅延データしきい値を制御する

複数の構造化ストリーミング入力を操作する場合は、複数のウォーターマークを設定して、到着遅延データの許容しきい値を制御できます。 ウォーターマークを構成すると、状態情報を制御し、待機時間に影響を与えることができます。

ストリーミング クエリには、結合または結合された複数の入力ストリームを含めできます。 各入力ストリームには、ステートフル操作に対して許容する必要がある、遅いデータのしきい値が異なる場合があります。 これらのしきい値は、各入力ストリームで withWatermarks("eventTime", delay) を使用して指定します。 ストリーム同士の結合を使用したクエリの例を次に示します。

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

クエリの実行中、構造化ストリームでは、各入力ストリームに表示される最大イベント時間を個別に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作に使用する単一のグローバル ウォーターマークを選択します。 既定では、最小値がグローバル ウォーターマークとして選択されます。これは、ストリームの 1 つが他のストリームより遅れている場合 (たとえば、アップストリーム障害が原因でストリームの 1 つがデータの受信を停止する)、誤ってドロップされるデータがないことを保証するためです。 言い換えると、グローバル ウォーターマークは最も遅いストリームのペースで安全に移動し、クエリの出力はそれに従って遅くなります。

結果をより迅速に取得するには、複数のウォーターマーク ポリシーを設定し、SQL 構成 spark.sql.streaming.multipleWatermarkPolicymax (既定値は min) に設定することで、最大値をグローバル ウォーターマークとして選択します。 これにより、グローバル ウォーターマークは最速のストリームのペースで移動できます。 ただし、この構成を使用すると、最も低速なストリームからデータが削除されます。 このため、Databricks では、この構成は注意して使用することをお勧めします。

ウォーターマーク内の重複をドロップする

Databricks Runtime 13.3 LTS 以降では、一意の識別子を使用して、ウォーターマークのしきい値内のレコードを重複除去できます。

構造化ストリーミングでは、1 回だけ処理が保証されますが、データ ソースからレコードが自動的に重複除去されることはありません。 dropDuplicatesWithinWatermark を使用すると、指定したフィールドのレコードを重複除去できます。これにより、一部のフィールドが異なる場合 (イベント時刻や到着時刻など) でも、ストリームから重複を削除できます。

指定したウォーターマーク内に到着した重複レコードはドロップされます。 この保証は 1 方向でのみ厳格であり、指定されたしきい値の範囲外に到着した重複レコードもドロップされる可能性があります。 すべての重複を削除するには、重複したイベント間の最大タイムスタンプ差よりも長い透かしの遅延しきい値を設定する必要があります。

次の例のように、dropDuplicatesWithinWatermark メソッドを使用するにはウォーターマークを指定する必要があります。

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])