Partager via


Informations de référence sur le langage Python dans Delta Live Tables

Cet article contient des détails sur l’interface de programmation Python de Delta Live Tables.

Pour plus d’informations sur l’API SQL, consultez Informations de référence sur le langage SQL dans Delta Live Tables.

Pour plus de détails spécifiques à la configuration d’Auto Loader, voir Qu’est-ce qu’Auto Loader ?.

Avant de commencer

Les considérations suivantes sont importantes quand vous implémentez des pipelines avec l’interface Python de Delta Live Tables :

  • Étant donné que les fonctions Python table() et view() sont appelées plusieurs fois pendant la planification et l’exécution de la mise à jour d’un pipeline, n’incluez pas de code dans une de ces fonctions qui serait susceptible d’avoir des effets secondaires (par exemple, du code qui modifie des données ou envoie un e-mail). Pour éviter un comportement inattendu, vos fonctions Python qui définissent des jeux de données doivent inclure seulement le code nécessaire pour définir la table ou la vue.
  • Pour effectuer des opérations telles que l’envoi d’e-mails ou l’intégration à un service de surveillance externe, en particulier dans les fonctions qui définissent des jeux de données, utilisez des hooks d’événements. L’implémentation de ces opérations dans les fonctions qui définissent vos jeux de données va provoquer un comportement inattendu.
  • Les fonctions Python table et view doivent retourner un DataFrame. Certaines fonctions qui fonctionnent sur des DataFrames ne retournent pas de DataFrames et ne doivent pas être utilisées. Ces opérations incluent des fonctions telles que collect(), count(), toPandas(), save() et saveAsTable(). Étant donné que les transformations DataFrame sont exécutées une fois le graphe de flux de données complet résolu, l’utilisation de telles opérations peut avoir des effets secondaires inattendus. Toutefois, vous pouvez inclure ces fonctions en dehors des définitions de fonction table ou view, car ce code est exécuté une fois pendant la phase d’initialisation du graphe.

Importer le module Python dlt

Les fonctions Python Delta Live Tables sont définies dans le module dlt. Vos pipelines implémentés avec l’API Python doivent importer ce module :

import dlt

Créer une vue matérialisée ou une table de streaming Delta Live Tables

En Python, Delta Live Tables détermine s'il convient de mettre à jour un ensemble de données en tant que vue matérialisée ou table de streaming en fonction de la requête de définition. Le décorateur @table peut être utilisé pour définir à la fois des vues matérialisées et des tables de diffusion en continu.

Pour définir une vue matérialisée en Python, appliquez @table à une requête qui effectue une lecture statique sur une source de données. Pour définir une table de diffusion en continu, appliquez @table à une requête qui effectue une lecture en continu sur une source de données ou utilisez la fonction create_streaming_table(). Les deux types d’ensembles de données ont la même spécification de syntaxe comme suit :

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Créer une vue Delta Live Tables

Pour définir une vue dans Python, appliquez l'élément décoratif @view. Comme le décorateur @table, vous pouvez utiliser des vues dans Delta Live Tables pour des ensembles de données statiques ou en streaming. Voici la syntaxe pour définir des vues avec Python :

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemple : Définir des tables et des vues

Pour définir une table ou une vue en Python, appliquez le décorateur @dlt.view ou @dlt.table à une fonction. Vous pouvez utiliser le nom de la fonction ou le paramètre name pour attribuer le nom de la table ou de la vue. L’exemple suivant définit deux jeux de données différents : une vue appelée taxi_raw qui prend un fichier JSON comme source d’entrée, et une table appelée filtered_data qui prend la vue taxi_raw comme entrée :

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Exemple : accéder à un ensemble de données défini dans le même pipeline

En plus de lire à partir de sources de données externes, vous pouvez accéder à des ensembles de données définis dans le même pipeline avec la fonction Delta Live Tables read(). L'exemple suivant montre la création d'un ensemble de données customers_filtered à l'aide de la fonction read() :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Vous pouvez également utiliser la fonction spark.table() pour accéder à un ensemble de données défini dans le même pipeline. Lorsque vous utilisez la fonction spark.table() pour accéder à un jeu de données défini dans le pipeline, dans l’argument de fonction, ajoutez le mot clé LIVE au début du nom du jeu de données :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Exemple : lecture à partir d'une table enregistrée dans un métastore

Pour lire les données d'une table enregistrée dans le metastore Hive, dans l'argument de la fonction, omettez le mot-clé LIVE et qualifiez éventuellement le nom de la table avec le nom de la base de données :

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Pour obtenir un exemple de lecture à partir d'une table Unity Catalog, consultez Ingérer des données dans un pipeline Unity Catalog.

Exemple : accédez à un ensemble de données à l'aide de spark.sql

Vous pouvez également retourner un jeu de données à l’aide d’une expression spark.sql dans une fonction de requête. Pour lire dans un jeu de données interne, ajoutez LIVE. au début le nom du jeu de données :

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Créer une table à utiliser comme cible des opérations de streaming

Utilisez la fonction create_streaming_table() pour créer une table cible pour la sortie des enregistrements par les opérations de diffusion en continu, y compris les enregistrements de sortie apply_changes(), apply_changes_from_snapshot() et @append_flow.

Remarque

Les fonctions create_target_table() et create_streaming_live_table() sont obsolètes. Databricks recommande de mettre à jour le code existant pour utiliser la fonction create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Arguments
name

Entrez : str

Nom de la table.

Ce paramètre est obligatoire.
comment

Entrez : str

Description facultative de la table.
spark_conf

Entrez : dict

Liste facultative de configurations Spark pour l’exécution de cette requête.
table_properties

Entrez : dict

Liste facultative des propriétés de table disponibles pour la table.
partition_cols

Entrez : array

Liste facultative d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table.
path

Entrez : str

Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.
schema

Type : str ou StructType

Définition de schéma facultative pour la table. Les schémas peuvent être définis en tant que chaîne SQL DDL ou avec Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Entrez : dict

Contraintes facultative de qualité des données pour la table. Consultez attentes multiples.

Contrôler la façon dont les tables sont matérialisées

Les tables offrent également un contrôle supplémentaire de leur matérialisation :

Remarque

Pour les tables d’une taille inférieure à 1 To, Databricks recommande de laisser Delta Live Tables contrôler l’organisation des données. Vous ne devez pas spécifier de colonnes de partition, sauf si vous pensez que votre table dépassera un téraoctet.

Exemple : spécifier un schéma et des colonnes de partition

Vous pouvez éventuellement spécifier un schéma de table à l’aide d’une chaîne StructType Python ou SQL DDL. Quand elle est spécifiée avec une chaîne DDL, la définition peut inclure des colonnes générées.

L'exemple suivant crée une table appelée sales avec un schéma spécifié à l'aide de Python StructType :

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

L'exemple suivant spécifie le schéma d'une table à l'aide d'une chaîne DDL, définit une colonne générée et définit une colonne de partition :

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Par défaut, Delta Live Tables déduit le schéma de la définition table si vous ne spécifiez pas de schéma.

Configurer une table de streaming pour ignorer les modifications dans une table de streaming source

Remarque

  • Le drapeau skipChangeCommits ne fonctionne qu'avec spark.readStream en utilisant la fonction option(). Vous ne pouvez pas utiliser cet indicateur dans une fonction dlt.read_stream().
  • Vous ne pouvez pas utiliser l’indicateur skipChangeCommits lorsque la table de streaming source est définie comme cible d’une fonction apply_changes().

Par défaut, les tables de streaming nécessitent des sources en ajout uniquement. Lorsqu'une table de streaming utilise une autre table de streaming comme source et que la table de streaming source nécessite des mises à jour ou des suppressions, par exemple, le traitement du « droit à l'oubli » RGPD, l'indicateur skipChangeCommits peut être défini lors de la lecture de la table de streaming pour ignorer ces modifications. Pour plus d’informations sur cet indicateur, consultez Ignorer les mises à jour et les suppressions.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Propriétés des tables dynamiques Python Delta

Les tableaux suivants décrivent les options et propriétés que vous pouvez spécifier lors de la définition de tables et de vues avec Delta Live Tables :

@table ou @view
name

Entrez : str

Nom facultatif pour la table ou la vue. S’il n’est pas défini, le nom de la fonction est utilisé comme nom de la table ou de la vue.
comment

Entrez : str

Description facultative de la table.
spark_conf

Entrez : dict

Liste facultative de configurations Spark pour l’exécution de cette requête.
table_properties

Entrez : dict

Liste facultative des propriétés de table disponibles pour la table.
path

Entrez : str

Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.
partition_cols

Entrez : a collection of str

Collection facultative, comme list d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table.
schema

Type : str ou StructType

Définition de schéma facultative pour la table. Les schémas peuvent être définis en tant que chaîne SQL DDL ou avec Python
StructType.
temporary

Entrez : bool

