Partager via


create_auto_cdc_from_snapshot_flow

La create_auto_cdc_from_snapshot_flow fonction crée un flux qui utilise la fonctionnalité de capture de données modifiées (CDC) des pipelines déclaratifs Lakeflow Spark pour traiter les données sources issues des instantanés de base de données. Découvrez comment la capture de données modifiées est-elle implémentée avec l’API AUTO CDC FROM SNAPSHOT ?.

Note

Cette fonction remplace la fonction apply_changes_from_snapshot()précédente. Les deux fonctions ont la même signature. Databricks recommande la mise à jour pour utiliser le nouveau nom.

Important

Vous devez disposer d’une table de diffusion en continu cible pour cette opération. Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table(). Vous ne pouvez pas cibler la même table de diffusion en continu à la fois avec create_auto_cdc_from_snapshot_flow() et create_auto_cdc_flow().

Syntaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Note

Pour AUTO CDC FROM SNAPSHOT le traitement, le comportement par défaut consiste à insérer une nouvelle ligne lorsqu’un enregistrement correspondant avec la ou les mêmes clés n’existe pas dans la cible. Si un enregistrement correspondant existe, il est mis à jour uniquement si l’une des valeurs de la ligne a changé. Les lignes avec des clés présentes dans la cible, mais qui ne sont plus présentes dans la source, sont supprimées.

Pour en savoir plus sur le traitement CDC avec des captures instantanées, consultez les API AUTO CDC : simplifiez la capture de données modifiées à l'aide de pipelines. Pour obtenir des exemples d’utilisation de la create_auto_cdc_from_snapshot_flow() fonction, consultez les exemples d’ingestion d’instantanés périodiques et d’ingestion d’instantanés historiques .

Paramètres

Paramètre Type Descriptif
target str Obligatoire. Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d’exécuter la create_auto_cdc_from_snapshot_flow() fonction.
source str ou lambda function Obligatoire. Nom d’une table ou d’une vue pour un instantané à intervalles réguliers, ou une fonction lambda Python qui retourne le DataFrame d’instantané à traiter, ainsi que la version de l’instantané. Consultez Implémenter l’argumentsource.
keys list Obligatoire. Colonne ou combinaison de colonnes qui identifient de manière unique une ligne dans les données sources. Cela permet d’identifier les événements cdc qui s’appliquent à des enregistrements spécifiques dans la table cible. Vous pouvez spécifier l’une ou l’autre des options suivantes :
  • Liste de chaînes : ["userId", "orderId"]
  • Liste des fonctions Spark SQL col() : [col("userId"), col("orderId"]. Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId).
stored_as_scd_type str ou int Indique s’il faut stocker des enregistrements en tant que type SCD 1 ou SCD type 2. Défini sur pour 1 le type SCD 1 ou 2 pour le type SCD 2. La valeur par défaut est la méthode SCD de type 1.
track_history_column_list ou track_history_except_column_list list Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Permet track_history_column_list de spécifier la liste complète des colonnes à suivre. Permet track_history_except_column_list de spécifier les colonnes à exclure du suivi. Vous pouvez déclarer une valeur en tant que liste de chaînes ou en tant que fonctions Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Les arguments des col() fonctions ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais vous ne pouvez pas utiliser col(source.userId). La valeur par défaut est d’inclure toutes les colonnes de la table cible lorsque ni un argument track_history_column_list ni track_history_except_column_list n'est passé à la fonction.

Implémenter l’argument source

La create_auto_cdc_from_snapshot_flow() fonction inclut l’argument source . Pour le traitement des instantanés historiques, l’argument source est censé être une fonction lambda Python qui retourne deux valeurs à la create_auto_cdc_from_snapshot_flow() fonction : un DataFrame Python contenant les données d’instantané à traiter et une version d’instantané.

Voici la signature de la fonction lambda :

lambda Any => Optional[(DataFrame, Any)]
  • L'argument de la fonction lambda est la version instantanée la plus récemment traitée.
  • La valeur de retour de la fonction lambda est None ou un tuple de deux valeurs : la première valeur du tuple est un DataFrame contenant l’instantané à traiter. La deuxième valeur du tuple est la version de l'instantané qui représente l’ordre logique de l’instantané.

Exemple qui implémente et appelle la fonction lambda :

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Le runtime de pipelines déclaratifs Spark Lakeflow effectue les étapes suivantes chaque fois que le pipeline qui contient la fonction create_auto_cdc_from_snapshot_flow() est déclenché :

  1. Exécute la next_snapshot_and_version fonction pour charger le DataFrame d’instantané suivant et la version d’instantané correspondante.
  2. Si aucun DataFrame n’est retourné, l’exécution est terminée et la mise à jour du pipeline est marquée comme terminée.
  3. Détecte les modifications apportées à la nouvelle capture instantanée et les applique de manière incrémentielle à la table cible.
  4. Retourne à l’étape 1 pour charger l’instantané suivant et sa version.