Megosztás a következőn keresztül:


Delta Live Tables Python nyelvi referencia

Ez a cikk a Delta Live Tables Python programozási felületének részleteit tartalmazza.

Az SQL API-val kapcsolatos információkért tekintse meg a Delta Live Tables SQL nyelvi referenciáit.

Az automatikus betöltő konfigurálásával kapcsolatos részletekért lásd: Mi az automatikus betöltő?

Előkészületek

A folyamatok Delta Live Tables Python-felülettel történő implementálása során az alábbiakat érdemes figyelembe venni:

  • Mivel a Folyamatfrissítés tervezése és futtatása során a Python table() és view() a függvények többször is meghívásra kerülnek, ne tartalmazzon olyan kódot ezen függvények egyikében, amelyek esetleg mellékhatásokat okoznak (például az adatokat módosító vagy e-mailt küldő kód). A váratlan viselkedés elkerülése érdekében az adathalmazokat meghatározó Python-függvényeknek csak a tábla vagy nézet definiálásához szükséges kódot kell tartalmazniuk.
  • Az olyan műveletek végrehajtásához, mint az e-mailek küldése vagy egy külső monitorozási szolgáltatással való integráció, különösen az adathalmazokat meghatározó függvényekben, használjon eseményhookokat. Ha ezeket a műveleteket az adathalmazokat meghatározó függvényekben implementálja, az váratlan viselkedést fog okozni.
  • A Pythonnak table és view a függvénynek egy DataFrame-et kell visszaadnia. A DataFrame-en működő egyes függvények nem adnak vissza DataFrame-eket, ezért nem használhatók. Ezek a műveletek olyan függvényeket tartalmaznak, mint collect()a , count(), toPandas(), save()és saveAsTable(). Mivel a DataFrame-átalakítások végrehajtása a teljes adatfolyam-gráf feloldása után történik, az ilyen műveletek használata nem várt mellékhatásokat eredményezhet.

A Python-modul dlt importálása

A Delta Live Tables Python-függvényei a dlt modulban vannak definiálva. A Python API-val implementált folyamatoknak importálnia kell ezt a modult:

import dlt

Delta Live Tables materialized nézet vagy streamelési tábla létrehozása

A Pythonban a Delta Live Tables meghatározza, hogy egy adathalmazt materializált nézetként vagy streamelési táblázatként kell-e frissíteni a definiáló lekérdezés alapján. A @table dekorátor a materializált nézetek és a streamelési táblák definiálására is használható.

Ha materializált nézetet szeretne definiálni a Pythonban, alkalmazzon @table egy olyan lekérdezésre, amely statikus olvasást végez egy adatforráson. Streamelési tábla definiálásához alkalmazzon @table olyan lekérdezésre, amely streamelési olvasást végez egy adatforráson, vagy használja a create_streaming_table() függvényt. Mindkét adathalmaztípus szintaxis-specifikációja megegyezik az alábbiakkal:

Feljegyzés

A folyékony fürtözés engedélyezéséhez az cluster_by argumentumot úgy kell konfigurálni, hogy az előnézeti csatornát használja.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Delta Live Tables nézet létrehozása

A nézet Pythonban való definiálásához alkalmazza a dekoratőrt @view . A @table dekoratőrhöz hasonlóan a Delta Live Tables nézetei statikus vagy streamelt adathalmazokhoz is használhatók. A Nézetek Pythonnal való definiálása a következő szintaxist tartalmazza:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Példa: Táblák és nézetek definiálása

Ha pythonos táblát vagy nézetet szeretne definiálni, alkalmazza a @dlt.view dekorátort egy @dlt.table függvényre. A függvény vagy a name paraméter használatával rendelheti hozzá a tábla vagy a nézet nevét. Az alábbi példa két különböző adatkészletet határoz meg: egy úgynevezett taxi_raw nézetet, amely egy JSON-fájlt fogad bemeneti forrásként, és egy táblát, filtered_data amely bemenetként veszi a taxi_raw nézetet:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Példa: Hozzáférés az ugyanabban a folyamatban definiált adatkészlethez

