Sdílet prostřednictvím


Kurz: Sestavení kanálu ETL pomocí deklarativních kanálů Sparku Lakeflow

Tento kurz vysvětluje, jak vytvořit a nasadit ETL (extrakce, transformace a načítání) potrubí pro orchestraci dat pomocí Lakeflow Spark deklarativních pipeline a Auto Loaderu. Kanál ETL implementuje kroky pro čtení dat ze zdrojových systémů, transformaci těchto dat na základě požadavků, jako jsou kontroly kvality dat a odstranění duplicit, a zápis dat do cílového systému, jako je datový sklad nebo datové jezero.

V tomto kurzu použijete pipeline a Auto Loader k:

  • Ingestování nezpracovaných zdrojových dat do cílové tabulky
  • Transformujte nezpracovaná zdrojová data a zapište transformovaná data do dvou cílových materializovaných zobrazení.
  • Zadejte dotaz na transformovaná data.
  • Automatizujte kanál ETL pomocí úlohy Databricks.

Další informace o kanálech a Auto Loaderu najdete v tématu Deklarativní kanály Sparku lakeflow a Co je Auto Loader?

Požadavky

K dokončení tohoto kurzu musíte splnit následující požadavky:

Informace o datové sadě

Datová sada použitá v tomto příkladu je podmnožinou Million Song Dataset, což je kolekce charakteristik a metadat pro současné hudební skladby. Tato datová sada je dostupná v ukázkových datových sadách zahrnutých v pracovním prostoru Azure Databricks.

Krok 1: Vytvořte potrubí

Nejprve vytvořte kanál definováním datových sad v souborech (označovaných jako zdrojový kód) pomocí syntaxe kanálu. Každý soubor zdrojového kódu může obsahovat pouze jeden jazyk, ale do kanálu můžete přidat více souborů specifických pro jazyk. Další informace najdete v tématu Deklarativní kanály Sparku pro Lakeflow.

Tento kurz používá bezserverové výpočetní prostředky a katalog Unity. Pro všechny možnosti konfigurace, které nejsou zadány, použijte výchozí nastavení. Pokud není výpočetní prostředí bez serveru ve vašem pracovním prostoru povolené nebo podporované, můžete kurz dokončit tak, jak je napsané pomocí výchozího nastavení výpočetních prostředků.

Pokud chcete vytvořit nový kanál, postupujte takto:

  1. V pracovním prostoru klikněte na ikonu Plus.Nový na bočním panelu a pak vyberte Kanál ETL.
  2. Zadejte jedinečný název kanálu.
  3. Přímo pod názvem vyberte výchozí katalog a schéma pro data, která vygenerujete. V transformacích můžete zadat další cíle, ale tento kurz používá tyto výchozí hodnoty. Musíte mít oprávnění k katalogu a schématu, které vytvoříte. Viz Požadavky.
  4. Pro účely tohoto kurzu vyberte Začít s prázdným souborem.
  5. V cestě ke složce zadejte umístění zdrojových souborů nebo přijměte výchozí (složku uživatele).
  6. Jako jazyk prvního zdrojového souboru zvolte Python nebo SQL (pracovní proces může kombinovat různé jazyky, ale každý soubor musí být v jednom jazyce).
  7. Klepněte na tlačítko Vybrat.

Zobrazí se editor kanálu pro nový kanál. Vytvoří se prázdný zdrojový soubor pro váš jazyk připravený k první transformaci.

Krok 2: Vývoj logiky kanálu

V tomto kroku použijete Editor kanálů Lakeflow k interaktivnímu vývoji a ověření zdrojového kódu pro kanál.

Kód používá Auto Loader k přírůstkovému příjmu dat. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů. Další informace najdete v tématu Co je Auto Loader?

Prázdný soubor zdrojového kódu se automaticky vytvoří a nakonfiguruje pro potrubí. Soubor se vytvoří ve složce transformace vašeho pipeline. Ve výchozím nastavení jsou všechny soubory *.py a *.sql ve složce transformace součástí zdroje vašeho kanálu.

  1. Zkopírujte a vložte následující kód do zdrojového souboru. Nezapomeňte použít jazyk, který jste vybrali pro soubor v kroku 1.

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Tento zdroj obsahuje kód pro tři dotazy. Tyto dotazy můžete také umístit do samostatných souborů, abyste mohli soubory a kód uspořádat tak, jak chcete.

  2. Klikněte na ikonu Přehrát.Spustit soubor nebo Spustit kanál pro zahájení aktualizace připojeného kanálu. S pouze jedním zdrojovým souborem v řetězci jsou funkčně ekvivalentní.

