스트리밍 데이터 원본 판독기를 위한 기본 클래스입니다.
데이터 원본 스트림 판독기는 스트리밍 데이터 원본에서 데이터를 출력하는 역할을 담당합니다. 이 클래스를 구현하고 스트리밍 원본으로 데이터 원본을 읽을 수 있도록 인스턴스 DataSource.streamReader() 를 반환합니다.
문법
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
메서드
| 메서드 | 설명 |
|---|---|
initialOffset() |
스트리밍 데이터 원본의 초기 오프셋을 받아쓰기로 반환합니다. 새 스트리밍 쿼리가 이 오프셋에서 읽기 시작합니다. 다시 시작한 쿼리는 대신 검사점 오프셋에서 다시 시작됩니다. |
partitions(start, end) |
오프셋 사이의 startend 데이터를 나타내는 개체 시 InputPartition 퀀스를 반환합니다. 같으면 start 빈 시퀀스를 반환합니다.end |
read(partition) |
지정된 파티션에 대한 데이터를 생성하고 튜플, 행 또는 PyArrow RecordBatch 개체의 반복기를 반환합니다. 각 튜플 또는 행은 최종 DataFrame의 행으로 변환됩니다. 이 메서드는 추상이며 구현해야 합니다. |
commit(end) |
Spark가 다음보다 작거나 같은 오프셋에 대한 모든 데이터 처리를 완료했음을 원본에 알릴 수 있습니다 end. Spark는 이후보다 end 더 큰 오프셋만 요청합니다. |
stop() |
원본을 중지하고 할당된 리소스를 해제합니다. 스트리밍 쿼리가 종료될 때 호출됩니다. |
Notes
-
read()가 정적이고 상태 비지정적입니다. 변경 가능한 클래스 멤버에 액세스하거나 다른 호출read()간에 메모리 내 상태를 유지하지 마세요. - 반환된
partitions()모든 파티션 값은 선택 가능한 개체여야 합니다. - 오프셋은 키와 값이 기본 형식인 정수, 문자열 또는 부울의 받아쓰기 또는 재귀 받아쓰기로 표시됩니다.
예제
인덱싱된 레코드 시퀀스에서 읽는 스트리밍 판독기를 구현합니다.
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")