Sdílet prostřednictvím


Kurz: Vytvoření prvního kanálu pomocí Editoru kanálů Lakeflow

Zjistěte, jak vytvořit nový kanál pomocí Deklarativních kanálů Sparku (SDP) pro orchestraci dat a automatického zavaděče. Tento kurz rozšiřuje ukázkový kanál vyčištěním dat a vytvořením dotazu, který vyhledá prvních 100 uživatelů.

V tomto kurzu se naučíte používat Editor kanálů Lakeflow k:

  • Vytvořte nový kanál s výchozí strukturou složek a začněte sadou ukázkových souborů.
  • Definujte omezení kvality dat pomocí očekávání.
  • Pomocí funkcí editoru můžete kanál rozšířit o novou transformaci, která provede analýzu dat.

Požadavky

Než začnete s tímto kurzem, musíte:

  • Přihlaste se k pracovnímu prostoru Azure Databricks.
  • Povolte pro svůj pracovní prostor katalog Unity.
  • Povolte editor kanálů Lakeflow pro váš pracovní prostor a musíte být přihlášeni. Viz Povolení editoru kanálů Lakeflow a aktualizovaného monitorování.
  • Máte oprávnění k vytvoření výpočetního prostředku nebo přístupu k výpočetnímu prostředku.
  • Máte oprávnění k vytvoření nového schématu v katalogu. Požadovaná oprávnění jsou ALL PRIVILEGES nebo USE CATALOG a CREATE SCHEMA.

Krok 1: Vytvoření kanálu

V tomto kroku vytvoříte kanál s použitím výchozí struktury složek a ukázek kódu. Ukázky kódu odkazují na users tabulku ve zdroji wanderbricks ukázkových dat.

  1. V pracovním prostoru Azure Databricks klikněte na ikonu Plus.Nový a pak ikona Pipeline.ETL kanál. Otevře se editor datového kanálu na stránce pro vytvoření datového kanálu.

  2. Kliknutím na záhlaví dáte kanálu název.

  3. Přímo pod názvem zvolte výchozí katalog a schéma pro výstupní tabulky. Ty se použijí, když v definicích kanálu nezadáte katalog a schéma.

  4. V části Další krok kanálu klikněte na ikonu schématu.Začněte ukázkovým kódem v SQL nebo ikoně schématu.Začněte s ukázkovým kódem v Pythonu na základě vašich jazykových preferencí. Tím se změní výchozí jazyk ukázkového kódu, ale později můžete přidat kód v jiném jazyce. Tím se vytvoří výchozí struktura složek s ukázkovým kódem, která vám umožní začít.

  5. Ukázkový kód můžete zobrazit v prohlížeči prostředků pipeline na levé straně pracovní plochy. Pod transformations jsou dva soubory, které generují každou jednu datovou sadu pro kanál. Pod explorations je poznámkový blok s kódem, který vám umožní zobrazit výstup vašeho datového toku. Kliknutím na soubor můžete zobrazit a upravit kód v editoru.

    Výstupní datové sady ještě nebyly vytvořeny a graf kanálu na pravé straně obrazovky je prázdný.

  6. Pokud chcete spustit kód kanálu (kód ve transformations složce), klikněte na Spustit kanál v pravé horní části obrazovky.

    Po dokončení spuštění se v dolní části pracovního prostoru zobrazí dvě nové tabulky, které byly vytvořeny, sample_users_<pipeline-name> a sample_aggregation_<pipeline-name>. Uvidíte také, že graf pipeline na pravé straně pracovního prostoru teď zobrazuje dvě tabulky, včetně toho, že sample_users je zdrojem pro sample_aggregation.

Krok 2: Použití kontrol kvality dat

V tomto kroku přidáte do sample_users tabulky kontrolu kvality dat. K omezení dat použijete očekávání kanálu . V tomto případě odstraníte všechny záznamy uživatelů, které nemají platnou e-mailovou adresu, a vypíšete vyčištěnou tabulku jako users_cleaned.

  1. V prohlížeči prostředků pipeline klikněte na ikonu Plus a vyberte Transformace.

  2. V dialogovém okně Vytvořit nový transformační soubor proveďte následující výběry:

    • Zvolte buď Python nebo SQL pro Jazyk. Nemusí se shodovat s předchozím výběrem.
    • Pojmenujte soubor. V tomto případě zvolte users_cleaned.
    • V části Cílová cesta ponechte výchozí hodnotu.
    • U typu datové sady buď ponechte vybranou možnost Žádná , nebo zvolte Materializované zobrazení. Pokud vyberete materializované zobrazení, vygeneruje vám vzorový kód.
  3. V novém souboru kódu upravte kód tak, aby odpovídal následujícímu kódu (na základě vašeho výběru na předchozí obrazovce použijte SQL nebo Python). Nahraďte <pipeline-name> úplným názvem tabulky sample_users .

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Kliknutím na Spustit kanál aktualizujte kanál. Teď by měla mít tři tabulky.

Krok 3: Analýza hlavních uživatelů

Dále získejte nejlepších 100 uživatelů podle počtu vytvořených rezervací. Spojte tabulku wanderbricks.bookings s materializovaným zobrazením users_cleaned.

  1. V prohlížeči prostředků pipeline klikněte na ikonu Plus a vyberte Transformace.

  2. V dialogovém okně Vytvořit nový transformační soubor proveďte následující výběry:

    • Zvolte buď Python nebo SQL pro Jazyk. Není nutné, aby se shodoval s vašimi předchozími výběry.
    • Pojmenujte soubor. V tomto případě zvolte users_and_bookings.
    • V části Cílová cesta ponechte výchozí hodnotu.
    • U typu datové sady ponechte vybranou možnost Žádné.
  3. V novém souboru kódu upravte kód tak, aby odpovídal následujícímu kódu (na základě vašeho výběru na předchozí obrazovce použijte SQL nebo Python).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Kliknutím na Spustit kanál aktualizujte datové sady. Po dokončení spuštění uvidíte v Pipeline Graph čtyři tabulky, včetně nové users_and_bookings.

    Graf kanálu zobrazující čtyři tabulky v kanálu

Další kroky

Teď, když jste se dozvěděli, jak používat některé funkce editoru kanálů Lakeflow a vytvořit kanál, tady jsou některé další funkce, o kterých se dozvíte víc: