Megosztás a következőn keresztül:


Oktatóanyag: ETL-folyamat létrehozása változásadat-rögzítéssel

Megtudhatja, hogyan hozhat létre és helyezhet üzembe ETL-(kinyerési, átalakító és betöltési) folyamatokat változási adatrögzítéssel (CDC) a Lakeflow Spark Deklaratív folyamatok (SDP) használatával az adatvezényléshez és az automatikus betöltéshez. Az ETL-folyamatok implementálják a forrásrendszerekből származó adatok beolvasásának lépéseit, átalakítják az adatokat olyan követelmények alapján, mint az adatminőség-ellenőrzések és a duplikációk rögzítése, és az adatokat egy célrendszerbe, például egy adattárházba vagy egy adattóba írják.

Ebben az oktatóanyagban egy MySQL-adatbázis táblájából customers származó adatokat fog használni a következőkre:

  • Bontsa ki a módosításokat egy tranzakciós adatbázisból a Debezium vagy egy másik eszköz használatával, és mentse őket a felhőalapú objektumtárba (S3, ADLS vagy GCS). Ebben az oktatóanyagban kihagyja egy külső CDC-rendszer beállítását, és ehelyett hamis adatokat hoz létre az oktatóanyag egyszerűsítése érdekében.
  • Az Automatikus betöltő használatával növekményesen töltheti be az üzeneteket a felhőobjektum-tárolóból, és tárolhatja a nyers üzeneteket a customers_cdc táblában. Az Automatikus betöltő a sémára következtet, és kezeli a sémafejlődést.
  • Hozza létre a táblát az customers_cdc_clean adatminőség elvárásokkal való ellenőrzéséhez. Például soha ne legyen idnull, mert az upsert műveletek futtatására használják.
  • Végezze el AUTO CDC ... INTO a megtisztított CDC-adatokon a módosítások végleges customers táblába való beillesztését.
  • Annak bemutatása, hogyan hozhat létre egy folyamat egy 2. típusú, lassan változó dimenziótáblát (SCD2) az összes változás nyomon követéséhez.

A cél a nyers adatok közel valós idejű betöltése, és egy táblázat létrehozása az elemzői csapat számára az adatminőség biztosítása mellett.

Az oktatóanyag a medallion Lakehouse architektúrát használja, ahol nyers adatokat vesz fel a bronzrétegen keresztül, megtisztítja és ellenőrzi az adatokat az ezüst réteggel, és dimenziómodellezést és összesítést alkalmaz az aranyréteg használatával. További információért lásd: Mi a medallion lakehouse architektúra?

A megvalósított folyamat a következőképpen néz ki:

Adatcsatorna CDC-vel

További információ a folyamatról, az automatikus betöltőről és a CDC-ről: Lakeflow Spark Deklaratív folyamatok, Mi az automatikus betöltő?, és mi az a változásadat-rögzítés (CDC)?

Requirements

Az oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:

Adatrögzítés módosítása ETL-folyamatban

A változásadat-rögzítés (CDC) egy tranzakciós adatbázisban (például MySQL vagy PostgreSQL) vagy adattárházban végrehajtott rekordok módosításainak rögzítésére szolgáló folyamat. A CDC rögzíti az olyan műveleteket, mint az adattörlések, a hozzáfűzések és a frissítések, általában streamként a táblák külső rendszerekben való újravalósításához. A CDC lehetővé teszi a növekményes betöltést, miközben nincs szükség tömeges terheléses frissítésekre.

Megjegyzés:

Az oktatóanyag egyszerűsítése érdekében hagyja ki egy külső CDC-rendszer beállítását. Tegyük fel, hogy JSON-fájlként futtatja és menti a CDC-adatokat a felhőobjektum-tárolóban (S3, ADLS vagy GCS). Ez az oktatóanyag a Faker tár használatával hozza létre az oktatóanyagban használt adatokat.

CDC rögzítése

Számos CDC-eszköz érhető el. Az egyik vezető nyílt forráskódú megoldás a Debezium, de léteznek olyan implementációk, amelyek leegyszerűsítik az adatforrásokat, például a Fivetran, a Qlik Replikálás, a StreamSets, a Talend, az Oracle GoldenGate és az AWS DMS.

Ebben az oktatóanyagban CDC-adatokat használ egy külső rendszerből, például a Debeziumból vagy a DMS-ből. A Debezium minden módosított sort rögzít. Általában elküldi az adatmódosítások előzményeit a Kafka-témaköröknek, vagy fájlként menti őket.

