共用方式為


資料來源串流箭頭作家

一個用於資料串流寫入器的基底類別,使用 PyArrow 的 RecordBatch

DataSourceStreamWriter不同,該類別可透過 Spark Row 物件的迭代器運作,該類別在撰寫串流資料時針對 Arrow 格式進行最佳化。 當與原生支援 Arrow 的系統或函式庫進行串流應用時,它能提供更好的效能。 實作這個類別並回傳一個 DataSource.streamWriter() 實例,讓資料來源可以用 Arrow 寫成串流匯入。

語法

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

方法

方法 說明
write(iterator) 會將 PyArrow RecordBatch 物件的迭代器寫入串流匯流器。 每微批次會呼叫執行者一次。 回傳 ,或None若無提交訊息則回傳WriterCommitMessage。 此方法為抽象且必須實作。
commit(messages, batchId) 透過從所有執行者收集的提交訊息清單,提交微批次。 當微批次中的所有任務成功執行時,會在驅動程式中呼叫。 繼承自 DataSourceStreamWriter
abort(messages, batchId) 利用從所有執行者收集的提交訊息清單中止微批次。 當微批次中的一個或多個任務失敗時,會呼叫驅動程式。 繼承自 DataSourceStreamWriter

Notes

  • 驅動程式會從所有執行者收集提交訊息,若所有任務成功abort()或任務失敗則傳commit()達給執行者。
  • 若寫入任務失敗,其提交訊息會被 None 傳送 commit() 到或 abort()的清單中。
  • batchId 唯一識別每個微批次,並隨著每處理一個微批次增加1。

Examples

實作一個基於 Arrow 的串流寫入器,能計算每個微批次的列數:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")