Compartilhar via


Fontes de dados personalizadas do PySpark

Importante

As fontes de dados personalizadas do PySpark estão em Visualização Pública no Databricks Runtime 15.2 e superior. O suporte ao streaming está disponível a partir da versão 15.3 do Databricks Runtime.

Um PySpark DataSource é criado pela API Python (PySpark) DataSource, que permite a leitura de fontes de dados personalizadas e gravação em coletores de dados personalizados no Apache Spark usando Python. Você pode usar fontes de dados personalizadas do PySpark para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais para criar fontes de dados reutilizáveis.

Classe DataSource

O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.

Implementar a subclasse da fonte de dados

Dependendo do caso de uso, o seguinte deve ser implementado por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambas:

Propriedade ou método Descrição
name Obrigatória. O nome da fonte de dados
schema Obrigatória. O esquema da fonte de dados a ser lido ou gravado
reader() Deve retornar um DataSourceReader para tornar a fonte de dados legível (lote)
writer() Deve retornar um DataSourceWriter para tornar o coletor de dados gravável (lote)
streamReader() ou simpleStreamReader() Deve retornar um DataSourceStreamReader para tornar o fluxo de dados legível (streaming)
streamWriter() Deve retornar um DataSourceStreamWriter para tornar o fluxo de dados gravável (streaming)

Observação

Os métodos definidos pelo usuário DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter e seus métodos devem poder ser serializados. Em outras palavras, eles devem ser um dicionário ou dicionário aninhado que contenha um tipo primitivo.

Registrar a fonte de dados

Após implementar a interface, você deverá registrá-la e, em seguida, pode carregá-la ou usá-la de outra forma, conforme mostrado no exemplo a seguir:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Exemplo 1: criar uma Fonte de dados do PySpark para consulta em lote

Para demonstrar os recursos de leitor da Fonte de dados do PySpark, crie uma fonte de dados que gere dados de exemplo usando o pacote do Python faker. Para obter mais informações sobre faker, consulte a documentação de Faker.

Instale o pacote faker usando o seguinte comando:

%pip install faker

Etapa 1: definir o exemplo de Fonte de dados

Primeiro, defina sua nova Fonte de dados do PySpark como uma subclasse de DataSource, com um nome, esquema e leitor. O método reader() deve ser definido para ler de uma fonte de dados em uma consulta em lote.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Etapa 2: implementar o leitor para uma consulta em lote

Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca faker instalada para preencher cada campo no esquema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Etapa 3: registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Por padrão, o FakeDataSource tem três linhas, e o esquema inclui estes campos de string: name, date, zipcode, state. O exemplo a seguir registra, carrega e gera a fonte de dados de exemplo com os padrões:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Há suporte apenas para campos string, mas você pode especificar um esquema com todos os campos que correspondem aos campos faker de provedores de pacotes para gerar dados aleatórios de teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com campos name e company:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Para carregar a fonte de dados com um número personalizado de linhas, especifique a opção numRows. O exemplo a seguir especifica cinco linhas:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Exemplo 2: criar a Fonte de dados do PySpark para streaming de leitura e gravação

Para demonstrar as funcionalidades de leitor de fluxo e gravador da Fonte de dados do PySpark, crie uma fonte de dados de exemplo que gera duas linhas em cada microlote usando o pacote do Python faker. Para obter mais informações sobre faker, consulte a documentação de Faker.

Instale o pacote faker usando o seguinte comando:

%pip install faker

Etapa 1: definir o exemplo de Fonte de dados

Em seguida, defina seu novo PySpark DataSource como uma subclasse de DataSource, com um nome, esquema e métodos streamReader() e streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Etapa 2: implementar o leitor de fluxo

Em seguida, implemente o leitor de dados de streaming de exemplo que gera duas linhas em cada microlote. Implemente DataSourceStreamReader ou se a fonte de dados tiver baixa taxa de transferência e não exigir particionamento, você poderá implementar SimpleDataSourceStreamReader. Tanto simpleStreamReader() como streamReader() deve ser implementado e simpleStreamReader() só é invocado quando streamReader() não é implementado.

Implementação do DataSourceStreamReader

A instância streamReader tem um deslocamento inteiro que aumenta duas vezes a cada microlote, implementado com a interface DataSourceStreamReader.

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementação de SimpleDataSourceStreamReader

A instância SimpleStreamReader é a mesma que a instância FakeStreamReader que gera duas linhas em cada lote, mas implementada com a interface SimpleDataSourceStreamReader sem particionamento.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Etapa 3: implementar o gravador de streaming

Agora implemente o gravador de streaming. Esse gravador de dados de streaming grava as informações de metadados de cada microlote em um caminho local.

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Etapa 4: registrar e usar a fonte de dados de exemplo

Para usar a fonte de dados, registre-a. Depois que ele for registrado, você poderá usá-lo em consultas de streaming como uma origem ou coletor passando um nome curto ou nome completo para format(). O exemplo a seguir registra a fonte de dados e inicia uma consulta que faz a leitura da fonte de dados de exemplo e as saídas para o console:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

Como alternativa, o exemplo a seguir usa o fluxo de exemplo como um coletor e especifica um caminho de saída:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

Solução de problemas

Se a saída for o seguinte erro, sua computação não oferecerá suporte a fontes de dados personalizadas do PySpark. Você precisa usar o Databricks Runtime 15.2 ou superior.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000