Samouczek: tworzenie wielu przepływów z różnymi parametrami

Potok może zawierać wiele przepływów, które są prawie identyczne, różniące się tylko kilkoma parametrami. Jawne zdefiniowanie tych przepływów jest podatne na błędy, nadmiarowe i trudne do utrzymania. Metaprogramowanie za pomocą Python funkcji wewnętrznych generuje dynamiczne powtarzające się przepływy, a każde wywołanie dostarcza inny zestaw parametrów.

Przegląd

Metaprogramowanie w deklaratywnych potokach Lakeflow Spark używa funkcji wewnętrznych Python. Ponieważ te funkcje są oceniane w sposób leniwy przez środowisko wykonawcze potoku, można opakowywać @dp.table dekoratory wewnątrz funkcji fabrycznej i wywoływać tę fabrykę wiele razy z różnymi argumentami. Każde wywołanie rejestruje nowy przepływ bez duplikowania kodu.

Aby uzyskać szczegółowe informacje na temat używania for pętli z potokami deklaratywnymi platformy Spark w usłudze Lakeflow, zobacz Tworzenie tabel w for pętli.

Przykład: czasy odpowiedzi straży pożarnej

W poniższym przykładzie użyto wbudowanego zestawu danych straży pożarnej, aby znaleźć dzielnice z najszybszym czasem reagowania awaryjnego dla każdego typu połączenia. Bez metaprogramowania należy napisać niemal identyczne definicje tabeli dla każdego typu wywołania (Alarmy, Pożar struktury, Incydent medyczny). W przypadku metaprogramowania pojedyncza funkcja fabryczna generuje wszystkie z nich.

Krok 1. Definiowanie nieprzetworzonej tabeli pozyskiwania

import functools
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dp.expect_or_drop("valid_received", "received IS NOT NULL")
@dp.expect_or_drop("valid_response", "responded IS NOT NULL")
@dp.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

Krok 2: Zdefiniuj funkcję fabryki przepływu

generate_tables Funkcja fabryki rejestruje dwie tabele dla każdego typu wywołania: odfiltrowaną tabelę wywołań i sklasyfikowaną tabelę czasu odpowiedzi. Oba są tworzone jako funkcje wewnętrzne oznaczone dekoratorem @dp.table.

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dp.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return spark.sql("""
      SELECT
        unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
        unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
        neighborhood
      FROM raw_fire_department
      WHERE call_type = '{filter}'
    """.format(filter=filter))

  @dp.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return spark.sql("""
      SELECT
        neighborhood,
        AVG((ts_received - ts_responded)) as response_time
      FROM {call_table}
      GROUP BY 1
      ORDER BY response_time
      LIMIT 10
    """.format(call_table=call_table))

  all_tables.append(response_table)

Krok 3. Wywołanie fabryki i zdefiniowanie tabeli podsumowania

Wywołaj fabrykę raz dla każdego typu wywołania, a następnie zdefiniuj tabelę podsumowania zawierającą wyniki, aby znaleźć dzielnice, które są najczęściej wyświetlane we wszystkich kategoriach.

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dp.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dp.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Po uruchomieniu tego potoku utworzysz zestaw podobnych tabel, takich jak ten wykres:

Wykres tabel wygenerowanych przez ten samouczek.

Kluczowe pojęcia

  • Funkcje wewnętrzne są leniwie rejestrowane: @dp.table dekorator nie uruchamia funkcji natychmiast. Rejestruje funkcję w środowisku uruchomieniowym potoku, który rozwiązuje pełny wykres przepływu danych przed rozpoczęciem wykonywania.
  • Zamknięcia przechwytują parametry: Każda funkcja wewnętrzna zamyka się nad parametrami przekazywanymi do fabryki (call_table, response_table, filter), więc każdy zarejestrowany przepływ używa własnego izolowanego zestawu wartości.
  • Dynamiczne listy tabel: używanie listy, takiej jak all_tables, do śledzenia programowo wygenerowanych nazw tabel ułatwia późniejsze odwołanie się do nich (na przykład w łączeniu lub sprzężeniu).

Dodatkowe zasoby