Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel wird erläutert, wie Sie Lakeflow Declarative Pipelines-Konfigurationen verwenden können, um Pipelinecode zu parametrisieren.
Parameter referenzieren
Während der Updates kann Ihr Pipeline-Quellcode mithilfe der Syntax auf Pipeline-Parameter zugreifen, um Werte für Spark-Konfigurationen abzurufen.
Sie verweisen mithilfe des Schlüssels auf Pipelineparameter. Der Wert wird in den Quellcode als Zeichenfolge eingefügt, bevor die Quellcodelogik ausgewertet wird.
Die folgende Beispielsyntax verwendet einen Parameter mit Schlüssel source_catalog
und Wert dev_catalog
, um die Datenquelle für eine materialisierte Ansicht anzugeben:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Festlegen von Parametern
Übergeben Sie Parameter an Pipelines, indem Sie beliebige Schlüsselwertpaare als Konfigurationen für die Pipeline übergeben. Sie können Parameter festlegen, während Sie eine Pipelinekonfiguration mithilfe der Arbeitsbereich-UI oder JSON definieren oder bearbeiten. Siehe Konfigurieren von deklarativen Lakeflow-Pipelines.
Pipelineparameterschlüssel können nur _ - .
oder alphanumerische Zeichen enthalten. Parameterwerte werden als Zeichenfolgen festgelegt.
Pipelineparameter unterstützen keine dynamischen Werte. Sie müssen den Wert aktualisieren, der einem Schlüssel in der Pipelinekonfiguration zugeordnet ist.
Von Bedeutung
Verwenden Sie keine Schlüsselwörter, die mit reservierten Pipeline- oder Apache Spark-Konfigurationswerten in Konflikt geraten.
Parametrisieren von Datasetdeklarationen in Python oder SQL
Der Python- und SQL-Code, der Ihre Datasets definiert, kann durch die Einstellungen der Pipeline parametrisiert werden. Die Parameterisierung ermöglicht die folgenden Anwendungsfälle:
- Trennen langer Pfade und anderer Variablen vom Code.
- Verringern der Datenmenge, die in Entwicklungs- oder Stagingumgebungen verarbeitet wird, um Tests zu beschleunigen.
- Verwenden der gleichen Transformationslogik für die Verarbeitung aus mehreren Datenquellen.
Im folgenden Beispiel wird der startDate
Konfigurationswert verwendet, um die Entwicklungspipeline auf eine Teilmenge der Eingabedaten zu beschränken:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Steuern von Datenquellen mit Parametern
Sie können Pipelineparameter verwenden, um unterschiedliche Datenquellen in verschiedenen Konfigurationen derselben Pipeline anzugeben.
Sie können z. B. verschiedene Pfade in Entwicklungs-, Test- und Produktionskonfigurationen für eine Pipeline mithilfe der Variablen data_source_path
angeben und dann mithilfe des folgenden Codes darauf verweisen:
SQL
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Diese Vorlage ist vorteilhaft für Tests, wie die Aufnahmelogik schematische oder fehlerhaft formatierte Daten während der Erstaufnahme behandeln kann. Sie können den identischen Code in der gesamten Pipeline in allen Umgebungen verwenden, während Sie Datasets wechseln.