Samouczek: uruchamianie pierwszego potoku delta live tables

Ważne

Bezserwerowe potoki DLT są w publicznej wersji zapoznawczej. Aby dowiedzieć się więcej na temat włączania bezserwerowych potoków DLT, skontaktuj się z zespołem konta usługi Azure Databricks.

W tym samouczku pokazano, jak skonfigurować potok delta Live Tables z kodu w notesie usługi Databricks i uruchomić potok przez wyzwolenie aktualizacji potoku. Ten samouczek zawiera przykładowy potok pozyskiwania i przetwarzania przykładowego zestawu danych z przykładowym kodem przy użyciu interfejsów Python i SQL . Możesz również użyć instrukcji w tym samouczku, aby utworzyć potok z dowolnymi notesami z prawidłowo zdefiniowaną składnią delta Live Tables.

Potoki i aktualizacje wyzwalaczy tabel delta live można skonfigurować przy użyciu interfejsu użytkownika obszaru roboczego usługi Azure Databricks lub zautomatyzowanych opcji narzędzi, takich jak interfejs API, interfejs wiersza polecenia, pakiety zasobów usługi Databricks lub jako zadanie w przepływie pracy usługi Databricks. Aby zapoznać się z funkcjami i funkcjami tabel delta Live Tables, usługa Databricks zaleca najpierw używanie interfejsu użytkownika do tworzenia i uruchamiania potoków. Ponadto podczas konfigurowania potoku w interfejsie użytkownika funkcja Delta Live Tables generuje konfigurację JSON dla potoku, która może służyć do implementowania programowych przepływów pracy.

Aby zademonstrować funkcje funkcji Delta Live Tables, przykłady w tym samouczku umożliwiają pobranie publicznie dostępnego zestawu danych. Jednak usługa Databricks ma kilka sposobów nawiązywania połączenia ze źródłami danych i pozyskiwania danych, które będą używane przez potoki implementujące rzeczywiste przypadki użycia. Zobacz Pozyskiwanie danych za pomocą tabel delta live.

Wymagania

  • Aby uruchomić potok bezserwerowy, musisz mieć uprawnienie do tworzenia klastra lub dostęp do zasad klastra definiujących klaster delta live tables. Środowisko uruchomieniowe delta Live Tables tworzy klaster przed uruchomieniem potoku i kończy się niepowodzeniem, jeśli nie masz odpowiednich uprawnień.

  • Aby użyć przykładów w tym samouczku, obszar roboczy musi mieć włączony wykaz aparatu Unity.

  • Musisz mieć następujące uprawnienia w katalogu aparatu Unity:

    • READ VOLUME i WRITE VOLUME, dla ALL PRIVILEGESwoluminu my-volume .
    • USE SCHEMA lub ALL PRIVILEGES dla schematu default .
    • USE CATALOG lub ALL PRIVILEGES katalogu main .

    Aby ustawić te uprawnienia, zobacz uprawnienia administratora usługi Databricks lub katalogu aparatu Unity oraz zabezpieczane obiekty.

  • Przykłady w tym samouczku używają woluminu wykazu aparatu Unity do przechowywania przykładowych danych. Aby użyć tych przykładów, utwórz wolumin i użyj wykazu, schematu i nazw woluminów, aby ustawić ścieżkę woluminu używaną przez przykłady.

Uwaga

Jeśli obszar roboczy nie ma włączonego wykazu aparatu Unity, notesy z przykładami, które nie wymagają wykazu aparatu Unity, są dołączone do tego artykułu. Aby użyć tych przykładów, wybierz Hive metastore jako opcję magazynu podczas tworzenia potoku.

Gdzie uruchamiasz zapytania funkcji Delta Live Tables?

Zapytania usługi Delta Live Tables są implementowane głównie w notesach usługi Databricks, ale funkcja Delta Live Tables nie jest przeznaczona do interaktywnego uruchamiania w komórkach notesu. Wykonanie komórki zawierającej składnię delta live tables w notesie usługi Databricks powoduje wyświetlenie komunikatu o błędzie. Aby uruchamiać zapytania, należy skonfigurować notesy w ramach potoku.

Ważne

  • Nie można polegać na porządkowaniu wykonywania komórek przez komórkę notesów podczas pisania zapytań dotyczących tabel delta live. Funkcja Delta Live Tables ocenia i uruchamia cały kod zdefiniowany w notesach, ale ma inny model wykonywania niż notes Uruchom wszystkie polecenia.
  • Nie można mieszać języków w jednym pliku kodu źródłowego delta Live Tables. Na przykład notes może zawierać tylko zapytania języka Python lub zapytania SQL. Jeśli musisz używać wielu języków w potoku, użyj wielu notesów lub plików specyficznych dla języka w potoku.

Możesz również użyć kodu języka Python przechowywanego w plikach. Można na przykład utworzyć moduł języka Python, który można zaimportować do potoków języka Python lub zdefiniować funkcje zdefiniowane przez użytkownika języka Python (UDF) do użycia w zapytaniach SQL. Aby dowiedzieć się więcej na temat importowania modułów języka Python, zobacz Importowanie modułów języka Python z folderów git lub plików obszarów roboczych. Aby dowiedzieć się więcej o korzystaniu z funkcji zdefiniowanych przez użytkownika funkcji zdefiniowanych przez użytkownika — Python.

Przykład: pozyskiwanie i przetwarzanie danych dotyczących nazw dzieci w Nowym Jorku

W przykładzie w tym artykule użyto publicznie dostępnego zestawu danych zawierającego rekordy nowojorskich nazw dzieci. W poniższych przykładach pokazano użycie potoku delta live tables w celu:

  • Odczytywanie nieprzetworzonych danych CSV z publicznie dostępnego zestawu danych do tabeli.
  • Odczytywanie rekordów z tabeli danych pierwotnych i używanie oczekiwań delty na żywo tabel w celu utworzenia nowej tabeli zawierającej oczyszczone dane.
  • Użyj oczyszczonych rekordów jako danych wejściowych do zapytań delta Live Tables, które tworzą pochodne zestawy danych.

Ten kod przedstawia uproszczony przykład architektury medalonu. Zobacz Co to jest architektura medallion lakehouse?.

Implementacje tego przykładu są udostępniane dla interfejsów Python i SQL . Możesz wykonać kroki tworzenia nowych notesów zawierających przykładowy kod. Możesz też przejść do sekcji Tworzenie potoku i użyć jednego z notesów dostępnych na tej stronie.

Implementowanie potoku delty tabel na żywo za pomocą języka Python

Kod w języku Python, który tworzy zestawy danych delta Live Tables, musi zwracać ramki danych, znane użytkownikom korzystającym z programu PySpark lub Pandas na potrzeby platformy Spark. W przypadku użytkowników nieznanych ramek danych usługa Databricks zaleca korzystanie z interfejsu SQL. Zobacz Implementowanie potoku delty tabel na żywo za pomocą języka SQL.

Wszystkie interfejsy API języka Python tabel delta live tables są implementowane w module dlt . Kod potoku delta Live Tables zaimplementowany za pomocą języka Python musi jawnie zaimportować dlt moduł w górnej części notesów i plików języka Python. Delta Live Tables różni się od wielu skryptów języka Python w kluczowy sposób: nie wywołujesz funkcji wykonujących pozyskiwanie i przekształcanie danych w celu utworzenia zestawów danych delta Live Tables. Zamiast tego funkcja Delta Live Tables interpretuje funkcje dekoratora z modułu dlt we wszystkich plikach załadowanych do potoku i tworzy graf przepływu danych.

Aby zaimplementować przykład w tym samouczku, skopiuj i wklej następujący kod języka Python do nowego notesu języka Python. Każdy przykładowy fragment kodu należy dodać do własnej komórki w notesie w podanej kolejności. Aby zapoznać się z opcjami tworzenia notesów, zobacz Tworzenie notesu.

Uwaga

Podczas tworzenia potoku za pomocą interfejsu języka Python domyślnie nazwy tabel są definiowane przez nazwy funkcji. Na przykład poniższy przykład języka Python tworzy trzy tabele o nazwach baby_names_raw, baby_names_preparedi top_baby_names_2021. Nazwę tabeli można zastąpić przy użyciu parametru name . Zobacz Tworzenie tabel delta live zmaterializowany widok lub tabela przesyłania strumieniowego.

Importowanie modułu Delta Live Tables

Wszystkie interfejsy API języka Python tabel delta live tables są implementowane w module dlt . Jawnie zaimportuj dlt moduł w górnej części notesów i plików języka Python.

W poniższym przykładzie pokazano ten import wraz z instrukcjami import dla elementu pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Pobieranie danych

Aby pobrać dane dla tego przykładu, pobierz plik CSV i zapisz go w woluminie w następujący sposób:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Zastąp <catalog-name>wartości , <schema-name>i <volume-name> nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity.

Tworzenie tabeli na podstawie plików w magazynie obiektów

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Dekorator @dlt.table nakazuje funkcji Delta Live Tables utworzenie tabeli zawierającej DataFrame wynik zwracany przez funkcję. @dlt.table Dodaj dekorator przed dowolną definicją funkcji języka Python, która zwraca ramkę danych Platformy Spark, aby zarejestrować nową tabelę w tabelach delta live. W poniższym przykładzie pokazano użycie nazwy funkcji jako nazwy tabeli i dodanie opisowego komentarza do tabeli:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Dodawanie tabeli z nadrzędnego zestawu danych w potoku

dlt.read() Służy do odczytywania danych z innych zestawów danych zadeklarowanych w bieżącym potoku delta Live Tables. Deklarowanie nowych tabel w ten sposób powoduje utworzenie zależności automatycznie rozpoznawanej przez funkcję Delta Live Tables przed wykonaniem aktualizacji. Poniższy kod zawiera również przykłady monitorowania i wymuszania jakości danych z oczekiwaniami. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Tworzenie tabeli z wzbogaconymi widokami danych

Ponieważ tabele na żywo delty przetwarzają aktualizacje potoków jako serię grafów zależności, można zadeklarować wysoce wzbogacone widoki obsługujące pulpity nawigacyjne, analizy biznesowej i analizy, deklarując tabele z określoną logiką biznesową.

Tabele w tabelach na żywo funkcji Delta są równoważne zmaterializowanym widokom. Natomiast tradycyjne widoki logiki uruchamiania platformy Spark za każdym razem, gdy widok jest badany, tabela Delta Live Tables przechowuje najnowszą wersję wyników zapytania w plikach danych. Ponieważ usługa Delta Live Tables zarządza aktualizacjami dla wszystkich zestawów danych w potoku, można zaplanować aktualizacje potoku w celu dopasowania ich do zmaterializowanych widoków i wiedzieć, że zapytania względem tych tabel zawierają najnowszą dostępną wersję danych.

Tabela zdefiniowana w poniższym kodzie przedstawia koncepcyjne podobieństwo do zmaterializowanego widoku pochodzącego z danych nadrzędnych w potoku:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Aby skonfigurować potok korzystający z notesu, zobacz Tworzenie potoku.

Implementowanie potoku delty tabel na żywo za pomocą języka SQL

Usługa Databricks zaleca stosowanie tabel delta live z bazą danych SQL jako preferowanym sposobem tworzenia nowych potoków ETL, pozyskiwania i przekształcania w usłudze Azure Databricks. Interfejs SQL dla tabel delta Live Tables rozszerza standardowy język Spark SQL z wieloma nowymi słowami kluczowymi, konstrukcjami i funkcjami wartości tabeli. Te dodatki do standardowej bazy danych SQL umożliwiają użytkownikom deklarowanie zależności między zestawami danych i wdrażanie infrastruktury klasy produkcyjnej bez uczenia się nowych narzędzi lub dodatkowych pojęć.

Użytkownicy zaznajomieni z ramkami danych platformy Spark i potrzebują obsługi bardziej rozbudowanych testów i operacji, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania, usługa Databricks zaleca korzystanie z interfejsu języka Python. Zobacz przykład: Pozyskiwanie i przetwarzanie danych dotyczących nazw dzieci w Nowym Jorku.

Pobieranie danych

Aby uzyskać dane dla tego przykładu, skopiuj następujący kod, wklej go do nowego notesu, a następnie uruchom notes. Aby zapoznać się z opcjami tworzenia notesów, zobacz Tworzenie notesu.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Zastąp <catalog-name>wartości , <schema-name>i <volume-name> nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity.

Tworzenie tabeli na podstawie plików w wykazie aparatu Unity

