Udostępnij przez


Samouczek: tworzenie pierwszego potoku przy użyciu edytora potoków lakeflow

Dowiedz się, jak utworzyć nowy potok przy użyciu Deklaratywnych Potoków Spark Lakeflow (SDP) do orkiestracji danych i Automatycznego Ładowacza. Ten samouczek rozszerza przykładowy potok danych poprzez oczyszczanie danych i tworzenie zapytania w celu znalezienia 100 najlepszych użytkowników.

Z tego samouczka dowiesz się, jak używać edytora Lakeflow Pipelines do:

  • Utwórz nowy potok z domyślną strukturą folderów i zacznij od zestawu przykładowych plików.
  • Definiowanie ograniczeń dotyczących jakości danych przy użyciu oczekiwań.
  • Użyj funkcji edytora, aby rozszerzyć potok przy użyciu nowej transformacji w celu przeprowadzenia analizy danych.

Requirements

Przed rozpoczęciem tego samouczka należy wykonać następujące czynności:

  • Zaloguj się do obszaru roboczego usługi Azure Databricks.
  • Upewnij się, że Katalog Unity jest włączony dla Twojego obszaru roboczego.
  • Edytor potoków Lakeflow musi być włączony dla obszaru roboczego oraz musisz być zarejestrowany do tej opcji. Zobacz Włączanie edytora potoków lakeflow i aktualizowanie monitorowania.
  • Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
  • Musisz mieć uprawnienia do tworzenia nowego schematu w katalogu. Wymagane uprawnienia to ALL PRIVILEGES lub USE CATALOG i CREATE SCHEMA.

Krok 1: Utwórz potok

W tym kroku utworzysz pipeline przy użyciu domyślnej struktury folderów i przykładowego kodu. Przykłady kodu odwołują się do users tabeli w przykładowym wanderbricks źródle danych.

  1. W obszarze roboczym usługi Azure Databricks kliknij ikonę Plus.Nowy, a następnie ikona Potok.Potok ETL. Spowoduje to otwarcie edytora potoku na stronie tworzenia potoku.

  2. Kliknij nagłówek, aby nadać potokowi nazwę.

  3. Tuż pod nazwą wybierz domyślny wykaz i schemat dla tabel wyjściowych. Są one używane, gdy nie określasz katalogu i schematu w twoich definicjach potoków.

  4. W obszarze Następny krok potoku kliknij ikonę Schemat.Zacznij od przykładowego kodu w kodzie SQL lub ikonie schematu.Zacznij od przykładowego kodu w języku Python na podstawie preferencji języka. Spowoduje to zmianę domyślnego języka przykładowego kodu, ale możesz dodać kod w innym języku później. Spowoduje to utworzenie domyślnej struktury folderów z przykładowym kodem, aby rozpocząć pracę.

  5. Przykładowy kod można wyświetlić w przeglądarce zasobów potoku po lewej stronie obszaru roboczego. Poniżej transformations znajdują się dwa pliki, z których każdy generuje jeden zestaw danych potoku. W obszarze explorations znajduje się notes zawierający kod, który ułatwia wyświetlenie danych wyjściowych potoku. Kliknięcie pliku umożliwia wyświetlanie i edytowanie kodu w edytorze.

    Wyjściowe zestawy danych nie zostały jeszcze utworzone, a wykres potoku po prawej stronie ekranu jest pusty.

  6. Aby uruchomić kod potoku (kod w folderze transformations), kliknij Uruchom potok w prawym górnym rogu ekranu.

    Po zakończeniu przebiegu w dolnej części obszaru roboczego zostaną wyświetlone dwie nowe tabele, które zostały utworzone, sample_users_<pipeline-name> i sample_aggregation_<pipeline-name>. Możesz również zobaczyć, że wykres potoku po prawej stronie obszaru roboczego teraz pokazuje dwie tabele, w tym sample_users jest źródłem dla sample_aggregation elementu.

Krok 2. Stosowanie kontroli jakości danych

W tym kroku dodajesz kontrolę jakości danych do tabeli sample_users. Aby ograniczyć dane, należy użyć oczekiwań dla potoku. W takim przypadku usuniesz wszystkie rekordy użytkownika, które nie mają prawidłowego adresu e-mail, i wyświetlisz wyczyszczonej tabeli jako users_cleaned.

  1. W przeglądarce zasobów potoku kliknij ikonę plus i wybierz Przekształcenie.

  2. W oknie dialogowym Tworzenie nowego pliku transformacji wybierz następujące opcje:

    • Wybierz język Python lub SQL dla języka. To nie musi być zgodne z poprzednim wyborem.
    • Nadaj plikowi nazwę. W tym przypadku wybierz pozycję users_cleaned.
    • W polu Ścieżka docelowa pozostaw wartość domyślną.
    • W polu Typ zestawu danych pozostaw wartość Brak wybranego lub wybierz pozycję Zmaterializowany widok. W przypadku wybrania widoku zmaterializowanego zostanie wygenerowany przykładowy kod.
  3. W nowym pliku kodu zmodyfikuj kod, aby był zgodny z poniższym kodem (użyj języka SQL lub Python w oparciu o wybór na poprzednim ekranie). Zastąp <pipeline-name> pełną nazwą tabeli sample_users.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Kliknij Uruchom potok, aby zaktualizować potok. Powinna ona teraz zawierać trzy tabele.

Krok 3. Analizowanie najważniejszych użytkowników

Następnie uzyskaj 100 pierwszych użytkowników według liczby rezerwacji, które zostały utworzone. Dołącz tabelę wanderbricks.bookings do zmaterializowanego users_cleaned widoku.

  1. W przeglądarce zasobów potoku kliknij ikonę plus i wybierz Przekształcenie.

  2. W oknie dialogowym Tworzenie nowego pliku transformacji wybierz następujące opcje:

    • Wybierz język Python lub SQL dla języka. To nie musi być zgodne z poprzednim wyborem.
    • Nadaj plikowi nazwę. W tym przypadku wybierz pozycję users_and_bookings.
    • W polu Ścieżka docelowa pozostaw wartość domyślną.
    • Dla Typ zestawu danych pozostaw to jako Nie wybrano.
  3. W nowym pliku kodu zmodyfikuj kod, aby był zgodny z poniższym kodem (użyj języka SQL lub Python w oparciu o wybór na poprzednim ekranie).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Kliknij pozycję Uruchom potok , aby zaktualizować zestawy danych. Po zakończeniu przebiegu można zobaczyć w grafie przepływu, że istnieją cztery tabele, w tym nowa tabela users_and_bookings.

    Wykres potoku przedstawiający cztery tabele w potoku

Dalsze kroki

Teraz, gdy wiesz już, jak używać niektórych funkcji edytora potoków Lakeflow i utworzyłeś potok, oto kilka innych funkcji, które warto poznać: