Partager via


Utiliser des paramètres avec des pipelines

Cet article explique comment configurer des paramètres pour vos pipelines.

Paramètres de référence

Pendant les mises à jour, votre code source de pipeline peut accéder aux paramètres de pipeline à l’aide de la syntaxe pour obtenir des valeurs pour les configurations Spark.

Vous référencez les paramètres de pipeline à l’aide de la clé. La valeur est injectée dans votre code source sous forme de chaîne avant l’évaluation de votre logique de code source.

L’exemple de syntaxe suivant utilise un paramètre avec clé source_catalog et valeur dev_catalog pour spécifier la source de données d’une vue matérialisée :

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count

@dp.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Définition des paramètres

Transmettez des paramètres aux pipelines en passant des paires clé-valeur arbitraires en tant que configurations pour le pipeline. Vous pouvez définir des paramètres lors de la définition ou de la modification d’une configuration de pipeline à l’aide de l’interface utilisateur de l’espace de travail ou json. Consultez Configurer des pipelines.

Les clés de paramètre de pipeline ne peuvent contenir _ - . que des caractères alphanumériques. Les valeurs de paramètre sont définies en tant que chaînes.

Les paramètres de pipeline ne prennent pas en charge les valeurs dynamiques. Vous devez mettre à jour la valeur associée à une clé dans la configuration du pipeline.

Important

N’utilisez pas de mots clés qui entrent en conflit avec les valeurs de configuration du pipeline réservé ou Apache Spark.

Paramétrer les déclarations de jeu de données en Python ou SQL

Le code Python et SQL qui définit vos jeux de données peut être paramétré par les paramètres du pipeline. Le paramétrage active les cas d’usage suivants :

  • Séparation des chemins longs et d’autres variables de votre code.
  • Réduction de la quantité de données traitées dans des environnements de développement ou de préproduction pour accélérer les tests.
  • Réutilisation de la même logique de transformation pour traiter à partir de plusieurs sources de données.

L’exemple suivant utilise la startDate valeur de configuration pour limiter le pipeline de développement à un sous-ensemble des données d’entrée :

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Contrôler les sources de données avec des paramètres

Vous pouvez utiliser des paramètres de pipeline pour spécifier différentes sources de données dans différentes configurations du même pipeline.

Par exemple, vous pouvez spécifier différents chemins d’accès dans les configurations de développement, de test et de production d’un pipeline à l’aide de la variable data_source_path , puis la référencer à l’aide du code suivant :

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dp.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Ce modèle est bénéfique pour tester la façon dont la logique d'ingestion peut gérer des données de schéma ou des données mal formées lors de l'ingestion initiale. Vous pouvez utiliser le code identique dans l’ensemble de votre pipeline dans tous les environnements tout en basculant des jeux de données.