A külső adatforrásokból való olvasás mellett az ugyanabban a folyamatban definiált adathalmazokhoz is hozzáférhet a Delta Live Tables read() függvénnyel. Az alábbi példa bemutatja, customers_filtered hogyan hozhat létre adathalmazt a read() függvény használatával:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

A függvény használatával is elérheti az ugyanabban a spark.table() folyamatban definiált adatkészletet. Amikor a függvényt a spark.table() folyamatban definiált adathalmaz eléréséhez használja, a függvényargumentumban a kulcsszót az adathalmaz nevére előfüggegeli LIVE :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Példa: Olvasás metaadattárban regisztrált táblából

Ha a Hive metaadattárban regisztrált táblából szeretne adatokat olvasni, a függvényargumentumban hagyja ki a LIVE kulcsszót, és opcionálisan minősítse a tábla nevét az adatbázis nevével:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Ha egy Unity Catalog-táblából olvas be egy példát, tekintse meg az adatok Unity Catalog-folyamatba való betöltését.

Példa: Adathalmaz elérése spark.sql

Egy adathalmazt egy lekérdezési függvényben lévő kifejezéssel spark.sql is visszaadhat. Belső adatkészletből való olvasáshoz az adathalmaz nevére van előre felfűzve LIVE. :

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

A streamelési műveletek céltáblájának létrehozása

create_streaming_table() A függvény használatával létrehozhat egy céltáblát a streamelési műveletek által előállított rekordokhoz, beleértve a apply_changes(), apply_changes_from_snapshot() és @append_flow kimeneti rekordokat.

Feljegyzés

A create_target_table() függvények elavultak create_streaming_live_table() . A Databricks a meglévő kód frissítését javasolja a create_streaming_table() függvény használatához.

Feljegyzés

A folyékony fürtözés engedélyezéséhez az cluster_by argumentumot úgy kell konfigurálni, hogy az előnézeti csatornát használja.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumentumok
name

Típus: str

A tábla neve.

Ez a paraméter kötelező.
comment

Típus: str

A tábla nem kötelező leírása.
spark_conf

Típus: dict

A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája.
table_properties

Típus: dict

A tábla tulajdonságainak választható listája.
partition_cols

Típus: array

A tábla particionálásához használandó egy vagy több oszlop választható listája.
cluster_by

Típus: array

Ha szeretné, engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat.

Lásd: Folyékony fürtözés használata Delta-táblákhoz.
path

Típus: str

A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárolási helyére lesz beállítva.
schema

Típus: str vagy StructType

A tábla választható sémadefiníciója. A sémák definiálhatók SQL DDL-sztringként vagy Pythonnal
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Típus: dict

A tábla opcionális adatminőségi korlátozásai. Több elvárás is megjelenik.
row_filter (Nyilvános előzetes verzió)

Típus: str

A tábla választható sorszűrő záradéka. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.

Táblák materializálásának szabályozása

A táblák a materializációjuk további szabályozását is lehetővé teszi:

  • A táblák particionálási partition_colsmódjának megadása. A particionálással felgyorsíthatja a lekérdezéseket.
  • Nézet vagy tábla definiálásakor táblatulajdonságokat állíthat be. Lásd: Delta Live Tables táblatulajdonságok.
  • A beállítással path állítsa be a táblaadatok tárolási helyét. A táblaadatok alapértelmezés szerint a folyamat tárolási helyén lesznek tárolva, ha path nincs beállítva.
  • A létrehozott oszlopokat használhatja a sémadefinícióban. Lásd például : Séma- és partícióoszlopok megadása.

Feljegyzés

Az 1 TB-nál kisebb méretű táblák esetében a Databricks azt javasolja, hogy a Delta Live Tables vezérelje az adatszervezést. Csak akkor adjon meg partícióoszlopokat, ha a tábla terabájton túli növekedésre számít.

Példa: Séma- és partícióoszlopok megadása

A táblázatsémát python vagy SQL DDL-sztring StructType használatával is megadhatja. Ha DDL-sztringgel van megadva, a definíció tartalmazhat generált oszlopokat.

