Python-Sprachreferenz zu Delta Live Tables
Dieser Artikel enthält Details für die Python-Programmierschnittstelle für Delta Live Tables.
Informationen zur SQL-API finden Sie in der Sprachreferenz zu Delta Live Tables.
Ausführliche Informationen zum Konfigurieren des automatischen Ladens finden Sie unter Automatisches Laden.
Einschränkungen
Für die Python-Schnittstelle von Delta Live Tables gelten die folgenden Einschränkungen:
- Die Python-Funktionen
table
undview
müssen einen DataFrame zurückgeben. Einige Funktionen, die mit DataFrames arbeiten, geben keine zurück und sollten deshalb nicht verwendet werden. Weil DataFrame-Transformationen ausgeführt werden, nachdem das vollständige Dataflowdiagramm aufgelöst wurde, könnte die Verwendung solcher Vorgänge unbeabsichtigte Nebenwirkungen haben. Diese Vorgänge umfassen Funktionen wiecollect()
,count()
,toPandas()
,save()
undsaveAsTable()
. Sie können diese Funktionen jedoch außerhalb vontable
- oderview
-Funktionsdefinitionen einbeziehen, weil dieser Code während der Graphinitialisierungsphase nur einmal ausgeführt wird. - Die
pivot()
-Funktion wird nicht unterstützt. Derpivot
-Vorgang in Spark erfordert Eager Loading von Eingabedaten, um das Schema der Ausgabe zu berechnen. Diese Funktion wird in Delta Live Tables nicht unterstützt.
Importieren des Python-Moduls dlt
Python-Funktionen für Delta Live Tables werden im dlt
-Modul definiert. Ihre mit der Python-API implementierten Pipelines müssen dieses Modul importieren:
import dlt
Erstellen einer materialisierten Sicht oder Streamingtabelle für Delta Live Tables
In Python bestimmt Delta Live Tables basierend auf der definierenden Abfrage, ob ein Dataset als materialisierte Sicht oder als Streamingtabelle aktualisiert werden soll. Das Decorator-Element @table
wird verwendet, um materialisierte Sichten und Streamingtabellen zu definieren.
Um eine materialisierte Sicht in Python zu definieren, wenden Sie @table
auf eine Abfrage an, die einen statischen Lesevorgang für eine Datenquelle ausführt. Um eine Streamingtabelle zu definieren, wenden Sie @table
auf eine Abfrage an, die einen Streaminglesevorgang für eine Datenquelle ausführt. Beide Datasettypen haben die gleiche Syntaxspezifikation:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Erstellen einer Delta Live Tables-Sicht
Um eine Sicht in Python zu definieren, wenden Sie den @view
-Decorator an. Sichten können in Delta Live Tables genau wie das Decorator-Element @table
entweder für statische Datasets oder für Streamingdatasets verwendet werden. Hier sehen Sie die Syntax zum Definieren von Sichten mit Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Beispiel: Definieren von Tabellen und Sichten
Wenden Sie das Decorator-Element @dlt.view
oder @dlt.table
auf eine Funktion an, um eine Sicht oder Tabelle in Python zu definieren. Sie können den Funktionsnamen oder den name
-Parameter verwenden, um den Tabellen- oder Sichtnamen zuzuweisen. Im folgenden Beispiel werden zwei verschiedene Datasets definiert: eine Sicht namens taxi_raw
, die eine JSON-Datei als Eingabequelle verwendet, und eine Tabelle namens filtered_data
, die die taxi_raw
-Ansicht als Eingabe verwendet:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Beispiel: Zugreifen auf ein Dataset, das in der gleichen Pipeline definiert ist
Neben dem Lesen aus externen Datenquellen können Sie mit der Delta Live Tables-Funktion read()
auch auf Datasets zugreifen, die in der gleichen Pipeline definiert sind. Im folgenden Beispiel wird das Erstellen eines customers_filtered
-Datasets mithilfe der Funktion read()
veranschaulicht:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
Sie können auch die Funktion spark.table()
verwenden, um auf ein Dataset zuzugreifen, das in der gleichen Pipeline definiert ist. Wenn Sie die spark.table()
-Funktion verwenden, um auf ein Dataset zuzugreifen, das in der Pipeline definiert ist, stellen Sie im Funktionsargument dem Datasetnamen das LIVE
-Schlüsselwort voran:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Beispiel: Lesen aus einer Tabelle, die in einem Metastore registriert ist
Wenn Sie Daten aus einer Tabelle lesen möchten, die im Hive-Metastore registriert ist, lassen Sie im Funktionsargument das Schlüsselwort LIVE
weg, und qualifizieren Sie optional den Tabellennamen mit dem Datenbanknamen:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Ein Beispiel für das Lesen aus einer Unity Catalog-Tabelle finden Sie unter Erfassen von Daten in einer Unity Catalog-Pipeline.
Beispiel: Zugreifen auf ein Dataset unter Verwendung von spark.sql
Sie können ein Dataset auch mithilfe eines spark.sql
-Ausdrucks in einer Abfragefunktion zurückgeben. Um aus einem internen Dataset zu lesen, müssen Sie dem Datasetnamen LIVE.
voranstellen:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Schreiben in eine Streamingtabelle aus mehreren Quelldatenströmen
Wichtig
Die Unterstützung von Delta Live Tables für @append_flow
befindet sich in der Public Preview.
Sie können den @append_flow
-Dekorateur verwenden, um aus mehreren Streamingquellen in eine Streamingtabelle zu schreiben, um Folgendes auszuführen:
- Hinzufügen und Entfernen von Streamingquellen, die Daten an eine vorhandene Streamingtabelle anfügen, ohne dass eine vollständige Aktualisierung erforderlich ist. Sie können beispielsweise eine Tabelle haben, in der regionale Daten aus jeder Region kombiniert werden, in der Sie arbeiten. Wenn neue Regionen eingeführt werden, können Sie der Tabelle die neuen Regionsdaten hinzufügen, ohne eine vollständige Aktualisierung durchzuführen.
- Aktualisieren Sie eine Streamingtabelle, indem Sie fehlende historische Daten (Backfilling) anfügen. Sie haben beispielsweise eine vorhandene Streamingtabelle, in die ein Apache Kafka-Thema geschrieben wird. Sie haben auch historische Daten in einer Tabelle gespeichert, die Sie genau einmal in die Streamingtabelle eingefügt haben, und Sie können die Daten nicht streamen, da Sie eine komplexe Aggregation durchführen müssen, bevor Sie die Daten einfügen.
Verwenden Sie die Funktion create_streaming_table() zum Erstellen einer Zieltabelle für die Datensatzausgabe durch die @append_flow
Verarbeitung.
Hinweis
Wenn Sie Datenqualitätseinschränkungen mit Erwartungendefinieren müssen, definieren Sie die Erwartungen an die Zieltabelle als Teil der create_streaming_table()
Funktion. Sie können die Erwartungen in der @append_flow
Definition nicht definieren.
Es folgt die Syntax für @append_flow
:
import dlt
dlt.create_streaming_table("<target-table-name>")
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment") # optional
def <function-name>():
return (<streaming query>)
Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Themen
Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen erstellt kafka_target
und in die Streamingtabelle aus zwei Kafka-Themen geschrieben:
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
Beispiel: Ausführen eines einmaligen Datenrückfüllens
Im folgenden Beispiel wird eine Abfrage ausgeführt, um verlaufsgeschichtliche Daten an eine Streamingtabelle anzufügen:
Hinweis
Um sicherzustellen, dass ein wahrer einmaliger Rücklauf ausgeführt wird, wenn die Backfill-Abfrage Teil einer Pipeline ist, die auf geplanter oder kontinuierlicher Basis ausgeführt wird, entfernen Sie die Abfrage nach der Ausführung der Pipeline einmal. Um neue Daten anzufügen, wenn sie im Backfill-Verzeichnis eingeht, lassen Sie die Abfrage an Ort und Stelle.
import dlt
@dlt.table()
def csv_target():
return spark.readStream.format("csv").load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream.format("csv").load("path/to/backfill/data/dir")
Erstellen einer Tabelle, die als Ziel von Streamingvorgängen verwendet werden soll
Verwenden Sie die create_streaming_table()
Funktion, um eine Zieltabelle für die Ausgabe von Datensätzen durch Streamingvorgänge zu erstellen, einschließlich apply_changes() und @append_flow Ausgabedatensätze.
Hinweis
Die Funktionen create_target_table()
und create_streaming_live_table()
sind veraltet. Databricks empfiehlt das Aktualisieren des vorhandenen Codes, um die create_streaming_table()
-Funktion zu verwenden.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumente |
---|
name Typ: str Der Tabellenname. Dieser Parameter ist erforderlich. |
comment Typ: str Dies ist eine optionale Beschreibung für die Tabelle |
spark_conf Typ: dict Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage. |
table_properties Typ: dict Eine optionale Liste der Tabelleneigenschaften für die Tabelle |
partition_cols Typ: array Eine optionale Liste einer oder mehrerer Spalten, die zum Partitionieren der Tabelle verwendet werden sollen |
path Typ: str Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline. |
schema Typ: str oder StructType Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python definiert werden. StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Optionale Datenqualitätseinschränkungen für die Tabelle. Siehe Mehrere Erwartungen. |
Steuern der Materialisierung von Tabellen
Tabellen bieten auch zusätzliche Steuerung ihrer Materialisierung:
- Geben Sie an, wie Tabellen mit
partition_cols
partitioniert werden. Sie können die Partitionierung verwenden, um Abfragen zu beschleunigen. - Sie können Tabelleneigenschaften festlegen, wenn Sie eine Sicht oder Tabelle definieren. Weitere Informationen finden Sie unter Delta Live Tables-Tabelleneigenschaften.
- Legen Sie mithilfe der
path
-Einstellung einen Speicherort für Tabellendaten fest. Standardmäßig werden Tabellendaten am Speicherort der Pipeline gespeichert, wennpath
nicht festgelegt ist. - Sie können Generierte Spalten in Ihrer Schemadefinition verwenden. Weitere Informationen finden Sie unter Beispiel: Angeben des Schemas und der Partitionsspalten.
Hinweis
Für Tabellen mit einer Größe von weniger als 1 TB empfiehlt Databricks, Delta Live Tables die Strukturierung der Daten steuern zu lassen. Wenn Sie nicht davon ausgehen, dass die Größe Ihrer Tabelle über ein Terabyte hinausgeht, sollten Sie im Allgemeinen keine Partitionsspalten angeben.
Beispiel: Angeben des Schemas und der Partitionsspalten
Sie können optional ein Tabellenschema mithilfe eines Python-StructType
oder einer SQL-DDL-Zeichenfolge angeben. Wenn eine DDL-Zeichenfolge angegeben wird, kann die Definition generierte Spalten enthalten.
Im folgenden Beispiel wird eine Tabelle namens sales
mit einem Schema erstellt, das mithilfe eines Python-Strukturtyps StructType
angegeben wird:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Im folgenden Beispiel wird das Schema für eine Tabelle mithilfe einer DDL-Zeichenfolge angegeben, eine generierte Spalte definiert und eine Partitionsspalte definiert:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Standardmäßig leitet Delta Live Tables das Schema aus der table
-Definition ab, wenn Sie kein Schema angeben.
Konfigurieren einer Streamingtabelle, sodass Änderungen in einer Quellstreamingtabelle ignoriert werden
Hinweis
- Das Flag
skipChangeCommits
funktioniert nur, wennspark.readStream
die Funktionoption()
verwendet. Das Flag kann nicht in einerdlt.read_stream()
-Funktion verwendet werden. - Sie können das
skipChangeCommits
-Flag nicht verwenden, wenn die Quellstreamingtabelle als Ziel einer apply_changes()-Funktion definiert ist.
Für Streamingtabellen sind standardmäßig reine Anfügequellen erforderlich. Wenn eine Streamingtabelle eine andere Streamingtabelle als Quelle verwendet und die Quellstreamingtabelle Aktualisierungen oder Löschvorgänge erfordert (etwa aufgrund des Rechts auf Vergessen der DSGVO), kann das Flag skipChangeCommits
beim Lesen der Quellstreamingtabelle festgelegt werden, um diese Änderungen zu ignorieren. Weitere Informationen zu diesem Flag finden Sie unter Ignorieren von Updates und Löschungen.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Eigenschaften von Python-Delta Live Tables
In den folgenden Tabellen werden die Optionen und Eigenschaften beschrieben, die Sie beim Definieren von Tabellen und Sichten mit Delta Live Tables angeben können:
@table oder @view |
---|
name Typ: str Ein optionaler Name für die Tabelle oder Sicht. Wenn er nicht definiert ist, wird der Funktionsname als Tabellen- oder Sichtname verwendet. |
comment Typ: str Dies ist eine optionale Beschreibung für die Tabelle |
spark_conf Typ: dict Eine optionale Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage. |
table_properties Typ: dict Eine optionale Liste der Tabelleneigenschaften für die Tabelle |
path Typ: str Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline. |
partition_cols Typ: a collection of str Eine optionale Sammlung – beispielsweise eine Liste ( list ) – mit mindestens einer Spalte zum Partitionieren der Tabelle. |
schema Typ: str oder StructType Eine optionale Schemadefinition für die Tabelle. Schemas können als SQL-DDL-Zeichenfolge oder mit Python definiert werden. StructType . |
temporary Typ: bool Erstellen Sie eine Tabelle, veröffentlichen Sie jedoch keine Metadaten für die Tabelle. Das Schlüsselwort temporary weist Delta Live Tables an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, auf die aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, wird eine temporäre Tabelle für die Lebensdauer der Pipeline beibehalten, die sie erstellt, und nicht nur für ein einzelnes Update.Die Standardeinstellung lautet „false“. |
Tabellen- oder Sichtdefinition |
---|
def <function-name>() Eine Python-Funktion, die das Dataset definiert. Wenn der name -Parameter nicht festgelegt ist, wird <function-name> als Zieldatasetname verwendet. |
query Eine Spark SQL-Anweisung, die ein Spark-Dataset oder einen Koalas-DataFrame zurückgibt. Verwenden Sie dlt.read() oder spark.table() für das vollständige Lesen aus einem Dataset, das in derselben Pipeline definiert ist. Wenn Sie die spark.table() -Funktion verwenden, um aus einem Dataset zu lesen, das in derselben Pipeline definiert ist, stellen Sie im Funktionsargument dem Datasetnamen das LIVE -Schlüsselwort voran. So lesen Sie beispielsweise aus einem Dataset mit dem Namen customers :spark.table("LIVE.customers") Sie können auch die spark.table() -Funktion verwenden, um aus einer im Metastore registrierten Tabelle zu lesen, indem Sie das LIVE -Schlüsselwort weglassen und optional den Tabellennamen mit dem Datenbanknamen qualifizieren:spark.table("sales.customers") Verwenden Sie dlt.read_stream() für das Streaming aus einem Dataset, das in derselben Pipeline definiert ist.Verwenden Sie die spark.sql -Funktion, um eine SQL-Abfrage zum Erstellen des Rückgabedatasets zu definieren.Verwenden Sie die PySpark-Syntax, um Delta Live Tables-Abfragen mit Python zu definieren. |
Erwartungen |
---|
@expect("description", "constraint") Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch description . Wenn eine Zeile gegen die Erwartung verstößt, schließen Sie die Zeile in das Zieldataset ein. |
@expect_or_drop("description", "constraint") Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch description . Wenn eine Zeile gegen die Erwartung verstößt, löschen Sie die Zeile aus dem Zieldataset. |
@expect_or_fail("description", "constraint") Deklarieren Sie eine Datenqualitätseinschränkung, die identifiziert wird durch description . Wenn eine Zeile gegen die Erwartung verstößt, beenden Sie die Ausführung sofort. |
@expect_all(expectations) Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen. expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, schließen Sie die Zeile in das Zieldataset ein. |
@expect_all_or_drop(expectations) Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen. expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, löschen Sie die Zeile aus dem Zieldataset. |
@expect_all_or_fail(expectations) Deklarieren Sie eine oder mehrere Datenqualitätseinschränkungen. expectations ist ein Python-Wörterbuch, wobei der Schlüssel die Beschreibung der Erwartung und der Wert die Erwartungseinschränkung ist. Wenn eine Zeile gegen eine der Erwartungen verstößt, beenden Sie die Ausführung sofort. |
Change Data Capture mit Python in Delta Live Tables
Verwenden Sie die apply_changes()
-Funktion in der Python-API, um die CDC-Funktionalität in Delta Live Tables zu verwenden: Die Delta Live Tables-Python-Schnittstelle bietet auch die Funktion create_streaming_table(). Sie können diese Funktion verwenden, um die von der apply_changes()
Funktion erforderliche Zieltabelle zu erstellen.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Hinweis
Das Standardverhalten für INSERT
- und UPDATE
-Ereignisse ist das Ausführen eines Upserts von CDC-Ereignissen aus der Quelle: das Aktualisieren aller Zeilen in der Zieltabelle, die mit den angegebenen Schlüsseln übereinstimmen, oder das Einfügen einer neuen Zeile, wenn kein übereinstimmender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung von DELETE
-Ereignissen kann mit der APPLY AS DELETE WHEN
-Bedingung angegeben werden.
Wichtig
Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der apply_changes
-Zieltabelle angeben, müssen Sie auch die Spalten __START_AT
und __END_AT
mit demselben Datentyp wie das Feld sequence_by
angeben.
Siehe Vereinfachte Change Data Capture mit der ÄNDERUNGEN-ANWENDEN-API in Delta Live Tables.
Argumente |
---|
target Typ: str Der Name der zu aktualisierenden Tabelle. Sie können die Funktion create_streaming_table() verwenden, um die Zieltabelle zu erstellen, bevor Sie die Funktion apply_changes() ausführen.Dieser Parameter ist erforderlich. |
source Typ: str Die Datenquelle, die CDC-Datensätze enthält. Dieser Parameter ist erforderlich. |
keys Typ: list Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten. Sie können eins der folgenden Elemente angeben: * Eine Liste von Zeichenfolgen: ["userId", "orderId"] * Eine Liste von Spark-SQL col() -Funktionen: [col("userId"), col("orderId"] Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist erforderlich. |
sequence_by Typ: str oder col() Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Delta Live Tables verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die in nicht ordnungsgemäßer Reihenfolge eingehen. Sie können eins der folgenden Elemente angeben: * Eine Zeichenfolge: "sequenceNum" * Eine Spark SQL col() -Funktion: col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist erforderlich. |
ignore_null_updates Typ: bool Ermöglicht das Erfassen von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und ignore_null_updates True ist, behalten Spalten mit einer null ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit dem Wert null . Wenn ignore_null_updates ist False , werden vorhandene Werte mit null -Werten überschrieben.Dieser Parameter ist optional. Der Standardwert ist False . |
apply_as_deletes Typ: str oder expr() Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Um nicht sortierte Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Tombstone in der zugrunde liegenden Delta-Tabelle beibehalten, und im Metastore wird eine Sicht erstellt, die diese Tombstones herausfiltert. Das Aufbewahrungsintervall kann konfiguriert werden mit:pipelines.cdc.tombstoneGCThresholdInSeconds Tabelleneigenschaft.Sie können eins der folgenden Elemente angeben: * Eine Zeichenfolge: "Operation = 'DELETE'" * Eine Spark SQL expr() -Funktion: expr("Operation = 'DELETE'") Dieser Parameter ist optional. |
apply_as_truncates Typ: str oder expr() Gibt an, wann ein CDC-Ereignis als TRUNCATE der gesamten Tabelle behandelt werden sollte. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.Der Parameter apply_as_truncates wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt das Abschneiden nicht.Sie können eins der folgenden Elemente angeben: * Eine Zeichenfolge: "Operation = 'TRUNCATE'" * Eine Spark SQL expr() -Funktion: expr("Operation = 'TRUNCATE'") Dieser Parameter ist optional. |
column_list except_column_list Typ: list Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Verwenden Sie column_list , um die vollständige Liste der einzuschließenden Spalten anzugeben. Verwenden Sie except_column_list , um die auszuschließende Spalten anzugeben. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col() -Funktionen deklarieren:* column_list = ["userId", "name", "city"] .* column_list = [col("userId"), col("name"), col("city")] * except_column_list = ["operation", "sequenceNum"] * except_column_list = [col("operation"), col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist optional. Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein column_list - oder except_column_list -Argument an die Funktion übergeben wird. |
stored_as_scd_type Typ: str oder int Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen. Für SCD-Typ 1 auf 1 oder für SCD-Typ 2 auf 2 festgelegt.Diese Klausel ist optional. Der Standardwert ist SCD-Typ 1. |
track_history_column_list track_history_except_column_list Typ: list Eine Teilmenge der Ausgabespalten, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Verwenden Sie track_history_column_list , um die vollständige Liste der Spalten anzugeben, die nachverfolgt werden sollen. Verwendungtrack_history_except_column_list zum Angeben der Spalten, die von der Nachverfolgung ausgeschlossen werden sollen. Sie können einen Wert entweder als Liste von Zeichenfolgen oder als Spark SQL-col() -Funktionen deklarieren: - track_history_column_list = ["userId", "name", "city"] . - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumente für col() -Funktionen können keine Qualifizierer enthalten. Beispielsweise können Sie col(userId) verwenden, aber nicht col(source.userId) .Dieser Parameter ist optional. Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn kein track_history_column_list - odertrack_history_except_column_list -Argument an die Funktion übergeben wird. |