Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo explica como configurar parâmetros para seus pipelines.
Parâmetros de referência
Durante as atualizações, o código-fonte do pipeline pode acessar parâmetros do pipeline por meio de sintaxe para obter valores das configurações do Spark.
Você faz referência a parâmetros de pipeline usando a chave. O valor é injetado no código-fonte como uma cadeia de caracteres antes que a lógica do código-fonte seja avaliada.
A sintaxe de exemplo a seguir usa um parâmetro com chave source_catalog e valor dev_catalog para especificar a fonte de dados para uma exibição materializada:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count
@dp.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Definir Parâmetros
Passe parâmetros para pipelines passando pares de chave-valor arbitrários como configurações para o pipeline. Você pode definir parâmetros ao definir ou editar uma configuração de pipeline usando a interface do usuário do workspace ou JSON. Consulte Configurar Pipelines.
As chaves de parâmetro de pipeline só podem conter _ - . ou caracteres alfanuméricos. Os valores de parâmetro são definidos como cadeias de caracteres.
Os parâmetros de pipeline não dão suporte a valores dinâmicos. Você deve atualizar o valor associado a uma chave na configuração do pipeline.
Importante
Não use palavras-chave que entrem em conflito com os valores de configuração do pipeline reservado ou do Apache Spark.
Parametrizar declarações de conjunto de dados em Python ou SQL
O código Python e SQL que define seus conjuntos de dados pode ser parametrizado pelas configurações do pipeline. A parametrização habilita os seguintes casos de uso:
- Separando caminhos completos e outras variáveis do seu código.
- Reduzindo a quantidade de dados processados em ambientes de desenvolvimento ou preparo para acelerar o teste.
- Reutilizando a mesma lógica de transformação para processar de várias fontes de dados.
O exemplo a seguir usa o valor de startDate configuração para limitar o pipeline de desenvolvimento a um subconjunto dos dados de entrada:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Controlar fontes de dados com parâmetros
Você pode usar parâmetros de pipeline para especificar fontes de dados diferentes em configurações diferentes do mesmo pipeline.
Por exemplo, você pode especificar caminhos diferentes em configurações de desenvolvimento, teste e produção para um pipeline usando a variável data_source_path e, em seguida, referenciá-la usando o seguinte código:
SQL
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dp.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Esse padrão é benéfico para testar como a lógica de ingestão pode lidar com esquemas ou dados malformados no início da ingestão. Você pode usar o código idêntico em todo o pipeline em todos os ambientes ao alternar conjuntos de dados.