Samouczek: uruchamianie kompleksowego potoku analizy typu lakehouse

W tym samouczku przedstawiono sposób konfigurowania kompleksowego potoku analizy dla usługi Azure Databricks Lakehouse.

Ważne

W tym samouczku są używane interaktywne notesy do wykonywania typowych zadań ETL w języku Python w klastrach z obsługą wykazu aparatu Unity. Jeśli nie używasz wykazu aparatu Unity, zobacz Uruchamianie pierwszego obciążenia ETL w usłudze Azure Databricks.

Zadania w tym samouczku

Na końcu tego artykułu poczujesz się komfortowo:

  1. Uruchamianie klastra obliczeniowego z obsługą wykazu aparatu Unity.
  2. Tworzenie notesu usługi Databricks.
  3. Zapisywanie i odczytywanie danych z lokalizacji zewnętrznej wykazu aparatu Unity.
  4. Konfigurowanie pozyskiwania danych przyrostowych do tabeli wykazu aparatu Unity za pomocą modułu automatycznego ładowania.
  5. Wykonywanie komórek notesu w celu przetwarzania, wykonywania zapytań i podglądu danych.
  6. Planowanie notesu jako zadania usługi Databricks.
  7. Wykonywanie zapytań względem tabel wykazu aparatu Unity z bazy danych Databricks SQL

Usługa Azure Databricks udostępnia zestaw narzędzi gotowych do produkcji, które umożliwiają specjalistom ds. danych szybkie opracowywanie i wdrażanie potoków wyodrębniania, przekształcania i ładowania (ETL). Katalog aparatu Unity umożliwia stewardom danych konfigurowanie i zabezpieczanie poświadczeń magazynu, lokalizacji zewnętrznych i obiektów bazy danych dla użytkowników w całej organizacji. Usługa Databricks SQL umożliwia analitykom uruchamianie zapytań SQL względem tych samych tabel używanych w produkcyjnych obciążeniach ETL, co umożliwia analizę biznesową w czasie rzeczywistym na dużą skalę.

Do tworzenia potoków ETL można również użyć tabel różnicowych na żywo. Usługa Databricks utworzyła tabele delta live, aby zmniejszyć złożoność tworzenia, wdrażania i obsługi produkcyjnych potoków ETL. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.

Wymagania

Uwaga

Jeśli nie masz uprawnień kontroli klastra, możesz wykonać większość poniższych kroków, o ile masz dostęp do klastra.

Krok 1. Tworzenie klastra

Aby wykonać eksploracyjne analizy danych i inżynierii danych, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do wykonywania poleceń.

  1. Kliknij pozycję ikona obliczeniowaObliczenia na pasku bocznym.
  2. Kliknij pozycję Nowa ikonaNowy na pasku bocznym, a następnie wybierz pozycję Klaster. Spowoduje to otwarcie strony Nowy klaster/obliczenia.
  3. Określ unikatową nazwę klastra.
  4. Wybierz przycisk radiowy Pojedynczy węzeł.
  5. Wybierz pozycję Pojedynczy użytkownik z listy rozwijanej Tryb dostępu.
  6. Upewnij się, że twój adres e-mail jest widoczny w polu Pojedynczy użytkownik .
  7. Wybierz żądaną wersję środowiska uruchomieniowego usługi Databricks, 11.1 lub nowszą, aby użyć wykazu aparatu Unity.
  8. Kliknij pozycję Utwórz zasoby obliczeniowe , aby utworzyć klaster.

Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Obliczenia.

Krok 2. Tworzenie notesu usługi Databricks

Aby rozpocząć pisanie i wykonywanie kodu interaktywnego w usłudze Azure Databricks, utwórz notes.

  1. Kliknij pozycję Nowa ikonaNowy na pasku bocznym, a następnie kliknij przycisk Notes.
  2. Na stronie Tworzenie notesu:
    • Określ unikatową nazwę notesu.
    • Upewnij się, że język domyślny jest ustawiony na python.
    • Użyj menu rozwijanego Połączenie, aby wybrać klaster utworzony w kroku 1 z listy rozwijanej Klaster.

Notes zostanie otwarty z jedną pustą komórką.

Aby dowiedzieć się więcej na temat tworzenia notesów i zarządzania nimi, zobacz Zarządzanie notesami.

Krok 3. Zapisywanie i odczytywanie danych z lokalizacji zewnętrznej zarządzanej przez wykaz aparatu Unity

Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania danych przyrostowych. Automatycznie moduł ładujący automatycznie wykrywa i przetwarza nowe pliki w miarę ich przybycia do magazynu obiektów w chmurze.

Użyj wykazu aparatu Unity, aby zarządzać bezpiecznym dostępem do lokalizacji zewnętrznych. Użytkownicy lub jednostki usługi z uprawnieniami READ FILES do lokalizacji zewnętrznej mogą używać automatycznego modułu ładującego do pozyskiwania danych.

Zwykle dane docierają do lokalizacji zewnętrznej ze względu na zapisy z innych systemów. W tym pokazie można symulować przybycie danych, zapisując pliki JSON w lokalizacji zewnętrznej.

Skopiuj poniższy kod do komórki notesu. Zastąp wartość ciągu nazwą catalog katalogu i uprawnieniami CREATE CATALOGUSE CATALOG . Zastąp wartość external_location ciągu ścieżką lokalizacji zewnętrznej , READ FILESWRITE FILESi CREATE EXTERNAL TABLE uprawnieniami.

Lokalizacje zewnętrzne można zdefiniować jako cały kontener magazynu, ale często wskazują katalog zagnieżdżony w kontenerze.

