Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Zjistěte, jak vytvořit a nasadit ETL pipeline (extrakce, transformace a načítání) s funkcí zachycování změn dat (CDC) pomocí deklarativních kanálů Lakeflow Spark (SDP) pro orchestraci dat a Auto Loader. Kanál ETL implementuje kroky pro čtení dat ze zdrojových systémů, transformaci těchto dat na základě požadavků, jako jsou kontroly kvality dat a odstranění duplicit, a zápis dat do cílového systému, jako je datový sklad nebo datové jezero.
V tomto kurzu použijete data z customers tabulky v databázi MySQL k:
- Extrahujte změny z transakční databáze pomocí Debezium nebo jiného nástroje a uložte je do cloudového úložiště objektů (S3, ADLS nebo GCS). V tomto kurzu přeskočíte nastavení externího systému CDC a místo toho vygenerujete falešná data, aby se tento kurz zjednodušil.
- Pomocí Auto Loader můžete přibývajícím způsobem načítat zprávy z cloudového objektového úložiště a ukládat nezpracované zprávy do tabulky
customers_cdc. Auto Loader odvodí schéma a zpracovává vývoj schématu. -
customers_cdc_cleanVytvořte tabulku pro kontrolu kvality dat pomocí očekávání. Napříkladidby nikdy nemělo býtnull, protože se používá ke spouštění operací upsert. - Proveďte operaci
AUTO CDC ... INTOna vyčištěná data CDC, abyste změny vložili do konečné tabulkycustomers. - Ukazuje, jak může kanál vytvořit tabulku pomalu se měnící dimenze typu 2 (SCD2), aby bylo možné sledovat všechny změny.
Cílem je ingestovat nezpracovaná data téměř v reálném čase a vytvořit tabulku pro tým analytiků a zároveň zajistit kvalitu dat.
Kurz používá architekturu medallion Lakehouse, kde ingestuje nezpracovaná data prostřednictvím bronzové vrstvy, čistí a ověřuje data pomocí stříbrné vrstvy a používá dimenzionální modelování a agregaci pomocí zlaté vrstvy. Další informace najdete v tématu Co je architektura jezera medallion?
Implementovaný tok vypadá takto:
Další informace o potrubí, Automatickém nakladači a CDC viz Deklarativní potrubí Sparku Lakeflow, Co je Automatický nakladač? a Co je zachytávání dat změn (CDC)?
Požadavky
K dokončení tohoto kurzu musíte splnit následující požadavky:
- Přihlaste se k pracovnímu prostoru Azure Databricks.
- Mějte povolený Unity katalog pro svůj pracovní prostor.
- Pro váš účet máte povolené výpočetní prostředky bez serveru . Deklarativní kanály Spark Lakeflow bez serveru nejsou dostupné ve všech oblastech pracovního prostoru. Viz Funkce s omezenou regionální dostupností pro dostupné oblasti. Pokud pro váš účet není povolený bezserverový výpočetní výkon, měly by kroky fungovat s výchozími výpočetními prostředky pro váš pracovní prostor.
- Máte oprávnění k vytvoření výpočetního prostředku nebo přístupu k výpočetnímu prostředku.
- Máte oprávnění k vytvoření nového schématu v katalogu. Požadovaná oprávnění jsou
ALL PRIVILEGESneboUSE CATALOGaCREATE SCHEMA. - Máte oprávnění k vytvoření nového svazku v existujícím schématu. Požadovaná oprávnění jsou
ALL PRIVILEGESneboUSE SCHEMAaCREATE VOLUME.
Změna zachytávání dat v kanálu ETL
Zachytávání dat změn (CDC) je proces, který zachycuje změny v záznamech provedených v transakční databázi (například MySQL nebo PostgreSQL) nebo datovém skladu. CDC zaznamenává operace, jako jsou odstranění dat, připojení a aktualizace, obvykle jako datový proud pro opětovné materializaci tabulek v externích systémech. CDC umožňuje přírůstkové načítání při eliminaci potřeby aktualizací načítaných hromadně.
Poznámka:
Pokud chcete tento kurz zjednodušit, přeskočte nastavení externího systému CDC. Předpokládejme, že jsou spuštěná a uložená data CDC jako soubory JSON v cloudovém úložišti objektů (S3, ADLS nebo GCS). Tento kurz používá knihovnu Faker k vygenerování dat použitých v tomto kurzu.
Zachytávání CDC
K dispozici jsou různé nástroje CDC. Jedním z předních open-sourceových řešení je Debezium, ale existují také další implementace, které zjednodušují zdroje dat, jako například Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate a AWS DMS.
V tomto kurzu použijete data CDC z externího systému, jako je Debezium nebo DMS. Debezium zachycuje každý změněný řádek. Obvykle odesílá historii změn dat do témat Kafka nebo je ukládá jako soubory.
Je nutné načíst informace CDC z tabulky customers ve formátu JSON, zkontrolovat jejich správnost, a následně vytvořit materiálovou podobu tabulky zákazníků v rámci Lakehouse.
Vstup CDC z Debezium
Pro každou změnu obdržíte zprávu JSON obsahující všechna pole řádku, která se aktualizují (id, , firstnamelastname, email, ). address Zpráva obsahuje také další metadata:
-
operation: Kód operace, obvykle (DELETE,APPEND,UPDATE). -
operation_date: Datum a časové razítko záznamu pro každou operaci.
Nástroje, jako je Debezium, můžou produkovat pokročilejší výstup, například hodnotu řádku před změnou, ale tento kurz je pro jednoduchost vynechá.
Krok 1: Vytvoření kanálu
Vytvořte nový kanál ETL pro dotazování zdroje dat CDC a generování tabulek v pracovním prostoru.
V pracovním prostoru klikněte na
Nový v levém horním rohu.
Klikněte na kanál ETL.
Změňte název potrubí na
Pipelines with CDC tutorialnebo na název, který dáváte přednost.Pod názvem zvolte katalog a schéma, pro které máte oprávnění k zápisu.
Tento katalog a schéma se ve výchozím nastavení používají, pokud v kódu nezadáte katalog nebo schéma. Kód může zapisovat do libovolného katalogu nebo schématu zadáním úplné cesty. V tomto kurzu se používají výchozí hodnoty, které zde zadáte.
V rozšířených možnostech vyberte Začít s prázdným souborem.
Zvolte složku pro kód. Seznam složek v pracovním prostoru můžete procházet výběrem možnosti Procházet . Můžete zvolit libovolnou složku, pro kterou máte oprávnění k zápisu.
Pokud chcete použít správu verzí, vyberte složku Gitu. Pokud potřebujete vytvořit novou složku, vyberte
V závislosti na jazyce, který chcete pro kurz použít, zvolte Python nebo SQL pro jazyk souboru.
Kliknutím na Vybrat vytvoříte kanál s těmito nastaveními a otevřete Editor kanálů Lakeflow.
Teď máte prázdný kanál s výchozím katalogem a schématem. Dále nastavte ukázková data pro import v kurzu.
Krok 2: Vytvoření ukázkových dat pro import v tomto kurzu
Tento krok není potřeba, pokud importujete vlastní data z existujícího zdroje. Pro účely tohoto kurzu vygenerujte falešná data jako příklad pro tento kurz. Vytvořte poznámkový blok pro spuštění skriptu generování dat Pythonu. Tento kód stačí spustit jenom jednou, aby se vygenerovala ukázková data, takže ho vytvořte ve složce kanálu explorations , která se nespustí jako součást aktualizace kanálu.
Poznámka:
Tento kód používá Faker k vygenerování ukázkových dat CDC. Faker je k dispozici k instalaci automaticky, takže kurz používá %pip install faker. Můžete také nastavit závislost na Fakeru pro notebook. Viz Přidání závislostí do poznámkového bloku.
V editoru Lakeflow Pipelines v bočním panelu prohlížeče prostředků vlevo od editoru klikněte na
Přidat, poté zvolte Průzkum.
Dejte tomu Name, jako třeba
Setup data, vyberte Python. Můžete ponechat výchozí cílovou složku, což je nováexplorationssložka.Klikněte na Vytvořit. Tím se vytvoří poznámkový blok v nové složce.
Do první buňky zadejte následující kód. Je nutné změnit definici
<my_catalog>a<my_schema>tak, aby odpovídaly výchozímu katalogu a schématu, které jste vybrali v předchozím postupu:%pip install faker # Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = dbName = db = "<my_schema>" spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`') volume_folder = f"/Volumes/{catalog}/{db}/raw_data" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exist, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")Pokud chcete vygenerovat datovou sadu použitou v kurzu, zadáním klávesy Shift + Enter spusťte kód:
Optional. Pokud chcete zobrazit náhled dat použitých v tomto kurzu, zadejte do další buňky následující kód a spusťte kód. Aktualizujte katalog a schéma tak, aby odpovídaly cestě z předchozího kódu.
# Update these to match the catalog and schema # that you used for the pipeline in step 1. catalog = "<my_catalog>" schema = "<my_schema>" display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
Tím se vygeneruje rozsáhlá datová sada (s falešnými daty CDC), kterou můžete použít ve zbytku kurzu. V dalším kroku načtěte data pomocí Auto Loader.
Krok 3: Přírůstkové načítání dat pomocí Auto Loaderu
Dalším krokem je importování nezpracovaných dat z cloudového úložiště (simulované) do bronzové vrstvy.
To může být náročné z několika důvodů, protože musíte:
- Provozujte ve velkém měřítku a potenciálně ingestujte miliony malých souborů.
- Odvození schématu a typu JSON
- Zpracování chybových záznamů s nesprávným schématem JSON
- Dbejte na vývoj schématu (například nový sloupec v tabulce zákazníků).
Automatický zavaděč zjednodušuje příjem dat, včetně odvozování schématu a vývoje schématu a zároveň škálování na miliony příchozích souborů. Auto Loader je k dispozici v Pythonu pomocí cloudFiles a v SQL pomocí SELECT * FROM STREAM read_files(...) a lze použít s různými formáty (JSON, CSV, Apache Avro atd.).
Definování tabulky jako streamované tabulky zaručuje, že budete využívat pouze nová příchozí data. Pokud ji nedefinujete jako streamovací tabulku, prohledá a ingestuje všechna dostupná data. Další informace najdete v tabulkách streamování .
Chcete-li ingestovat příchozí data CDC pomocí Auto Loaderu, zkopírujte a vložte následující kód do souboru kódu vytvořeného pomocí vašeho kanálu (nazvaný
my_transformation.py). V závislosti na jazyce, který jste zvolili při vytváření kanálu, můžete použít Python nebo SQL. Nezapomeňte nahradit<catalog>a<schema>těmi, které jste nastavili jako výchozí pro pipeline.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # Replace with the catalog and schema name that # you are using: path = "/Volumes/<catalog>/<schema>/raw_data/customers" # Create the target bronze table dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @dp.append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(f"{path}") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( -- replace with the catalog/schema you are using: "/Volumes/<catalog>/<schema>/raw_data/customers", format => "json", inferColumnTypes => "true" )Klikněte na
Spustit soubor nebo Spustit kanál pro zahájení aktualizace připojeného kanálu. S pouze jedním zdrojovým souborem v řetězci jsou funkčně ekvivalentní.
Po dokončení aktualizace se editor aktualizuje informacemi o vašem datovém toku.
- Graf pipeline (DAG) na bočním panelu napravo od kódu zobrazuje jednu tabulku
customers_cdc_bronze. - Souhrn aktualizace se zobrazí v horní části prohlížeče prostředků vývojové linie.
- Podrobnosti o vygenerované tabulce se zobrazují v dolním podokně a data z tabulky můžete procházet tak, že ji vyberete.
Jedná se o nezpracovaná data z bronzové vrstvy importovaná z cloudového úložiště. V dalším kroku vyčistěte data pro vytvoření tabulky stříbrné vrstvy.
Krok 4: Vyčištění a nastavení očekávání pro sledování kvality dat
Po definování bronzové vrstvy vytvořte stříbrnou vrstvu přidáním očekávání pro řízení kvality dat. Zkontrolujte následující podmínky:
- ID nesmí nikdy být
null. - Typ operace CDC musí být platný.
- Automatický zavaděč musí správně číst JSON.
Řádky, které nesplňují tyto podmínky, se zahodí.
Další informace najdete v tématu Správa kvality dat s očekáváními datového kanálu.
Na bočním panelu prohlížeče prostředků pipeline klikněte na
Přidejte, pak Transformaci.
Zadejte název a zvolte jazyk (Python nebo SQL) pro soubor zdrojového kódu. Jazyky v rámci procesního toku můžete míchat a kombinovat a vybrat si ten správný jazyk pro tento krok.
Pokud chcete vytvořit stříbrnou vrstvu s vyčištěnou tabulkou a uplatňovat omezení, zkopírujte do nového souboru následující kód (zvolte Python nebo SQL na základě jazyka souboru).
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @dp.append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( spark.readStream.table("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;Klikněte na
Spustit soubor nebo Spustit kanál pro zahájení aktualizace připojeného kanálu.
Vzhledem k tomu, že jsou teď dva zdrojové soubory, ty neprodělají totéž, ale v tomto případě je výstup stejný.
- Spuštění kanálu spustí celý kanál, včetně kódu z kroku 3. Pokud by se vaše vstupní data aktualizovala, přenesla by změny z tohoto zdroje do bronzové vrstvy. Tento kód se nespustí z kroku nastavení dat, protože je ve složce zkoumání a není součástí zdroje pro váš kanál.
- Spustit soubor spustí pouze aktuální zdrojový soubor. V takovém případě se bez aktualizace vstupních dat vygenerují stříbrná data z bronzové tabulky uložené v mezipaměti. Při vytváření nebo úpravě kódu vašeho workflow by bylo užitečné spustit pouze tento soubor pro rychlejší iteraci.
Po dokončení aktualizace uvidíte, že graf kanálu teď zobrazuje dvě tabulky (se stříbrnou vrstvou v závislosti na bronzové vrstvě) a dolní panel zobrazuje podrobnosti obou tabulek. V horní části prohlížeče prostředků kanálu se teď zobrazuje více časů spuštění, ale jenom podrobnosti o posledním spuštění.
Dále vytvořte konečnou verzi zlaté vrstvy tabulky customers.
Krok 5: Materializace tabulky zákazníků s tokem AUTO CDC
Do tohoto okamžiku tabulky předávaly data CDC dál v každém kroku. Teď vytvořte customers tabulku, která bude obsahovat nejaktuálnější zobrazení a zároveň bude replikou původní tabulky, nikoliv seznam operací CDC, které ji vytvořily.
Není snadné to implementovat ručně. Abyste zachovali nejnovější řádek, musíte zvážit například odstranění duplicitních dat.
Deklarativní kanály Sparku Lakeflow tyto problémy řeší pomocí AUTO CDC operace.
Na bočním panelu prohlížeče prostředků kanálu klikněte na
Přidání a transformace
Zadejte název a zvolte jazyk (Python nebo SQL) pro nový soubor zdrojového kódu. Pro tento krok můžete znovu zvolit jazyk, ale níže použijte správný kód.
Pokud chcete zpracovat data CDC pomocí
AUTO CDCdeklarativních kanálů Sparku Lakeflow, zkopírujte a vložte do nového souboru následující kód.Python
from pyspark import pipelines as dp from pyspark.sql.functions import * dp.create_streaming_table(name="customers", comment="Clean, materialized customers") dp.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;Klikněte na
Spuštěním souboru spusťte aktualizaci připojeného kanálu.
Po dokončení aktualizace uvidíte, že graf pipeline zobrazuje 3 tabulky, které přechází z bronzové do stříbrné a zlaté úrovně.
Krok 6: Sledování historie aktualizací pomocí pomalu se měnícího typu dimenze 2 (SCD2)
Často je nutné vytvořit tabulku se sledováním všech změn, které jsou výsledkem APPEND, UPDATEa DELETE:
- Historie: Chcete zachovat historii všech změn ve vaší tabulce.
- Sledovatelnost: Chcete zjistit, ke které operaci došlo.
SCD2 s Lakeflow SDP
Delta podporuje změny toku dat (CDF) a table_change může dotazovat úpravy tabulek v SQL a Pythonu. Hlavním případem použití CDF je ale zachycení změn v kanálu, nikoli vytvoření úplného zobrazení změn tabulky od začátku.
Implementace se stává obzvláště složitou, pokud máte události mimo pořadí. Pokud je nutné změny sekvencovat podle časového razítka a přijmout změny, ke kterým došlo v minulosti, musíte do tabulky SCD připojit novou položku a aktualizovat předchozí položky.
SDP lakeflow tuto složitost odebere a umožňuje vytvořit samostatnou tabulku, která obsahuje všechny úpravy od začátku času. Tuto tabulku pak můžete použít ve velkém měřítku s konkrétními oddíly nebo sloupci ZORDER v případě potřeby. Pole mimo pořadí jsou automaticky zpracována na základě _sequence_by.
Pokud chcete vytvořit tabulku SCD2, použijte možnost STORED AS SCD TYPE 2 v JAZYCE SQL nebo stored_as_scd_type="2" v Pythonu.
Poznámka:
Pomocí možnosti můžete také omezit sloupce, které funkce sleduje: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
Na bočním panelu prohlížeče prostředků kanálu klikněte na
Přidání a transformace
Zadejte název a zvolte jazyk (Python nebo SQL) pro nový soubor zdrojového kódu.
Zkopírujte a vložte následující kód do nového souboru.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * # create the table dp.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dp.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updatesSQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW customers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;Klikněte na
Spuštěním souboru spusťte aktualizaci připojeného kanálu.
Po dokončení aktualizace obsahuje graf kanálu novou tabulku customers_history, která je také závislá na tabulce stříbrné vrstvy, a na dolním panelu se zobrazují podrobnosti pro všechny 4 tabulky.
Krok 7: Vytvoření materializovaného zobrazení, které sleduje, kdo změnil své informace nejvíce
Tabulka customers_history obsahuje všechny historické změny, které uživatel provedl se svými informacemi. Vytvořte jednoduché materializované zobrazení ve zlaté vrstvě, které sleduje, kdo změnil své informace nejvíce. To se dá použít k analýze detekce podvodů nebo k doporučení uživatelům v reálném scénáři. Kromě toho použití změn SCD2 již odstranilo duplicity, takže můžete přímo spočítat řádky podle ID uživatele.
Na bočním panelu prohlížeče prostředků kanálu klikněte na
Přidání a transformace
Zadejte název a zvolte jazyk (Python nebo SQL) pro nový soubor zdrojového kódu.
Zkopírujte a vložte následující kód do nového zdrojového souboru.
Python
from pyspark import pipelines as dp from pyspark.sql.functions import * @dp.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( spark.read.table("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY idKlikněte na
Spuštěním souboru spusťte aktualizaci připojeného kanálu.
Po dokončení aktualizace je v grafu průběhu nová tabulka, která závisí na customers_history tabulce, a můžete ji zobrazit na spodním panelu. Váš pipeline je nyní dokončený. Můžete ho otestovat provedením úplného kanálu spuštění. Jediným zbývajícím krokem je naplánovat, aby se pipeline aktualizovala pravidelně.
Krok 8: Vytvoření úlohy pro spuštění kanálu ETL
Dále vytvořte pracovní postup pro automatizaci kroků příjmu, zpracování a analýzy dat v kanálu pomocí úlohy Databricks.
- V horní části editoru zvolte tlačítko Plán .
- Pokud se zobrazí dialogové okno Plány , zvolte Přidat plán.
- Otevře se dialogové okno Nový plán, ve kterém můžete vytvořit úlohu pro spuštění vašeho pipeline podle časového plánu.
- Volitelně můžete úlohu pojmenovat.
- Ve výchozím nastavení je plán nastavený tak, aby běžel jednou denně. Toto výchozí nastavení můžete přijmout nebo nastavit vlastní plán. Když zvolíte Upřesnit , můžete nastavit konkrétní čas, kdy se úloha spustí. Když vyberete Další možnosti , můžete při spuštění úlohy vytvářet oznámení.
- Výběrem Vytvořit aplikujete změny a vytvoříte úlohu.
Teď se úloha bude spouštět každý den, aby byl váš datový proud aktuální. Pokud chcete zobrazit seznam plánů, můžete znovu zvolit Plán . V dialogovém okně můžete spravovat harmonogramy pro váš pipeline, včetně přidávání, úprav nebo odebírání harmonogramů.
Kliknutím na název plánu (nebo úlohy) přejdete na stránku úlohy v seznamu Úlohy a kanály . Odtud můžete zobrazit podrobnosti o spuštěních úloh, včetně historie spuštění, nebo spustit úlohu okamžitě pomocí tlačítka Spustit nyní .
Další informace o spuštěních úloh najdete v tématu Monitorování a pozorovatelnost úloh Lakeflow .