Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Klasa podstawowa do przesyłania strumieniowego czytników źródeł danych.
Czytniki strumieni źródła danych są odpowiedzialne za wyprowadzanie danych ze źródła danych przesyłanych strumieniowo. Zaimplementuj tę klasę i zwróć wystąpienie z DataSource.streamReader() , aby źródło danych było czytelne jako źródło przesyłania strumieniowego.
Składnia
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
Methods
| Metoda | Opis |
|---|---|
initialOffset() |
Zwraca początkowe przesunięcie źródła danych przesyłania strumieniowego jako dict. Nowe zapytanie przesyłania strumieniowego rozpoczyna odczytywanie z tego przesunięcia. Musi zwracać pary klucz-wartość przesunięcia typów pierwotnych w formacie JSON lub dict formacie. Zgłasza, PySparkNotImplementedError jeśli nie zaimplementowano. |
latestOffset(start, limit) |
Zwraca najnowsze przesunięcie dostępne jako dict, biorąc pod uwagę przesunięcie rozpoczęcia i limit odczytu. Źródło może zwrócić to samo przesunięcie, co start w przypadku braku nowych danych. Źródło musi zawsze przestrzegać podanego limitobiektu . Musi zwracać pary klucz-wartość przesunięcia typów pierwotnych w formacie JSON lub dict formacie. Zgłasza, PySparkNotImplementedError jeśli nie zaimplementowano. |
partitions(start, end) |
Zwraca sekwencję InputPartition obiektów reprezentujących dane między start i end przesunięciami. Zwraca pustą sekwencję, jeśli start jest równa end. Każdy InputPartition reprezentuje podział danych, który może być przetwarzany przez jedno zadanie platformy Spark. |
read(partition) |
Generuje dane dla danej partycji i zwraca iterator krotki, wierszy lub obiektów PyArrow RecordBatch . Każda krotka lub wiersz jest konwertowana na wiersz w końcowej ramce danych. Ta metoda jest abstrakcyjna i musi zostać zaimplementowana. |
commit(end) |
Informuje źródło, że platforma Spark ukończyła przetwarzanie wszystkich danych dla przesunięć mniejszych lub równych end. Platforma Spark będzie żądać tylko przesunięć większych niż end w przyszłości. |
stop() |
Zatrzymuje źródło i zwalnia wszystkie przydzielone zasoby. Wywoływane po zakończeniu zapytania przesyłania strumieniowego. |
Notatki
-
read()jest statyczna i bezstanowa. Nie należy uzyskiwać dostępu do składowych klas modyfikowalnych ani zachowywać stanu w pamięci między różnymi wywołaniamiread()klasy . - Wszystkie wartości partycji zwracane przez
partitions()program muszą być obiektami do wyboru. - Przesunięcia są reprezentowane jako
dictlub rekursywnedict, których klucze i wartości są typami pierwotnymi: liczba całkowita, ciąg lub wartość logiczna.
Examples
Zaimplementuj czytnik przesyłania strumieniowego odczytujący z sekwencji indeksowanych rekordów:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")