Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Kelas dasar untuk penulis sumber data.
Penulis sumber data bertanggung jawab untuk menyimpan data ke sumber data. Terapkan kelas ini dan kembalikan instans dari DataSource.writer() untuk membuat sumber data dapat ditulis.
Ditambahkan dalam Databricks Runtime 14.3 LTS
Sintaksis
from pyspark.sql.datasource import DataSourceWriter
class MyDataSourceWriter(DataSourceWriter):
def write(self, iterator):
...
Metode
| Metode | Deskripsi |
|---|---|
write(iterator) |
Menulis data ke dalam sumber data. Dipanggil sekali pada setiap pelaksana. Menerima iterator Row objek dan mengembalikan WriterCommitMessage, atau None jika tidak ada pesan penerapan. Metode ini abstrak dan harus diimplementasikan. |
commit(messages) |
Menerapkan pekerjaan penulisan menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika semua tugas berhasil dijalankan. |
abort(messages) |
Membatalkan pekerjaan penulisan menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika satu atau beberapa tugas gagal. |
Catatan
- Driver mengumpulkan pesan penerapan dari semua pelaksana dan meneruskannya ke
commit()jika semua tugas berhasil, atau keabort()jika ada tugas yang gagal. - Jika tugas tulis gagal, pesan penerapannya akan berada
Nonedalam daftar yang diteruskan kecommit()atauabort().
Examples
Terapkan penulis dasar yang menyimpan baris ke file:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceWriter(DataSourceWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "w") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")