Megosztás a következőn keresztül:


Adatok átalakítása Delta Live-táblákkal

Ez a cikk azt ismerteti, hogyan deklarálhatja az adathalmazok transzformációit a Delta Live Tables használatával, és hogyan adhatja meg a rekordok lekérdezési logikával történő feldolgozását. Példákat is tartalmaz a Delta Live Tables csővezetékek létrehozásához használt gyakori átalakítási mintákra.

Adathalmazt bármely olyan lekérdezéshez definiálhat, amely DataFrame-et ad vissza. Az Apache Spark beépített műveleteit, UDF-jeit, egyéni logikáját és MLflow-modelljeit használhatja átalakításként a Delta Live Tables folyamatában. Miután az adatokat betöltötte a Delta Live Tables-folyamatba, új adatkészleteket határozhat meg a felsőbb rétegbeli forrásokhoz új streamelési táblák, materializált nézetek és nézetek létrehozásához.

Ha szeretné megtudni, hogyan végezhet hatékonyan állapotalapú feldolgozást a Delta Live Tables használatával, tekintse meg Az állapotalapú feldolgozás optimalizálása a Delta Élő táblákban vízjelekkelcímű témakört.

Mikor érdemes nézeteket, materializált nézeteket és streamelési táblákat használni?

A folyamat-lekérdezések megvalósításakor válassza ki a legjobb adathalmaztípust, hogy azok hatékonyak és karbantarthatók legyenek.

Fontolja meg egy nézet használatát a következők végrehajtásához:

  • A nagyobb vagy összetettebb lekérdezések könnyebben kezelhetővé alakíthatók.
  • Köztes eredmények ellenőrzése elvárások alapján.
  • Csökkentse a tárolási és számítási költségeket a nem szükséges eredményekhez. Mivel a táblák materializáltak, további számítási és tárolási erőforrásokat igényelnek.

Érdemes materializált nézetet használni, ha:

  • Több alsóbb rétegbeli lekérdezés is használja a táblát. Mivel a nézetek igény szerint vannak kiszámítva, a nézetet a rendszer minden alkalommal újra kiszámítja, amikor lekérdezik a nézetet.
  • Más folyamatok, feladatok vagy lekérdezések is felhasználják a táblát. Mivel a nézetek nem materializáltak, csak ugyanabban a folyamatban használhatja őket.
  • A fejlesztés során meg szeretné tekinteni egy lekérdezés eredményeit. Mivel a táblák materializáltak, és megtekinthetők és lekérdezhetők a folyamaton kívül, a táblák használata a fejlesztés során segíthet a számítások helyességének ellenőrzésében. Az ellenőrzést követően konvertálja a materializálást nem igénylő lekérdezéseket nézetekké.

Érdemes lehet streamelési táblázatot használni a következő esetekben:

  • A lekérdezések egy folyamatosan vagy növekményesen növekvő adatforráson alapulnak.
  • A lekérdezési eredményeket növekményesen kell kiszámítani.
  • A folyamatnak nagy átviteli sebességre és alacsony késésre van szüksége.

Feljegyzés

A streamelési táblák mindig a streamelési forrásokhoz vannak definiálva. Streamelési források használatával APPLY CHANGES INTO cdc-hírcsatornákból származó frissítéseket is alkalmazhat. Lásd A VÁLTOZÁSOK ALKALMAZÁSA API-kat: Egyszerűsítse a változáskövetést a Delta Live Tables segítségével.

Táblák kizárása a célsémából

Ha nem külső használatra szánt köztes táblákat kell kiszámítania, megakadályozhatja, hogy közzétegyék őket egy sémában a TEMPORARY kulcsszóval. Az ideiglenes táblák továbbra is a Delta Live Tables szemantikája szerint tárolják és dolgozzák fel az adatokat, de nem érhetők el az aktuális folyamaton kívül. Az ideiglenes tábla az azt létrehozó folyamat teljes élettartama alatt megmarad. Ideiglenes táblák deklarálásához használja az alábbi szintaxist:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Streamelési táblák és materializált nézetek egyesítése egyetlen folyamatban

A streamelési táblák öröklik az Apache Spark strukturált streamelés feldolgozási garanciáit, és úgy vannak konfigurálva, hogy csak hozzáfűző adatforrásokból származó lekérdezéseket dolgozzanak fel, ahol az új sorok mindig a forrástáblába kerülnek módosítás helyett.

Feljegyzés

Bár alapértelmezés szerint a streamelési táblák csak hozzáfűző adatforrásokat igényelnek, ha a streamforrás egy másik, frissítéseket vagy törléseket igénylő streamelési tábla, ezt a viselkedést felülbírálhatja a skipChangeCommits jelölővel.

A gyakori streamelési minta magában foglalja a forrásadatok betöltését a folyamat kezdeti adatkészleteinek létrehozásához. Ezeket a kezdeti adatkészleteket gyakran bronztábláknak nevezik, és gyakran hajtanak végre egyszerű átalakításokat.

Ezzel szemben a folyamat utolsó táblái, más néven aranytáblák gyakran bonyolult összesítéseket vagy olvasást igényelnek egy APPLY CHANGES INTO művelet céljaiból. Mivel ezek a műveletek alapvetően frissítéseket hoznak létre a hozzáfűzések helyett, a streamelési táblák bemeneteként nem támogatottak. Ezek az átalakítások jobban megfelelnek a materializált nézeteknek.

A streamelési táblák és a materializált nézetek egyetlen folyamatba való keverésével egyszerűsítheti a folyamatot, elkerülheti a nyers adatok költséges újrabetöltését vagy újrafeldolgozását, és az SQL teljes mértékben képes összetett összesítéseket kiszámítani egy hatékonyan kódolt és szűrt adathalmazon keresztül. Az alábbi példa az ilyen típusú vegyes feldolgozást szemlélteti:

Feljegyzés

Ezek a példák az Automatikus betöltő használatával töltik be a fájlokat a felhőbeli tárolóból. Fájlok betöltéséhez az automatikus betöltővel egy Unity Catalog-kompatibilis folyamatban a külső helyeket kell használnia. Ha többet szeretne megtudni a Unity Catalog és a Delta Live Tables használatáról, olvassa el A Unity katalógus használata Delta Live Tables-folyamatokkal.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

További információ arról, hogyan használhatja az Automatikus betöltőt JSON-fájlok növekményes betöltésére az Azure Storage-ból.

Stream-statikus illesztések

A folyamatos-statikus illesztések jó választás, amikor egy csak hozzáfűzhető adatokból álló folyamatos adatfolyamot denormalizálunk egy statikus dimenziótáblával együtt.

Az egyes folyamatfrissítések során a streamből származó új rekordok a statikus tábla legfrissebb pillanatképével lesznek összekapcsolva. Ha a rekordok a streamelési tábla megfelelő adatainak feldolgozása után hozzáadódnak vagy frissülnek a statikus táblában, az eredményül kapott rekordok csak akkor lesznek újraszámolva, ha teljes frissítést hajtanak végre.

Az aktivált végrehajtáshoz konfigurált folyamatokban a statikus tábla a frissítés indításakor adja vissza az eredményeket. A folyamatos végrehajtásra konfigurált folyamatokban a rendszer a statikus tábla legújabb verzióját kérdezi le minden alkalommal, amikor a tábla feldolgoz egy frissítést.

A következő példa egy stream-statikus illesztésre:

Python

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Az aggregátumok hatékony kiszámítása

Streamelési táblák használatával növekményesen kiszámíthatja az olyan egyszerű eloszlási aggregátumokat, mint a darabszám, a min, a max vagy az összeg, valamint az algebrai aggregátumokat, például az átlagot vagy a szórást. A Databricks növekményes összesítést javasol korlátozott számú csoporttal rendelkező lekérdezésekhez, például záradékkal rendelkező GROUP BY country lekérdezésekhez. Minden frissítés csak új bemeneti adatokat olvas be.

Ha többet szeretne megtudni a növekményes összesítéseket végrehajtó Delta Live Table-lekérdezések írásáról, olvassa el Ablakos összesítések végrehajtása vízjelekkel.

MLflow-modellek használata Delta Live Tables-folyamatban

