Condividi tramite


Informazioni di riferimento sul linguaggio Python per Delta Live Tables

Questo articolo contiene informazioni dettagliate sull'interfaccia di programmazione Python di Delta Live Tables.

Per informazioni sull'API SQL, vedere le informazioni di riferimento sul linguaggio SQL per Delta Live Tables.

Per informazioni dettagliate sulla configurazione del caricatore automatico, vedere Che cos'è il caricatore automatico?.

Prima di iniziare

Di seguito sono riportate considerazioni importanti quando si implementano le pipeline con l'interfaccia Python di Delta Live Tables:

  • Poiché le funzioni table() e view() Python vengono richiamate più volte durante la pianificazione e l'esecuzione degli aggiornamenti della pipeline, non includere il codice in una di queste funzioni che potrebbero avere effetti collaterali, ad esempio codice che modifica i dati o invia un messaggio di posta elettronica. Per evitare comportamenti imprevisti, le funzioni Python che definiscono i set di dati devono includere solo il codice necessario per definire la tabella o la vista.
  • Per eseguire operazioni come l'invio di messaggi di email o l'integrazione con un servizio di monitoraggio esterno, in particolare nelle funzioni che definiscono i set di dati, usare hook di eventi. L'implementazione di queste operazioni nelle funzioni che definiscono i set di dati causerà un comportamento imprevisto.
  • Le funzioni table e view Python devono restituire un DataFrame. Alcune funzioni che operano sui DataFrame non restituiscono DataFrame e non devono essere usate. Queste operazioni includono funzioni come collect(), count(), toPandas(), save() e saveAsTable(). Poiché le trasformazioni del DataFrame vengono eseguite dopo la risoluzione del grafico completo del flusso di dati, l'uso di tali operazioni potrebbe avere effetti collaterali imprevisti.

Importare il modulo Python dlt

Le funzioni Python di Delta Live Tables sono definite nel modulo dlt. Le pipeline implementate con l'API Python devono importare questo modulo:

import dlt

Creare una vista materializzata o una tabella di streaming Delta Live Tables

In Python, Delta Live Tables determina se aggiornare un set di dati come vista materializzata o tabella di streaming in base alla query di definizione. L'espressione Decorator @table può essere usata per definire sia le viste materializzate che le tabelle di streaming.

Per definire una vista materializzata in Python, applicare @table a una query che esegue una lettura statica su un'origine dati. Per definire una tabella di streaming, applicare @table a una query che esegue un flusso letto su un'origine dati o usare la funzione create_streaming_table(). Entrambi i tipi di set di dati hanno la stessa specifica di sintassi indicata di seguito:

Nota

Per usare l'argomento cluster_by per abilitare il clustering liquido, la pipeline deve essere configurata per l'uso del canale di anteprima.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Creare una vista Delta Live Tables

Per definire una visualizzazione in Python, applicare l’espressione Decorator @view. Analogamente all'espressione Decorator @table, è possibile usare le viste in Delta Live Tables per set di dati statici o di streaming. Di seguito è riportata la sintassi per la definizione delle viste con Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Esempio: definire tabelle e visualizzazioni

Per definire una tabella o una vista in Python, applicare l'espressione Decorator @dlt.view o @dlt.table a una funzione. È possibile usare il nome della funzione o il parametro name per assegnare il nome della tabella o della vista. L'esempio seguente definisce due set di dati diversi: una vista denominata taxi_raw che accetta un file JSON come origine di input e una tabella denominata filtered_data che accetta la vista taxi_raw come input:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Esempio: accedere a un set di dati definito nella stessa pipeline

Oltre a leggere da origini dati esterne, è possibile accedere ai set di dati definiti nella stessa pipeline con la funzione read() di Delta Live Tables. L'esempio seguente illustra la creazione di un set di dati customers_filtered usando la funzione read():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

