Vytvoření kompletního datového kanálu v Databricks

V tomto článku se dozvíte, jak vytvořit a nasadit komplexní kanál zpracování dat, včetně toho, jak ingestovat nezpracovaná data, transformovat data a spouštět analýzy na zpracovaných datech.

Poznámka:

Přestože tento článek ukazuje, jak vytvořit úplný datový kanál pomocí poznámkových bloků Databricks a úlohy Azure Databricks pro orchestraci pracovního postupu, Databricks doporučuje používat delta živé tabulky, deklarativní rozhraní pro vytváření spolehlivých, udržovatelných a testovatelných kanálů zpracování dat.

Co je datový kanál?

Datový kanál implementuje kroky potřebné k přesunu dat ze zdrojových systémů, transformaci dat na základě požadavků a uložení dat do cílového systému. Datový kanál zahrnuje všechny procesy potřebné k převodu nezpracovaných dat na připravená data, která můžou uživatelé využívat. Datový kanál může například připravit data, aby datoví analytici a datoví vědci mohli extrahovat hodnotu z dat prostřednictvím analýzy a generování sestav.

Běžným příkladem datového kanálu je pracovní postup extrakce, transformace a načítání (ETL). Při zpracování ETL se data ingestují ze zdrojových systémů a zapisují se do pracovní oblasti, transformují se na základě požadavků (zajištění kvality dat, odstranění duplicitních dat atd.) a následně se zapisují do cílového systému, jako je datový sklad nebo datové jezero.

Kroky datového kanálu

Abyste mohli začít vytvářet datové kanály v Azure Databricks, příklad uvedený v tomto článku vás provede vytvořením pracovního postupu zpracování dat:

  • Pomocí funkcí Azure Databricks můžete prozkoumat nezpracovanou datovou sadu.
  • Vytvořte poznámkový blok Databricks pro příjem nezpracovaných zdrojových dat a zapište nezpracovaná data do cílové tabulky.
  • Vytvořte poznámkový blok Databricks, který transformuje nezpracovaná zdrojová data a zapíše transformovaná data do cílové tabulky.
  • Vytvořte poznámkový blok Databricks pro dotazování transformovaných dat.
  • Automatizujte datový kanál pomocí úlohy Azure Databricks.

Požadavky

Příklad: Milion datová sada Song

Datová sada použitá v tomto příkladu je podmnožinou sady million Song Dataset, kolekce funkcí a metadat pro současné hudební skladby. Tato datová sada je dostupná v ukázkových datových sadách zahrnutých v pracovním prostoru Azure Databricks.

Krok 1: Vytvoření clusteru

Pokud chcete provést zpracování a analýzu dat v tomto příkladu, vytvořte cluster, který poskytuje výpočetní prostředky potřebné ke spuštění příkazů.

Poznámka:

Vzhledem k tomu, že tento příklad používá ukázkovou datovou sadu uloženou v DBFS a doporučuje zachovat tabulky v katalogu Unity, vytvoříte cluster nakonfigurovaný s režimem přístupu jednoho uživatele. Režim přístupu jednoho uživatele poskytuje úplný přístup k DBFS a zároveň umožňuje přístup ke katalogu Unity. Podívejte se na osvědčené postupy pro DBFS a Katalog Unity.

  1. Na bočním panelu klikněte na Výpočty .
  2. Na stránce Compute klikněte na Vytvořit cluster.
  3. Na stránce Nový cluster zadejte jedinečný název clusteru.
  4. V režimu přístupu vyberte Jednoho uživatele.
  5. V přístupu k jednomu uživateli nebo instančnímu objektu vyberte své uživatelské jméno.
  6. Ponechte zbývající hodnoty ve výchozím stavu a klikněte na Vytvořit cluster.

Další informace o clusterech Databricks najdete v tématu Výpočty.

Krok 2: Prozkoumání zdrojových dat

Informace o použití rozhraní Azure Databricks k prozkoumání nezpracovaných zdrojových dat najdete v tématu Prozkoumání zdrojových dat datového kanálu. Pokud chcete přejít přímo na ingestování a přípravu dat, pokračujte krokem 3: Ingestování nezpracovaných dat.

Krok 3: Ingestování nezpracovaných dat

V tomto kroku načtete nezpracovaná data do tabulky, aby byla k dispozici pro další zpracování. Ke správě datových prostředků na platformě Databricks, jako jsou tabulky, doporučuje Databricks katalog Unity. Pokud ale nemáte oprávnění k vytvoření požadovaného katalogu a schématu pro publikování tabulek do katalogu Unity, můžete i nadále provádět následující kroky publikováním tabulek do metastoru Hive.

K ingestování dat doporučuje Databricks používat automatický zavaděč. Auto Loader automaticky rozpozná a zpracuje nové soubory při jejich doručení do cloudového úložiště objektů.

Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času. Databricks doporučuje odvozování schématu při použití automatického zavaděče. Jak je ale vidět v kroku zkoumání dat, data skladeb neobsahují informace hlavičky. Protože záhlaví není uloženo s daty, budete muset explicitně definovat schéma, jak je znázorněno v dalším příkladu.

  1. Na bočním panelu klikněte na New IconNový a v nabídce vyberte Poznámkový blok. Zobrazí se dialogové okno Vytvořit poznámkový blok .

  2. Zadejte název poznámkového bloku, Ingest songs datanapříklad . Standardně:

    • Python je vybraný jazyk.
    • Poznámkový blok je připojený k poslednímu použitému clusteru. V tomto případě cluster, který jste vytvořili v kroku 1: Vytvoření clusteru.
  3. Do první buňky poznámkového bloku zadejte následující:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Pokud používáte Katalog Unity, nahraďte <table-name> ho názvem katalogu, schématu a tabulky, který bude obsahovat ingestované záznamy (například data_pipelines.songs_data.raw_song_data). V opačném případě nahraďte <table-name> názvem tabulky, která bude obsahovat ingestované záznamy, raw_song_datanapříklad .

    Nahraďte <checkpoint-path> cestou k adresáři v DBFS za účelem údržby souborů kontrolních bodů, /tmp/pipeline_get_started/_checkpoint/song_datanapříklad .

  4. Klikněte na Run Menupoložku a vyberte Spustit buňku. Tento příklad definuje schéma dat pomocí informací z README, ingestuje skladby data ze všech souborů obsažených v file_patha zapíše data do tabulky určené table_name.

Krok 4: Příprava nezpracovaných dat

Pokud chcete připravit nezpracovaná data pro analýzu, následující kroky transformují nezpracovaná data skladeb vyfiltrováním nepotřebných sloupců a přidáním nového pole obsahujícího časové razítko pro vytvoření nového záznamu.

  1. Na bočním panelu klikněte na New IconNový a v nabídce vyberte Poznámkový blok. Zobrazí se dialogové okno Vytvořit poznámkový blok .

  2. Zadejte název poznámkového bloku. Například Prepare songs data. Změňte výchozí jazyk na SQL.

  3. Do první buňky poznámkového bloku zadejte následující:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Pokud používáte Katalog Unity, nahraďte <table-name> ho názvem katalogu, schématu a tabulky, který bude obsahovat filtrované a transformované záznamy (například data_pipelines.songs_data.prepared_song_data). V opačném případě nahraďte <table-name> názvem tabulky, která bude obsahovat filtrované a transformované záznamy (například prepared_song_data).

    Nahraďte <raw-songs-table-name> názvem tabulky, která obsahuje nezpracované záznamy skladeb přijatých v předchozím kroku.

  4. Klikněte na Run Menupoložku a vyberte Spustit buňku.

Krok 5: Dotazování transformovaných dat

V tomto kroku rozšíříte kanál zpracování přidáním dotazů pro analýzu dat skladeb. Tyto dotazy používají připravené záznamy vytvořené v předchozím kroku.

  1. Na bočním panelu klikněte na New IconNový a v nabídce vyberte Poznámkový blok. Zobrazí se dialogové okno Vytvořit poznámkový blok .

  2. Zadejte název poznámkového bloku. Například Analyze songs data. Změňte výchozí jazyk na SQL.

  3. Do první buňky poznámkového bloku zadejte následující:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Nahraďte <prepared-songs-table-name> názvem tabulky obsahující připravená data. Například data_pipelines.songs_data.prepared_song_data.

  4. Klikněte Down Caret v nabídce akcí buňky, vyberte Přidat buňku pod a do nové buňky zadejte následující:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Nahraďte <prepared-songs-table-name> názvem připravené tabulky vytvořené v předchozím kroku. Například data_pipelines.songs_data.prepared_song_data.

  5. Pokud chcete spouštět dotazy a zobrazit výstup, klikněte na Spustit vše.

Krok 6: Vytvoření úlohy Azure Databricks pro spuštění kanálu

Pomocí úlohy Azure Databricks můžete vytvořit pracovní postup pro automatizaci spouštění kroků pro příjem, zpracování a analýzu dat.

  1. V pracovním prostoru Datová Věda & Engineering udělejte jednu z těchto věcí:
    • Na bočním panelu klikněte na Jobs IconPracovní postupy a klikněte na .Create Job Button
    • Na bočním panelu klikněte na New IconNový a vyberte Úloha.
  2. V dialogovém okně úkolu na kartě Úkoly nahraďte možnost Přidat název úlohy... názvem vaší úlohy. Například "Pracovní postup Skladby".
  3. Do pole Název úkolu zadejte název prvního úkolu, Ingest_songs_datanapříklad .
  4. V části Typ vyberte typ úlohy Poznámkový blok .
  5. Ve zdroji vyberte Pracovní prostor.
  6. Pomocí prohlížeče souborů vyhledejte poznámkový blok pro příjem dat, klikněte na název poznámkového bloku a klikněte na Potvrdit.
  7. V clusteru vyberte Shared_job_cluster nebo cluster, který jste vytvořili v Create a cluster kroku.
  8. Klikněte na Vytvořit.
  9. Klikněte pod Add Task Button úkol, který jste právě vytvořili, a vyberte Poznámkový blok.
  10. Do pole Název úkolu zadejte název úkolu, Prepare_songs_datanapříklad .
  11. V části Typ vyberte typ úlohy Poznámkový blok .
  12. Ve zdroji vyberte Pracovní prostor.
  13. V prohlížeči souborů vyhledejte poznámkový blok pro přípravu dat, klikněte na název poznámkového bloku a klikněte na Potvrdit.
  14. V clusteru vyberte Shared_job_cluster nebo cluster, který jste vytvořili v Create a cluster kroku.
  15. Klikněte na Vytvořit.
  16. Klikněte pod Add Task Button úkol, který jste právě vytvořili, a vyberte Poznámkový blok.
  17. Do pole Název úkolu zadejte název úkolu, Analyze_songs_datanapříklad .
  18. V části Typ vyberte typ úlohy Poznámkový blok .
  19. Ve zdroji vyberte Pracovní prostor.
  20. Pomocí prohlížeče souborů vyhledejte poznámkový blok analýzy dat, klikněte na název poznámkového bloku a klikněte na Potvrdit.
  21. V clusteru vyberte Shared_job_cluster nebo cluster, který jste vytvořili v Create a cluster kroku.
  22. Klikněte na Vytvořit.
  23. Chcete-li spustit pracovní postup, klikněte na Run Now Buttontlačítko . Pokud chcete zobrazit podrobnosti o spuštění, klikněte na odkaz ve sloupci Čas zahájení spuštění spuštění v zobrazení spuštění úlohy. Kliknutím na každou úlohu zobrazíte podrobnosti o spuštění úlohy.
  24. Pokud chcete zobrazit výsledky po dokončení pracovního postupu, klikněte na konečný úkol analýzy dat. Zobrazí se stránka Výstup a zobrazí výsledky dotazu.

Krok 7: Naplánování úlohy datového kanálu

Poznámka:

Abychom si ukázali použití úlohy Azure Databricks k orchestraci naplánovaného pracovního postupu, tento příklad začínáme odděluje kroky příjmu dat, přípravy a analýzy do samostatných poznámkových bloků a každý poznámkový blok se pak použije k vytvoření úkolu v úloze. Pokud je veškeré zpracování obsažené v jednom poznámkovém bloku, můžete poznámkový blok snadno naplánovat přímo z uživatelského rozhraní poznámkového bloku Azure Databricks. Viz Vytvoření a správa naplánovaných úloh poznámkového bloku.

Běžným požadavkem je naplánované spuštění datového kanálu. Definování plánu úlohy, která kanál spouští:

  1. Na bočním panelu klikněte na Jobs IconPracovní postupy.
  2. Ve sloupci Název klikněte na název úlohy. Na bočním panelu se zobrazí podrobnosti o úloze.
  3. Na panelu Podrobnosti úlohy klikněte na Tlačítko Přidat aktivační událost a vyberte Typ triggeru Naplánovano.
  4. Zadejte období, počáteční čas a časové pásmo. Volitelně zaškrtněte políčko Zobrazit syntaxi Cron a zobrazte a upravte plán v syntaxi Quartz Cron.
  5. Klikněte na Uložit.

Další informace