Python-Sprachreferenz zu Delta Live Tables

Dieser Artikel enthält Details für die Python-Programmierschnittstelle für Delta Live Tables.

Informationen zur SQL-API finden Sie in der Sprachreferenz zu Delta Live Tables.

Ausführliche Informationen zum Konfigurieren des automatischen Ladens finden Sie unter Automatisches Laden.

Einschränkungen

Für die Python-Schnittstelle von Delta Live Tables gelten die folgenden Einschränkungen:

  • Die Python-Funktionen table und view müssen einen DataFrame zurückgeben. Einige Funktionen, die mit DataFrames arbeiten, geben keine zurück und sollten deshalb nicht verwendet werden. Weil DataFrame-Transformationen ausgeführt werden, nachdem das vollständige Dataflowdiagramm aufgelöst wurde, könnte die Verwendung solcher Vorgänge unbeabsichtigte Nebenwirkungen haben. Diese Vorgänge umfassen Funktionen wie collect(), count(), toPandas(), save() und saveAsTable(). Sie können diese Funktionen jedoch außerhalb von table- oder view-Funktionsdefinitionen einbeziehen, weil dieser Code während der Graphinitialisierungsphase nur einmal ausgeführt wird.
  • Die pivot()-Funktion wird nicht unterstützt. Der pivot-Vorgang in Spark erfordert Eager Loading von Eingabedaten, um das Schema der Ausgabe zu berechnen. Diese Funktion wird in Delta Live Tables nicht unterstützt.

Importieren des Python-Moduls dlt

Python-Funktionen für Delta Live Tables werden im dlt-Modul definiert. Ihre mit der Python-API implementierten Pipelines müssen dieses Modul importieren:

import dlt

Erstellen einer materialisierten Sicht oder Streamingtabelle für Delta Live Tables

In Python bestimmt Delta Live Tables basierend auf der definierenden Abfrage, ob ein Dataset als materialisierte Sicht oder als Streamingtabelle aktualisiert werden soll. Das Decorator-Element @table wird verwendet, um materialisierte Sichten und Streamingtabellen zu definieren.

Um eine materialisierte Sicht in Python zu definieren, wenden Sie @table auf eine Abfrage an, die einen statischen Lesevorgang für eine Datenquelle ausführt. Um eine Streamingtabelle zu definieren, wenden Sie @table auf eine Abfrage an, die einen Streaminglesevorgang für eine Datenquelle ausführt. Beide Datasettypen haben die gleiche Syntaxspezifikation:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Erstellen einer Delta Live Tables-Sicht

Um eine Sicht in Python zu definieren, wenden Sie den @view-Decorator an. Sichten können in Delta Live Tables genau wie das Decorator-Element @table entweder für statische Datasets oder für Streamingdatasets verwendet werden. Hier sehen Sie die Syntax zum Definieren von Sichten mit Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Beispiel: Definieren von Tabellen und Sichten

Wenden Sie das Decorator-Element @dlt.view oder @dlt.table auf eine Funktion an, um eine Sicht oder Tabelle in Python zu definieren. Sie können den Funktionsnamen oder den name-Parameter verwenden, um den Tabellen- oder Sichtnamen zuzuweisen. Im folgenden Beispiel werden zwei verschiedene Datasets definiert: eine Sicht namens taxi_raw, die eine JSON-Datei als Eingabequelle verwendet, und eine Tabelle namens filtered_data, die die taxi_raw-Ansicht als Eingabe verwendet:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Beispiel: Zugreifen auf ein Dataset, das in der gleichen Pipeline definiert ist

Neben dem Lesen aus externen Datenquellen können Sie mit der Delta Live Tables-Funktion read() auch auf Datasets zugreifen, die in der gleichen Pipeline definiert sind. Im folgenden Beispiel wird das Erstellen eines customers_filtered-Datasets mithilfe der Funktion read() veranschaulicht:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Sie können auch die Funktion spark.table() verwenden, um auf ein Dataset zuzugreifen, das in der gleichen Pipeline definiert ist. Wenn Sie die spark.table()-Funktion verwenden, um auf ein Dataset zuzugreifen, das in der Pipeline definiert ist, stellen Sie im Funktionsargument dem Datasetnamen das LIVE-Schlüsselwort voran:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Beispiel: Lesen aus einer Tabelle, die in einem Metastore registriert ist

Wenn Sie Daten aus einer Tabelle lesen möchten, die im Hive-Metastore registriert ist, lassen Sie im Funktionsargument das Schlüsselwort LIVE weg, und qualifizieren Sie optional den Tabellennamen mit dem Datenbanknamen:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Ein Beispiel für das Lesen aus einer Unity Catalog-Tabelle finden Sie unter Erfassen von Daten in einer Unity Catalog-Pipeline.

Beispiel: Zugreifen auf ein Dataset unter Verwendung von spark.sql

Sie können ein Dataset auch mithilfe eines spark.sql-Ausdrucks in einer Abfragefunktion zurückgeben. Um aus einem internen Dataset zu lesen, müssen Sie dem Datasetnamen LIVE. voranstellen:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Schreiben in eine Streamingtabelle aus mehreren Quelldatenströmen

Wichtig

Die Unterstützung von Delta Live Tables für @append_flow befindet sich in der Public Preview.

Sie können den @append_flow-Dekorateur verwenden, um aus mehreren Streamingquellen in eine Streamingtabelle zu schreiben, um Folgendes auszuführen:

  • Hinzufügen und Entfernen von Streamingquellen, die Daten an eine vorhandene Streamingtabelle anfügen, ohne dass eine vollständige Aktualisierung erforderlich ist. Sie können beispielsweise eine Tabelle haben, in der regionale Daten aus jeder Region kombiniert werden, in der Sie arbeiten. Wenn neue Regionen eingeführt werden, können Sie der Tabelle die neuen Regionsdaten hinzufügen, ohne eine vollständige Aktualisierung durchzuführen.
  • Aktualisieren Sie eine Streamingtabelle, indem Sie fehlende historische Daten (Backfilling) anfügen. Sie haben beispielsweise eine vorhandene Streamingtabelle, in die ein Apache Kafka-Thema geschrieben wird. Sie haben auch historische Daten in einer Tabelle gespeichert, die Sie genau einmal in die Streamingtabelle eingefügt haben, und Sie können die Daten nicht streamen, da Sie eine komplexe Aggregation durchführen müssen, bevor Sie die Daten einfügen.

Verwenden Sie die Funktion create_streaming_table() zum Erstellen einer Zieltabelle für die Datensatzausgabe durch die @append_flow Verarbeitung.

Hinweis

Wenn Sie Datenqualitätseinschränkungen mit Erwartungendefinieren müssen, definieren Sie die Erwartungen an die Zieltabelle als Teil der create_streaming_table() Funktion. Sie können die Erwartungen in der @append_flow Definition nicht definieren.

Es folgt die Syntax für @append_flow:

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment") # optional
def <function-name>():
  return (<streaming query>)

Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Themen

Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen erstellt kafka_target und in die Streamingtabelle aus zwei Kafka-Themen geschrieben:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

Beispiel: Ausführen eines einmaligen Datenrückfüllens

Im folgenden Beispiel wird eine Abfrage ausgeführt, um verlaufsgeschichtliche Daten an eine Streamingtabelle anzufügen:

Hinweis

Um sicherzustellen, dass ein wahrer einmaliger Rücklauf ausgeführt wird, wenn die Backfill-Abfrage Teil einer Pipeline ist, die auf geplanter oder kontinuierlicher Basis ausgeführt wird, entfernen Sie die Abfrage nach der Ausführung der Pipeline einmal. Um neue Daten anzufügen, wenn sie im Backfill-Verzeichnis eingeht, lassen Sie die Abfrage an Ort und Stelle.

import dlt

@dlt.table()
def csv_target():
  return spark.readStream.format("csv").load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream.format("csv").load("path/to/backfill/data/dir")

Erstellen einer Tabelle, die als Ziel von Streamingvorgängen verwendet werden soll

Verwenden Sie die create_streaming_table() Funktion, um eine Zieltabelle für die Ausgabe von Datensätzen durch Streamingvorgänge zu erstellen, einschließlich apply_changes() und @append_flow Ausgabedatensätze.

Hinweis

Die Funktionen create_target_table() und create_streaming_live_table() sind veraltet. Databricks empfiehlt das Aktualisieren des vorhandenen Codes, um die create_streaming_table()-Funktion zu verwenden.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumente
name

Typ: str

Der Tabellenname.

Dieser Parameter ist erforderlich.
comment

Typ: str

Dies ist eine optionale Beschreibung für die Tabelle
spark_conf

Typ: dict

Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage.
table_properties

Typ: dict

Eine optionale Liste der Tabelleneigenschaften für die Tabelle
partition_cols

Typ: array

Eine optionale Liste einer oder mehrerer Spalten, die zum Partitionieren der Tabelle verwendet werden sollen
path

Typ: str

Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline.
schema

Typ: str oder StructType

Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python definiert werden.
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Optionale Datenqualitätseinschränkungen für die Tabelle. Siehe Mehrere Erwartungen.

Steuern der Materialisierung von Tabellen

Tabellen bieten auch zusätzliche Steuerung ihrer Materialisierung:

  • Geben Sie an, wie Tabellen mit partition_colspartitioniert werden. Sie können die Partitionierung verwenden, um Abfragen zu beschleunigen.
  • Sie können Tabelleneigenschaften festlegen, wenn Sie eine Sicht oder Tabelle definieren. Weitere Informationen finden Sie unter Delta Live Tables-Tabelleneigenschaften.
  • Legen Sie mithilfe der path-Einstellung einen Speicherort für Tabellendaten fest. Standardmäßig werden Tabellendaten am Speicherort der Pipeline gespeichert, wenn path nicht festgelegt ist.
  • Sie können Generierte Spalten in Ihrer Schemadefinition verwenden. Weitere Informationen finden Sie unter Beispiel: Angeben des Schemas und der Partitionsspalten.

Hinweis

Für Tabellen mit einer Größe von weniger als 1 TB empfiehlt Databricks, Delta Live Tables die Strukturierung der Daten steuern zu lassen. Wenn Sie nicht davon ausgehen, dass die Größe Ihrer Tabelle über ein Terabyte hinausgeht, sollten Sie im Allgemeinen keine Partitionsspalten angeben.

Beispiel: Angeben des Schemas und der Partitionsspalten

Sie können optional ein Tabellenschema mithilfe eines Python-StructType oder einer SQL-DDL-Zeichenfolge angeben. Wenn eine DDL-Zeichenfolge angegeben wird, kann die Definition generierte Spalten enthalten.

Im folgenden Beispiel wird eine Tabelle namens sales mit einem Schema erstellt, das mithilfe eines Python-Strukturtyps StructType angegeben wird:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Im folgenden Beispiel wird das Schema für eine Tabelle mithilfe einer DDL-Zeichenfolge angegeben, eine generierte Spalte definiert und eine Partitionsspalte definiert:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Standardmäßig leitet Delta Live Tables das Schema aus der table-Definition ab, wenn Sie kein Schema angeben.

Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden

Hinweis

  • Das Flag skipChangeCommits funktioniert nur, wenn spark.readStream die Funktion option() verwendet. Das Flag kann nicht in einer dlt.read_stream()-Funktion verwendet werden.
  • Sie können das skipChangeCommits-Flag nicht verwenden, wenn die Quellstreamingtabelle als Ziel einer apply_changes()-Funktion definiert ist.

Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn eine Streamingtabelle eine andere Streamingtabelle als Quelle verwendet und die Quellstreamingtabelle Aktualisierungen oder Löschvorgänge erfordert (etwa aufgrund des Rechts auf Vergessen der DSGVO), kann das Flag skipChangeCommits beim Lesen der Quellstreamingtabelle festgelegt werden, um diese Änderungen zu ignorieren. Weitere Informationen zu diesem Flag finden Sie unter Ignorieren von Updates und Löschungen.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Eigenschaften von Python-Delta Live Tables

In den folgenden Tabellen werden die Optionen und Eigenschaften beschrieben, die Sie beim Definieren von Tabellen und Sichten mit Delta Live Tables angeben können:

@table oder @view
name

Typ: str

Ein optionaler Name für die Tabelle oder Sicht. Wenn er nicht definiert ist, wird der Funktionsname als Tabellen- oder Sichtname verwendet.
comment

Typ: str

Dies ist eine optionale Beschreibung für die Tabelle
spark_conf

Typ: dict

Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage.
table_properties

Typ: dict

Eine optionale Liste der Tabelleneigenschaften für die Tabelle
path

Typ: str

Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline.
partition_cols

Typ: a collection of str

Eine optionale Sammlung – beispielsweise eine Liste (list) – mit mindestens einer Spalte zum Partitionieren der Tabelle.
schema

Typ: str oder StructType

Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python definiert werden.
StructType.
temporary

Typ: bool

Erstellen Sie eine Tabelle, veröffentlichen Sie jedoch keine Metadaten für die Tabelle. Das Schlüsselwort temporary weist Delta Live Tables an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, auf die aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, wird eine temporäre Tabelle für die Lebensdauer der Pipeline beibehalten, die sie erstellt, und nicht nur für ein einzelnes Update.

Die Standardeinstellung lautet „false“.
Tabellen- oder Sichtdefinition
def <function-name>()

Eine Python-Funktion, die das Dataset definiert. Wenn der name-Parameter nicht festgelegt ist, wird <function-name> als Zieldatasetname verwendet.
query

Eine Spark SQL-Anweisung, die ein Spark-Dataset oder einen Koalas-DataFrame zurückgibt.

Verwenden Sie dlt.read() oder spark.table() für das vollständige Lesen aus einem Dataset, das in derselben Pipeline definiert ist. Wenn Sie die spark.table()-Funktion verwenden, um aus einem Dataset zu lesen, das in derselben Pipeline definiert ist, stellen Sie im Funktionsargument dem Datasetnamen das LIVE-Schlüsselwort voran. So lesen Sie beispielsweise aus einem Dataset mit dem Namen customers:

spark.table("LIVE.customers")

Sie können auch die spark.table()-Funktion verwenden, um aus einer im Metastore registrierten Tabelle zu lesen, indem Sie das LIVE-Schlüsselwort weglassen und optional den Tabellennamen mit dem Datenbanknamen qualifizieren:

spark.table("sales.customers")

Verwenden Sie dlt.read_stream() für das Streaming aus einem Dataset, das in derselben Pipeline definiert ist.

Verwenden Sie die spark.sql-Funktion, um eine SQL-Abfrage zum Erstellen des Rückgabedatasets zu definieren.

Verwenden Sie die PySpark-Syntax, um Delta Live Tables-Abfragen mit Python zu definieren.
Erwartungen
@expect("description", "constraint")

Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch
description. Wenn eine Zeile gegen die Erwartung verstößt, schließen Sie die Zeile in das Zieldataset ein.
@expect_or_drop("description", "constraint")

Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch
description. Wenn eine Zeile gegen die Erwartung verstößt, löschen Sie die Zeile aus dem Zieldataset.
@expect_or_fail("description", "constraint")

Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch
description. Wenn eine Zeile gegen die Erwartung verstößt, beenden Sie die Ausführung sofort.
@expect_all(expectations)

Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen.
expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, schließen Sie die Zeile in das Zieldataset ein.
@expect_all_or_drop(expectations)

Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen.
expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, löschen Sie die Zeile aus dem Zieldataset.
@expect_all_or_fail(expectations)

Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen.
expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, beenden Sie die Ausführung sofort.

Change Data Capture mit Python in Delta Live Tables

Verwenden Sie die apply_changes()-Funktion in der Python-API, um die CDC-Funktionalität in Delta Live Tables zu verwenden: Die Delta Live Tables-Python-Schnittstelle bietet auch die Funktion create_streaming_table(). Sie können diese Funktion verwenden, um die von der apply_changes() Funktion erforderliche Zieltabelle zu erstellen.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Hinweis

Das Standardverhalten für INSERT- und UPDATE-Ereignisse ist das Ausführen eines Upserts von CDC-Ereignissen aus der Quelle: das Aktualisieren aller Zeilen in der Zieltabelle, die mit den angegebenen Schlüsseln übereinstimmen, oder das Einfügen einer neuen Zeile, wenn kein übereinstimmender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung von DELETE-Ereignissen kann mit der APPLY AS DELETE WHEN-Bedingung angegeben werden.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der apply_changes-Zieltabelle angeben, müssen Sie auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by angeben.

Siehe Vereinfachte Change Data Capture mit der ÄNDERUNGEN-ANWENDEN-API in Delta Live Tables.

Argumente
target

Typ: str

Der Name der zu aktualisierenden Tabelle. Sie können die Funktion create_streaming_table() verwenden, um die Zieltabelle zu erstellen, bevor Sie die Funktion apply_changes() ausführen.

Dieser Parameter ist erforderlich.
source

Typ: str

Die Datenquelle, die CDC-Datensätze enthält.

Dieser Parameter ist erforderlich.
keys

Typ: list

Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

Sie können eins der folgenden Elemente angeben:

* Eine Liste von Zeichenfolgen: ["userId", "orderId"]
* Eine Liste von Spark-SQLcol()-Funktionen: [col("userId"), col("orderId"]

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist erforderlich.
sequence_by

Typ: str oder col()

Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Delta Live Tables verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die in nicht ordnungsgemäßer Reihenfolge eingehen.

Sie können eins der folgenden Elemente angeben:

* Eine Zeichenfolge: "sequenceNum"
* Eine Spark SQLcol()-Funktion: col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist erforderlich.
ignore_null_updates

Typ: bool

Ermöglicht das Erfassen von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und ignore_null_updatesTrue ist, behalten Spalten mit einer null ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit dem Wert null. Wenn ignore_null_updates ist False, werden vorhandene Werte mit null-Werten überschrieben.

Dieser Parameter ist optional.

Der Standardwert ist False.
apply_as_deletes

Typ: str oder expr()

Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Um nicht sortierte Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Tombstone in der zugrunde liegenden Delta-Tabelle beibehalten, und im Metastore wird eine Sicht erstellt, die diese Tombstones herausfiltert. Das Aufbewahrungsintervall kann konfiguriert werden mit:
pipelines.cdc.tombstoneGCThresholdInSecondsTabelleneigenschaft.

Sie können eins der folgenden Elemente angeben:

* Eine Zeichenfolge: "Operation = 'DELETE'"
* Eine Spark SQLexpr()-Funktion: expr("Operation = 'DELETE'")

Dieser Parameter ist optional.
apply_as_truncates

Typ: str oder expr()

Gibt an, wann ein CDC-Ereignis als TRUNCATE der gesamten Tabelle behandelt werden sollte. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.

Der Parameter apply_as_truncates wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt das Abschneiden nicht.

Sie können eins der folgenden Elemente angeben:

* Eine Zeichenfolge: "Operation = 'TRUNCATE'"
* Eine Spark SQLexpr()-Funktion: expr("Operation = 'TRUNCATE'")

Dieser Parameter ist optional.
column_list

except_column_list

Typ: list

Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Verwenden Sie column_list, um die vollständige Liste der einzuschließenden Spalten anzugeben. Verwenden Sie except_column_list, um die auszuschließende Spalten anzugeben. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col()-Funktionen deklarieren:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein column_list- oder except_column_list-Argument an die Funktion übergeben wird.
stored_as_scd_type

Typ: str oder int

Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

Für SCD-Typ 1 auf 1 oder für SCD-Typ 2 auf 2 festgelegt.

Diese Klausel ist optional.

Der Standardwert ist SCD-Typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Eine Teilmenge der Ausgabespalten, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Verwenden Sie track_history_column_list, um die vollständige Liste der Spalten anzugeben, die nachverfolgt werden sollen. Verwendung
track_history_except_column_list zum Angeben der Spalten, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col()-Funktionen deklarieren: - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumente für col()-Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId).

Dieser Parameter ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list- oder
track_history_except_column_list-Argument an die Funktion übergeben wird.