Créez une table, mais ne publiez pas de métadonnées pour la table. Le mot clé temporary indique à Delta Live Tables de créer une table qui est disponible pour le pipeline, mais qui ne doit pas être accessible en dehors du pipeline. Pour réduire le temps de traitement, une table temporaire persiste pendant la durée de vie du pipeline qui la crée, pas uniquement pour une seule mise à jour.

La valeur par défaut est False.
Définition de la table ou de la vue
def <function-name>()

Fonction Python qui définit le jeu de données. Si le paramètre name n’est pas défini, <function-name> est utilisé comme nom du jeu de données cible.
query

Instruction SQL Spark qui retourne un jeu de données Spark ou un DataFrame Koalas.

Utilisez dlt.read() ou spark.table() pour effectuer une lecture complète à partir d’un jeu de données défini dans le même pipeline. Lorsque vous utilisez la fonction spark.table() pour lire dans un jeu de données défini dans le même pipeline, ajoutez le mot clé LIVE au début du nom du jeu de données dans l’argument de fonction. Par exemple, pour lire dans un jeu de données nommé customers :

spark.table("LIVE.customers")

Vous pouvez également utiliser la fonction spark.table() pour lire dans une table inscrite dans le metastore en omettant le mot clé LIVE et en qualifiant éventuellement le nom de la table avec le nom de la base de données :

spark.table("sales.customers")

Utilisez dlt.read_stream() pour effectuer une lecture en streaming à partir d’un jeu de données défini dans le même pipeline.

Utilisez la fonction spark.sql pour définir une requête SQL afin de créer le jeu de données de retour.

Utilisez la syntaxe PySpark pour définir des requêtes Delta Live Tables avec Python.
Attentes
@expect("description", "constraint")

Déclarez une contrainte de qualité des données identifiée par
description. Si une ligne enfreint l’attente, incluez la ligne dans le jeu de données cible.
@expect_or_drop("description", "constraint")

Déclarez une contrainte de qualité des données identifiée par
description. Si une ligne enfreint l’attente, supprimez la ligne du jeu de données cible.
@expect_or_fail("description", "constraint")

Déclarez une contrainte de qualité des données identifiée par
description. Si une ligne enfreint l’attente, arrêtez immédiatement l’exécution.
@expect_all(expectations)

Déclarez une ou plusieurs contraintes de qualité des données.
expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, incluez la ligne dans le jeu de données cible.
@expect_all_or_drop(expectations)

Déclarez une ou plusieurs contraintes de qualité des données.
expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, supprimez la ligne du jeu de données cible.
@expect_all_or_fail(expectations)

Déclarez une ou plusieurs contraintes de qualité des données.
expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, arrêtez immédiatement l’exécution.

Modifier la capture de données d’un flux de modification avec Python dans Delta Live Tables

Utilisez la fonction apply_changes() dans l’API Python pour utiliser la fonctionnalité de capture des changements de données (CDC) Delta Live Tables pour traiter les données sources à partir d’un flux de changements de données (CDF).

Important

Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible apply_changes(), vous devez inclure les colonnes __START_AT et __END_AT avec le même type de données que les champs sequence_by.

Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table() dans l’interface Python Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Remarque

Pour le traitement de APPLY CHANGES, le comportement par défaut des évènements INSERT et UPDATE consiste à effectuer un upsert des évènements CDC depuis la source : mettre à jour les lignes de la table cible qui correspondent aux clés spécifiées, ou insérer une nouvelle ligne quand un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements DELETE peut être spécifiée avec la condition APPLY AS DELETE WHEN.

Pour en savoir plus sur le traitement CDC avec un flux de changements, consultez API APPLY CHANGES : Simplifiez la capture des changements de données avec Delta Live Tables. Pour obtenir un exemple de l’utilisation de la fonction apply_changes(), consultez Exemple : traitement SCD type 1 et SCD type 2 avec des données sources CDF.

Important

Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible apply_changes, vous devez inclure les colonnes __START_AT et __END_AT avec le même type de données que le champ sequence_by.

Consultez API APPLY CHANGES : Simplifier la capture des changements de données avec Delta Live Tables.

Arguments
target

Entrez : str

Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d'exécuter la fonction apply_changes().

Ce paramètre est obligatoire.
source

Entrez : str

Source de données contenant les enregistrements de capture des changements de données.

Ce paramètre est obligatoire.
keys

Entrez : list

Colonne ou combinaison de colonnes identifiant de façon unique une ligne dans les données sources. Utilisée pour identifier les événements de capture des changements de données qui s’appliquent à des enregistrements spécifiques dans la table cible.

Vous pouvez spécifier l’un des éléments suivants :

