Eğitim: Birden fazla akışı farklı parametrelerle oluşturma

İşlem hattı, yalnızca birkaç parametreyle farklılık gösteren, neredeyse aynı olan birden çok akış içerebilir. Bu akışları açıkça tanımlamak hataya açık, yedekli ve bakımı zordur. Python iç işlevlerle meta programlama, dinamik olarak yinelenen akışlar oluşturur ve her çağrı farklı bir parametre kümesi sağlar.

Genel bakış

Lakeflow Spark Bildirimli İşlem Hatlarında meta programlama, Python iç işlevleri kullanır. Bu işlevler işlem hattı çalışma zamanı tarafından gevşek bir şekilde değerlendirildiğinden, dekoratörleri bir fabrika işlevinin içine sarmalayabilir @dp.table ve bu fabrikayı farklı bağımsız değişkenlerle birden çok kez çağırabilirsiniz. Her çağrı, kodu çoğaltmadan yeni bir akış kaydeder.

Lakeflow Spark Bildirimli İşlem Hatları ile döngüleri kullanma for hakkında ayrıntılı bilgi için bkz. Döngüde for tablo oluşturma.

Örnek: İtfaiye yanıt süreleri

Aşağıdaki örnek, her çağrı türü için en hızlı acil durum yanıt sürelerine sahip mahalleleri bulmak için yerleşik itfaiye veri kümesini kullanır. Meta programlama olmadan, her çağrı türü (Alarmlar, Yapı Yangın, Tıbbi Olay) için neredeyse aynı tablo tanımları yazmanız gerekir. Meta programlama ile tek bir fabrika işlevi bunların tümünü oluşturur.

1. Adım: Ham alma tablosunu tanımlama

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

2. Adım: Akış fabrikası işlevini tanımlama

generate_tables Factory işlevi her çağrı türü için iki tablo kaydeder: filtrelenmiş bir çağrı tablosu ve dereceli yanıt süresi tablosu. Her ikisi de @dp.table ile süslenmiş iç işlevler olarak oluşturulmuştur.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

3. Adım: Fabrikayı çağırma ve özet tablosunu tanımlama

Her çağrı türü için fabrikayı bir kez çağırın, ardından tüm kategorilerde en sık görünen mahalleleri bulmak için sonuçları bir araya getiren bir özet tablo tanımlayın.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Bu işlem hattını çalıştırdıktan sonra, aşağıdaki grafik gibi benzer tablolardan oluşan bir küme oluşturacaksınız:

Bu öğretici tarafından oluşturulan tabloların grafiği.

Temel kavramlar

  • İç işlevler gizli olarak kaydedilir: @dp.table Dekoratör işlevi hemen çalıştırmaz. İşlevi işlem hattı çalışma zamanına kaydeder ve yürütme başlamadan önce tam veri akışı grafiğini çözümler.
  • Kapanışlar parametreleri yakalar: Her iç işlev, fabrikaya (call_table, , response_table) geçirilen parametreler üzerinden kapanır, filterböylece her kayıtlı akış kendi yalıtılmış değer kümesini kullanır.
  • Dinamik tablo listeleri: Programatik olarak oluşturulan tablo adlarını izlemek için all_tables gibi bir liste kullanmak, bunlara daha sonra başvurmayı kolaylaştırır (örneğin, birleştirme veya birleşim).

Ek kaynaklar