パイプラインでパラメーターを使用する

パイプライン パラメーターを使用すると、環境またはデータセット間で同じパイプライン ソース コードを再利用できます。 たとえば、 dev カタログと prod カタログに対して同じ変換を実行したり、実行ごとに異なるソース パスから取り込んだりできます。 パイプラインでパラメーターを定義 (または更新の開始時にオーバーライド) し、SQL ソース コードから参照します。

Important

この機能は ベータ版です。 ワークスペース管理者は、[ プレビュー] ページからこの機能へのアクセスを制御できます。 Manage Azure Databricks プレビューを参照してください。

このページでは、SQL ソース コードで使用できるパイプライン パラメーター機能について説明します。 パイプライン内で Python ソースコードをパラメータ化するには、構成フィールドを使用してパラメータを参照する で説明されているように、引き続き Configuration フィールドを使用します。 構成は、パイプラインが実行時に読み取る Spark 構成値を設定するためにも使用されます。 Spark 構成設定の詳細については、「 パイプラインのプロパティ リファレンス」を参照してください

パイプライン パラメーターとは

パイプライン パラメーターは、次のことが可能なキーと値のペアです。

  • パイプライン設定で既定値として宣言します。
  • パイプライン UI、更新開始 API、または 別の設定で実行 ダイアログから更新を開始する際に上書きします。
  • 必要に応じてジョブレベルのパラメーターをプッシュダウンしながら、ジョブ内のパイプライン タスクを上書きします。
  • 名前付きパラメーター構文を使用した SQL ソース コードからの参照。

パラメーター値は常に文字列です。 キーには、英数字、アンダースコア (_)、ハイフン (-)、ピリオド (.) を含めることができます。

パイプライン パラメーターと [構成] フィールドは、さまざまな目的に対応します。

...にはパラメーターを使用してください。 Configurationを使用して...
更新の間で変更される値 (ターゲット カタログ、ソース パス、日付範囲)。 パイプラインの動作 (pipelines.enzyme.enabledpipelines.clusterLabelsV2Enabled) を制御する Spark 構成。
ジョブまたはタスクからプッシュダウンする値。 静的な構造パイプラインのプロパティ。
名前付きパラメーター構文を使用して SQL で参照する値。 SQL の ${key} 構文または Python の spark.conf.get("key") で参照する値。

パイプライン パラメーターを定義する

パイプライン設定で既定のパラメーター値を定義できます。 更新プログラムがオーバーライドなしで実行されると、パイプラインではこれらの既定値が使用されます。

パイプライン UI を使用する

  1. ワークスペースで、[ワークフロー] アイコンをクリックします。サイドバーのジョブとパイプラインを選択し、パイプラインを選択します。
  2. [設定]をクリックします。
  3. [ パイプライン設定 ] サイドバーで、[ パラメーター] セクションを見つけて、[ 編集] をクリックします。
  4. キーのエントリを追加し、[保存] をクリックします。

JSON または REST API を使用する

パイプライン定義に parameters マップを追加します。

{
  "name": "Sales pipeline",
  "parameters": {
    "source_catalog": "dev_catalog",
    "source_schema": "sales",
    "start_date": "2026-01-01"
  }
}

完全なパイプライン JSON リファレンスについては、「 パイプラインの構成」を参照してください。

SQL ソース コードの参照パラメーター

キーの前にコロンを付けることで、パラメーターを参照します。 Azure Databricksは、更新時に値を文字列としてバインドします。

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id,
  COUNT(txn_id) AS txn_count,
  SUM(txn_amount) AS account_revenue
FROM :source_catalog.sales.transactions
WHERE txn_date >= :start_date
GROUP BY account_id

カタログ、スキーマ、テーブル名などの識別子の位置でパラメーターを使用するには、 IDENTIFIER()でラップします。

USE CATALOG IDENTIFIER(:source_catalog);
USE SCHEMA IDENTIFIER(:source_schema);

CREATE OR REFRESH MATERIALIZED VIEW daily_sales AS
SELECT date(timestamp) AS date,
  SUM(price) AS total_sales
FROM transactions
GROUP BY date;

ソース コードが更新時に値のないパラメーターを参照している場合、更新はエラーで失敗します。 パイプラインは、コードが参照しない追加のパラメーターを無視します。

更新時にパラメーターをオーバーライドする

