Delen via


Naslaginformatie over Python-taal voor Delta Live Tables

In dit artikel vindt u meer informatie over de Python-programmeerinterface van Delta Live Tables.

Zie de sql-taalreferentie voor Delta Live Tables voor meer informatie over de SQL-API.

Zie Wat is Automatisch laadprogramma ? voor meer informatie over het configureren van automatisch laden.

Voordat u begint

Hier volgen belangrijke overwegingen bij het implementeren van pijplijnen met de Python-interface van Delta Live Tables:

  • Omdat python table() en view() functies meerdere keren worden aangeroepen tijdens de planning en uitvoering van een pijplijnupdate, moet u geen code opnemen in een van deze functies die mogelijk bijwerkingen hebben (bijvoorbeeld code waarmee gegevens worden gewijzigd of een e-mailbericht wordt verzonden). Om onverwacht gedrag te voorkomen, moeten uw Python-functies die gegevenssets definiëren alleen de code bevatten die nodig is om de tabel of weergave te definiëren.
  • Als u bewerkingen wilt uitvoeren zoals het verzenden van e-mailberichten of het integreren met een externe bewakingsservice, met name in functies die gegevenssets definiëren, gebruikt u gebeurtenishook. Het implementeren van deze bewerkingen in de functies die uw gegevenssets definiëren, veroorzaakt onverwacht gedrag.
  • Python table en view functies moeten een DataFrame retourneren. Sommige functies die op DataFrames werken, retourneren geen DataFrames en mogen niet worden gebruikt. Deze bewerkingen omvatten functies zoals collect(), count(), toPandas(), en save()saveAsTable(). Omdat DataFrame-transformaties worden uitgevoerd nadat de volledige gegevensstroomgrafiek is opgelost, kan het gebruik van dergelijke bewerkingen onbedoelde bijwerkingen hebben.

dlt De Python-module importeren

Python-functies voor Delta Live Tables worden gedefinieerd in de dlt module. Uw pijplijnen die zijn geïmplementeerd met de Python-API, moeten deze module importeren:

import dlt

Een gerealiseerde weergave of streamingtabel voor Delta Live-tabellen maken

In Python bepaalt Delta Live Tables of een gegevensset moet worden bijgewerkt als een gerealiseerde weergave of streamingtabel op basis van de definitiequery. De @table decorator kan worden gebruikt om zowel gerealiseerde weergaven als streamingtabellen te definiëren.

Als u een gerealiseerde weergave in Python wilt definiëren, past u deze toe op @table een query die een statische leesbewerking uitvoert op basis van een gegevensbron. Als u een streamingtabel wilt definiëren, moet u van toepassing zijn op @table een query die een streaming-leesbewerking uitvoert op een gegevensbron of de functie create_streaming_table() gebruikt. Beide typen gegevenssets hebben dezelfde syntaxisspecificatie als volgt:

Notitie

Als u het cluster_by argument wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Een Delta Live Tables-weergave maken

Als u een weergave in Python wilt definiëren, past u de @view decorator toe. Net als de @table decorator kunt u weergaven in Delta Live Tables gebruiken voor statische of streaminggegevenssets. Hier volgt de syntaxis voor het definiëren van weergaven met Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Voorbeeld: Tabellen en weergaven definiëren

Als u een tabel of weergave in Python wilt definiëren, past u de @dlt.view of @dlt.table decorator toe op een functie. U kunt de functienaam of de name parameter gebruiken om de tabel- of weergavenaam toe te wijzen. In het volgende voorbeeld worden twee verschillende gegevenssets gedefinieerd: een weergave taxi_raw die een JSON-bestand als invoerbron gebruikt en een tabel filtered_data die de taxi_raw weergave als invoer gebruikt:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Voorbeeld: Toegang krijgen tot een gegevensset die is gedefinieerd in dezelfde pijplijn

Naast het lezen van externe gegevensbronnen hebt u toegang tot gegevenssets die zijn gedefinieerd in dezelfde pijplijn met de functie Delta Live Tables read() . In het volgende voorbeeld ziet u hoe u een customers_filtered gegevensset maakt met behulp van de read() functie:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