W pozostałej części tego przykładu skopiuj następujące fragmenty kodu SQL i wklej je do nowego notesu SQL, niezależnie od notesu w poprzedniej sekcji. Każdy przykładowy fragment kodu SQL należy dodać do własnej komórki w notesie w podanej kolejności.

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Wszystkie instrukcje SQL funkcji Delta Live Tables używają CREATE OR REFRESH składni i semantyki. Podczas aktualizowania potoku tabele na żywo funkcji Delta określają, czy można wykonać logicznie poprawny wynik dla tabeli za pomocą przetwarzania przyrostowego lub jeśli wymagana jest pełna ponowna kompilacja.

Poniższy przykład tworzy tabelę, ładując dane z pliku CSV przechowywanego w woluminie wykazu aparatu Unity:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Zastąp <catalog-name>wartości , <schema-name>i <volume-name> nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity.

Dodawanie tabeli z nadrzędnego zestawu danych do potoku

Możesz użyć schematu wirtualnego live do wykonywania zapytań dotyczących danych z innych zestawów danych zadeklarowanych w bieżącym potoku delta live tables. Deklarowanie nowych tabel w ten sposób powoduje utworzenie zależności automatycznie rozpoznawanej przez funkcję Delta Live Tables przed wykonaniem aktualizacji. Schemat live jest niestandardowym słowem kluczowym zaimplementowanym w tabelach delta Live Tables, które można zastąpić schematem docelowym, jeśli chcesz opublikować zestawy danych. Zobacz Używanie wykazu aparatu Unity z potokami tabel delta Live Tables i Publikowanie danych z tabel delta Live Tables do magazynu metadanych Hive.

Poniższy kod zawiera również przykłady monitorowania i wymuszania jakości danych z oczekiwaniami. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Tworzenie wzbogaconego widoku danych

Ponieważ tabele na żywo delty przetwarzają aktualizacje potoków jako serię grafów zależności, można zadeklarować wysoce wzbogacone widoki obsługujące pulpity nawigacyjne, analizy biznesowej i analizy, deklarując tabele z określoną logiką biznesową.

Tabele na żywo są równoważne zmaterializowanym widokom. Natomiast tradycyjne widoki logiki uruchamiania platformy Spark za każdym razem, gdy w widoku są wykonywane zapytania, tabele na żywo przechowują najnowszą wersję wyników zapytania w plikach danych. Ponieważ usługa Delta Live Tables zarządza aktualizacjami dla wszystkich zestawów danych w potoku, można zaplanować aktualizacje potoku w celu dopasowania ich do zmaterializowanych widoków i wiedzieć, że zapytania względem tych tabel zawierają najnowszą dostępną wersję danych.

Poniższy kod tworzy wzbogacony zmaterializowany widok danych nadrzędnych:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Aby skonfigurować potok używający notesu, przejdź do sekcji Tworzenie potoku.

Tworzenie potoku

Delta Live Tables tworzy potoki, rozwiązując zależności zdefiniowane w notesach lub plikach (nazywanych kodem źródłowym lub bibliotekami) przy użyciu składni delta Live Tables. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale można mieszać biblioteki różnych języków w potoku.

  1. Kliknij pozycję Delta Live Tables (Tabele na żywo funkcji Delta) na pasku bocznym i kliknij pozycję Create Pipeline (Utwórz potok).
  2. Nadaj potokowi nazwę.
  3. (Opcjonalnie) Zaznacz pole wyboru Bezserwerowe , aby użyć w pełni zarządzanych zasobów obliczeniowych dla tego potoku. Po wybraniu pozycji Bezserwerowe ustawienia obliczeniowe zostaną usunięte z interfejsu użytkownika.
  4. (Opcjonalnie) Wybierz wersję produktu.
  5. Wybierz pozycję Wyzwolone dla trybu potoku.
  6. Skonfiguruj co najmniej jeden notes zawierający kod źródłowy potoku. W polu tekstowym Ścieżki wprowadź ścieżkę do notesu lub kliknij Ikona selektora plików , aby wybrać notes.
  7. Wybierz miejsce docelowe dla zestawów danych publikowanych przez potok — magazyn metadanych Hive lub wykaz aparatu Unity. Zobacz Publikowanie zestawów danych.
    • Magazyn metadanych Hive:
      • (Opcjonalnie) Wprowadź lokalizację magazynu dla danych wyjściowych z potoku. System używa domyślnej lokalizacji, jeśli pozostawisz pustą lokalizację magazynu.
      • (Opcjonalnie) Określ schemat docelowy, aby opublikować zestaw danych w magazynie metadanych Hive.
    • Wykaz aparatu Unity: określ wykaz i schemat docelowy, aby opublikować zestaw danych w wykazie aparatu Unity.
  8. (Opcjonalnie) Jeśli nie wybrano opcji Bezserwerowe, możesz skonfigurować ustawienia obliczeniowe dla potoku. Aby dowiedzieć się więcej o opcjach ustawień obliczeniowych, zobacz Konfigurowanie ustawień potoku dla tabel na żywo funkcji Delta.
  9. (Opcjonalnie) Kliknij pozycję Dodaj powiadomienie , aby skonfigurować co najmniej jeden adres e-mail, aby otrzymywać powiadomienia dotyczące zdarzeń potoku. Zobacz Dodawanie powiadomień e-mail dotyczących zdarzeń potoku.
  10. (Opcjonalnie) Skonfiguruj ustawienia zaawansowane dla potoku. Aby dowiedzieć się więcej o opcjach ustawień zaawansowanych, zobacz Konfigurowanie ustawień potoku dla tabel delta Live Tables.
  11. Kliknij pozycję Utwórz.