* Liste de chaînes : ["userId", "orderId"]
* Liste de fonctions col() Spark SQL : [col("userId"), col("orderId"]

Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais pas col(source.userId).

Ce paramètre est obligatoire.
sequence_by

Type : str ou col()

Nom de colonne spécifiant l’ordre logique des événements de capture des changements de données dans les données sources. Delta Live Tables utilise ce séquencement pour gérer les événements de modification qui se produisent dans le désordre.

Vous pouvez spécifier l’un des éléments suivants :

* Chaîne : "sequenceNum"
* Fonction col() Spark SQL : col("sequenceNum")

Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais pas col(source.userId).

Ce paramètre est obligatoire.
ignore_null_updates

Entrez : bool

Autoriser l’ingestion des mises à jour contenant un sous-ensemble des colonnes cibles. Quand un évènement CDC correspond à une ligne existante et que ignore_null_updates a la valeur True, les colonnes avec null conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec une valeur de null. Quand ignore_null_updates a la valeur False, les valeurs existantes sont remplacées par des valeurs null.

Ce paramètre est facultatif.

Par défaut, il s’agit de False.
apply_as_deletes

Type : str ou expr()

Spécifie quand un événement de capture des changements de données doit être traité en tant qu’opération DELETE plutôt qu’opération upsert. Pour gérer des données non ordonnées, la ligne supprimée est conservée temporairement en tant qu’objet tombstone dans la table Delta sous-jacente, et un affichage est créé dans le metastore, qui filtre ces objets tombstone. Vous pouvez configurer l’intervalle de conservation avec la
Propriété de table pipelines.cdc.tombstoneGCThresholdInSeconds.

Vous pouvez spécifier l’un des éléments suivants :

* Chaîne : "Operation = 'DELETE'"
* Fonction expr() Spark SQL : expr("Operation = 'DELETE'")

Ce paramètre est facultatif.
apply_as_truncates

Type : str ou expr()

Spécifie quand un événement de capture des changements de données doit être traité en tant que TRUNCATE de table complet. Étant donné que cette clause déclenche une troncation complète de la table cible, elle doit être utilisée uniquement pour des cas d’usage spécifiques nécessitant cette fonctionnalité.

Le paramètre apply_as_truncates est pris en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge les opérations de troncation.

Vous pouvez spécifier l’un des éléments suivants :

* Chaîne : "Operation = 'TRUNCATE'"
* Fonction expr() Spark SQL : expr("Operation = 'TRUNCATE'")

Ce paramètre est facultatif.
column_list

except_column_list

Entrez : list

Sous-ensemble de colonnes à inclure dans la table cible. Utilisez column_list pour spécifier la liste complète des colonnes à inclure. Utilisez except_column_list pour spécifier les colonnes à exclure. Vous pouvez déclarer les valeurs comme une liste de chaînes ou en tant que fonctions col() Spark SQL :

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais pas col(source.userId).

Ce paramètre est facultatif.

Par défaut toutes les colonnes de la table cible sont incluses quand aucun argument column_list ou except_column_list n’est passé à la fonction.
stored_as_scd_type

Type : str ou int

Indique s’il faut stocker des enregistrements en tant que méthode SCD de type 1 ou méthode SCD de type 2.

Défini sur 1 pour la méthode SCD de type 1 ou 2 pour la méthode SCD de type 2.

Cette clause est facultative.

La valeur par défaut est la méthode SCD de type 1.
track_history_column_list

track_history_except_column_list

Entrez : list

Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Utilisez track_history_column_list pour spécifier la liste complète des colonnes à suivre. Utilisation
track_history_except_column_list pour spécifier les colonnes à exclure du suivi. Vous pouvez déclarer les valeurs comme une liste de chaînes ou en tant que fonctions col() Spark SQL- track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais pas col(source.userId).

Ce paramètre est facultatif.

La valeur par défaut est d'inclure toutes les colonnes de la table cible lorsque aucune track_history_column_listou
Un argument track_history_except_column_list est transmis à la fonction.

Capture des changements de données à partir d’instantanés de base de données avec Python dans Delta Live Tables

Important

L’API APPLY CHANGES FROM SNAPSHOT est en préversion publique.

Utilisez la fonction apply_changes_from_snapshot() dans l’API Python pour utiliser la fonctionnalité de capture des changements de données (CDC) Delta Live Tables pour traiter les données sources à partir d’instantanés de base de données.

Important

Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible apply_changes_from_snapshot(), vous devez également inclure les colonnes __START_AT et __END_AT avec le même type de données que le champ sequence_by.

Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table() dans l’interface Python Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Remarque

Pour le traitement de APPLY CHANGES FROM SNAPSHOT, le comportement par défaut consiste à insérer une nouvelle ligne lorsqu’un enregistrement correspondant avec la ou les mêmes clés n’existe pas dans la cible. Si un enregistrement correspondant existe, il est mis à jour uniquement si l’une quelconque des valeurs de la ligne a changé. Les lignes avec des clés présentes dans la cible, mais qui ne sont plus présentes dans la source, sont supprimées.

Pour en savoir plus sur le traitement CDC avec des instantanés, consultez API APPLY CHANGES : Simplifiez la capture des changements de données avec Delta Live Tables. Pour des exemples d’utilisation de la fonction apply_changes_from_snapshot(), consultez les exemples ingestion d’instantané périodique et ingestion d’instantané historique.

Arguments
target

Entrez : str

Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d'exécuter la fonction apply_changes().

Ce paramètre est obligatoire.
source

Type : str ou lambda function

Nom d’une table ou d’une vue pour réaliser périodiquement un instantané ou d’une fonction lambda Python qui retourne le DataFrame de l’instantané à traiter et la version de l’instantané. Consultez Implémenter l’argument source.

Ce paramètre est obligatoire.
keys

Entrez : list

Colonne ou combinaison de colonnes identifiant de façon unique une ligne dans les données sources. Utilisée pour identifier les événements de capture des changements de données qui s’appliquent à des enregistrements spécifiques dans la table cible.

Vous pouvez spécifier l’un des éléments suivants :

* Liste de chaînes : ["userId", "orderId"]
* Liste de fonctions col() Spark SQL : [col("userId"), col("orderId"]

Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais pas col(source.userId).

Ce paramètre est obligatoire.
stored_as_scd_type

Type : str ou int

Indique s’il faut stocker des enregistrements en tant que méthode SCD de type 1 ou méthode SCD de type 2.

Défini sur 1 pour la méthode SCD de type 1 ou 2 pour la méthode SCD de type 2.

Cette clause est facultative.

La valeur par défaut est la méthode SCD de type 1.
track_history_column_list

track_history_except_column_list

Entrez : list

Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Utilisez track_history_column_list pour spécifier la liste complète des colonnes à suivre. Utilisation
track_history_except_column_list pour spécifier les colonnes à exclure du suivi. Vous pouvez déclarer les valeurs comme une liste de chaînes ou en tant que fonctions col() Spark SQL- track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId), mais pas col(source.userId).

Ce paramètre est facultatif.

La valeur par défaut est d'inclure toutes les colonnes de la table cible lorsque aucune track_history_column_listou
Un argument track_history_except_column_list est transmis à la fonction.

Implémenter l’argument source

La fonction apply_changes_from_snapshot() inclut l’argument source. Pour le traitement des instantanés historiques, l’argument source est censé être une fonction lambda Python qui retourne deux valeurs à la fonction apply_changes_from_snapshot() : un DataFrame Python contenant les données de l’instantané à traiter et une version de l’instantané.

Voici la signature de la fonction lambda :

lambda Any => Optional[(DataFrame, Any)]
  • L’argument de la fonction lambda est la version de l’instantanée traitée le plus récemment.
  • La valeur de retour de la fonction lambda est None ou un tuple de deux valeurs : la première valeur du tuple est un DataFrame contenant l’instantané à traiter. La deuxième valeur du tuple est la version de l’instantané qui représente l’ordre logique de l’instantané.

Exemple qui implémente et appelle la fonction lambda :

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Le runtime Delta Live Tables effectue les étapes suivantes chaque fois que le pipeline qui contient la fonction apply_changes_from_snapshot() est déclenché :

  1. Exécute la fonction next_snapshot_and_version pour charger le DataFrame de l’instantané suivant et la version de l’instantané correspondante.
  2. Si aucun DataFrame n’est retourné, l’exécution est terminée et la mise à jour du pipeline est marquée comme terminée.
  3. Détecte les changements apportés au nouvel instantané et les applique de manière incrémentielle à la table cible.
  4. Retourne à l’étape 1 pour charger l’instantané suivant et sa version.

Limitations

L’interface Python de Delta Live Tables a les limitations suivantes :

La fonction pivot() n’est pas prise en charge. L’opération pivot dans Spark nécessite un chargement hâtif des données d’entrée pour calculer le schéma de la sortie. Cette fonctionnalité n’est pas prise en charge dans Delta Live Tables.