Compartir vía


Usa parámetros con canalizaciones

En este artículo se explica cómo configurar parámetros para las canalizaciones.

Parámetros de referencia

Durante las actualizaciones, el código fuente de la canalización puede acceder a los parámetros de canalización mediante la sintaxis para obtener valores para las configuraciones de Spark.

Se hace referencia a los parámetros de canalización mediante la clave . El valor se inserta en el código fuente como una cadena antes de que se evalúe la lógica del código fuente.

La sintaxis de ejemplo siguiente usa un parámetro con clave source_catalog y valor dev_catalog para especificar el origen de datos para una vista materializada:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Pitón

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count

@dp.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Establecer parámetros

Proporcione parámetros a las canalizaciones mediante el paso de pares clave-valor arbitrarios como configuraciones para las canalizaciones. Puede establecer parámetros al definir o editar una configuración de canalización mediante la interfaz de usuario del área de trabajo o JSON. Consulte Configuración de canalizaciones.

Las claves de parámetro de canalización solo pueden contener _ - . o caracteres alfanuméricos. Los valores de parámetro se establecen como cadenas.

Los parámetros de canalización no admiten valores dinámicos. Debe actualizar el valor asociado a una clave en la configuración del pipeline.

Importante

No use palabras clave que entren en conflicto con la canalización reservada o los valores de configuración de Apache Spark.

Parametrizar declaraciones de conjunto de datos en Python o SQL

El código de Python y SQL que define tus conjuntos de datos puede ser parametrizado por las configuraciones de la canalización. La parametrización habilita los siguientes casos de uso:

  • Separar rutas de acceso largas y otras variables de tu código.
  • Reducir la cantidad de datos procesados en entornos de desarrollo o ensayo para acelerar las pruebas.
  • Reutilización de la misma lógica de transformación para procesar desde varios orígenes de datos.

En el ejemplo siguiente se usa el startDate valor de configuración para limitar la canalización de desarrollo a un subconjunto de los datos de entrada:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Control de orígenes de datos con parámetros

Puede usar parámetros de canalización para especificar orígenes de datos diferentes en configuraciones diferentes de la misma canalización.

Por ejemplo, puede especificar diferentes rutas de acceso en configuraciones de desarrollo, pruebas y producción para una canalización mediante la variable data_source_path y, a continuación, hacer referencia a ella mediante el código siguiente:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Pitón

from pyspark import pipelines as dp
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dp.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Este patrón es beneficioso para probar cómo la lógica de ingesta podría manejar los datos malformados o del esquema durante la ingesta inicial. Puede usar el código idéntico en toda la canalización en todos los entornos al cambiar los conjuntos de datos.