Poprawny format ścieżki lokalizacji zewnętrznej to "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Wykonanie tej komórki powinno wyświetlić wiersz odczytujący 12 bajtów, wydrukować ciąg "Hello world!" i wyświetlić wszystkie bazy danych obecne w podanym wykazie. Jeśli nie możesz uruchomić tej komórki, upewnij się, że jesteś w obszarze roboczym z włączoną obsługą wykazu aparatu Unity i zażądaj odpowiednich uprawnień od administratora obszaru roboczego, aby ukończyć ten samouczek.

Poniższy kod w języku Python używa twojego adresu e-mail do utworzenia unikatowej bazy danych w podanym wykazie i unikatowej lokalizacji przechowywania w podanej lokalizacji zewnętrznej. Wykonanie tej komórki spowoduje usunięcie wszystkich danych skojarzonych z tym samouczkiem, co umożliwi wykonanie tego przykładu idempotentnie. Klasa jest zdefiniowana i utworzona przy użyciu funkcji symulacji partii danych przychodzących z połączonego systemu do lokalizacji zewnętrznej źródła.

Skopiuj ten kod do nowej komórki w notesie i wykonaj go w celu skonfigurowania środowiska.

Uwaga

Zmienne zdefiniowane w tym kodzie powinny umożliwić bezpieczne wykonywanie go bez ryzyka konfliktu z istniejącymi elementami zawartości obszaru roboczego lub innymi użytkownikami. Ograniczone uprawnienia sieci lub magazynu będą zgłaszać błędy podczas wykonywania tego kodu; skontaktuj się z administratorem obszaru roboczego, aby rozwiązać te ograniczenia.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Teraz możesz umieścić partię danych, kopiując następujący kod do komórki i wykonując go. Możesz ręcznie wykonać tę komórkę maksymalnie 60 razy, aby wyzwolić nowe dane przybycia.

RawData.land_batch()

Krok 4. Konfigurowanie automatycznego modułu ładującego w celu pozyskiwania danych do wykazu aparatu Unity

Usługa Databricks zaleca przechowywanie danych za pomocą usługi Delta Lake. Usługa Delta Lake to warstwa magazynu typu open source, która zapewnia transakcje ACID i umożliwia magazyn typu data lakehouse. Usługa Delta Lake jest domyślnym formatem tabel utworzonych w usłudze Databricks.

Aby skonfigurować moduł automatycznego ładowania w celu pozyskiwania danych do tabeli wykazu aparatu Unity, skopiuj i wklej następujący kod do pustej komórki w notesie:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.

Aby dowiedzieć się więcej na temat przesyłania strumieniowego ze strukturą za pomocą wykazu aparatu Unity, zobacz Using Unity Catalog with Structured Streaming (Używanie wykazu aparatu Unity z przesyłaniem strumieniowym ze strukturą).

Krok 5. Przetwarzanie danych i interakcja z nimi

Notesy wykonują komórkę logiki po komórce. Wykonaj następujące kroki, aby wykonać logikę w komórce:

  1. Aby uruchomić komórkę ukończoną w poprzednim kroku, wybierz komórkę i naciśnij klawisze SHIFT+ENTER.

  2. Aby wykonać zapytanie dotyczące utworzonej tabeli, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij klawisze SHIFT+ENTER , aby uruchomić komórkę.

    df = spark.read.table(table_name)
    
  3. Aby wyświetlić podgląd danych w ramce danych, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij klawisze SHIFT+ENTER , aby uruchomić komórkę.

    display(df)
    

Aby dowiedzieć się więcej na temat interaktywnych opcji wizualizacji danych, zobacz Wizualizacje w notesach usługi Databricks.

Krok 6. Planowanie zadania

Notesy usługi Databricks można uruchamiać jako skrypty produkcyjne, dodając je jako zadanie w zadaniu usługi Databricks. W tym kroku utworzysz nowe zadanie, które można wyzwolić ręcznie.

Aby zaplanować notes jako zadanie:

  1. Kliknij pozycję Harmonogram po prawej stronie paska nagłówka.
  2. Wprowadź unikatową nazwę zadania.
  3. Kliknij pozycję Ręczne.
  4. Z listy rozwijanej Klaster wybierz klaster utworzony w kroku 1.
  5. Kliknij pozycję Utwórz.
  6. W wyświetlonym oknie kliknij pozycję Uruchom teraz.
  7. Aby wyświetlić wyniki uruchomienia zadania, kliknij ikonę Łącze zewnętrzne obok znacznika czasu ostatniego uruchomienia .

Aby uzyskać więcej informacji na temat zadań, zobacz Co to jest usługa Azure Databricks Jobs?.

Krok 7. Wykonywanie zapytań dotyczących tabeli z bazy danych Databricks SQL

Każda osoba mająca USE CATALOG uprawnienia do bieżącego wykazu, USE SCHEMA uprawnienie do bieżącego schematu i SELECT uprawnienia w tabeli może wykonywać zapytania dotyczące zawartości tabeli z preferowanego interfejsu API usługi Databricks.

Do wykonywania zapytań w usłudze Databricks SQL SQL jest potrzebny dostęp do uruchomionego magazynu SQL.

Tabela utworzona wcześniej w tym samouczku ma nazwę target_table. Zapytania można wykonać za pomocą katalogu podanego w pierwszej komórce i bazie danych z ojcem e2e_lakehouse_<your-username>. Eksplorator wykazu służy do znajdowania utworzonych obiektów danych.

Dodatkowe integracje

Dowiedz się więcej na temat integracji i narzędzi do inżynierii danych za pomocą usługi Azure Databricks: