Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Répliquer une table SGBDR externe à l’aide de
Cette page vous guide tout au long de la réplication d’une table à partir d’un système de gestion de base de données relationnelle externe (SGBDR) dans Azure Databricks à l’aide de l’API AUTO CDC dans les pipelines. Vous allez apprendre :
- Modèles courants pour la configuration des sources.
- Comment effectuer une copie complète ponctuelle des données existantes à l’aide d’un
onceflux. - Comment ingérer en continu de nouvelles modifications à l’aide d’un
changeflux.
Ce modèle est idéal pour créer des tables de dimension à variation lente (SCD) ou maintenir une table cible synchronisée avec un système d’enregistrement externe.
Avant de commencer
Ce guide suppose que vous avez accès aux jeux de données suivants à partir de votre source :
- Capture instantanée complète de la table source dans le stockage cloud. Ce jeu de données est utilisé pour la charge initiale.
- Flux de modification continu, rempli dans le même emplacement de stockage cloud (par exemple, à l’aide de Debezium, Kafka ou de la capture de données de changement (CDC) basée sur les journaux). Ce flux est l’entrée du processus en cours
AUTO CDC.
Configurer des vues de source
Tout d’abord, définissez deux vues sources pour remplir la rdbms_orders table cible à partir d’un chemin orders_snapshot_pathde stockage cloud. Les deux sont créés en tant que vues de streaming sur des données brutes dans le stockage en nuage. L’utilisation de vues offre une efficacité plus élevée, car les données ne doivent pas être écrites avant d’être utilisées dans le AUTO CDC processus.
- La première vue source est un instantané complet (
full_orders_snapshot) - La seconde est un flux de modification continu (
rdbms_orders_change_feed).
Les exemples de ce guide utilisent le stockage cloud comme source, mais vous pouvez utiliser n’importe quelle source prise en charge par les tables de diffusion en continu.
full_orders_snapshot()
Cette étape crée un pipeline avec une vue qui lit l’instantané complet initial des données de commandes.
Python
L’exemple Python suivant :
- Utilise
spark.readStreamavec Auto Loader (format("cloudFiles")) - Lit les fichiers JSON à partir d’un répertoire défini par
orders_snapshot_path - Définit
includeExistingFilesàtrueafin de garantir que les données historiques déjà présentes dans le chemin d'accès sont traitées - Définit
inferColumnTypessurtruepour déduire le schéma automatiquement - Retourne toutes les colonnes avec
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
L’exemple SQL suivant transmet les options sous la forme d’une carte de paires clé-valeur de chaîne.
orders_snapshot_path doit être disponible en tant que variable SQL (par exemple, définie à l’aide de paramètres de pipeline ou interpolée manuellement).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Cette étape crée une deuxième vue qui lit les données de changement incrémentielles (par exemple, à partir des journaux CDC ou des tables de changement). Il lit orders_cdc_path et part du principe que des fichiers JSON de style CDC sont déposés régulièrement dans ce chemin d’accès.
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
Dans l’exemple SQL suivant, ${orders_cdc_path} est une variable et peut être interpolée en définissant une valeur dans vos paramètres de pipeline ou en définissant explicitement une variable dans votre code.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Hydratation initiale (une fois le flux)
Maintenant que les sources sont configurées, AUTO CDC la logique fusionne les deux sources dans une table de streaming cible. Tout d’abord, utilisez un flux unique AUTO CDC avec ONCE=TRUE pour copier le contenu complet de la table SGBDR (base de données relationnelle) dans une table de streaming. Cette opération prépare la table cible avec des données historiques sans la relire dans les mises à jour ultérieures.
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Le once flux s’exécute une seule fois. Les nouveaux fichiers qui sont ajoutés à full_orders_snapshot après la création du pipeline sont ignorés.
Important
L’exécution d’une actualisation complète sur la rdbms_orders table de diffusion en continu réexécute le once processus. Si les données d’instantané initiales dans le stockage cloud ont été supprimées, cela entraîne une perte de données.
Flux de modification continu
Après la charge initiale de capture instantanée, utilisez un autre AUTO CDC flux pour ingérer en continu les modifications du flux CDC du SGBDR. Cela permet à votre rdbms_orders table de rester à jour avec les insertions, les mises à jour et les suppressions.
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Considérations
| Remplissage de l'idempotence | Un once flux s’exécute à nouveau uniquement lorsque la table cible est entièrement actualisée. |
|---|---|
| Plusieurs flux | Vous pouvez utiliser plusieurs flux de modification pour fusionner dans des corrections, des données arrivant tardivement ou d’autres flux, mais tous doivent partager un schéma et des clés. |
| Actualisation complète | Une actualisation complète sur la table de rdbms_orders streaming redémarre le flux once. Cela peut entraîner une perte de données si l’emplacement de stockage cloud initial a supprimé les données d’instantané initiales. |
| Ordre d’exécution du flux | L’ordre d’exécution du flux n’a pas d’importance. Le résultat final est le même. |
Ressources supplémentaires
- Connecteur SQL Server complètement managé dans Lakeflow Connect