Freigeben über


Python-Sprachreferenz zu Delta Live Tables

Dieser Artikel enthält ausführliche Informationen zur Python-Programmierschnittstelle für Delta Live Tables.

Informationen zur SQL-API finden Sie in der Sprachreferenz zu Delta Live Tables.

Ausführliche Informationen zum Konfigurieren des automatischen Ladens finden Sie unter Automatisches Laden.

Vorbemerkungen

Beachten Sie die folgenden wichtigen Punkte, wenn Sie Pipelines mit der Python-Schnittstelle für Delta Live Tables implementieren:

  • Die Python-Funktionen table() und view() werden im Rahmen der Planung und Ausführung einer Pipelineaktualisierung mehrmals aufgerufen. Fügen Sie daher in diese Funktionen keinen Code ein, der Nebenwirkungen haben kann (beispielsweise Code, der Daten ändert oder eine E-Mail sendet). Um unerwartetes Verhalten zu vermeiden, sollten Ihre Python-Funktionen, die Datasets definieren, nur den Code enthalten, der zum Definieren der Tabelle oder Sicht erforderlich ist.
  • Verwenden Sie Ereignishooks, um Vorgänge wie das Senden von E-Mails oder die Integration in einen externen Überwachungsdienst auszuführen. Das gilt insbesondere bei Funktionen, die Datasets definieren. Die Implementierung dieser Vorgänge in den Funktionen, die Ihre Datasets definieren, führt zu unerwartetem Verhalten.
  • Die Python-Funktionen table und view müssen einen DataFrame zurückgeben. Einige Funktionen, die mit DataFrames arbeiten, geben keine zurück und sollten deshalb nicht verwendet werden. Diese Vorgänge umfassen Funktionen wie collect(), count(), toPandas(), save() und saveAsTable(). Weil DataFrame-Transformationen ausgeführt werden, nachdem das vollständige Dataflowdiagramm aufgelöst wurde, könnte die Verwendung solcher Vorgänge unbeabsichtigte Nebenwirkungen haben.

Importieren des Python-Moduls dlt

Python-Funktionen für Delta Live Tables werden im dlt-Modul definiert. Ihre mit der Python-API implementierten Pipelines müssen dieses Modul importieren:

import dlt

Erstellen einer materialisierten Sicht oder Streamingtabelle für Delta Live Tables

In Python bestimmt Delta Live Tables basierend auf der definierenden Abfrage, ob ein Dataset als materialisierte Sicht oder als Streamingtabelle aktualisiert werden soll. Der @table-Dekorateur kann verwendet werden, um sowohl materialisierte Ansichten als auch Streamingtabellen zu definieren.

Um eine materialisierte Sicht in Python zu definieren, wenden Sie @table auf eine Abfrage an, die einen statischen Lesevorgang für eine Datenquelle ausführt. Um eine Streamingtabelle zu definieren, wenden Sie @table auf eine Abfrage an, die einen Streaming-Lesevorgang für eine Datenquelle durchführt oder die Funktion create_streaming_table() verwendet. Beide Datasettypen haben die gleiche Syntaxspezifikation:

Hinweis

Damit das Argument cluster_by zum Aktivieren des Liquid Clustering verwenden werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Erstellen einer Delta Live Tables-Sicht

Um eine Sicht in Python zu definieren, wenden Sie den @view-Decorator an. Sichten können in Delta Live Tables genau wie das Decorator-Element @table entweder für statische Datasets oder für Streamingdatasets verwendet werden. Hier sehen Sie die Syntax zum Definieren von Sichten mit Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Beispiel: Definieren von Tabellen und Sichten

Wenden Sie das Decorator-Element @dlt.view oder @dlt.table auf eine Funktion an, um eine Sicht oder Tabelle in Python zu definieren. Sie können den Funktionsnamen oder den name-Parameter verwenden, um den Tabellen- oder Sichtnamen zuzuweisen. Im folgenden Beispiel werden zwei verschiedene Datasets definiert: eine Sicht namens taxi_raw, die eine JSON-Datei als Eingabequelle verwendet, und eine Tabelle namens filtered_data, die die taxi_raw-Ansicht als Eingabe verwendet:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Beispiel: Zugreifen auf ein Dataset, das in der gleichen Pipeline definiert ist

Neben dem Lesen aus externen Datenquellen können Sie mit der Delta Live Tables-Funktion read() auch auf Datasets zugreifen, die in der gleichen Pipeline definiert sind. Im folgenden Beispiel wird das Erstellen eines customers_filtered-Datasets mithilfe der Funktion read() veranschaulicht:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Sie können auch die Funktion spark.table() verwenden, um auf ein Dataset zuzugreifen, das in der gleichen Pipeline definiert ist. Wenn Sie die spark.table()-Funktion verwenden, um auf ein Dataset zuzugreifen, das in der Pipeline definiert ist, stellen Sie im Funktionsargument dem Datasetnamen das LIVE-Schlüsselwort voran:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Beispiel: Lesen aus einer Tabelle, die in einem Metastore registriert ist

Wenn Sie Daten aus einer Tabelle lesen möchten, die im Hive-Metastore registriert ist, lassen Sie im Funktionsargument das Schlüsselwort LIVE weg, und qualifizieren Sie optional den Tabellennamen mit dem Datenbanknamen:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Ein Beispiel für das Lesen aus einer Unity Catalog-Tabelle finden Sie unter Erfassen von Daten in einer Unity Catalog-Pipeline.

Beispiel: Zugreifen auf ein Dataset unter Verwendung von spark.sql

Sie können ein Dataset auch mithilfe eines spark.sql-Ausdrucks in einer Abfragefunktion zurückgeben. Um aus einem internen Dataset zu lesen, müssen Sie dem Datasetnamen LIVE. voranstellen:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Erstellen einer Tabelle, die als Ziel von Streamingvorgängen verwendet werden soll

Verwenden Sie die create_streaming_table()-Funktion, um eine Zieltabelle für die Ausgabe von Datensätzen durch Streamingvorgänge zu erstellen, einschließlich apply_changes(), apply_changes_from_snapshot() und @append_flow-Ausgabedatensätze.

Hinweis

Die Funktionen create_target_table() und create_streaming_live_table() sind veraltet. Databricks empfiehlt das Aktualisieren des vorhandenen Codes, um die create_streaming_table()-Funktion zu verwenden.

Hinweis

Damit das Argument cluster_by zum Aktivieren des Liquid Clustering verwenden werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumente
name

Typ: str

Der Tabellenname.

Dieser Parameter ist erforderlich.
comment

Typ: str

Dies ist eine optionale Beschreibung für die Tabelle
spark_conf

Typ: dict

Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage.
table_properties

Typ: dict

Eine optionale Liste der Tabelleneigenschaften für die Tabelle
partition_cols

Typ: array

Eine optionale Liste einer oder mehrerer Spalten, die zum Partitionieren der Tabelle verwendet werden sollen
cluster_by

Typ: array

Optional können Sie das Liquid Clustering in der Tabelle aktivieren und die Spalten so definieren, die sie als Clusteringschlüssel verwendet werden.

Weitere Informationen finden Sie unter Verwenden von Liquid Clustering für Delta-Tabellen.
path

Typ: str

Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline.
schema

Typ: str oder StructType

Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python definiert werden
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Optionale Datenqualitätseinschränkungen für die Tabelle. Siehe Mehrere Erwartungen.
row_filter (Public Preview)

Typ: str

Eine optionale Zeilenfilterklausel für die Tabelle. Siehe Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken.

Steuern der Materialisierung von Tabellen

Tabellen bieten auch zusätzliche Steuerung ihrer Materialisierung:

  • Geben Sie an, wie Tabellen mit partition_colspartitioniert werden. Sie können die Partitionierung verwenden, um Abfragen zu beschleunigen.
  • Sie können Tabelleneigenschaften festlegen, wenn Sie eine Sicht oder Tabelle definieren. Weitere Informationen finden Sie unter Delta Live Tables-Tabelleneigenschaften.
  • Legen Sie mithilfe der path-Einstellung einen Speicherort für Tabellendaten fest. Standardmäßig werden Tabellendaten am Speicherort der Pipeline gespeichert, wenn path nicht festgelegt ist.
  • Sie können Generierte Spalten in Ihrer Schemadefinition verwenden. Weitere Informationen finden Sie unter Beispiel: Angeben des Schemas und der Partitionsspalten.

Hinweis

Für Tabellen mit einer Größe von weniger als 1 TB empfiehlt Databricks, Delta Live Tables die Strukturierung der Daten steuern zu lassen. Wenn Sie nicht davon ausgehen, dass die Größe Ihrer Tabelle über ein Terabyte hinausgeht, sollten Sie im Allgemeinen keine Partitionsspalten angeben.

Beispiel: Angeben des Schemas und der Partitionsspalten

Sie können optional ein Tabellenschema mithilfe eines Python-StructType oder einer SQL-DDL-Zeichenfolge angeben. Wenn eine DDL-Zeichenfolge angegeben wird, kann die Definition generierte Spalten enthalten.

Im folgenden Beispiel wird eine Tabelle namens sales mit einem Schema erstellt, das mithilfe eines Python-Strukturtyps StructType angegeben wird:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Im folgenden Beispiel wird das Schema für eine Tabelle mithilfe einer DDL-Zeichenfolge angegeben, eine generierte Spalte definiert und eine Partitionsspalte definiert:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Standardmäßig leitet Delta Live Tables das Schema aus der table-Definition ab, wenn Sie kein Schema angeben.

Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden

Hinweis

  • Das Flag skipChangeCommits funktioniert nur, wenn spark.readStream die Funktion option() verwendet. Das Flag kann nicht in einer dlt.read_stream()-Funktion verwendet werden.
  • Sie können das skipChangeCommits-Flag nicht verwenden, wenn die Quellstreamingtabelle als Ziel einer apply_changes()-Funktion definiert ist.

Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn eine Streamingtabelle eine andere Streamingtabelle als Quelle verwendet und die Quellstreamingtabelle Aktualisierungen oder Löschvorgänge erfordert (etwa aufgrund des Rechts auf Vergessen der DSGVO), kann das Flag skipChangeCommits beim Lesen der Quellstreamingtabelle festgelegt werden, um diese Änderungen zu ignorieren. Weitere Informationen zu diesem Flag finden Sie unter Ignorieren von Updates und Löschungen.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Beispiel: Definieren von Tabellenconstraints

Wichtig

Tabellenconstraints befinden sich in Public Preview.

Beim Angeben eines Schemas können Sie Primär- und Fremdschlüssel definieren. Die Einschränkungen dienen der Information und werden nicht erzwungen. Weitere Informationen finden Sie unter der CONSTRAINT-Klausel in der SQL-Sprachreferenz.

Im folgenden Beispiel wird eine Tabelle mit einer Primär- und Fremdschlüsseleinschränkung definiert:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Beispiel: Definieren eines Zeilenfilters und einer Spaltenmaske

Wichtig

Zeilenfilter und Spaltenformate befinden sich im Public Preview.

Um eine materialisierte Sicht oder Streamingtabelle mit einem Zeilenfilter und einer Spaltenmaske zu erstellen, verwenden Sie die ROW FILTER-Klausel und die MASK-Klausel. Im folgenden Beispiel wird veranschaulicht, wie Sie eine materialisierte Sicht und eine Streamingtabelle mit einem Zeilenfilter und einer Spaltenmaske definieren:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Weitere Informationen zu Zeilenfiltern und Spaltenmasken finden Sie unter Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken.

Eigenschaften von Python-Delta Live Tables

In den folgenden Tabellen werden die Optionen und Eigenschaften beschrieben, die Sie beim Definieren von Tabellen und Sichten mit Delta Live Tables angeben können:

Hinweis

Damit das Argument cluster_by zum Aktivieren des Liquid Clustering verwenden werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.

@table oder @view
name

Typ: str

Ein optionaler Name für die Tabelle oder Sicht. Wenn er nicht definiert ist, wird der Funktionsname als Tabellen- oder Sichtname verwendet.
comment

Typ: str

Dies ist eine optionale Beschreibung für die Tabelle
spark_conf

Typ: dict

Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage.
table_properties

Typ: dict

Eine optionale Liste der Tabelleneigenschaften für die Tabelle
path

Typ: str

Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline.
partition_cols

Typ: a collection of str

Eine optionale Sammlung – beispielsweise eine Liste (list) – mit mindestens einer Spalte zum Partitionieren der Tabelle.
cluster_by

Typ: array

Optional können Sie das Liquid Clustering in der Tabelle aktivieren und die Spalten so definieren, die sie als Clusteringschlüssel verwendet werden.

Weitere Informationen finden Sie unter Verwenden von Liquid Clustering für Delta-Tabellen.
schema

Typ: str oder StructType

Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python StructType definiert werden
temporary

Typ: bool

Erstellen Sie eine Tabelle, veröffentlichen Sie jedoch keine Metadaten für die Tabelle. Das Schlüsselwort temporary weist Delta Live Tables an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, auf die aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, wird eine temporäre Tabelle für die Lebensdauer der Pipeline beibehalten, die sie erstellt, und nicht nur für ein einzelnes Update.

