この記事では、パイプラインのパラメーターを構成する方法について説明します。
参照パラメーター
更新中、パイプラインのソース コードは、構文を使用してパイプライン パラメーターにアクセスして、Spark 構成の値を取得できます。
キーを使用してパイプライン パラメーターを参照します。 値は、ソース コード ロジックが評価される前に、文字列としてソース コードに挿入されます。
次の構文例では、キー source_catalog と値 dev_catalog パラメーターを使用して、具体化されたビューのデータ ソースを指定します。
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count
@dp.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
パラメーターを設定する
パイプラインの構成として任意のキーと値のペアを渡すことによって、パイプラインにパラメーターを渡します。 ワークスペース UI または JSON を使用して、パイプライン構成の定義または編集中にパラメーターを設定できます。 「 パイプラインの構成」を参照してください。
パイプライン パラメーター キーには、 _ - . または英数字のみを含めることができます。 パラメーター値は文字列として設定されます。
パイプライン パラメーターは動的な値をサポートしていません。 パイプライン構成のキーに関連付けられている値を更新する必要があります。
Important
予約済みパイプラインまたは Apache Spark 構成値と競合するキーワードは使用しないでください。
Python または SQL でデータセット宣言をパラメーター化する
データセットを定義する Python および SQL コードは、パイプラインの設定によってパラメーター化できます。 パラメーター化により、次のユース ケースが有効になります。
- 長いパスとその他の変数をコードから分離する。
- テストを高速化するために、開発環境またはステージング環境で処理されるデータの量を減らします。
- 同じ変換ロジックを再利用して複数のデータ ソースから処理する。
次の例では、 startDate 構成値を使用して、開発パイプラインを入力データのサブセットに制限します。
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
パラメーターを使用してデータ ソースを制御する
パイプライン パラメーターを使用して、同じパイプラインのさまざまな構成で異なるデータ ソースを指定できます。
たとえば、変数 data_source_path を使用してパイプラインの開発、テスト、運用の構成で異なるパスを指定し、次のコードを使用してそれを参照できます。
SQL
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dp.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
このパターンは、インジェスト ロジックが初期インジェスト中にスキーマまたは形式が正しくないデータを処理する方法をテストするのに役立ちます。 データセットを切り替えながら、すべての環境でパイプライン全体で同じコードを使用できます。