Az alábbi példa egy Python StructTypehasználatával megadott sémával létrehozott sales táblát hoz létre:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Az alábbi példa egy tábla sémáját határozza meg egy DDL-sztring használatával, definiál egy generált oszlopot, és definiál egy partícióoszlopot:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Alapértelmezés szerint a Delta Live Tables a séma definíciójából table következtet, ha nem ad meg sémát.

Streamelési tábla konfigurálása a forrásstreamelési tábla módosításainak figyelmen kívül hagyásához

Feljegyzés

  • A skipChangeCommits jelölő csak a spark.readStream option() függvény használatával működik. Ezt a jelölőt nem használhatja függvényekben dlt.read_stream() .
  • A jelölő nem használható skipChangeCommits , ha a forrásstreamelési tábla egy apply_changes() függvény céljaként van definiálva.

Alapértelmezés szerint a streamelési táblák csak hozzáfűző forrásokat igényelnek. Ha egy streamelő tábla egy másik streamelési táblát használ forrásként, és a forrásstreamelési tábla frissítéseket vagy törléseket igényel, például a GDPR "az elfelejtettséghez való jog" feldolgozását, a skipChangeCommits jelölő a forrásstreamelési tábla olvasásakor beállítható, hogy figyelmen kívül hagyja ezeket a módosításokat. A jelölőről további információt a frissítések és törlések figyelmen kívül hagyása című témakörben talál.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Példa: Táblakorlátozások definiálása

Fontos

A táblakorlátozások nyilvános előzetes verzióban érhetők el.

Séma megadásakor megadhatja az elsődleges és az idegen kulcsokat. A korlátozások tájékoztató jellegűek, és nincsenek kényszerítve. Tekintse meg a CONSTRAINT záradékot az SQL nyelvi referenciájában.

Az alábbi példa egy elsődleges és idegenkulcs-korlátozással rendelkező táblát határoz meg:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Példa: Sorszűrő és oszlopmaszk definiálása

Fontos

A sorszűrők és az oszlopmaszkok nyilvános előzetes verzióban érhetők el.

Ha egy materializált nézetet vagy streamelési táblát szeretne létrehozni sorszűrővel és oszlopmaszkkal, használja a SORSZŰRŐ záradékot és a MASZK záradékot. Az alábbi példa bemutatja, hogyan definiálhat materializált nézetet és streamelési táblázatot sorszűrővel és oszlopmaszkkal is:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

A sorszűrőkkel és az oszlopmaszkokkal kapcsolatos további információkért lásd : Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.

Python Delta Live Tables tulajdonságai

Az alábbi táblázatok ismertetik a delta élő táblákkal való táblák és nézetek definiálása során megadható beállításokat és tulajdonságokat:

Feljegyzés

A folyékony fürtözés engedélyezéséhez az cluster_by argumentumot úgy kell konfigurálni, hogy az előnézeti csatornát használja.

@table vagy @view
name

Típus: str

A tábla vagy nézet nem kötelező neve. Ha nincs megadva, a függvény neve tábla- vagy nézetnévként lesz használva.
comment

Típus: str

A tábla nem kötelező leírása.
spark_conf

Típus: dict

A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája.
table_properties

Típus: dict

A tábla tulajdonságainak választható listája.
path

Típus: str

A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárolási helyére lesz beállítva.
partition_cols

Típus: a collection of str

Választható gyűjtemény, például a list tábla particionálásához használandó egy vagy több oszlop.
cluster_by

Típus: array

Ha szeretné, engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat.

Lásd: Folyékony fürtözés használata Delta-táblákhoz.
schema

Típus: str vagy StructType

A tábla választható sémadefiníciója. A sémák SQL DDL-sztringként vagy Python-sztringként StructTypeis definiálhatók.
temporary

Típus: bool

Hozzon létre egy táblát, de ne tegye közzé a tábla metaadatait. A temporary kulcsszó arra utasítja a Delta Live Tableset, hogy hozzon létre egy táblát, amely elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A feldolgozási idő csökkentése érdekében egy ideiglenes tábla megmarad az azt létrehozó folyamat teljes élettartama alatt, és nem csak egyetlen frissítéssel.

Az alapértelmezett érték a "False".
row_filter (Nyilvános előzetes verzió)

Típus: str

A tábla választható sorszűrő záradéka. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.
Tábla- vagy nézetdefiníció
def <function-name>()

Az adathalmazt meghatározó Python-függvény. Ha a name paraméter nincs beállítva, akkor <function-name> a rendszer a céladatkészlet neveként használja.
query

Spark SQL-utasítás, amely Spark-adatkészletet vagy Koalas DataFrame-et ad vissza.

Az ugyanabban a folyamatban definiált adathalmaz teljes olvasásának használata dlt.read() vagy spark.table() végrehajtása. Ha a függvényt használja az ugyanabban a spark.table() folyamatban definiált adatkészletből való olvasásra, a kulcsszót LIVE a függvényargumentumban szereplő adathalmaz nevére kell előszűrésként használni. Például a következő nevű customersadatkészletből való olvasáshoz:

spark.table("LIVE.customers")

A függvény használatával spark.table() a metaadattárban regisztrált táblákból is olvashat, ha kihagyja a LIVE kulcsszót, és opcionálisan minősíti a tábla nevét az adatbázis nevével:

spark.table("sales.customers")

Streamelési olvasás végrehajtására használható dlt.read_stream() az ugyanabban a folyamatban definiált adatkészletből.

A függvény használatával spark.sql definiálhat egy SQL-lekérdezést a visszatérési adatkészlet létrehozásához.

A PySpark szintaxisával Delta Live Tables-lekérdezéseket definiálhat a Pythonnal.
Elvárások
@expect("description", "constraint")

Adatminőségi korlátozás deklarálása
description. Ha egy sor megsérti a várakozást, vegye fel a sort a céladatkészletbe.
@expect_or_drop("description", "constraint")

Adatminőségi korlátozás deklarálása
description. Ha egy sor megsérti a várakozást, ejtse el a sort a céladatkészletből.
@expect_or_fail("description", "constraint")

Adatminőségi korlátozás deklarálása
description. Ha egy sor megsérti az elvárást, azonnal állítsa le a végrehajtást.
@expect_all(expectations)

Deklarálhat egy vagy több adatminőségi korlátozást.
expectations Egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megsérti, a céladatkészletbe foglalja bele a sort.
@expect_all_or_drop(expectations)

Deklarálhat egy vagy több adatminőségi korlátozást.
expectations Egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megszegi, a céladatkészletből ejtse ki a sort.
@expect_all_or_fail(expectations)

Deklarálhat egy vagy több adatminőségi korlátozást.
expectations Egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármilyen elvárást megsért, azonnal állítsa le a végrehajtást.

Adatrögzítés módosítása változáscsatornából a Pythonnal a Delta Live Tablesben

apply_changes() A Python API-ban található függvény használatával a Delta Live Tables change data capture (CDC) funkcióval feldolgozhatja a forrásadatokat egy változásadatcsatornából (CDF).

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltábla sémájának apply_changes() megadásakor a __START_AT mezőkkel azonos adattípusú sequence_by és __END_AT oszlopokat kell tartalmaznia.

A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a Delta Live Tables Python felületén.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Feljegyzés

A feldolgozáshoz APPLY CHANGES az alapértelmezett viselkedés INSERT és UPDATE események a CDC-események forrásból való beolvasása : frissítse a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy szúrjon be egy új sort, ha egyező rekord nem található a céltáblában. Az események kezelése DELETE a APPLY AS DELETE WHEN feltétellel adható meg.

Ha többet szeretne megtudni a CDC változáscsatornával történő feldolgozásáról, olvassa el a VÁLTOZÁSOK ALKALMAZÁSA API-kat: Egyszerűsítse a változásadatok rögzítését a Delta Live Tables használatával. A függvény használatára apply_changes() példa: Példa: SCD 1. és SCD 2. típusú feldolgozás CDF-forrásadatokkal.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltáblaséma megadásakor apply_changes a mezővel azonos adattípusú és __END_AT típusú oszlopokat kell tartalmaznia sequence_by __START_AT.

Lásd : AZ APPLY CHANGES API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal.

Argumentumok
target

Típus: str

A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény végrehajtása előtt létrehozhatja a céltáblát apply_changes() .

Ez a paraméter kötelező.
source

Típus: str

A CDC-rekordokat tartalmazó adatforrás.

Ez a paraméter kötelező.
keys

Típus: list

Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.

Megadhatja az alábbiakat:

- Sztringek listája: ["userId", "orderId"]
- A Spark SQL-függvények col() listája: [col("userId"), col("orderId"]

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).

Ez a paraméter kötelező.
sequence_by

Típus: str vagy col()

Az oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A Delta Live Tables ezzel a szekvenálással kezeli a sorrendből érkező módosítási eseményeket.

Megadhatja az alábbiakat:

- Egy sztring: "sequenceNum"
- Spark SQL-függvény col() : col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).

Ez a paraméter kötelező.
ignore_null_updates

Típus: bool

A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény egyezik egy meglévő sortalnull, és ignore_null_updates az, az Trueoszlopok megőrzik a célban meglévő értékeiket. Ez a beágyazott oszlopokra is vonatkozik, amelynek értéke a .null Ha ignore_null_updates igen, a Falsemeglévő értékek felülíródnak értékekkel null .

Ez a paraméter nem kötelező.

Az alapértelmezett érték False.
apply_as_deletes

Típus: str vagy expr()

Megadja, hogy a CDC-eseményeket mikor kell upsert helyett inkább upsertként DELETE kezelni. A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz konfigurálható a következővel:
pipelines.cdc.tombstoneGCThresholdInSecondstáblatulajdonság.

Megadhatja az alábbiakat:

- Egy sztring: "Operation = 'DELETE'"
- Spark SQL-függvény expr() : expr("Operation = 'DELETE'")

Ez a paraméter nem kötelező.
apply_as_truncates

Típus: str vagy expr()

Megadja, hogy a CDC-események mikor legyenek teljes táblaként TRUNCATEkezelve. Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.

A apply_as_truncates paraméter csak az 1. SCD-típus esetén támogatott. A 2. SCD-típus nem támogatja a csonkolási műveleteket.

Megadhatja az alábbiakat:

- Egy sztring: "Operation = 'TRUNCATE'"
- Spark SQL-függvény expr() : expr("Operation = 'TRUNCATE'")

Ez a paraméter nem kötelező.
column_list

except_column_list

Típus: list

A céltáblában szerepeltetni kívánt oszlopok egy részhalmaza. A belefoglalandó oszlopok teljes listájának megadására használható column_list . Itt except_column_list adhatja meg a kizárni kívánt oszlopokat. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).

Ez a paraméter nem kötelező.

Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem column_list vagy except_column_list argumentumot ad át a függvénynek.
stored_as_scd_type

Típus: str vagy int

A rekordok tárolása 1. vagy 2. SCD-típusként.

1 Az SCD 1- vagy 2 2-es SCD-típusra van állítva.

Ennek a záradéknak a használata nem kötelező.

Az alapértelmezett scd típus 1.
track_history_column_list

track_history_except_column_list

Típus: list

A céltábla előzményeihez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . Használat
track_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).

Ez a paraméter nem kötelező.

Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha track_history_column_list
track_history_except_column_list argumentumot ad át a függvénynek.

Adatrögzítés módosítása adatbázis-pillanatképekből a Pythonnal a Delta Live Tablesben

Fontos

Az APPLY CHANGES FROM SNAPSHOT API nyilvános előzetes verzióban érhető el.

A Python API függvényével apply_changes_from_snapshot() a Delta Live Tables adatrögzítési (CDC) funkcióját használhatja a forrásadatok adatbázis-pillanatképekből való feldolgozásához.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltábla sémájának apply_changes_from_snapshot() megadásakor a __START_AT mezővel azonos adattípusú sequence_by és __END_AT oszlopokat is tartalmaznia kell.

A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a Delta Live Tables Python felületén.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Feljegyzés

A feldolgozáshoz APPLY CHANGES FROM SNAPSHOT az alapértelmezett viselkedés egy új sor beszúrása, ha egy azonos kulccsal(ok) rendelkező rekord nem létezik a célban. Ha létezik egyező rekord, az csak akkor frissül, ha a sorban szereplő értékek bármelyike módosult. A célban található, de a forrásban már nem szereplő kulcsokat tartalmazó sorok törlődnek.

A CDC pillanatképekkel történő feldolgozásával kapcsolatos további információkért lásd : A VÁLTOZÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal. A függvény használatára vonatkozó példákért tekintse meg a apply_changes_from_snapshot() rendszeres pillanatkép-betöltési és az előzmény-pillanatkép-betöltési példákat.

Argumentumok
target

Típus: str

A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény futtatása előtt létrehozhatja a céltáblát apply_changes() .

Ez a paraméter kötelező.
source

Típus: str vagy lambda function

Egy táblázat vagy nézet neve, amely rendszeres időközönként pillanatképet hoz létre, vagy egy Python lambda függvény, amely visszaadja a feldolgozandó pillanatkép dataFrame-et és a pillanatkép verzióját. Lásd: A forrás argumentum implementálása.

Ez a paraméter kötelező.
keys

Típus: list

Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.

Megadhatja az alábbiakat:

- Sztringek listája: ["userId", "orderId"]
- A Spark SQL-függvények col() listája: [col("userId"), col("orderId"]

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).

Ez a paraméter kötelező.
stored_as_scd_type

Típus: str vagy int

A rekordok tárolása 1. vagy 2. SCD-típusként.

1 Az SCD 1- vagy 2 2-es SCD-típusra van állítva.

Ennek a záradéknak a használata nem kötelező.

Az alapértelmezett scd típus 1.
track_history_column_list

track_history_except_column_list

Típus: list

A céltábla előzményeihez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . Használat
track_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).

Ez a paraméter nem kötelező.

Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha track_history_column_list
track_history_except_column_list argumentumot ad át a függvénynek.

Az source argumentum megvalósítása

A apply_changes_from_snapshot() függvény tartalmazza az argumentumot source . Az előzmény pillanatképek feldolgozásához az source argumentum egy Python lambda függvény lesz, amely két értéket ad vissza a apply_changes_from_snapshot() függvénynek: a feldolgozandó pillanatképadatokat tartalmazó Python DataFrame-et és egy pillanatkép-verziót.

A lambda függvény aláírása a következő:

lambda Any => Optional[(DataFrame, Any)]
  • A lambda függvény argumentuma a legutóbb feldolgozott pillanatkép-verzió.
  • A lambda függvény None visszatérési értéke vagy két értékből álló rekord: A rekord első értéke a feldolgozandó pillanatképet tartalmazó DataFrame. A rekord második értéke a pillanatkép-verzió, amely a pillanatkép logikai sorrendjét jelöli.

Példa a lambda függvény implementálására és meghívására:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

A Delta Live Tables futtatókörnyezet a következő lépéseket hajtja végre minden alkalommal, amikor a függvényt apply_changes_from_snapshot() tartalmazó folyamat aktiválódik:

  1. A függvény futtatásával next_snapshot_and_version betölti a következő pillanatkép-adatkeretet és a megfelelő pillanatkép-verziót.
  2. Ha a DataFrame nem ad vissza, a futtatás leáll, és a folyamatfrissítés befejezettként van megjelölve.
  3. Észleli az új pillanatkép módosításait, és növekményesen alkalmazza őket a céltáblára.
  4. Visszatér az 1. lépéshez, hogy betöltse a következő pillanatképet és annak verzióját.

Korlátozások

A Delta Live Tables Python felülete a következő korlátozásokkal rendelkezik:

A pivot() függvény nem támogatott. A pivot Spark-művelethez a bemeneti adatok lelkes betöltésére van szükség a kimenet sémájának kiszámításához. Ez a képesség nem támogatott a Delta Live Tablesben.