このページでは、構造化ストリーミングで foreachBatch を使用して、ストリーミング クエリの出力を、既存のストリーミング シンクがないデータ ソースに書き込む方法を示します。
コード パターン streamingDF.writeStream.foreachBatch(...) を使用すると、ストリーミング クエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。
foreachBatch で使用される関数は、次の 2 つのパラメーターを受け取ります。
- マイクロバッチの出力データを含む DataFrame。
- マイクロバッチの一意の ID。
構造化ストリーミングの Delta Lake マージ操作には foreachBatch を使用する必要があります。
foreachBatchを使用したストリーミング クエリからのアップサートを参照してください。
追加の DataFrame 操作を適用する
ストリーミング DataFrame でサポートされている DataFrame および Dataset 操作は多くはありません。Spark では、このようなケースでの増分プランの生成がサポートされていないためです。
foreachBatch() を使用すると、マイクロバッチ出力ごとにこれらの操作のいくつかを適用できます。 たとえば、foreachBatch() と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードで Delta テーブルに書き込むことができます。 詳細については、MERGE INTOを参照してください。
重要
-
foreachBatch()で保証されているのは、少なくとも 1 回の書き込みのみです。 ただし、関数に提供されるbatchIdを使用することで、出力を重複除去し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンド ツー エンドのセマンティクスを自分で考える必要があります。 -
foreachBatch()は、基本的にストリーミング クエリのマイクロバッチ実行に依存しているため、連続処理モードでは機能しません。 連続モードでデータを書き込む場合は、代わりにforeach()を使用します。 - ステートフル演算子で
foreachBatchを使用する場合は、処理が完了する前に各バッチを完全に使用することが重要です。 各バッチ DataFrame を完全に処理するを参照してください
空の DataFrame を処理する
foreachBatch() は空の DataFrame を受け取る可能性があるため、コードはこのシナリオを処理する必要があります。 そうしないと、クエリが失敗する可能性があります。
たとえば、Delta Lake がストリーミング ソースである場合、これらのシナリオでは空の DataFrame を foreachBatch()に渡すことができます。
-
OPTIMIZE処理するファイルがない場合: Delta Lake ソース テーブルでOPTIMIZE操作が実行されているが、処理するファイルがない場合、Structured Streaming はオフセット ログ エントリを書き込み、テーブルのバージョンをインクリメントします。 これにより、ファイルが読み取らなくても、シンクに空のマイクロバッチが生成されます。 - 物理プラン レベルでのファイルの剪定: 述語プッシュダウンまたはファイルの剪定によって物理プラン レベルですべてのレコードが削除された場合、結果はデータシンクへの空のコミットが行われます。
ユーザー コードは、適切な操作を可能にするために、空の DataFrame を処理する必要があります。 以下の例を参照してください。
Python
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
Scala
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 での foreachBatch の動作の変更
標準アクセス モードで構成されたコンピューティング上の Databricks Runtime 14.0 以降では、次の動作の変更が適用されます。
-
print()コマンドの出力はドライバー ログに記録されます。 - 関数内で
dbutils.widgetsサブモジュールにはアクセスできません。 - 関数内で参照されるファイル、モジュール、オブジェクトはすべてシリアル化可能であり、Spark 上で使用可能である必要があります。
既存のバッチ データ ソースを再利用する
foreachBatch() を使用すると、構造化ストリーミングをサポートしていない可能性があるデータ シンクに対して、既存のバッチ データ ライターを使用できます。 次に例をいくつか示します。
foreachBatch() からは、他にも多くのバッチ データ ソースを使用できます。
データ ソースと外部サービスへの接続を参照してください。
複数の場所に書き込む
ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks は、最適な並列処理とスループットを実現するために、複数の構造化ストリーミング ライターを使用することを推奨しています。
foreachBatch を使用して複数のシンクに書き込みを行うと、ストリーミング書き込みの実行がシリアル化され、マイクロバッチごとの待機時間が増加する可能性があります。
foreachBatchを使用して複数のデルタ テーブルに書き込む場合は、「べき等テーブルの書き込みにforeachBatchを使用する」を参照してください。
各バッチの DataFrame を完全に使い切る
ステートフル演算子を使用する場合 (たとえば、 dropDuplicatesWithinWatermarkを使用する場合)、各バッチイテレーションで DataFrame 全体を使用するか、クエリを再起動する必要があります。 DataFrame 全体を使用しない場合、ストリーミング クエリは次のバッチで失敗します。
これは、いくつかのケースで発生する可能性があります。 次の例は、DataFrame を正しく使用しないクエリを修正する方法を示しています。
バッチのサブセットを意図的に使用する
バッチのサブセットのみを考慮する場合は、次のようなコードを使用できます。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
この場合、 batch_df.show(2) はバッチ内の最初の 2 つの項目のみを処理しますが、必要な項目が多い場合は、それらの項目を使用する必要があります。 次のコードは、完全な DataFrame を使用します。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
ここでは、 do_nothing 関数は DataFrame の残りの部分を自動的に無視します。
バッチでのエラーの処理
foreachBatchでのエラー処理の場合、Databricks では、ストリーミング クエリを高速に失敗させ、代わりに Lakeflow ジョブや Apache エアフローなどのオーケストレーション レイヤーに依存して再試行ロジックを管理することをお勧めします。 これは、データ損失が発生する可能性がある複雑な再試行ループをコード内に構築するよりもはるかに安全です。
書き込みターゲットに基づくガイドラインを次に示します。
| ターゲット | 例示 | ガイダンス |
|---|---|---|
| DataFrame 操作 | Delta Lake テーブル | 再試行時にデータの正確性を保護し、べき等性を保証するためには、txnAppIdとtxnVersionの書き込みオプションを使用し、txnVersionをbatchIdにバインドする必要があります。 例外をローカルでキャッチして再試行しないでください。 代わりに、Databricks では、Spark メトリックが正確に維持され、データが重複せず、オーケストレーターが完全なバッチをクリーンに再試行できるように、エラーの伝達を許可することをお勧めします。 |
| カスタム コードと外部送信先 |
.collect()、OLTP データベース、メッセージ キュー、API |
独自のアイデムポテンスを実装してください。 すべての操作がバッチ間で再試行できることを前提とする必要があります。
batchIdが変わらない場合、操作の結果は変わらない必要があります。 短い接続タイムアウトなどの純粋に一時的なエラーを再試行することもできますが、最終的に再試行が失敗した場合は、部分的または重複する書き込みを避けるために細心の注意を払います。 最も安全な方法は、エラーを伝達させ、オーケストレーターがバッチ全体を再試行できるようにすることです。 |
例外の種類の例と、 foreachBatchでそれらを処理する方法に関する推奨事項を次に示します。
| 例外の種類 | 例示 | 推奨されるアクション |
|---|---|---|
| 一時的なシンク エラー |
SQLTransientConnectionException、HTTP 429、タイムアウト |
キャッチ: 再試行するか、デッドレターキューに送信する |
| シンクがべき等である場合、重複またはキー制約の違反が発生することがあります | SQLIntegrityConstraintViolationException |
キャッチ: ログと抑制 |
| カスタム再試行可能エラー | ラップされたソケット例外、再試行可能なデータベースエラー | キャッチ: メトリックをインクリメントし、制御された継続を許可する |
| ロジックまたはスキーマのエラー |
NullPointerException、 AttributeError、スキーマの不一致 |
伝達: Spark にクエリの失敗を許可する |
| 再試行できないシンク エラーまたはキャッチされないロジックのバグ |
ValueError、PermissionError |
伝達: Spark にクエリの失敗を許可する |
| 重大なエラー |
OutOfMemoryError、破損状態、データ整合性違反 |
伝達: Spark にクエリの失敗を許可する |
コード例: 例外処理
次の例では、 foreach でエラーを意図的に発生させ、エラーを処理するためのさまざまな方法を示しています。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
上記のコードはエラーを処理し、警告なしに抑制し、残りのバッチを使用しない可能性があります。 この状況を処理するには、2 つのオプションがあります。
まず、エラーを再発生させることができます。このエラーは、バッチを再試行するためにオーケストレーション レイヤーに渡されます。 これにより、一時的な問題であればエラーを解決できますが、問題が解決しない場合は運用チームに報告して手動で修正を試みてもらうことができます。 これを行うには、 partial_func コードを次のように変更します。
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
次に、例外をキャッチしてバッチの残りの部分を無視する場合は、 do_nothing 関数を使用するようにコードを変更して、バッチの残りの部分を自動的に無視できます。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
失敗レコードをデッドレターキューに書き込む
既定では、不適切なレコードが到着するとすぐにクエリが失敗します。 セカンダリの Delta Lake テーブルをデッドレター キュー (DLQ) として構成することで、このような中断を回避できます。
DLQ を使用すると、システムは失敗したレコードをセカンダリ テーブルにルーティングし、有効なデータの処理を中断せずに続行します。 DLQ テーブルを使用すると、不適切なレコードを後で検査して再処理できます。
次の場合は、このメソッドを使用します。
- ストリームには、スキーマの制約やビジネス ルールに違反する可能性があるさまざまなデータが含まれています。
- 監査またはコンプライアンス規則では、すべてのレコードを保持する必要があります。
例
次の例では、 foreachBatch を使用して、各マイクロバッチを有効なレコードと無効なレコードに分割します。 このクエリでは、バッチ DataFrame の cache() を使用してソースが 2 回スキャンされないようにし、各サブセットを独自の Delta Lake テーブルに書き込みます。
Python
from pyspark.sql.functions import current_timestamp, lit
main_table = "catalog.schema.orders"
dlq_table = "catalog.schema.orders_dlq"
app_id = "orders-streaming-job"
def process_orders(batch_df, batch_id):
if batch_df.isEmpty():
return
batch_df.cache()
valid_condition = "order_amount > 0 AND customer_id IS NOT NULL"
# Write valid records to the main table with idempotency options
batch_df.filter(valid_condition).write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable(main_table)
# Route invalid records to the dead-letter queue
invalid_df = batch_df.filter(f"NOT ({valid_condition})")
if not invalid_df.isEmpty():
invalid_df \
.withColumn("dlq_batch_id", lit(batch_id)) \
.withColumn("dlq_ingest_time", current_timestamp()) \
.write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable(dlq_table)
batch_df.unpersist()
spark.readStream \
.format("delta") \
.table("catalog.schema.raw_orders") \
.writeStream \
.foreachBatch(process_orders) \
.option("checkpointLocation", "/path/to/checkpoint") \
.start()
Scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{current_timestamp, lit}
val mainTable = "catalog.schema.orders"
val dlqTable = "catalog.schema.orders_dlq"
val appId = "orders-streaming-job"
def processOrders(batchDf: DataFrame, batchId: Long): Unit = {
if (batchDf.isEmpty) return
batchDf.cache()
val validCondition = "order_amount > 0 AND customer_id IS NOT NULL"
// Write valid records to the main table with idempotency options
batchDf.filter(validCondition).write
.format("delta")
.mode("append")
.option("txnVersion", batchId)
.option("txnAppId", appId)
.saveAsTable(mainTable)
// Route invalid records to the dead-letter queue
val invalidDf = batchDf.filter(s"NOT ($validCondition)")
if (!invalidDf.isEmpty) {
invalidDf
.withColumn("dlq_batch_id", lit(batchId))
.withColumn("dlq_ingest_time", current_timestamp())
.write
.format("delta")
.mode("append")
.option("txnVersion", batchId)
.option("txnAppId", appId)
.saveAsTable(dlqTable)
}
batchDf.unpersist()
}
spark.readStream
.format("delta")
.table("catalog.schema.raw_orders")
.writeStream
.foreachBatch(processOrders _)
.option("checkpointLocation", "/path/to/checkpoint")
.start()
どちらの書き込みでも、べき等性を保証するために txnVersion と txnAppId が使用されます。 Spark が同じ batchIdでバッチを再生すると、Delta Lake は重複する書き込みをスキップします。
「foreachBatch」を使用してべき等テーブルを書き込む方法について参照してください。