Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel wird beschrieben, wie Sie Lakeflow Declarative Pipelines verwenden können, um Transformationen für Datasets zu deklarieren und anzugeben, wie Datensätze über abfragelogik verarbeitet werden. Es enthält auch Beispiele für allgemeine Transformationsmuster zum Erstellen von Deklarativen Pipelines für Lakeflow.
Sie können ein Dataset für jede Abfrage definieren, die einen DataFrame zurückgibt. Sie können integrierte Apache Spark-Vorgänge, UDFs, benutzerdefinierte Logik und MLflow-Modelle als Transformationen in Lakeflow Declarative Pipelines verwenden. Nachdem Daten in Ihre Pipeline aufgenommen wurden, können Sie neue Datensätze für Upstream-Quellen definieren, um neue Streamingtabellen, materialisierte Ansichten und Ansichten zu erstellen.
Informationen zum effektiven Ausführen einer zustandsbehafteten Verarbeitung mit Lakeflow Declarative Pipelines finden Sie unter Optimieren der zustandsbehafteten Verarbeitung in Lakeflow Declarative Pipelines mit Wasserzeichen.
Wann sollten Ansichten, materialisierte Ansichten und Streamingtabellen verwendet werden?
Wenn Sie Ihre Pipelineabfragen implementieren, wählen Sie den besten Datasettyp aus, um sicherzustellen, dass sie effizient und verwaltet werden können.
Erwägen Sie die Verwendung einer Datenbankansicht, um Folgendes auszuführen:
- Zerlegen Sie eine große oder komplexe Abfrage, die Sie wünschen, in leichter zu verwaltende Abfragen.
- Überprüfen Sie zwischengeschaltete Ergebnisse unter Verwendung von Erwartungen.
- Reduzieren Sie Speicher- und Berechnungskosten für Ergebnisse, die Sie nicht beibehalten müssen. Da Tabellen materialisiert werden, erfordern sie zusätzliche Berechnungs- und Speicherressourcen.
Erwägen Sie die Verwendung einer materialisierten Sicht in folgenden Fällen:
- Die Tabelle wird von mehreren Downstreamabfragen genutzt. Da Ansichten bei Bedarf berechnet werden, wird die Ansicht jedes Mal neu berechnet, wenn sie abgefragt wird.
- Andere Pipelines, Aufträge oder Abfragen nutzen die Tabelle. Da Sichten nicht materialisiert werden, können Sie sie nur in derselben Pipeline verwenden.
- Sie möchten die Ergebnisse einer Abfrage während der Entwicklung anzeigen. Da Tabellen materialisiert werden und außerhalb der Pipeline angezeigt und abgefragt werden können, bietet die Verwendung von Tabellen während der Entwicklung die Möglichkeit, die Richtigkeit von Berechnungen zu überprüfen. Konvertieren Sie nach der Überprüfung diejenigen Abfragen, die keine Materialisierung erfordern, in Sichten.
Ziehen Sie in folgenden Fällen die Verwendung einer Streamingtabelle in Betracht:
- Eine Abfrage wird für eine Datenquelle definiert, die kontinuierlich oder inkrementell wächst.
- Abfrageergebnisse sollten inkrementell berechnet werden.
- Die Pipeline benötigt hohen Durchsatz und niedrige Latenz.
Hinweis
Streamingtabellen werden immer auf Basis von Streamingquellen definiert. Sie können auch Streamingquellen mit AUTO CDC ... INTO
verwenden, um Updates aus CDC-Feeds anzuwenden. Siehe Die AUTO CDC-APIs: Vereinfachen Sie die Erfassung von Änderungsdaten mit Lakeflow-Deklarativ-Pipelines.
Ausschließen von Tabellen aus dem Zielschema
Wenn Sie Zwischentabellen berechnen müssen, die nicht für den externen Verbrauch vorgesehen sind, können Sie verhindern, dass sie mit dem TEMPORARY
Schlüsselwort in einem Schema veröffentlicht werden. Temporäre Tabellen speichern und verarbeiten weiterhin Daten gemäß lakeflow Declarative Pipelines-Semantik, sollten jedoch nicht außerhalb der aktuellen Pipeline zugegriffen werden. Eine temporäre Tabelle wird für die Lebensdauer der Pipeline beibehalten, die sie erstellt. Verwenden Sie die folgende Syntax, um temporäre Tabellen zu deklarieren:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Kombinieren Sie Streaming-Tabellen und materialisierte Ansichten in einer einzigen Pipeline
Streamingtabellen erben die Verarbeitungsgarantien für strukturiertes Streamen von Apache Spark und sind für die Verarbeitung von Abfragen aus anfügegeschützten Datenquellen konfiguriert, wobei immer neue Zeilen in die Quelltabelle eingefügt und nicht geändert werden.
Hinweis
Obwohl Streamingtabellen standardmäßig nur Anfügedatenquellen erfordern, wenn es sich bei einer Streamingquelle um eine andere Streamingtabelle handelt, die Aktualisierungen oder Löschungen erfordert, können Sie dieses Verhalten mit dem SkipChangeCommits-Flag außer Kraft setzen.
Ein gängiges Streamingmuster umfasst das Aufnehmen von Quelldaten, um die ersten Datensätze in einer Pipeline zu erstellen. Diese anfänglichen Datasets werden häufig als Bronze-Tabellen bezeichnet und führen häufig einfache Transformationen aus.
Im Gegensatz dazu erfordern die endgültigen Tabellen in einer Pipeline, die häufig als Goldtabellen bezeichnet werden, häufig komplizierte Aggregationen oder Lesevorgänge von Zielen eines AUTO CDC ... INTO
-Vorgangs. Da diese Vorgänge inhärent Updates erstellen, anstatt sie anzufügen, werden sie nicht als Eingaben in Streaming-Tabellen unterstützt. Diese Transformationen eignen sich besser für materialisierte Sichten.
Indem Sie die Streaming-Tababellen und materialisierte Sichten in einer einzelnen Pipeline kombinieren, können Sie Ihre Pipeline vereinfachen und eine kostspielige erneute Erfassung oder erneute Verarbeitung von Rohdaten vermeiden und die volle Leistung von SQL nutzen, um komplexe Aggregationen über ein effizient codiertes und gefiltertes Dataset zu berechnen. Im folgenden Beispiel wird diese Art der gemischten Verarbeitung veranschaulicht:
Hinweis
In diesen Beispielen wird Autoloader verwendet, um Dateien aus dem Cloudspeicher zu laden. Um Dateien mit Autoloader in einer Pipeline mit Unity Catalog-Aktivierung zu laden, müssen Sie externe Speicherorte verwenden. Weitere Informationen zur Verwendung des Unity-Katalogs mit Lakeflow Declarative Pipelines finden Sie unter „Verwenden Sie Unity-Katalog mit Ihren Lakeflow-Deklarativen Pipelines“.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Erfahren Sie mehr über die Verwendung des automatischen Ladeprogramms zum inkrementellen Aufnehmen von JSON-Dateien aus Azure Storage.
Streamstatik-Verknüpfungen
Streamstatik-Verknüpfungen sind eine gute Wahl beim Denormalisieren eines kontinuierlichen Datenstroms von Daten, die nur zum Anfügen vorgesehen sind, mit einer hauptsächlich statischen Dimensionstabelle.
Bei jeder Pipeline-Aktualisierung werden neue Datensätze aus dem Stream mit den aktuellsten Momentaufnahmen der statischen Tabelle verknüpft. Wenn Datensätze in der statischen Tabelle hinzugefügt oder aktualisiert werden, nachdem entsprechende Daten aus der Streamingtabelle verarbeitet wurden, werden die resultierenden Datensätze erst dann neu berechnet, wenn eine vollständige Aktualisierung durchgeführt wird.
In Pipelines, die für die ausgelöste Ausführung konfiguriert sind, gibt die statische Tabelle ergebnisse zurück, sobald das Update gestartet wurde. In Pipelines, die für die kontinuierliche Ausführung konfiguriert sind, wird die neueste Version der statischen Tabelle jedes Mal abgefragt, wenn die Tabelle eine Aktualisierung verarbeitet.
Im Folgenden sehen Sie ein Beispiel für eine Streamstatik-Verknüpfung:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Effizientes Berechnen von Aggregaten
Sie können Streaming-Tabellen verwenden, um inkrementell einfache distributive Aggregate wie Zählung, Minimum, Maximum oder Summe sowie algebraische Aggregate wie Durchschnitt oder Standardabweichung zu berechnen. Databricks empfiehlt die inkrementelle Aggregation für Abfragen mit einer begrenzten Anzahl von Gruppen, z. B. eine Abfrage mit einer GROUP BY country
Klausel. Bei jedem Update werden nur neue Eingabedaten gelesen.
Weitere Informationen zum Schreiben von Lakeflow Declarative Pipelines-Abfragen, die inkrementelle Aggregationen ausführen, finden Sie unter Ausführen von Fensteraggregationen mit Wasserzeichen.
Verwenden von MLflow-Modellen in Lakeflow Declarative Pipelines
Hinweis
Zur Verwendung von MLflow-Modellen in einer Unity Catalog-fähigen Pipeline muss Ihre Pipeline für die Verwendung des preview
-Kanals konfiguriert sein. Um den current
-Kanal zu verwenden, müssen Sie Ihre Pipeline so konfigurieren, dass sie im Hive-Metastore veröffentlicht wird.
Sie können MLflow-trainierte Modelle in Lakeflow Declarative Pipelines verwenden. MLflow-Modelle werden als Transformationen in Azure Databricks behandelt, was bedeutet, dass sie auf eine Spark-DataFrame-Eingabe reagieren und Ergebnisse als Spark DataFrame zurückgeben. Da Lakeflow Declarative Pipelines Datasets für DataFrames definiert, können Sie Apache Spark-Workloads konvertieren, die MLflow in Lakeflow Declarative Pipelines mit nur wenigen Codezeilen verwenden. Weitere Informationen zu MLflow finden Sie unter MLflow für den ML-Modelllebenszyklus.
Wenn Sie bereits über ein Python-Notizbuch verfügen, das ein MLflow-Modell aufruft, können Sie diesen Code mithilfe des @dlt.table
Dekorators an Lakeflow Declarative Pipelines anpassen und sicherstellen, dass Funktionen definiert sind, um Transformationsergebnisse zurückzugeben. Lakeflow Declarative Pipelines installiert MLflow standardmäßig nicht, deshalb sollten Sie sicherstellen, dass Sie die MLflow-Bibliotheken mit %pip install mlflow
installiert und mlflow
sowie dlt
am Anfang Ihres Notizbuchs importiert haben. Eine Einführung in die Syntax deklarativer Pipelines von Lakeflow finden Sie unter Entwickeln von Pipelinecode mit Python.
Führen Sie die folgenden Schritte aus, um MLflow-Modelle in Lakeflow Declarative Pipelines zu verwenden:
- Rufen Sie die Ausführungs-ID und den Modellnamen des MLflow-Modells ab. Die Ausführungs-ID und der Modellname werden verwendet, um den URI des MLFlow-Modells zu erstellen.
- Verwenden Sie den URI, um eine Spark-UDF zum Laden des MLFlow-Modells zu definieren.
- Rufen Sie die UDF in Ihren Tabellendefinitionen auf, um das MLFlow-Modell zu verwenden.
Das folgende Beispiel zeigt die grundlegende Syntax für dieses Muster:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Als vollständiges Beispiel wird im folgenden Code eine Spark-UDF mit dem Namen loaded_model_udf
definiert. Diese lädt ein MLFlow-Modell, das mit Kreditrisikodaten trainiert wurde. Die Datenspalten, die zum Treffen der Vorhersage verwendet werden, werden als Argument an die UDF übergeben. Die Tabelle loan_risk_predictions
berechnet Vorhersagen für jede Zeile in loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Manuelle Löschungen oder Updates aufbewahren
Lakeflow Declarative Pipelines ermöglicht es Ihnen, Datensätze manuell aus einer Tabelle zu löschen oder zu aktualisieren und einen Aktualisierungsvorgang auszuführen, um nachgelagerte Tabellen neu zu komppilieren.
Standardmäßig berechnet Lakeflow Declarative Pipelines Tabellenergebnisse basierend auf Eingabedaten jedes Mal neu, wenn eine Pipeline aktualisiert wird, sodass Sie sicherstellen müssen, dass der gelöschte Datensatz nicht aus den Quelldaten neu geladen wird. Beim Festlegen der Tabelleneigenschaft pipelines.reset.allowed
auf false
werden Tabellenaktualisierungen verhindert, es wird jedoch nicht verhindert, dass inkrementelle Schreibvorgänge in die Tabellen durchgeführt werden oder dass Daten in die Tabelle einfließen.
Das folgende Diagramm veranschaulicht ein Beispiel mit zwei Streamingtabellen:
-
raw_user_table
importiert Rohdaten von Benutzern aus einer Quelle. -
bmi_table
berechnet inkrementell BMI-Werte anhand des Gewichts und der Körpergröße ausraw_user_table
.
Sie möchten Benutzerdatensätze manuell aus raw_user_table
löschen oder aktualisieren und bmi_table
neu berechnen.
Der folgende Code veranschaulicht das Festlegen der Tabelleneigenschaft pipelines.reset.allowed
auf false
, um die vollständige Aktualisierung für raw_user_table
zu deaktivieren, sodass die beabsichtigten Änderungen im Laufe der Zeit beibehalten werden. Nachgelagerte Tabellen werden jedoch neu berechnet, wenn ein Pipelineupdate ausgeführt wird:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);