Aracılığıyla paylaş


Boru hatları ile parametreleri kullanma

Bu makalede, işlem hatlarınız için parametreleri yapılandırma açıklanmaktadır.

Başvuru parametreleri

Güncelleştirmeler sırasında işlem hattı kaynak kodunuz Spark yapılandırmalarının değerlerini almak için söz dizimini kullanarak işlem hattı parametrelerine erişebilir.

anahtarı kullanarak işlem hattı parametrelerine başvurursunuz. Değer, kaynak kodu mantığınız değerlendirmeden önce kaynak kodunuz içine dize olarak eklenir.

Aşağıdaki örnek söz dizimi, gerçekleştirilmiş bir görünümün veri kaynağını belirtmek için anahtar source_catalog ve değer dev_catalog içeren bir parametre kullanır:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Piton

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count

@dp.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Parametreleri ayarlama

Parametreleri işlem hatları için yapılandırmalar olarak rastgele anahtar-değer çiftlerini geçirerek işlem hatlarına aktarın. Çalışma alanı kullanıcı arabirimini veya JSON'yi kullanarak işlem hattı yapılandırmasını tanımlar veya düzenlerken parametreleri ayarlayabilirsiniz. Bkz. İşlem Hatlarını Yapılandırma.

İşlem hattı parametre anahtarları yalnızca _ - . veya alfasayısal karakterler içerebilir. Parametre değerleri dize olarak ayarlanır.

İşlem hattı parametreleri dinamik değerleri desteklemez. İşlem hattı yapılandırmasındaki bir anahtarla ilişkili değeri güncelleştirmeniz gerekir.

Önemli

Ayrılmış işlem hattı veya Apache Spark yapılandırma değerleriyle çakışan anahtar sözcükler kullanmayın.

Python veya SQL'de veri kümesi bildirimlerini parametreleştirme

Veri kümelerinizi tanımlayan Python ve SQL kodu, işlem hattının ayarları tarafından parametrelendirilebilir. Parametreleştirme aşağıdaki kullanım örneklerini etkinleştirir:

  • Uzun yolları ve diğer değişkenleri kodunuzdan ayırma.
  • Test sürecini hızlandırmak için geliştirme veya hazırlama ortamlarında işlenen veri miktarını azaltma.
  • Birden çok veri kaynağından işlemek için aynı dönüştürme mantığını yeniden kullanma.

Aşağıdaki örnek, geliştirme işlem hattını giriş verilerinin startDate bir alt kümesiyle sınırlamak için yapılandırma değerini kullanır:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Veri kaynaklarını parametrelerle denetleme

aynı işlem hattının farklı yapılandırmalarında farklı veri kaynakları belirtmek için işlem hattı parametrelerini kullanabilirsiniz.

Örneğin, değişkenini data_source_path kullanarak işlem hattı için geliştirme, test ve üretim yapılandırmalarında farklı yollar belirtebilir ve ardından aşağıdaki kodu kullanarak buna başvurabilirsiniz:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Piton

from pyspark import pipelines as dp
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dp.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Bu desen, ilk alım sırasında alma mantığının şemayı veya hatalı biçimlendirilmiş verileri nasıl işleyebileceğini test etme açısından faydalıdır. Veri kümelerini değiştirirken işlem hattınızın tamamında aynı kodu tüm ortamlarda kullanabilirsiniz.