È anche possibile usare la funzione spark.table() per accedere a un set di dati definito nella stessa pipeline. Quando si usa la funzione spark.table() per accedere a un set di dati definito nella pipeline, nell'argomento della funzione viene anteposta la parola chiave LIVE al nome del set di dati:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Esempio: leggere da una tabella registrata in un metastore

Per leggere i dati da una tabella registrata nel metastore Hive, nell'argomento della funzione omettere la parola chiave LIVE e scegliere se qualificare il nome della tabella con il nome del database:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Per un esempio di lettura da una tabella di Unity Catalog, vedere Inserire dati in una pipeline di Unity Catalog.

Esempio: accedere a un set di dati usando spark.sql

È anche possibile restituire un set di dati usando un'espressione spark.sql in una funzione di query. Per leggere da un set di dati interno, anteporre LIVE. al nome del set di dati:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Creare una tabella da usare come destinazione delle operazioni di streaming

Usare la funzione create_streaming_table() per creare una tabella di destinazione per i record restituiti dalle operazioni di streaming, tra cui i record di output apply_changes(), apply_changes_from_snapshot() e @append_flow.

Nota

Le funzioni create_target_table() e create_streaming_live_table() sono deprecate. Databricks consiglia di aggiornare il codice esistente per usare la funzione create_streaming_table().

Nota

Per usare l'argomento cluster_by per abilitare il clustering liquido, la pipeline deve essere configurata per l'uso del canale di anteprima.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argomenti
name

Tipo: str

Il nome della tabella.

Questo parametro è obbligatorio.
comment

Tipo: str

Una descrizione facoltativa per la tabella.
spark_conf

Tipo: dict

Elenco facoltativo delle configurazioni di Spark per l'esecuzione di questa query.
table_properties

Tipo: dict

Elenco facoltativo di proprietà della tabella per la tabella.
partition_cols

Tipo: array

Elenco facoltativo di una o più colonne da utilizzare per il partizionamento della tabella.
cluster_by

Tipo: array

Facoltativamente, abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering.

Vedere Usare il clustering liquido per le tabelle Delta.
path

Tipo: str

Posizione di archiviazione facoltativa per i dati della tabella. Se non è impostata, il sistema usa per impostazione predefinita la posizione di archiviazione della pipeline.
schema

Tipo: str o StructType

Definizione facoltativa dello schema per la tabella. Gli schemi possono essere definiti come stringa DDL SQL o con Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Tipo: dict

Vincoli di qualità dei dati facoltativi per la tabella. Vedere più aspettative.
row_filter (Anteprima pubblica)

Tipo: str

Clausola di filtro di riga facoltativa per la tabella. Vedere Pubblicare tabelle con filtri di riga o maschere di colonna.

Controllare la modalità di materializzazione delle tabelle

Le tabelle offrono anche un controllo aggiuntivo della materializzazione:

  • Specificare la modalità di partizionamento delle tabelle tramite partition_cols. È possibile usare il partizionamento per velocizzare le query.
  • È possibile impostare le proprietà della tabella quando si definisce una vista o una tabella. Vedere Proprietà della tabella Delta Live Tables.
  • Impostare una posizione di archiviazione per i dati della tabella usando l'impostazione path. Per impostazione predefinita, i dati della tabella vengono archiviati nella posizione di archiviazione della pipeline, se path non è impostato.
  • È possibile usare colonne generate nella definizione dello schema. Vedere Esempio: specificare uno schema e colonne di partizione.

Nota

Per le tabelle di dimensioni inferiori a 1 TB, Databricks consiglia di consentire a Delta Live Tables di controllare l'organizzazione dei dati. Non è consigliabile specificare colonne di partizione, a meno che la tabella non si accresca oltre un terabyte.

Esempio: specificare uno schema e colonne di partizione

Facoltativamente, è possibile specificare uno schema di tabella usando Python StructType o una stringa DDL SQL. Se specificato con una stringa DDL, la definizione può includere colonne generate.

L'esempio seguente crea una tabella denominata sales con uno schema specificato usando un StructType Python:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Nell'esempio seguente viene specificato lo schema per una tabella usando una stringa DDL, vengono definite una colonna generata e una colonna di partizione:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Per impostazione predefinita, Delta Live Tables deduce lo schema dalla definizione table, se non si specifica uno schema.

Configurare una tabella di streaming per ignorare le modifiche in una tabella di streaming di origine

Nota

  • Il flag skipChangeCommits funziona solo con spark.readStream usando la funzione option(). Non è possibile usare questo flag in una funzione dlt.read_stream().
  • Non è possibile usare il flag skipChangeCommits quando la tabella di streaming di origine è definita come destinazione di una funzione apply_changes().

Per impostazione predefinita, le tabelle di streaming richiedono origini di sola accodamento. Quando una tabella di streaming usa un'altra tabella di streaming come origine e la tabella di streaming di origine richiede aggiornamenti o eliminazioni, ad esempio l'elaborazione del GDPR "diritto all'oblio", il flag skipChangeCommitspuò essere impostato durante la lettura della tabella di streaming di origine per ignorare tali modifiche. Per altre informazioni su questo flag, vedere Ignorare gli aggiornamenti e le eliminazioni.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Esempio: Definire vincoli di tabella

Importante

I vincoli di tabella sono disponibili in Anteprima pubblica.

Quando si specifica uno schema, è possibile definire chiavi primarie ed esterne. I vincoli sono informativi e non vengono applicati. Vedere la clausola VINCOLO nel riferimento al linguaggio SQL.

Nell'esempio seguente viene definita una tabella con un vincolo di chiave primaria ed esterna:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Esempio: Definire un filtro di riga e una maschera di colonna

Importante

I filtri di riga e le maschere di colonna sono disponibili in anteprima pubblica.

Per creare una vista materializzata o una tabella di streaming con un filtro di riga e una maschera di colonna, utilizzare la clausola ROW FILTER e la clausola MASK. Nell'esempio seguente viene illustrato come definire una vista materializzata e una tabella di streaming con un filtro di riga e una maschera di colonna:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Per altre informazioni sui filtri di riga e sulle maschere di colonna, vedere Pubblicare tabelle con filtri di riga e maschere di colonna.

Proprietà di Delta Live Tables di Python

Le tabelle seguenti descrivono le opzioni e le proprietà che è possibile specificare durante la definizione di tabelle e viste con Delta Live Tables:

Nota

Per usare l'argomento cluster_by per abilitare il clustering liquido, la pipeline deve essere configurata per l'uso del canale di anteprima.

@table oppure @view
name

Tipo: str

Nome facoltativo per la tabella o la vista. Se non definito, il nome della funzione viene usato come nome della tabella o della vista.
comment

Tipo: str

Una descrizione facoltativa per la tabella.
spark_conf

Tipo: dict

Elenco facoltativo delle configurazioni di Spark per l'esecuzione di questa query.
table_properties

Tipo: dict

Elenco facoltativo di proprietà della tabella per la tabella.
path

Tipo: str

Posizione di archiviazione facoltativa per i dati della tabella. Se non è impostata, il sistema usa per impostazione predefinita la posizione di archiviazione della pipeline.
partition_cols

Tipo: a collection of str

Raccolta facoltativa, ad esempio, un list di una o più colonne da utilizzare per il partizionamento della tabella.
cluster_by

Tipo: array

Facoltativamente, abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering.

Vedere Usare il clustering liquido per le tabelle Delta.
schema

Tipo: str o StructType

Definizione facoltativa dello schema per la tabella. Gli schemi possono essere definiti come stringa DDL SQL o con un StructType Python.
temporary

Tipo: bool

Creare una tabella, ma non pubblicare i metadati per la tabella. La parola chiave temporary dà l’istruzione a Delta Live Tables di creare una tabella disponibile per la pipeline, ma che non deve essere accessibile all'esterno della pipeline. Per ridurre il tempo di elaborazione, viene mantenuta una tabella temporanea per la durata della pipeline che la crea e non solo per un singolo aggiornamento.

Il valore predefinito è "False".
row_filter (Anteprima pubblica)

Tipo: str

Clausola di filtro di riga facoltativa per la tabella. Vedere Tabelle con filtri di riga o maschere di colonna.
Definizione di tabella o vista
def <function-name>()

Una funzione Python che definisce il set di dati. Se il parametro name non è impostato, <function-name> viene usato come nome del set di dati di destinazione.
query

Istruzione SQL Spark che restituisce un set di dati Spark o un DataFrame Koalas.

Usare dlt.read() o spark.table() per eseguire una lettura completa da un set di dati definito nella stessa pipeline. Quando si usa la funzione spark.table() per leggere da un set di dati definito nella stessa pipeline, anteporre la parola chiave LIVE al nome del set di dati nell'argomento della funzione. Ad esempio, per leggere da un set di dati denominato customers:

spark.table("LIVE.customers")

È anche possibile usare la funzione spark.table() per leggere da una tabella registrata nel metastore omettendo la parola chiave LIVE e qualificando facoltativamente il nome della tabella con il nome del database:

spark.table("sales.customers")

Usare dlt.read_stream() per eseguire una lettura di streaming da un set di dati definito nella stessa pipeline.

Usare la funzione spark.sql per definire una query SQL e creare il set di dati restituito.

Usare la sintassi PySpark per definire query di Delta Live Tables con Python.
Aspettative
@expect("description", "constraint")

Dichiarare un vincolo di qualità dei dati identificato da
description. Se una riga viola le aspettative, includere la riga nel set di dati di destinazione.
@expect_or_drop("description", "constraint")

Dichiarare un vincolo di qualità dei dati identificato da
description. Se una riga viola le aspettative, escludere la riga dal set di dati di destinazione.
@expect_or_fail("description", "constraint")

Dichiarare un vincolo di qualità dei dati identificato da
description. Se una riga viola le aspettative, arrestare immediatamente l'esecuzione.
@expect_all(expectations)

Dichiarare uno o più vincoli di qualità dei dati.
expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, includere la riga nel set di dati di destinazione.
@expect_all_or_drop(expectations)

Dichiarare uno o più vincoli di qualità dei dati.
expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, escludere la riga dal set di dati di destinazione.
@expect_all_or_fail(expectations)

Dichiarare uno o più vincoli di qualità dei dati.
expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, arrestare immediatamente l'esecuzione.

Change Data Capture da un feed di modifiche con Python in Delta Live Tables

Usare la funzione apply_changes() nell'API Python per usare la funzionalità CDC (Change Data Capture) di Delta Live Tables per elaborare i dati di origine da un feed di dati delle modifiche (CDF).

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione apply_changes(), è necessario includere le colonne __START_AT e __END_AT con lo stesso tipo di dati dei campi sequence_by.

Per creare la tabella di destinazione necessaria, è possibile usare la funzione create_streaming_table() nell'interfaccia Python di Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Nota

Per l'elaborazione di APPLY CHANGES, il comportamento predefinito per INSERT e UPDATE gli eventi consiste nell'eseguire l'upsert degli eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando un record corrispondente non esiste nella tabella di destinazione. La gestione degli eventi DELETE può essere specificata con la condizione APPLY AS DELETE WHEN.

Per altre informazioni sull'elaborazione CDC con un feed di modifiche, vedere APPLICA MODIFICHE API: semplificare il Change Data Capture con Delta Live Tables. Per un esempio di utilizzo della funzione apply_changes(), vedere Esempio: elaborazione SCD di tipo 1 e SCD di tipo 2 con dati di origine CDF.

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione apply_changes, è necessario includere le colonne __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.

Vedere APPLICA MODIFICHE API: semplificare il Change Data Capture con Delta Live Tables.

Argomenti
target

Tipo: str

Il nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la funzione apply_changes().

Questo parametro è obbligatorio.
source

Tipo: str

L’origine dati contenente record CDC.

Questo parametro è obbligatorio.
keys

Tipo: list

La colonna o combinazione di colonne che identifica in modo univoco ogni riga. Viene usata per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione.

È possibile specificare uno dei due valori seguenti:

- Un elenco di stringhe: ["userId", "orderId"]
- Un elenco di funzioni col() Spark SQL: [col("userId"), col("orderId"]

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId).

Questo parametro è obbligatorio.
sequence_by

Tipo: str o col()

Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. Delta Live Tables usa questa sequenziazione per gestire gli eventi di modifica che arrivano senza ordinamento.

È possibile specificare uno dei due valori seguenti:

- Una stringa: "sequenceNum"
- Una funzione col() Spark SQL: col("sequenceNum")

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId).

La colonna specificata deve essere un tipo di dati ordinabile.

Questo parametro è obbligatorio.
ignore_null_updates

Tipo: bool

Consentire l'inserimento di aggiornamenti contenenti un sottoinsieme delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e ignore_null_updates è True, le colonne con un null mantengono i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con il valore null. Quando ignore_null_updates è False, i valori esistenti vengono sovrascritti con valori null.

Il parametro è facoltativo.

Il valore predefinito è False.
apply_as_deletes

Tipo: str o expr()

Specifica quando un evento CDC deve essere considerato come un DELETE anziché un upsert. Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come rimozione definitiva nella tabella Delta sottostante e viene creata una vista nel metastore che filtra tali punti di recupero dell'applicazione. L'intervallo di conservazione può essere configurato con le
pipelines.cdc.tombstoneGCThresholdInSeconds proprietà table.

È possibile specificare uno dei due valori seguenti:

- Una stringa: "Operation = 'DELETE'"
- Una funzione expr() Spark SQL: expr("Operation = 'DELETE'")

Il parametro è facoltativo.
apply_as_truncates

Tipo: str o expr()

Specifica quando un evento CDC deve essere considerato come una tabella TRUNCATE completa. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.

Il parametro apply_as_truncates è supportato solo per SCD di tipo 1. Il tipo SCD 2 non supporta le operazioni di troncamento.

È possibile specificare uno dei due valori seguenti:

- Una stringa: "Operation = 'TRUNCATE'"
- Una funzione expr() Spark SQL: expr("Operation = 'TRUNCATE'")

Il parametro è facoltativo.
column_list

except_column_list

Tipo: list

Un sottoinsieme di colonne da includere nella tabella di destinazione. Utilizzare column_list per specificare l'elenco completo di colonne da includere. Utilizzare except_column_list per specificare le colonne da escludere. È possibile dichiarare un valore come elenco di stringhe o come funzioni Spark SQL col():

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId).

Il parametro è facoltativo.

L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando alla funzione non viene passato alcun argomento column_list o except_column_list .
stored_as_scd_type

Tipo: str o int

Indica se archiviare i record come SCCD di tipo 1 o SCD di tipo 2.

Impostare su 1 per SCD di tipo 1 o 2 per SCD di tipo 2.

La clausola è facoltativa.

L'impostazione predefinita è il tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Un sottoinsieme di colonne di output della cui cronologia tenere traccia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo di colonne di cui tenere traccia. Utilizzo
track_history_except_column_list per specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni Spark SQL col():
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId).

Il parametro è facoltativo.

L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando non viene passato nessun argomento track_history_column_list o
track_history_except_column_list alla funzione.

Modificare l'acquisizione dei dati dagli snapshot del database con Python in Delta Live Tables

Importante

L'API APPLY CHANGES FROM SNAPSHOT si trova in anteprima pubblica.

Usare la funzione apply_changes_from_snapshot() nell'API Python per usare la funzionalità CDC (Delta Live Tables Change Data Capture) per elaborare i dati di origine dagli snapshot del database.

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione apply_changes_from_snapshot(), è necessario includere anche le colonne __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.

Per creare la tabella di destinazione necessaria, è possibile usare la funzione create_streaming_table() nell'interfaccia Python di Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Nota

Per l'elaborazione di APPLY CHANGES FROM SNAPSHOT, il comportamento predefinito consiste nell'inserire una nuova riga quando un record corrispondente con le stesse chiavi non esiste nella destinazione. Se esiste un record corrispondente, viene aggiornato solo se uno dei valori nella riga è stato modificato. Le righe con chiavi presenti nella destinazione ma non più presenti nell'origine vengono eliminate.

Per altre informazioni sull'elaborazione CDC con snapshot, vedere APPLICA MODIFICHE API: semplificare il Change Data Capture con Delta Live Tables. Per esempi di utilizzo della funzione apply_changes_from_snapshot(), vedere gli esempi periodici di inserimento di snapshot e inserimento di snapshot cronologici.

Argomenti
target

Tipo: str

Il nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la funzione apply_changes().

Questo parametro è obbligatorio.
source

Tipo: str o lambda function

Il nome di una tabella o di una vista per lo snapshot periodicamente o una funzione lambda Python che restituisce il DataFrame dello snapshot da elaborare e la versione dello snapshot. Vedere Implementare l'argomento di origine.

Questo parametro è obbligatorio.
keys

Tipo: list

La colonna o combinazione di colonne che identifica in modo univoco ogni riga. Viene usata per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione.

È possibile specificare uno dei due valori seguenti:

- Un elenco di stringhe: ["userId", "orderId"]
- Un elenco di funzioni col() Spark SQL: [col("userId"), col("orderId"]

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId).

Questo parametro è obbligatorio.
stored_as_scd_type

Tipo: str o int

Indica se archiviare i record come SCCD di tipo 1 o SCD di tipo 2.

Impostare su 1 per SCD di tipo 1 o 2 per SCD di tipo 2.

La clausola è facoltativa.

L'impostazione predefinita è il tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Un sottoinsieme di colonne di output della cui cronologia tenere traccia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo di colonne di cui tenere traccia. Utilizzo
track_history_except_column_list per specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni Spark SQL col():
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId) ma non col(source.userId).

Il parametro è facoltativo.

L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando non viene passato nessun argomento track_history_column_list o
track_history_except_column_list alla funzione.

Implementare l'argomento source

La funzione apply_changes_from_snapshot() include l'argomento source. Per l'elaborazione degli snapshot cronologici, si prevede che l'argomento source sia una funzione lambda Python che restituisce due valori alla funzione apply_changes_from_snapshot(): un DataFrame Python contenente i dati dello snapshot da elaborare e una versione snapshot.

Di seguito è riportata la firma della funzione lambda:

lambda Any => Optional[(DataFrame, Any)]
  • L'argomento della funzione lambda è la versione snapshot elaborata più di recente.
  • Il valore restituito della funzione lambda è None o una tupla di due valori: il primo valore della tupla è un DataFrame contenente lo snapshot da elaborare. Il secondo valore della tupla è la versione dello snapshot che rappresenta l'ordine logico dello snapshot.

Esempio che implementa e richiama la funzione lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Il runtime di Delta Live Tables esegue i passaggi seguenti ogni volta che viene attivata la pipeline che contiene la funzione apply_changes_from_snapshot():

  1. Esegue la funzione next_snapshot_and_version per caricare il DataFrame dello snapshot successivo e la versione dello snapshot corrispondente.
  2. Se non viene restituito alcun DataFrame, l'esecuzione viene terminata e l'aggiornamento della pipeline viene contrassegnato come completo.
  3. Rileva le modifiche nel nuovo snapshot e le applica in modo incrementale alla tabella di destinazione.
  4. Tornare al passaggio 1 per caricare lo snapshot successivo e la relativa versione.

Limitazioni

L'interfaccia Python di Delta Live Tables presenta la limitazione seguente:

La funzione pivot() non è ancora supportata. L'operazione pivot in Spark richiede il caricamento eager dei dati di input per calcolare lo schema dell'output. Questa funzionalità non è supportata in Delta Live Tables.