Die Standardeinstellung lautet „false“.
row_filter (Public Preview)

Typ: str

Eine optionale Zeilenfilterklausel für die Tabelle. Siehe Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken.
Tabellen- oder Sichtdefinition
def <function-name>()

Eine Python-Funktion, die das Dataset definiert. Wenn der name-Parameter nicht festgelegt ist, wird <function-name> als Zieldatasetname verwendet.
query

Eine Spark SQL-Anweisung, die ein Spark-Dataset oder einen Koalas-DataFrame zurückgibt.

Verwenden Sie dlt.read() oder spark.table() für das vollständige Lesen aus einem Dataset, das in derselben Pipeline definiert ist. Wenn Sie die spark.table()-Funktion verwenden, um aus einem Dataset zu lesen, das in derselben Pipeline definiert ist, stellen Sie im Funktionsargument dem Datasetnamen das LIVE-Schlüsselwort voran. So lesen Sie beispielsweise aus einem Dataset mit dem Namen customers:

spark.table("LIVE.customers")

Sie können auch die spark.table()-Funktion verwenden, um aus einer im Metastore registrierten Tabelle zu lesen, indem Sie das LIVE-Schlüsselwort weglassen und optional den Tabellennamen mit dem Datenbanknamen qualifizieren:

spark.table("sales.customers")

Verwenden Sie dlt.read_stream() für das Streaming aus einem Dataset, das in derselben Pipeline definiert ist.

Verwenden Sie die spark.sql-Funktion, um eine SQL-Abfrage zum Erstellen des Rückgabedatasets zu definieren.

Verwenden Sie die PySpark-Syntax, um Delta Live Tables-Abfragen mit Python zu definieren.
Erwartungen
@expect("description", "constraint")

Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch
description. Wenn eine Zeile gegen die Erwartung verstößt, schließen Sie die Zeile in das Zieldataset ein.
@expect_or_drop("description", "constraint")

Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch
description. Wenn eine Zeile gegen die Erwartung verstößt, löschen Sie die Zeile aus dem Zieldataset.
@expect_or_fail("description", "constraint")

Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch
description. Wenn eine Zeile gegen die Erwartung verstößt, beenden Sie die Ausführung sofort.
@expect_all(expectations)

Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen.
expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, schließen Sie die Zeile in das Zieldataset ein.
@expect_all_or_drop(expectations)

Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen.
expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, löschen Sie die Zeile aus dem Zieldataset.
@expect_all_or_fail(expectations)

Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen.
expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, beenden Sie die Ausführung sofort.

Ändern der Datenerfassung aus einem Änderungsfeed mit Python in Delta Live Tables

Verwenden Sie die Funktion apply_changes() in der Python-API, um Delta Live Tables Change Data Capture (CDC)-Funktionen zum Verarbeiten von Quelldaten aus einem Änderungsdatenfeed (CDF) zu verwenden.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der apply_changes()-Zieltabelle angeben, müssen Sie die Spalten __START_AT und __END_AT mit demselben Datentyp wie sequence_by-Felder angeben.

Zum Erstellen der erforderlichen Zieltabelle können Sie die Funktion create_streaming_table() in der Python-Schnittstelle Delta Live Tables verwenden.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Hinweis

Bei APPLY CHANGES-Verarbeitung ist das Standardverhalten für INSERT- und UPDATE-Ereignisse das upsert CDC-Ereignisse aus der Quelle: Aktualisieren Sie alle Zeilen in der Zieltabelle, die den angegebenen Schlüsseln entsprechen, oder fügen Sie eine neue Zeile ein, wenn ein übereinstimmender Datensatz nicht in der Zieltabelle vorhanden ist. Die Behandlung von DELETE-Ereignissen kann mit der APPLY AS DELETE WHEN-Bedingung angegeben werden.

Weitere Informationen zur CDC-Verarbeitung mit einem Änderungsfeed finden Sie unter APPLY CHANGES-APIs: Vereinfachen der Datensatzerfassung von Änderungsdaten mit Delta Live Tables. Ein Beispiel für die Verwendung der apply_changes()-Funktion finden Sie unter Beispiel: SCD-Typ 1 und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das apply_changes-Zieltabellenschema angeben, müssen Sie auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by angeben.

Siehe Die APPLY CHANGES-APIs: Vereinfachen der Änderungsdatenerfassung mit Delta Live Tables.

Argumente
target

Typ: str