Feljegyzés

Ha MLflow-modelleket szeretne használni unitykatalógus-kompatibilis folyamatban, a folyamatot úgy kell konfigurálni, hogy az preview csatornát használja. A current csatorna használatához konfigurálnia kell a folyamatot a Hive metaadattárban való közzétételre.

MLflow-betanított modelleket használhat a Delta Live Tables-folyamatokban. Az MLflow-modellek átalakításként vannak kezelve az Azure Databricksben, ami azt jelenti, hogy Spark DataFrame-bemeneten működnek, és Spark DataFrame-ként adnak vissza eredményeket. Mivel a Delta Live Tables adatkészleteket definiál a DataFrame-eken, az MLflow-t használó Apache Spark-számítási feladatokat csak néhány sornyi kóddal konvertálhatja Delta Live-táblákká. További információ az MLflow-ról: MLflow a generatív AI-ügynök és az ML-modell életciklusa számára.

Ha már van egy Python-jegyzetfüzete, amely MLflow-modellt hív meg, ezt a kódot a Delta Live Tableshez igazíthatja a @dlt.table dekorátor használatával, és biztosíthatja, hogy a függvények definiálva legyenek az átalakítási eredmények visszaadásához. A Delta Live Tables alapértelmezés szerint nem telepíti az MLflow-t, ezért ellenőrizze, hogy telepítette-e az MLflow könyvtárakat a %pip install mlflow jelöléssel, és importálta-e a mlflow és dlt modulokat a jegyzetfüzet tetején. A Delta Live Tables szintaxisának bevezetéséhez lásd: Folyamatkód fejlesztése Python segítségével.

Ha MLflow-modelleket szeretne használni a Delta Live Tablesben, hajtsa végre a következő lépéseket:

  1. Szerezze be az MLflow-modell futtatási azonosítóját és modellnevét. A futtatási azonosító és a modell neve az MLflow-modell URI-jának létrehozásához használatos.
  2. Az URI használatával definiáljon egy Spark UDF-et az MLflow-modell betöltéséhez.
  3. Az MLflow-modell használatához hívja meg a tábladefiníciókban található UDF-et.

Az alábbi példa a minta alapszintaxisát mutatja be:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Teljes példaként az alábbi kód egy Spark UDF-et loaded_model_udf határoz meg, amely betölti a hitelkockázati adatokra betanított MLflow-modellt. Az előrejelzéshez használt adatoszlopokat a rendszer argumentumként továbbítja a UDF-nek. A tábla loan_risk_predictions kiszámítja a loan_risk_input_dataegyes sorainak előrejelzéseit.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Manuális törlések vagy frissítések megőrzése

A Delta Live Tables lehetővé teszi a rekordok manuális törlését vagy frissítését egy táblából, és frissítési műveletet hajthat végre az alsóbb rétegbeli táblák újrafordításához.

A Delta Live Tables alapértelmezés szerint minden folyamat frissítésekor a bemeneti adatok alapján újrakomponálta a táblaeredményeket, ezért gondoskodnia kell arról, hogy a törölt rekord ne legyen újra betöltve a forrásadatokból. A pipelines.reset.allowed táblatulajdonság false beállítása megakadályozza a táblák frissítését, de nem akadályozza meg, hogy növekményes írások folyjanak a táblákba vagy új adatok a táblába.

Az alábbi ábra egy példát mutat be két streamelési táblázat használatával:

  • raw_user_table nyers felhasználói adatokat használ egy forrásból.
  • bmi_table Növekményesen számítja ki a BMI-pontszámokat a súly és a magasság raw_user_tablealapján.

Manuálisan szeretné törölni vagy frissíteni a felhasználói rekordokat a raw_user_table-ból, és újraszámolni a bmi_table.

Adatdiagram megőrzése

Az alábbi kód bemutatja, hogy a pipelines.reset.allowed táblatulajdonságot false értékre állítja a raw_user_table teljes frissítésének letiltásához, hogy a tervezett módosítások idővel megmaradjanak, de az úgynevezett levezetett táblák újraszámításra kerülnek, amikor egy folyamat frissítése kerül futtatásra.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);