Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Megtudhatja, hogyan hozhat létre és helyezhet üzembe ETL-(kinyerési, átalakító és betöltési) folyamatokat változási adatrögzítéssel (CDC) a Lakeflow Spark Deklaratív folyamatok (SDP) használatával az adatvezényléshez és az automatikus betöltéshez. Az ETL-folyamatok implementálják a forrásrendszerekből származó adatok beolvasásának lépéseit, átalakítják az adatokat olyan követelmények alapján, mint az adatminőség-ellenőrzések és a duplikációk rögzítése, és az adatokat egy célrendszerbe, például egy adattárházba vagy egy adattóba írják.
Ebben az oktatóanyagban egy MySQL-adatbázis táblájából customers származó adatokat fog használni a következőkre:
- Bontsa ki a módosításokat egy tranzakciós adatbázisból a Debezium vagy egy másik eszköz használatával, és mentse őket a felhőalapú objektumtárba (S3, ADLS vagy GCS). Ebben az oktatóanyagban kihagyja egy külső CDC-rendszer beállítását, és ehelyett hamis adatokat hoz létre az oktatóanyag egyszerűsítése érdekében.
- Az Automatikus betöltő használatával növekményesen töltheti be az üzeneteket a felhőobjektum-tárolóból, és tárolhatja a nyers üzeneteket a
customers_cdctáblában. Az Automatikus betöltő a sémára következtet, és kezeli a sémafejlődést. - Hozza létre a táblát az
customers_cdc_cleanadatminőség elvárásokkal való ellenőrzéséhez. Például soha ne legyenidnull, mert az upsert műveletek futtatására használják. - Végezze el
AUTO CDC ... INTOa megtisztított CDC-adatokon a módosítások véglegescustomerstáblába való beillesztését. - Annak bemutatása, hogyan hozhat létre egy folyamat egy 2. típusú, lassan változó dimenziótáblát (SCD2) az összes változás nyomon követéséhez.
A cél a nyers adatok közel valós idejű betöltése, és egy táblázat létrehozása az elemzői csapat számára az adatminőség biztosítása mellett.
Az oktatóanyag a medallion Lakehouse architektúrát használja, ahol nyers adatokat vesz fel a bronzrétegen keresztül, megtisztítja és ellenőrzi az adatokat az ezüst réteggel, és dimenziómodellezést és összesítést alkalmaz az aranyréteg használatával. További információért lásd: Mi a medallion lakehouse architektúra?
A megvalósított folyamat a következőképpen néz ki:
További információ a folyamatról, az automatikus betöltőről és a CDC-ről: Lakeflow Spark Deklaratív folyamatok, Mi az automatikus betöltő?, és mi az a változásadat-rögzítés (CDC)?
Requirements
Az oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:
- Jelentkezzen be egy Azure Databricks-munkaterületre.
- Engedélyezze a Unity-katalógust a munkaterületen.
- Engedélyezze a kiszolgáló nélküli számítást a fiókjához. A kiszolgáló nélküli Lakeflow Spark deklaratív folyamatok nem érhetők el minden munkaterület-régióban. Tekintse meg az elérhető régiók korlátozott regionális rendelkezésre állásával rendelkező funkciókat . Ha a kiszolgáló nélküli számítás nincs engedélyezve a fiókjában, a lépéseknek a munkaterület alapértelmezett számításával kell működnie.
- Rendelkezik engedéllyel számítási erőforrás létrehozására vagy egy számítási erőforráshoz való hozzáférésre.
- Új séma katalógusban való létrehozására vonatkozó engedélyekkel rendelkezik. A szükséges engedélyek a következők:
ALL PRIVILEGESvagyUSE CATALOG.CREATE SCHEMA - Rendelkezik engedéllyel egy új kötet meglévő sémában való létrehozásához. A szükséges engedélyek a következők:
ALL PRIVILEGESvagyUSE SCHEMA.CREATE VOLUME
Adatrögzítés módosítása ETL-folyamatban
A változásadat-rögzítés (CDC) egy tranzakciós adatbázisban (például MySQL vagy PostgreSQL) vagy adattárházban végrehajtott rekordok módosításainak rögzítésére szolgáló folyamat. A CDC rögzíti az olyan műveleteket, mint az adattörlések, a hozzáfűzések és a frissítések, általában streamként a táblák külső rendszerekben való újravalósításához. A CDC lehetővé teszi a növekményes betöltést, miközben nincs szükség tömeges terheléses frissítésekre.
Megjegyzés:
Az oktatóanyag egyszerűsítése érdekében hagyja ki egy külső CDC-rendszer beállítását. Tegyük fel, hogy JSON-fájlként futtatja és menti a CDC-adatokat a felhőobjektum-tárolóban (S3, ADLS vagy GCS). Ez az oktatóanyag a Faker tár használatával hozza létre az oktatóanyagban használt adatokat.
CDC rögzítése
Számos CDC-eszköz érhető el. Az egyik vezető nyílt forráskódú megoldás a Debezium, de léteznek olyan implementációk, amelyek leegyszerűsítik az adatforrásokat, például a Fivetran, a Qlik Replikálás, a StreamSets, a Talend, az Oracle GoldenGate és az AWS DMS.
Ebben az oktatóanyagban CDC-adatokat használ egy külső rendszerből, például a Debeziumból vagy a DMS-ből. A Debezium minden módosított sort rögzít. Általában elküldi az adatmódosítások előzményeit a Kafka-témaköröknek, vagy fájlként menti őket.
Be kell vennie a CDC-adatokat a customers táblából (JSON formátum), ellenőriznie kell, hogy helyesek-e, majd a Lakehouse-ban kell hasznosítania az ügyfelek tábláját.
CDC-bemenet a Debeziumból
Minden módosításhoz egy JSON-üzenet érkezik, amely tartalmazza a frissíteni kívánt sor összes mezőjét (id, , firstname, lastname, emailaddress). Az üzenet további metaadatokat is tartalmaz:
-
operation: Egy műveleti kód, általában (DELETE,APPEND,UPDATE). -
operation_date: Az egyes műveletek rekordjának dátuma és időbélyege.
Az olyan eszközök, mint a Debezium, fejlettebb kimenetet hozhatnak létre, például a módosítás előtti sorértéket, de ez az oktatóanyag kihagyja őket az egyszerűség kedvéért.
1. lépés: Folyamat létrehozása
Hozzon létre egy új ETL-folyamatot a CDC-adatforrás lekérdezéséhez és táblák létrehozásához a munkaterületen.
A munkaterületen kattintson a
Új a bal felső sarokban.
Kattintson az ETL-folyamat elemre.
Módosítsa a pipeline címét
Pipelines with CDC tutorialvagy egy Ön által preferált névre.A cím alatt válasszon ki egy katalógust és sémát, amelyhez írási engedélyekkel rendelkezik.
Ez a katalógus és séma alapértelmezés szerint használatos, ha nem ad meg katalógust vagy sémát a kódban. A kód bármilyen katalógusba vagy sémába írhat a teljes elérési út megadásával. Ez az oktatóanyag az itt megadott alapértelmezett értékeket használja.
A Speciális beállítások között válassza a Start with an empty file (Indítás üres fájllal) lehetőséget.
Válasszon egy mappát a kódhoz. A Tallózás gombra kattintva tallózhat a munkaterület mappáinak listájában. Bármelyik mappát kiválaszthatja, amelyhez írási engedélyekkel rendelkezik.
A verziókövetés használatához válasszon ki egy Git-mappát. Ha új mappát szeretne létrehozni, válassza a
Az oktatóanyaghoz használni kívánt nyelv alapján válassza a Pythont vagy az SQL-t a fájl nyelvéhez.
A Kiválasztás gombra kattintva hozza létre a folyamatot ezekkel a beállításokkal, és nyissa meg a Lakeflow Pipelines-szerkesztőt.
Most már rendelkezik egy üres folyamatlánccal, amely alapértelmezett katalógussal és sémával rendelkezik. Ezután állítsa be a mintaadatokat importálásra az oktatóanyagban.
2. lépés: Az ebben az oktatóanyagban importálandó mintaadatok létrehozása
Erre a lépésre nincs szükség, ha saját adatokat importál egy meglévő forrásból. Ebben az oktatóanyagban hamis adatokat hozhat létre példaként az oktatóanyaghoz. Hozzon létre egy jegyzetfüzetet a Python-adatgenerálási szkript futtatásához. Ezt a kódot csak egyszer kell futtatni a mintaadatok létrehozásához, ezért hozza létre a folyamat mappájában explorations , amely nem folyamatfrissítés részeként fut.
Megjegyzés:
Ez a kód a Faker használatával hozza létre a minta CDC-adatokat. A Faker automatikusan telepíthető, ezért az oktatóanyag a következőt használja %pip install faker: . A jegyzetfüzet hamisítótól való függőségét is beállíthatja. Lásd: Függőségek hozzáadása a jegyzetfüzethez.
A Lakeflow Pipelines-szerkesztőben az eszközböngésző oldalsávjának bal oldalán kattintson a
Adja hozzá, majd válassza a Feltárás lehetőséget.
Adjon meg egy nevet, például
Setup dataválassza a Pythont. Elhagyhatja az alapértelmezett célmappát, amely egy újexplorationsmappa.Kattintson a Létrehozás gombra. Ezzel létrehoz egy jegyzetfüzetet az új mappában.
Írja be az alábbi kódot az első cellába. A(z)
<my_catalog>és<my_schema>definícióját módosítania kell úgy, hogy megegyezzen az előző eljárásban kiválasztott alapértelmezett katalógussal és sémával.%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")Az oktatóanyagban használt adatkészlet létrehozásához írja be a Shift + Enter parancsot a kód futtatásához:
Opcionális. Az oktatóanyagban használt adatok előnézetéhez írja be a következő kódot a következő cellába, és futtassa a kódot. Frissítse a katalógust és a sémát az előző kód elérési útjának megfelelően.
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
Ez létrehoz egy nagy adatkészletet (hamis CDC-adatokkal), amelyet az oktatóanyag többi részében használhat. A következő lépésben töltse be az adatokat az Automatikus betöltő használatával.
3. lépés: Adatok növekményes betöltése az automatikus betöltővel
A következő lépés a nyers adatok betöltése a (hamisított) felhőbeli tárolóból egy bronzrétegbe.
Ez több okból is kihívást jelenthet, mivel a következőkre van szükség:
- Nagy léptékű működés, ami akár több millió kis fájl betöltését is jelentheti.
- Séma és JSON-típus következtetése.
- Helytelen JSON-sémával rendelkező rossz rekordok kezelése.
- Gondoskodjon a sémafejlődésről (például egy új oszlopról az ügyféltáblában).
Az Automatikus betöltő leegyszerűsíti ezt a betöltést, beleértve a sémakövetkeztetést és a sémafejlődést, miközben több millió bejövő fájlra skáláz. Auto Loader Pythonban elérhető cloudFiles-n keresztül, és SQL-ben SELECT * FROM STREAM read_files(...)-n keresztül, valamint különféle formátumokkal használható (JSON, CSV, Apache Avro stb.).
A tábla streamelési táblaként való definiálása garantálja, hogy csak új bejövő adatokat használ fel. Ha nem streamelési táblaként definiálja, az az összes rendelkezésre álló adatot megvizsgálja és betölti. További információkért tekintse meg a streamelési táblákat .
A bejövő CDC-adatok Auto Loaderrel történő betöltéséhez másolja és illessze be a következő kódot abba a kódfájlba, amelyet a folyamat során (
my_transformation.py) hozott létre. A Pythont vagy az SQL-t a folyamat létrehozásakor választott nyelv alapján használhatja. Feltétlenül cserélje le a<catalog>és<schema>helyőrzőket azokra, amelyeket az alapértelmezettként beállított a folyamat számára.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )Kattintson a
Futtassa a fájlt vagy a futtatási folyamatot a csatlakoztatott folyamat frissítésének elindításához. A folyamat egyetlen forrásfájljával ezek funkcionálisan egyenértékűek.
Amikor a frissítés befejeződik, a szerkesztő frissítve lesz a munkafolyamatra vonatkozó információkkal.
- A folyamatdiagram (DAG) a kódtól jobbra található oldalsávon egyetlen táblát jelenít meg.
customers_cdc_bronze - A frissítés összegzése a folyamateszközök böngésző tetején látható.
- A létrehozott táblázat részletei az alsó panelen jelennek meg, és a kijelöléssel tallózhat a táblázat adatai között.
Ez a felhőalapú tárolóból importált nyers bronzréteg-adatok. A következő lépésben törölje az adatokat egy ezüstrétegű táblázat létrehozásához.
4. lépés: Az adatminőség nyomon követése érdekében végzett törlés és elvárások
A bronzréteg definiálása után hozza létre az ezüst réteget az adatminőség szabályozására vonatkozó elvárások hozzáadásával. Ellenőrizze a következő feltételeket:
- Az azonosító soha nem lehet
null. - A CDC-művelettípusnak érvényesnek kell lennie.
- Az automatikus betöltőnek helyesen kell olvasnia a JSON-t.
A rendszer elveti azokat a sorokat, amelyek nem felelnek meg ezeknek a feltételeknek.
További információkért tekintse meg az adatminőségi elvárások kezelése adatfeldolgozási folyamatokkal .
A folyamategységek böngésző oldalsávjáról kattintson a
Hozzáadás, majd átalakítás.
Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) a forráskódfájlhoz. Kombinálhat és összeilleszthet nyelveket egy folyamatban, így választhat, hogy melyik nyelvet használja ehhez a lépéshez.
Ha egy megtisztított táblával rendelkező ezüstréteget szeretne létrehozni, és korlátozásokat szeretne bevezetni, másolja és illessze be az alábbi kódot az új fájlba (a fájl nyelvétől függően válassza a Pythont vagy az SQL-t).
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;Kattintson a
Futtassa a fájlt vagy a futtatási folyamatot a csatlakoztatott folyamat frissítésének elindításához.
Mivel most két forrásfájl van, ezek nem ugyanazt teszik, de ebben az esetben a kimenet ugyanaz.
- A folyamat futtatása a teljes folyamatot futtatja, beleértve a 3. lépésben szereplő kódot is. Ha a bemeneti adatok frissítése folyamatban lenne, az bármilyen forrásban történt változást átemelne a bronzrétegbe. Ez nem futtatja a kódot az adatbeállítási lépésből, mert az a feltárások mappában található, és nem része a folyamat forrásának.
- A futtatott fájl csak az aktuális forrásfájlt futtatja. Ebben az esetben a bemeneti adatok frissítése nélkül ez létrehozza a gyorsítótárazott bronztáblából származó ezüstadatokat. A folyamatkód létrehozásakor vagy szerkesztésekor célszerű csak ezt a fájlt futtatni a gyorsabb iteráció érdekében.
A frissítés befejezése után láthatja, hogy a folyamatdiagramon most két táblázat látható (a bronzrétegtől függően ezüst réteggel), az alsó panel pedig mindkét tábla részleteit jeleníti meg. A folyamategységek böngészőjének felső része mostantól több futtatási időpontot jelenít meg, de csak a legutóbbi futtatás részleteit.
Ezután hozza létre a tábla utolsó aranyréteg-verzióját customers .
5. lépés: Az ügyfelek táblázatának létrehozása automatikus CDC-folyamattal
Eddig a pillanatig a táblák minden lépésben továbbadták a CDC adatokat. Most hozza létre a customers táblát úgy, hogy az tartalmazza a legfrissebb nézetet, és legyen az eredeti tábla replikája, ne pedig az azt létrehozó CDC műveletek listája.
Nehéz kézzel megvalósítani. A legutóbbi sor megtartásához figyelembe kell vennie például az adatdeduplikációt.
A Lakeflow Spark Deklaratív Folyamatok kifogástalanul megoldja ezeket a kihívásokat a AUTO CDC művelettel.
A folyamategységek böngésző oldalsávjáról kattintson a
Hozzáadás és átalakítás.
Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) az új forráskódfájlhoz. Ehhez a lépéshez ismét kiválaszthatja a kívánt nyelvet, de használja az alábbi megfelelő kódot.
Ha a CDC-adatokat
AUTO CDCa Lakeflow Spark deklaratív folyamataival szeretné feldolgozni, másolja és illessze be az alábbi kódot az új fájlba.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;Kattintson a
Futtassa a fájlt a csatlakoztatott folyamat frissítésének elindításához.
Amikor a frissítés befejeződött, láthatja, hogy a folyamatdiagram 3 táblát jelenít meg, a bronztól az ezüstig az aranyig haladva.
6. lépés: A frissítési előzmények nyomon követése lassan változó 2. dimenziótípussal (SCD2)
Gyakran szükség van egy olyan táblázat létrehozására, amely nyomon követi az összes változást, amelyek a APPEND, UPDATE és DELETE eredményeként jönnek létre.
- Előzmények: Meg szeretné őrizni a táblázat összes módosításának előzményeit.
- Nyomon követhetőség: Látni szeretné, hogy melyik művelet történt.
SCD2 és Lakeflow SDP
A Delta támogatja a változási adatfolyamot (CDF), és table_change lekérdezheti a táblamódosításokat az SQL-ben és a Pythonban. A CDF fő felhasználási esete azonban a folyamatok módosításainak rögzítése, nem pedig a táblamódosítások teljes nézetének létrehozása az elejétől kezdve.
A megvalósítás különösen összetett feladat, ha sorrenden kívüli eseményekkel van dolgunk. Ha időbélyegzővel kell sorrendbe rendeznie a módosításokat, és olyan módosítást kell kapnia, amely a múltban történt, hozzá kell fűznie egy új bejegyzést az SCD-táblához, és frissítenie kell az előző bejegyzéseket.
A Lakeflow SDP eltávolítja ezt a bonyolultságot, és lehetővé teszi egy külön táblázat létrehozását, amely az idő kezdetétől kezdve tartalmazza az összes módosítást. Ezt a táblázatot ezután nagy méretekben, adott partíciókkal vagy szükség esetén ZORDER-oszlopokkal is használhatja. Nem sorrendben lévő mezőket a rendszer a _sequence_by alapján kezeli.
SCD2-tábla létrehozásához használja az STORED AS SCD TYPE 2 opciót SQL-ben vagy a stored_as_scd_type="2" opciót Pythonban.
Megjegyzés:
Azt is korlátozhatja, hogy a funkció mely oszlopokat követi nyomon a következő beállítással: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
A folyamategységek böngésző oldalsávjáról kattintson a
Hozzáadás és átalakítás.
Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) az új forráskódfájlhoz.
Másolja és illessze be az alábbi kódot az új fájlba.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;Kattintson a
Futtassa a fájlt a csatlakoztatott folyamat frissítésének elindításához.
Ha a frissítés befejeződött, a folyamatdiagram tartalmazza az új customers_history táblát, amely szintén az ezüstréteg-táblázattól függ, az alsó panel pedig mind a 4 tábla részleteit mutatja.
7. lépés: Hozzon létre egy materializált nézetet, amely nyomon követi, hogy kik módosították a legtöbbet az adataikat
A tábla customers_history tartalmazza az összes előzménymódosítást, amelyet a felhasználó végzett az adataikon. Hozzon létre egy egyszerű materializált nézetet az aranyrétegben, amely nyomon követi, hogy ki módosította a legtöbbet az adataikat. Ez felhasználható csalásészlelési elemzésekhez vagy felhasználói javaslatokhoz egy valós forgatókönyvben. Emellett az SCD2-vel végzett módosítások alkalmazása már eltávolította az ismétlődéseket, így közvetlenül megszámolhatja a sorokat felhasználóazonosítónként.
A folyamategységek böngésző oldalsávjáról kattintson a
Hozzáadás és átalakítás.
Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) az új forráskódfájlhoz.
Másolja és illessze be az alábbi kódot az új forrásfájlba.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY idKattintson a
Futtassa a fájlt a csatlakoztatott folyamat frissítésének elindításához.
A frissítés befejezése után a folyamatgráfban egy új tábla található, amely a customers_history táblától függ, és az alsó panelen tekintheti meg. A pipeline most már teljes. Ezt egy teljes futtatási folyamat végrehajtásával tesztelheti. Az egyetlen lépés a folyamat rendszeres frissítésének ütemezése.
8. lépés: Feladat létrehozása az ETL-folyamat futtatásához
Ezután hozzon létre egy munkafolyamatot, amely automatizálja a folyamat adatbetöltési, feldolgozási és elemzési lépéseit egy Databricks-feladat használatával.
- A szerkesztő felületének tetején válassza ki az Ütemezés gombot.
- Ha megjelenik az Ütemezések párbeszédpanel, válassza az Ütemezés hozzáadása lehetőséget.
- Ekkor megnyílik az Új ütemezés párbeszédpanel, ahol létrehozhat egy feladatot a folyamat ütemezés szerinti futtatásához.
- Ha szeretné, adjon nevet a feladatnak.
- Alapértelmezés szerint az ütemezés naponta egyszer fut. Elfogadhatja ezt az alapértelmezett beállítást, vagy beállíthatja a saját ütemezését. A Speciális lehetőséget választva megadhatja, hogy a feladat mikor fusson. A További beállítások lehetőséget választva értesítéseket hozhat létre a feladat futtatásakor.
- A módosítások alkalmazásához és a feladat létrehozásához válassza a Létrehozás lehetőséget .
Most a munkafolyamat naponta fut, hogy naprakészen tartsa a pipeline-t. Az ütemezések listájának megtekintéséhez válassza ismét az Ütemezés lehetőséget. A folyamat ütemezéseit ezen a párbeszédpanelen kezelheti, beleértve az ütemezések hozzáadását, szerkesztését vagy eltávolítását.
Az ütemezés (vagy feladat) nevére kattintva a feladat lapjára léphet a Feladatok > folyamatok listájában. Innen megtekintheti a feladatfuttatások részleteit, beleértve a futtatások előzményeit, vagy futtathatja a feladatot azonnal a Futtatás most gombbal.
A feladatfuttatásokkal kapcsolatos további információkért tekintse meg a Lakeflow-feladatok monitorozását és megfigyelhetőségét .
További erőforrások
- Lakeflow Spark deklaratív folyamatok
- Oktatóanyag: ETL-folyamat létrehozása a Lakeflow Spark deklaratív folyamataival
- Mi az a változásadat-rögzítés (CDC)?
- Az AUTO CDC API-k: A változási adatok rögzítésének egyszerűsítése csatornákkal
- Folyamat átalakítása Databricks-eszközcsomag-projektté
- Mi az automatikus betöltő?