一個用於串流資料來源讀取器的基底類別。
資料來源串流讀取器負責從串流資料來源輸出資料。 實作這個類別並回傳一個 DataSource.streamReader() 實例,讓資料來源可作為串流來源讀取。
語法
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
方法
| 方法 | 說明 |
|---|---|
initialOffset() |
會以指令(dict)回傳串流資料來源的初始偏移量。 一個新的串流查詢會從這個偏移量開始讀取。 重新啟動的查詢會從檢查點的偏移量繼續。 |
partitions(start, end) |
回傳一串InputPartition物件,代表介於 與 end 偏移量之間的start資料。 若 start 等於 end,則回傳一個空序列。 |
read(partition) |
為給定分割產生資料,並回傳元組、列或 PyArrow RecordBatch 物件的迭代器。 每個元組或列都會在最終資料框架中轉換成一列。 此方法為抽象且必須實作。 |
commit(end) |
通知來源 Spark 已完成處理所有偏移量小於 end等於 的資料。 Spark 只會請求比 end 未來更大的偏移量。 |
stop() |
停止源頭並釋放其分配的資源。 串流查詢結束時會被喚起。 |
Notes
-
read()是靜態且無狀態的。 請勿存取可變類別成員,或在不同呼叫read()間保持記憶體狀態。 - 所有由 回
partitions()傳的分割值必須是可選取的物件。 - 偏移量以字典或遞迴字典表示,其鍵與值為原始型別:整數、字串或布林。
Examples
實作一個串流讀取器,從一連串索引記錄讀取:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")