Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Dowiedz się, jak utworzyć i wdrożyć potok ETL (wyodrębnianie, przekształcanie i ładowanie) do orkiestracji danych za pomocą potoków deklaratywnych Lakeflow i Auto Loader. Potok ETL implementuje kroki odczytywania danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań, takich jak kontrole jakości danych i deduplikacja rekordów oraz zapisywanie danych w systemie docelowym, takim jak magazyn danych lub magazyn danych typu data lake.
W tym samouczku użyjesz Potoków Deklaratywnych Lakeflow i Modułu Ładującego Automatycznego do:
- Wczytaj nieprzetworzone dane źródłowe do tabeli docelowej.
- Przekształć nieprzetworzone dane źródłowe i zapisz przekształcone dane do dwóch zmaterializowanych widoków docelowych.
- Wykonaj zapytanie dotyczące przekształconych danych.
- Zautomatyzuj potok ETL za pomocą zadania Databricks.
Aby uzyskać więcej informacji na temat Lakeflow Declarative Pipelines i Auto Loader, zobacz Lakeflow Declarative Pipelines i What is Auto Loader?
Wymagania
Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:
- Zaloguj się do obszaru roboczego usługi Azure Databricks.
- Włącz Unity Catalog dla swojego obszaru roboczego.
- Mieć włączone bezserwerowe obliczenia dla twojego konta. Bezserwerowe potoki deklaratywne Lakeflow nie są dostępne we wszystkich regionach obszaru roboczego. Zobacz Funkcje z ograniczoną dostępnością regionalną dla dostępnych regionów.
- Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- Mieć uprawnienia do tworzenia nowego schematu w wykazie. Wymagane uprawnienia to
ALL PRIVILEGES
lubUSE CATALOG
iCREATE SCHEMA
. - Mieć uprawnienia do tworzenia nowego woluminu w istniejącym schemacie. Wymagane uprawnienia to
ALL PRIVILEGES
lubUSE SCHEMA
iCREATE VOLUME
.
Informacje o zestawie danych
Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.
Krok 1: Utwórz potok danych
Najpierw utworzysz przepływ ETL w Lakeflow Declarative Pipelines. Deklaratywne Potoki Lakeflow tworzą przepływy, rozwiązując zależności zdefiniowane w notesach lub plikach (nazywanych kodem źródłowym), przy użyciu składni Deklaratywnych Potoków Lakeflow. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale w pipeline można dodać wiele notesów lub plików specyficznych dla języka. Aby dowiedzieć się więcej, zobacz Deklaratywne potoki Lakeflow
Ważne
Pozostaw pole Kod źródłowy puste, aby utworzyć i skonfigurować notes do automatycznego tworzenia kodu źródłowego.
W tym samouczku użyto obliczeń bezserwerowych i Unity Catalog. Dla wszystkich opcji konfiguracji, które nie są określone, użyj ustawień domyślnych. Jeśli przetwarzanie bezserwerowe nie jest włączone lub obsługiwane w obszarze roboczym, możesz ukończyć samouczek zgodnie z instrukcjami napisanymi przy użyciu domyślnych ustawień obliczeniowych. Jeśli używasz domyślnych ustawień obliczeniowych, musisz ręcznie wybrać Unity Catalog pod Opcje magazynu w sekcji Miejsce docelowe interfejsu użytkownika Tworzenie potoku.
Aby utworzyć nowy potok ETL w Lakeflow Declarative Pipelines, wykonaj następujące kroki:
- W obszarze roboczym kliknij
Zadania i rury na pasku bocznym.
- W obszarze Nowy kliknij ETL Pipeline.
- W nazwa potokuwpisz unikatową nazwę potoku.
- Zaznacz pole wyboru Serverless.
- W Docelowym, aby skonfigurować lokalizację katalogu Unity, w której są publikowane tabele, wybierz istniejący Katalog i wpisz nową nazwę w schemacie, aby utworzyć nowy schemat w twoim katalogu.
- Kliknij pozycję Utwórz.
Interfejs użytkownika potoku zostanie wyświetlony dla nowego potoku.
Krok 2: Rozwijanie potoku
Ważne
Notesy mogą zawierać tylko jeden język programowania. Nie mieszaj kodu w języku Python i SQL w notesach potoku kodu źródłowego.
W tym kroku użyjesz notesów usługi Databricks do interaktywnego tworzenia i weryfikowania kodu źródłowego dla potoków deklaratywnych lakeflow.
Kod używa automatycznego modułu ładującego do pozyskiwania danych przyrostowych. Moduł ładujący automatycznie wykrywa i przetwarza nowe pliki, gdy docierają do magazynu obiektów w chmurze. Aby dowiedzieć się więcej, zobacz Co to jest moduł automatycznego ładowania?
Pusty notatnik kodu źródłowego jest automatycznie tworzony i konfigurowany dla pipeliny. Notatnik jest tworzony w nowym folderze w katalogu użytkownika. Nazwa nowego katalogu i pliku odpowiada nazwie pipeline'u. Na przykład /Users/someone@example.com/my_pipeline/my_pipeline
.
Podczas tworzenia potoku możesz wybrać język Python lub SQL. Przykłady są uwzględniane w obu językach. W zależności od wybranego języka sprawdź, czy wybrano domyślny język notesu. Aby dowiedzieć się więcej na temat obsługi notatnika na potrzeby opracowywania kodu w potokach deklaratywnych Lakeflow, zobacz Tworzenie i debugowanie potoków ETL za pomocą notatnika w Lakeflow Declarative Pipelines.
Link umożliwiający dostęp do tego notatnika znajduje się w polu Kod źródłowy w panelu Szczegóły potoku. Kliknij link, aby otworzyć notes przed przejściem do następnego kroku.
Kliknij pozycję Połącz w prawym górnym rogu, aby otworzyć menu konfiguracji obliczeniowej.
Najedź kursorem na nazwę pipeline'u utworzonego w kroku 1.
Kliknij Połącz.
Obok tytułu notesu u góry wybierz domyślny język notesu (Python lub SQL).
Skopiuj i wklej następujący kod do komórki w notesie.
Pyton
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Kliknij Start, aby rozpocząć aktualizację podłączonego potoku.
Krok 3. Wykonywanie zapytań dotyczących przekształconych danych
W tym kroku wykonasz zapytanie do danych przetworzonych w potoku ETL, aby przeanalizować dane dotyczące piosenek. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.
Najpierw uruchom zapytanie, które znajduje artystów, którzy wydali najwięcej piosenek każdego roku od 1990 roku.
Na pasku bocznym kliknij pozycję
.
Kliknij
nowej karty i wybierz pozycję Utwórz nowe zapytanie z menu.
Wprowadź następujące informacje:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Zastąp
<catalog>
nazwą katalogu i<schema>
nazwą schematu, w których jest tabela. Na przykładdata_pipelines.songs_data.top_artists_by_year
.Kliknij Uruchom wybrane.
Teraz uruchom kolejne zapytanie, które znajduje piosenki z rytmem 4/4 i tempem tanecznym.
Kliknij
, a następnie wybierz pozycję Utwórz nowe zapytanie z menu.
Wprowadź następujący kod:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Zastąp
<catalog>
nazwą katalogu i<schema>
nazwą schematu, w których jest tabela. Na przykładdata_pipelines.songs_data.songs_prepared
.Kliknij Uruchom wybrane.
Krok 4. Tworzenie zadania w celu uruchomienia potoku
Następnie utwórz przepływ pracy w celu zautomatyzowania kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Databricks.
- W obszarze roboczym kliknij
Zadania i rury na pasku bocznym.
- W obszarze Nowy kliknij pozycję Zadanie.
- W polu tytuł zadania zastąp wartość Nowa data i godzina< zadania > nazwą zadania. Na przykład
Songs workflow
. - W polu Nazwa zadania wprowadź nazwę pierwszego zadania, na przykład
ETL_songs_data
. - W polu Typ wybierz pozycję Potok.
- W obszarze Potok wybierz potok utworzony w kroku 1.
- Kliknij pozycję Utwórz.
- Aby uruchomić przepływ pracy, kliknij pozycję Uruchom teraz. Aby wyświetlić szczegóły przebiegu, kliknij kartę Uruchomienia. Kliknij zadanie, aby zobaczyć szczegóły przebiegu konkretnego zadania.
- Aby wyświetlić wyniki po zakończeniu przepływu pracy, kliknij Przejdź do najnowszego pomyślnego uruchomienia lub Czas rozpoczęcia uruchomienia zadania. Zostanie wyświetlona strona Dane wyjściowe i wyświetli wyniki zapytania.
Aby uzyskać więcej informacji na temat przebiegów zadań, zobacz Monitorowanie i obserwacja dla zadań Lakeflow.
Krok 5: Zaplanuj zadanie przepływu danych
Aby uruchomić potok ETL zgodnie z ustalonym harmonogramem, wykonaj następujące kroki:
- Przejdź do interfejsu użytkownika zadania i potoków w tym samym obszarze roboczym usługi Azure Databricks co zadanie.
- Opcjonalnie wybierz filtry Zadania i Należące do mnie .
- W kolumnie Nazwa kliknij nazwę zadania. Na panelu bocznym są wyświetlane szczegóły zadania.
- Kliknij pozycję Dodaj wyzwalacz na panelu Harmonogramy i wyzwalacze , a następnie wybierz pozycję Zaplanowane w polu Typ wyzwalacza.
- Określ okres, czas rozpoczęcia i strefę czasową.
- Kliknij przycisk Zapisz.
Dowiedz się więcej
- Aby dowiedzieć się więcej na temat potoków przetwarzania danych z Lakeflow Deklaratywne Potoki, zobacz Lakeflow Deklaratywne Potoki
- Aby dowiedzieć się więcej na temat notesów usługi Databricks, zobacz Wprowadzenie do notesów usługi Databricks.
- Aby dowiedzieć się więcej o zadaniach lakeflow, zobacz Co to są zadania?
- Aby dowiedzieć się więcej o usłudze Delta Lake, zobacz Co to jest usługa Delta Lake w usłudze Azure Databricks?