Menggunakan parameter dengan alur

Artikel ini menjelaskan cara mengonfigurasi parameter untuk alur Anda.

Parameter referensi

Selama pembaruan, kode sumber alur Anda dapat mengakses parameter alur menggunakan sintaks untuk mendapatkan nilai untuk konfigurasi Spark.

Anda mereferensikan parameter alur menggunakan kunci . Nilai disuntikkan ke dalam kode sumber Anda sebagai string sebelum logika kode sumber Anda dievaluasi.

Contoh sintaks berikut menggunakan parameter dengan kunci source_catalog dan nilai dev_catalog untuk menentukan sumber data untuk tampilan materialisasi:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Phyton

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count

@dp.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Menetapkan parameter

Teruskan parameter ke alur dengan meneruskan pasangan kunci-nilai arbitrer sebagai konfigurasi untuk alur. Anda dapat mengatur parameter saat menentukan atau mengedit konfigurasi alur menggunakan UI ruang kerja atau JSON. Lihat Mengonfigurasi Alur.

Kunci parameter alur hanya dapat berisi _ - . atau karakter alfanumerik. Nilai parameter diatur sebagai string.

Parameter alur tidak mendukung nilai dinamis. Anda harus memperbarui nilai yang terkait dengan kunci dalam konfigurasi pipeline.

Penting

Jangan gunakan kata kunci yang bertentangan dengan alur yang dipesan atau nilai konfigurasi Apache Spark.

Membuat parameter deklarasi himpunan data di Python atau SQL

Kode Python dan SQL yang menentukan himpunan data Anda dapat diparameterkan oleh pengaturan alur. Parameterisasi memungkinkan kasus penggunaan berikut:

  • Memisahkan jalur panjang dan variabel lain dari kode Anda.
  • Mengurangi jumlah data yang diproses dalam lingkungan pengembangan atau penahapan untuk mempercepat pengujian.
  • Menggunakan kembali logika transformasi yang sama untuk diproses dari beberapa sumber data.

Contoh berikut menggunakan startDate nilai konfigurasi untuk membatasi alur pengembangan ke subset data input:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Mengontrol sumber data dengan parameter

Anda dapat menggunakan parameter alur untuk menentukan sumber data yang berbeda dalam konfigurasi yang berbeda dari alur yang sama.

Misalnya, Anda dapat menentukan jalur yang berbeda dalam konfigurasi pengembangan, pengujian, dan produksi untuk alur menggunakan variabel data_source_path lalu mereferensikannya menggunakan kode berikut:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Phyton

from pyspark import pipelines as dp
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dp.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Pola ini bermanfaat untuk menguji bagaimana logika penyerapan mungkin menangani skema atau data cacat selama penyerapan awal. Anda dapat menggunakan kode yang identik di seluruh alur anda di semua lingkungan sambil mengalihkan himpunan data.