DataSourceArrowWriter

Kelas dasar untuk penulis sumber data yang memproses data menggunakan PyArrow's RecordBatch.

Tidak seperti DataSourceWriter, yang bekerja dengan iterator objek Spark Row , kelas ini dioptimalkan untuk format Panah saat menulis data. Ini dapat menawarkan performa yang lebih baik saat berinteraksi dengan sistem atau pustaka yang secara asli mendukung Arrow. Terapkan kelas ini dan kembalikan instans dari DataSource.writer() untuk membuat sumber data dapat ditulis menggunakan Arrow.

Sintaksis

from pyspark.sql.datasource import DataSourceArrowWriter

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        ...

Metode

Metode Deskripsi
write(iterator) Menulis iterator objek PyArrow RecordBatch ke sink. Dipanggil sekali pada setiap pelaksana. Mengembalikan WriterCommitMessage, atau None jika tidak ada pesan penerapan. Metode ini abstrak dan harus diimplementasikan.
commit(messages) Menerapkan pekerjaan penulisan menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika semua tugas berhasil dijalankan. Diwarisi dari DataSourceWriter.
abort(messages) Membatalkan pekerjaan penulisan menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika satu atau beberapa tugas gagal. Diwarisi dari DataSourceWriter.

Catatan

  • Driver mengumpulkan pesan penerapan dari semua pelaksana dan meneruskannya ke commit() jika semua tugas berhasil, atau ke abort() jika ada tugas yang gagal.
  • Jika tugas tulis gagal, pesan penerapannya akan berada None dalam daftar yang diteruskan ke commit() atau abort().

Examples

Terapkan penulis berbasis Panah yang menghitung baris di semua batch:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")