Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra come configurare i parametri per le pipeline.
Parametri di riferimento
Durante gli aggiornamenti, il codice sorgente della pipeline può accedere ai parametri della pipeline usando la sintassi per ottenere valori per le configurazioni spark.
È possibile fare riferimento ai parametri della pipeline usando la chiave . Il valore viene inserito nel codice sorgente come stringa prima che la logica del codice sorgente valuti.
La sintassi di esempio seguente usa un parametro con chiave source_catalog e valore dev_catalog per specificare l'origine dati per una vista materializzata:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Pitone
from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count
@dp.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Imposta i parametri
Inviare parametri alle pipeline utilizzando coppie chiave-valore arbitrarie come configurazioni per la pipeline. È possibile impostare i parametri durante la definizione o la modifica di una configurazione della pipeline usando l'interfaccia utente dell'area di lavoro o JSON. Vedere Configurare le pipeline.
Le chiavi dei parametri della pipeline possono contenere _ - . o caratteri alfanumerici. I valori dei parametri vengono impostati come stringhe.
I parametri della pipeline non supportano valori dinamici. È necessario aggiornare il valore associato a una chiave nella configurazione della pipeline.
Importante
Non usare parole chiave in conflitto con pipeline riservate o valori di configurazione di Apache Spark.
Parametrizzare le dichiarazioni del set di dati in Python o SQL
Il codice Python e SQL che definisce i set di dati possono essere parametrizzati dalle impostazioni della pipeline. La parametrizzazione abilita i casi d'uso seguenti:
- Separazione di percorsi lunghi e altre variabili dal codice.
- Riduzione della quantità di dati elaborati in ambienti di sviluppo o staging per velocizzare i test.
- Riutilizzo della stessa logica di trasformazione per l'elaborazione da più origini dati.
Nell'esempio seguente viene usato il startDate valore di configurazione per limitare la pipeline di sviluppo a un subset dei dati di input:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Controllare le origini dati con parametri
È possibile usare i parametri della pipeline per specificare origini dati diverse in configurazioni diverse della stessa pipeline.
Ad esempio, è possibile specificare percorsi diversi nelle configurazioni di sviluppo, test e produzione per una pipeline usando la variabile data_source_path e quindi farvi riferimento usando il codice seguente:
SQL
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)
Pitone
from pyspark import pipelines as dp
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dp.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Questo modello è utile per testare il modo in cui la logica di inserimento potrebbe gestire lo schema o i dati in formato non valido durante l'inserimento iniziale. È possibile usare il codice identico in tutta la pipeline in tutti gli ambienti durante la disattivazione dei set di dati.