Be kell vennie a CDC-adatokat a customers táblából (JSON formátum), ellenőriznie kell, hogy helyesek-e, majd a Lakehouse-ban kell hasznosítania az ügyfelek tábláját.

CDC-bemenet a Debeziumból

Minden módosításhoz egy JSON-üzenet érkezik, amely tartalmazza a frissíteni kívánt sor összes mezőjét (id, , firstname, lastname, emailaddress). Az üzenet további metaadatokat is tartalmaz:

  • operation: Egy műveleti kód, általában (DELETE, APPEND, UPDATE).
  • operation_date: Az egyes műveletek rekordjának dátuma és időbélyege.

Az olyan eszközök, mint a Debezium, fejlettebb kimenetet hozhatnak létre, például a módosítás előtti sorértéket, de ez az oktatóanyag kihagyja őket az egyszerűség kedvéért.

1. lépés: Folyamat létrehozása

Hozzon létre egy új ETL-folyamatot a CDC-adatforrás lekérdezéséhez és táblák létrehozásához a munkaterületen.

  1. A munkaterületen kattintson a Plusz ikonra. Új a bal felső sarokban.

  2. Kattintson az ETL-folyamat elemre.

  3. Módosítsa a pipeline címét Pipelines with CDC tutorial vagy egy Ön által preferált névre.

  4. A cím alatt válasszon ki egy katalógust és sémát, amelyhez írási engedélyekkel rendelkezik.

    Ez a katalógus és séma alapértelmezés szerint használatos, ha nem ad meg katalógust vagy sémát a kódban. A kód bármilyen katalógusba vagy sémába írhat a teljes elérési út megadásával. Ez az oktatóanyag az itt megadott alapértelmezett értékeket használja.

  5. A Speciális beállítások között válassza a Start with an empty file (Indítás üres fájllal) lehetőséget.

  6. Válasszon egy mappát a kódhoz. A Tallózás gombra kattintva tallózhat a munkaterület mappáinak listájában. Bármelyik mappát kiválaszthatja, amelyhez írási engedélyekkel rendelkezik.

    A verziókövetés használatához válasszon ki egy Git-mappát. Ha új mappát szeretne létrehozni, válassza a Plusz ikont.

  7. Az oktatóanyaghoz használni kívánt nyelv alapján válassza a Pythont vagy az SQL-t a fájl nyelvéhez.

  8. A Kiválasztás gombra kattintva hozza létre a folyamatot ezekkel a beállításokkal, és nyissa meg a Lakeflow Pipelines-szerkesztőt.

Most már rendelkezik egy üres folyamatlánccal, amely alapértelmezett katalógussal és sémával rendelkezik. Ezután állítsa be a mintaadatokat importálásra az oktatóanyagban.

2. lépés: Az ebben az oktatóanyagban importálandó mintaadatok létrehozása

Erre a lépésre nincs szükség, ha saját adatokat importál egy meglévő forrásból. Ebben az oktatóanyagban hamis adatokat hozhat létre példaként az oktatóanyaghoz. Hozzon létre egy jegyzetfüzetet a Python-adatgenerálási szkript futtatásához. Ezt a kódot csak egyszer kell futtatni a mintaadatok létrehozásához, ezért hozza létre a folyamat mappájában explorations , amely nem folyamatfrissítés részeként fut.

Megjegyzés:

Ez a kód a Faker használatával hozza létre a minta CDC-adatokat. A Faker automatikusan telepíthető, ezért az oktatóanyag a következőt használja %pip install faker: . A jegyzetfüzet hamisítótól való függőségét is beállíthatja. Lásd: Függőségek hozzáadása a jegyzetfüzethez.

  1. A Lakeflow Pipelines-szerkesztőben az eszközböngésző oldalsávjának bal oldalán kattintson a Plusz ikonra.Adja hozzá, majd válassza a Feltárás lehetőséget.

  2. Adjon meg egy nevet, például Setup dataválassza a Pythont. Elhagyhatja az alapértelmezett célmappát, amely egy új explorations mappa.

  3. Kattintson a Létrehozás gombra. Ezzel létrehoz egy jegyzetfüzetet az új mappában.

  4. Írja be az alábbi kódot az első cellába. A(z) <my_catalog> és <my_schema> definícióját módosítania kell úgy, hogy megegyezzen az előző eljárásban kiválasztott alapértelmezett katalógussal és sémával.

    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"
    
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
    volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exist, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  5. Az oktatóanyagban használt adatkészlet létrehozásához írja be a Shift + Enter parancsot a kód futtatásához:

  6. Opcionális. Az oktatóanyagban használt adatok előnézetéhez írja be a következő kódot a következő cellába, és futtassa a kódot. Frissítse a katalógust és a sémát az előző kód elérési útjának megfelelően.

    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"
    
    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
    

