SimpleDataSourceStreamReader

Kelas dasar untuk pembaca sumber data streaming yang disederhanakan.

Dibandingkan DataSourceStreamReaderdengan , SimpleDataSourceStreamReader tidak memerlukan partisi data perencanaan. Metode ini read() memungkinkan membaca data dan merencanakan offset terbaru secara bersamaan.

Karena SimpleDataSourceStreamReader membaca rekaman di driver Spark untuk menentukan offset akhir setiap batch tanpa partisi, hanya cocok untuk kasus penggunaan ringan di mana laju input dan ukuran batch kecil. Gunakan DataSourceStreamReader saat throughput baca tinggi dan tidak dapat ditangani oleh satu proses.

Ditambahkan dalam Databricks Runtime 15.3

Sintaksis

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Metode

Metode Deskripsi
initialOffset() Mengembalikan offset awal sumber data streaming. Kueri streaming baru mulai membaca dari offset ini.
read(start) Membaca semua data yang tersedia dari offset awal dan mengembalikan tuple dari iterator rekaman dan offset akhir untuk upaya baca berikutnya.
readBetweenOffsets(start, end) Membaca semua data yang tersedia antara offset awal dan akhir tertentu. Dipanggil selama pemulihan kegagalan untuk membaca kembali batch secara deterministik.
commit(end) Menginformasikan sumber bahwa Spark telah selesai memproses semua data untuk offset kurang dari atau sama dengan end.

Examples

Tentukan pembaca sumber data streaming kustom yang disederhanakan:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()