DataSourceWriter

Kelas dasar untuk penulis sumber data.

Penulis sumber data bertanggung jawab untuk menyimpan data ke sumber data. Terapkan kelas ini dan kembalikan instans dari DataSource.writer() untuk membuat sumber data dapat ditulis.

Ditambahkan dalam Databricks Runtime 14.3 LTS

Sintaksis

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Metode

Metode Deskripsi
write(iterator) Menulis data ke dalam sumber data. Dipanggil sekali pada setiap pelaksana. Menerima iterator Row objek dan mengembalikan WriterCommitMessage, atau None jika tidak ada pesan penerapan. Metode ini abstrak dan harus diimplementasikan.
commit(messages) Menerapkan pekerjaan penulisan menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika semua tugas berhasil dijalankan.
abort(messages) Membatalkan pekerjaan penulisan menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika satu atau beberapa tugas gagal.

Catatan

  • Driver mengumpulkan pesan penerapan dari semua pelaksana dan meneruskannya ke commit() jika semua tugas berhasil, atau ke abort() jika ada tugas yang gagal.
  • Jika tugas tulis gagal, pesan penerapannya akan berada None dalam daftar yang diteruskan ke commit() atau abort().

Examples

Terapkan penulis dasar yang menyimpan baris ke file:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")