Fontes de dados personalizadas do PySpark
Importante
As fontes de dados personalizadas do PySpark estão em Visualização Pública no Databricks Runtime 15.2 e superior. O suporte ao streaming está disponível a partir da versão 15.3 do Databricks Runtime.
Um PySpark DataSource é criado pela API Python (PySpark) DataSource, que permite a leitura de fontes de dados personalizadas e gravação em coletores de dados personalizados no Apache Spark usando Python. Você pode usar fontes de dados personalizadas do PySpark para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais para criar fontes de dados reutilizáveis.
Classe DataSource
O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados.
Implementar a subclasse da fonte de dados
Dependendo do caso de uso, o seguinte deve ser implementado por qualquer subclasse para tornar uma fonte de dados legível, gravável ou ambas:
Propriedade ou método | Descrição |
---|---|
name |
Obrigatória. O nome da fonte de dados |
schema |
Obrigatória. O esquema da fonte de dados a ser lido ou gravado |
reader() |
Deve retornar um DataSourceReader para tornar a fonte de dados legível (lote) |
writer() |
Deve retornar um DataSourceWriter para tornar o coletor de dados gravável (lote) |
streamReader() ou simpleStreamReader() |
Deve retornar um DataSourceStreamReader para tornar o fluxo de dados legível (streaming) |
streamWriter() |
Deve retornar um DataSourceStreamWriter para tornar o fluxo de dados gravável (streaming) |
Observação
Os métodos definidos pelo usuário DataSource
, DataSourceReader
, DataSourceWriter
, DataSourceStreamReader
, DataSourceStreamWriter
e seus métodos devem poder ser serializados. Em outras palavras, eles devem ser um dicionário ou dicionário aninhado que contenha um tipo primitivo.
Registrar a fonte de dados
Após implementar a interface, você deverá registrá-la e, em seguida, pode carregá-la ou usá-la de outra forma, conforme mostrado no exemplo a seguir:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Exemplo 1: criar uma Fonte de dados do PySpark para consulta em lote
Para demonstrar os recursos de leitor da Fonte de dados do PySpark, crie uma fonte de dados que gere dados de exemplo usando o pacote do Python faker
. Para obter mais informações sobre faker
, consulte a documentação de Faker.
Instale o pacote faker
usando o seguinte comando:
%pip install faker
Etapa 1: definir o exemplo de Fonte de dados
Primeiro, defina sua nova Fonte de dados do PySpark como uma subclasse de DataSource
, com um nome, esquema e leitor. O método reader()
deve ser definido para ler de uma fonte de dados em uma consulta em lote.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Etapa 2: implementar o leitor para uma consulta em lote
Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca faker
instalada para preencher cada campo no esquema.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Etapa 3: registrar e usar a fonte de dados de exemplo
Para usar a fonte de dados, registre-a. Por padrão, o FakeDataSource
tem três linhas, e o esquema inclui estes campos de string
: name
, date
, zipcode
, state
. O exemplo a seguir registra, carrega e gera a fonte de dados de exemplo com os padrões:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Há suporte apenas para campos string
, mas você pode especificar um esquema com todos os campos que correspondem aos campos faker
de provedores de pacotes para gerar dados aleatórios de teste e desenvolvimento. O exemplo a seguir carrega a fonte de dados com campos name
e company
:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Para carregar a fonte de dados com um número personalizado de linhas, especifique a opção numRows
. O exemplo a seguir especifica cinco linhas:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Exemplo 2: criar a Fonte de dados do PySpark para streaming de leitura e gravação
Para demonstrar as funcionalidades de leitor de fluxo e gravador da Fonte de dados do PySpark, crie uma fonte de dados de exemplo que gera duas linhas em cada microlote usando o pacote do Python faker
. Para obter mais informações sobre faker
, consulte a documentação de Faker.
Instale o pacote faker
usando o seguinte comando:
%pip install faker
Etapa 1: definir o exemplo de Fonte de dados
Em seguida, defina seu novo PySpark DataSource como uma subclasse de DataSource
, com um nome, esquema e métodos streamReader()
e streamWriter()
.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Etapa 2: implementar o leitor de fluxo
Em seguida, implemente o leitor de dados de streaming de exemplo que gera duas linhas em cada microlote. Implemente DataSourceStreamReader
ou se a fonte de dados tiver baixa taxa de transferência e não exigir particionamento, você poderá implementar SimpleDataSourceStreamReader
. Tanto simpleStreamReader()
como streamReader()
deve ser implementado e simpleStreamReader()
só é invocado quando streamReader()
não é implementado.
Implementação do DataSourceStreamReader
A instância streamReader
tem um deslocamento inteiro que aumenta duas vezes a cada microlote, implementado com a interface DataSourceStreamReader
.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementação de SimpleDataSourceStreamReader
A instância SimpleStreamReader
é a mesma que a instância FakeStreamReader
que gera duas linhas em cada lote, mas implementada com a interface SimpleDataSourceStreamReader
sem particionamento.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Etapa 3: implementar o gravador de streaming
Agora implemente o gravador de streaming. Esse gravador de dados de streaming grava as informações de metadados de cada microlote em um caminho local.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Etapa 4: registrar e usar a fonte de dados de exemplo
Para usar a fonte de dados, registre-a. Depois que ele for registrado, você poderá usá-lo em consultas de streaming como uma origem ou coletor passando um nome curto ou nome completo para format()
. O exemplo a seguir registra a fonte de dados e inicia uma consulta que faz a leitura da fonte de dados de exemplo e as saídas para o console:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Como alternativa, o exemplo a seguir usa o fluxo de exemplo como um coletor e especifica um caminho de saída:
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
Solução de problemas
Se a saída for o seguinte erro, sua computação não oferecerá suporte a fontes de dados personalizadas do PySpark. Você precisa usar o Databricks Runtime 15.2 ou superior.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000