다음을 통해 공유


DataSourceReader

데이터 원본 판독기를 위한 기본 클래스입니다.

데이터 원본 판독기는 데이터 원본에서 데이터를 출력하는 역할을 담당합니다. 이 클래스를 구현하고 데이터 원본을 읽을 수 있도록 인스턴스 DataSource.reader() 를 반환합니다.

문법

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

메서드

메서드 설명
pushFilters(filters) 데이터 원본으로 푸시다운할 수 있는 필터 목록을 사용하여 호출됩니다. Spark에서 평가해야 하는 반복 가능한 필터를 반환합니다. 기본적으로 모든 필터를 반환하며, 필터가 푸시다운되지 않음을 나타냅니다. pushFilters() 을(를) 수정 self할 수 있습니다. 개체는 수정 후 선택 가능한 상태로 유지되어야 합니다. self 변경 내용이 표시됩니다partitions().read()
partitions() 데이터 읽기를 병렬 작업으로 분할하는 개체 시 InputPartition 퀀스를 반환합니다. 기본적으로 단일 파티션을 반환합니다. 큰 데이터 세트를 읽을 때 성능 향상을 위해 재정의합니다. 반환된 partitions() 모든 파티션 값은 선택 가능한 개체여야 합니다.
read(partition) 지정된 파티션에 대한 데이터를 생성하고 튜플, 행 또는 PyArrow RecordBatch 개체의 반복기를 반환합니다. 각 튜플 또는 행은 최종 DataFrame의 행으로 변환됩니다. 이 메서드는 추상이며 구현해야 합니다.

예제

파티션 목록에서 행을 반환하는 기본 판독기를 구현합니다.

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

PyArrow RecordBatch를 사용하여 행 반환:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

필터를 지원 EqualTo 하도록 필터 푸시다운을 구현합니다.

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...