foreachBatch を使用して任意のデータ シンクに書き込む
この記事では、構造化ストリーミングで foreachBatch
を使用して、既存のストリーミング シンクがないデータ ソースにストリーミング クエリの出力を書き込む方法について説明します。
コード パターン streamingDF.writeStream.foreachBatch(...)
を使用すると、ストリーミング クエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。 foreachBatch
で使用される関数は、次の 2 つのパラメーターを受け取ります。
- マイクロバッチの出力データを含む DataFrame。
- マイクロバッチの一意の ID。
構造化ストリーミングの Delta Lake マージ操作には foreachBatch
を使用する必要があります。 「foreachBatch を使用したストリーミング クエリからの upsert」 をご覧ください。
追加の DataFrame 操作を適用する
ストリーミング DataFrame でサポートされている DataFrame および Dataset 操作は多くはありません。Spark では、このようなケースでの増分プランの生成がサポートされていないためです。 foreachBatch()
を使用すると、マイクロバッチ出力ごとにこれらの操作のいくつかを適用できます。 たとえば、foreachBath()
と SQL MERGE INTO
操作を使用して、ストリーミング集計の出力を更新モードで Delta テーブルに書き込むことができます。 詳細については、「MERGE INTO」を参照してください。
重要
foreachBatch()
で保証されているのは、少なくとも 1 回の書き込みのみです。 ただし、関数に提供されるbatchId
を使用することで、出力を重複除去し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンド ツー エンドのセマンティクスを自分で考える必要があります。foreachBatch()
は、基本的にストリーミング クエリのマイクロバッチ実行に依存しているため、連続処理モードでは機能しません。 連続モードでデータを書き込む場合は、代わりにforeach()
を使用します。
空のデータフレームは、foreachBatch()
で呼び出すことができます。ユーザー コードには、適切な操作を実行できるように回復性が必要です。 次に 1 つの例を示します。
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 での foreachBatch
の動作の変更
共有アクセス モードで構成されたコンピューティング上の Databricks Runtime 14.0 以降では、次の動作の変更が適用されます。
print()
コマンドは、ドライバー ログに出力を書き込みます。- 関数内の
dbutils.widgets
サブモジュールにアクセスすることはできません。 - 関数で参照されるすべてのファイル、モジュール、またはオブジェクトは、シリアル化可能であり、Spark で使用できる必要があります。
既存のバッチ データ ソースを再利用する
foreachBatch()
を使用すると、構造化ストリーミングをサポートしていない可能性があるデータ シンクに対して、既存のバッチ データ ライターを使用できます。 次に例をいくつか示します。
foreachBatch()
からは、他にも多くのバッチ データ ソースを使用できます。 データ ソースへの接続に関するページを参照してください。
複数の場所に書き込む
ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks は、最適な並列処理とスループットを実現するために、複数の構造化ストリーミング ライターを使用することを推奨しています。
foreachBatch
を使用して複数のシンクに書き込みを行うと、ストリーミング書き込みの実行がシリアル化され、マイクロバッチごとの待機時間が増加する可能性があります。
foreachBatch
を使用して複数の Delta テーブルに書き込む場合は、「foreachBatch でのべき等テーブルの書き込み」を参照してください。