U kunt de spark.table() functie ook gebruiken voor toegang tot een gegevensset die is gedefinieerd in dezelfde pijplijn. Wanneer u de spark.table() functie gebruikt om toegang te krijgen tot een gegevensset die in de pijplijn is gedefinieerd, heeft het functieargument het LIVE trefwoord voorafgegaan aan de naam van de gegevensset:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Voorbeeld: Lezen uit een tabel die is geregistreerd in een metastore

Als u gegevens wilt lezen uit een tabel die is geregistreerd in de Hive-metastore, laat u in het functieargument het LIVE trefwoord weg en moet u desgewenst de tabelnaam kwalificeren met de databasenaam:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Zie Gegevens opnemen in een Unity Catalog-pijplijn voor een voorbeeld van het lezen uit een Unity Catalog-tabel.

Voorbeeld: Toegang krijgen tot een gegevensset met behulp van spark.sql

U kunt ook een gegevensset retourneren met behulp van een spark.sql expressie in een queryfunctie. Als u een interne gegevensset wilt lezen, gaat LIVE. u vooraf aan de naam van de gegevensset:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Een tabel maken die moet worden gebruikt als doel van streamingbewerkingen

Gebruik de create_streaming_table() functie om een doeltabel te maken voor records die worden uitgevoerd door streamingbewerkingen, waaronder apply_changes(), apply_changes_from_snapshot()en @append_flow uitvoerrecords.

Notitie

De create_target_table() functies en create_streaming_live_table() functies zijn afgeschaft. Databricks raadt aan om bestaande code bij te werken om de create_streaming_table() functie te gebruiken.

Notitie

Als u het cluster_by argument wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumenten
name

Type: str

De tabelnaam.

Deze parameter is vereist.
comment

Type: str

Een optionele beschrijving voor de tabel.
spark_conf

Type: dict

Een optionele lijst met Spark-configuraties voor de uitvoering van deze query.
table_properties

Type: dict

Een optionele lijst met tabeleigenschappen voor de tabel.
partition_cols

Type: array

Een optionele lijst met een of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel.
cluster_by

Type: array

Schakel eventueel liquide clustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clusteringsleutels.

Zie Liquid clustering gebruiken voor Delta-tabellen.
path

Type: str

Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn.
schema

Type: str of StructType

Een optionele schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Type: dict

Optionele beperkingen voor gegevenskwaliteit voor de tabel. Bekijk meerdere verwachtingen.
row_filter (Openbare preview)

Type: str

Een optionele rijfiltercomponent voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers.

Bepalen hoe tabellen worden gerealiseerd

Tabellen bieden ook extra controle over hun materialisatie:

Notitie

Voor tabellen met een grootte van minder dan 1 TB raadt Databricks aan om delta livetabellen de gegevensorganisatie te laten beheren. U moet geen partitiekolommen opgeven, tenzij u verwacht dat de tabel groter wordt dan een terabyte.

Voorbeeld: Een schema en partitiekolommen opgeven

U kunt desgewenst een tabelschema opgeven met behulp van een Python StructType - of SQL DDL-tekenreeks. Wanneer deze is opgegeven met een DDL-tekenreeks, kan de definitie gegenereerde kolommen bevatten.

In het volgende voorbeeld wordt een tabel sales gemaakt met een schema dat is opgegeven met behulp van een Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

In het volgende voorbeeld wordt het schema voor een tabel opgegeven met behulp van een DDL-tekenreeks, wordt een gegenereerde kolom gedefinieerd en wordt een partitiekolom gedefinieerd:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Delta Live Tables geeft standaard het schema af van de table definitie als u geen schema opgeeft.

Een streamingtabel configureren om wijzigingen in een bronstreamingtabel te negeren

Notitie

  • De skipChangeCommits vlag werkt alleen met spark.readStream het gebruik van de option() functie. U kunt deze vlag niet gebruiken in een dlt.read_stream() functie.
  • U kunt de skipChangeCommits vlag niet gebruiken wanneer de bronstreamingtabel is gedefinieerd als het doel van een functie apply_changes().

Voor streamingtabellen zijn standaard alleen-toevoegbronnen vereist. Wanneer een streamingtabel een andere streamingtabel als bron gebruikt en de bronstreamingtabel updates of verwijderingen vereist, bijvoorbeeld AVG 'recht om te vergeten' verwerking, kan de vlag worden ingesteld bij het skipChangeCommits lezen van de bronstreamingtabel om deze wijzigingen te negeren. Zie Updates en verwijderingen negeren voor meer informatie over deze vlag.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Voorbeeld: Tabelbeperkingen definiëren

Belangrijk

Tabelbeperkingen bevinden zich in openbare preview.

Wanneer u een schema opgeeft, kunt u primaire en refererende sleutels definiëren. De beperkingen zijn informatief en worden niet afgedwongen. Zie de CONSTRAINT-component in de sql-taalreferentie.

In het volgende voorbeeld wordt een tabel met een primaire en refererende-sleutelbeperking gedefinieerd:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Voorbeeld: Een rijfilter en kolommasker definiëren

Belangrijk

Rijfilters en kolommaskers bevinden zich in openbare preview.

Als u een gerealiseerde weergave of streamingtabel met een rijfilter en kolommasker wilt maken, gebruikt u de component ROW FILTER en de MASK-component. In het volgende voorbeeld ziet u hoe u een gerealiseerde weergave en een streamingtabel definieert met zowel een rijfilter als een kolommasker:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Zie Tabellen publiceren met rijfilters en kolommaskers voor meer informatie over rijfilters en kolommaskers.

Eigenschappen van Python Delta Live Tables

In de volgende tabellen worden de opties en eigenschappen beschreven die u kunt opgeven tijdens het definiëren van tabellen en weergaven met Delta Live Tables:

Notitie

Als u het cluster_by argument wilt gebruiken om liquide clustering in te schakelen, moet uw pijplijn zijn geconfigureerd voor het gebruik van het preview-kanaal.

@table of @view
name

Type: str

Een optionele naam voor de tabel of weergave. Als deze niet is gedefinieerd, wordt de functienaam gebruikt als tabel- of weergavenaam.
comment

Type: str

Een optionele beschrijving voor de tabel.
spark_conf

Type: dict

Een optionele lijst met Spark-configuraties voor de uitvoering van deze query.
table_properties

Type: dict

Een optionele lijst met tabeleigenschappen voor de tabel.
path

Type: str

Een optionele opslaglocatie voor tabelgegevens. Als dit niet is ingesteld, wordt het systeem standaard ingesteld op de opslaglocatie van de pijplijn.
partition_cols

Type: a collection of str

Een optionele verzameling, bijvoorbeeld een van een list of meer kolommen die moeten worden gebruikt voor het partitioneren van de tabel.
cluster_by

Type: array

Schakel eventueel liquide clustering in de tabel in en definieer de kolommen die moeten worden gebruikt als clusteringsleutels.

Zie Liquid clustering gebruiken voor Delta-tabellen.
schema

Type: str of StructType

Een optionele schemadefinitie voor de tabel. Schema's kunnen worden gedefinieerd als een SQL DDL-tekenreeks of met een Python StructType.
temporary

Type: bool

Maak een tabel, maar publiceer geen metagegevens voor de tabel. Met temporary het trefwoord worden Delta Live Tables geïnstrueerd om een tabel te maken die beschikbaar is voor de pijplijn, maar die niet buiten de pijplijn mag worden geopend. Om de verwerkingstijd te verminderen, blijft een tijdelijke tabel behouden gedurende de levensduur van de pijplijn die deze maakt, en niet slechts één update.

De standaardwaarde is 'Onwaar'.
row_filter (Openbare preview)

Type: str

Een optionele rijfiltercomponent voor de tabel. Zie Tabellen publiceren met rijfilters en kolommaskers.
Tabel- of weergavedefinitie
def <function-name>()

Een Python-functie die de gegevensset definieert. Als de name parameter niet is ingesteld, wordt deze <function-name> gebruikt als de naam van de doelgegevensset.
query

Een Spark SQL-instructie die een Spark-gegevensset of Koalas DataFrame retourneert.

Gebruik dlt.read() of spark.table() om een volledige leesbewerking uit te voeren van een gegevensset die in dezelfde pijplijn is gedefinieerd. Wanneer u de spark.table() functie gebruikt om te lezen uit een gegevensset die in dezelfde pijplijn is gedefinieerd, wordt het LIVE trefwoord voorafgegaan door de naam van de gegevensset in het functieargument. Als u bijvoorbeeld wilt lezen uit een gegevensset met de naam customers:

spark.table("LIVE.customers")

U kunt de spark.table() functie ook gebruiken om te lezen uit een tabel die is geregistreerd in de metastore door het LIVE trefwoord weg te laten en eventueel de tabelnaam te kwalificeren met de databasenaam:

spark.table("sales.customers")

Gebruik dlt.read_stream() dit om een streaming-leesbewerking uit te voeren op basis van een gegevensset die in dezelfde pijplijn is gedefinieerd.

Gebruik de spark.sql functie om een SQL-query te definiëren om de retourgegevensset te maken.

Gebruik pySpark-syntaxis om Query's voor Delta Live Tables te definiëren met Python.
Verwachtingen
@expect("description", "constraint")

Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door
description. Als een rij in strijd is met de verwachting, neemt u de rij op in de doelgegevensset.
@expect_or_drop("description", "constraint")

Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door
description. Als een rij in strijd is met de verwachting, verwijdert u de rij uit de doelgegevensset.
@expect_or_fail("description", "constraint")

Een beperking voor gegevenskwaliteit declareren die is geïdentificeerd door
description. Als een rij in strijd is met de verwachting, stopt u onmiddellijk met de uitvoering.
@expect_all(expectations)

Declareer een of meer beperkingen voor gegevenskwaliteit.
expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij in strijd is met een van de verwachtingen, neemt u de rij op in de doelgegevensset.
@expect_all_or_drop(expectations)

Declareer een of meer beperkingen voor gegevenskwaliteit.
expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij een van de verwachtingen schendt, verwijdert u de rij uit de doelgegevensset.
@expect_all_or_fail(expectations)

Declareer een of meer beperkingen voor gegevenskwaliteit.
expectations is een Python-woordenlijst, waarbij de sleutel de beschrijving van de verwachting is en de waarde de verwachtingsbeperking is. Als een rij een van de verwachtingen schendt, stopt u onmiddellijk met de uitvoering.

Gegevens vastleggen vanuit een wijzigingenfeed wijzigen met Python in Delta Live Tables

Gebruik de apply_changes() functie in de Python-API om cdc-functionaliteit (Delta Live Tables change data capture) te gebruiken om brongegevens te verwerken vanuit een CDF (Change Data Feed).

Belangrijk

U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de apply_changes() doeltabel opgeeft, moet u de __START_AT en __END_AT kolommen met hetzelfde gegevenstype als de sequence_by velden opnemen.

Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de Python-interface van Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Notitie

Voor APPLY CHANGES verwerking is het standaardgedrag voor INSERT en UPDATE gebeurtenissen het uitvoeren van upsert CDC-gebeurtenissen uit de bron: werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutel(en) of voeg een nieuwe rij in wanneer er geen overeenkomende record in de doeltabel bestaat. Verwerking voor DELETE gebeurtenissen kan worden opgegeven met de APPLY AS DELETE WHEN voorwaarde.

Zie De WIJZIGINGEN-API's TOEPASSEN voor meer informatie over CDC-verwerking met een wijzigingenfeed: Het vastleggen van wijzigingen vereenvoudigen met Delta Live Tables. Zie Voorbeeld: SCD type 1 en SCD type 2 verwerken met CDF-brongegevens voor een voorbeeld van het gebruik van de apply_changes() functie.

Belangrijk

U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het apply_changes doeltabelschema opgeeft, moet u de __START_AT en __END_AT kolommen met hetzelfde gegevenstype als het sequence_by veld opnemen.

Zie de APPLY CHANGES API's: Vereenvoudig het vastleggen van wijzigingsgegevens met Delta Live Tables.

Argumenten
target

Type: str

De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de apply_changes() functie uitvoert.

