Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Erfahren Sie, wie Sie eine ETL-Pipeline (Extrahieren, Transformieren und Laden) mit Änderungsdatenerfassung (CDC) mithilfe von Lakeflow Declarative Pipelines für die Daten-Orchestrierung und das automatische Laden erstellen und bereitstellen. Eine ETL-Pipeline implementiert die Schritte zum Lesen von Daten aus Quellsystemen, zum Transformieren dieser Daten basierend auf Anforderungen, z. B. Datenqualitätsprüfungen und Datensatzdeduplizierung, und zum Schreiben der Daten in ein Zielsystem, z. B. ein Data Warehouse oder einen Data Lake.
In diesem Lernprogramm verwenden Sie Daten aus einer customers
Tabelle in einer MySQL-Datenbank für:
- Extrahieren Sie die Änderungen aus einer Transaktionsdatenbank mithilfe von Debezium oder einem beliebigen anderen Tool, und speichern Sie sie in einem Cloudobjektspeicher (S3-Ordner, ADLS, GCS). Sie überspringen die Einrichtung eines externen CDC-Systems, um das Lernprogramm zu vereinfachen.
- Verwenden Sie das automatische Laden, um die Nachrichten inkrementell aus dem Cloudobjektspeicher zu laden und die unformatierten Nachrichten in der
customers_cdc
Tabelle zu speichern. Auto Loader wird das Schema ableiten und die Schemaentwicklung behandeln. - Fügen Sie eine Ansicht
customers_cdc_clean
hinzu, um die Datenqualität anhand der Erwartungen zu überprüfen. Beispielsweise sollteid
niemalsnull
sein, da Sie es zur Durchführung Ihrer Upsert-Vorgänge verwenden. - Führen Sie den Vorgang
AUTO CDC ... INTO
Upserts an den bereinigten CDC-Daten durch, um die Änderungen auf die endgültigecustomers
Tabelle anzuwenden. - Zeigen Sie, wie Lakeflow Declarative Pipelines eine Typ-2-Dimension mit langsamer Änderung (SCD2) erstellen können, um alle Änderungen nachzuverfolgen.
Ziel ist es, die Rohdaten in nahezu Echtzeit aufzunehmen und eine Tabelle für Ihr Analystenteam zu erstellen und gleichzeitig die Datenqualität sicherzustellen.
Das Lernprogramm verwendet die Medallion Lakehouse-Architektur, in der sie Rohdaten über die Bronzeschicht erfasst, Daten mit der Silberschicht bereinigt und überprüft und die dimensionale Modellierung und Aggregation mithilfe der Goldschicht anwendet. Weitere Informationen finden Sie unter "Was ist die Medallion Lakehouse-Architektur?
Der von Ihnen implementierte Fluss sieht wie folgt aus:
Weitere Informationen zu Lakeflow Declarative Pipelines, Auto Loader und CDC finden Sie unter Lakeflow Declarative Pipelines, Was ist Auto Loader? und Was ist Change Data Capture (CDC)?
Anforderungen
Um dieses Tutorial abzuschließen, müssen Sie die folgenden Anforderungen erfüllen:
- Melden Sie sich bei einem Azure Databricks-Arbeitsbereich an.
- Haben Sie Unity-Katalog für Ihren Arbeitsbereich aktiviert.
- Haben Sie serverlose Computefunktion für Ihr Konto aktiviert. Deklarative Pipelines für Serverless Lakeflow sind nicht in allen Arbeitsbereichsregionen verfügbar. Siehe Features mit eingeschränkter regionaler Verfügbarkeit für verfügbare Regionen.
- Verfügen Sie über die Berechtigung zum Erstellen einer Computeressource oder des Zugriffs auf eine Computeressource.
- Verfügen Sie über Berechtigungen zum Erstellen eines neuen Schemas in einem Katalog. Die erforderlichen Berechtigungen sind
ALL PRIVILEGES
oderUSE CATALOG
und .CREATE SCHEMA
- Verfügen Sie über Berechtigungen zum Erstellen eines neuen Volumes in einem vorhandenen Schema. Die erforderlichen Berechtigungen sind
ALL PRIVILEGES
oderUSE SCHEMA
und .CREATE VOLUME
Ändern der Datenerfassung in einer ETL-Pipeline
Change Data Capture (CDC) ist der Prozess, der die Änderungen in Datensätzen erfasst, die an einer Transaktionsdatenbank (z. B. MySQL oder PostgreSQL) oder einem Data Warehouse vorgenommen wurden. CDC erfasst Vorgänge wie Das Löschen, Anfügen und Aktualisieren von Daten, in der Regel als Datenstrom, um die Tabelle in externen Systemen neu zu materialisieren. CDC ermöglicht das inkrementelle Laden, ohne dass eine Massenladeaktualisierung erforderlich ist.
Hinweis
Um das Lernprogramm zu vereinfachen, überspringen Sie die Einrichtung eines externen CDC-Systems. Sie können es als einsatzbereit betrachten und die CDC-Daten als JSON-Dateien in einem BLOB-Speicher (S3, ADLS, GCS) speichern.
Erfassung von CDC
Es stehen eine Vielzahl von CDC-Tools zur Verfügung. Eine der führenden Open-Source-Lösungen ist Debezium, aber es gibt andere Implementierungen, die die Datenverarbeitung vereinfachen, wie Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate und AWS DMS.
In diesem Lernprogramm verwenden Sie CDC-Daten aus einem externen System wie Debezium oder DMS. Debezium erfasst jede geänderte Zeile. In der Regel wird der Verlauf von Datenänderungen an Kafka-Protokolle gesendet oder als Datei gespeichert.
Sie müssen die CDC-Informationen aus der customers
Tabelle (JSON-Format) erfassen, überprüfen, ob sie korrekt ist, und dann die Kundentabelle im Lakehouse materialisieren.
CDC-Eingabe von Debezium
Für jede Änderung erhalten Sie eine JSON-Nachricht mit allen Feldern der Zeile, die aktualisiert wird (id
, firstname
, , lastname
, email
). address
Darüber hinaus verfügen Sie über zusätzliche Metadateninformationen, einschließlich:
-
operation
: Ein Vorgangscode, in der Regel (DELETE
,APPEND
,UPDATE
). -
operation_date
: Datum und Zeitstempel für den Datensatz für jede Vorgangsaktion.
Tools wie Debezium können erweiterte Ausgaben erzeugen, z. B. den Zeilenwert vor der Änderung, aber in diesem Lernprogramm werden sie aus Gründen der Einfachheit weggelassen.
Schritt 0: Einrichten von Lernprogrammdaten
Zuerst müssen Sie ein neues Notizbuch erstellen und die Demodateien, die in diesem Lernprogramm in Ihrem Arbeitsbereich verwendet werden, installieren.
Klicken Sie in der oberen linken Ecke auf "Neu ".
Klicken Sie auf " Notizbuch".
Ändern Sie den Titel des Notizbuchs von Unbenanntes Notizbuch <Datum und Uhrzeit> zu Pipelines-Tutorial-Setup.
Legen Sie neben dem Titel Ihres Notizbuchs oben die Standardsprache des Notizbuchs auf Python fest.
Um das im Lernprogramm verwendete Dataset zu generieren, geben Sie den folgenden Code in die erste Zelle ein und drücken Sie UMSCHALTTASTE + + Eingabe, um den Code auszuführen.
# You can change the catalog, schema, dbName, and db. If you do so, you must also # change the names in the rest of the tutorial. catalog = "main" schema = dbName = db = "dbdemos_dlt_cdc" volume_name = "raw_data" spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`') spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`') volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exists, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
Um eine Vorschau der in diesem Lernprogramm verwendeten Daten anzuzeigen, geben Sie den Code in die nächste Zelle ein, und drücken Sie die UMSCHALTTASTE + und die Eingabetaste, um den Code auszuführen.
display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
Schritt 1: Erstellen einer Pipeline
Zunächst erstellen Sie eine ETL-Pipeline in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines erstellt Pipelines, indem Abhängigkeiten aufgelöst werden, die in Notizbüchern oder Dateien (als Quellcode bezeichnet) mithilfe der Lakeflow Declarative Pipelines-Syntax definiert sind. Jede Quellcodedatei kann nur eine Sprache enthalten, Sie können jedoch mehrere sprachspezifische Notizbücher oder Dateien in der Pipeline hinzufügen. Weitere Informationen finden Sie unter Lakeflow Declarative Pipelines
Von Bedeutung
Lassen Sie das Feld "Quellcode " leer, um automatisch ein Notizbuch für die Quellcodeerstellung zu erstellen und zu konfigurieren.
In diesem Tutorial werden serverless Computing und Unity Catalog verwendet. Verwenden Sie für alle nicht angegebenen Konfigurationsoptionen die Standardeinstellungen. Wenn die serverlose Berechnung in Ihrem Arbeitsbereich nicht aktiviert oder unterstützt wird, können Sie das Lernprogramm mit den Standardberechnungseinstellungen abschließen. Wenn Sie Standardberechnungseinstellungen verwenden, müssen Sie den Unity-Katalog manuell unter "Speicheroptionen " im Abschnitt "Ziel " der Benutzeroberfläche "Pipeline erstellen " auswählen.
Führen Sie die folgenden Schritte aus, um eine neue ETL-Pipeline in Lakeflow Declarative Pipelines zu erstellen:
- Klicken Sie in Ihrem Arbeitsbereich auf
Aufträge & Pipelines in der Randleiste.
- Klicken Sie unter "Neu" auf "ETL-Pipeline".
- Geben Sie unter "Pipelinename" einen eindeutigen Pipelinenamen ein.
- Aktivieren Sie das Kontrollkästchen "Serverless ".
- Wählen Sie "Ausgelöst " im Pipelinemodus aus. Dadurch werden die Streamprozesse mithilfe des AvailableNow-Triggers ausgeführt, der alle vorhandenen Daten verarbeitet und dann den Datenstrom beendet.
- Wenn Sie im Ziel einen Unity-Katalogspeicherort konfigurieren möchten, an dem Tabellen veröffentlicht werden, wählen Sie einen vorhandenen Katalog aus , und schreiben Sie einen neuen Namen in Schema , um ein neues Schema in Ihrem Katalog zu erstellen.
- Klicken Sie auf "Erstellen".
Die Pipeline-Benutzeroberfläche wird für die neue Pipeline angezeigt.
Ein leeres Quellcodenotizbuch wird automatisch erstellt und für die Pipeline konfiguriert. Das Notizbuch wird in einem neuen Verzeichnis in Ihrem Benutzerverzeichnis erstellt. Der Name des neuen Verzeichnisses und der Datei entspricht dem Namen Ihrer Pipeline. Beispiel: /Users/someone@example.com/my_pipeline/my_pipeline
.
- Ein Link für den Zugriff auf dieses Notizbuch befindet sich im Feld " Quellcode " im Bereich "Pipelinedetails ". Klicken Sie auf den Link, um das Notizbuch zu öffnen, bevor Sie mit dem nächsten Schritt fortfahren.
- Klicken Sie oben rechts auf "Verbinden" , um das Berechnungskonfigurationsmenü zu öffnen.
- Zeigen Sie mit der Maus auf den Namen der Pipeline, die Sie in Schritt 1 erstellt haben.
- Klicken Sie auf "Verbinden".
- Wählen Sie neben dem Titel Ihres Notizbuchs oben die Standardsprache des Notizbuchs (Python oder SQL) aus.
Von Bedeutung
Notizbücher können nur eine programmiersprache enthalten. Mischen Sie Python- und SQL-Code nicht in Pipelinequellcode-Notizbüchern.
Bei der Entwicklung von deklarativen Lakeflow-Pipelines können Sie entweder Python oder SQL auswählen. Dieses Lernprogramm enthält Beispiele für beide Sprachen. Überprüfen Sie basierend auf Ihrer Sprachauswahl, ob Sie die Standardsprache des Notizbuchs auswählen.
Weitere Informationen zur Notizbuchunterstützung für die Codeentwicklung von Lakeflow Declarative Pipelines finden Sie unter Entwickeln und Debuggen von ETL-Pipelines mit einem Notizbuch in Lakeflow Declarative Pipelines.
Schritt 2: Schrittweise Erfassung von Daten mit Auto Loader
Der erste Schritt besteht darin, die Rohdaten aus dem Cloudspeicher in eine Bronzeschicht aufzunehmen.
Dies kann aus mehreren Gründen herausfordernd sein, da Sie müssen:
- Arbeiten Sie im großen Maßstab, und nehmen Sie möglicherweise Millionen von kleinen Dateien auf.
- Ableiten des Schemas und des JSON-Typs.
- Behandeln Sie ungültige Datensätze mit falschem JSON-Schema.
- Kümmern Sie sich um die Schemaentwicklung (z. B. eine neue Spalte in der Kundentabelle).
Auto Loader vereinfacht diese Datenaufnahme, einschließlich Schemaschlussfolgerung und Schemaentwicklung, und skaliert dabei auf Millionen eingehender Dateien. Auto Loader ist in Python mit cloudFiles
und in SQL SELECT * FROM STREAM read_files(...)
verfügbar und kann mit einer Vielzahl von Formaten (JSON, CSV, Apache Avro usw.) verwendet werden:
Wenn Sie die Tabelle als Streamingtabelle definieren, wird sichergestellt, dass Nur neue eingehende Daten verwendet werden. Wenn Sie sie nicht als Streamingtabelle definieren, scannen und aufnehmen Sie alle verfügbaren Daten. Weitere Informationen finden Sie unter Streamingtabellen .
Wenn Sie die eingehenden Daten mithilfe des automatischen Ladens aufnehmen möchten, kopieren Sie den folgenden Code, und fügen Sie ihn in die erste Zelle im Notizbuch ein. Sie können Python oder SQL verwenden, abhängig von der Standardsprache des Notizbuchs, die Sie im vorherigen Schritt ausgewählt haben.
Python
from dlt import * from pyspark.sql.functions import * # Create the target bronze table dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers", format => "json", inferColumnTypes => "true" )
Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.
Schritt 3: Bereinigen und Erwartungen zur Nachverfolgung der Datenqualität
Nachdem die Bronzeschicht definiert wurde, erstellen Sie die Silberschichten, indem Sie die Erwartungen hinzufügen, um die Datenqualität zu kontrollieren, indem Sie die folgenden Bedingungen überprüfen:
- Die ID darf niemals sein
null
. - Der CDC-Vorgangstyp muss gültig sein.
- Das
json
muss vom Auto-Loader ausreichend gelesen worden sein.
Die Zeile wird gelöscht, wenn eine dieser Bedingungen nicht beachtet wird.
Weitere Informationen finden Sie unter Management der Datenqualität mit Pipeline-Erwartungen.
Klicken Sie unten auf "Bearbeiten " und "Zelle einfügen", um eine neue leere Zelle einzufügen .
Um eine silberne Ebene mit einer gereinigten Tabelle zu erstellen und Einschränkungen zu erzwingen, kopieren Sie den folgenden Code, und fügen Sie ihn in die neue Zelle im Notizbuch ein.
Python
dlt.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( dlt.read_stream("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;
Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.
Schritt 4: Materialisieren der Kundentabelle mit einem automatischen CDC-Ablauf
Die customers
Tabelle enthält die up-to-Datumsansicht und ist ein Replikat der ursprünglichen Tabelle.
Dies ist nicht trivial, manuell zu implementieren. Sie müssen Dinge wie die Datendeduplizierung berücksichtigen, um die aktuellste Zeile zu behalten.
Lakeflow Declarative Pipelines lösen jedoch diese Herausforderungen mit der AUTO CDC
Operation.
Klicken Sie unten auf "Bearbeiten " und "Zelle einfügen", um eine neue leere Zelle einzufügen .
Um die CDC-Daten mithilfe von
AUTO CDC
Deklarativen Pipelines in Lakeflow zu verarbeiten, kopieren Sie den folgenden Code, und fügen Sie ihn in die neue Zelle im Notizbuch ein.Python
dlt.create_streaming_table(name="customers", comment="Clean, materialized customers") dlt.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )
SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;
Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.
Schritt 5: Langsam ändernde Dimension des Typs 2 (SCD2)
Es ist häufig erforderlich, eine Tabelle zu erstellen, die alle Änderungen nachverfolgt, die sich aus APPEND
, , UPDATE
und DELETE
:
- Verlauf: Sie möchten einen Verlauf aller Änderungen an Ihrer Tabelle beibehalten.
- Rückverfolgbarkeit: Sie möchten sehen, welcher Vorgang aufgetreten ist.
SCD2 mit deklarativen Lakeflow-Pipelines
Delta unterstützt Änderungsdatenfluss (Change Data Flow, CDF) und table_change
kann die Tabellenänderung in SQL und Python abfragen. Der Hauptanwendungsfall des CDF besteht jedoch darin, Änderungen in einer Pipeline zu erfassen und keine vollständige Ansicht der Tabellenänderungen von Anfang an zu erstellen.
Die Implementierung wird insbesondere komplex, wenn Sie über außerhalb der Reihenfolge liegende Ereignisse verfügen. Wenn Sie Ihre Änderungen nach einem Zeitstempel sequenzieren und eine Änderung erhalten müssen, die in der Vergangenheit aufgetreten ist, müssen Sie einen neuen Eintrag in der SCD-Tabelle anfügen und die vorherigen Einträge aktualisieren.
Lakeflow Declarative Pipelines entfernt diese Komplexität und ermöglicht es Ihnen, eine separate Tabelle zu erstellen, die alle Änderungen am Anfang der Zeit enthält. Diese Tabelle kann dann in großem Maßstab mit bestimmten Partitionen/Z-Order-Spalten verwendet werden, falls erforderlich. Felder außerhalb der Reihenfolge werden automatisch basierend auf _sequence_by behandelt.
Zum Erstellen einer SCD2-Tabelle müssen wir die Option verwenden: STORED AS SCD TYPE 2
in SQL oder stored_as_scd_type="2"
in Python.
Hinweis
Sie können auch einschränken, welche Spalten das Feature verfolgt, indem Sie die Option verwenden: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
Klicken Sie unten auf "Bearbeiten " und "Zelle einfügen", um eine neue leere Zelle einzufügen .
Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue Zelle im Notizbuch ein.
Python
# create the table dlt.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dlt.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updates
SQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW cusotmers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;
Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.
Schritt 6: Erstellen einer materialisierten Ansicht, die verfolgt, wer ihre Informationen am meisten geändert hat
Die Tabelle customers_history
enthält alle historischen Änderungen, die ein Benutzer an seinen Informationen vorgenommen hat. Sie erstellen nun eine einfache materialisierte Ansicht in der Goldschicht, die nachverfolgt, wer ihre Informationen am meisten geändert hat. Dies könnte für die Betrugserkennungsanalyse oder Benutzerempfehlungen in einem realen Szenario verwendet werden. Darüber hinaus hat das Anwenden von Änderungen mit SCD2 bereits Duplikate für uns entfernt, sodass wir die Zeilen pro Benutzer-ID direkt zählen können.
Klicken Sie unten auf "Bearbeiten " und "Zelle einfügen", um eine neue leere Zelle einzufügen .
Kopieren Sie den folgenden Code, und fügen Sie ihn in die neue Zelle im Notizbuch ein.
Python
@dlt.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( dlt.read("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )
SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY id
Klicken Sie auf "Start ", um ein Update für die verbundene Pipeline zu starten.
Schritt 7: Erstellen eines Auftrags zum Ausführen der ETL-Pipeline
Erstellen Sie als Nächstes einen Workflow zum Automatisieren von Datenaufnahme-, Verarbeitungs- und Analyseschritten mithilfe eines Databricks-Auftrags.
- Klicken Sie in Ihrem Arbeitsbereich auf
Aufträge & Pipelines in der Randleiste.
- Klicken Sie unter "Neu" auf "Auftrag".
- Ersetzen Sie im Feld "Aufgabentitel" Datum und Uhrzeit< des neuen Auftrags > durch Ihren Auftragsnamen. Beispiel:
CDC customers workflow
. - Geben Sie unter Aufgabenname einen Namen für die erste Aufgabe ein, z. B.
ETL_customers_data
. - Wählen Sie im Typ"Pipeline" aus.
- Wählen Sie in Der Pipeline die Pipeline aus, die Sie in Schritt 1 erstellt haben.
- Klicken Sie auf "Erstellen".
- Klicken Sie zum Ausführen des Workflows auf "Jetzt ausführen". Um die Details für die Ausführung anzuzeigen, klicken Sie auf die Registerkarte "Ausführen ". Klicken Sie auf die Aufgabe, um Details für die Aufgabenausführung anzuzeigen.
- Um die Ergebnisse anzuzeigen, wenn der Workflow abgeschlossen ist, klicken Sie auf Zur neuesten erfolgreichen Ausführung wechseln oder auf die Startzeit der Auftragsausführung. Die Seite Ausgabe wird angezeigt und zeigt die Abfrageergebnisse an.
Weitere Informationen zu Auftragsausführungen finden Sie unter Überwachung und Beobachtbarkeit für Lakeflow-Aufträge.
Schritt 8: Planen des Auftrags
Führen Sie die folgenden Schritte aus, um die ETL-Pipeline in einem Zeitplan auszuführen:
- Klicken Sie in der Randleiste auf
Jobs & Pipelines.
- Wählen Sie optional die Filter "Jobs " und "Owned by me" aus .
- Klicken Sie in der Spalte Name auf den Auftragsnamen. Der seitliche Bereich wird als Auftragsdetails angezeigt.
- Klicken Sie im Bereich "Zeitplan und Trigger" auf "Trigger hinzufügen", und wählen Sie "Geplant" im Triggertyp aus.
- Geben Sie den Zeitraum, die Startzeit und die Zeitzone an.
- Klicken Sie auf Speichern.