Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Erfahren Sie, wie Sie eine ETL-Pipeline (Extrahieren, Transformieren und Laden) für die Daten-Orchestrierung mit Lakeflow Declarative Pipelines und Auto Loader erstellen und bereitstellen. Eine ETL-Pipeline implementiert die Schritte zum Lesen von Daten aus Quellsystemen, zum Transformieren dieser Daten basierend auf Anforderungen, z. B. Datenqualitätsprüfungen und Datensatzdeduplizierung, und zum Schreiben der Daten in ein Zielsystem, z. B. ein Data Warehouse oder einen Data Lake.
In diesem Lernprogramm verwenden Sie Lakeflow Declarative Pipelines und Auto Loader für:
- Rohdaten in eine Zieltabelle importieren.
- Transformieren Sie die Rohdaten, und schreiben Sie die transformierten Daten in zwei materialisierte Zielansichten.
- Abfragen der transformierten Daten.
- Automatisieren Sie die ETL-Pipeline mit einem Databricks-Auftrag.
Weitere Informationen zu lakeflow Declarative Pipelines und Auto Loader finden Sie unter Lakeflow Declarative Pipelines und Was ist Auto Loader?
Anforderungen
Um dieses Tutorial abzuschließen, müssen Sie die folgenden Anforderungen erfüllen:
- Melden Sie sich bei einem Azure Databricks-Arbeitsbereich an.
- Haben Sie Unity-Katalog für Ihren Arbeitsbereich aktiviert.
- Haben Sie serverlose Computefunktion für Ihr Konto aktiviert. Deklarative Pipelines für Serverless Lakeflow sind nicht in allen Regionen des Arbeitsbereichs verfügbar. Siehe Features mit eingeschränkter regionaler Verfügbarkeit für verfügbare Regionen.
- Verfügen Sie über die Berechtigung zum Erstellen einer Computeressource oder des Zugriffs auf eine Computeressource.
- Verfügen Sie über Berechtigungen zum Erstellen eines neuen Schemas in einem Katalog. Die erforderlichen Berechtigungen sind
ALL PRIVILEGES
oderUSE CATALOG
und .CREATE SCHEMA
- Verfügen Sie über Berechtigungen zum Erstellen eines neuen Volumes in einem vorhandenen Schema. Die erforderlichen Berechtigungen sind
ALL PRIVILEGES
oderUSE SCHEMA
und .CREATE VOLUME
Informationen zum Dataset
Das in diesem Beispiel verwendete Dataset ist eine Teilmenge von Million Song Dataset, eine Sammlung von Features und Metadaten für zeitgenössische Musiktitel. Dieses Dataset ist in den Beispieldatasets verfügbar, die in Ihrem Azure Databricks-Arbeitsbereich enthalten sind.
Schritt 1: Erstellen einer Pipeline
Zunächst erstellen Sie eine ETL-Pipeline in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines erstellt Pipelines, indem Abhängigkeiten aufgelöst werden, die in Notizbüchern oder Dateien (als Quellcode bezeichnet) mithilfe der Lakeflow Declarative Pipelines-Syntax definiert sind. Jede Quellcodedatei kann nur eine Sprache enthalten, Sie können jedoch mehrere sprachspezifische Notizbücher oder Dateien in der Pipeline hinzufügen. Weitere Informationen finden Sie unter Lakeflow Declarative Pipelines
Wichtig
Lassen Sie das Feld "Quellcode " leer, um automatisch ein Notizbuch für die Quellcodeerstellung zu erstellen und zu konfigurieren.
In diesem Tutorial werden serverless Computing und Unity Catalog verwendet. Verwenden Sie für alle nicht angegebenen Konfigurationsoptionen die Standardeinstellungen. Wenn die serverlose Berechnung in Ihrem Arbeitsbereich nicht aktiviert oder unterstützt wird, können Sie das Lernprogramm mit den Standardberechnungseinstellungen abschließen. Wenn Sie Standardberechnungseinstellungen verwenden, müssen Sie den Unity-Katalog manuell unter "Speicheroptionen " im Abschnitt "Ziel " der Benutzeroberfläche "Pipeline erstellen " auswählen.
Führen Sie die folgenden Schritte aus, um eine neue ETL-Pipeline in Lakeflow Declarative Pipelines zu erstellen:
- Klicken Sie in Ihrem Arbeitsbereich auf
Aufträge & Pipelines in der Randleiste.
- Klicken Sie unter "Neu" auf "ETL-Pipeline".
- Geben Sie unter "Pipelinename" einen eindeutigen Pipelinenamen ein.
- Aktivieren Sie das Kontrollkästchen "Serverless ".
- Wenn Sie im Ziel einen Unity-Katalogspeicherort konfigurieren möchten, an dem Tabellen veröffentlicht werden, wählen Sie einen vorhandenen Katalog aus , und schreiben Sie einen neuen Namen in Schema , um ein neues Schema in Ihrem Katalog zu erstellen.
- Klicken Sie auf Erstellen.
Die Pipeline-Benutzeroberfläche wird für die neue Pipeline angezeigt.
Schritt 2: Entwickeln einer Pipeline
Wichtig
Notizbücher können nur eine programmiersprache enthalten. Mischen Sie Python- und SQL-Code nicht in Pipelinequellcode-Notizbüchern.
In diesem Schritt verwenden Sie Databricks-Notizbücher zum interaktiven Entwickeln und Überprüfen des Quellcodes für Lakeflow Declarative Pipelines.
Der Code verwendet das automatische Laden für die inkrementelle Datenaufnahme. Der Autoloader erkennt und verarbeitet automatisch neue Dateien, sobald sie im Cloudobjektspeicher empfangen werden. Weitere Informationen finden Sie unter Was ist das automatische Laden?
Ein leeres Quellcodenotizbuch wird automatisch erstellt und für die Pipeline konfiguriert. Das Notizbuch wird in einem neuen Verzeichnis in Ihrem Benutzerverzeichnis erstellt. Der Name des neuen Verzeichnisses und der Datei entspricht dem Namen Ihrer Pipeline. Beispiel: /Users/someone@example.com/my_pipeline/my_pipeline
.
Beim Entwickeln einer Pipeline können Sie entweder Python oder SQL auswählen. Beispiele sind für beide Sprachen enthalten. Überprüfen Sie basierend auf Ihrer Sprachauswahl, ob Sie die Standardsprache des Notizbuchs auswählen. Weitere Informationen zur Notizbuchunterstützung für die Codeentwicklung von Lakeflow Declarative Pipelines finden Sie unter Entwickeln und Debuggen von ETL-Pipelines mit einem Notizbuch in Lakeflow Declarative Pipelines.
Ein Link für den Zugriff auf dieses Notizbuch befindet sich im Feld " Quellcode " im Bereich "Pipelinedetails ". Klicken Sie auf den Link, um das Notizbuch zu öffnen, bevor Sie mit dem nächsten Schritt fortfahren.
Klicken Sie oben rechts auf "Verbinden" , um das Berechnungskonfigurationsmenü zu öffnen.
Zeigen Sie mit der Maus auf den Namen der Pipeline, die Sie in Schritt 1 erstellt haben.
Klicken Sie auf Verbinden.
Wählen Sie neben dem Titel Ihres Notizbuchs oben die Standardsprache des Notizbuchs (Python oder SQL) aus.
Kopieren Sie den folgenden Code, und fügen Sie ihn in eine Zelle im Notizbuch ein.
Python
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.
Schritt 3: Abfragen der transformierten Daten
In diesem Schritt fragen Sie die in der ETL-Pipeline verarbeiteten Daten ab, um die Songdaten zu analysieren. Diese Abfragen verwenden die aufbereiteten Datensätze, die im vorherigen Schritt erstellt wurden.
Führen Sie zunächst eine Abfrage aus, die die Künstler findet, die die meisten Songs jedes Jahr seit 1990 veröffentlicht haben.
Klicken Sie in der Randleiste auf das
SQL-Editor.
Klicken Sie auf das Symbol
, und wählen Sie im Menü " Neue Abfrage erstellen " aus.
Geben Sie Folgendes ein:
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Ersetzen Sie
<catalog>
und<schema>
durch den Namen des Katalogs und des Schemas, in dem sich die Tabelle befindet. Beispiel:data_pipelines.songs_data.top_artists_by_year
.Klicken Sie auf Auswahl ausführen.
Führen Sie nun eine weitere Abfrage aus, die Songs mit einem 4/4 Beat und tanzbarem Tempo findet.
Klicken Sie auf das neue
und wählen Sie im Menü "Neue Abfrage erstellen" aus.
Geben Sie den folgenden Code ein:
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Ersetzen Sie
<catalog>
und<schema>
durch den Namen des Katalogs und des Schemas, in dem sich die Tabelle befindet. Beispiel:data_pipelines.songs_data.songs_prepared
.Klicken Sie auf Auswahl ausführen.
Schritt 4: Erstellen eines Auftrags zum Ausführen der Pipeline
Erstellen Sie als Nächstes einen Workflow zum Automatisieren von Datenaufnahme-, Verarbeitungs- und Analyseschritten mithilfe eines Databricks-Auftrags.
- Klicken Sie in Ihrem Arbeitsbereich auf
Aufträge & Pipelines in der Randleiste.
- Klicken Sie unter "Neu" auf "Auftrag".
- Ersetzen Sie im Feld "Aufgabentitel" Datum und Uhrzeit< des neuen Auftrags > durch Ihren Auftragsnamen. Beispiel:
Songs workflow
. - Geben Sie unter Aufgabenname einen Namen für die erste Aufgabe ein, z. B.
ETL_songs_data
. - Wählen Sie im Typ"Pipeline" aus.
- Wählen Sie in Der Pipeline die Pipeline aus, die Sie in Schritt 1 erstellt haben.
- Klicken Sie auf Erstellen.
- Klicken Sie zum Ausführen des Workflows auf "Jetzt ausführen". Um die Details für die Ausführung anzuzeigen, klicken Sie auf die Registerkarte "Ausführen ". Klicken Sie auf die Aufgabe, um Details für die Aufgabenausführung anzuzeigen.
- Um die Ergebnisse anzuzeigen, wenn der Workflow abgeschlossen ist, klicken Sie auf Zur neuesten erfolgreichen Ausführung wechseln oder auf die Startzeit der Auftragsausführung. Die Seite Ausgabe wird angezeigt und zeigt die Abfrageergebnisse an.
Weitere Informationen zu Auftragsausführungen finden Sie unter Überwachung und Beobachtbarkeit für Lakeflow-Aufträge.
Schritt 5: Planen des Pipelineauftrags
Führen Sie die folgenden Schritte aus, um die ETL-Pipeline in einem Zeitplan auszuführen:
- Navigieren Sie zur Benutzeroberfläche von Aufträgen und Pipelines im gleichen Azure Databricks-Arbeitsbereich wie der Auftrag.
- Wählen Sie optional die Filter "Jobs " und "Owned by me" aus .
- Klicken Sie in der Spalte Name auf den Auftragsnamen. Im Seitenbereich werden die Auftragsdetails angezeigt.
- Klicken Sie im Bereich "Zeitplan und Trigger" auf "Trigger hinzufügen", und wählen Sie "Geplant" im Triggertyp aus.
- Geben Sie den Zeitraum, die Startzeit und die Zeitzone an.
- Klicken Sie auf Speichern.
Erfahren Sie mehr
- Weitere Informationen zur Datenverarbeitungspipeline mit Lakeflow Declarative Pipelines finden Sie unter Lakeflow Declarative Pipelines
- Weitere Informationen zu Databricks-Notizbüchern finden Sie in der Einführung in Databricks-Notizbücher.
- Weitere Informationen zu Lakeflow Jobs finden Sie unter Was sind Jobs?
- Weitere Informationen zum Delta Lake finden Sie unter Was ist Delta Lake in Azure Databricks?