Deze parameter is vereist.
source

Type: str

De gegevensbron met CDC-records.

Deze parameter is vereist.
keys

Type: list

De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel.

U kunt een van de volgende opties opgeven:

- Een lijst met tekenreeksen: ["userId", "orderId"]
- Een lijst met Spark SQL-functies col() : [col("userId"), col("orderId"]

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).

Deze parameter is vereist.
sequence_by

Type: str of col()

De kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. Delta Live Tables maakt gebruik van deze sequentiëring om wijzigingsgebeurtenissen af te handelen die niet op volgorde aankomen.

U kunt een van de volgende opties opgeven:

- Een tekenreeks: "sequenceNum"
- Een Spark SQL-functie col() : col("sequenceNum")

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).

De opgegeven kolom moet een sorteerbaar gegevenstype zijn.

Deze parameter is vereist.
ignore_null_updates

Type: bool

Toestaan dat updates worden opgenomen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en ignore_null_updates dit is True, behouden kolommen met een null behoud van de bestaande waarden in het doel. Dit geldt ook voor geneste kolommen met een waarde van null. Wanneer ignore_null_updates dat het is False, worden bestaande waarden overschreven met null waarden.

Deze parameter is optioneel.

De standaardwaarde is False.
apply_as_deletes

Type: str of expr()

Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een DELETE upsert in plaats van een upsert. Als u verouderde gegevens wilt verwerken, wordt de verwijderde rij tijdelijk bewaard als tombstone in de onderliggende Delta-tabel en wordt er een weergave gemaakt in de metastore die deze tombstones filtert. Het bewaarinterval kan worden geconfigureerd met de
pipelines.cdc.tombstoneGCThresholdInSeconds tabeleigenschap.

U kunt een van de volgende opties opgeven:

- Een tekenreeks: "Operation = 'DELETE'"
- Een Spark SQL-functie expr() : expr("Operation = 'DELETE'")

Deze parameter is optioneel.
apply_as_truncates

Type: str of expr()

Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE. Omdat deze component een volledig afkappen van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke use cases waarvoor deze functionaliteit is vereist.

De apply_as_truncates parameter wordt alleen ondersteund voor SCD-type 1. SCD-type 2 biedt geen ondersteuning voor afkappen van bewerkingen.

U kunt een van de volgende opties opgeven:

- Een tekenreeks: "Operation = 'TRUNCATE'"
- Een Spark SQL-functie expr() : expr("Operation = 'TRUNCATE'")

Deze parameter is optioneel.
column_list

except_column_list

Type: list

Een subset kolommen die moeten worden opgenomen in de doeltabel. Hiermee column_list geeft u de volledige lijst met kolommen op die u wilt opnemen. Gebruik except_column_list dit om de kolommen op te geven die moeten worden uitgesloten. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).

Deze parameter is optioneel.

De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen column_list of except_column_list argument wordt doorgegeven aan de functie.
stored_as_scd_type

Type: str of int

Of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2.

Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2.

Deze component is optioneel.

De standaardwaarde is SCD type 1.
track_history_column_list

track_history_except_column_list

Type: list

Een subset van uitvoerkolommen die moeten worden bijgehouden voor de geschiedenis in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Gebruik
track_history_except_column_list om de kolommen op te geven die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).

Deze parameter is optioneel.

De standaardinstelling is om alle kolommen in de doeltabel op te nemen als dat niet het is track_history_column_list of
track_history_except_column_list argument wordt doorgegeven aan de functie.

Gegevensopname wijzigen van databasemomentopnamen met Python in Delta Live Tables

Belangrijk

De APPLY CHANGES FROM SNAPSHOT API bevindt zich in openbare preview.

Gebruik de apply_changes_from_snapshot() functie in de Python-API om de cdc-functionaliteit (Delta Live Tables change data capture) te gebruiken om brongegevens uit momentopnamen van databases te verwerken.

Belangrijk

U moet een doelstreamingtabel declareren om wijzigingen toe te passen. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de apply_changes_from_snapshot() doeltabel opgeeft, moet u ook de __START_AT en __END_AT kolommen met hetzelfde gegevenstype als het sequence_by veld opnemen.

Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de Python-interface van Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Notitie

