Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Eine Basisklasse für Streamingdatenquellenleser.
Datenquellendatenstromleser sind für die Ausgabe von Daten aus einer Streamingdatenquelle verantwortlich. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.streamReader() , um eine Datenquelle als Streamingquelle lesbar zu machen.
Syntax
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
Methodik
| Methode | Beschreibung |
|---|---|
initialOffset() |
Gibt den anfänglichen Offset der Streamingdatenquelle als Diktat zurück. Eine neue Streamingabfrage beginnt mit dem Lesen aus diesem Offset. Neu gestartete Abfragen werden stattdessen aus dem überprüften Offset fortgesetzt. |
partitions(start, end) |
Gibt eine Abfolge von InputPartition Objekten zurück, die die Daten zwischen start und end offsets darstellen. Gibt eine leere Sequenz zurück, wenn start dies gleich ist end. |
read(partition) |
Generiert Daten für eine bestimmte Partition und gibt einen Iterator von Tupeln, Zeilen oder PyArrow-Objekten RecordBatch zurück. Jedes Tupel oder jede Zeile wird in eine Zeile im endgültigen DataFrame konvertiert. Diese Methode ist abstrakt und muss implementiert werden. |
commit(end) |
Informiert die Quelle, dass Spark die Verarbeitung aller Daten für Offsets abgeschlossen hat, die kleiner oder gleich sind end. Spark fordert nur Offsets an, die höher sind als end in der Zukunft. |
stop() |
Beendet die Quelle und gibt alle ressourcen frei, die sie zugeordnet hat. Wird aufgerufen, wenn die Streamingabfrage beendet wird. |
Hinweise
-
read()ist statisch und zustandslos. Greifen Sie nicht auf änderbare Klassenmember zu oder behalten Sie den Speicherstatus zwischen verschiedenen Aufrufen vonread(). - Alle von ihnen zurückgegebenen
partitions()Partitionswerte müssen auswählbare Objekte sein. - Offsets werden als Diktat oder rekursives Diktat dargestellt, dessen Schlüssel und Werte grundtyptyp sind: ganze Zahl, Zeichenfolge oder boolesche Werte.
Beispiele
Implementieren eines Streaminglesers, der aus einer Sequenz von indizierten Datensätzen liest:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")