Kurz: Vytvoření prvního kanálu pomocí Editoru kanálů Lakeflow

Zjistěte, jak vytvořit nový kanál pomocí Deklarativních kanálů Sparku (SDP) pro orchestraci dat a automatického zavaděče. Tento kurz rozšiřuje ukázkový kanál vyčištěním dat a vytvořením dotazu, který vyhledá prvních 100 uživatelů.

V tomto kurzu se naučíte používat Editor kanálů Lakeflow k:

  • Vytvořte nový kanál s výchozí strukturou složek a začněte sadou ukázkových souborů.
  • Definujte omezení kvality dat pomocí očekávání.
  • Pomocí funkcí editoru můžete kanál rozšířit o novou transformaci, která provede analýzu dat.

Požadavky

Než začnete s tímto kurzem, musíte:

  • Přihlaste se k pracovnímu prostoru Azure Databricks.
  • Povolte pro svůj pracovní prostor katalog Unity.
  • Povolte editor kanálů Lakeflow pro váš pracovní prostor a musíte být přihlášeni. Viz Povolení editoru kanálů Lakeflow a aktualizovaného monitorování.
  • Máte oprávnění k vytvoření výpočetního prostředku nebo přístupu k výpočetnímu prostředku.
  • Máte oprávnění k vytvoření nového schématu v katalogu. Požadovaná oprávnění jsou ALL PRIVILEGES nebo USE CATALOG a CREATE SCHEMA.

Krok 1: Vytvoření kanálu

V tomto kroku vytvoříte kanál s použitím výchozí struktury složek a ukázek kódu. Ukázky kódu odkazují na users tabulku ve zdroji wanderbricks ukázkových dat.

  1. V pracovním prostoru Azure Databricks klikněte na ikonu plus.Nový, potom ikonu pipeline.ETL pipeline. Otevře se editor datového kanálu na stránce pro vytvoření datového kanálu.

  2. Kliknutím na záhlaví dáte kanálu název.

  3. Přímo pod názvem zvolte výchozí katalog a schéma pro výstupní tabulky. Ty se použijí, když v definicích kanálu nezadáte katalog a schéma.

  4. V části Další krok vašeho kanálu klikněte na ikonu Schéma.Začněte s ukázkovým kódem v SQL nebo ikonu Schéma.Začněte s ukázkovým kódem v Python podle vaší jazykové preference. Tím se změní výchozí jazyk ukázkového kódu, ale později můžete přidat kód v jiném jazyce. Tím se vytvoří výchozí struktura složek s ukázkovým kódem, která vám umožní začít.

  5. Ukázkový kód můžete zobrazit v prohlížeči prostředků pipeline na levé straně pracovní plochy. Pod transformations jsou dva soubory, které generují každou jednu datovou sadu pro kanál. Pod explorations je poznámkový blok s kódem, který vám umožní zobrazit výstup vašeho datového toku. Kliknutím na soubor můžete zobrazit a upravit kód v editoru.

    Výstupní datové sady ještě nebyly vytvořeny a graf kanálu na pravé straně obrazovky je prázdný.

  6. Pokud chcete spustit kód kanálu (kód ve transformations složce), klikněte na Spustit kanál v pravé horní části obrazovky.

    Po dokončení spuštění se v dolní části pracovního prostoru zobrazí dvě nové tabulky, které byly vytvořeny, sample_users_<pipeline-name> a sample_aggregation_<pipeline-name>. Uvidíte také, že graf pipeline na pravé straně pracovního prostoru teď zobrazuje dvě tabulky, včetně toho, že sample_users je zdrojem pro sample_aggregation.

Krok 2: Použití kontrol kvality dat

V tomto kroku přidáte do sample_users tabulky kontrolu kvality dat. K omezení dat použijete očekávání kanálu . V tomto případě odstraníte všechny záznamy uživatelů, které nemají platnou e-mailovou adresu, a vypíšete vyčištěnou tabulku jako users_cleaned.

  1. V prohlížeči prostředků pipeline klikněte na ikonu Plus a vyberte Transformace.

  2. V dialogovém okně Vytvořit nový transformační soubor proveďte následující výběry:

    • Zvolte Python nebo SQL pro Language. Nemusí se shodovat s předchozím výběrem.
    • Pojmenujte soubor. V tomto případě zvolte users_cleaned.
    • V části Cílová cesta ponechte výchozí hodnotu.
    • U typu datové sady buď ponechte vybranou možnost Žádná , nebo zvolte Materializované zobrazení. Pokud vyberete materializované zobrazení, vygeneruje vám vzorový kód.
  3. Kliknutím na Vytvořit vytvoříte soubor kódu transformace.

  4. V novém souboru kódu upravte kód tak, aby odpovídal následujícímu kódu (na základě výběru na předchozí obrazovce použijte SQL nebo Python). Nahraďte <pipeline-name> úplným názvem tabulky sample_users .

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.materialized_view
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  5. Kliknutím na Spustit kanál aktualizujte kanál. Teď by měla mít tři tabulky.

Krok 3: Analýza hlavních uživatelů

Dále získejte prvních 100 uživatelů podle počtu rezervací, které vytvořili. Spojte tabulku wanderbricks.bookings s materializovaným zobrazením users_cleaned.

  1. V prohlížeči prostředků pipeline klikněte na ikonu Plus a vyberte Transformace.

  2. V dialogovém okně Vytvořit nový transformační soubor proveďte následující výběry:

    • Zvolte Python nebo SQL pro Language. Není nutné, aby se shodoval s vašimi předchozími výběry.
    • Pojmenujte soubor. V tomto případě zvolte users_and_bookings.
    • V části Cílová cesta ponechte výchozí hodnotu.
    • U typu datové sady ponechte vybranou možnost Žádné.
  3. V novém souboru kódu upravte kód tak, aby odpovídal následujícímu kódu (na základě výběru na předchozí obrazovce použijte SQL nebo Python).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.materialized_view
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Kliknutím na Spustit kanál aktualizujte datové sady. Po dokončení spuštění uvidíte v Pipeline Graph čtyři tabulky, včetně nové users_and_bookings.

    Graf kanálu zobrazující čtyři tabulky v kanálu

Další kroky

Teď, když jste se dozvěděli, jak používat některé funkce editoru kanálů Lakeflow a vytvořit kanál, tady jsou některé další funkce, o kterých se dozvíte víc: