Megosztás a következőn keresztül:


Lakeflow Spark Deklaratív folyamatok kód fejlesztése SQL-lel

A Lakeflow Spark Deklaratív folyamatok (SDP) számos új SQL-kulcsszót és függvényt vezet be a folyamat materializált nézeteinek és streamtábláinak meghatározásához. A folyamatok fejlesztésének SQL-támogatása a Spark SQL alapjaira épül, és támogatja a strukturált streamelési funkciókat.

A PySpark DataFrame-eket ismerő felhasználók szívesebben fejlesztenek folyamatkódot a Pythonnal. A Python szélesebb körű tesztelést és műveleteket támogat, amelyek az SQL-vel való implementálásuk során kihívást jelentenek, például metaprogramozási műveletek. Lásd: Folyamatkód fejlesztése Python-használatával.

A folyamat SQL-szintaxisának teljes körű referenciáját lásd: Pipeline SQL language reference.

Az SQL alapjai folyamatfejlesztéshez

Az SQL-kód, amely folyamatadatkészleteket hoz létre, a CREATE OR REFRESH szintaxist használja materializált nézetek és lekérdezési eredmények alapján streamelési táblák meghatározására.

A STREAM kulcsszó azt jelzi, hogy a SELECT záradékban hivatkozott adatforrást streamelési szemantikával kell-e olvasni.

A folyamatkonfiguráció során megadott katalógusba és sémába beolvassa és beírja az alapértelmezett értéket. Lásd: Célkatalógus és sémabeállítása.

A folyamat forráskódja kritikusan különbözik az SQL-szkriptektől: Az SDP kiértékeli a folyamaton konfigurált összes forráskódfájl összes adathalmazdefinícióját, és adatfolyam-gráfot hoz létre a lekérdezések futtatása előtt. A forrásfájlokban megjelenő lekérdezések sorrendje határozza meg a kód kiértékelésének sorrendjét, a lekérdezések végrehajtásának sorrendjét azonban nem.

Materializált nézet létrehozása AZ SQL használatával

Az alábbi példakód bemutatja a materializált nézet SQL-sel való létrehozásának alapszintaxisát:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Streamelési tábla létrehozása SQL-lel

Az alábbi példakód bemutatja a streamelési tábla SQL-sel való létrehozásának alapszintaxisát. Streamelési tábla forrásának olvasásakor a STREAM kulcsszó azt jelzi, hogy a forráshoz streamelési szemantikát használ. Ne használja a STREAM kulcsszót materializált nézet létrehozásakor:

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Megjegyzés:

A STREAM kulcsszóval stream-szemantikát használhat a forrásból való olvasáshoz. Ha az olvasás egy meglévő rekord módosítását vagy törlését tapasztalja, hibaüzenet jelenik meg. A legbiztonságosabb, ha statikus vagy csak hozzáfűző forrásokból olvas. A módosítási véglegesítéseket tartalmazó adatok betöltéséhez használhatja a Pythont és a SkipChangeCommits hibák kezelésére szolgáló lehetőséget.

Adatok betöltése objektumtárolóból

A folyamatok támogatják az adatok betöltését az Azure Databricks által támogatott összes formátumból. Lásd: Adatformátum beállításai.

Megjegyzés:

A példák az automatikusan a munkaterületére csatlakoztatott /databricks-datasets alatt elérhető adatok felhasználását mutatják be. A Databricks kötetútvonalak vagy felhőalapú URI-k használatát javasolja a felhőobjektum-tárolóban tárolt adatokra való hivatkozáshoz. Lásd: Mik azok a Unity Catalog-kötetek?.

A Databricks az automatikus betöltő és a streamelő táblák használatát javasolja, amikor növekményes betöltési számítási feladatokat konfigurál a felhőobjektum-tárolóban tárolt adatokhoz. Lásd : Mi az automatikus betöltő?.

Az SQL a read_files függvénnyel hívja meg az Automatikus betöltő funkciót. A streamelési olvasást az STREAM és read_fileskulcsszavakkal is konfigurálnia kell.

Az alábbiak ismertetik az SQL read_files szintaxisát.

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Az automatikus betöltő beállításai kulcs-érték párokból állnak. A támogatott formátumokkal és beállításokkal kapcsolatos részletekért lásd: Beállítások.

Az alábbi példa létrehoz egy streamelési táblát JSON-fájlokból az Automatikus betöltő használatával:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

read_files függvény tömbösített szemantikát is támogat a materializált nézetek létrehozásához. Az alábbi példa kötegfeldolgozási szemantikával olvas be egy JSON-könyvtárat, és létrehoz egy materializált nézetet.

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Adatok validálása az elvárások alapján

Az elvárásokkal adatminőségi korlátozásokat állíthat be és kényszeríthet ki. Lásd: Adatminőség kezelése folyamatelvárásokkal.

Az alábbi kód egy valid_data nevű elvárást határoz meg, amely az adatbetöltés során null értékű rekordokat ad le:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A folyamat során definiált materializált nézetek és streamelési táblák lekérdezése

Az alábbi példa négy adatkészletet határoz meg:

  • Egy JSON-adatokat betöltő orders nevű streamelési tábla.
  • CSV-adatokat betöltő materializált nézet, amelynek neve customers.
  • A customer_orders és orders adathalmazok rekordjait összekapcsoló, customers nevű materializált nézet dátumra veti a rendelési időbélyeget, és kiválasztja a customer_id, order_number, stateés order_date mezőket.
  • Egy daily_orders_by_state nevű materializált nézet, amely összesíti az egyes államok rendeléseinek napi számát.

Megjegyzés:

A folyamat nézeteinek vagy tábláinak lekérdezésekor közvetlenül megadhatja a katalógust és a sémát, vagy használhatja a folyamatban konfigurált alapértelmezett beállításokat. Ebben a példában a orders, customersés customer_orders táblák a folyamathoz konfigurált alapértelmezett katalógusból és sémából vannak megírva és olvasva.

Az örökölt közzétételi mód a LIVE sémával kérdezi le a folyamatban definiált egyéb materializált nézeteket és streamtáblákat. Az új folyamatokban a LIVE sémaszintaxisa csendben figyelmen kívül lesz hagyva. Lásd: LIVE séma (régi változat).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Privát tábla definiálása

A záradékot PRIVATE materializált nézet vagy streamelési tábla létrehozásakor használhatja. Privát tábla létrehozásakor létrehozza a táblát, de nem hozza létre a tábla metaadatait. A PRIVATE záradék arra utasítja az SDP-t, hogy hozzon létre egy táblát, amely elérhető a folyamat számára, de nem érhető el a folyamaton kívül. A feldolgozási idő csökkentése érdekében egy privát tábla megmarad az azt létrehozó folyamat teljes élettartama alatt, és nem csak egyetlen frissítéssel.

A privát táblák neve megegyezhet a katalógus tábláinak nevével. Ha nem minősített nevet ad meg egy tábla számára egy folyamaton belül, ha egy privát tábla és egy katalógustábla is ilyen névvel rendelkezik, a rendszer a privát táblát használja.

A privát táblákra korábban ideiglenes táblákként hivatkoztak.

Rekordok végleges törlése materializált nézetből vagy streamelési táblából

Ha véglegesen törölni szeretné a rekordokat egy olyan streamelési táblából, amelyen engedélyezve van a törlési vektor, például a GDPR-megfelelőség érdekében, további műveleteket kell végrehajtani az objektum alapjául szolgáló Delta-táblákon. A rekordok streamelési táblából való törlésének biztosításához lásd: Rekordok végleges törlése streamelési táblából.

A materializált nézetek mindig a mögöttes táblák adatait tükrözik a frissítések során. A Materialized nézet adatainak törléséhez törölnie kell az adatokat a forrásból, és frissítenie kell a materializált nézetet.

Táblák vagy nézetek SQL-sel történő deklarálásakor használt értékek paraméterezése

A SET használatával megadhat egy konfigurációs értéket egy táblát vagy nézetet deklaráló lekérdezésben, beleértve a Spark-konfigurációkat is. Bármely tábla vagy nézet, amelyet egy forrásfájlban definiál, miután az SET utasítás hozzáfér a definiált értékhez. A SET utasítással megadott Spark-konfigurációk a Spark-lekérdezés végrehajtásakor használhatók a SET utasítást követő bármely táblához vagy nézethez. A konfigurációs érték lekérdezésben való olvasásához használja a sztring interpolációs szintaxisát ${}. Az alábbi példa egy startDate nevű Spark-konfigurációs értéket állít be, és ezt az értéket használja egy lekérdezésben:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Több konfigurációs érték megadásához minden értékhez használjon külön SET utasítást.

Korlátozások

A PIVOT záradék nem támogatott. A pivot Sparkban végzett művelethez a bemeneti adatok azonnali betöltése szükséges a kimeneti séma kiszámításához. Ez a képesség a csővezetékekben nem támogatott.

Megjegyzés:

A CREATE OR REFRESH LIVE TABLE szintaxis a materializált nézet létrehozásához elavult. Ehelyett használja a CREATE OR REFRESH MATERIALIZED VIEW.