Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Lakeflow Spark Declarative Pipelines (SDP) führt mehrere neue Python-Codekonstrukte zum Definieren materialisierter Ansichten und Streamingtabellen in Pipelines ein. Python-Unterstützung für die Entwicklung von Pipelines basiert auf den Grundlagen von PySpark DataFrame und Structured Streaming APIs.
Für Benutzerinnen und Benutzer, die nicht mit Python und DataFrames vertraut sind, empfiehlt Databricks die Verwendung der SQL-Schnittstelle. Siehe Entwickeln von Lakeflow Spark Declarative Pipelines-Code mit SQL.
Eine vollständige Referenz zur Lakeflow-SDP-Python-Syntax finden Sie unter Lakeflow Spark Declarative Pipelines Python-Sprachreferenz.
Grundlagen von Python für die Pipelineentwicklung
Python-Code zum Erstellen von Pipline-Datasets muss DataFrames zurückgeben.
Alle Lakeflow Spark Declarative Pipelines Python-APIs werden im pyspark.pipelines Modul implementiert. Ihr mit Python implementierter Pipelinecode muss das pipelines Modul explizit oben in der Python-Quelle importieren. In unseren Beispielen verwenden wir den folgenden Importbefehl und verwenden dp in Beispielen, auf die pipelinesverwiesen wird.
from pyspark import pipelines as dp
Hinweis
Apache Spark™ enthält deklarative Pipelines ab Spark 4.1, die über das pyspark.pipelines Modul verfügbar sind. Die Databricks-Runtime erweitert diese Open-Source-Funktionen mit zusätzlichen APIs und Integrationen für die verwaltete Produktionsverwendung.
Code, der mit dem Open Source-Modul pipelines geschrieben wurde, wird ohne Änderung in Azure Databricks ausgeführt. Die folgenden Features sind nicht Teil von Apache Spark:
dp.create_auto_cdc_flowdp.create_auto_cdc_from_snapshot_flow@dp.expect(...)@dp.temporary_view
Pipeline liest und schreibt standardmäßig in dem Katalog und dem Schema, die während der Pipelinekonfiguration angegeben sind. Siehe Festlegen des Zielkatalogs und des Schemas.
Pipelinespezifischer Python-Code unterscheidet sich von anderen Python-Codetypen auf eine kritische Weise: Python-Pipelinecode ruft nicht direkt die Funktionen auf, die Datenaufnahme und Transformation durchführen, um Datasets zu erstellen. Stattdessen interpretiert SDP die Dekorierfunktionen aus dem dp Modul in allen Quellcodedateien, die in einer Pipeline konfiguriert sind, und erstellt ein Dataflow-Diagramm.
Von Bedeutung
Um ein unerwartetes Verhalten bei der Ausführung Ihrer Pipeline zu vermeiden, sollten Sie in Ihre Funktionen, die Datasets definieren, keinen Code aufnehmen, der Nebeneffekte haben könnte. Weitere Informationen finden Sie in der Python-Referenz.
Erstellen einer materialisierten Ansicht oder Streamingtabelle mit Python
Verwenden Sie @dp.table, um eine Streamingtabelle aus den Ergebnissen eines Streaming-Lesevorgangs zu erstellen. Verwenden Sie @dp.materialized_view, um eine materialisierte Ansicht aus den Ergebnissen einer Batch-Lesung zu erstellen.
Standardmäßig werden materialisierte Ansichts- und Streamingtabellennamen von Funktionsnamen abgeleitet. Das folgende Codebeispiel zeigt die grundlegende Syntax zum Erstellen einer materialisierten Ansichts- und Streamingtabelle:
Hinweis
Beide Funktionen verweisen auf dieselbe Tabelle im samples Katalog und verwenden dieselbe Dekorierfunktion. In diesen Beispielen wird hervorgehoben, dass der einzige Unterschied in der grundlegenden Syntax für materialisierte Ansichten und Streamingtabellen die Verwendung von spark.read gegenüber spark.readStream ist.
Nicht alle Datenquellen unterstützen Streaminglesevorgänge. Einige Datenquellen sollten immer mit Streamingsemantik verarbeitet werden.
from pyspark import pipelines as dp
@dp.materialized_view()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Optional können Sie den Tabellennamen mithilfe des name Arguments im @dp.table Dekorateur angeben. Im folgenden Beispiel wird dieses Muster für eine materialisierte Sicht und eine Streamingtabelle veranschaulicht.
from pyspark import pipelines as dp
@dp.materialized_view(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Laden von Daten aus objektspeicher
Pipelines unterstützen das Laden von Daten aus allen Von Azure Databricks unterstützten Formaten. Siehe Datenformatoptionen.
Hinweis
In diesen Beispielen werden Daten verwendet, die im Pfad „/databricks-datasets“ verfügbar sind, der automatisch in Ihren Arbeitsbereich eingebunden wurde. Databricks empfiehlt die Verwendung von Volumepfaden oder Cloud-URIs, um auf daten zu verweisen, die im Cloudobjektspeicher gespeichert sind. Weitere Informationen finden Sie unter Was sind Unity Catalog-Volumes?.
Databricks empfiehlt die Verwendung von AutoLade- und Streamingtabellen beim Konfigurieren von inkrementellen Erfassungsworkloads für Daten, die im Cloudobjektspeicher gespeichert sind. Siehe Was ist Autoloader?.
Im folgenden Beispiel wird eine Streamingtabelle aus JSON-Dateien mit dem automatischen Ladeprogramm erstellt:
from pyspark import pipelines as dp
@dp.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Im folgenden Beispiel wird Batchsemantik verwendet, um ein JSON-Verzeichnis zu lesen und eine materialisierte Ansicht zu erstellen:
from pyspark import pipelines as dp
@dp.materialized_view()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Überprüfen von Daten mit Erwartungen
Sie können die Erwartungen verwenden, um Einschränkungen für die Datenqualität festzulegen und zu erzwingen. Weitere Informationen finden Sie unter Verwalten der Datenqualität mit Pipelineerwartungen.
Der folgende Code verwendet @dp.expect_or_drop, um eine Erwartung namens valid_data zu definieren, die während der Datenerfassung Datensätze ignoriert, die NULL sind:
from pyspark import pipelines as dp
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Führen Sie Abfragen für materialisierte Ansichten und Streaming-Tabellen aus, die in Ihrer Pipeline definiert sind.
Im folgenden Beispiel werden vier Datasets definiert:
- Eine Streamingtabelle namens
orders, die JSON-Daten lädt. - Eine materialisierte Ansicht namens
customers, die CSV-Daten lädt. - Eine materialisierte Ansicht namens
customer_orders, die Datensätze aus den Datensätzenordersundcustomersverknüpft, den Zeitstempel der Reihenfolge in ein Datum wandelt und die Feldercustomer_id,order_number,stateundorder_dateauswählt. - Eine materialisierte Ansicht namens
daily_orders_by_state, die die tägliche Zählung der Bestellungen für jedes Bundesland aggregiert.
Hinweis
Beim Abfragen von Ansichten oder Tabellen in Ihrer Pipeline können Sie den Katalog und das Schema direkt angeben oder die in Ihrer Pipeline konfigurierten Standardwerte verwenden. In diesem Beispiel werden die Tabellen orders, customersund customer_orders aus dem Standardkatalog und dem für Ihre Pipeline konfigurierten Schema geschrieben und gelesen.
Im Legacyveröffentlichungsmodus wird das LIVE Schema verwendet, um andere materialisierte Ansichten und Streamingtabellen abzufragen, die in Ihrer Pipeline definiert sind. In neuen Pipelines wird die LIVE-Schemasyntax stillschweigend ignoriert. Weitere Informationen finden Sie unter LIVE-Schema (Legacy).
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dp.materialized_view()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dp.materialized_view()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dp.materialized_view()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Tabellen in einer for Schleife erstellen
Sie können Python-Schleifen for verwenden, um mehrere Tabellen programmgesteuert zu erstellen. Dies kann nützlich sein, wenn Sie über viele Datenquellen oder Zieldatensätze verfügen, die sich nur in wenigen Parametern unterscheiden, was in weniger zu wartendem Gesamtcode und reduzierter Coderedundanz resultiert.
Die for-Schleife wertet Logik in serieller Reihenfolge aus, aber sobald die Planung für die Datasets abgeschlossen ist, führt die Pipeline Logik parallel aus.
Von Bedeutung
Stellen Sie bei Verwendung dieses Musters zum Definieren von Datasets sicher, dass die Liste der an die for Schleife übergebenen Werte immer additiv ist. Wenn ein zuvor in einer Pipeline definiertes Dataset aus einer zukünftigen Pipelineausführung weggelassen wird, wird dieses Dataset automatisch aus dem Zielschema gelöscht.
Im folgenden Beispiel werden fünf Tabellen erstellt, die Kundenbestellungen nach Region filtern. Hier wird der Regionsname verwendet, um den Namen der materialisierten Zielansichten festzulegen und die Quelldaten zu filtern. Temporäre Ansichten werden verwendet, um Verknüpfungen aus den Quelltabellen zu definieren, die beim Erstellen der endgültigen materialisierten Ansichten verwendet werden.
from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dp.temporary_view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
Im Folgenden sehen Sie ein Beispiel für das Datenflussdiagramm für diese Pipeline:
Problembehandlung: for Schleife erstellt viele Tabellen mit denselben Werten
Das verzögerte („faule“) Ausführungsmodell, das Pipelines zum Auswerten von Python-Code verwenden, erfordert, dass Ihre Logik direkt auf einzelne Werte verweist, wenn die Funktion mit dem @dp.materialized_view()-Decorator aufgerufen wird.
Im folgenden Beispiel werden zwei richtige Ansätze zum Definieren von Tabellen mit einer for Schleife veranschaulicht. In beiden Beispielen wird auf jeden Tabellennamen aus der tables-Liste explizit innerhalb der Funktion mit dem @dp.materialized_view()-Decorator verwiesen.
from pyspark import pipelines as dp
# Create a parent function to set local variables
def create_table(table_name):
@dp.materialized_view(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dp.materialized_view()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized_view(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
Im folgenden Beispiel wird nicht richtig auf Werte verwiesen. In diesem Beispiel werden Tabellen mit unterschiedlichen Namen erstellt, aber alle Tabellen laden Daten aus dem letzten Wert in der for Schleife:
from pyspark import pipelines as dp
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized(name=t_name)
def create_table():
return spark.read.table(t_name)
Dauerhaftes Löschen von Datensätzen aus einer materialisierten Ansicht oder Streamingtabelle
Um Datensätze dauerhaft aus einer materialisierten Ansicht oder Streamingtabelle mit aktivierten Löschvektoren zu löschen, z. B. für die DSGVO-Compliance, müssen zusätzliche Vorgänge für die zugrunde liegenden Delta-Tabellen des Objekts ausgeführt werden. Informationen zum Sicherstellen der Löschung von Datensätzen aus einer materialisierten Ansicht finden Sie unter Dauerhaftes Löschen von Datensätzen aus einer materialisierten Ansicht mit aktivierten Löschvektoren. Informationen zum Sicherstellen des Löschens von Datensätzen aus einer Streamingtabelle finden Sie unter Dauerhaftes Löschen von Datensätzen aus einer Streamingtabelle.