Ez létrehoz egy nagy adatkészletet (hamis CDC-adatokkal), amelyet az oktatóanyag többi részében használhat. A következő lépésben töltse be az adatokat az Automatikus betöltő használatával.

3. lépés: Adatok növekményes betöltése az automatikus betöltővel

A következő lépés a nyers adatok betöltése a (hamisított) felhőbeli tárolóból egy bronzrétegbe.

Ez több okból is kihívást jelenthet, mivel a következőkre van szükség:

  • Nagy léptékű működés, ami akár több millió kis fájl betöltését is jelentheti.
  • Séma és JSON-típus következtetése.
  • Helytelen JSON-sémával rendelkező rossz rekordok kezelése.
  • Gondoskodjon a sémafejlődésről (például egy új oszlopról az ügyféltáblában).

Az Automatikus betöltő leegyszerűsíti ezt a betöltést, beleértve a sémakövetkeztetést és a sémafejlődést, miközben több millió bejövő fájlra skáláz. Auto Loader Pythonban elérhető cloudFiles-n keresztül, és SQL-ben SELECT * FROM STREAM read_files(...)-n keresztül, valamint különféle formátumokkal használható (JSON, CSV, Apache Avro stb.).

A tábla streamelési táblaként való definiálása garantálja, hogy csak új bejövő adatokat használ fel. Ha nem streamelési táblaként definiálja, az az összes rendelkezésre álló adatot megvizsgálja és betölti. További információkért tekintse meg a streamelési táblákat .

  1. A bejövő CDC-adatok Auto Loaderrel történő betöltéséhez másolja és illessze be a következő kódot abba a kódfájlba, amelyet a folyamat során (my_transformation.py) hozott létre. A Pythont vagy az SQL-t a folyamat létrehozásakor választott nyelv alapján használhatja. Feltétlenül cserélje le a <catalog> és <schema> helyőrzőket azokra, amelyeket az alapértelmezettként beállított a folyamat számára.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # Replace with the catalog and schema name that
    # you are using:
    path = "/Volumes/<catalog>/<schema>/raw_data/customers"
    
    
    # Create the target bronze table
    dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @dp.append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load(f"{path}")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        -- replace with the catalog/schema you are using:
        "/Volumes/<catalog>/<schema>/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. Kattintson a Lejátszás ikonra.Futtassa a fájlt vagy a futtatási folyamatot a csatlakoztatott folyamat frissítésének elindításához. A folyamat egyetlen forrásfájljával ezek funkcionálisan egyenértékűek.

Amikor a frissítés befejeződik, a szerkesztő frissítve lesz a munkafolyamatra vonatkozó információkkal.

  • A folyamatdiagram (DAG) a kódtól jobbra található oldalsávon egyetlen táblát jelenít meg. customers_cdc_bronze
  • A frissítés összegzése a folyamateszközök böngésző tetején látható.
  • A létrehozott táblázat részletei az alsó panelen jelennek meg, és a kijelöléssel tallózhat a táblázat adatai között.

Ez a felhőalapú tárolóból importált nyers bronzréteg-adatok. A következő lépésben törölje az adatokat egy ezüstrétegű táblázat létrehozásához.

4. lépés: Az adatminőség nyomon követése érdekében végzett törlés és elvárások

A bronzréteg definiálása után hozza létre az ezüst réteget az adatminőség szabályozására vonatkozó elvárások hozzáadásával. Ellenőrizze a következő feltételeket:

  • Az azonosító soha nem lehet null.
  • A CDC-művelettípusnak érvényesnek kell lennie.
  • Az automatikus betöltőnek helyesen kell olvasnia a JSON-t.

A rendszer elveti azokat a sorokat, amelyek nem felelnek meg ezeknek a feltételeknek.

További információkért tekintse meg az adatminőségi elvárások kezelése adatfeldolgozási folyamatokkal .

  1. A folyamategységek böngésző oldalsávjáról kattintson a Plusz ikonra.Hozzáadás, majd átalakítás.

  2. Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) a forráskódfájlhoz. Kombinálhat és összeilleszthet nyelveket egy folyamatban, így választhat, hogy melyik nyelvet használja ehhez a lépéshez.

  3. Ha egy megtisztított táblával rendelkező ezüstréteget szeretne létrehozni, és korlátozásokat szeretne bevezetni, másolja és illessze be az alábbi kódot az új fájlba (a fájl nyelvétől függően válassza a Pythont vagy az SQL-t).

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @dp.append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          spark.readStream.table("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  4. Kattintson a Lejátszás ikonra.Futtassa a fájlt vagy a futtatási folyamatot a csatlakoztatott folyamat frissítésének elindításához.

    Mivel most két forrásfájl van, ezek nem ugyanazt teszik, de ebben az esetben a kimenet ugyanaz.

    • A folyamat futtatása a teljes folyamatot futtatja, beleértve a 3. lépésben szereplő kódot is. Ha a bemeneti adatok frissítése folyamatban lenne, az bármilyen forrásban történt változást átemelne a bronzrétegbe. Ez nem futtatja a kódot az adatbeállítási lépésből, mert az a feltárások mappában található, és nem része a folyamat forrásának.
    • A futtatott fájl csak az aktuális forrásfájlt futtatja. Ebben az esetben a bemeneti adatok frissítése nélkül ez létrehozza a gyorsítótárazott bronztáblából származó ezüstadatokat. A folyamatkód létrehozásakor vagy szerkesztésekor célszerű csak ezt a fájlt futtatni a gyorsabb iteráció érdekében.

A frissítés befejezése után láthatja, hogy a folyamatdiagramon most két táblázat látható (a bronzrétegtől függően ezüst réteggel), az alsó panel pedig mindkét tábla részleteit jeleníti meg. A folyamategységek böngészőjének felső része mostantól több futtatási időpontot jelenít meg, de csak a legutóbbi futtatás részleteit.

Ezután hozza létre a tábla utolsó aranyréteg-verzióját customers .

5. lépés: Az ügyfelek táblázatának létrehozása automatikus CDC-folyamattal

Eddig a pillanatig a táblák minden lépésben továbbadták a CDC adatokat. Most hozza létre a customers táblát úgy, hogy az tartalmazza a legfrissebb nézetet, és legyen az eredeti tábla replikája, ne pedig az azt létrehozó CDC műveletek listája.

Nehéz kézzel megvalósítani. A legutóbbi sor megtartásához figyelembe kell vennie például az adatdeduplikációt.

A Lakeflow Spark Deklaratív Folyamatok kifogástalanul megoldja ezeket a kihívásokat a AUTO CDC művelettel.

  1. A folyamategységek böngésző oldalsávjáról kattintson a Plusz ikonra.Hozzáadás és átalakítás.

  2. Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) az új forráskódfájlhoz. Ehhez a lépéshez ismét kiválaszthatja a kívánt nyelvet, de használja az alábbi megfelelő kódot.

  3. Ha a CDC-adatokat AUTO CDC a Lakeflow Spark deklaratív folyamataival szeretné feldolgozni, másolja és illessze be az alábbi kódot az új fájlba.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dp.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  4. Kattintson a Lejátszás ikonra.Futtassa a fájlt a csatlakoztatott folyamat frissítésének elindításához.

Amikor a frissítés befejeződött, láthatja, hogy a folyamatdiagram 3 táblát jelenít meg, a bronztól az ezüstig az aranyig haladva.

6. lépés: A frissítési előzmények nyomon követése lassan változó 2. dimenziótípussal (SCD2)

Gyakran szükség van egy olyan táblázat létrehozására, amely nyomon követi az összes változást, amelyek a APPEND, UPDATE és DELETE eredményeként jönnek létre.

  • Előzmények: Meg szeretné őrizni a táblázat összes módosításának előzményeit.
  • Nyomon követhetőség: Látni szeretné, hogy melyik művelet történt.

SCD2 és Lakeflow SDP

A Delta támogatja a változási adatfolyamot (CDF), és table_change lekérdezheti a táblamódosításokat az SQL-ben és a Pythonban. A CDF fő felhasználási esete azonban a folyamatok módosításainak rögzítése, nem pedig a táblamódosítások teljes nézetének létrehozása az elejétől kezdve.

A megvalósítás különösen összetett feladat, ha sorrenden kívüli eseményekkel van dolgunk. Ha időbélyegzővel kell sorrendbe rendeznie a módosításokat, és olyan módosítást kell kapnia, amely a múltban történt, hozzá kell fűznie egy új bejegyzést az SCD-táblához, és frissítenie kell az előző bejegyzéseket.

A Lakeflow SDP eltávolítja ezt a bonyolultságot, és lehetővé teszi egy külön táblázat létrehozását, amely az idő kezdetétől kezdve tartalmazza az összes módosítást. Ezt a táblázatot ezután nagy méretekben, adott partíciókkal vagy szükség esetén ZORDER-oszlopokkal is használhatja. Nem sorrendben lévő mezőket a rendszer a _sequence_by alapján kezeli.

