Compartir vía


DataSourceStreamReader

Clase base para lectores de orígenes de datos de streaming.

Los lectores de flujos de origen de datos son responsables de generar datos de un origen de datos de streaming. Implemente esta clase y devuelva una instancia de DataSource.streamReader() para que un origen de datos sea legible como origen de streaming.

Sintaxis

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

Método Descripción
initialOffset() Devuelve el desplazamiento inicial del origen de datos de streaming como .dict Una nueva consulta de streaming comienza a leer desde este desplazamiento. Debe devolver pares clave-valor de desplazamiento de tipos primitivos en FORMATO JSON o dict . PySparkNotImplementedError Genera si no se implementa.
latestOffset(start, limit) Devuelve el desplazamiento más reciente disponible como , dictdado un desplazamiento inicial y un límite de lectura. El origen puede devolver el mismo desplazamiento que start si no hay datos nuevos. El origen siempre debe respetar el objeto especificado limit. Debe devolver pares clave-valor de desplazamiento de tipos primitivos en FORMATO JSON o dict . PySparkNotImplementedError Genera si no se implementa.
partitions(start, end) Devuelve una secuencia de InputPartition objetos que representan los datos entre start los desplazamientos y end . Devuelve una secuencia vacía si start es igual a end. Cada InputPartition representa una división de datos que se puede procesar mediante una tarea de Spark.
read(partition) Genera datos para una partición determinada y devuelve un iterador de tuplas, filas o objetos PyArrow RecordBatch . Cada tupla o fila se convierte en una fila en el dataframe final. Este método es abstracto y debe implementarse.
commit(end) Informa al origen de que Spark ha completado el procesamiento de todos los datos para desplazamientos inferiores o iguales a end. Spark solo solicitará desplazamientos mayores que end en el futuro.
stop() Detiene el origen y libera los recursos que haya asignado. Se invoca cuando finaliza la consulta de streaming.

Notas

  • read() es estático y sin estado. No acceda a miembros de clase mutables ni mantenga el estado en memoria entre las diferentes invocaciones de read().
  • Todos los valores de partición devueltos por partitions() deben ser objetos seleccionables.
  • Los desplazamientos se representan como un dict o recursivo dict cuyas claves y valores son tipos primitivos: entero, cadena o booleano.

Ejemplos

Implemente un lector de streaming que lea de una secuencia de registros indexados:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")