数据源编写器的基类。
数据源编写器负责将数据保存到数据源。 实现此类并返回一个实例 DataSource.writer() ,使数据源可写。
在 Databricks Runtime 14.3 LTS 中添加
Syntax
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
方法
| 方法 | 说明 |
|---|---|
write(iterator) |
将数据写入数据源。 在每个执行程序上调用一次。 接受对象的迭代器 Row 并返回一个 WriterCommitMessage,或者 None 如果没有提交消息。 此方法是抽象的,必须实现。 |
commit(messages) |
使用从所有执行程序收集的提交消息列表提交写入作业。 成功运行所有任务时,在驱动程序上调用。 |
abort(messages) |
使用从所有执行程序收集的提交消息列表中止写入作业。 当一个或多个任务失败时,在驱动程序上调用。 |
备注
- 驱动程序从所有执行程序收集提交消息,并将其传递给
commit()所有任务是否成功,或传递到abort()任何任务失败。 - 如果写入任务失败,则其提交消息将
None位于传递给或commit()传递abort()的列表中。
示例
实现将行保存到文件的基本编写器:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")