Freigeben über


Lernprogramm: Erstellen einer ETL-Pipeline mit Lakeflow Declarative Pipelines

Erfahren Sie, wie Sie eine ETL-Pipeline (Extrahieren, Transformieren und Laden) für die Daten-Orchestrierung mit Lakeflow Declarative Pipelines und Auto Loader erstellen und bereitstellen. Eine ETL-Pipeline implementiert die Schritte zum Lesen von Daten aus Quellsystemen, zum Transformieren dieser Daten basierend auf Anforderungen, z. B. Datenqualitätsprüfungen und Datensatzdeduplizierung, und zum Schreiben der Daten in ein Zielsystem, z. B. ein Data Warehouse oder einen Data Lake.

In diesem Lernprogramm verwenden Sie Lakeflow Declarative Pipelines und Auto Loader für:

  • Rohdaten in eine Zieltabelle importieren.
  • Transformieren Sie die Rohdaten, und schreiben Sie die transformierten Daten in zwei materialisierte Zielansichten.
  • Abfragen der transformierten Daten.
  • Automatisieren Sie die ETL-Pipeline mit einem Databricks-Auftrag.

Weitere Informationen zu lakeflow Declarative Pipelines und Auto Loader finden Sie unter Lakeflow Declarative Pipelines und Was ist Auto Loader?

Anforderungen

Um dieses Tutorial abzuschließen, müssen Sie die folgenden Anforderungen erfüllen:

Informationen zum Dataset

Das in diesem Beispiel verwendete Dataset ist eine Teilmenge von Million Song Dataset, eine Sammlung von Features und Metadaten für zeitgenössische Musiktitel. Dieses Dataset ist in den Beispieldatasets verfügbar, die in Ihrem Azure Databricks-Arbeitsbereich enthalten sind.

Schritt 1: Erstellen einer Pipeline

Zunächst erstellen Sie eine ETL-Pipeline in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines erstellt Pipelines, indem Abhängigkeiten aufgelöst werden, die in Notizbüchern oder Dateien (als Quellcode bezeichnet) mithilfe der Lakeflow Declarative Pipelines-Syntax definiert sind. Jede Quellcodedatei kann nur eine Sprache enthalten, Sie können jedoch mehrere sprachspezifische Notizbücher oder Dateien in der Pipeline hinzufügen. Weitere Informationen finden Sie unter Lakeflow Declarative Pipelines

Wichtig

Lassen Sie das Feld "Quellcode " leer, um automatisch ein Notizbuch für die Quellcodeerstellung zu erstellen und zu konfigurieren.

In diesem Tutorial werden serverless Computing und Unity Catalog verwendet. Verwenden Sie für alle nicht angegebenen Konfigurationsoptionen die Standardeinstellungen. Wenn die serverlose Berechnung in Ihrem Arbeitsbereich nicht aktiviert oder unterstützt wird, können Sie das Lernprogramm mit den Standardberechnungseinstellungen abschließen. Wenn Sie Standardberechnungseinstellungen verwenden, müssen Sie den Unity-Katalog manuell unter "Speicheroptionen " im Abschnitt "Ziel " der Benutzeroberfläche "Pipeline erstellen " auswählen.

Führen Sie die folgenden Schritte aus, um eine neue ETL-Pipeline in Lakeflow Declarative Pipelines zu erstellen:

  1. Klicken Sie in Ihrem Arbeitsbereich auf das Symbol Aufträge & Pipelines in der Randleiste.
  2. Klicken Sie unter "Neu" auf "ETL-Pipeline".
  3. Geben Sie unter "Pipelinename" einen eindeutigen Pipelinenamen ein.
  4. Aktivieren Sie das Kontrollkästchen "Serverless ".
  5. Wenn Sie im Ziel einen Unity-Katalogspeicherort konfigurieren möchten, an dem Tabellen veröffentlicht werden, wählen Sie einen vorhandenen Katalog aus , und schreiben Sie einen neuen Namen in Schema , um ein neues Schema in Ihrem Katalog zu erstellen.
  6. Klicken Sie auf Erstellen.

Die Pipeline-Benutzeroberfläche wird für die neue Pipeline angezeigt.

Schritt 2: Entwickeln einer Pipeline

Wichtig

Notizbücher können nur eine programmiersprache enthalten. Mischen Sie Python- und SQL-Code nicht in Pipelinequellcode-Notizbüchern.

In diesem Schritt verwenden Sie Databricks-Notizbücher zum interaktiven Entwickeln und Überprüfen des Quellcodes für Lakeflow Declarative Pipelines.

Der Code verwendet das automatische Laden für die inkrementelle Datenaufnahme. Der Autoloader erkennt und verarbeitet automatisch neue Dateien, sobald sie im Cloudobjektspeicher empfangen werden. Weitere Informationen finden Sie unter Was ist das automatische Laden?

Ein leeres Quellcodenotizbuch wird automatisch erstellt und für die Pipeline konfiguriert. Das Notizbuch wird in einem neuen Verzeichnis in Ihrem Benutzerverzeichnis erstellt. Der Name des neuen Verzeichnisses und der Datei entspricht dem Namen Ihrer Pipeline. Beispiel: /Users/someone@example.com/my_pipeline/my_pipeline.

Beim Entwickeln einer Pipeline können Sie entweder Python oder SQL auswählen. Beispiele sind für beide Sprachen enthalten. Überprüfen Sie basierend auf Ihrer Sprachauswahl, ob Sie die Standardsprache des Notizbuchs auswählen. Weitere Informationen zur Notizbuchunterstützung für die Codeentwicklung von Lakeflow Declarative Pipelines finden Sie unter Entwickeln und Debuggen von ETL-Pipelines mit einem Notizbuch in Lakeflow Declarative Pipelines.

  1. Ein Link für den Zugriff auf dieses Notizbuch befindet sich im Feld " Quellcode " im Bereich "Pipelinedetails ". Klicken Sie auf den Link, um das Notizbuch zu öffnen, bevor Sie mit dem nächsten Schritt fortfahren.

  2. Klicken Sie oben rechts auf "Verbinden" , um das Berechnungskonfigurationsmenü zu öffnen.

  3. Zeigen Sie mit der Maus auf den Namen der Pipeline, die Sie in Schritt 1 erstellt haben.

  4. Klicken Sie auf Verbinden.

  5. Wählen Sie neben dem Titel Ihres Notizbuchs oben die Standardsprache des Notizbuchs (Python oder SQL) aus.

  6. Kopieren Sie den folgenden Code, und fügen Sie ihn in eine Zelle im Notizbuch ein.

    Python

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.

Schritt 3: Abfragen der transformierten Daten

In diesem Schritt fragen Sie die in der ETL-Pipeline verarbeiteten Daten ab, um die Songdaten zu analysieren. Diese Abfragen verwenden die aufbereiteten Datensätze, die im vorherigen Schritt erstellt wurden.

Führen Sie zunächst eine Abfrage aus, die die Künstler findet, die die meisten Songs jedes Jahr seit 1990 veröffentlicht haben.

  1. Klicken Sie in der Randleiste auf das SQL-Editor-IconSQL-Editor.

  2. Klicken Sie auf das Symbol , und wählen Sie im Menü " Neue Abfrage erstellen " aus.

  3. Geben Sie Folgendes ein:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Ersetzen Sie <catalog> und <schema> durch den Namen des Katalogs und des Schemas, in dem sich die Tabelle befindet. Beispiel: data_pipelines.songs_data.top_artists_by_year.

  4. Klicken Sie auf Auswahl ausführen.

Führen Sie nun eine weitere Abfrage aus, die Songs mit einem 4/4 Beat und tanzbarem Tempo findet.

  1. Klicken Sie auf das neue Symbol und wählen Sie im Menü "Neue Abfrage erstellen" aus.

  2. Geben Sie den folgenden Code ein:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Ersetzen Sie <catalog> und <schema> durch den Namen des Katalogs und des Schemas, in dem sich die Tabelle befindet. Beispiel: data_pipelines.songs_data.songs_prepared.

  3. Klicken Sie auf Auswahl ausführen.

Schritt 4: Erstellen eines Auftrags zum Ausführen der Pipeline

Erstellen Sie als Nächstes einen Workflow zum Automatisieren von Datenaufnahme-, Verarbeitungs- und Analyseschritten mithilfe eines Databricks-Auftrags.

  1. Klicken Sie in Ihrem Arbeitsbereich auf das Symbol Aufträge & Pipelines in der Randleiste.
  2. Klicken Sie unter "Neu" auf "Auftrag".
  3. Ersetzen Sie im Feld "Aufgabentitel" Datum und Uhrzeit< des neuen Auftrags > durch Ihren Auftragsnamen. Beispiel: Songs workflow.
  4. Geben Sie unter Aufgabenname einen Namen für die erste Aufgabe ein, z. B. ETL_songs_data.
  5. Wählen Sie im Typ"Pipeline" aus.
  6. Wählen Sie in Der Pipeline die Pipeline aus, die Sie in Schritt 1 erstellt haben.
  7. Klicken Sie auf Erstellen.
  8. Klicken Sie zum Ausführen des Workflows auf "Jetzt ausführen". Um die Details für die Ausführung anzuzeigen, klicken Sie auf die Registerkarte "Ausführen ". Klicken Sie auf die Aufgabe, um Details für die Aufgabenausführung anzuzeigen.
  9. Um die Ergebnisse anzuzeigen, wenn der Workflow abgeschlossen ist, klicken Sie auf Zur neuesten erfolgreichen Ausführung wechseln oder auf die Startzeit der Auftragsausführung. Die Seite Ausgabe wird angezeigt und zeigt die Abfrageergebnisse an.

Weitere Informationen zu Auftragsausführungen finden Sie unter Überwachung und Beobachtbarkeit für Lakeflow-Aufträge.

Schritt 5: Planen des Pipelineauftrags

Führen Sie die folgenden Schritte aus, um die ETL-Pipeline in einem Zeitplan auszuführen:

  1. Navigieren Sie zur Benutzeroberfläche von Aufträgen und Pipelines im gleichen Azure Databricks-Arbeitsbereich wie der Auftrag.
  2. Wählen Sie optional die Filter "Jobs " und "Owned by me" aus .
  3. Klicken Sie in der Spalte Name auf den Auftragsnamen. Im Seitenbereich werden die Auftragsdetails angezeigt.
  4. Klicken Sie im Bereich "Zeitplan und Trigger" auf "Trigger hinzufügen", und wählen Sie "Geplant" im Triggertyp aus.
  5. Geben Sie den Zeitraum, die Startzeit und die Zeitzone an.
  6. Klicken Sie auf Speichern.

Erfahren Sie mehr