Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Kelas dasar untuk penulis aliran data yang memproses data menggunakan PyArrow's RecordBatch.
Tidak seperti DataSourceStreamWriter, yang bekerja dengan iterator objek Spark Row , kelas ini dioptimalkan untuk format Panah saat menulis data streaming. Ini dapat menawarkan performa yang lebih baik saat berinteraksi dengan sistem atau pustaka yang secara asli mendukung Arrow untuk kasus penggunaan streaming. Terapkan kelas ini dan kembalikan instans dari DataSource.streamWriter() untuk membuat sumber data dapat ditulis sebagai sink streaming menggunakan Arrow.
Sintaksis
from pyspark.sql.datasource import DataSourceStreamArrowWriter
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...
Metode
| Metode | Deskripsi |
|---|---|
write(iterator) |
Menulis iterator objek PyArrow RecordBatch ke sink streaming. Dipanggil pada pelaksana sekali per microbatch. Mengembalikan WriterCommitMessage, atau None jika tidak ada pesan penerapan. Metode ini abstrak dan harus diimplementasikan. |
commit(messages, batchId) |
Menerapkan microbatch menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika semua tugas dalam microbatch berhasil dijalankan. Diwarisi dari DataSourceStreamWriter. |
abort(messages, batchId) |
Membatalkan microbatch menggunakan daftar pesan penerapan yang dikumpulkan dari semua pelaksana. Dipanggil pada driver ketika satu atau beberapa tugas dalam microbatch gagal. Diwarisi dari DataSourceStreamWriter. |
Catatan
- Driver mengumpulkan pesan penerapan dari semua pelaksana dan meneruskannya ke
commit()jika semua tugas berhasil, atau keabort()jika ada tugas yang gagal. - Jika tugas tulis gagal, pesan penerapannya akan berada
Nonedalam daftar yang diteruskan kecommit()atauabort(). -
batchIdsecara unik mengidentifikasi setiap mikrobatch dan kenaikan 1 dengan setiap mikrobatch diproses.
Examples
Terapkan penulis aliran berbasis Panah yang menghitung baris per microbatch:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")