Voor APPLY CHANGES FROM SNAPSHOT verwerking is het standaardgedrag het invoegen van een nieuwe rij wanneer een overeenkomende record met dezelfde sleutel(en) niet in het doel bestaat. Als er wel een overeenkomende record bestaat, wordt deze alleen bijgewerkt als een van de waarden in de rij is gewijzigd. Rijen met sleutels die aanwezig zijn in het doel, maar die niet meer aanwezig zijn in de bron, worden verwijderd.

Zie De APPLY CHANGES-API's voor meer informatie over CDC-verwerking met momentopnamen: Het vastleggen van wijzigingen vereenvoudigen met Delta Live Tables. Zie de voorbeelden van periodieke opname van momentopnamen en historische momentopnameopnamen voor voorbeelden van het gebruik van de apply_changes_from_snapshot() functie.

Argumenten
target

Type: str

De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de apply_changes() functie uitvoert.

Deze parameter is vereist.
source

Type: str of lambda function

Ofwel de naam van een tabel of weergave om periodiek een momentopname te maken of een Python-lambda-functie die de dataframe voor momentopnamen retourneert die moet worden verwerkt en de versie van de momentopname. Zie Het bronargument implementeren.

Deze parameter is vereist.
keys

Type: list

De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel.

U kunt een van de volgende opties opgeven:

- Een lijst met tekenreeksen: ["userId", "orderId"]
- Een lijst met Spark SQL-functies col() : [col("userId"), col("orderId"]

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).

Deze parameter is vereist.
stored_as_scd_type

Type: str of int

Of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2.

Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2.

Deze component is optioneel.

De standaardwaarde is SCD type 1.
track_history_column_list

track_history_except_column_list

Type: list

Een subset van uitvoerkolommen die moeten worden bijgehouden voor de geschiedenis in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Gebruik
track_history_except_column_list om de kolommen op te geven die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).

Deze parameter is optioneel.

De standaardinstelling is om alle kolommen in de doeltabel op te nemen als dat niet het is track_history_column_list of
track_history_except_column_list argument wordt doorgegeven aan de functie.

source Het argument implementeren

De apply_changes_from_snapshot() functie bevat het source argument. Voor het verwerken van historische momentopnamen wordt verwacht dat het source argument een Python-lambda-functie is die twee waarden retourneert aan de apply_changes_from_snapshot() functie: een Python DataFrame met de momentopnamegegevens die moeten worden verwerkt en een momentopnameversie.

Hier volgt de handtekening van de lambda-functie:

lambda Any => Optional[(DataFrame, Any)]
  • Het argument voor de lambda-functie is de laatst verwerkte momentopnameversie.
  • De retourwaarde van de lambda-functie is None of een tuple van twee waarden: de eerste waarde van de tuple is een DataFrame dat de momentopname bevat die moet worden verwerkt. De tweede waarde van de tuple is de momentopnameversie die de logische volgorde van de momentopname aangeeft.

Een voorbeeld waarmee de lambda-functie wordt geïmplementeerd en aangeroepen:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

De Delta Live Tables-runtime voert de volgende stappen uit telkens wanneer de pijplijn die de apply_changes_from_snapshot() functie bevat, wordt geactiveerd:

  1. Hiermee wordt de next_snapshot_and_version functie uitgevoerd om het volgende dataframe voor momentopnamen en de bijbehorende momentopnameversie te laden.
  2. Als er geen DataFrame wordt geretourneerd, wordt de uitvoering beëindigd en wordt de pijplijnupdate gemarkeerd als voltooid.
  3. Detecteert de wijzigingen in de nieuwe momentopname en past deze incrementeel toe op de doeltabel.
  4. Keert terug naar stap 1 om de volgende momentopname en de bijbehorende versie te laden.

Beperkingen

De Python-interface voor Delta Live Tables heeft de volgende beperking:

De pivot() functie wordt niet ondersteund. Voor de pivot bewerking in Spark is het laden van invoergegevens vereist om het schema van de uitvoer te berekenen. Deze mogelijkheid wordt niet ondersteund in Delta Live Tables.