Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Los orígenes de datos personalizados de PySpark se crean mediante la API DataSource de Python (PySpark), que permite leer desde orígenes de datos personalizados y escribir en receptores de datos personalizados en Apache Spark mediante Python. Puede usar orígenes de datos personalizados de PySpark para definir conexiones personalizadas a sistemas de datos e implementar funcionalidad adicional para crear orígenes de datos reutilizables.
Nota:
Los orígenes de datos personalizados de PySpark requieren Databricks Runtime 15.4 LTS y versiones posteriores, o la versión 2 del entorno sin servidor.
Clase DataSource
El DataSource de PySpark es una clase base que proporciona métodos para crear lectores y escritores de datos.
Implementación de la subclase del origen de datos
Según el caso de uso, cualquier subclase debe implementar lo siguiente para que un origen de datos sea legible, grabable o ambos:
| Propiedad o método | Descripción |
|---|---|
name |
Obligatorio. El nombre del origen de datos |
schema |
Obligatorio. Esquema del origen de datos que se va a leer o escribir. |
reader() |
Debe devolver un DataSourceReader para que el origen de datos sea legible por lotes. |
writer() |
Debe devolver un DataSourceWriter, para que el sumidero de datos sea escribible (lote). |
streamReader() o simpleStreamReader() |
Debe devolver un DataSourceStreamReader para que la transmisión de datos sea accesible (transmisión). |
streamWriter() |
Debe devolver un DataSourceStreamWriter para que el flujo de datos sea escribible (transmisión en directo). |
Nota:
Los métodos definidos por el usuario DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader y DataSourceStreamWriter deben ser serializables. En otras palabras, deben ser un diccionario o un diccionario anidado que contenga un tipo primitivo.
Registrar el origen de datos
Después de implementar la interfaz, debe registrarla y, luego, puede cargarla o usarla tal como se muestra en el siguiente ejemplo:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Ejemplo 1: Creación de un DataSource de PySpark para una consulta por lotes
Para demostrar las funcionalidades de lectura del DataSource de PySpark, cree un origen de datos que genere datos de ejemplo mediante el paquete faker de Python. Para obtener más información sobre faker, consulte la documentación de Faker.
Instale el paquete faker ejecutando el siguiente comando:
%pip install faker
Paso 1: Implementar el lector para una consulta por lotes
En primer lugar, implemente la lógica del lector para generar datos de ejemplo. Use la biblioteca faker instalada para rellenar cada campo del esquema.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Paso 2: Definir el origen de datos de ejemplo
A continuación, defina el nuevo DataSource de PySpark como una subclase de DataSource con un nombre, un esquema y un lector. El método reader() debe definirse para leer desde un origen de datos en una consulta por lotes.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Paso 3: Registro y uso del origen de datos de ejemplo
Para usar el origen de datos, regístrelo. De forma predeterminada, el FakeDataSource tiene tres filas, y el esquema predeterminado incluye estos campos de string: name, date, zipcode y state. En el ejemplo siguiente se registra, carga y genera el origen de datos de ejemplo con los valores predeterminados:
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Solo se admiten campos de tipo string, pero puede especificar un esquema con cualquier campo que corresponda a los proveedores de paquetes faker para generar datos aleatorios para pruebas y desarrollo. En el siguiente ejemplo se carga el origen de datos con los campos name y company:
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Para cargar el origen de datos con un número personalizado de filas, especifique la opción numRows. En el siguiente ejemplo se especifican 5 filas:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Ejemplo 2: Creación de un origen de datos de GitHub de PySpark mediante variantes
Para demostrar el uso de variantes en una fuente de datos de PySpark, en este ejemplo se crea un origen de datos que lee los pull requests de GitHub.
Nota:
Las variantes se admiten con orígenes de datos personalizados de PySpark en Databricks Runtime 17.1 y versiones posteriores.
Para obtener información sobre las variantes, consulte Consultar datos de variantes.
Paso 1: Implementar el lector para recuperar solicitudes de incorporación de cambios
En primer lugar, implementar la lógica de lectura para recuperar solicitudes de pull del repositorio de GitHub especificado.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
Paso 2: Definición del origen de datos de GitHub
A continuación, defina el nuevo origen de datos de GitHub de PySpark como una subclase de DataSource con un nombre, un esquema y un método reader(). El esquema incluye estos campos: id, title, user, created_at, . updated_at El user campo se define como una variante.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
Paso 3: Registro y uso del origen de datos
Para usar el origen de datos, regístrelo. En el ejemplo siguiente, se registra, luego se carga el origen de datos y se generan tres filas de los datos de PR del repositorio de GitHub.
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
Ejemplo 3: Creación de un origen de datos pySpark para la lectura y escritura en streaming
Para demostrar las funcionalidades de lectura y escritura de streaming del DataSource de PySpark, cree un origen de datos de ejemplo que genere dos columnas en cada microlote mediante el paquete faker de Python. Para obtener más información sobre faker, consulte la documentación de Faker.
Instale el paquete faker ejecutando el siguiente comando:
%pip install faker
Paso 1: Implementar el lector de secuencias
En primer lugar, implemente el lector de datos de streaming de ejemplo que genera dos filas en cada microbatch. Puede implementar DataSourceStreamReader, o si el origen de datos tiene un rendimiento bajo y no requiere la creación de particiones, puede implementar SimpleDataSourceStreamReader en su lugar. Hay que implementar simpleStreamReader() o streamReader() y solo se invoca simpleStreamReader() cuando streamReader() no está implementado.
Implementación de DataSourceStreamReader
La instancia de streamReader tiene un desplazamiento de enteros que aumenta en 2 en cada microlote, implementado con la interfaz de DataSourceStreamReader.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implementación de SimpleDataSourceStreamReader
La instancia de SimpleStreamReader es la misma que la instancia de FakeStreamReader que genera dos filas en cada lote, pero implementada con la interfaz de SimpleDataSourceStreamReader sin particiones.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Paso 2: Implementar el sistema de escritura de secuencias
A continuación, implemente el escritor de streaming. Este escritor de datos de streaming escribe la información de metadatos de cada microlote en una ruta de acceso local.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Paso 3: Definir el origen de datos de ejemplo
Ahora defina el nuevo dataSource de PySpark como una subclase de DataSource con un nombre, un esquema y métodos streamReader() y streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Paso 4: Registro y uso del origen de datos de ejemplo
Para usar el origen de datos, regístrelo. Una vez registrado, puede usarlo en consultas de streaming como origen o destino pasando un nombre corto o un nombre completo a format(). En el ejemplo siguiente se registra el origen de datos y, luego, se inicia una consulta que lee del origen de datos de ejemplo y genera salidas en la consola:
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Como alternativa, el código siguiente usa la secuencia de ejemplo como receptor y especifica una ruta de acceso de salida:
spark.dataSource.register(FakeStreamDataSource)
# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"
query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)
Ejemplo 4: Creación de un conector de streaming de Google BigQuery
En el ejemplo siguiente se muestra cómo crear un conector de streaming personalizado para Google BigQuery (BQ) mediante un dataSource de PySpark. Databricks proporciona un conector de Spark para la ingesta por lotes de BigQuery, y Lakehouse Federation también puede conectarse de forma remota a cualquier conjunto de datos de BigQuery y extraer datos a través de la creación de catálogos externos, pero tampoco admite flujos de trabajo de streaming incrementales o continuos. Este conector permite la migración de datos por fases de forma incremental y la migración casi en tiempo real desde tablas de BigQuery alimentadas por fuentes de transmisión con puntos de comprobación persistentes.
Este conector personalizado tiene las siguientes características:
- Compatible con Structured Streaming y las canalizaciones declarativas de Spark de Lakeflow.
- Admite el seguimiento incremental de registros y la ingesta continua de streaming y sigue la semántica de Structured Streaming.
- Usa la API de Almacenamiento de BigQuery con un protocolo basado en RPC para una transmisión de datos más rápida y económica.
- Escribe tablas migradas directamente en Unity Catalog.
- Administra los puntos de control automáticamente mediante un campo incremental basado en fecha o marca de tiempo.
- Admite la ingesta por lotes con
Trigger.AvailableNow(). - No requiere almacenamiento intermedio en la nube.
- Serializa la transmisión de datos de BigQuery mediante el formato Flecha o Avro.
- Controla el paralelismo automático y distribuye el trabajo entre los trabajadores de Spark en función del volumen de datos.
- Adecuado para la migración de capas Raw y Bronze desde BigQuery, con compatibilidad con migraciones de capas Silver y Gold mediante patrones SCD Type 1 o Type 2.
Prerrequisitos
Antes de implementar el conector personalizado, instale los paquetes necesarios:
%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage
Paso 1: Implementar el lector de secuencias
En primer lugar, implemente el lector de datos de streaming. La DataSourceStreamReader subclase debe implementar los métodos siguientes:
initialOffset(self) -> dictlatestOffset(self) -> dictpartitions(self, start: dict, end: dict) -> Sequence[InputPartition]read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]commit(self, end: dict) -> Nonestop(self) -> None
Para obtener más información sobre cada método, consulte Métodos.
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging
start_time = time.time()
class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx
class BQStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")
self.last_offset = None
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'
return {"offset": str(self.last_offset)}
def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery
if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'
client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]
if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}
return {"offset": str(self.last_offset)}
def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
# project_id = self.auth_project_id
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"
# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO
parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]
def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []
for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
Paso 2: Definir el origen de datos
A continuación, defina el origen de datos personalizado. La DataSource subclase debe implementar los métodos siguientes:
name(cls) -> strschema(self) -> Union[StructType, str]
Para obtener más información sobre cada método, consulte Métodos.
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery
class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""
@classmethod
def name(cls):
return "bigquery-streaming"
def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)
return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"
def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)
Paso 3: Configurar e iniciar la consulta de streaming
Por último, registre el conector y configure e inicie la consulta de streaming:
spark.dataSource.register(BQStreamDataSource)
# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.
query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)
(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)
Orden de ejecución
El orden de ejecución de la función de la secuencia personalizada se describe a continuación.
Para cargar el dataframe de flujo de Spark:
name(cls)
schema()
Para microbatch (n) de un nuevo inicio de consulta o al reiniciar una consulta existente (punto de control nuevo o existente):
partitions(end_offset, end_offset) # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()
Para el siguiente microbatch (n+1) de una consulta en ejecución en un punto de control existente:
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
Nota:
La latestOffset función organiza los puntos de control. Comparta una variable de punto de control de un tipo primitivo entre funciones y la devuelva como diccionario. Por ejemplo: return {"offset": str(self.last_offset)}
Solución de problemas
Si la salida es el siguiente error, tu equipo no admite fuentes de datos personalizadas de PySpark. Debe usar Databricks Runtime 15.2 o superior.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000