SCD2-tábla létrehozásához használja az STORED AS SCD TYPE 2 opciót SQL-ben vagy a stored_as_scd_type="2" opciót Pythonban.

Megjegyzés:

Azt is korlátozhatja, hogy a funkció mely oszlopokat követi nyomon a következő beállítással: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. A folyamategységek böngésző oldalsávjáról kattintson a Plusz ikonra.Hozzáadás és átalakítás.

  2. Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) az új forráskódfájlhoz.

  3. Másolja és illessze be az alábbi kódot az új fájlba.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # create the table
    dp.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dp.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW customers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  4. Kattintson a Lejátszás ikonra.Futtassa a fájlt a csatlakoztatott folyamat frissítésének elindításához.

Ha a frissítés befejeződött, a folyamatdiagram tartalmazza az új customers_history táblát, amely szintén az ezüstréteg-táblázattól függ, az alsó panel pedig mind a 4 tábla részleteit mutatja.

7. lépés: Hozzon létre egy materializált nézetet, amely nyomon követi, hogy kik módosították a legtöbbet az adataikat

A tábla customers_history tartalmazza az összes előzménymódosítást, amelyet a felhasználó végzett az adataikon. Hozzon létre egy egyszerű materializált nézetet az aranyrétegben, amely nyomon követi, hogy ki módosította a legtöbbet az adataikat. Ez felhasználható csalásészlelési elemzésekhez vagy felhasználói javaslatokhoz egy valós forgatókönyvben. Emellett az SCD2-vel végzett módosítások alkalmazása már eltávolította az ismétlődéseket, így közvetlenül megszámolhatja a sorokat felhasználóazonosítónként.

  1. A folyamategységek böngésző oldalsávjáról kattintson a Plusz ikonra.Hozzáadás és átalakítás.

  2. Adjon meg egy nevet , és válasszon egy nyelvet (Python vagy SQL) az új forráskódfájlhoz.

  3. Másolja és illessze be az alábbi kódot az új forrásfájlba.

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    @dp.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        spark.read.table("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  4. Kattintson a Lejátszás ikonra.Futtassa a fájlt a csatlakoztatott folyamat frissítésének elindításához.

A frissítés befejezése után a folyamatgráfban egy új tábla található, amely a customers_history táblától függ, és az alsó panelen tekintheti meg. A pipeline most már teljes. Ezt egy teljes futtatási folyamat végrehajtásával tesztelheti. Az egyetlen lépés a folyamat rendszeres frissítésének ütemezése.

8. lépés: Feladat létrehozása az ETL-folyamat futtatásához

Ezután hozzon létre egy munkafolyamatot, amely automatizálja a folyamat adatbetöltési, feldolgozási és elemzési lépéseit egy Databricks-feladat használatával.

  1. A szerkesztő felületének tetején válassza ki az Ütemezés gombot.
  2. Ha megjelenik az Ütemezések párbeszédpanel, válassza az Ütemezés hozzáadása lehetőséget.
  3. Ekkor megnyílik az Új ütemezés párbeszédpanel, ahol létrehozhat egy feladatot a folyamat ütemezés szerinti futtatásához.
  4. Ha szeretné, adjon nevet a feladatnak.
  5. Alapértelmezés szerint az ütemezés naponta egyszer fut. Elfogadhatja ezt az alapértelmezett beállítást, vagy beállíthatja a saját ütemezését. A Speciális lehetőséget választva megadhatja, hogy a feladat mikor fusson. A További beállítások lehetőséget választva értesítéseket hozhat létre a feladat futtatásakor.
  6. A módosítások alkalmazásához és a feladat létrehozásához válassza a Létrehozás lehetőséget .

Most a munkafolyamat naponta fut, hogy naprakészen tartsa a pipeline-t. Az ütemezések listájának megtekintéséhez válassza ismét az Ütemezés lehetőséget. A folyamat ütemezéseit ezen a párbeszédpanelen kezelheti, beleértve az ütemezések hozzáadását, szerkesztését vagy eltávolítását.

Az ütemezés (vagy feladat) nevére kattintva a feladat lapjára léphet a Feladatok > folyamatok listájában. Innen megtekintheti a feladatfuttatások részleteit, beleértve a futtatások előzményeit, vagy futtathatja a feladatot azonnal a Futtatás most gombbal.

A feladatfuttatásokkal kapcsolatos további információkért tekintse meg a Lakeflow-feladatok monitorozását és megfigyelhetőségét .

További erőforrások