Po dokončení aktualizace se editor aktualizuje informacemi o vašem datovém toku.

  • Graf potrubí (DAG) na bočním panelu napravo od kódu zobrazuje tři tabulky, songs_raw, songs_prepared a top_artists_by_year.
  • Souhrn aktualizace se zobrazí v horní části prohlížeče prostředků vývojové linie.
  • Podrobnosti o vygenerovaných tabulkách se zobrazují v dolním podokně a data z tabulek můžete procházet tak, že je vyberete.

Patří sem nezpracovaná a vyčištěná data a také jednoduchá analýza, která umožňuje najít nejlepší umělce podle roku. V dalším kroku vytvoříte ad hoc dotazy pro další analýzu v samostatném souboru ve vašem kanálu.

Krok 3: Prozkoumání datových sad vytvořených datovým kanálem

V tomto kroku provedete ad-hoc dotazy na data zpracovaná v rámci ETL kanálu za účelem analýzy dat o skladbách v editoru Databricks SQL. Tyto dotazy používají připravené záznamy vytvořené v předchozím kroku.

Nejprve spusťte dotaz, který najde umělce, kteří každý rok vydali nejvíce skladeb od roku 1990.

  1. Na bočním panelu prohlížeče prostředků kanálu klikněte na ikonu Plus.Přidejte a pak Prozkoumání.

  2. Zadejte název a vyberte SQL pro soubor zkoumání. Poznámkový blok SQL se vytvoří v nové explorations složce. Soubory ve explorations složce se ve výchozím nastavení nespouštějí jako součást aktualizace kanálu. Poznámkový blok SQL obsahuje buňky, které můžete spouštět společně nebo samostatně.

  3. Pokud chcete vytvořit tabulku umělců, kteří po roce 1990 vydávají nejvíce skladeb v každém roce, zadejte do nového souboru SQL následující kód (pokud je v souboru ukázkový kód, nahraďte ho). Protože tento poznámkový blok není součástí datového toku, nepoužívá výchozí katalog a schéma. Nahraďte <catalog>.<schema> katalogem a schématem, které jste použili jako výchozí hodnoty pro pipelinu.

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Klikněte na ikonu Přehrát. Nebo stisknutím tohoto Shift + Enter dotazu spusťte tento dotaz.

Teď spusťte další dotaz, který najde skladby s beatem 4/4 a tanečním tempem.

  1. Do další buňky ve stejném souboru přidejte následující kód. Znovu nahraďte <catalog>.<schema> katalogem a schématem, které jste použili jako výchozí hodnoty pro kanál:

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Klikněte na ikonu Přehrát. Nebo stisknutím tohoto Shift + Enter dotazu spusťte tento dotaz.

Krok 4: Vytvořte úlohu pro spuštění potrubí

Dále vytvořte pracovní postup pro automatizaci postupu příjmu, zpracování a analýzy dat pomocí úlohy Databricks, která běží podle plánu.

  1. V horní části editoru zvolte tlačítko Plán .
  2. Pokud se zobrazí dialogové okno Plány , zvolte Přidat plán.
  3. Otevře se dialogové okno Nový plán, ve kterém můžete vytvořit úlohu pro spuštění vašeho pipeline podle časového plánu.
  4. Volitelně můžete úlohu pojmenovat.
  5. Ve výchozím nastavení je plán nastavený tak, aby běžel jednou denně. Tuto výchozí hodnotu můžete přijmout nebo si nastavit vlastní plán. Když zvolíte Upřesnit , můžete nastavit konkrétní čas, kdy se úloha spustí. Když vyberete Další možnosti , můžete při spuštění úlohy vytvářet oznámení.
  6. Výběrem Vytvořit aplikujete změny a vytvoříte úlohu.

Teď se úloha bude spouštět každý den, aby byl váš datový proud aktuální. Pokud chcete zobrazit seznam plánů, můžete znovu zvolit Plán . V dialogovém okně můžete spravovat plány pro váš pipeline, včetně jejich přidávání, úprav nebo odebírání.

Kliknutím na název plánu (nebo úlohy) přejdete na stránku úlohy v seznamu Úlohy a kanály . Odtud můžete zobrazit podrobnosti o spuštěních úloh, včetně historie spuštění, nebo spustit úlohu okamžitě pomocí tlačítka Spustit nyní .

Další informace o spuštěních úloh najdete v tématu Monitorování a pozorovatelnost úloh Lakeflow .

Další informace