この記事では、構造化ストリーミングで foreachBatch を使用して、既存のストリーミング シンクがないデータ ソースにストリーミング クエリの出力を書き込む方法について説明します。
コード パターン streamingDF.writeStream.foreachBatch(...) を使用すると、ストリーミング クエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。
foreachBatch で使用される関数は、次の 2 つのパラメーターを受け取ります。
- マイクロバッチの出力データを含む DataFrame。
- マイクロバッチの一意の ID。
構造化ストリーミングの Delta Lake マージ操作には foreachBatch を使用する必要があります。
foreachBatchを使用したストリーミング クエリからのアップサートを参照してください。
追加の DataFrame 操作を適用する
ストリーミング DataFrame でサポートされている DataFrame および Dataset 操作は多くはありません。Spark では、このようなケースでの増分プランの生成がサポートされていないためです。
foreachBatch() を使用すると、マイクロバッチ出力ごとにこれらの操作のいくつかを適用できます。 たとえば、foreachBatch() と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードで Delta テーブルに書き込むことができます。 詳細については、MERGE INTOを参照してください。
重要
-
foreachBatch()で保証されているのは、少なくとも 1 回の書き込みのみです。 ただし、関数に提供されるbatchIdを使用することで、出力を重複除去し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンド ツー エンドのセマンティクスを自分で考える必要があります。 -
foreachBatch()は、基本的にストリーミング クエリのマイクロバッチ実行に依存しているため、連続処理モードでは機能しません。 連続モードでデータを書き込む場合は、代わりにforeach()を使用します。 - ステートフル演算子で
foreachBatchを使用する場合は、処理が完了する前に各バッチを完全に使用することが重要です。 各バッチ DataFrame を完全に処理するを参照してください
空のデータフレームは、foreachBatch() で呼び出すことができます。ユーザー コードには、適切な操作を実行できるように回復性が必要です。 次に 1 つの例を示します。
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 での foreachBatch の動作の変更
標準アクセス モードで構成されたコンピューティング上の Databricks Runtime 14.0 以降では、次の動作の変更が適用されます。
-
print()コマンドの出力はドライバー ログに記録されます。 - 関数内で
dbutils.widgetsサブモジュールにはアクセスできません。 - 関数内で参照されるファイル、モジュール、オブジェクトはすべてシリアル化可能であり、Spark 上で使用可能である必要があります。
既存のバッチ データ ソースを再利用する
foreachBatch() を使用すると、構造化ストリーミングをサポートしていない可能性があるデータ シンクに対して、既存のバッチ データ ライターを使用できます。 次に例をいくつか示します。
foreachBatch() からは、他にも多くのバッチ データ ソースを使用できます。
データ ソースと外部サービスへの接続を参照してください。
複数の場所に書き込む
ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks は、最適な並列処理とスループットを実現するために、複数の構造化ストリーミング ライターを使用することを推奨しています。
foreachBatch を使用して複数のシンクに書き込みを行うと、ストリーミング書き込みの実行がシリアル化され、マイクロバッチごとの待機時間が増加する可能性があります。
foreachBatch を使用して複数の Delta テーブルに書き込む場合は、foreachBatch でのべき等性のあるテーブル書き込みを参照してください。
各バッチの DataFrame を完全に使い切る
ステートフル演算子を使用する場合 (たとえば、 dropDuplicatesWithinWatermarkを使用する場合)、各バッチイテレーションで DataFrame 全体を使用するか、クエリを再起動する必要があります。 DataFrame 全体を使用しない場合、ストリーミング クエリは次のバッチで失敗します。
これは、いくつかのケースで発生する可能性があります。 次の例は、DataFrame を正しく使用しないクエリを修正する方法を示しています。
バッチのサブセットを意図的に使用する
バッチのサブセットのみを考慮する場合は、次のようなコードを使用できます。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
この場合、 batch_df.show(2) はバッチ内の最初の 2 つの項目のみを処理しますが、必要な項目が多い場合は、それらの項目を使用する必要があります。 次のコードは、完全な DataFrame を使用します。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
ここでは、 do_nothing 関数は DataFrame の残りの部分を自動的に無視します。
バッチでのエラーの処理
foreachBatch プロセスの実行中にエラーが発生する可能性があります。 次のようなコードを作成できます (この場合、このサンプルでは、問題を示すために意図的にエラーが発生します)。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
エラーを処理 (およびサイレント モードで飲み込む) と、バッチの残りの部分が消費されない可能性があります。 この状況を処理するには、2 つのオプションがあります。
まず、エラーを再発生させ、オーケストレーションレイヤーに渡してバッチを再試行することができます。 これは、一時的な問題である場合にエラーを解決したり、運用チームが手動で修正しようとするエラーを発生させたりする可能性があります。 これを行うには、 partial_func コードを次のように変更します。
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
2 番目のオプションは、例外をキャッチし、バッチの残りの部分を無視する場合は、コードをこれに変更することです。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
このコードでは、 do_nothing 関数を使用して、バッチの残りの部分を自動的に無視します。