Partekatu honen bidez:


DataSourceStreamReader

Clase base para lectores de orígenes de datos de streaming.

Los lectores de flujos de origen de datos son responsables de generar datos de un origen de datos de streaming. Implemente esta clase y devuelva una instancia de DataSource.streamReader() para que un origen de datos sea legible como origen de streaming.

Sintaxis

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

Método Descripción
initialOffset() Devuelve el desplazamiento inicial del origen de datos de streaming como un dict. Una nueva consulta de streaming comienza a leer desde este desplazamiento. En su lugar, las consultas reiniciadas se reanudan desde el desplazamiento de punto de control.
partitions(start, end) Devuelve una secuencia de InputPartition objetos que representan los datos entre start los desplazamientos y end . Devuelve una secuencia vacía si start es igual a end.
read(partition) Genera datos para una partición determinada y devuelve un iterador de tuplas, filas o objetos PyArrow RecordBatch . Cada tupla o fila se convierte en una fila en el dataframe final. Este método es abstracto y debe implementarse.
commit(end) Informa al origen de que Spark ha completado el procesamiento de todos los datos para desplazamientos inferiores o iguales a end. Spark solo solicitará desplazamientos mayores que end en el futuro.
stop() Detiene el origen y libera los recursos que haya asignado. Se invoca cuando finaliza la consulta de streaming.

Notas

  • read() es estático y sin estado. No acceda a miembros de clase mutables ni mantenga el estado en memoria entre las diferentes invocaciones de read().
  • Todos los valores de partición devueltos por partitions() deben ser objetos seleccionables.
  • Los desplazamientos se representan como un dict o un dict recursivo cuyos valores y claves son tipos primitivos: entero, cadena o booleano.

Ejemplos

Implemente un lector de streaming que lea de una secuencia de registros indexados:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")