SimpleDataSourceStreamReader

Klasa podstawowa do uproszczonego przesyłania strumieniowego czytników źródeł danych.

W porównaniu z DataSourceStreamReaderelementem SimpleDataSourceStreamReader nie wymaga planowania partycji danych. Metoda read() umożliwia odczytywanie danych i planowanie najnowszego przesunięcia w tym samym czasie.

Ponieważ SimpleDataSourceStreamReader odczytuje rekordy w sterowniku spark w celu określenia przesunięcia końcowego każdej partii bez partycjonowania, jest ona odpowiednia tylko w przypadku lekkich przypadków użycia, w których szybkość wprowadzania i rozmiar partii są małe. Użyj DataSourceStreamReader , gdy przepływność odczytu jest wysoka i nie można jej obsłużyć przez pojedynczy proces.

Dodano w środowisku Databricks Runtime 15.3

Składnia

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

Metoda Opis
initialOffset() Zwraca początkowe przesunięcie źródła danych przesyłania strumieniowego. Nowe zapytanie przesyłania strumieniowego rozpoczyna odczytywanie z tego przesunięcia.
read(start) Odczytuje wszystkie dostępne dane z przesunięcia początkowego i zwraca krotkę iteratora rekordów oraz przesunięcie końcowe dla następnej próby odczytu.
readBetweenOffsets(start, end) Odczytuje wszystkie dostępne dane między określonymi przesunięciami początkowymi i końcowymi. Wywoływane podczas odzyskiwania po awarii w celu ponownego odczytania partii deterministycznej.
commit(end) Informuje źródło, że platforma Spark ukończyła przetwarzanie wszystkich danych dla przesunięć mniejszych lub równych end.

Examples

Zdefiniuj niestandardowy uproszczony czytnik źródeł danych przesyłania strumieniowego:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()