Delta Live Tables Python nyelvi referencia
Ez a cikk a Delta Live Tables Python programozási felületének részleteit tartalmazza.
Az SQL API-val kapcsolatos információkért tekintse meg a Delta Live Tables SQL nyelvi referenciáit.
Az automatikus betöltő konfigurálásával kapcsolatos részletekért lásd: Mi az automatikus betöltő?
Előkészületek
A folyamatok Delta Live Tables Python-felülettel történő implementálása során az alábbiakat érdemes figyelembe venni:
- Mivel a Folyamatfrissítés tervezése és futtatása során a Python
table()
ésview()
a függvények többször is meghívásra kerülnek, ne tartalmazzon olyan kódot ezen függvények egyikében, amelyek esetleg mellékhatásokat okoznak (például az adatokat módosító vagy e-mailt küldő kód). A váratlan viselkedés elkerülése érdekében az adathalmazokat meghatározó Python-függvényeknek csak a tábla vagy nézet definiálásához szükséges kódot kell tartalmazniuk. - Az olyan műveletek végrehajtásához, mint az e-mailek küldése vagy egy külső monitorozási szolgáltatással való integráció, különösen az adathalmazokat meghatározó függvényekben, használjon eseményhookokat. Ha ezeket a műveleteket az adathalmazokat meghatározó függvényekben implementálja, az váratlan viselkedést fog okozni.
- A Pythonnak
table
ésview
a függvénynek egy DataFrame-et kell visszaadnia. A DataFrame-en működő egyes függvények nem adnak vissza DataFrame-eket, ezért nem használhatók. Ezek a műveletek olyan függvényeket tartalmaznak, mintcollect()
a ,count()
,toPandas()
,save()
éssaveAsTable()
. Mivel a DataFrame-átalakítások végrehajtása a teljes adatfolyam-gráf feloldása után történik, az ilyen műveletek használata nem várt mellékhatásokat eredményezhet.
A Python-modul dlt
importálása
A Delta Live Tables Python-függvényei a dlt
modulban vannak definiálva. A Python API-val implementált folyamatoknak importálnia kell ezt a modult:
import dlt
Delta Live Tables materialized nézet vagy streamelési tábla létrehozása
A Pythonban a Delta Live Tables meghatározza, hogy egy adathalmazt materializált nézetként vagy streamelési táblázatként kell-e frissíteni a definiáló lekérdezés alapján. A @table
dekorátor a materializált nézetek és a streamelési táblák definiálására is használható.
Ha materializált nézetet szeretne definiálni a Pythonban, alkalmazzon @table
egy olyan lekérdezésre, amely statikus olvasást végez egy adatforráson. Streamelési tábla definiálásához alkalmazzon @table
olyan lekérdezésre, amely streamelési olvasást végez egy adatforráson, vagy használja a create_streaming_table() függvényt. Mindkét adathalmaztípus szintaxis-specifikációja megegyezik az alábbiakkal:
Feljegyzés
A folyékony fürtözés engedélyezéséhez az cluster_by
argumentumot úgy kell konfigurálni, hogy az előnézeti csatornát használja.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Delta Live Tables nézet létrehozása
A nézet Pythonban való definiálásához alkalmazza a dekoratőrt @view
. A @table
dekoratőrhöz hasonlóan a Delta Live Tables nézetei statikus vagy streamelt adathalmazokhoz is használhatók. A Nézetek Pythonnal való definiálása a következő szintaxist tartalmazza:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Példa: Táblák és nézetek definiálása
Ha pythonos táblát vagy nézetet szeretne definiálni, alkalmazza a @dlt.view
dekorátort egy @dlt.table
függvényre. A függvény vagy a name
paraméter használatával rendelheti hozzá a tábla vagy a nézet nevét. Az alábbi példa két különböző adatkészletet határoz meg: egy úgynevezett taxi_raw
nézetet, amely egy JSON-fájlt fogad bemeneti forrásként, és egy táblát, filtered_data
amely bemenetként veszi a taxi_raw
nézetet:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Példa: Hozzáférés az ugyanabban a folyamatban definiált adatkészlethez
A külső adatforrásokból való olvasás mellett az ugyanabban a folyamatban definiált adathalmazokhoz is hozzáférhet a Delta Live Tables read()
függvénnyel. Az alábbi példa bemutatja, customers_filtered
hogyan hozhat létre adathalmazt a read()
függvény használatával:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
A függvény használatával is elérheti az ugyanabban a spark.table()
folyamatban definiált adatkészletet. Amikor a függvényt a spark.table()
folyamatban definiált adathalmaz eléréséhez használja, a függvényargumentumban a kulcsszót az adathalmaz nevére előfüggegeli LIVE
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Példa: Olvasás metaadattárban regisztrált táblából
Ha a Hive metaadattárban regisztrált táblából szeretne adatokat olvasni, a függvényargumentumban hagyja ki a LIVE
kulcsszót, és opcionálisan minősítse a tábla nevét az adatbázis nevével:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Ha egy Unity Catalog-táblából olvas be egy példát, tekintse meg az adatok Unity Catalog-folyamatba való betöltését.
Példa: Adathalmaz elérése spark.sql
Egy adathalmazt egy lekérdezési függvényben lévő kifejezéssel spark.sql
is visszaadhat. Belső adatkészletből való olvasáshoz az adathalmaz nevére van előre felfűzve LIVE.
:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
A streamelési műveletek céltáblájának létrehozása
create_streaming_table()
A függvény használatával létrehozhat egy céltáblát a streamelési műveletek által előállított rekordokhoz, beleértve a apply_changes(), apply_changes_from_snapshot() és @append_flow kimeneti rekordokat.
Feljegyzés
A create_target_table()
függvények elavultak create_streaming_live_table()
. A Databricks a meglévő kód frissítését javasolja a create_streaming_table()
függvény használatához.
Feljegyzés
A folyékony fürtözés engedélyezéséhez az cluster_by
argumentumot úgy kell konfigurálni, hogy az előnézeti csatornát használja.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumentumok |
---|
name Típus: str A tábla neve. Ez a paraméter kötelező. |
comment Típus: str A tábla nem kötelező leírása. |
spark_conf Típus: dict A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája. |
table_properties Típus: dict A tábla tulajdonságainak választható listája. |
partition_cols Típus: array A tábla particionálásához használandó egy vagy több oszlop választható listája. |
cluster_by Típus: array Ha szeretné, engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat. Lásd: Folyékony fürtözés használata Delta-táblákhoz. |
path Típus: str A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárolási helyére lesz beállítva. |
schema Típus: str vagy StructType A tábla választható sémadefiníciója. A sémák definiálhatók SQL DDL-sztringként vagy Pythonnal StructType . |
expect_all expect_all_or_drop expect_all_or_fail Típus: dict A tábla opcionális adatminőségi korlátozásai. Több elvárás is megjelenik. |
row_filter (Nyilvános előzetes verzió)Típus: str A tábla választható sorszűrő záradéka. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal. |
Táblák materializálásának szabályozása
A táblák a materializációjuk további szabályozását is lehetővé teszi:
- A táblák particionálási
partition_cols
módjának megadása. A particionálással felgyorsíthatja a lekérdezéseket. - Nézet vagy tábla definiálásakor táblatulajdonságokat állíthat be. Lásd: Delta Live Tables táblatulajdonságok.
- A beállítással
path
állítsa be a táblaadatok tárolási helyét. A táblaadatok alapértelmezés szerint a folyamat tárolási helyén lesznek tárolva, hapath
nincs beállítva. - A létrehozott oszlopokat használhatja a sémadefinícióban. Lásd például : Séma- és partícióoszlopok megadása.
Feljegyzés
Az 1 TB-nál kisebb méretű táblák esetében a Databricks azt javasolja, hogy a Delta Live Tables vezérelje az adatszervezést. Csak akkor adjon meg partícióoszlopokat, ha a tábla terabájton túli növekedésre számít.
Példa: Séma- és partícióoszlopok megadása
A táblázatsémát python vagy SQL DDL-sztring StructType
használatával is megadhatja. Ha DDL-sztringgel van megadva, a definíció tartalmazhat generált oszlopokat.
Az alábbi példa egy Python StructType
használatával megadott sémával létrehozott sales
táblát hoz létre:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Az alábbi példa egy tábla sémáját határozza meg egy DDL-sztring használatával, definiál egy generált oszlopot, és definiál egy partícióoszlopot:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Alapértelmezés szerint a Delta Live Tables a séma definíciójából table
következtet, ha nem ad meg sémát.
Streamelési tábla konfigurálása a forrásstreamelési tábla módosításainak figyelmen kívül hagyásához
Feljegyzés
- A
skipChangeCommits
jelölő csak aspark.readStream
option()
függvény használatával működik. Ezt a jelölőt nem használhatja függvényekbendlt.read_stream()
. - A jelölő nem használható
skipChangeCommits
, ha a forrásstreamelési tábla egy apply_changes() függvény céljaként van definiálva.
Alapértelmezés szerint a streamelési táblák csak hozzáfűző forrásokat igényelnek. Ha egy streamelő tábla egy másik streamelési táblát használ forrásként, és a forrásstreamelési tábla frissítéseket vagy törléseket igényel, például a GDPR "az elfelejtettséghez való jog" feldolgozását, a skipChangeCommits
jelölő a forrásstreamelési tábla olvasásakor beállítható, hogy figyelmen kívül hagyja ezeket a módosításokat. A jelölőről további információt a frissítések és törlések figyelmen kívül hagyása című témakörben talál.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Példa: Táblakorlátozások definiálása
Fontos
A táblakorlátozások nyilvános előzetes verzióban érhetők el.
Séma megadásakor megadhatja az elsődleges és az idegen kulcsokat. A korlátozások tájékoztató jellegűek, és nincsenek kényszerítve. Tekintse meg a CONSTRAINT záradékot az SQL nyelvi referenciájában.
Az alábbi példa egy elsődleges és idegenkulcs-korlátozással rendelkező táblát határoz meg:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Példa: Sorszűrő és oszlopmaszk definiálása
Fontos
A sorszűrők és az oszlopmaszkok nyilvános előzetes verzióban érhetők el.
Ha egy materializált nézetet vagy streamelési táblát szeretne létrehozni sorszűrővel és oszlopmaszkkal, használja a SORSZŰRŐ záradékot és a MASZK záradékot. Az alábbi példa bemutatja, hogyan definiálhat materializált nézetet és streamelési táblázatot sorszűrővel és oszlopmaszkkal is:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
A sorszűrőkkel és az oszlopmaszkokkal kapcsolatos további információkért lásd : Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal.
Python Delta Live Tables tulajdonságai
Az alábbi táblázatok ismertetik a delta élő táblákkal való táblák és nézetek definiálása során megadható beállításokat és tulajdonságokat:
Feljegyzés
A folyékony fürtözés engedélyezéséhez az cluster_by
argumentumot úgy kell konfigurálni, hogy az előnézeti csatornát használja.
@table vagy @view |
---|
name Típus: str A tábla vagy nézet nem kötelező neve. Ha nincs megadva, a függvény neve tábla- vagy nézetnévként lesz használva. |
comment Típus: str A tábla nem kötelező leírása. |
spark_conf Típus: dict A lekérdezés végrehajtásához szükséges Spark-konfigurációk választható listája. |
table_properties Típus: dict A tábla tulajdonságainak választható listája. |
path Típus: str A táblaadatok opcionális tárolási helye. Ha nincs beállítva, a rendszer alapértelmezés szerint a folyamat tárolási helyére lesz beállítva. |
partition_cols Típus: a collection of str Választható gyűjtemény, például a list tábla particionálásához használandó egy vagy több oszlop. |
cluster_by Típus: array Ha szeretné, engedélyezze a folyékony fürtözést a táblán, és definiálja a fürtözési kulcsként használni kívánt oszlopokat. Lásd: Folyékony fürtözés használata Delta-táblákhoz. |
schema Típus: str vagy StructType A tábla választható sémadefiníciója. A sémák SQL DDL-sztringként vagy Python-sztringként StructType is definiálhatók. |
temporary Típus: bool Hozzon létre egy táblát, de ne tegye közzé a tábla metaadatait. A temporary kulcsszó arra utasítja a Delta Live Tableset, hogy hozzon létre egy táblát, amely elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A feldolgozási idő csökkentése érdekében egy ideiglenes tábla megmarad az azt létrehozó folyamat teljes élettartama alatt, és nem csak egyetlen frissítéssel.Az alapértelmezett érték a "False". |
row_filter (Nyilvános előzetes verzió)Típus: str A tábla választható sorszűrő záradéka. Lásd: Táblázatok közzététele sorszűrőkkel és oszlopmaszkokkal. |
Tábla- vagy nézetdefiníció |
---|
def <function-name>() Az adathalmazt meghatározó Python-függvény. Ha a name paraméter nincs beállítva, akkor <function-name> a rendszer a céladatkészlet neveként használja. |
query Spark SQL-utasítás, amely Spark-adatkészletet vagy Koalas DataFrame-et ad vissza. Az ugyanabban a folyamatban definiált adathalmaz teljes olvasásának használata dlt.read() vagy spark.table() végrehajtása. Ha a függvényt használja az ugyanabban a spark.table() folyamatban definiált adatkészletből való olvasásra, a kulcsszót LIVE a függvényargumentumban szereplő adathalmaz nevére kell előszűrésként használni. Például a következő nevű customers adatkészletből való olvasáshoz:spark.table("LIVE.customers") A függvény használatával spark.table() a metaadattárban regisztrált táblákból is olvashat, ha kihagyja a LIVE kulcsszót, és opcionálisan minősíti a tábla nevét az adatbázis nevével:spark.table("sales.customers") Streamelési olvasás végrehajtására használható dlt.read_stream() az ugyanabban a folyamatban definiált adatkészletből.A függvény használatával spark.sql definiálhat egy SQL-lekérdezést a visszatérési adatkészlet létrehozásához.A PySpark szintaxisával Delta Live Tables-lekérdezéseket definiálhat a Pythonnal. |
Elvárások |
---|
@expect("description", "constraint") Adatminőségi korlátozás deklarálása description . Ha egy sor megsérti a várakozást, vegye fel a sort a céladatkészletbe. |
@expect_or_drop("description", "constraint") Adatminőségi korlátozás deklarálása description . Ha egy sor megsérti a várakozást, ejtse el a sort a céladatkészletből. |
@expect_or_fail("description", "constraint") Adatminőségi korlátozás deklarálása description . Ha egy sor megsérti az elvárást, azonnal állítsa le a végrehajtást. |
@expect_all(expectations) Deklarálhat egy vagy több adatminőségi korlátozást. expectations Egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megsérti, a céladatkészletbe foglalja bele a sort. |
@expect_all_or_drop(expectations) Deklarálhat egy vagy több adatminőségi korlátozást. expectations Egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármelyik elvárást megszegi, a céladatkészletből ejtse ki a sort. |
@expect_all_or_fail(expectations) Deklarálhat egy vagy több adatminőségi korlátozást. expectations Egy Python-szótár, ahol a kulcs a várakozás leírása, az érték pedig a várakozási kényszer. Ha egy sor bármilyen elvárást megsért, azonnal állítsa le a végrehajtást. |
Adatrögzítés módosítása változáscsatornából a Pythonnal a Delta Live Tablesben
apply_changes()
A Python API-ban található függvény használatával a Delta Live Tables change data capture (CDC) funkcióval feldolgozhatja a forrásadatokat egy változásadatcsatornából (CDF).
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltábla sémájának apply_changes()
megadásakor a __START_AT
mezőkkel azonos adattípusú sequence_by
és __END_AT
oszlopokat kell tartalmaznia.
A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a Delta Live Tables Python felületén.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Feljegyzés
A feldolgozáshoz APPLY CHANGES
az alapértelmezett viselkedés INSERT
és UPDATE
események a CDC-események forrásból való beolvasása : frissítse a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy szúrjon be egy új sort, ha egyező rekord nem található a céltáblában. Az események kezelése DELETE
a APPLY AS DELETE WHEN
feltétellel adható meg.
Ha többet szeretne megtudni a CDC változáscsatornával történő feldolgozásáról, olvassa el a VÁLTOZÁSOK ALKALMAZÁSA API-kat: Egyszerűsítse a változásadatok rögzítését a Delta Live Tables használatával. A függvény használatára apply_changes()
példa: Példa: SCD 1. és SCD 2. típusú feldolgozás CDF-forrásadatokkal.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltáblaséma megadásakor apply_changes
a mezővel azonos adattípusú és __END_AT
típusú oszlopokat kell tartalmaznia sequence_by
__START_AT
.
Lásd : AZ APPLY CHANGES API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal.
Argumentumok |
---|
target Típus: str A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény végrehajtása előtt létrehozhatja a céltáblát apply_changes() .Ez a paraméter kötelező. |
source Típus: str A CDC-rekordokat tartalmazó adatforrás. Ez a paraméter kötelező. |
keys Típus: list Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat: - Sztringek listája: ["userId", "orderId"] - A Spark SQL-függvények col() listája: [col("userId"), col("orderId"] A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId) de nem használhatja col(source.userId) .Ez a paraméter kötelező. |
sequence_by Típus: str vagy col() Az oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A Delta Live Tables ezzel a szekvenálással kezeli a sorrendből érkező módosítási eseményeket. Megadhatja az alábbiakat: - Egy sztring: "sequenceNum" - Spark SQL-függvény col() : col("sequenceNum") A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId) de nem használhatja col(source.userId) .Ez a paraméter kötelező. |
ignore_null_updates Típus: bool A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény egyezik egy meglévő sortal null , és ignore_null_updates az, az True oszlopok megőrzik a célban meglévő értékeiket. Ez a beágyazott oszlopokra is vonatkozik, amelynek értéke a .null Ha ignore_null_updates igen, a False meglévő értékek felülíródnak értékekkel null .Ez a paraméter nem kötelező. Az alapértelmezett érték False . |
apply_as_deletes Típus: str vagy expr() Megadja, hogy a CDC-eseményeket mikor kell upsert helyett inkább upsertként DELETE kezelni. A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz konfigurálható a következővel:pipelines.cdc.tombstoneGCThresholdInSeconds táblatulajdonság.Megadhatja az alábbiakat: - Egy sztring: "Operation = 'DELETE'" - Spark SQL-függvény expr() : expr("Operation = 'DELETE'") Ez a paraméter nem kötelező. |
apply_as_truncates Típus: str vagy expr() Megadja, hogy a CDC-események mikor legyenek teljes táblaként TRUNCATE kezelve. Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.A apply_as_truncates paraméter csak az 1. SCD-típus esetén támogatott. A 2. SCD-típus nem támogatja a csonkolási műveleteket.Megadhatja az alábbiakat: - Egy sztring: "Operation = 'TRUNCATE'" - Spark SQL-függvény expr() : expr("Operation = 'TRUNCATE'") Ez a paraméter nem kötelező. |
column_list except_column_list Típus: list A céltáblában szerepeltetni kívánt oszlopok egy részhalmaza. A belefoglalandó oszlopok teljes listájának megadására használható column_list . Itt except_column_list adhatja meg a kizárni kívánt oszlopokat. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId) de nem használhatja col(source.userId) .Ez a paraméter nem kötelező. Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem column_list vagy except_column_list argumentumot ad át a függvénynek. |
stored_as_scd_type Típus: str vagy int A rekordok tárolása 1. vagy 2. SCD-típusként. 1 Az SCD 1- vagy 2 2-es SCD-típusra van állítva.Ennek a záradéknak a használata nem kötelező. Az alapértelmezett scd típus 1. |
track_history_column_list track_history_except_column_list Típus: list A céltábla előzményeihez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . Használattrack_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId) de nem használhatja col(source.userId) .Ez a paraméter nem kötelező. Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha track_history_column_list track_history_except_column_list argumentumot ad át a függvénynek. |
Adatrögzítés módosítása adatbázis-pillanatképekből a Pythonnal a Delta Live Tablesben
Fontos
Az APPLY CHANGES FROM SNAPSHOT
API nyilvános előzetes verzióban érhető el.
A Python API függvényével apply_changes_from_snapshot()
a Delta Live Tables adatrögzítési (CDC) funkcióját használhatja a forrásadatok adatbázis-pillanatképekből való feldolgozásához.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltábla sémájának apply_changes_from_snapshot()
megadásakor a __START_AT
mezővel azonos adattípusú sequence_by
és __END_AT
oszlopokat is tartalmaznia kell.
A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a Delta Live Tables Python felületén.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Feljegyzés
A feldolgozáshoz APPLY CHANGES FROM SNAPSHOT
az alapértelmezett viselkedés egy új sor beszúrása, ha egy azonos kulccsal(ok) rendelkező rekord nem létezik a célban. Ha létezik egyező rekord, az csak akkor frissül, ha a sorban szereplő értékek bármelyike módosult. A célban található, de a forrásban már nem szereplő kulcsokat tartalmazó sorok törlődnek.
A CDC pillanatképekkel történő feldolgozásával kapcsolatos további információkért lásd : A VÁLTOZÁSOK ALKALMAZÁSA API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal. A függvény használatára vonatkozó példákért tekintse meg a apply_changes_from_snapshot()
rendszeres pillanatkép-betöltési és az előzmény-pillanatkép-betöltési példákat.
Argumentumok |
---|
target Típus: str A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény futtatása előtt létrehozhatja a céltáblát apply_changes() .Ez a paraméter kötelező. |
source Típus: str vagy lambda function Egy táblázat vagy nézet neve, amely rendszeres időközönként pillanatképet hoz létre, vagy egy Python lambda függvény, amely visszaadja a feldolgozandó pillanatkép dataFrame-et és a pillanatkép verzióját. Lásd: A forrás argumentum implementálása. Ez a paraméter kötelező. |
keys Típus: list Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat: - Sztringek listája: ["userId", "orderId"] - A Spark SQL-függvények col() listája: [col("userId"), col("orderId"] A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId) de nem használhatja col(source.userId) .Ez a paraméter kötelező. |
stored_as_scd_type Típus: str vagy int A rekordok tárolása 1. vagy 2. SCD-típusként. 1 Az SCD 1- vagy 2 2-es SCD-típusra van állítva.Ennek a záradéknak a használata nem kötelező. Az alapértelmezett scd típus 1. |
track_history_column_list track_history_except_column_list Típus: list A céltábla előzményeihez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . Használattrack_history_except_column_list a nyomon követésből kizárandó oszlopok megadásához. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId) de nem használhatja col(source.userId) .Ez a paraméter nem kötelező. Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha track_history_column_list track_history_except_column_list argumentumot ad át a függvénynek. |
Az source
argumentum megvalósítása
A apply_changes_from_snapshot()
függvény tartalmazza az argumentumot source
. Az előzmény pillanatképek feldolgozásához az source
argumentum egy Python lambda függvény lesz, amely két értéket ad vissza a apply_changes_from_snapshot()
függvénynek: a feldolgozandó pillanatképadatokat tartalmazó Python DataFrame-et és egy pillanatkép-verziót.
A lambda függvény aláírása a következő:
lambda Any => Optional[(DataFrame, Any)]
- A lambda függvény argumentuma a legutóbb feldolgozott pillanatkép-verzió.
- A lambda függvény
None
visszatérési értéke vagy két értékből álló rekord: A rekord első értéke a feldolgozandó pillanatképet tartalmazó DataFrame. A rekord második értéke a pillanatkép-verzió, amely a pillanatkép logikai sorrendjét jelöli.
Példa a lambda függvény implementálására és meghívására:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
A Delta Live Tables futtatókörnyezet a következő lépéseket hajtja végre minden alkalommal, amikor a függvényt apply_changes_from_snapshot()
tartalmazó folyamat aktiválódik:
- A függvény futtatásával
next_snapshot_and_version
betölti a következő pillanatkép-adatkeretet és a megfelelő pillanatkép-verziót. - Ha a DataFrame nem ad vissza, a futtatás leáll, és a folyamatfrissítés befejezettként van megjelölve.
- Észleli az új pillanatkép módosításait, és növekményesen alkalmazza őket a céltáblára.
- Visszatér az 1. lépéshez, hogy betöltse a következő pillanatképet és annak verzióját.
Korlátozások
A Delta Live Tables Python felülete a következő korlátozásokkal rendelkezik:
A pivot()
függvény nem támogatott. A pivot
Spark-művelethez a bemeneti adatok lelkes betöltésére van szükség a kimenet sémájának kiszámításához. Ez a képesség nem támogatott a Delta Live Tablesben.