保存された既定値を変更せずに、単一の更新のパラメーター値をオーバーライドできます。

  • パイプライン UI で、[ 異なる設定で実行 ] をクリックし、[ パラメーター] セクションを編集します。
  • ジョブ内のパイプライン タスクで、タスクの Parameters フィールドにパラメーターのオーバーライドを指定します。 「パラメーター」を参照してください。
  • API から、parameters要求で マップを渡します。

Azure Databricks更新履歴に特定の更新プログラムのパラメーターを記録し、パイプライン実行リストの Run パラメーター 列に表示します。

パラメーターの優先順位

同じキーを複数の場所で定義すると、優先順位が最も高い値が優先されます。 最高から最下位へ:

  1. ジョブ実行パラメーター: 1 回のジョブ実行に指定された値 (オーバーライド)。
  2. ジョブ パラメーター: 親ジョブで定義されている既定値。
  3. パイプライン タスク パラメーター: パイプライン タスクに設定された値。
  4. パイプライン パラメーター: パイプライン設定で定義されている既定値。

これは、他の ジョブ パラメーター タスクの種類で使用される優先順位と一致します。

Lakeflow ジョブのパイプライン パラメータ

ジョブでパイプラインを パイプライン タスク としてスケジュールすると、タスクはパイプラインの既定値をオーバーライドするパラメーターを指定できます。 パラメーター値は 、動的値参照 を使用して、 {{job.trigger.time.iso_date}}{{job.parameters.region}}などのジョブ実行時の値を挿入できます。

また、Lakeflow ジョブは、ノートブックおよび SQL タスクにプッシュダウンするのと同じ方法で、すべてのジョブ パラメーターをパイプライン タスクに自動的にプッシュします。 パイプラインのソース コードは、名前付きパラメーター構文を使用して、プッシュダウンされた値を参照できます。 パイプライン設定でのパラメーターの宣言は省略可能であり、オーバーライドなしで実行の既定値のみを設定します。

注意事項と既知の制限事項

  • パイプラインは一度に 1 つの更新を実行します。パイプラインは一度に 1 つの更新のみを実行できます。 複数の更新が重複してしまう場合にジョブが失敗するのを防ぐため、Azure Databricks は次の 2 つの状況ではコンカレンシーを 1 に制限します。

    • パイプライン タスクを含み、max_concurrent_runs が複数設定されているジョブ。
    • イテレーション数に関係なく、for-each タスクでラップされたパイプラインタスク。

    この上限が有効になると、ジョブ UI に通知が表示されます。 多数のパラメーターの組み合わせで実行する予定のパラメーター化されたパイプラインを設計するときは、上限を回避する必要があります。

  • 日付フィルターは、完全な更新をトリガーできます。パラメーター化の一般的なユース ケースは、日付でデータをフィルター処理することです。 述語に注意してください。日付範囲の 両側 でフィルター処理を行うと、具体化されたビューの増分処理が無効になり、更新ごとに完全な更新がトリガーされます。

    -- Triggers a full refresh on each update
    CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
    SELECT * FROM orders
    WHERE order_date >= :start_date AND order_date < :end_date;
    
    -- Processes incrementally
    CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
    SELECT * FROM orders
    WHERE order_date >= :start_date;
    
  • 名前付きパラメーターは SQL 専用です。このベータ版では、名前付きパラメーター構文は SQL ソース コードでのみ使用できます。 ソース コードPythonパラメーター化するには、引き続き Configuration フィールドと spark.conf.get() を使用します。 構成フィールドを使用した参照パラメーターを参照してください。

構成フィールドを使用してパラメーターを参照する

パイプラインの [構成] フィールドは、Spark 構成値として公開されている任意のキーと値のペアを受け入れます。 これは従来のパラメーター化メカニズムであり、パイプライン パラメーターと共に引き続き動作します。 Pythonソース コードと、名前付きパラメーター構文ではなく、spark.conf.get() で読み取るキーに使用します。

次の例では、 mypipeline.start_date 構成値を使用して、開発パイプラインを入力データのサブセットに制限します。

SQL

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM source_table WHERE date > '${mypipeline.start_date}';

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.start_date")
  return spark.read.table("source_table").where(col("date") > start_date)

構成値は、パイプライン設定の [構成] セクション、またはパイプライン JSON の [ configuration ] フィールドで設定します。 予約済みパイプラインまたは Apache Spark 構成値と競合するキーは避けてください。