Der Name der zu aktualisierenden Tabelle. Sie können die Funktion create_streaming_table() verwenden, um die Zieltabelle zu erstellen, bevor Sie die Funktion apply_changes() ausführen.

Dieser Parameter ist erforderlich.
source

Typ: str

Die Datenquelle, die CDC-Datensätze enthält.

Dieser Parameter ist erforderlich.
keys

Typ: list

Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

Sie können eins der folgenden Elemente angeben:

– Eine Liste von Zeichenfolgen: ["userId", "orderId"]
– Eine Liste von Spark-SQLcol()-Funktionen: [col("userId"), col("orderId"]

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist erforderlich.
sequence_by

Typ: str oder col()

Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Delta Live Tables verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die in nicht ordnungsgemäßer Reihenfolge eingehen.

Sie können eins der folgenden Elemente angeben:

– Eine Zeichenfolge: "sequenceNum"
– Eine Spark SQL-col()-Funktion: col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist erforderlich.
ignore_null_updates

Typ: bool

Ermöglicht das Erfassen von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und ignore_null_updates True ist, behalten Spalten mit einer null ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit dem Wert null. Wenn ignore_null_updates ist False, werden vorhandene Werte mit null-Werten überschrieben.

Dieser Parameter ist optional.

Der Standardwert ist False.
apply_as_deletes

Typ: str oder expr()

Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Um nicht sortierte Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Tombstone in der zugrunde liegenden Delta-Tabelle beibehalten, und im Metastore wird eine Sicht erstellt, die diese Tombstones herausfiltert. Das Aufbewahrungsintervall kann konfiguriert werden mit:
pipelines.cdc.tombstoneGCThresholdInSeconds (Tabelleneigenschaft).

Sie können eins der folgenden Elemente angeben:

– Eine Zeichenfolge: "Operation = 'DELETE'"
– Eine Spark SQL-expr()-Funktion: expr("Operation = 'DELETE'")

Dieser Parameter ist optional.
apply_as_truncates

Typ: str oder expr()

Gibt an, wann ein CDC-Ereignis als TRUNCATE der gesamten Tabelle behandelt werden sollte. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.

Der Parameter apply_as_truncates wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt keine Abkürzungsvorgänge.

Sie können eins der folgenden Elemente angeben:

– Eine Zeichenfolge: "Operation = 'TRUNCATE'"
– Eine Spark SQL-expr()-Funktion: expr("Operation = 'TRUNCATE'")

Dieser Parameter ist optional.
column_list

except_column_list

Typ: list

Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Verwenden Sie column_list, um die vollständige Liste der einzuschließenden Spalten anzugeben. Verwenden Sie except_column_list, um die auszuschließende Spalten anzugeben. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col()-Funktionen deklarieren:

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein column_list- oder except_column_list-Argument an die Funktion übergeben wird.
stored_as_scd_type

Typ: str oder int

Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

Für SCD-Typ 1 auf 1 oder für SCD-Typ 2 auf 2 festgelegt.

Diese Klausel ist optional.

Der Standardwert ist SCD-Typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Eine Teilmenge der Ausgabespalten, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Verwenden Sie track_history_column_list, um die vollständige Liste der Spalten anzugeben, die nachverfolgt werden sollen. Verwendung
track_history_except_column_list zum Angeben der Spalten, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col()-Funktionen deklarieren:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list- oder
track_history_except_column_list-Argument an die Funktion übergeben wird.

Ändern der Datenerfassung aus Datenbankmomentaufnahmen mit Python in Delta Live Tables

Wichtig

Die APPLY CHANGES FROM SNAPSHOT-API befindet sich in der öffentlichen Vorschau.

Verwenden Sie die apply_changes_from_snapshot()-Funktion in der Python-API, um Delta Live Tables Change Data Capture (CDC)-Funktionalität zu verwenden, um Quelldaten aus Datenbankmomentaufnahmen zu verarbeiten.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der apply_changes_from_snapshot()-Zieltabelle angeben, müssen Sie auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by angeben.

Zum Erstellen der erforderlichen Zieltabelle können Sie die Funktion create_streaming_table() in der Python-Schnittstelle Delta Live Tables verwenden.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Hinweis

Bei APPLY CHANGES FROM SNAPSHOT-Verarbeitung besteht das Standardverhalten darin, eine neue Zeile einzufügen, wenn im Ziel kein übereinstimmender Datensatz mit demselben Schlüssel vorhanden ist. Wenn ein übereinstimmender Datensatz vorhanden ist, wird er nur aktualisiert, wenn sich einer der Werte in der Zeile geändert hat. Zeilen mit Schlüsseln, die im Ziel vorhanden sind, aber nicht mehr in der Quelle vorhanden sind, werden gelöscht.

Weitere Informationen zur CDC-Verarbeitung mit Momentaufnahmen finden Sie unter APPLY CHANGES-APIs: Vereinfachen der Datensatzerfassung von Änderungsdaten mit Delta Live Tables. Beispiele für die Verwendung der apply_changes_from_snapshot()-Funktion finden Sie in den Beispielen für die regelmäßige Aufnahme von Momentaufnahmen und historischen Momentaufnahmen.

Argumente
target

Typ: str

Der Name der zu aktualisierenden Tabelle. Sie können die Funktion create_streaming_table() verwenden, um die Zieltabelle zu erstellen, bevor Sie die Funktion apply_changes() ausführen.

Dieser Parameter ist erforderlich.
source

Typ: str oder lambda function

Entweder der Name einer Tabelle oder Ansicht, von der in regelmäßigen Abständen eine Momentaufnahme zu nehmen ist, oder eine Python-Lambda-Funktion, die den zu verarbeitenden Snapshot DataFrame und die Momentaufnahmeversion zurückgibt. Siehe Implementieren des Quellarguments.

Dieser Parameter ist erforderlich.
keys

Typ: list

Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

Sie können eins der folgenden Elemente angeben:

– Eine Liste von Zeichenfolgen: ["userId", "orderId"]
– Eine Liste von Spark-SQLcol()-Funktionen: [col("userId"), col("orderId"]

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist erforderlich.
stored_as_scd_type

Typ: str oder int

Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

Für SCD-Typ 1 auf 1 oder für SCD-Typ 2 auf 2 festgelegt.

Diese Klausel ist optional.

Der Standardwert ist SCD-Typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Eine Teilmenge der Ausgabespalten, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Verwenden Sie track_history_column_list, um die vollständige Liste der Spalten anzugeben, die nachverfolgt werden sollen. Verwendung
track_history_except_column_list zum Angeben der Spalten, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col()-Funktionen deklarieren:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list- oder
track_history_except_column_list-Argument an die Funktion übergeben wird.

Implementieren des source-Arguments

Die apply_changes_from_snapshot()-Funktion enthält das source-Argument. Für die Verarbeitung historischer Momentaufnahmen wird erwartet, dass das source-Argument eine Python-Lambda-Funktion ist, die zwei Werte an die apply_changes_from_snapshot()-Funktion zurückgibt: Ein Python DataFrame, der die zu verarbeitenden Momentaufnahmen und eine Momentaufnahmeversion enthält.

Im Folgenden sehen Sie die Signatur der Lambda-Funktion:

lambda Any => Optional[(DataFrame, Any)]
  • Das Argument für die Lambda-Funktion ist die zuletzt verarbeitete Momentaufnahmeversion.
  • Der Rückgabewert der Lambda-Funktion ist None oder ein Tupel von zwei Werten: Der erste Wert des Tupels ist ein DataFrame, der die zu verarbeitende Momentaufnahme enthält. Der zweite Wert des Tupels ist die Momentaufnahmeversion, welche die logische Reihenfolge der Momentaufnahme darstellt.

Ein Beispiel, das die Lambda-Funktion implementiert und aufruft:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Die Delta Live Tables-Laufzeit führt jedes Mal die folgenden Schritte aus, wenn die Pipeline, welche die apply_changes_from_snapshot()-Funktion enthält, ausgelöst wird:

  1. Führt die next_snapshot_and_version-Funktion aus, um den nächsten Snapshot DataFrame und die entsprechende Momentaufnahmeversion zu laden.
  2. Wenn kein DataFrame zurückgegeben wird, wird die Ausführung beendet, und das Pipelineupdate ist als abgeschlossen markiert.
  3. Erkennt die Änderungen in der neuen Momentaufnahme und wendet sie inkrementell auf die Zieltabelle an.
  4. Kehrt zu Schritt 1 zurück, um die nächste Momentaufnahme und die zugehörige Version zu laden.

Einschränkungen

Die Python-Schnittstelle von Delta Live Tables unterliegt folgender Einschränkung:

Die pivot()-Funktion wird nicht unterstützt. Der pivot-Vorgang in Spark erfordert Eager Loading von Eingabedaten, um das Schema der Ausgabe zu berechnen. Diese Funktion wird in Delta Live Tables nicht unterstützt.