Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dowiedz się, jak utworzyć nowy potok przy użyciu Deklaratywnych Potoków Spark Lakeflow (SDP) do orkiestracji danych i Automatycznego Ładowacza. Ten samouczek rozszerza przykładowy potok danych poprzez oczyszczanie danych i tworzenie zapytania w celu znalezienia 100 najlepszych użytkowników.
Z tego samouczka dowiesz się, jak używać edytora Lakeflow Pipelines do:
- Utwórz nowy potok z domyślną strukturą folderów i zacznij od zestawu przykładowych plików.
- Definiowanie ograniczeń dotyczących jakości danych przy użyciu oczekiwań.
- Użyj funkcji edytora, aby rozszerzyć potok przy użyciu nowej transformacji w celu przeprowadzenia analizy danych.
Requirements
Przed rozpoczęciem tego samouczka należy wykonać następujące czynności:
- Zaloguj się do obszaru roboczego usługi Azure Databricks.
- Upewnij się, że Katalog Unity jest włączony dla Twojego obszaru roboczego.
- Edytor potoków Lakeflow musi być włączony dla obszaru roboczego oraz musisz być zarejestrowany do tej opcji. Zobacz Włączanie edytora potoków lakeflow i aktualizowanie monitorowania.
- Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- Musisz mieć uprawnienia do tworzenia nowego schematu w katalogu. Wymagane uprawnienia to
ALL PRIVILEGESlubUSE CATALOGiCREATE SCHEMA.
Krok 1: Utwórz potok
W tym kroku utworzysz pipeline przy użyciu domyślnej struktury folderów i przykładowego kodu. Przykłady kodu odwołują się do users tabeli w przykładowym wanderbricks źródle danych.
W obszarze roboczym usługi Azure Databricks kliknij
Nowy, a następnie
Potok ETL. Spowoduje to otwarcie edytora potoku na stronie tworzenia potoku.
Kliknij nagłówek, aby nadać potokowi nazwę.
Tuż pod nazwą wybierz domyślny wykaz i schemat dla tabel wyjściowych. Są one używane, gdy nie określasz katalogu i schematu w twoich definicjach potoków.
W obszarze Następny krok potoku kliknij
Zacznij od przykładowego kodu w kodzie SQL lub
Zacznij od przykładowego kodu w języku Python na podstawie preferencji języka. Spowoduje to zmianę domyślnego języka przykładowego kodu, ale możesz dodać kod w innym języku później. Spowoduje to utworzenie domyślnej struktury folderów z przykładowym kodem, aby rozpocząć pracę.
Przykładowy kod można wyświetlić w przeglądarce zasobów potoku po lewej stronie obszaru roboczego. Poniżej
transformationsznajdują się dwa pliki, z których każdy generuje jeden zestaw danych potoku. W obszarzeexplorationsznajduje się notes zawierający kod, który ułatwia wyświetlenie danych wyjściowych potoku. Kliknięcie pliku umożliwia wyświetlanie i edytowanie kodu w edytorze.Wyjściowe zestawy danych nie zostały jeszcze utworzone, a wykres potoku po prawej stronie ekranu jest pusty.
Aby uruchomić kod potoku (kod w folderze
transformations), kliknij Uruchom potok w prawym górnym rogu ekranu.Po zakończeniu przebiegu w dolnej części obszaru roboczego zostaną wyświetlone dwie nowe tabele, które zostały utworzone,
sample_users_<pipeline-name>isample_aggregation_<pipeline-name>. Możesz również zobaczyć, że wykres potoku po prawej stronie obszaru roboczego teraz pokazuje dwie tabele, w tymsample_usersjest źródłem dlasample_aggregationelementu.
Krok 2. Stosowanie kontroli jakości danych
W tym kroku dodajesz kontrolę jakości danych do tabeli sample_users. Aby ograniczyć dane, należy użyć oczekiwań dla potoku. W takim przypadku usuniesz wszystkie rekordy użytkownika, które nie mają prawidłowego adresu e-mail, i wyświetlisz wyczyszczonej tabeli jako users_cleaned.
W przeglądarce zasobów potoku kliknij
i wybierz Przekształcenie.
W oknie dialogowym Tworzenie nowego pliku transformacji wybierz następujące opcje:
- Wybierz język Python lub SQL dla języka. To nie musi być zgodne z poprzednim wyborem.
- Nadaj plikowi nazwę. W tym przypadku wybierz pozycję
users_cleaned. - W polu Ścieżka docelowa pozostaw wartość domyślną.
- W polu Typ zestawu danych pozostaw wartość Brak wybranego lub wybierz pozycję Zmaterializowany widok. W przypadku wybrania widoku zmaterializowanego zostanie wygenerowany przykładowy kod.
W nowym pliku kodu zmodyfikuj kod, aby był zgodny z poniższym kodem (użyj języka SQL lub Python w oparciu o wybór na poprzednim ekranie). Zastąp
<pipeline-name>pełną nazwą tabelisample_users.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Python
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Kliknij Uruchom potok, aby zaktualizować potok. Powinna ona teraz zawierać trzy tabele.
Krok 3. Analizowanie najważniejszych użytkowników
Następnie uzyskaj 100 pierwszych użytkowników według liczby rezerwacji, które zostały utworzone. Dołącz tabelę wanderbricks.bookings do zmaterializowanego users_cleaned widoku.
W przeglądarce zasobów potoku kliknij
i wybierz Przekształcenie.
W oknie dialogowym Tworzenie nowego pliku transformacji wybierz następujące opcje:
- Wybierz język Python lub SQL dla języka. To nie musi być zgodne z poprzednim wyborem.
- Nadaj plikowi nazwę. W tym przypadku wybierz pozycję
users_and_bookings. - W polu Ścieżka docelowa pozostaw wartość domyślną.
- Dla Typ zestawu danych pozostaw to jako Nie wybrano.
W nowym pliku kodu zmodyfikuj kod, aby był zgodny z poniższym kodem (użyj języka SQL lub Python w oparciu o wybór na poprzednim ekranie).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Python
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Kliknij pozycję Uruchom potok , aby zaktualizować zestawy danych. Po zakończeniu przebiegu można zobaczyć w grafie przepływu, że istnieją cztery tabele, w tym nowa tabela
users_and_bookings.
Dalsze kroki
Teraz, gdy wiesz już, jak używać niektórych funkcji edytora potoków Lakeflow i utworzyłeś potok, oto kilka innych funkcji, które warto poznać:
Narzędzia do pracy z przekształceniami i debugowania podczas tworzenia potoków:
- Selektywne wykonywanie
- Podglądy danych
- Interaktywny graf DAG (graf danych w twoim potoku)
Wbudowana integracja pakietów zasobów usługi Databricks w celu wydajnej współpracy, kontroli wersji i ciągłej integracji/ciągłego wdrażania bezpośrednio z edytora: