Freigeben über


Transformieren von Daten mit Delta Live Tables

In diesem Artikel wird beschrieben, wie Sie mit Delta Live Tables Transformationen für Datasets deklarieren und angeben können, wie Datensätze über Abfragelogik verarbeitet werden. Er enthält auch einige Beispiele für gängige Transformationsmuster, die beim Erstellen von Delta Live Tables-Pipelines nützlich sein können.

Sie können ein Dataset für jede Abfrage definieren, die einen DataFrame zurückgibt. Sie können integrierte Apache Spark-Vorgänge, UDFs, benutzerdefinierte Logik und MLflow-Modelle als Transformationen in Ihrer Delta Live Tables-Pipeline verwenden. Nachdem Daten in Ihrer Delta Live Tables-Pipeline erfasst wurden, können Sie neue Datasets für Upstream-Quellen definieren, um neue Streamingtabellen, materialisierte Sichten und Ansichten zu erstellen.

Informationen zum effektiven Ausführen zustandsbehafteter Verarbeitung mit Delta Live Tables finden Sie unter Optimieren der zustandsbehafteten Verarbeitung in Delta Live Tables mit Wasserzeichen.

Verwendung von Ansichten, materialisierten Sichten und Streamingtabellen

Um sicherzustellen, dass Ihre Pipelines effizient und verwaltbar sind, wählen Sie beim Implementieren Ihrer Pipelineabfragen den optimalen Datasettyp aus.

Ziehen Sie in folgenden Fällen die Verwendung einer Sicht in Betracht:

  • Sie verfügen über eine große oder komplexe Abfrage, die Sie in einfacher zu verwaltende Abfragen unterteilen möchten.
  • Sie möchten Zwischenergebnisse anhand von Erwartungen überprüfen.
  • Sie möchten die Speicher- und Computekosten reduzieren und benötigen keine Materialisierung von Abfrageergebnissen. Da Tabellen materialisiert werden, erfordern sie zusätzliche Berechnungs- und Speicherressourcen.

Erwägen Sie die Verwendung einer materialisierten Sicht in folgenden Fällen:

  • Die Tabelle wird von mehreren Downstreamabfragen genutzt. Da Sichten bei Bedarf berechnet werden, wird die Sicht bei jeder Abfrage neu berechnet.
  • Andere Pipelines, Aufträge oder Abfragen nutzen die Tabelle. Da Sichten nicht materialisiert werden, können Sie sie nur in derselben Pipeline verwenden.
  • Sie möchten die Ergebnisse einer Abfrage während der Entwicklung anzeigen. Da Tabellen materialisiert werden und außerhalb der Pipeline angezeigt und abgefragt werden können, bietet die Verwendung von Tabellen während der Entwicklung die Möglichkeit, die Richtigkeit von Berechnungen zu überprüfen. Konvertieren Sie nach der Überprüfung diejenigen Abfragen, die keine Materialisierung erfordern, in Sichten.

Ziehen Sie in folgenden Fällen die Verwendung einer Streamingtabelle in Betracht:

  • Eine Abfrage wird für eine Datenquelle definiert, die kontinuierlich oder inkrementell wächst.
  • Abfrageergebnisse sollten inkrementell berechnet werden.
  • Für die Pipeline ist ein hoher Durchsatz und eine geringe Latenz gewünscht.

Hinweis

Streamingtabellen werden immer für Streamingquellen definiert. Sie können auch Streamingquellen mit APPLY CHANGES INTO verwenden, um Updates aus CDC-Feeds anzuwenden. Weitere Informationen finden Sie unter Vereinfachtes CDC (Change Data Capture) mit der APPLY CHANGES-API in Delta Live Tables.

Kombinieren von Streamingtabellen und materialisierten Sichten in einer einzigen Pipeline

Streamingtabellen erben die Verarbeitungsgarantien für strukturiertes Streamen von Apache Spark und sind für die Verarbeitung von Abfragen aus anfügegeschützten Datenquellen konfiguriert, wobei immer neue Zeilen in die Quelltabelle eingefügt und nicht geändert werden.

Hinweis

Obwohl Streamingtabellen standardmäßig reine Anfüge-Datenquellen erfordern, können Sie dieses Verhalten mit dem Flag skipChangeCommits überschreiben, wenn eine Streamingquelle eine andere Streamingtabelle ist, die Updates oder Löschvorgänge erfordert.

Ein allgemeines Streamingmuster enthält die Aufnahme von Quelldaten, um die anfänglichen Datasets in einer Pipeline zu erstellen. Diese anfänglichen Datasets werden häufig als Bronze-Tabellen bezeichnet und führen häufig einfache Transformationen aus.

Im Gegensatz dazu erfordern die endgültigen Tabellen in einer Pipeline, die normalerweise als Gold-Tabellen bezeichnet werden, häufig komplizierte Aggregationen oder Lesevorgänge von Quellen, die die Ziele eines APPLY CHANGES INTO-Vorgangs sind. Da diese Vorgänge inhärent Updates erstellen, anstatt sie anzufügen, werden sie nicht als Eingaben in Streaming-Tabellen unterstützt. Diese Transformationen eignen sich besser für materialisierte Sichten.

Indem Sie die Streaming-Tababellen und materialisierte Sichten in einer einzelnen Pipeline kombinieren, können Sie Ihre Pipeline vereinfachen und eine kostspielige erneute Erfassung oder erneute Verarbeitung von Rohdaten vermeiden und die volle Leistung von SQL nutzen, um komplexe Aggregationen über ein effizient codiertes und gefiltertes Dataset zu berechnen. Im folgenden Beispiel wird diese Art der gemischten Verarbeitung veranschaulicht:

Hinweis

In diesen Beispielen wird Autoloader verwendet, um Dateien aus dem Cloudspeicher zu laden. Um Dateien mit Autoloader in einer Pipeline mit Unity Catalog-Aktivierung zu laden, müssen Sie externe Speicherorte verwenden. Weitere Informationen zur Verwendung von Unity Catalog mit Delta Live Tables finden Sie unter Verwenden von Unity Catalog mit Ihren Delta Live Tables-Pipelines.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Erfahren Sie mehr über die Verwendung von Auto Loader zum effizienten Lesen von JSON-Dateien aus Azure Storage für die inkrementelle Verarbeitung.

Streamstatik-Verknüpfungen

Streamstatik-Verknüpfungen sind eine gute Wahl beim Denormalisieren eines kontinuierlichen Datenstroms von Daten, die nur zum Anfügen vorgesehen sind, mit einer hauptsächlich statischen Dimensionstabelle.

Mit jedem Pipelineupdate werden neue Datensätze aus dem Stream mit dem aktuellsten Momentaufnahme der statischen Tabelle verknüpft. Wenn Datensätze in der statischen Tabelle hinzugefügt oder aktualisiert werden, nachdem entsprechende Daten aus der Streamingtabelle verarbeitet wurden, werden die resultierenden Datensätze erst dann neu berechnet, wenn eine vollständige Aktualisierung durchgeführt wird.

In Pipelines, die für die ausgelöste Ausführung konfiguriert sind, gibt die statische Tabelle ergebnisse zurück, sobald das Update gestartet wurde. In Pipelines, die für die kontinuierliche Ausführung konfiguriert sind, wird bei jeder Aktualisierung der Tabelle die neueste Version der statischen Tabelle abgefragt.

Im Folgenden sehen Sie ein Beispiel für eine Streamstatik-Verknüpfung:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Effizientes Berechnen von Aggregaten

Sie können Streaming-Livetabellen ebenfalls für das inkrementelle Berechnen von einfachen verteilbaren Aggregaten wie count, min, max oder sum sowie algebraische Aggregate wie Durchschnitts- oder Standardabweichung verwenden. Databricks empfiehlt die inkrementelle Aggregation für Abfragen mit einer begrenzten Anzahl von Gruppen, z. B. eine Abfrage mit einer GROUP BY country-Klausel. Bei jedem Update werden nur neue Eingabedaten gelesen.

Weitere Informationen zum Schreiben von Delta Live Tables-Abfragen, die inkrementelle Aggregationen ausführen, finden Sie unter Durchführen von Aggregationen im Fenstermodus mit Grenzwerten.

Verwenden von MLFlow-Modellen in einer Delta Live Tables-Pipeline

Hinweis

Zur Verwendung von MLflow-Modellen in einer Unity Catalog-fähigen Pipeline muss Ihre Pipeline für die Verwendung des preview-Kanals konfiguriert sein. Um den current-Kanal zu verwenden, müssen Sie Ihre Pipeline so konfigurieren, dass sie im Hive-Metastore veröffentlicht wird.

Sie können MLflow-trainierte Modelle in Delta Live Tables-Pipelines verwenden. MLflow-Modelle werden als Transformationen in Azure Databricks behandelt, was bedeutet, dass sie auf eine Spark-DataFrame-Eingabe reagieren und Ergebnisse als Spark DataFrame zurückgeben. Da Delta Live Tables Datasets für DataFrames definiert, können Sie Apache Spark-Workloads, die MLflow nutzen, mit nur wenigen Codezeilen in Delta Live Tables konvertieren. Weitere Informationen zu MLflow finden Sie unter ML-Lebenszyklusverwaltung mit MLflow.

Wenn Sie bereits über ein Python-Notebook verfügen, das ein MLflow-Modell aufruft, können Sie diesen Code mithilfe des @dlt.table-Decorators an Delta Live Tables anpassen und sicherstellen, dass Funktionen definiert sind, um Transformationsergebnisse zurückzugeben. Delta Live Tables installiert MLflow standardmäßig nicht. Stellen Sie daher sicher, dass Sie %pip install mlflow, und mlflow und dlt am Anfang Ihres Notebooks importieren. Eine Einführung in die Syntax von Delta Live Tables finden Sie unter Beispiel: Aufnehmen und Verarbeiten von New York Babynamendaten.

Führen Sie die folgenden Schritte aus, um MLflow-Modelle in Delta Live Tables zu verwenden:

  1. Rufen Sie die Ausführungs-ID und den Modellnamen des MLFlow-Modells ab. Die Ausführungs-ID und der Modellname werden verwendet, um den URI des MLFlow-Modells zu erstellen.
  2. Verwenden Sie den URI, um eine Spark-UDF zum Laden des MLFlow-Modells zu definieren.
  3. Rufen Sie die UDF in Ihren Tabellendefinitionen auf, um das MLFlow-Modell zu verwenden.

Das folgende Beispiel zeigt die grundlegende Syntax für dieses Muster:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Als vollständiges Beispiel wird im folgenden Code eine Spark-UDF mit dem Namen loaded_model_udf definiert. Diese lädt ein MLFlow-Modell, das mit Kreditrisikodaten trainiert wurde. Die Datenspalten, die zum Treffen der Vorhersage verwendet werden, werden als Argument an die UDF übergeben. Die Tabelle loan_risk_predictions berechnet Vorhersagen für jede Zeile in loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Beibehalten manueller Löschungen oder Aktualisierungen

Mit Delta Live Tables können Sie Datensätze aus einer Tabelle manuell löschen oder aktualisieren und einen Aktualisierungsvorgang ausführen, um nachgelagerte Tabellen neu zu kompensieren.

Standardmäßig berechnet Delta Live Tables die Tabellenergebnisse auf der Grundlage der Eingabedaten bei jeder Aktualisierung einer Pipeline neu, so dass Sie sicherstellen müssen, dass der gelöschte Datensatz nicht erneut aus den Quelldaten geladen wird. Beim Festlegen der Tabelleneigenschaft pipelines.reset.allowed auf false werden Tabellenaktualisierungen verhindert, es wird jedoch nicht verhindert, dass inkrementelle Schreibvorgänge in die Tabellen durchgeführt werden oder dass Daten in die Tabelle einfließen.

Das folgende Diagramm veranschaulicht ein Beispiel mit zwei Streamingtabellen:

  • raw_user_table erfasst einen Satz unformatierten Benutzerdaten aus einer Quelle.
  • bmi_table berechnet inkrementell BMI-Bewertungen anhand der Gewichtung und Höhe aus raw_user_table.

Sie möchten Benutzerdatensätze manuell aus raw_user_table löschen oder aktualisieren und bmi_table neu berechnen.

Diagramm zur Datenbeibehaltung

Der folgende Code veranschaulicht das Festlegen der Tabelleneigenschaft pipelines.reset.allowed auf false, um die vollständige Aktualisierung für raw_user_table zu deaktivieren, sodass die beabsichtigten Änderungen im Laufe der Zeit beibehalten werden. Nachgelagerte Tabellen werden jedoch neu berechnet, wenn ein Pipelineupdate ausgeführt wird:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);