次の方法で共有


DataSourceStreamArrowWriter

PyArrow の RecordBatchを使用してデータを処理するデータ ストリーム ライターの基本クラス。

Spark Row オブジェクトの反復子で動作するDataSourceStreamWriterとは異なり、このクラスはストリーミング データを書き込むときに Arrow 形式用に最適化されています。 ストリーミング ユース ケースに対して Arrow をネイティブにサポートするシステムまたはライブラリとやり取りするときに、パフォーマンスを向上させることができます。 このクラスを実装し、 DataSource.streamWriter() からインスタンスを返して、Arrow を使用してストリーミング シンクとしてデータ ソースを書き込み可能にします。

構文

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

メソッド

メソッド 説明
write(iterator) PyArrow RecordBatch オブジェクトの反復子をストリーミング シンクに書き込みます。 マイクロバッチごとに 1 回実行プログラムで呼び出されます。 コミット メッセージがない場合は、 WriterCommitMessageまたは None を返します。 このメソッドは抽象メソッドであり、実装する必要があります。
commit(messages, batchId) すべての Executor から収集されたコミット メッセージの一覧を使用して、マイクロバッチをコミットします。 マイクロバッチ内のすべてのタスクが正常に実行されると、ドライバーで呼び出されます。 このプロパティは、DataSourceStreamWriter から継承されています。
abort(messages, batchId) すべての Executor から収集されたコミット メッセージの一覧を使用して、マイクロバッチを中止します。 マイクロバッチ内の 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。 このプロパティは、DataSourceStreamWriter から継承されています。

メモ

  • ドライバーは、すべての Executor からコミット メッセージを収集し、すべてのタスクが成功した場合は commit() に渡すか、タスクが失敗した場合に abort() します。
  • 書き込みタスクが失敗した場合、そのコミット メッセージは、commit()またはabort()に渡された一覧にNoneされます。
  • batchId は、各マイクロバッチを一意に識別し、各マイクロバッチを処理して 1 ずつインクリメントします。

例示

マイクロバッチあたりの行数をカウントする矢印ベースのストリーム ライターを実装します。

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")