Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym samouczku wyjaśniono, jak utworzyć i wdrożyć potok ETL (wyodrębnianie, przekształcanie i ładowanie) na potrzeby orkiestracji danych przy użyciu deklaratywnych potoków Lakeflow Spark i Auto Loader. Potok ETL implementuje kroki odczytywania danych z systemów źródłowych, przekształcania tych danych na podstawie wymagań, takich jak kontrole jakości danych i deduplikacja rekordów oraz zapisywanie danych w systemie docelowym, takim jak magazyn danych lub magazyn danych typu data lake.
W tym samouczku użyjesz potoków i modułu automatycznego ładującego do:
- Wczytaj nieprzetworzone dane źródłowe do tabeli docelowej.
- Przekształć nieprzetworzone dane źródłowe i zapisz przekształcone dane do dwóch zmaterializowanych widoków docelowych.
- Wykonaj zapytanie dotyczące przekształconych danych.
- Zautomatyzuj potok ETL za pomocą zadania Databricks.
Aby uzyskać więcej informacji na temat potoków i automatycznego modułu ładującego, zobacz Lakeflow Spark Deklaratywne potoki i Co to jest moduł automatycznego ładowania?
Wymagania
Aby ukończyć ten samouczek, musisz spełnić następujące wymagania:
- Zaloguj się do obszaru roboczego usługi Azure Databricks.
- Włącz Unity Catalog dla swojego obszaru roboczego.
- Mieć włączone bezserwerowe obliczenia dla twojego konta. Bezserwerowe deklaratywne potoki Lakeflow Spark nie są dostępne we wszystkich regionach obszarów roboczych. Zobacz Funkcje z ograniczoną dostępnością regionalną dla dostępnych regionów.
- Mieć uprawnienia do tworzenia zasobu obliczeniowego lub dostępu do zasobu obliczeniowego.
- Mieć uprawnienia do tworzenia nowego schematu w wykazie. Wymagane uprawnienia to
ALL PRIVILEGESlubUSE CATALOGiCREATE SCHEMA. - Mieć uprawnienia do tworzenia nowego woluminu w istniejącym schemacie. Wymagane uprawnienia to
ALL PRIVILEGESlubUSE SCHEMAiCREATE VOLUME.
Informacje o zestawie danych
Zestaw danych używany w tym przykładzie jest podzbiorem zestawu danych Million Song Dataset, kolekcją funkcji i metadanych utworów muzyki współczesnej. Ten zestaw danych jest dostępny w przykładowych zestawach danych zawartych w obszarze roboczym usługi Azure Databricks.
Krok 1: Utwórz potok danych
Najpierw utwórz pipeline, definiując zestawy danych w plikach nazywanych kodem źródłowym, korzystając ze składni pipeline. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale w potoku można dodać wiele plików specyficznych dla języka. Aby dowiedzieć się więcej, zapoznaj się z Deklaratywnymi potokami Lakeflow Spark
W tym samouczku użyto obliczeń bezserwerowych i Unity Catalog. Dla wszystkich opcji konfiguracji, które nie są określone, użyj ustawień domyślnych. Jeśli przetwarzanie bezserwerowe nie jest włączone lub obsługiwane w obszarze roboczym, możesz ukończyć samouczek zgodnie z instrukcjami napisanymi przy użyciu domyślnych ustawień obliczeniowych.
Aby utworzyć nowy potok, wykonaj następujące kroki:
- W obszarze roboczym kliknij
Następnie na pasku bocznym wybierz Nowy, a potem wybierz Potok ETL.
- Nadaj potokowi unikatową nazwę.
- Tuż pod nazwą wybierz domyślny wykaz i schemat dla wygenerowanych danych. Możesz określić inne miejsca docelowe w przekształceniach, ale w tym samouczku są używane te wartości domyślne. Musisz mieć uprawnienia do utworzonego katalogu i schematu. Zobacz Wymagania.
- Na potrzeby tego samouczka wybierz pozycję Rozpocznij od pustego pliku.
- W obszarze Ścieżka folderu określ lokalizację plików źródłowych lub zaakceptuj wartość domyślną (folder użytkownika).
- Wybierz język Python lub SQL jako język pierwszego pliku źródłowego (potok może mieszać i dopasowywać języki, ale każdy plik musi znajdować się w jednym języku).
- Kliknij pozycję Wybierz.
Zostanie wyświetlony edytor potoku dla nowego potoku. Zostanie utworzony pusty plik źródłowy dla danego języka, gotowy do pierwszej transformacji.
Krok 2. Opracowywanie logiki potoku
W tym kroku użyjesz Edytora Potoków Lakeflow do opracowywania i weryfikowania kodu źródłowego potoku w sposób interaktywny.
Kod używa automatycznego modułu ładującego do pozyskiwania danych przyrostowych. Moduł ładujący automatycznie wykrywa i przetwarza nowe pliki, gdy docierają do magazynu obiektów w chmurze. Aby dowiedzieć się więcej, zobacz Co to jest moduł automatycznego ładowania?
Pusty plik kodu źródłowego jest automatycznie tworzony i konfigurowany dla pipeline'u. Plik jest tworzony w folderze transformacji pipeline'u. Domyślnie wszystkie pliki *.py i *.sql w folderze przekształceń są częścią źródła potoku.
Skopiuj i wklej następujący kod do pliku źródłowego. Pamiętaj, aby użyć języka wybranego dla pliku w kroku 1.
Python
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;To źródło zawiera kod dla trzech zapytań. Możesz również umieścić te zapytania w osobnych plikach, aby zorganizować pliki i kod w preferowany sposób.
Kliknij
Uruchom plik lub Uruchom potok, aby uruchomić aktualizację połączonego potoku. W przypadku tylko jednego pliku źródłowego w potoku są one funkcjonalnie równoważne.
Po zakończeniu aktualizacji edytor zostanie zaktualizowany o informacje dotyczące twojego pipeline'u.
- Wykres potoku (DAG) na pasku bocznym po prawej stronie kodu przedstawia trzy tabele,
songs_raw,songs_prepareditop_artists_by_year. - Podsumowanie aktualizacji jest wyświetlane na górze przeglądarki zasobów pipelinu.
- Szczegóły tabel, które zostały wygenerowane, są wyświetlane w dolnym okienku i można przeglądać dane z tabel, wybierając je.
Obejmuje to nieprzetworzone i oczyszczone dane, a także prostą analizę, aby znaleźć najlepszych artystów według roku. W następnym kroku utworzysz zapytania ad hoc w oddzielnym pliku w swoim przepływie pracy na potrzeby dalszej analizy.
Krok 3. Zapoznanie się z zestawami danych utworzonymi przez potok
W tym kroku wykonasz zapytania ad hoc dotyczące danych przetwarzanych w potoku ETL w celu przeanalizowania danych dotyczących piosenek w edytorze SQL usługi Databricks. Te zapytania używają przygotowanych rekordów utworzonych w poprzednim kroku.
Najpierw uruchom zapytanie, które znajduje artystów, którzy wydali najwięcej piosenek każdego roku od 1990 roku.
Na pasku bocznym przeglądarki zasobów pipeline kliknij ikonę plusa
, a następnie wybierz Dodaj i Eksplorację.
Wprowadź nazwę i wybierz SQL jako opcję dla pliku eksploracji. Notatnik SQL został utworzony w nowym folderze
explorations. Pliki w folderzeexplorationsnie są domyślnie uruchamiane w ramach aktualizacji potoku. Notes SQL zawiera komórki, które można uruchamiać razem lub oddzielnie.Aby utworzyć tabelę artystów, która publikuje większość piosenek w każdym roku po 1990 roku, wprowadź następujący kod w nowym pliku SQL (jeśli w pliku znajduje się przykładowy kod, zastąp go). Ponieważ ten notatnik nie jest częścią potoku, nie korzysta z domyślnego katalogu i schematu. Zastąp element
<catalog>.<schema>katalogiem i schematem, które użyłeś jako domyślne dla potoku:-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;Kliknij
naciśnij
Shift + Enter, aby uruchomić to zapytanie.
Teraz uruchom kolejne zapytanie, które znajduje piosenki z rytmem 4/4 i tempem tanecznym.
Dodaj następujący kod do następnej komórki w tym samym pliku. Ponownie zastąp element
<catalog>.<schema>katalogiem i schematem, których używasz jako domyślne dla potoku.-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;Kliknij
naciśnij
Shift + Enter, aby uruchomić to zapytanie.
Krok 4. Tworzenie zadania w celu uruchomienia potoku
Następnie utwórz przepływ pracy w celu zautomatyzowania kroków pozyskiwania, przetwarzania i analizy danych przy użyciu zadania usługi Databricks uruchamianego zgodnie z harmonogramem.
- W górnej części edytora wybierz przycisk Harmonogram .
- Jeśli zostanie wyświetlone okno dialogowe Harmonogramy , wybierz pozycję Dodaj harmonogram.
- Spowoduje otwarcie okna dialogowego Nowy harmonogram, w którym można utworzyć zadanie uruchamiające potok zgodnie z harmonogramem.
- Opcjonalnie nadaj zadaniu nazwę.
- Domyślnie harmonogram jest uruchamiany raz dziennie. Możesz zaakceptować ustawienie domyślne lub ustawić własny harmonogram. Wybranie opcji Zaawansowane umożliwia ustawienie określonego czasu, w którym zadanie zostanie uruchomione. Wybranie pozycji Więcej opcji umożliwia tworzenie powiadomień po uruchomieniu zadania.
- Wybierz pozycję Utwórz , aby zastosować zmiany i utworzyć zadanie.
Teraz zadanie będzie uruchamiane codziennie, aby zapewnić aktualność Twojego pipeline'u. Możesz ponownie wybrać pozycję Harmonogram , aby wyświetlić listę harmonogramów. Możesz zarządzać harmonogramami potoku w tym oknie dialogowym, w tym dodawać, edytować lub usuwać harmonogramy.
Kliknięcie nazwy harmonogramu (lub zadania) przeniesie Cię do strony zadania na liście Zadania i potoki danych. W tym miejscu możesz wyświetlić szczegółowe informacje na temat przebiegów zadań, w tym historię przebiegów, lub natychmiast uruchomić zadanie za pomocą przycisku Uruchom teraz .
Aby uzyskać więcej informacji na temat przebiegów zadań, zobacz Monitorowanie i obserwacja dla zadań Lakeflow.
Dowiedz się więcej
- Aby dowiedzieć się więcej na temat potoków przetwarzania danych, zobacz Deklaratywne potoki Lakeflow Spark
- Aby dowiedzieć się więcej na temat notesów usługi Databricks, zobacz Notesy usługi Databricks.
- Aby dowiedzieć się więcej o zadaniach lakeflow, zobacz Co to są zadania?
- Aby dowiedzieć się więcej o usłudze Delta Lake, zobacz Co to jest usługa Delta Lake w usłudze Azure Databricks?