Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
La create_auto_cdc_flow() fonction crée un flux qui utilise la fonctionnalité de capture de données modifiées (CDC) des pipelines déclaratives Lakeflow Spark pour traiter les données sources à partir d’un flux de changement de données (CDF).
Note
Cette fonction remplace la fonction apply_changes()précédente. Les deux fonctions ont la même signature. Databricks recommande la mise à jour pour utiliser le nouveau nom.
Important
Vous devez déclarer une table de diffusion en continu cible pour appliquer des modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la create_auto_cdc_flow() table cible, vous devez inclure les colonnes __START_AT avec le même type de données que les champs __END_AT.
Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table() dans l’interface Python du pipeline.
Syntaxe
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Pour create_auto_cdc_flow le traitement, le comportement par défaut pour INSERT et UPDATE les événements consiste à mettre à jour les événements CDC à partir de la source : mettez à jour toutes les lignes de la table cible qui correspondent aux clés spécifiées ou insère une nouvelle ligne lorsqu’un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements pour DELETE peut être spécifiée avec le paramètre apply_as_deletes.
Pour en savoir plus sur le traitement CDC avec un flux de changements, consultez les API AUTO CDC : Simplifier la capture de données modifiées avec des pipelines. Pour obtenir un exemple d’utilisation de la create_auto_cdc_flow() fonction, consultez Exemple : type SCD 1 et traitement SCD type 2 avec des données sources CDF.
Paramètres
| Paramètre | Type | Descriptif |
|---|---|---|
target |
str |
Obligatoire. Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d’exécuter la create_auto_cdc_flow() fonction. |
source |
str |
Obligatoire. Source de données contenant des enregistrements CDC. |
keys |
list |
Obligatoire. Colonne ou combinaison de colonnes qui identifient de manière unique une ligne dans les données sources. Cela permet d’identifier les événements cdc qui s’appliquent à des enregistrements spécifiques dans la table cible. Vous pouvez spécifier l’une ou l’autre des options suivantes :
|
sequence_by |
str, col() ou struct() |
Obligatoire. Noms de colonnes spécifiant l’ordre logique des événements CDC dans les données sources. Lakeflow Spark Declarative Pipelines utilise ce séquencement pour gérer les événements de modification qui arrivent hors ordre. La colonne spécifiée doit être un type de données triable. Vous pouvez spécifier l’une ou l’autre des options suivantes :
|
ignore_null_updates |
bool |
Autoriser l’ingestion des mises à jour contenant un sous-ensemble des colonnes cibles. Lorsqu'un événement CDC correspond à une ligne existante et que ignore_null_updates est True, les colonnes avec une null conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec la valeur .null Quand ignore_null_updates est False, les valeurs existantes sont remplacées par les valeurs null.La valeur par défaut est False. |
apply_as_deletes |
str ou expr() |
Spécifie quand un événement CDC doit être traité comme une DELETE insertion/mise à jour plutôt qu'une. Vous pouvez spécifier l’une ou l’autre des options suivantes :
Pour gérer les données hors ordre, la ligne supprimée est temporairement conservée en tant que "tombstone" dans la table Delta sous-jacente, et une vue est créée dans le metastore qui filtre ces tombstones. L’intervalle de rétention est défini par défaut sur deux jours et peut être configuré avec la pipelines.cdc.tombstoneGCThresholdInSeconds propriété de table. |
apply_as_truncates |
str ou expr() |
Spécifie quand un événement CDC doit être traité comme une table TRUNCATEcomplète. Vous pouvez spécifier l’une ou l’autre des options suivantes :
Étant donné que cette clause supprime toutes les données de la table cible, elle doit être utilisée uniquement pour des cas d'utilisation spécifiques nécessitant cette fonctionnalité. Le apply_as_truncates paramètre est pris en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge les opérations de troncation. |
column_list ou except_column_list |
list |
Sous-ensemble de colonnes à inclure dans la table cible. Permet column_list de spécifier la liste complète des colonnes à inclure. Permet except_column_list de spécifier les colonnes à exclure. Vous pouvez déclarer une valeur en tant que liste de chaînes ou en tant que fonctions Spark SQL col() :
Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId). Par défaut, toutes les colonnes de la table cible sont incluses lorsqu'aucun argument column_list ou except_column_list n'est passé à la fonction. |
stored_as_scd_type |
str ou int |
Indique s’il faut stocker des enregistrements en tant que type SCD 1 ou SCD type 2. Défini sur pour 1 le type SCD 1 ou 2 pour le type SCD 2. La valeur par défaut est la méthode SCD de type 1. |
track_history_column_list ou track_history_except_column_list |
list |
Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Permet track_history_column_list de spécifier la liste complète des colonnes à suivre. Permet track_history_except_column_list de spécifier les colonnes à exclure du suivi. Vous pouvez déclarer une valeur en tant que liste de chaînes ou en tant que fonctions Spark SQL col() :
Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId). Par défaut, toutes les colonnes de la table cible sont incluses lorsqu'aucun argument track_history_column_list ou track_history_except_column_list n'est passé à la fonction. |
name |
str |
Nom du flux. S’il n’est pas fourni, la valeur par défaut est la même que target. |
once |
bool |
Si vous le souhaitez, définissez le flux en tant que flux à usage unique, tel qu’un remblai. L'utilisation de once=True change le flux de deux manières :
|