一個用於資料串流寫入器的基底類別,使用 PyArrow 的 RecordBatch。
與 DataSourceStreamWriter不同,該類別可透過 Spark Row 物件的迭代器運作,該類別在撰寫串流資料時針對 Arrow 格式進行最佳化。 當與原生支援 Arrow 的系統或函式庫進行串流應用時,它能提供更好的效能。 實作這個類別並回傳一個 DataSource.streamWriter() 實例,讓資料來源可以用 Arrow 寫成串流匯入。
語法
from pyspark.sql.datasource import DataSourceStreamArrowWriter
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...
方法
| 方法 | 說明 |
|---|---|
write(iterator) |
會將 PyArrow RecordBatch 物件的迭代器寫入串流匯流器。 每微批次會呼叫執行者一次。 回傳 ,或None若無提交訊息則回傳WriterCommitMessage。 此方法為抽象且必須實作。 |
commit(messages, batchId) |
透過從所有執行者收集的提交訊息清單,提交微批次。 當微批次中的所有任務成功執行時,會在驅動程式中呼叫。 繼承自 DataSourceStreamWriter。 |
abort(messages, batchId) |
利用從所有執行者收集的提交訊息清單中止微批次。 當微批次中的一個或多個任務失敗時,會呼叫驅動程式。 繼承自 DataSourceStreamWriter。 |
Notes
- 驅動程式會從所有執行者收集提交訊息,若所有任務成功
abort()或任務失敗則傳commit()達給執行者。 - 若寫入任務失敗,其提交訊息會被
None傳送commit()到或abort()的清單中。 -
batchId唯一識別每個微批次,並隨著每處理一個微批次增加1。
Examples
實作一個基於 Arrow 的串流寫入器,能計算每個微批次的列數:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")