Compartir vía


Orígenes de datos personalizados de PySpark

Importante

Los orígenes de datos personalizados de PySpark se encuentran en versión preliminar pública en Databricks Runtime 15.2 y versiones posteriores. La compatibilidad con streaming está disponible en Databricks Runtime 15.3 y versiones posteriores.

La API DataSource de PySpark se crea mediante la API DataSource de Python (PySpark), que permite leer desde orígenes de datos personalizados y escribir en receptores de datos personalizados en Apache Spark mediante Python. Puede usar orígenes de datos personalizados de PySpark para definir conexiones personalizadas a sistemas de datos e implementar funcionalidad adicional para crear orígenes de datos reutilizables.

Clase DataSource

El DataSource de PySpark es una clase base que proporciona métodos para crear lectores y escritores de datos.

Implementación de la subclase del origen de datos

Según el caso de uso, cualquier subclase debe implementar lo siguiente para que un origen de datos sea legible, grabable o ambos:

Propiedad o método Descripción
name Obligatorio. El nombre del origen de datos
schema Necesario. Esquema del origen de datos que se va a leer o escribir.
reader() Debe devolver un DataSourceReader para que el origen de datos sea legible (lote).
writer() Debe devolver un DataSourceWriter para que el receptor de datos sea grabable (lote).
streamReader() o simpleStreamReader() Debe devolver un DataSourceStreamReader para que el flujo de datos sea legible (lote).
streamWriter() Debe devolver un DataSourceStreamWriter para que el flujo de datos sea grabable (streaming).

Nota:

Los DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader y DataSourceStreamWriter definidos por el usuario y sus métodos deben poder serializarse. En otras palabras, deben ser un diccionario o un diccionario anidado que contenga un tipo primitivo.

Registrar el origen de datos

Después de implementar la interfaz, debe registrarla y, luego, puede cargarla o usarla tal como se muestra en el siguiente ejemplo:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Ejemplo 1: Creación de un DataSource de PySpark para una consulta por lotes

Para demostrar las funcionalidades de lectura del DataSource de PySpark, cree un origen de datos que genere datos de ejemplo mediante el paquete faker de Python. Para obtener más información sobre faker, consulte la documentación de Faker.

Instale el paquete faker ejecutando el siguiente comando:

%pip install faker

Paso 1: Definición del DataSource de ejemplo

En primer lugar, defina el nuevo DataSource de PySpark como una subclase de DataSource, con un nombre, un esquema y un lector. El método reader() debe definirse para leer desde un origen de datos en una consulta por lotes.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Paso 2: Implementación del lector para una consulta por lotes

A continuación, implemente la lógica del lector para generar datos de ejemplo. Use la biblioteca faker instalada para rellenar cada campo del esquema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Paso 3: Registro y uso del origen de datos de ejemplo

Para usar el origen de datos, regístrelo. De forma predeterminada, el FakeDataSource tiene tres filas, y el esquema predeterminado incluye estos campos de string: name, date, zipcode y state. En el ejemplo siguiente se registra, carga y genera el origen de datos de ejemplo con los valores predeterminados:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Solo se admiten campos string, pero puede especificar un esquema con los campos correspondientes a campos de proveedores de paquetes faker para generar datos aleatorios para pruebas y desarrollo. En el siguiente ejemplo se carga el origen de datos con los campos name y company:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Para cargar el origen de datos con un número personalizado de filas, especifique la opción numRows. En el siguiente ejemplo se especifican 5 filas:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Ejemplo 2: Creación de un DataSource de PySpark para la lectura y escritura de streaming

Para demostrar las funcionalidades de lectura y escritura de streaming del DataSource de PySpark, cree un origen de datos de ejemplo que genere dos columnas en cada microlote mediante el paquete faker de Python. Para obtener más información sobre faker, consulte la documentación de Faker.

Instale el paquete faker ejecutando el siguiente comando:

%pip install faker

Paso 1: Definición del DataSource de ejemplo

En primer lugar, defina el nuevo DataSource de PySpark como una subclase de DataSource, con un nombre, un esquema y los métodos streamReader() y streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Paso 2: Implementación del lector de streaming

A continuación, implemente el lector de datos de streaming de ejemplo que genera dos filas en cada microlote. Puede implementar DataSourceStreamReader o, si el origen de datos tiene un rendimiento bajo y no requiere la creación de particiones, puede implementar SimpleDataSourceStreamReader. Hay que implementar simpleStreamReader() o streamReader() y solo se invoca simpleStreamReader() cuando streamReader() no está implementado.

Implementación de DataSourceStreamReader

La instancia de streamReader tiene un desplazamiento de enteros que aumenta en 2 en cada microlote, implementado con la interfaz de DataSourceStreamReader.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementación de SimpleDataSourceStreamReader

La instancia de SimpleStreamReader es la misma que la instancia de FakeStreamReader que genera dos filas en cada lote, pero implementada con la interfaz de SimpleDataSourceStreamReader sin particiones.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Paso 3: Implementación del escritor de streaming

Ahora, implemente el escritor de streaming. Este escritor de datos de streaming escribe la información de metadatos de cada microlote en una ruta de acceso local.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Paso 4: Registro y uso del origen de datos de ejemplo

Para usar el origen de datos, regístrelo. Una vez que esté registrado, puede usarlo en consultas de streaming como origen o receptor pasando un nombre corto o un nombre completo a format(). En el ejemplo siguiente se registra el origen de datos y, luego, se inicia una consulta que lee del origen de datos de ejemplo y genera salidas en la consola:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Como alternativa, en el ejemplo siguiente se usa el flujo de ejemplo como receptor y se especifica una ruta de acceso de salida:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Solución de problemas

Si la salida es el siguiente error, el proceso no admite orígenes de datos personalizados de PySpark. Debe usar Databricks Runtime 15.2 o superior.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000