共用方式為


使用參數配合管線

本文說明如何設定管線的參數。

參考參數

在更新期間,您的管線原始程式碼可以使用語法存取管線參數,以取得 Spark 組態的值。

您可以使用索引鍵參考管線參數。 在原始程式碼邏輯評估之前,該值會以字串的形式插入您的原始程式碼中。

下列範例語法使用具有索引鍵 source_catalog 和值 dev_catalog 的參數來指定具體化檢視的資料來源:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count

@dp.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

設定參數

將任意索引鍵值組作為組態傳遞至管線,以便將參數傳遞至管線。 您可以使用工作區 UI 或 JSON 定義或編輯管線組態時設定參數。 請參閱 設定管線

管道參數鍵只能包含 _ - . 或英文字母或數字。 參數值會設定為字串。

管線參數不支援動態值。 您必須更新管道配置中與某個鍵相關聯的值。

這很重要

請勿使用與保留管線或 Apache Spark 組態值衝突的關鍵字。

在 Python 或 SQL 中參數化資料集宣告

定義資料集的 Python 和 SQL 程式碼可以由管線的設定參數化。 參數化可啟用下列使用案例:

  • 將長路徑和其他變數與程式碼分開。
  • 減少在開發或預備環境中處理的資料量,以加快測試速度。
  • 重複使用相同的轉換邏輯來處理多個資料來源。

下列範例使用組態值將 startDate 開發管線限制為輸入資料的子集:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

使用參數控制資料來源

您可以使用管線參數,在相同管線的不同組態中指定不同的資料來源。

例如,您可以使用變數 data_source_path 在管線的開發、測試和生產組態中指定不同的路徑,然後使用下列程式碼參考它:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dp.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

此模式有助於測試擷取邏輯在初始擷取期間如何處理結構描述或格式不正確的資料。 您可以在切換資料集時,在所有環境中的整個管線中使用相同的程式碼。