Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Kelas dasar untuk pembaca sumber data streaming yang disederhanakan.
Dibandingkan DataSourceStreamReaderdengan , SimpleDataSourceStreamReader tidak memerlukan partisi data perencanaan. Metode ini read() memungkinkan membaca data dan merencanakan offset terbaru secara bersamaan.
Karena SimpleDataSourceStreamReader membaca rekaman di driver Spark untuk menentukan offset akhir setiap batch tanpa partisi, hanya cocok untuk kasus penggunaan ringan di mana laju input dan ukuran batch kecil. Gunakan DataSourceStreamReader saat throughput baca tinggi dan tidak dapat ditangani oleh satu proses.
Ditambahkan dalam Databricks Runtime 15.3
Sintaksis
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Metode
| Metode | Deskripsi |
|---|---|
initialOffset() |
Mengembalikan offset awal sumber data streaming. Kueri streaming baru mulai membaca dari offset ini. |
read(start) |
Membaca semua data yang tersedia dari offset awal dan mengembalikan tuple dari iterator rekaman dan offset akhir untuk upaya baca berikutnya. |
readBetweenOffsets(start, end) |
Membaca semua data yang tersedia antara offset awal dan akhir tertentu. Dipanggil selama pemulihan kegagalan untuk membaca kembali batch secara deterministik. |
commit(end) |
Menginformasikan sumber bahwa Spark telah selesai memproses semua data untuk offset kurang dari atau sama dengan end. |
Examples
Tentukan pembaca sumber data streaming kustom yang disederhanakan:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()