Partager via


create_auto_cdc_flow

La create_auto_cdc_flow() fonction crée un flux qui utilise la fonctionnalité de capture de données modifiées (CDC) des pipelines déclaratives Lakeflow Spark pour traiter les données sources à partir d’un flux de changement de données (CDF).

Note

Cette fonction remplace la fonction apply_changes()précédente. Les deux fonctions ont la même signature. Databricks recommande la mise à jour pour utiliser le nouveau nom.

Important

Vous devez déclarer une table de diffusion en continu cible pour appliquer des modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la create_auto_cdc_flow() table cible, vous devez inclure les colonnes __START_AT avec le même type de données que les champs __END_AT.

Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table() dans l’interface Python du pipeline.

Syntaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

Pour create_auto_cdc_flow le traitement, le comportement par défaut pour INSERT et UPDATE les événements consiste à mettre à jour les événements CDC à partir de la source : mettez à jour toutes les lignes de la table cible qui correspondent aux clés spécifiées ou insère une nouvelle ligne lorsqu’un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements pour DELETE peut être spécifiée avec le paramètre apply_as_deletes.

Pour en savoir plus sur le traitement CDC avec un flux de changements, consultez les API AUTO CDC : Simplifier la capture de données modifiées avec des pipelines. Pour obtenir un exemple d’utilisation de la create_auto_cdc_flow() fonction, consultez Exemple : type SCD 1 et traitement SCD type 2 avec des données sources CDF.

Paramètres

Paramètre Type Descriptif
target str Obligatoire. Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d’exécuter la create_auto_cdc_flow() fonction.
source str Obligatoire. Source de données contenant des enregistrements CDC.
keys list Obligatoire. Colonne ou combinaison de colonnes qui identifient de manière unique une ligne dans les données sources. Cela permet d’identifier les événements cdc qui s’appliquent à des enregistrements spécifiques dans la table cible. Vous pouvez spécifier l’une ou l’autre des options suivantes :
  • Liste de chaînes : ["userId", "orderId"]
  • Liste des fonctions Spark SQL col() : [col("userId"), col("orderId")]. Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId).
sequence_by str, col() ou struct() Obligatoire. Noms de colonnes spécifiant l’ordre logique des événements CDC dans les données sources. Lakeflow Spark Declarative Pipelines utilise ce séquencement pour gérer les événements de modification qui arrivent hors ordre. La colonne spécifiée doit être un type de données triable. Vous pouvez spécifier l’une ou l’autre des options suivantes :
  • Chaîne : "sequenceNum"
  • Fonction Spark SQL col() : col("sequenceNum"). Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId).
  • Combinaison struct() de plusieurs colonnes pour résoudre des égalités : struct("timestamp_col", "id_col"), il trie d’abord par le premier champ de structure, puis par le deuxième champ en cas d’égalité, et ainsi de suite.
ignore_null_updates bool Autoriser l’ingestion des mises à jour contenant un sous-ensemble des colonnes cibles. Lorsqu'un événement CDC correspond à une ligne existante et que ignore_null_updates est True, les colonnes avec une null conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec la valeur .null Quand ignore_null_updates est False, les valeurs existantes sont remplacées par les valeurs null.
La valeur par défaut est False.
apply_as_deletes str ou expr() Spécifie quand un événement CDC doit être traité comme une DELETE insertion/mise à jour plutôt qu'une. Vous pouvez spécifier l’une ou l’autre des options suivantes :
  • Chaîne : "Operation = 'DELETE'"
  • Une fonction Spark SQL expr() : expr("Operation = 'DELETE'")

Pour gérer les données hors ordre, la ligne supprimée est temporairement conservée en tant que "tombstone" dans la table Delta sous-jacente, et une vue est créée dans le metastore qui filtre ces tombstones. L’intervalle de rétention est défini par défaut sur deux jours et peut être configuré avec la pipelines.cdc.tombstoneGCThresholdInSeconds propriété de table.
apply_as_truncates str ou expr() Spécifie quand un événement CDC doit être traité comme une table TRUNCATEcomplète. Vous pouvez spécifier l’une ou l’autre des options suivantes :
  • Chaîne : "Operation = 'TRUNCATE'"
  • Une fonction Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Étant donné que cette clause supprime toutes les données de la table cible, elle doit être utilisée uniquement pour des cas d'utilisation spécifiques nécessitant cette fonctionnalité. Le apply_as_truncates paramètre est pris en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge les opérations de troncation.
column_list ou except_column_list list Sous-ensemble de colonnes à inclure dans la table cible. Permet column_list de spécifier la liste complète des colonnes à inclure. Permet except_column_list de spécifier les colonnes à exclure. Vous pouvez déclarer une valeur en tant que liste de chaînes ou en tant que fonctions Spark SQL col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId). Par défaut, toutes les colonnes de la table cible sont incluses lorsqu'aucun argument column_list ou except_column_list n'est passé à la fonction.
stored_as_scd_type str ou int Indique s’il faut stocker des enregistrements en tant que type SCD 1 ou SCD type 2. Défini sur pour 1 le type SCD 1 ou 2 pour le type SCD 2. La valeur par défaut est la méthode SCD de type 1.
track_history_column_list ou track_history_except_column_list list Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Permet track_history_column_list de spécifier la liste complète des colonnes à suivre. Permet track_history_except_column_list de spécifier les colonnes à exclure du suivi. Vous pouvez déclarer une valeur en tant que liste de chaînes ou en tant que fonctions Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId). Par défaut, toutes les colonnes de la table cible sont incluses lorsqu'aucun argument track_history_column_list ou track_history_except_column_list n'est passé à la fonction.
name str Nom du flux. S’il n’est pas fourni, la valeur par défaut est la même que target.
once bool Si vous le souhaitez, définissez le flux en tant que flux à usage unique, tel qu’un remblai. L'utilisation de once=True change le flux de deux manières :
  • La valeur de retour. streaming-query. doit être un DataFrame de traitement par lots dans ce cas, plutôt qu'un DataFrame de diffusion en continu.
  • Le flux est exécuté une fois par défaut. Si le pipeline est mis à jour avec une actualisation complète, le ONCE flux s’exécute à nouveau pour recréer les données.