PyArrow の RecordBatchを使用してデータを処理するデータ ソース ライターの基本クラス。
Spark Row オブジェクトの反復子で動作するDataSourceWriterとは異なり、このクラスは、データを書き込むときに Arrow 形式に最適化されています。 Arrow をネイティブにサポートするシステムまたはライブラリとやり取りするときに、パフォーマンスを向上させることができます。 このクラスを実装し、 DataSource.writer() からインスタンスを返して、Arrow を使用してデータ ソースを書き込み可能にします。
構文
from pyspark.sql.datasource import DataSourceArrowWriter
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
...
メソッド
| メソッド | 説明 |
|---|---|
write(iterator) |
PyArrow RecordBatch オブジェクトの反復子をシンクに書き込みます。 各 Executor で 1 回呼び出されます。 コミット メッセージがない場合は、 WriterCommitMessageまたは None を返します。 このメソッドは抽象メソッドであり、実装する必要があります。 |
commit(messages) |
すべての Executor から収集されたコミット メッセージの一覧を使用して、書き込みジョブをコミットします。 すべてのタスクが正常に実行されたときにドライバーで呼び出されます。 このプロパティは、DataSourceWriter から継承されています。 |
abort(messages) |
すべての Executor から収集されたコミット メッセージの一覧を使用して、書き込みジョブを中止します。 1 つ以上のタスクが失敗したときにドライバーで呼び出されます。 このプロパティは、DataSourceWriter から継承されています。 |
メモ
- ドライバーは、すべての Executor からコミット メッセージを収集し、すべてのタスクが成功した場合は
commit()に渡すか、タスクが失敗した場合にabort()します。 - 書き込みタスクが失敗した場合、そのコミット メッセージは、
commit()またはabort()に渡された一覧にNoneされます。
例示
すべてのバッチの行をカウントする方向ベースのライターを実装します。
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")