foreachBatch を使用して任意のデータ シンクに書き込む

この記事では、構造化ストリーミングで foreachBatch を使用して、既存のストリーミング シンクがないデータ ソースにストリーミング クエリの出力を書き込む方法について説明します。

コード パターン streamingDF.writeStream.foreachBatch(...) を使用すると、ストリーミング クエリのすべてのマイクロバッチの出力データにバッチ関数を適用できます。 foreachBatch で使用される関数は、次の 2 つのパラメーターを受け取ります。

  • マイクロバッチの出力データを含む DataFrame。
  • マイクロバッチの一意の ID。

構造化ストリーミングの Delta Lake マージ操作には foreachBatch を使用する必要があります。 「foreachBatch を使用したストリーミング クエリからの upsert」 をご覧ください。

追加の DataFrame 操作を適用する

ストリーミング DataFrame でサポートされている DataFrame および Dataset 操作は多くはありません。Spark では、このようなケースでの増分プランの生成がサポートされていないためです。 foreachBatch() を使用すると、マイクロバッチ出力ごとにこれらの操作のいくつかを適用できます。 たとえば、foreachBath() と SQL MERGE INTO 操作を使用して、ストリーミング集計の出力を更新モードで Delta テーブルに書き込むことができます。 詳細については、「MERGE INTO」を参照してください。

重要

  • foreachBatch() で保証されているのは、少なくとも 1 回の書き込みのみです。 ただし、関数に提供される batchId を使用することで、出力を重複除去し、正確に 1 回の保証を得ることができます。 どちらの場合も、エンド ツー エンドのセマンティクスを自分で考える必要があります。
  • foreachBatch() は、基本的にストリーミング クエリのマイクロバッチ実行に依存しているため、連続処理モードでは機能しません。 連続モードでデータを書き込む場合は、代わりに foreach() を使用します。

空のデータフレームは、foreachBatch() で呼び出すことができます。ユーザー コードには、適切な操作を実行できるように回復性が必要です。 次に 1 つの例を示します。

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 での foreachBatch の動作の変更

共有アクセス モードを指定して構成したコンピューティング上の Databricks Runtime 14.0 以上では、forEachBatch は、REPL 環境ではなく、Apache Spark 上の分離された別個の Python プロセスで実行されます。 これは、シリアル化されて Spark にプッシュされ、セッション中はグローバル spark オブジェクトにアクセスできません。

その他のすべてのコンピューティング構成では、foreachBatch は、コードの残りの部分を実行するのと同じ Python REPL で実行されます。 そのため、この関数はシリアル化されません。

共有アクセス モードを指定して構成したコンピューティングで Databricks Runtime 14.0 以上を使用する場合は、次のコード例に示すように、Python で foreachBatch を使用するときに、ローカル DataFrame にスコープを設定した sparkSession 変数を使用する必要があります。

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

次の動作の変更が適用されます。

  • 関数内からグローバル Python 変数にアクセスすることはできません。
  • print() コマンドは、ドライバー ログに出力を書き込みます。
  • 関数で参照されるすべてのファイル、モジュール、またはオブジェクトは、シリアル化可能であり、Spark で使用できる必要があります。

既存のバッチ データ ソースを再利用する

foreachBatch() を使用すると、構造化ストリーミングをサポートしていない可能性があるデータ シンクに対して、既存のバッチ データ ライターを使用できます。 次に例をいくつか示します。

foreachBatch() からは、他にも多くのバッチ データ ソースを使用できます。 データ ソースへの接続に関するページを参照してください。

複数の場所に書き込む

ストリーミング クエリの出力を複数の場所に書き込む必要がある場合、Databricks は、最適な並列処理とスループットを実現するために、複数の構造化ストリーミング ライターを使用することを推奨しています。

foreachBatch を使用して複数のシンクに書き込みを行うと、ストリーミング書き込みの実行がシリアル化され、マイクロバッチごとの待機時間が増加する可能性があります。

foreachBatch を使用して複数の Delta テーブルに書き込む場合は、「foreachBatch でのべき等テーブルの書き込み」を参照してください。