Uruchamianie pierwszego obciążenia ETL w usłudze Azure Databricks

Dowiedz się, jak używać narzędzi gotowych do produkcji z usługi Azure Databricks do tworzenia i wdrażania pierwszych potoków wyodrębniania, przekształcania i ładowania (ETL) na potrzeby orkiestracji danych.

Na końcu tego artykułu poczujesz się komfortowo:

  1. Uruchamianie klastra obliczeniowego usługi Databricks ogólnego przeznaczenia.
  2. Tworzenie notesu usługi Databricks.
  3. Konfigurowanie pozyskiwania danych przyrostowych do usługi Delta Lake za pomocą modułu ładującego automatycznego.
  4. Wykonywanie komórek notesu w celu przetwarzania, wykonywania zapytań i podglądu danych.
  5. Planowanie notesu jako zadania usługi Databricks.

W tym samouczku są używane interaktywne notesy do wykonywania typowych zadań ETL w języku Python lub Scala.

Do tworzenia potoków ETL można również użyć tabel różnicowych na żywo. Usługa Databricks utworzyła tabele delta live, aby zmniejszyć złożoność tworzenia, wdrażania i obsługi produkcyjnych potoków ETL. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.

Możesz również użyć dostawcy narzędzia Terraform usługi Databricks, aby utworzyć zasoby tego artykułu. Zobacz Tworzenie klastrów, notesów i zadań za pomocą programu Terraform.

Wymagania

Uwaga

Jeśli nie masz uprawnień kontroli klastra, możesz wykonać większość poniższych kroków, o ile masz dostęp do klastra.

Krok 1. Tworzenie klastra

Aby wykonać eksploracyjne analizy danych i inżynierii danych, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do wykonywania poleceń.

  1. Kliknij pozycję ikona obliczeniowaObliczenia na pasku bocznym.
  2. Na stronie Obliczenia kliknij pozycję Utwórz klaster. Spowoduje to otwarcie strony Nowy klaster.
  3. Określ unikatową nazwę klastra, pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij pozycję Utwórz klaster.

Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Obliczenia.

Krok 2. Tworzenie notesu usługi Databricks

Aby rozpocząć pisanie i wykonywanie kodu interaktywnego w usłudze Azure Databricks, utwórz notes.

  1. Kliknij pozycję Nowa ikonaNowy na pasku bocznym, a następnie kliknij przycisk Notes.
  2. Na stronie Tworzenie notesu:
    • Określ unikatową nazwę notesu.
    • Upewnij się, że język domyślny jest ustawiony na Python lub Scala.
    • Wybierz klaster utworzony w kroku 1 z listy rozwijanej Klaster .
    • Kliknij pozycję Utwórz.

Notes zostanie otwarty z pustą komórką u góry.

Aby dowiedzieć się więcej na temat tworzenia notesów i zarządzania nimi, zobacz Zarządzanie notesami.

Krok 3. Konfigurowanie automatycznego modułu ładującego w celu pozyskiwania danych do usługi Delta Lake

Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania danych przyrostowych. Automatycznie moduł ładujący automatycznie wykrywa i przetwarza nowe pliki w miarę ich przybycia do magazynu obiektów w chmurze.

Usługa Databricks zaleca przechowywanie danych za pomocą usługi Delta Lake. Usługa Delta Lake to warstwa magazynu typu open source, która zapewnia transakcje ACID i umożliwia magazyn typu data lakehouse. Usługa Delta Lake jest domyślnym formatem tabel utworzonych w usłudze Databricks.

Aby skonfigurować moduł automatycznego ładowania w celu pozyskiwania danych do tabeli usługi Delta Lake, skopiuj i wklej następujący kod do pustej komórki w notesie:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Uwaga

Zmienne zdefiniowane w tym kodzie powinny umożliwić bezpieczne wykonywanie go bez ryzyka konfliktu z istniejącymi elementami zawartości obszaru roboczego lub innymi użytkownikami. Ograniczone uprawnienia sieci lub magazynu będą zgłaszać błędy podczas wykonywania tego kodu; skontaktuj się z administratorem obszaru roboczego, aby rozwiązać te ograniczenia.

Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.

Krok 4. Przetwarzanie i interakcja z danymi

Notesy wykonują komórkę logiki po komórce. Aby wykonać logikę w komórce:

  1. Aby uruchomić komórkę ukończoną w poprzednim kroku, wybierz komórkę i naciśnij klawisze SHIFT+ENTER.

  2. Aby wykonać zapytanie dotyczące utworzonej tabeli, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij klawisze SHIFT+ENTER , aby uruchomić komórkę.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Aby wyświetlić podgląd danych w ramce danych, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij klawisze SHIFT+ENTER , aby uruchomić komórkę.

    Python

    display(df)
    

    Scala

    display(df)
    

Aby dowiedzieć się więcej na temat interaktywnych opcji wizualizacji danych, zobacz Wizualizacje w notesach usługi Databricks.

Krok 5. Planowanie zadania

Notesy usługi Databricks można uruchamiać jako skrypty produkcyjne, dodając je jako zadanie w zadaniu usługi Databricks. W tym kroku utworzysz nowe zadanie, które można wyzwolić ręcznie.

Aby zaplanować notes jako zadanie:

  1. Kliknij pozycję Harmonogram po prawej stronie paska nagłówka.
  2. Wprowadź unikatową nazwę zadania.
  3. Kliknij pozycję Ręczne.
  4. Z listy rozwijanej Klaster wybierz klaster utworzony w kroku 1.
  5. Kliknij pozycję Utwórz.
  6. W wyświetlonym oknie kliknij pozycję Uruchom teraz.
  7. Aby wyświetlić wyniki uruchomienia zadania, kliknij ikonę Łącze zewnętrzne obok znacznika czasu ostatniego uruchomienia .

Aby uzyskać więcej informacji na temat zadań, zobacz Co to jest usługa Azure Databricks Jobs?.

Dodatkowe integracje

Dowiedz się więcej na temat integracji i narzędzi do inżynierii danych za pomocą usługi Azure Databricks: