Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym samouczku pokazano, jak opracowywać i wdrażać pierwszy potok ETL (wyodrębnianie, przekształcanie i ładowanie) na potrzeby orkiestracji danych za pomocą platformy Apache Spark. Mimo że w tym samouczku są używane ogólne zasoby obliczeniowe Databricks, możesz również używać bezserwerowych zasobów obliczeniowych, jeśli są włączone dla twojego obszaru roboczego.
Możesz również użyć deklaratywnych potoków Lakeflow do budowania potoków ETL. Potoki deklaratywne usługi Databricks Lakeflow zmniejszają złożoność tworzenia, wdrażania i obsługi produkcyjnych potoków ETL. Zobacz Samouczek: Tworzenie potoku ETL za pomocą potoków deklaratywnych Lakeflow.
Na końcu tego artykułu dowiesz się, jak wykonać następujące działania:
- Uruchamianie uniwersalnego klastra obliczeniowego Databricks.
- Tworzenie notatnika Databricks.
- Konfigurowanie przyrostowego pobierania danych do Delta Lake za pomocą automatycznego modułu ładującego.
- Wykonywanie komórek notesu w celu przetwarzania, wykonywania zapytań i podglądu danych.
- Planowanie notesu jako zadania Databricks.
W tym samouczku są używane interaktywne notesy do wykonywania typowych zadań ETL w języku Python lub Scala.
Możesz również użyć dostawcy Terraform dla Databricks, aby utworzyć zasoby opisane w tym artykule. Zobacz Tworzenie klastrów, notesów i zadań za pomocą programu Terraform.
Wymagania
- Zalogowano się do obszaru roboczego usługi Azure Databricks.
- Masz uprawnienia do tworzenia klastra.
Uwaga
Jeśli nie masz uprawnień kontroli klastra, możesz wykonać większość poniższych kroków, o ile masz dostęp do klastra.
Krok 1. Tworzenie klastra
Aby wykonać eksploracyjne analizy danych i inżynierii danych, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do wykonywania poleceń.
- Kliknij
Oblicz na pasku bocznym.
- Na stronie Obliczenia kliknij pozycję Utwórz klaster. Spowoduje to otwarcie strony Nowy klaster.
- Określ unikatową nazwę klastra, pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij Utwórz klaster.
Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Compute.
Krok 2: Utwórz notatnik Databricks
Aby utworzyć notes w obszarze roboczym, kliknij pozycję Nowy na pasku bocznym, a następnie kliknij przycisk Notes. W obszarze roboczym otwiera się pusty notes.
Aby dowiedzieć się więcej na temat tworzenia notesów i zarządzania nimi, zobacz Zarządzanie notesami.
Krok 3. Konfigurowanie automatycznego modułu ładującego w celu pozyskiwania danych do usługi Delta Lake
Databricks zaleca używanie Auto Loader do przyrostowego pozyskiwania danych. Auto Loader automatycznie wykrywa i przetwarza nowe pliki w miarę ich przybycia do magazynu obiektów w chmurze.
Usługa Databricks zaleca przechowywanie danych za pomocą usługi Delta Lake. Delta Lake to open source warstwa magazynowa, która zapewnia transakcje ACID i umożliwia tworzenie magazynu danych typu lakehouse. Usługa Delta Lake jest domyślnym formatem tabel utworzonych w usłudze Databricks.
Aby skonfigurować moduł automatycznego ładowania w celu pozyskiwania danych do tabeli usługi Delta Lake, skopiuj i wklej następujący kod do pustej komórki w notesie:
Pyton
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Skala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Uwaga
Zmienne zdefiniowane w tym kodzie powinny umożliwić bezpieczne wykonywanie go bez ryzyka konfliktu z istniejącymi elementami zawartości obszaru roboczego lub innymi użytkownikami. Ograniczone uprawnienia sieci lub przechowywania danych będą powodować błędy podczas wykonywania tego kodu; skontaktuj się z administratorem obszaru roboczego, aby usunąć te ograniczenia.
Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.
Krok 4. Przetwarzanie i interakcja z danymi
Notatniki wykonują kod komórka po komórce. Aby uruchomić funkcję logiczną w komórce:
Aby uruchomić komórkę ukończoną w poprzednim kroku, wybierz komórkę i naciśnij SHIFT+ENTER.
Aby wykonać zapytanie dotyczące utworzonej tabeli, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij SHIFT+ENTER , aby uruchomić komórkę.
Pyton
df = spark.read.table(table_name)
Skala
val df = spark.read.table(table_name)
Aby wyświetlić podgląd danych w ramce danych, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij SHIFT+ENTER , aby uruchomić komórkę.
Pyton
display(df)
Skala
display(df)
Aby dowiedzieć się więcej na temat interaktywnych opcji wizualizacji danych, zobacz Wizualizacje w notesach usługi Databricks i edytorze SQL.
Krok 5. Planowanie zadania
Notesy usługi Databricks można uruchamiać jako skrypty produkcyjne, dodając je jako zadanie w zadaniu usługi Databricks. W tym kroku utworzysz nowe zadanie, które można wyzwolić ręcznie.
Aby zaplanować notes jako zadanie:
- Kliknij pozycję Harmonogram po prawej stronie paska nagłówka.
- Wprowadź unikatową nazwę zadania.
- Kliknij pozycję Ręczne.
- Z listy rozwijanej Klaster wybierz klaster utworzony w kroku 1.
- Kliknij pozycję Utwórz.
- W wyświetlonym oknie kliknij pozycję Uruchom teraz.
- Aby wyświetlić wyniki uruchomienia zadania, kliknij ikonę
obok znacznika czasu ostatniego uruchomienia .
Aby uzyskać więcej informacji na temat zadań, zobacz Co to są zadania?.
Dodatkowe integracje
Dowiedz się więcej na temat integracji i narzędzi do inżynierii danych za pomocą usługi Azure Databricks: