Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
Clase base para lectores de orígenes de datos de streaming.
Los lectores de flujos de origen de datos son responsables de generar datos de un origen de datos de streaming. Implemente esta clase y devuelva una instancia de DataSource.streamReader() para que un origen de datos sea legible como origen de streaming.
Sintaxis
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
Methods
| Método | Descripción |
|---|---|
initialOffset() |
Devuelve el desplazamiento inicial del origen de datos de streaming como un dict. Una nueva consulta de streaming comienza a leer desde este desplazamiento. En su lugar, las consultas reiniciadas se reanudan desde el desplazamiento de punto de control. |
partitions(start, end) |
Devuelve una secuencia de InputPartition objetos que representan los datos entre start los desplazamientos y end . Devuelve una secuencia vacía si start es igual a end. |
read(partition) |
Genera datos para una partición determinada y devuelve un iterador de tuplas, filas o objetos PyArrow RecordBatch . Cada tupla o fila se convierte en una fila en el dataframe final. Este método es abstracto y debe implementarse. |
commit(end) |
Informa al origen de que Spark ha completado el procesamiento de todos los datos para desplazamientos inferiores o iguales a end. Spark solo solicitará desplazamientos mayores que end en el futuro. |
stop() |
Detiene el origen y libera los recursos que haya asignado. Se invoca cuando finaliza la consulta de streaming. |
Notas
-
read()es estático y sin estado. No acceda a miembros de clase mutables ni mantenga el estado en memoria entre las diferentes invocaciones deread(). - Todos los valores de partición devueltos por
partitions()deben ser objetos seleccionables. - Los desplazamientos se representan como un dict o un dict recursivo cuyos valores y claves son tipos primitivos: entero, cadena o booleano.
Ejemplos
Implemente un lector de streaming que lea de una secuencia de registros indexados:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")