Partilhar via


Fontes de dados personalizadas do PySpark

Importante

As fontes de dados personalizadas do PySpark estão em Visualização Pública no Databricks Runtime 15.2 e superior. O suporte de streaming está disponível no Databricks Runtime 15.3 e superior.

Um PySpark DataSource é criado pela API DataSource Python (PySpark), que permite ler fontes de dados personalizadas e gravar em coletores de dados personalizados no Apache Spark usando Python. Você pode usar fontes de dados personalizadas do PySpark para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais, para criar fontes de dados reutilizáveis.

DataSource classe

O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.

Implementar a subclasse da fonte de dados

Dependendo do seu caso de uso, o seguinte deve ser implementado por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambas:

Propriedade ou Método Description
name Obrigatório. O nome da fonte de dados
schema Obrigatório. O esquema da fonte de dados a ser lida ou gravada
reader() Deve retornar a DataSourceReader para tornar a fonte de dados legível (lote)
writer() Deve retornar a DataSourceWriter para tornar o coletor de dados gravável (lote)
streamReader() ou simpleStreamReader() Deve retornar a DataSourceStreamReader para tornar o fluxo de dados legível (streaming)
streamWriter() Deve retornar a DataSourceStreamWriter para tornar o fluxo de dados gravável (streaming)

Nota

O , , DataSourceWriterDataSourceReader, DataSourceStreamReader, , DataSourceStreamWritere seus métodos definidos pelo usuário DataSourcedevem poder ser serializados. Em outras palavras, eles devem ser um dicionário ou dicionário aninhado que contém um tipo primitivo.

Registar a fonte de dados

Depois de implementar a interface, você deve registrá-lo, então você pode carregá-lo ou usá-lo de outra forma, como mostrado no exemplo a seguir:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exemplo 1: Criar uma fonte de dados PySpark para consulta em lote

Para demonstrar os recursos do leitor PySpark DataSource, crie uma fonte de dados que gere dados de exemplo usando o faker pacote Python. Para obter mais informações sobre fakero , consulte a documentação do Faker.

Instale o faker pacote usando o seguinte comando:

%pip install faker

Etapa 1: Definir o exemplo DataSource

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, esquema e leitor. O reader() método deve ser definido para ler a partir de uma fonte de dados em uma consulta em lotes.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Etapa 2: Implementar o leitor para uma consulta em lote

Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca instalada faker para preencher cada campo no esquema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Etapa 3: Registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Por padrão, o FakeDataSource tem três linhas e o esquema inclui estes string campos: name, date, zipcode, state. O exemplo a seguir registra, carrega e gera saídas da fonte de dados de exemplo com os padrões:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Somente string campos são suportados, mas você pode especificar um esquema com quaisquer campos que correspondam aos faker campos dos provedores de pacotes para gerar dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com name e company campos:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Para carregar a fonte de dados com um número personalizado de linhas, especifique a numRows opção. O exemplo a seguir especifica 5 linhas:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Exemplo 2: Criar PySpark DataSource para streaming de leitura e gravação

Para demonstrar os recursos de leitor e gravador de fluxo do PySpark DataSource, crie uma fonte de dados de exemplo que gere duas linhas em cada microlote usando o faker pacote Python. Para obter mais informações sobre fakero , consulte a documentação do Faker.

Instale o faker pacote usando o seguinte comando:

%pip install faker

Etapa 1: Definir o exemplo DataSource

Primeiro, defina seu novo PySpark DataSource como uma subclasse de DataSource com um nome, esquema e métodos streamReader() e streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Etapa 2: Implementar o leitor de fluxo

Em seguida, implemente o exemplo de leitor de dados de streaming que gera duas linhas em cada microlote. Você pode implementar DataSourceStreamReadero , ou se a fonte de dados tiver baixa taxa de transferência e não exigir particionamento, você poderá implementá-la SimpleDataSourceStreamReader . Ou simpleStreamReader()streamReader() deve ser implementado, e simpleStreamReader() só é invocado quando streamReader() não é implementado.

Implementação de DataSourceStreamReader

A streamReader instância tem um deslocamento inteiro que aumenta em 2 em cada microlote, implementado com a DataSourceStreamReader interface.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementação do SimpleDataSourceStreamReader

A SimpleStreamReader instância é a mesma que gera FakeStreamReader duas linhas em cada lote, mas implementada com a SimpleDataSourceStreamReader interface sem particionamento.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Etapa 3: Implementar o gravador de fluxo

Agora implemente o gravador de streaming. Este gravador de dados de streaming grava as informações de metadados de cada microlote em um caminho local.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Etapa 4: Registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Depois que ele for regsitered, você poderá usá-lo em consultas de streaming como fonte ou coletor passando um nome curto ou completo para format(). O exemplo a seguir registra a fonte de dados e, em seguida, inicia uma consulta que lê a partir da fonte de dados de exemplo e envia para o console:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Como alternativa, o exemplo a seguir usa o fluxo de exemplo como um coletor e especifica um caminho de saída:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Resolução de Problemas

Se a saída for o seguinte erro, sua computação não suporta fontes de dados personalizadas do PySpark. Você deve usar o Databricks Runtime 15.2 ou superior.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000