DataSourceStreamReader

Kelas dasar untuk pembaca sumber data streaming.

Pembaca aliran sumber data bertanggung jawab untuk menghasilkan data dari sumber data streaming. Terapkan kelas ini dan kembalikan instans dari DataSource.streamReader() untuk membuat sumber data dapat dibaca sebagai sumber streaming.

Ditambahkan dalam Databricks Runtime 15.2

Sintaksis

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Metode

Metode Deskripsi
initialOffset() Mengembalikan offset awal sumber data streaming sebagai dict. Kueri streaming baru mulai membaca dari offset ini. Harus mengembalikan pasangan nilai kunci offset dari jenis primitif dalam JSON atau dict format. PySparkNotImplementedError Menaikkan jika tidak diimplementasikan.
latestOffset(start, limit) Mengembalikan offset terbaru yang dicttersedia sebagai , mengingat offset awal dan batas baca. Sumber dapat mengembalikan offset yang sama seolah-olah start tidak ada data baru. Sumber harus selalu menghormati yang diberikan limit. Harus mengembalikan pasangan nilai kunci offset dari jenis primitif dalam JSON atau dict format. PySparkNotImplementedError Menaikkan jika tidak diimplementasikan.
partitions(start, end) Mengembalikan urutan InputPartition objek yang mewakili data antara start dan end offset. Mengembalikan urutan kosong jika start sama dengan end. Masing-masing InputPartition mewakili pemisahan data yang dapat diproses oleh satu tugas Spark.
read(partition) Menghasilkan data untuk partisi tertentu dan mengembalikan iterator objek tuple, baris, atau PyArrow RecordBatch . Setiap tuple atau baris dikonversi menjadi baris di DataFrame akhir. Metode ini abstrak dan harus diimplementasikan.
commit(end) Menginformasikan sumber bahwa Spark telah selesai memproses semua data untuk offset kurang dari atau sama dengan end. Spark hanya akan meminta offset yang lebih besar dari end di masa mendatang.
stop() Menghentikan sumber dan membebaskan sumber daya apa pun yang telah dialokasikannya. Dipanggil saat kueri streaming berakhir.

Catatan

  • read() statis dan tanpa status. Jangan mengakses anggota kelas yang dapat diubah atau menyimpan status dalam memori di antara pemanggilan yang berbeda dari read().
  • Semua nilai partisi yang dikembalikan oleh partitions() harus berupa objek yang dapat dipilih.
  • Offset direpresentasikan sebagai dict atau rekursif dict yang kunci dan nilainya adalah jenis primitif: bilangan bulat, string, atau boolean.

Examples

Terapkan pembaca streaming yang membaca dari urutan rekaman terindeks:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")