Po kliknięciu przycisku Utwórz system wyświetli stronę Szczegóły potoku. Możesz również uzyskać dostęp do potoku, klikając nazwę potoku na karcie Delta Live Tables .

Uruchamianie aktualizacji potoku

Aby rozpocząć aktualizację potoku, kliknij Ikona uruchamiania tabel na żywo usługi Delta przycisk w górnym panelu. System zwraca komunikat z potwierdzeniem uruchomienia potoku.

Po pomyślnym uruchomieniu aktualizacji system Delta Live Tables:

  1. Uruchamia klaster przy użyciu konfiguracji klastra utworzonej przez system Delta Live Tables. Można również określić niestandardową konfigurację klastra.
  2. Tworzy wszystkie tabele, które nie istnieją i gwarantuje, że schemat jest poprawny dla wszystkich istniejących tabel.
  3. Aktualizacje tabele z najnowszymi dostępnymi danymi.
  4. Zamyka klaster po zakończeniu aktualizacji.

Uwaga

Tryb wykonywania jest domyślnie ustawiony na Wartość produkcyjna , która wdraża efemeryczne zasoby obliczeniowe dla każdej aktualizacji. Możesz użyć trybu programowania , aby zmienić to zachowanie, zezwalając na używanie tych samych zasobów obliczeniowych na potrzeby wielu aktualizacji potoku podczas programowania i testowania. Zobacz Tryby programowania i produkcji.

Publikowanie zestawów danych

Zestawy danych usługi Delta Live Tables można udostępnić do wykonywania zapytań, publikując tabele w magazynie metadanych Programu Hive lub wykazie aparatu Unity. Jeśli nie określisz obiektu docelowego do publikowania danych, tabele utworzone w potokach delta live tables mogą być dostępne tylko przez inne operacje w tym samym potoku. Zobacz Publikowanie danych z tabel delta Live Tables do magazynu metadanych Hive i Używanie wykazu aparatu Unity z potokami delta Live Tables.

Przykładowe notesy kodu źródłowego

Możesz zaimportować te notesy do obszaru roboczego usługi Azure Databricks i użyć ich do wdrożenia potoku delta Live Tables. Zobacz Tworzenie potoku.

Wprowadzenie do notesu usługi Delta Live Tables w języku Python

Pobierz notes

Wprowadzenie do notesu SQL usługi Delta Live Tables

Pobierz notes

Przykładowe notesy kodu źródłowego dla obszarów roboczych bez wykazu aparatu Unity

Te notesy można zaimportować do obszaru roboczego usługi Azure Databricks bez włączonego wykazu aparatu Unity i użyć ich do wdrożenia potoku delta live tables. Zobacz Tworzenie potoku.

Wprowadzenie do notesu usługi Delta Live Tables w języku Python

Pobierz notes

Wprowadzenie do notesu SQL usługi Delta Live Tables

Pobierz notes