Lưu ý
Cần có ủy quyền mới truy nhập được vào trang này. Bạn có thể thử đăng nhập hoặc thay đổi thư mục.
Cần có ủy quyền mới truy nhập được vào trang này. Bạn có thể thử thay đổi thư mục.
A base class for data source writers.
Data source writers are responsible for saving data to a data source. Implement this class and return an instance from DataSource.writer() to make a data source writable.
Added in Databricks Runtime 14.3 LTS
Syntax
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
Methods
| Method | Description |
|---|---|
write(iterator) |
Writes data into the data source. Called once on each executor. Accepts an iterator of Row objects and returns a WriterCommitMessage, or None if there is no commit message. This method is abstract and must be implemented. |
commit(messages) |
Commits the writing job using a list of commit messages collected from all executors. Invoked on the driver when all tasks run successfully. |
abort(messages) |
Aborts the writing job using a list of commit messages collected from all executors. Invoked on the driver when one or more tasks failed. |
Notes
- The driver collects commit messages from all executors and passes them to
commit()if all tasks succeed, or toabort()if any task fails. - If a write task fails, its commit message will be
Nonein the list passed tocommit()orabort().
Examples
Implement a basic writer that saves rows to a file:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")