Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Les sources de données personnalisées PySpark sont créées à l’aide de l’API DataSource Python (PySpark), qui permet la lecture à partir de sources de données personnalisées et l’écriture dans des récepteurs de données personnalisés dans Apache Spark à l’aide de Python. Vous pouvez utiliser des sources de données personnalisées PySpark pour définir des connexions personnalisées aux systèmes de données et implémenter des fonctionnalités supplémentaires pour générer des sources de données réutilisables.
Remarque
Les sources de données personnalisées PySpark nécessitent Databricks Runtime 15.4 LTS et versions ultérieures, ou un environnement serverless version 2.
Classe DataSource
La classe de base DataSource PySpark fournit des méthodes pour créer des lecteurs et des enregistreurs de données.
Implémenter la sous-classe de source de données
Selon votre cas d’usage, les éléments suivants doivent être implémentés par n’importe quelle sous-classe pour rendre une source de données lisible, accessible en écriture ou les deux :
| Propriété ou Méthode | Descriptif |
|---|---|
name |
Obligatoire. Nom de la source de données |
schema |
Obligatoire. Le schéma de la source de données à lire ou écrire |
reader() |
Doit retourner une valeur DataSourceReader pour rendre la source de données lisible (lot) |
writer() |
Doit retourner une valeur DataSourceWriter pour rendre le récepteur de données accessible en écriture (lot) |
streamReader() ou simpleStreamReader() |
Doit retourner une valeur DataSourceStreamReader pour rendre le flux de données lisible (diffusion en continu) |
streamWriter() |
Doit retourner une valeur DataSourceStreamWriter pour rendre le flux de données accessible en écriture (diffusion en continu) |
Remarque
Les DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter et leurs méthodes définis par l’utilisateur doivent être sérialisables. En d’autres termes, ils doivent être un dictionnaire ou un dictionnaire imbriqué qui contient un type primitif.
Inscrire la source de données
Après avoir implémenté l’interface, vous devez l’inscrire, puis vous pouvez la charger ou l’utiliser autrement, comme illustré dans l’exemple suivant :
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Exemple 1 : Créer une source de données PySpark pour la requête par lots
Pour illustrer les fonctionnalités de lecteur de source de données PySpark, créez une source de données qui génère des exemples de données à l’aide du package Python faker. Pour plus d’informations sur faker, consultez la documentation Faker.
Installez le package faker à l’aide de la commande suivante :
%pip install faker
Étape 1 : Implémenter le lecteur pour une requête par lots
Tout d’abord, implémentez la logique de lecteur pour générer des exemples de données. Utilisez la bibliothèque faker installée pour remplir chaque champ du schéma.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Étape 2 : Définir l’exemple de Source de données
Ensuite, définissez votre nouvelle source de données PySpark comme une sous-classe de DataSource, avec un nom, un schéma et un lecteur. La méthode reader() doit être définie pour lire à partir d’une source de données dans une requête par lots.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Étape 3 : inscrire et utiliser l’exemple de source de données
Pour utiliser la source de données, inscrivez-la. Par défaut, la FakeDataSource comporte trois lignes et le schéma par défaut inclut les champs string suivants : name, , date, zipcode, state. L’exemple suivant inscrit, charge et génère un exemple de source de données avec les valeurs par défaut :
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Seuls string les champs sont pris en charge, mais vous pouvez spécifier un schéma avec tous les champs correspondant aux faker champs des fournisseurs de package pour générer des données aléatoires pour les tests et le développement. L’exemple suivant charge la source de données avec les champs name et company :
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Pour charger la source de données avec un nombre personnalisé de lignes, spécifiez l’option numRows. L'exemple suivant spécifie 5 lignes :
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Exemple 2 : Créer une source de données GitHub PySpark à l’aide de variantes
Pour illustrer l’utilisation de variantes dans une source de données PySpark, cet exemple crée une source de données qui lit les pull requests à partir de GitHub.
Remarque
Les variantes sont prises en charge avec des sources de données personnalisées PySpark dans Databricks Runtime 17.1 et versions ultérieures.
Pour plus d’informations sur les variantes, consultez Les données de variante de requête.
Étape 1 : Implémenter le module de lecture pour récupérer les requêtes de tirage
Tout d’abord, implémentez la logique de lecture pour récupérer les pull requests à partir du référentiel GitHub spécifié.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
Étape 2 : Définir gitHub DataSource
Ensuite, définissez votre nouvelle source de données GitHub PySpark en tant que sous-classe portant DataSource un nom, un schéma et une méthode reader(). Le schéma inclut les champs suivants : id, , titleuser, created_at, updated_at. Le user champ est défini en tant que variante.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
Étape 3 : Inscrire et utiliser la source de données
Pour utiliser la source de données, inscrivez-la. L'exemple suivant enregistre, charge la source de données, puis affiche trois lignes des données des pull requests du dépôt GitHub.
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
Exemple 3 : Créer une source de données PySpark pour la lecture et l’écriture en streaming
Pour illustrer les fonctionnalités de lecteur et d’enregistreur de flux PySpark DataSource, créez un exemple de source de données qui génère deux lignes dans chaque microbatch à l’aide du package Python faker. Pour plus d’informations sur faker, consultez la documentation Faker.
Installez le package faker à l’aide de la commande suivante :
%pip install faker
Étape 1 : Implémenter le lecteur de flux
Tout d’abord, implémentez l’exemple de lecteur de données de streaming qui génère deux lignes dans chaque microbatch. Vous pouvez implémenter DataSourceStreamReader, ou si la source de données a un débit faible et ne nécessite pas de partitionnement, vous pouvez l’implémenter SimpleDataSourceStreamReader à la place.
simpleStreamReader() ou streamReader() doit être implémentée et simpleStreamReader() est appelée uniquement lorsque streamReader() n’est pas implémentée.
Implémentation de DataSourceStreamReader
L’instance streamReader a un décalage entier qui augmente de 2 dans chaque microbatch, implémenté avec l’interface DataSourceStreamReader.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Implémentation SimpleDataSourceStreamReader
L’instance SimpleStreamReader est identique à l’instance FakeStreamReader qui génère deux lignes dans chaque lot, mais implémentée avec l’interface SimpleDataSourceStreamReader sans partitionnement.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Étape 2 : Implémenter l’enregistreur de flux
Implémentez ensuite l’enregistreur de streaming. Cet enregistreur de données de diffusion en continu écrit les informations de métadonnées de chaque microbatch dans un chemin local.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Étape 3 : Définir l’exemple de Source de données
Définissez maintenant votre nouvelle source de données PySpark en tant que sous-classe avec DataSource un nom, un schéma et des méthodes streamReader() et streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Étape 4 : Inscrire et utiliser l’exemple de source de données
Pour utiliser la source de données, inscrivez-la. Une fois inscrit, vous pouvez l’utiliser dans les requêtes de diffusion en continu en tant que source ou récepteur en passant un nom court ou un nom complet à format(). L’exemple suivant inscrit la source de données, puis démarre une requête qui lit à partir de la source de données d'exemple et envoie le résultat à la console.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Sinon, le code suivant utilise l’exemple de flux en tant que récepteur et spécifie un chemin de sortie :
spark.dataSource.register(FakeStreamDataSource)
# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"
query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)
Exemple 4 : Créer un connecteur de streaming Google BigQuery
L’exemple suivant montre comment créer un connecteur de streaming personnalisé pour Google BigQuery (BQ) à l’aide d’une source de données PySpark. Databricks fournit un connecteur Spark pour l’ingestion par lots BigQuery, et Lakehouse Federation peut également se connecter à distance à n’importe quel jeu de données BigQuery et extraire des données via la création de catalogues externes, mais ne prend en charge ni entièrement les flux de travail de streaming incrémentiels ni continus. Ce connecteur permet une migration incrémentielle en phase et une migration en temps quasi réel à partir de tables BigQuery nourries par des sources de diffusion en continu avec des points de contrôle persistants.
Ce connecteur personnalisé présente les fonctionnalités suivantes :
- Compatible avec Streaming structuré et les pipelines déclaratifs Lakeflow Spark.
- Prend en charge le suivi des enregistrements incrémentiels et l’ingestion continue de streaming, et suit la sémantique Structured Streaming.
- Utilise l’API Stockage BigQuery avec un protocole RPC pour une transmission de données plus rapide et moins coûteuse.
- Écrit des tables migrées directement dans le catalogue Unity.
- Gère automatiquement les points de contrôle à l’aide d’un champ incrémentiel basé sur une date ou un horodatage.
- Prend en charge l’ingestion par lots avec
Trigger.AvailableNow(). - Nécessite aucun stockage cloud intermédiaire.
- Sérialise la transmission de données BigQuery au format Arrow ou Avro.
- Gère le parallélisme automatique et distribue le travail entre les workers Spark en fonction du volume de données.
- Adapté à la migration des couches Brute et Bronze à partir de BigQuery, avec prise en charge des migrations des couches Argent et Or à l’aide de modèles SCD Type 1 ou Type 2.
Prerequisites
Avant d’implémenter le connecteur personnalisé, installez les packages requis :
%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage
Étape 1 : Implémenter le lecteur de flux
Tout d’abord, implémentez le lecteur de données de streaming. La DataSourceStreamReader sous-classe doit implémenter les méthodes suivantes :
initialOffset(self) -> dictlatestOffset(self) -> dictpartitions(self, start: dict, end: dict) -> Sequence[InputPartition]read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]commit(self, end: dict) -> Nonestop(self) -> None
Pour plus d’informations sur chaque méthode, consultez Méthodes.
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging
start_time = time.time()
class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx
class BQStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")
self.last_offset = None
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'
return {"offset": str(self.last_offset)}
def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery
if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'
client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]
if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}
return {"offset": str(self.last_offset)}
def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
# project_id = self.auth_project_id
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"
# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO
parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]
def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []
for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
Étape 2 : Définir la source de données
Ensuite, définissez la source de données personnalisée. La DataSource sous-classe doit implémenter les méthodes suivantes :
name(cls) -> strschema(self) -> Union[StructType, str]
Pour plus d’informations sur chaque méthode, consultez Méthodes.
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery
class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""
@classmethod
def name(cls):
return "bigquery-streaming"
def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)
return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"
def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)
Étape 3 : Configurer et démarrer la requête de diffusion en continu
Enfin, inscrivez le connecteur, puis configurez et démarrez la requête de diffusion en continu :
spark.dataSource.register(BQStreamDataSource)
# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.
query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)
(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)
Ordre d’exécution
L’ordre d’exécution de la fonction du flux personnalisé est décrit ci-dessous.
Pour charger le DataFrame de flux Spark :
name(cls)
schema()
Pour le microbatch (n) d’un nouveau démarrage de requête ou lors du redémarrage d’une requête existante (nouveau ou point de contrôle existant) :
partitions(end_offset, end_offset) # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()
Pour le microbatch suivant (n+1) d’une requête en cours d’exécution sur un point de contrôle existant :
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
Remarque
La latestOffset fonction gère le point de contrôle. Partagez une variable de point de contrôle d’un type primitif entre les fonctions et retournez-la en tant que dictionnaire. Par exemple : return {"offset": str(self.last_offset)}
Dépannage
Si la sortie génère l’erreur suivante, votre calcul ne prend pas en charge les sources de données PySpark personnalisées. Vous devez utiliser Databricks Runtime 15.2 ou version ultérieure.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000