Naslaginformatie over Python-taal voor Delta Live Tables
In dit artikel vindt u meer informatie over de Python-programmeerinterface van Delta Live Tables.
Zie de sql-taalreferentie voor Delta Live Tables voor meer informatie over de SQL-API.
Zie Wat is Automatisch laadprogramma ? voor meer informatie over het configureren van automatisch laden.
Voordat u begint
Hier volgen belangrijke overwegingen bij het implementeren van pijplijnen met de Python-interface van Delta Live Tables:
- Omdat python
table()
enview()
functies meerdere keren worden aangeroepen tijdens de planning en uitvoering van een pijplijnupdate, moet u geen code opnemen in een van deze functies die mogelijk bijwerkingen hebben (bijvoorbeeld code waarmee gegevens worden gewijzigd of een e-mailbericht wordt verzonden). Om onverwacht gedrag te voorkomen, moeten uw Python-functies die gegevenssets definiëren alleen de code bevatten die nodig is om de tabel of weergave te definiëren. - Als u bewerkingen wilt uitvoeren zoals het verzenden van e-mailberichten of het integreren met een externe bewakingsservice, met name in functies die gegevenssets definiëren, gebruikt u gebeurtenishook. Het implementeren van deze bewerkingen in de functies die uw gegevenssets definiëren, veroorzaakt onverwacht gedrag.
- Python
table
enview
functies moeten een DataFrame retourneren. Sommige functies die op DataFrames werken, retourneren geen DataFrames en mogen niet worden gebruikt. Deze bewerkingen omvatten functies zoalscollect()
,count()
,toPandas()
, ensave()
saveAsTable()
. Omdat DataFrame-transformaties worden uitgevoerd nadat de volledige gegevensstroomgrafiek is opgelost, kan het gebruik van dergelijke bewerkingen onbedoelde bijwerkingen hebben.
dlt
De Python-module importeren
Python-functies voor Delta Live Tables worden gedefinieerd in de dlt
module. Uw pijplijnen die zijn geïmplementeerd met de Python-API, moeten deze module importeren:
import dlt
Een gerealiseerde weergave of streamingtabel voor Delta Live-tabellen maken
In Python bepaalt Delta Live Tables of een gegevensset moet worden bijgewerkt als een gerealiseerde weergave of streamingtabel op basis van de definitiequery. De @table
decorator kan worden gebruikt om zowel gerealiseerde weergaven als streamingtabellen te definiëren.
Als u een gerealiseerde weergave in Python wilt definiëren, past u deze toe op @table
een query die een statische leesbewerking uitvoert op basis van een gegevensbron. Als u een streamingtabel wilt definiëren, moet u van toepassing zijn op @table
een query die een streaming-leesbewerking uitvoert op een gegevensbron of de functie create_streaming_table() gebruikt. Beide typen gegevenssets hebben dezelfde syntaxisspecificatie als volgt:
Notitie
Als u het cluster_by
argument wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Een Delta Live Tables-weergave maken
Als u een weergave in Python wilt definiëren, past u de @view
decorator toe. Net als de @table
decorator kunt u weergaven in Delta Live Tables gebruiken voor statische of streaminggegevenssets. Hier volgt de syntaxis voor het definiëren van weergaven met Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Voorbeeld: Tabellen en weergaven definiëren
Als u een tabel of weergave in Python wilt definiëren, past u de @dlt.view
of @dlt.table
decorator toe op een functie. U kunt de functienaam of de name
parameter gebruiken om de tabel- of weergavenaam toe te wijzen. In het volgende voorbeeld worden twee verschillende gegevenssets gedefinieerd: een weergave taxi_raw
die een JSON-bestand als invoerbron gebruikt en een tabel filtered_data
die de taxi_raw
weergave als invoer gebruikt:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Voorbeeld: Toegang krijgen tot een gegevensset die is gedefinieerd in dezelfde pijplijn
Naast het lezen van externe gegevensbronnen hebt u toegang tot gegevenssets die zijn gedefinieerd in dezelfde pijplijn met de functie Delta Live Tables read()
. In het volgende voorbeeld ziet u hoe u een customers_filtered
gegevensset maakt met behulp van de read()
functie:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
U kunt de spark.table()
functie ook gebruiken voor toegang tot een gegevensset die is gedefinieerd in dezelfde pijplijn. Wanneer u de spark.table()
functie gebruikt om toegang te krijgen tot een gegevensset die in de pijplijn is gedefinieerd, heeft het functieargument het LIVE
trefwoord voorafgegaan aan de naam van de gegevensset:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Voorbeeld: Lezen uit een tabel die is geregistreerd in een metastore
Als u gegevens wilt lezen uit een tabel die is geregistreerd in de Hive-metastore, laat u in het functieargument het LIVE
trefwoord weg en moet u desgewenst de tabelnaam kwalificeren met de databasenaam:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Zie Gegevens opnemen in een Unity Catalog-pijplijn voor een voorbeeld van het lezen uit een Unity Catalog-tabel.
Voorbeeld: Toegang krijgen tot een gegevensset met behulp van spark.sql
U kunt ook een gegevensset retourneren met behulp van een spark.sql
expressie in een queryfunctie. Als u een interne gegevensset wilt lezen, gaat LIVE.
u vooraf aan de naam van de gegevensset:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Een tabel maken die moet worden gebruikt als doel van streamingbewerkingen
Gebruik de create_streaming_table()
functie om een doeltabel te maken voor records die worden uitgevoerd door streamingbewerkingen, waaronder apply_changes(), apply_changes_from_snapshot()en @append_flow uitvoerrecords.
Notitie
De create_target_table()
functies en create_streaming_live_table()
functies zijn afgeschaft. Databricks raadt aan om bestaande code bij te werken om de create_streaming_table()
functie te gebruiken.
Notitie
Als u het cluster_by
argument wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumenten |
---|
name Type: str De tabelnaam. Deze parameter is vereist. |
comment Type: str Een optionele beschrijving voor de tabel. |
spark_conf Type: dict Een optionele lijst met Spark-configuraties voor de uitvoering van deze query. |
table_properties Type: dict Een optionele lijst met tabeleigenschappen voor de tabel. |
partition_cols Type: array Een optionele lijst met een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel. |
cluster_by Type: array Schakel eventueel liquide clustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clusteringsleutels. Zie Liquid clustering gebruiken voor Delta-tabellen. |
path Type: str Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn. |
schema Type: str of StructType Een optionele schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Type: dict Optionele beperkingen voor gegevenskwaliteit voor de tabel. Bekijk meerdere verwachtingen. |
row_filter (Openbare preview)Type: str Een optionele rijfiltercomponent voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers. |
Bepalen hoe tabellen worden gerealiseerd
Tabellen bieden ook extra controle over hun materialisatie:
- Geef op hoe tabellen worden gepartitioneerd met behulp van
partition_cols
. U kunt partitionering gebruiken om query's te versnellen. - U kunt tabeleigenschappen instellen wanneer u een weergave of tabel definieert. Zie tabeleigenschappen van Delta Live Tables.
- Stel een opslaglocatie in voor tabelgegevens met behulp van de
path
instelling. Tabelgegevens worden standaard opgeslagen op de opslaglocatie van de pijplijn alspath
deze niet is ingesteld. - U kunt gegenereerde kolommen in uw schemadefinitie gebruiken. Zie Voorbeeld: Geef een schema en partitiekolommen op.
Notitie
Voor tabellen met een grootte van minder dan 1 TB raadt Databricks aan om delta livetabellen de gegevensorganisatie te laten beheren. U moet geen partitiekolommen opgeven, tenzij u verwacht dat de tabel groter wordt dan een terabyte.
Voorbeeld: Een schema en partitiekolommen opgeven
U kunt desgewenst een tabelschema opgeven met behulp van een Python StructType
- of SQL DDL-tekenreeks. Wanneer deze is opgegeven met een DDL-tekenreeks, kan de definitie gegenereerde kolommen bevatten.
In het volgende voorbeeld wordt een tabel sales
gemaakt met een schema dat is opgegeven met behulp van een Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
In het volgende voorbeeld wordt het schema voor een tabel opgegeven met behulp van een DDL-tekenreeks, wordt een gegenereerde kolom gedefinieerd en wordt een partitiekolom gedefinieerd:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Delta Live Tables geeft standaard het schema af van de table
definitie als u geen schema opgeeft.
Een streamingtabel configureren om wijzigingen in een bronstreamingtabel te negeren
Notitie
- De
skipChangeCommits
vlag werkt alleen metspark.readStream
het gebruik van deoption()
functie. U kunt deze vlag niet gebruiken in eendlt.read_stream()
functie. - U kunt de
skipChangeCommits
vlag niet gebruiken wanneer de bronstreamingtabel is gedefinieerd als het doel van een functie apply_changes().
Voor streamingtabellen zijn standaard alleen-toevoegbronnen vereist. Wanneer een streamingtabel een andere streamingtabel als bron gebruikt en de bronstreamingtabel updates of verwijderingen vereist, bijvoorbeeld AVG 'recht om te vergeten' verwerking, kan de vlag worden ingesteld bij het skipChangeCommits
lezen van de bronstreamingtabel om deze wijzigingen te negeren. Zie Updates en verwijderingen negeren voor meer informatie over deze vlag.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Voorbeeld: Tabelbeperkingen definiëren
Belangrijk
Tabelbeperkingen bevinden zich in openbare preview.
Wanneer u een schema opgeeft, kunt u primaire en refererende sleutels definiëren. De beperkingen zijn informatief en worden niet afgedwongen. Zie de CONSTRAINT-component in de sql-taalreferentie.
In het volgende voorbeeld wordt een tabel met een primaire en refererende-sleutelbeperking gedefinieerd:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Voorbeeld: Een rijfilter en kolommasker definiëren
Belangrijk
Rijfilters en kolommaskers bevinden zich in openbare preview.
Als u een gerealiseerde weergave of streamingtabel met een rijfilter en kolommasker wilt maken, gebruikt u de component ROW FILTER en de MASK-component. In het volgende voorbeeld ziet u hoe u een gerealiseerde weergave en een streamingtabel definieert met zowel een rijfilter als een kolommasker:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Zie Tabellen publiceren met rijfilters en kolommaskers voor meer informatie over rijfilters en kolommaskers.
Eigenschappen van Python Delta Live Tables
In de volgende tabellen worden de opties en eigenschappen beschreven die u kunt opgeven tijdens het definiëren van tabellen en weergaven met Delta Live Tables:
Notitie
Als u het cluster_by
argument wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.
@table of @view |
---|
name Type: str Een optionele naam voor de tabel of weergave. Als deze niet is gedefinieerd, wordt de functienaam gebruikt als tabel- of weergavenaam. |
comment Type: str Een optionele beschrijving voor de tabel. |
spark_conf Type: dict Een optionele lijst met Spark-configuraties voor de uitvoering van deze query. |
table_properties Type: dict Een optionele lijst met tabeleigenschappen voor de tabel. |
path Type: str Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn. |
partition_cols Type: a collection of str Een optionele verzameling, bijvoorbeeld een van een list of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel. |
cluster_by Type: array Schakel eventueel liquide clustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clusteringsleutels. Zie Liquid clustering gebruiken voor Delta-tabellen. |
schema Type: str of StructType Een optionele schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python StructType . |
temporary Type: bool Maak een tabel, maar publiceer geen metagegevens voor de tabel. Met temporary het trefwoord worden Delta Live Tables geïnstrueerd om een tabel te maken die beschikbaar is voor de pijplijn, maar die niet buiten de pijplijn mag worden geopend. Om de verwerkingstijd te verminderen, blijft een tijdelijke tabel behouden gedurende de levensduur van de pijplijn die deze maakt, en niet slechts één update.De standaardwaarde is 'Onwaar'. |
row_filter (Openbare preview)Type: str Een optionele rijfiltercomponent voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers. |
Tabel- of weergavedefinitie |
---|
def <function-name>() Een Python-functie die de gegevensset definieert. Als de name parameter niet is ingesteld, wordt deze <function-name> gebruikt als de naam van de doelgegevensset. |
query Een Spark SQL-instructie die een Spark-gegevensset of Koalas DataFrame retourneert. Gebruik dlt.read() of spark.table() om een volledige leesbewerking uit te voeren van een gegevensset die in dezelfde pijplijn is gedefinieerd. Wanneer u de spark.table() functie gebruikt om te lezen uit een gegevensset die in dezelfde pijplijn is gedefinieerd, wordt het LIVE trefwoord voorafgegaan door de naam van de gegevensset in het functieargument. Als u bijvoorbeeld wilt lezen uit een gegevensset met de naam customers :spark.table("LIVE.customers") U kunt de spark.table() functie ook gebruiken om te lezen uit een tabel die is geregistreerd in de metastore door het LIVE trefwoord weg te laten en eventueel de tabelnaam te kwalificeren met de databasenaam:spark.table("sales.customers") Gebruik dlt.read_stream() dit om een streaming-leesbewerking uit te voeren op basis van een gegevensset die in dezelfde pijplijn is gedefinieerd.Gebruik de spark.sql functie om een SQL-query te definiëren om de retourgegevensset te maken.Gebruik pySpark-syntaxis om Query's voor Delta Live Tables te definiëren met Python. |
Verwachtingen |
---|
@expect("description", "constraint") Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door description . Als een rij in strijd is met de verwachting, neemt u de rij op in de doelgegevensset. |
@expect_or_drop("description", "constraint") Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door description . Als een rij in strijd is met de verwachting, verwijdert u de rij uit de doelgegevensset. |
@expect_or_fail("description", "constraint") Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door description . Als een rij in strijd is met de verwachting, stopt u onmiddellijk met de uitvoering. |
@expect_all(expectations) Declareer een of meer beperkingen voor gegevenskwaliteit. expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij in strijd is met een van de verwachtingen, neemt u de rij op in de doelgegevensset. |
@expect_all_or_drop(expectations) Declareer een of meer beperkingen voor gegevenskwaliteit. expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij een van de verwachtingen schendt, verwijdert u de rij uit de doelgegevensset. |
@expect_all_or_fail(expectations) Declareer een of meer beperkingen voor gegevenskwaliteit. expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij een van de verwachtingen schendt, stopt u onmiddellijk met de uitvoering. |
Gegevens vastleggen vanuit een wijzigingenfeed wijzigen met Python in Delta Live Tables
Gebruik de apply_changes()
functie in de Python-API om cdc-functionaliteit (Delta Live Tables change data capture) te gebruiken om brongegevens te verwerken vanuit een CDF (Change Data Feed).
Belangrijk
U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de apply_changes()
doeltabel opgeeft, moet u de __START_AT
en __END_AT
kolommen met hetzelfde gegevenstype als de sequence_by
velden opnemen.
Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de Python-interface van Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Notitie
Voor APPLY CHANGES
verwerking is het standaardgedrag voor INSERT
en UPDATE
gebeurtenissen het uitvoeren van upsert CDC-gebeurtenissen uit de bron: werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutel(en) of voeg een nieuwe rij in wanneer er geen overeenkomende record in de doeltabel bestaat. Verwerking voor DELETE
gebeurtenissen kan worden opgegeven met de APPLY AS DELETE WHEN
voorwaarde.
Zie De WIJZIGINGEN-API's TOEPASSEN voor meer informatie over CDC-verwerking met een wijzigingenfeed: Het vastleggen van wijzigingen vereenvoudigen met Delta Live Tables. Zie Voorbeeld: SCD type 1 en SCD type 2 verwerken met CDF-brongegevens voor een voorbeeld van het gebruik van de apply_changes()
functie.
Belangrijk
U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het apply_changes
doeltabelschema opgeeft, moet u de __START_AT
en __END_AT
kolommen met hetzelfde gegevenstype als het sequence_by
veld opnemen.
Zie de APPLY CHANGES API's: Vereenvoudig het vastleggen van wijzigingsgegevens met Delta Live Tables.
Argumenten |
---|
target Type: str De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de apply_changes() functie uitvoert.Deze parameter is vereist. |
source Type: str De gegevensbron met CDC-records. Deze parameter is vereist. |
keys Type: list De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven: - Een lijst met tekenreeksen: ["userId", "orderId"] - Een lijst met Spark SQL-functies col() : [col("userId"), col("orderId"] Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId) , maar u kunt het niet gebruiken col(source.userId) .Deze parameter is vereist. |
sequence_by Type: str of col() De kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. Delta Live Tables maakt gebruik van deze sequentiëring om wijzigingsgebeurtenissen af te handelen die niet op volgorde aankomen. U kunt een van de volgende opties opgeven: - Een tekenreeks: "sequenceNum" - Een Spark SQL-functie col() : col("sequenceNum") Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId) , maar u kunt het niet gebruiken col(source.userId) .De opgegeven kolom moet een sorteerbaar gegevenstype zijn. Deze parameter is vereist. |
ignore_null_updates Type: bool Toestaan dat updates worden opgenomen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en ignore_null_updates dit is True , behouden kolommen met een null behoud van de bestaande waarden in het doel. Dit geldt ook voor geneste kolommen met een waarde van null . Wanneer ignore_null_updates dat het is False , worden bestaande waarden overschreven met null waarden.Deze parameter is optioneel. De standaardwaarde is False . |
apply_as_deletes Type: str of expr() Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een DELETE upsert in plaats van een upsert. Als u verouderde gegevens wilt verwerken, wordt de verwijderde rij tijdelijk bewaard als tombstone in de onderliggende Delta-tabel en wordt er een weergave gemaakt in de metastore die deze tombstones filtert. Het bewaarinterval kan worden geconfigureerd met depipelines.cdc.tombstoneGCThresholdInSeconds tabeleigenschap.U kunt een van de volgende opties opgeven: - Een tekenreeks: "Operation = 'DELETE'" - Een Spark SQL-functie expr() : expr("Operation = 'DELETE'") Deze parameter is optioneel. |
apply_as_truncates Type: str of expr() Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE . Omdat deze component een volledig afkappen van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke use cases waarvoor deze functionaliteit is vereist.De apply_as_truncates parameter wordt alleen ondersteund voor SCD-type 1. SCD-type 2 biedt geen ondersteuning voor afkappen van bewerkingen.U kunt een van de volgende opties opgeven: - Een tekenreeks: "Operation = 'TRUNCATE'" - Een Spark SQL-functie expr() : expr("Operation = 'TRUNCATE'") Deze parameter is optioneel. |
column_list except_column_list Type: list Een subset kolommen die moeten worden opgenomen in de doeltabel. Hiermee column_list geeft u de volledige lijst met kolommen op die u wilt opnemen. Gebruik except_column_list dit om de kolommen op te geven die moeten worden uitgesloten. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId) , maar u kunt het niet gebruiken col(source.userId) .Deze parameter is optioneel. De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen column_list of except_column_list argument wordt doorgegeven aan de functie. |
stored_as_scd_type Type: str of int Of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2.Deze component is optioneel. De standaardwaarde is SCD type 1. |
track_history_column_list track_history_except_column_list Type: list Een subset van uitvoerkolommen die moeten worden bijgehouden voor de geschiedenis in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Gebruiktrack_history_except_column_list om de kolommen op te geven die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId) , maar u kunt het niet gebruiken col(source.userId) .Deze parameter is optioneel. De standaardinstelling is om alle kolommen in de doeltabel op te nemen als dat niet het is track_history_column_list oftrack_history_except_column_list argument wordt doorgegeven aan de functie. |
Gegevensopname wijzigen van databasemomentopnamen met Python in Delta Live Tables
Belangrijk
De APPLY CHANGES FROM SNAPSHOT
API bevindt zich in openbare preview.
Gebruik de apply_changes_from_snapshot()
functie in de Python-API om de cdc-functionaliteit (Delta Live Tables change data capture) te gebruiken om brongegevens uit momentopnamen van databases te verwerken.
Belangrijk
U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de apply_changes_from_snapshot()
doeltabel opgeeft, moet u ook de __START_AT
en __END_AT
kolommen met hetzelfde gegevenstype als het sequence_by
veld opnemen.
Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de Python-interface van Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Notitie
Voor APPLY CHANGES FROM SNAPSHOT
verwerking is het standaardgedrag het invoegen van een nieuwe rij wanneer een overeenkomende record met dezelfde sleutel(en) niet in het doel bestaat. Als er wel een overeenkomende record bestaat, wordt deze alleen bijgewerkt als een van de waarden in de rij is gewijzigd. Rijen met sleutels die aanwezig zijn in het doel, maar die niet meer aanwezig zijn in de bron, worden verwijderd.
Zie De APPLY CHANGES-API's voor meer informatie over CDC-verwerking met momentopnamen: Het vastleggen van wijzigingen vereenvoudigen met Delta Live Tables. Zie de voorbeelden van periodieke opname van momentopnamen en historische momentopnameopnamen voor voorbeelden van het gebruik van de apply_changes_from_snapshot()
functie.
Argumenten |
---|
target Type: str De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de apply_changes() functie uitvoert.Deze parameter is vereist. |
source Type: str of lambda function Ofwel de naam van een tabel of weergave om periodiek een momentopname te maken of een Python-lambda-functie die de dataframe voor momentopnamen retourneert die moet worden verwerkt en de versie van de momentopname. Zie Het bronargument implementeren. Deze parameter is vereist. |
keys Type: list De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven: - Een lijst met tekenreeksen: ["userId", "orderId"] - Een lijst met Spark SQL-functies col() : [col("userId"), col("orderId"] Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId) , maar u kunt het niet gebruiken col(source.userId) .Deze parameter is vereist. |
stored_as_scd_type Type: str of int Of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2.Deze component is optioneel. De standaardwaarde is SCD type 1. |
track_history_column_list track_history_except_column_list Type: list Een subset van uitvoerkolommen die moeten worden bijgehouden voor de geschiedenis in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Gebruiktrack_history_except_column_list om de kolommen op te geven die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId) , maar u kunt het niet gebruiken col(source.userId) .Deze parameter is optioneel. De standaardinstelling is om alle kolommen in de doeltabel op te nemen als dat niet het is track_history_column_list oftrack_history_except_column_list argument wordt doorgegeven aan de functie. |
source
Het argument implementeren
De apply_changes_from_snapshot()
functie bevat het source
argument. Voor het verwerken van historische momentopnamen wordt verwacht dat het source
argument een Python-lambda-functie is die twee waarden retourneert aan de apply_changes_from_snapshot()
functie: een Python DataFrame met de momentopnamegegevens die moeten worden verwerkt en een momentopnameversie.
Hier volgt de handtekening van de lambda-functie:
lambda Any => Optional[(DataFrame, Any)]
- Het argument voor de lambda-functie is de laatst verwerkte momentopnameversie.
- De retourwaarde van de lambda-functie is
None
of een tuple van twee waarden: de eerste waarde van de tuple is een DataFrame dat de momentopname bevat die moet worden verwerkt. De tweede waarde van de tuple is de momentopnameversie die de logische volgorde van de momentopname aangeeft.
Een voorbeeld waarmee de lambda-functie wordt geïmplementeerd en aangeroepen:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
De Delta Live Tables-runtime voert de volgende stappen uit telkens wanneer de pijplijn die de apply_changes_from_snapshot()
functie bevat, wordt geactiveerd:
- Hiermee wordt de
next_snapshot_and_version
functie uitgevoerd om het volgende dataframe voor momentopnamen en de bijbehorende momentopnameversie te laden. - Als er geen DataFrame wordt geretourneerd, wordt de uitvoering beëindigd en wordt de pijplijnupdate gemarkeerd als voltooid.
- Detecteert de wijzigingen in de nieuwe momentopname en past deze incrementeel toe op de doeltabel.
- Keert terug naar stap 1 om de volgende momentopname en de bijbehorende versie te laden.
Beperkingen
De Python-interface voor Delta Live Tables heeft de volgende beperking:
De pivot()
functie wordt niet ondersteund. Voor de pivot
bewerking in Spark is het laden van invoergegevens vereist om het schema van de uitvoer te berekenen. Deze mogelijkheid wordt niet ondersteund in Delta Live Tables.