Tutorial: Ausführen einer End-to-End-Lakehouse-Analysepipeline
In diesem Tutorial erfahren Sie, wie Sie eine End-to-End-Analysepipeline für Azure Databricks Lakehouse einrichten.
Wichtig
In diesem Tutorial werden interaktive Notebooks verwendet, um allgemeine ETL-Aufgaben in Python in Clustern mit Unity Catalog-Aktivierung auszuführen. Wenn Sie Unity Catalog nicht verwenden, finden Sie weitere Informationen unter Ausführen Ihrer ersten ETL-Workload auf Azure Databricks.
Aufgaben in diesem Tutorial
Am Ende dieses Artikels sind Sie mit Folgendem vertraut:
- Starten eines Computeclusters mit Unity Catalog-Aktivierung
- Erstellen eines Databricks-Notebooks
- Schreiben und Lesen von Daten aus einem externen Unity Catalog-Speicherort
- Konfigurieren der inkrementellen Datenerfassung in einer Unity Catalog-Tabelle mit Autoloader
- Ausführen von Notebookzellen zum Verarbeiten und Abfragen von Daten sowie zum Anzeigen einer Vorschau der Daten
- Planen eines Notebooks als Databricks-Auftrag
- Abfragen von Unity Catalog-Tabellen aus Databricks SQL
Azure Databricks bietet eine Suite von produktionsbereiten Tools, mit denen Datenexperten schnell ETL-Pipelines (Extrahieren, Transformieren und Laden) entwickeln und bereitstellen können. Unity Catalog ermöglicht es Data Stewards, Anmeldeinformationen, externe Speicherorte und Datenbankobjekte für Benutzer in einer Organisation zu konfigurieren und zu schützen. Databricks SQL ermöglicht es Analysten, SQL-Abfragen für die gleichen Tabellen auszuführen, die in ETL-Produktionsworkloads verwendet werden. Dadurch wird Business Intelligence in Echtzeit im großen Stil ermöglicht.
Sie können außerdem Delta Live Tables verwenden, um ETL-Pipelines zu erstellen. Databricks hat Delta Live Tables erstellt, um die Komplexität des Erstellens, Bereitstellens und Verwaltens von ETL-Pipelines in der Produktion zu reduzieren. Siehe Tutorial: Ausführen Ihrer ersten Delta Live Tables-Pipeline.
Anforderungen
Hinweis
Wenn Sie keine Clustersteuerungsberechtigungen haben, können Sie dennoch die meisten der folgenden Schritte ausführen, solange Sie Zugriff auf einen Cluster haben.
Schritt 1: Erstellen eines Clusters
Erstellen Sie für die explorative Datenanalyse und Datentechnik einen Cluster, um die für die Ausführung von Befehlen erforderlichen Computeressourcen bereitzustellen.
- Klicken Sie auf der Seitenleiste auf Compute.
- Klicken Sie auf der Seitenleiste auf Neu und dann auf Cluster. Dadurch wird die Seite „New Cluster/Compute” (Neuer Cluster/Compute) geöffnet.
- Geben Sie einen eindeutigen Namen für den Cluster an.
- Wählen Sie die Optionsschaltfläche Einzelner Knoten aus.
- Wählen Sie die Option Einzelbenutzer in der Dropdownliste Zugriffsmodus aus.
- Stellen Sie sicher, dass Ihre E-Mail-Adresse im Feld Einzelbenutzer steht.
- Wählen Sie die gewünschte Databricks-Runtimeversion (mindestens 11.1) aus, um Unity Catalog zu verwenden.
- Klicken Sie auf Create compute (Compute erstellen), um den Cluster zu erstellen.
Weitere Informationen zu Databricks-Clustern finden Sie unter Compute.
Schritt 2: Erstellen eines Databricks-Notebooks
Wenn Sie ein Notebook in Ihrem Arbeitsbereich erstellen möchten, wählen Sie in der Randleiste Neu aus, und wählen Sie dann Notebook aus. Im Arbeitsbereich wird ein leeres Notebook geöffnet.
Weitere Informationen zum Erstellen und Verwalten von Notebooks finden Sie unter Verwalten von Notebooks.
Schritt 3: Schreiben und Lesen in einem von Unity Catalog verwalteten externen Speicherort
Databricks empfiehlt die Verwendung des Autoloaders für die inkrementelle Datenerfassung. Der Autoloader erkennt und verarbeitet automatisch neue Dateien, sobald sie im Cloudobjektspeicher empfangen werden.
Sie können Unity Catalog verwenden, um den sicheren Zugriff auf externe Speicherorte zu verwalten. Benutzer oder Dienstprinzipale mit Berechtigungen vom Typ READ FILES
für einen externen Speicherort können den Autoloader verwenden, um Daten zu erfassen.
Normalerweise gehen Daten aufgrund von Schreibvorgängen aus anderen Systemen an einem externen Speicherort ein. In dieser Demo können Sie den Eingang von Daten simulieren, indem Sie JSON-Dateien an einen externen Speicherort schreiben.
Kopieren Sie den folgenden Code in eine Notebookzelle. Ersetzen Sie den Zeichenfolgenwert für catalog
durch den Namen eines Katalogs mit Berechtigungen vom Typ CREATE CATALOG
und USE CATALOG
. Ersetzen Sie den Zeichenfolgenwert für external_location
durch den Pfad für einen externen Speicherort mit Berechtigungen vom Typ READ FILES
, WRITE FILES
und CREATE EXTERNAL TABLE
.
Externe Speicherorte können als gesamter Speichercontainer definiert werden, verweisen jedoch häufig auf ein Verzeichnis, das in einem Container geschachtelt ist.
Das richtige Format für den Pfad zu einem externen Speicherort ist "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
Beim Ausführen dieser Zelle sollten eine Zeile mit dem Hinweis, dass 12 Bytes geschrieben wurden, sowie die Zeichenfolge „Hello world!” ausgegeben und alle Datenbanken angezeigt werden, die im bereitgestellten Katalog vorhanden sind. Wenn Sie diese Zelle nicht ausführen können, vergewissern Sie sich, dass Sie sich in einem Arbeitsbereich befinden, für den Unity Catalog aktiviert ist, und fordern Sie die richtigen Berechtigungen von Ihrem Arbeitsbereichsadministrator an, um dieses Tutorial abzuschließen.
Der nachstehende Python-Code verwendet Ihre E-Mail-Adresse, um eine eindeutige Datenbank im bereitgestellten Katalog und einen eindeutigen Speicherort am angegebenen externen Speicherort zu erstellen. Durch das Ausführen dieser Zelle werden alle Daten im Zusammenhang mit diesem Tutorial entfernt, sodass Sie dieses Beispiel idempotent ausführen können. Eine Klasse wird definiert und instanziiert, mit der Sie Batches von Daten simulieren, die von einem verbundenen System an Ihrem externen Quellspeicherort eingehen.
Kopieren Sie diesen Code in eine neue Zelle in Ihrem Notebook, und führen Sie ihn aus, um Ihre Umgebung zu konfigurieren.
Hinweis
Die in diesem Code definierten Variablen sollten Ihnen eine sichere Ausführung ermöglichen, ohne dass es zu Konflikten mit bestehenden Arbeitsbereichsressourcen oder anderen Benutzern kommt. Eingeschränkte Netzwerk- oder Speicherberechtigungen lösen beim Ausführen dieses Codes Fehler aus. Wenden Sie sich an Ihren Arbeitsbereichsadministrator, um diese Einschränkungen zu behandeln.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Sie können nun einen Datenbatch empfangen, indem Sie den folgenden Code in eine Zelle kopieren und ausführen. Sie können diese Zelle manuell bis zu 60 Mal ausführen, um neue Dateneingänge auszulösen.
RawData.land_batch()
Schritt 4: Konfigurieren des Autoloaders zum Erfassen von Daten in Unity Catalog
Databricks empfiehlt das Speichern von Daten mit Delta Lake. Delta Lake ist eine Open Source-Speicherebene, die ACID-Transaktionen einführt und Data Lakehouse aktiviert. Delta Lake ist das Standardformat für Tabellen, die in Databricks erstellt wurden.
Wenn Sie Autoloader zum Erfassen von Daten in einer Unity Catalog-Tabelle konfigurieren möchten, kopieren Sie den folgenden Code in eine leere Zelle in Ihrem Notebook:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Um mehr über Auto Loader zu erfahren, siehe Was ist Auto Loader?.
Informationen zu strukturiertem Streaming mit Unity Catalog finden Sie unter Verwenden von Unity Catalog mit strukturiertem Streaming.
Schritt 5: Verarbeiten von und Interagieren mit Daten
Notebooks führen Logik Zelle für Zelle aus. Führen Sie die folgenden Schritte aus, um die Logik in Ihrer Zelle auszuführen:
Wenn Sie die im vorherigen Schritt abgeschlossene Zelle ausführen möchten, wählen Sie die Zelle aus, und drücken Sie UMSCHALT+EINGABE.
Wenn Sie die gerade erstellte Tabelle abfragen möchten, kopieren Sie den folgenden Code in eine leere Zelle, und drücken Sie dann UMSCHALT+EINGABE, um die Zelle auszuführen.
df = spark.read.table(table)
Wenn Sie eine Vorschau der Daten im Datenrahmen anzeigen möchten, kopieren Sie den folgenden Code in eine leere Zelle, und drücken Sie dann UMSCHALT+EINGABE, um die Zelle auszuführen.
display(df)
Weitere Informationen zu interaktiven Optionen zur Visualisierung von Daten finden Sie unter Visualisierungen in Databricks Notebooks.
Schritt 6: Planen eines Auftrags
Sie können Databricks-Notebooks als Produktionsskripts ausführen, indem Sie sie als Aufgabe in einem Databricks-Auftrag hinzufügen. In diesem Schritt erstellen Sie einen neuen Auftrag, den Sie manuell auslösen können.
So planen Sie Ihr Notebook als Aufgabe
- Klicken Sie auf der rechten Seite der Kopfleiste auf Planen.
- Geben Sie einen eindeutigen Namen unter Auftragsname ein.
- Klicken Sie auf Manuell.
- Wählen Sie in der Dropdownliste Cluster den Cluster aus, den Sie in Schritt 1 erstellt haben.
- Klicken Sie auf Erstellen.
- Klicken Sie im angezeigten Fenster auf Jetzt ausführen.
- Klicken Sie neben dem Zeitstempel Letzte Ausführung auf das Symbol , um die Ergebnisse der Auftragsausführung anzuzeigen.
Weitere Informationen zu Aufträgen finden Sie unter Was sind Databricks-Aufträge?.
Schritt 7: Abfragen einer Tabelle in Databricks SQL
Jeder mit der Berechtigung USE CATALOG
für den aktuellen Katalog, der Berechtigung USE SCHEMA
für das aktuelle Schema und SELECT
-Berechtigungen für die Tabelle kann den Inhalt der Tabelle über seine bevorzugte Databricks-API abfragen.
Sie benötigen Zugriff auf ein ausgeführtes SQL-Warehouse, um Abfragen in Databricks SQL auszuführen.
Die zuvor in diesem Tutorial erstellte Tabelle hat den Namen target_table
. Sie können sie mithilfe des Katalogs, den Sie in der ersten Zelle angegeben haben, und der Datenbank mit dem Muster e2e_lakehouse_<your-username>
abfragen. Sie können den Katalog-Explorer verwenden, um die von Ihnen erstellten Datenobjekte zu suchen.
Zusätzliche Integrationen
Erfahren Sie mehr über Integrationen und Tools für Datentechnik mit Azure Databricks: