Udostępnij za pośrednictwem


Poradnik: jak zbudować potok ETL za pomocą Potoków Deklaratywnych Lakeflow

Dowiedz się, jak utworzyć i wdrożyć potok ETL (wyodrębnianie, przekształcanie i ładowanie) do orkiestracji danych za pomocą potoków deklaratywnych Lakeflow i Auto Loader. Potok ETL implementuje kroki odczytywania danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań, takich jak kontrole jakości danych i deduplikacja rekordów oraz zapisywanie danych w systemie docelowym, takim jak magazyn danych lub magazyn danych typu data lake.

W tym samouczku użyjesz Potoków Deklaratywnych Lakeflow i Modułu Ładującego Automatycznego do:

  • Wczytaj nieprzetworzone dane źródłowe do tabeli docelowej.
  • Przekształć nieprzetworzone dane źródłowe i zapisz przekształcone dane do dwóch zmaterializowanych widoków docelowych.
  • Wykonaj zapytanie dotyczące przekształconych danych.
  • Zautomatyzuj potok ETL za pomocą zadania Databricks.

Aby uzyskać więcej informacji na temat Lakeflow Declarative Pipelines i Auto Loader, zobacz Lakeflow Declarative Pipelines i What is Auto Loader?

Wymagania

Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:

Informacje o zestawie danych

Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.

Krok 1: Utwórz potok danych

Najpierw utworzysz przepływ ETL w Lakeflow Declarative Pipelines. Deklaratywne Potoki Lakeflow tworzą przepływy, rozwiązując zależności zdefiniowane w notesach lub plikach (nazywanych kodem źródłowym), przy użyciu składni Deklaratywnych Potoków Lakeflow. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale w pipeline można dodać wiele notesów lub plików specyficznych dla języka. Aby dowiedzieć się więcej, zobacz Deklaratywne potoki Lakeflow

Ważne

Pozostaw pole Kod źródłowy puste, aby utworzyć i skonfigurować notes do automatycznego tworzenia kodu źródłowego.

W tym samouczku użyto obliczeń bezserwerowych i Unity Catalog. Dla wszystkich opcji konfiguracji, które nie są określone, użyj ustawień domyślnych. Jeśli przetwarzanie bezserwerowe nie jest włączone lub obsługiwane w obszarze roboczym, możesz ukończyć samouczek zgodnie z instrukcjami napisanymi przy użyciu domyślnych ustawień obliczeniowych. Jeśli używasz domyślnych ustawień obliczeniowych, musisz ręcznie wybrać Unity Catalog pod Opcje magazynu w sekcji Miejsce docelowe interfejsu użytkownika Tworzenie potoku.

Aby utworzyć nowy potok ETL w Lakeflow Declarative Pipelines, wykonaj następujące kroki:

  1. W obszarze roboczym kliknij ikonę Przepływy pracy.Zadania i rury na pasku bocznym.
  2. W obszarze Nowy kliknij ETL Pipeline.
  3. W nazwa potokuwpisz unikatową nazwę potoku.
  4. Zaznacz pole wyboru Serverless.
  5. W Docelowym, aby skonfigurować lokalizację katalogu Unity, w której są publikowane tabele, wybierz istniejący Katalog i wpisz nową nazwę w schemacie, aby utworzyć nowy schemat w twoim katalogu.
  6. Kliknij pozycję Utwórz.

Interfejs użytkownika potoku zostanie wyświetlony dla nowego potoku.

Krok 2: Rozwijanie potoku

Ważne

Notesy mogą zawierać tylko jeden język programowania. Nie mieszaj kodu w języku Python i SQL w notesach potoku kodu źródłowego.

W tym kroku użyjesz notesów usługi Databricks do interaktywnego tworzenia i weryfikowania kodu źródłowego dla potoków deklaratywnych lakeflow.

Kod używa automatycznego modułu ładującego do pozyskiwania danych przyrostowych. Moduł ładujący automatycznie wykrywa i przetwarza nowe pliki, gdy docierają do magazynu obiektów w chmurze. Aby dowiedzieć się więcej, zobacz Co to jest moduł automatycznego ładowania?

Pusty notatnik kodu źródłowego jest automatycznie tworzony i konfigurowany dla pipeliny. Notatnik jest tworzony w nowym folderze w katalogu użytkownika. Nazwa nowego katalogu i pliku odpowiada nazwie pipeline'u. Na przykład /Users/someone@example.com/my_pipeline/my_pipeline.

Podczas tworzenia potoku możesz wybrać język Python lub SQL. Przykłady są uwzględniane w obu językach. W zależności od wybranego języka sprawdź, czy wybrano domyślny język notesu. Aby dowiedzieć się więcej na temat obsługi notatnika na potrzeby opracowywania kodu w potokach deklaratywnych Lakeflow, zobacz Tworzenie i debugowanie potoków ETL za pomocą notatnika w Lakeflow Declarative Pipelines.

  1. Link umożliwiający dostęp do tego notatnika znajduje się w polu Kod źródłowy w panelu Szczegóły potoku. Kliknij link, aby otworzyć notes przed przejściem do następnego kroku.

  2. Kliknij pozycję Połącz w prawym górnym rogu, aby otworzyć menu konfiguracji obliczeniowej.

  3. Najedź kursorem na nazwę pipeline'u utworzonego w kroku 1.

  4. Kliknij Połącz.

  5. Obok tytułu notesu u góry wybierz domyślny język notesu (Python lub SQL).

  6. Skopiuj i wklej następujący kod do komórki w notesie.

    Pyton

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Kliknij Start, aby rozpocząć aktualizację podłączonego potoku.

Krok 3. Wykonywanie zapytań dotyczących przekształconych danych

W tym kroku wykonasz zapytanie do danych przetworzonych w potoku ETL, aby przeanalizować dane dotyczące piosenek. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.

Najpierw uruchom zapytanie, które znajduje artystów, którzy wydali najwięcej piosenek każdego roku od 1990 roku.

  1. Na pasku bocznym kliknij pozycję Ikona edytora SQL w edytorze SQL.

  2. Kliknij ikonę Dodaj lub ikonę plus nowej karty i wybierz pozycję Utwórz nowe zapytanie z menu.

  3. Wprowadź następujące informacje:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Zastąp <catalog> nazwą katalogu i <schema> nazwą schematu, w których jest tabela. Na przykład data_pipelines.songs_data.top_artists_by_year.

  4. Kliknij Uruchom wybrane.

Teraz uruchom kolejne zapytanie, które znajduje piosenki z rytmem 4/4 i tempem tanecznym.

  1. Kliknij ikonę Dodaj lub ikonę plusa , a następnie wybierz pozycję Utwórz nowe zapytanie z menu.

  2. Wprowadź następujący kod:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Zastąp <catalog> nazwą katalogu i <schema> nazwą schematu, w których jest tabela. Na przykład data_pipelines.songs_data.songs_prepared.

  3. Kliknij Uruchom wybrane.

Krok 4. Tworzenie zadania w celu uruchomienia potoku

Następnie utwórz przepływ pracy w celu zautomatyzowania kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Databricks.

  1. W obszarze roboczym kliknij ikonę Przepływy pracy.Zadania i rury na pasku bocznym.
  2. W obszarze Nowy kliknij pozycję Zadanie.
  3. W polu tytuł zadania zastąp wartość Nowa data i godzina< zadania > nazwą zadania. Na przykład Songs workflow.
  4. W polu Nazwa zadania wprowadź nazwę pierwszego zadania, na przykład ETL_songs_data.
  5. W polu Typ wybierz pozycję Potok.
  6. W obszarze Potok wybierz potok utworzony w kroku 1.
  7. Kliknij pozycję Utwórz.
  8. Aby uruchomić przepływ pracy, kliknij pozycję Uruchom teraz. Aby wyświetlić szczegóły przebiegu, kliknij kartę Uruchomienia. Kliknij zadanie, aby zobaczyć szczegóły przebiegu konkretnego zadania.
  9. Aby wyświetlić wyniki po zakończeniu przepływu pracy, kliknij Przejdź do najnowszego pomyślnego uruchomienia lub Czas rozpoczęcia uruchomienia zadania. Zostanie wyświetlona strona Dane wyjściowe i wyświetli wyniki zapytania.

Aby uzyskać więcej informacji na temat przebiegów zadań, zobacz Monitorowanie i obserwacja dla zadań Lakeflow.

Krok 5: Zaplanuj zadanie przepływu danych

Aby uruchomić potok ETL zgodnie z ustalonym harmonogramem, wykonaj następujące kroki:

  1. Przejdź do interfejsu użytkownika zadania i potoków w tym samym obszarze roboczym usługi Azure Databricks co zadanie.
  2. Opcjonalnie wybierz filtry Zadania i Należące do mnie .
  3. W kolumnie Nazwa kliknij nazwę zadania. Na panelu bocznym są wyświetlane szczegóły zadania.
  4. Kliknij pozycję Dodaj wyzwalacz na panelu Harmonogramy i wyzwalacze , a następnie wybierz pozycję Zaplanowane w polu Typ wyzwalacza.
  5. Określ okres, czas rozpoczęcia i strefę czasową.
  6. Kliknij przycisk Zapisz.

Dowiedz się więcej