Adatok átalakítása Delta Live-táblákkal
Ez a cikk azt ismerteti, hogyan deklarálhatja az adathalmazok transzformációit a Delta Live Tables használatával, és hogyan adhatja meg a rekordok lekérdezési logikával történő feldolgozását. Példákat is tartalmaz a Delta Live Tables-folyamatok létrehozásához használt gyakori átalakítási mintákra.
Adathalmazt bármely olyan lekérdezéshez definiálhat, amely DataFrame-et ad vissza. Az Apache Spark beépített műveleteit, UDF-jeit, egyéni logikáját és MLflow-modelljeit használhatja átalakításként a Delta Live Tables folyamatában. Miután az adatokat betöltötte a Delta Live Tables-folyamatba, új adatkészleteket határozhat meg a felsőbb rétegbeli forrásokhoz új streamelési táblák, materializált nézetek és nézetek létrehozásához.
Az állapotalapú feldolgozás delta élő táblákkal való hatékony végrehajtásának megismeréséhez tekintse meg az állapotalapú feldolgozás optimalizálása a Delta Élő táblákban vízjelekkel című témakört.
Mikor érdemes nézeteket, materializált nézeteket és streamelési táblákat használni?
A folyamat-lekérdezések megvalósításakor válassza ki a legjobb adathalmaztípust, hogy azok hatékonyak és karbantarthatók legyenek.
Fontolja meg egy nézet használatát a következők végrehajtásához:
- A nagyobb vagy összetettebb lekérdezések könnyebben kezelhetővé alakíthatók.
- Köztes eredmények ellenőrzése elvárások alapján.
- Csökkentse a tárolási és számítási költségeket a nem szükséges eredményekhez. Mivel a táblák materializáltak, további számítási és tárolási erőforrásokat igényelnek.
Érdemes materializált nézetet használni, ha:
- Több alsóbb rétegbeli lekérdezés is használja a táblát. Mivel a nézetek igény szerint vannak kiszámítva, a nézetet a rendszer minden alkalommal újra kiszámítja, amikor lekérdezik a nézetet.
- Más folyamatok, feladatok vagy lekérdezések is felhasználják a táblát. Mivel a nézetek nem materializáltak, csak ugyanabban a folyamatban használhatja őket.
- A fejlesztés során meg szeretné tekinteni egy lekérdezés eredményeit. Mivel a táblák materializáltak, és megtekinthetők és lekérdezhetők a folyamaton kívül, a táblák használata a fejlesztés során segíthet a számítások helyességének ellenőrzésében. Az ellenőrzést követően konvertálja a materializálást nem igénylő lekérdezéseket nézetekké.
Érdemes lehet streamelési táblázatot használni a következő esetekben:
- A lekérdezések egy folyamatosan vagy növekményesen növekvő adatforráson alapulnak.
- A lekérdezési eredményeket növekményesen kell kiszámítani.
- A folyamatnak nagy átviteli sebességre és alacsony késésre van szüksége.
Feljegyzés
A streamelési táblák mindig a streamelési forrásokhoz vannak definiálva. Streamelési források használatával APPLY CHANGES INTO
cdc-hírcsatornákból származó frissítéseket is alkalmazhat. Lásd : AZ APPLY CHANGES API-k: A változásadatok rögzítésének egyszerűsítése Delta Live-táblákkal.
Táblák kizárása a célsémából
Ha olyan köztes táblákat kell kiszámítania, amelyek nem külső használatra készültek, megakadályozhatja, hogy a kulcsszavak használatával közzétehesse őket egy TEMPORARY
sémában. Az ideiglenes táblák továbbra is a Delta Live Tables szemantikája szerint tárolják és dolgozzák fel az adatokat, de nem érhetők el az aktuális folyamaton kívül. Az ideiglenes tábla az azt létrehozó folyamat teljes élettartama alatt megmarad. Ideiglenes táblák deklarálásához használja az alábbi szintaxist:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Streamelési táblák és materializált nézetek egyesítése egyetlen folyamatban
A streamelési táblák öröklik az Apache Spark strukturált streamelés feldolgozási garanciáit, és úgy vannak konfigurálva, hogy csak hozzáfűző adatforrásokból származó lekérdezéseket dolgozzanak fel, ahol az új sorok mindig a forrástáblába kerülnek módosítás helyett.
Feljegyzés
Bár a streamelési táblák alapértelmezés szerint csak hozzáfűző adatforrásokat igényelnek, ha egy streamforrás egy másik, frissítéseket vagy törléseket igénylő streamelési tábla, a skipChangeCommits jelölővel felülbírálhatja ezt a viselkedést.
A gyakori streamelési minta magában foglalja a forrásadatok betöltését a folyamat kezdeti adatkészleteinek létrehozásához. Ezeket a kezdeti adatkészleteket gyakran bronztábláknak nevezik, és gyakran hajtanak végre egyszerű átalakításokat.
Ezzel szemben a folyamat utolsó táblái, más néven aranytáblák gyakran bonyolult összesítéseket vagy olvasást igényelnek egy APPLY CHANGES INTO
művelet céljaiból. Mivel ezek a műveletek alapvetően frissítéseket hoznak létre a hozzáfűzések helyett, a streamelési táblák bemeneteként nem támogatottak. Ezek az átalakítások jobban megfelelnek a materializált nézeteknek.
A streamelési táblák és a materializált nézetek egyetlen folyamatba való keverésével egyszerűsítheti a folyamatot, elkerülheti a nyers adatok költséges újrabetöltését vagy újrafeldolgozását, és az SQL teljes mértékben képes összetett összesítéseket kiszámítani egy hatékonyan kódolt és szűrt adathalmazon keresztül. Az alábbi példa az ilyen típusú vegyes feldolgozást szemlélteti:
Feljegyzés
Ezek a példák az Automatikus betöltő használatával töltik be a fájlokat a felhőbeli tárolóból. Ahhoz, hogy az automatikus betöltővel töltsön be fájlokat egy Unity Catalog engedélyezett csővezetékbe, a külső helyek címet kell használnia. Ha többet szeretne tudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el a Unity Catalog használata a Delta Live Tables-folyamatokkal című témakört.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return dlt.read_stream("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return dlt.read("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
További információ arról, hogyan használhatja az Automatikus betöltőt JSON-fájlok növekményes betöltésére az Azure Storage-ból.
Stream-statikus illesztések
A stream-statikus illesztések jó választásnak számítanak a csak hozzáfűző adatok folyamatos adatfolyamának denormalizálásakor, elsősorban statikus dimenziótáblával.
Az egyes folyamatfrissítések során a streamből származó új rekordok a statikus tábla legfrissebb pillanatképével lesznek összekapcsolva. Ha a rekordok a streamelési tábla megfelelő adatainak feldolgozása után hozzáadódnak vagy frissülnek a statikus táblában, az eredményül kapott rekordok csak akkor lesznek újraszámolva, ha teljes frissítést hajtanak végre.
Az aktivált végrehajtáshoz konfigurált folyamatokban a statikus tábla a frissítés indításakor adja vissza az eredményeket. A folyamatos végrehajtásra konfigurált folyamatokban a rendszer a statikus tábla legújabb verzióját kérdezi le minden alkalommal, amikor a tábla feldolgoz egy frissítést.
A következő példa egy stream-statikus illesztésre:
Python
@dlt.table
def customer_sales():
return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
Az aggregátumok hatékony kiszámítása
Streamelési táblák használatával növekményesen kiszámíthatja az olyan egyszerű eloszlási aggregátumokat, mint a darabszám, a min, a max vagy az összeg, valamint az algebrai aggregátumokat, például az átlagot vagy a szórást. A Databricks növekményes összesítést javasol korlátozott számú csoporttal rendelkező lekérdezésekhez, például záradékkal rendelkező GROUP BY country
lekérdezésekhez. Minden frissítés csak új bemeneti adatokat olvas be.
Ha többet szeretne megtudni a növekményes összesítéseket végrehajtó Delta Live Tables-lekérdezések írásáról, tekintse meg az ablakos összesítések vízjelekkel történő végrehajtását ismertető témakört.
MLflow-modellek használata Delta Live Tables-folyamatban
Feljegyzés
Ha MLflow-modelleket szeretne használni egy Unity-katalógust használó folyamatban, a folyamatot konfigurálni kell a preview
csatorna használatára. A current
csatorna használatához konfigurálnia kell a folyamatot a Hive metaadattárban való közzétételre.
MLflow-betanított modelleket használhat a Delta Live Tables-folyamatokban. Az MLflow-modellek átalakításként vannak kezelve az Azure Databricksben, ami azt jelenti, hogy Spark DataFrame-bemeneten működnek, és Spark DataFrame-ként adnak vissza eredményeket. Mivel a Delta Live Tables adatkészleteket definiál a DataFrame-eken, az MLflow-t használó Apache Spark-számítási feladatokat csak néhány sornyi kóddal konvertálhatja Delta Live-táblákká. További információ az MLflow-ról: ML-életciklus kezelése az MLflow használatával.
Ha már van egy Python-jegyzetfüzete, amely MLflow-modellt hív meg, ezt a kódot a dekorátor használatával módosíthatja a Delta Live Tableshez, és biztosíthatja, hogy a @dlt.table
függvények definiálva legyenek az átalakítási eredmények visszaadásához. A Delta Live Tables alapértelmezés szerint nem telepíti az MLflow-t, ezért ellenőrizze, hogy ön %pip install mlflow
és mlflow
dlt
importálja-e a jegyzetfüzet tetején. A Delta Live Tables szintaxisának bemutatása: Delta Live Tables-folyamat implementálása a Pythonnal.
Ha MLflow-modelleket szeretne használni a Delta Live Tablesben, hajtsa végre a következő lépéseket:
- Szerezze be az MLflow-modell futtatási azonosítóját és modellnevét. A futtatási azonosító és a modell neve az MLflow-modell URI-jának létrehozásához használatos.
- Az URI használatával definiáljon egy Spark UDF-et az MLflow-modell betöltéséhez.
- Az MLflow-modell használatához hívja meg a tábladefiníciók UDF-ét.
Az alábbi példa a minta alapszintaxisát mutatja be:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return dlt.read(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Teljes példaként az alábbi kód egy Spark UDF-et loaded_model_udf
határoz meg, amely betölti a hitelkockázati adatokra betanított MLflow-modellt. Az előrejelzéshez használt adatoszlopokat a rendszer argumentumként továbbítja a UDF-nek. A táblázat loan_risk_predictions
kiszámítja az előrejelzéseket az egyes sorokhoz a következőben loan_risk_input_data
: .
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return dlt.read("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Manuális törlések vagy frissítések megőrzése
A Delta Live Tables lehetővé teszi a rekordok manuális törlését vagy frissítését egy táblából, és frissítési műveletet hajthat végre az alsóbb rétegbeli táblák újrafordításához.
A Delta Live Tables alapértelmezés szerint minden folyamat frissítésekor a bemeneti adatok alapján újrakomponálta a táblaeredményeket, ezért gondoskodnia kell arról, hogy a törölt rekord ne legyen újra betöltve a forrásadatokból. Ha úgy állítja be a pipelines.reset.allowed
táblatulajdonságot, hogy false
megakadályozza a táblák frissítését, de nem akadályozza meg, hogy növekményes írások folyjanak a táblába, vagy új adatok folyjanak a táblába.
Az alábbi ábra egy példát mutat be két streamelési táblázat használatával:
raw_user_table
nyers felhasználói adatokat használ egy forrásból.bmi_table
Növekményesen számítja ki a BMI-pontszámokat a súly és a magasságraw_user_table
alapján.
Manuálisan szeretné törölni vagy frissíteni a felhasználói rekordokat a raw_user_table
rendszerből, majd újrafordítani a bmi_table
rekordot.
Az alábbi kód bemutatja, hogy false
a pipelines.reset.allowed
táblatulajdonság úgy van-e beállítva, hogy letiltsa a teljes frissítéstraw_user_table
, hogy a tervezett módosítások idővel megmaradjanak, de az alsóbb rétegbeli táblák újrafordításra kerülnek egy folyamatfrissítés futtatásakor:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);