Udostępnij przez


Używanie narzędzia ForEachBatch do zapisywania w dowolnych ujściach danych w potokach

Ważne

Interfejs foreach_batch_sink API jest w publicznej wersji zapoznawczej.

Odbiornik ForEachBatch umożliwia przetwarzanie strumienia jako serii mikropartii. Każda seria może być przetwarzana w języku Python przy użyciu własnej logiki podobnej do strumieniowania ze strukturą Apache Spark foreachBatch. Za pomocą potoków deklaratywnych platformy Lakeflow (SDP) ForEachBatch można przekształcać, scalać lub zapisywać dane przesyłane strumieniowo do co najmniej jednego miejsca docelowego, które nie obsługują natywnie zapisów przesyłanych strumieniowo. Ta strona przeprowadzi Cię przez proces konfigurowania ujścia ForEachBatch, zawiera przykłady i omawia kluczowe zagadnienia.

Ujście ForEachBatch zapewnia następujące funkcje:

  • Niestandardowa logika dla każdej mikropartii: ForEachBatch to elastyczny konsument przesyłania strumieniowego. Możesz zastosować dowolne akcje (takie jak scalanie z tabelą zewnętrzną, zapisywanie w wielu miejscach docelowych lub wykonywanie operacji upsert) przy użyciu kodu w języku Python.
  • Obsługa pełnego odświeżania: Ciągi przetwarzania zarządzają punktami kontrolnymi dla poszczególnych przepływów, dzięki czemu punkty kontrolne są resetowane automatycznie podczas pełnego odświeżania ciągu przetwarzania. W przypadku ujścia ForEachBatch odpowiadasz za zarządzanie resetowaniem danych podrzędnych w takim przypadku.
  • Obsługa katalogu Unity: Element docelowy ForEachBatch obsługuje wszystkie funkcje katalogu Unity, takie jak odczytywanie z woluminów lub tabel katalogu Unity albo zapisywanie do nich.
  • Ograniczone przechowywanie: potok nie śledzi, jakie dane są zapisywane z ujścia ForEachBatch, więc nie można wyczyścić tych danych. Ponosisz odpowiedzialność za wszelkie podrzędne zarządzanie danymi.
  • Wpisy dziennika zdarzeń: dziennik zdarzeń potoku rejestruje tworzenie i użycie każdego zbiornika ForEachBatch. Jeśli funkcja języka Python nie jest serializowalna, w dzienniku zdarzeń zostanie wyświetlony wpis ostrzegawczy z dodatkowymi sugestiami.

Uwaga / Notatka

  • Ujście ForEachBatch jest przeznaczone do zapytań przesyłanych strumieniowo, takich jak append_flow. Nie jest przeznaczony dla potoków wyłącznie wsadowych ani semantyki AutoCDC.
  • Odbiornik ForEachBatch opisany na tej stronie jest przeznaczony dla potoków. Strukturalne Przesyłanie Strumieniowe Apache Spark obsługuje również foreachBatch. Aby uzyskać informacje na temat Strukturowanego przesyłania strumieniowego foreachBatch, zobacz Używanie funkcji foreachBatch do zapisywania w dowolnych ujściach danych.

Kiedy należy użyć ujścia ForEachBatch

Użyj ujścia ForEachBatch za każdym razem, gdy potok wymaga funkcji, które nie są dostępne za pomocą wbudowanego formatu ujścia, takiego jak delta, lub kafka. Typowe przypadki użycia to:

  • Scalanie lub upsertowanie w tabeli Delta Lake: uruchamianie niestandardowej logiki scalania dla każdej mikropartii (na przykład obsługa zaktualizowanych rekordów).
  • Zapisywanie w wielu lub nieobsługiwanych miejscach docelowych: zapisz dane wyjściowe każdej partii w wielu tabelach lub zewnętrznych systemach magazynowania, które nie obsługują zapisów przesyłanych strumieniowo (takich jak niektóre ujścia JDBC).
  • Stosowanie niestandardowej logiki lub przekształceń: bezpośrednie manipulowanie danymi w języku Python (na przykład przy użyciu wyspecjalizowanych bibliotek lub zaawansowanych przekształceń).

Aby uzyskać informacje o wbudowanych ujściach lub tworzeniu niestandardowych ujść za pomocą języka Python, zobacz Sinks in Lakeflow Spark Deklaratative Pipelines (Ujścia w potokach deklaratywnych platformy Lakeflow).

Składnia

Użyj dekoracji @dp.foreach_batch_sink(), aby wygenerować ujście ForEachBatch. Następnie można odwoływać się do tego jako elementu target w definicji przepływu, na przykład w pliku @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Parameter Description
name Opcjonalny. Unikatowa nazwa identyfikująca ujście w potoku. Wartość domyślna to nazwa funkcji zdefiniowanej przez użytkownika, jeśli nie jest dołączona.
batch_handler Jest to funkcja zdefiniowana przez użytkownika (UDF), która będzie wywoływana dla każdej mikropartii.
Df Ramka danych Spark zawierająca dane dla bieżącej mikropartii.
batch_id Identyfikator całkowitoliczbowy mikropartii. Spark zwiększa ten identyfikator dla każdego interwału wyzwalania.
batch_id z 0 oznacza początek strumienia lub początek pełnego odświeżania. Kod foreach_batch_sink powinien prawidłowo obsługiwać pełne odświeżanie dla podrzędnych źródeł danych. Aby uzyskać więcej informacji, zobacz następną sekcję.

Pełne odświeżanie

Ponieważ narzędzie ForEachBatch używa zapytania strumieniowego, potok śledzi katalog punktu kontrolnego dla każdego przepływu. Po pełnym odświeżeniu:

  • Katalog punktu kontrolnego jest resetowany.
  • Funkcja "sink" (foreach_batch_sink UDF) zauważa zupełnie nowy cykl batch_id rozpoczynający się od 0.
  • Dane w docelowym systemie nie są automatycznie czyszczone przez rurociąg (ponieważ rurociąg nie wie, gdzie dane są zapisywane). Jeśli potrzebujesz scenariusza czyszczenia łupków, musisz ręcznie usunąć lub obcinać zewnętrzne tabele lub lokalizacje wypełnione przez ujście ForEachBatch.

Korzystanie z funkcji Unity Catalog

Wszystkie istniejące możliwości Unity Catalogu w ramach Spark Structured Streaming foreach_batch_sink pozostają dostępne.

Obejmuje to zapisywanie w zarządzanych lub zewnętrznych tabelach Unity Catalog. Mikrosady można zapisywać w tabelach zarządzanych przez Unity Catalog lub tabelach zewnętrznych dokładnie tak, jak w przypadku dowolnego zadania Structured Streaming w Apache Spark.

Wpisy dziennika zdarzeń

Podczas tworzenia ujścia ForEachBatch SinkDefinition zdarzenie z elementem "format": "foreachBatch" jest dodawane do dziennika zdarzeń potoku.

Dzięki temu można śledzić użycie zlewów ForEachBatch i wyświetlać ostrzeżenia dotyczące ujścia.

Korzystanie z Databricks Connect

Jeśli dostarczana funkcja nie jest serializowalna (ważne wymaganie dla usługi Databricks Connect), dziennik zdarzeń zawiera WARN wpis zalecający uproszczenie lub refaktoryzowanie kodu, jeśli wymagana jest obsługa programu Databricks Connect.

Na przykład, jeśli używasz dbutils do pobierania parametrów w UDF forEachBatch, możesz zamiast tego pobrać argument przed jego użyciem w UDF.

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Najlepsze rozwiązania

  1. Zachowaj zwięzłość funkcji ForEachBatch: unikaj użycia wątków, dużych zależności bibliotek lub rozległych operacji na danych w pamięci. Złożona lub logika oparta na stanach może powodować błędy serializacji lub zatory wydajności.
  2. Monitoruj folder punktu kontrolnego: w przypadku zapytań przesyłanych strumieniowo protokół SDP zarządza punktami kontrolnymi na podstawie przepływu, a nie za pomocą ujścia. Jeśli masz wiele przepływów w pipeline, każdy przepływ ma własny katalog checkpointów.
  3. Zweryfikuj zależności zewnętrzne: jeśli korzystasz z systemów zewnętrznych lub bibliotek, sprawdź, czy są one zainstalowane na wszystkich węzłach klastra lub w kontenerze.
  4. Zwróć uwagę na Databricks Connect: jeśli środowisko może przejść do Databricks Connect w przyszłości, sprawdź, czy kod jest serializowalny i nie korzysta z dbutils w ramach foreach_batch_sink UDF.

Ograniczenia

  • Brak mechanizmu sprzątania dla ForEachBatch: ponieważ niestandardowy kod języka Python może zapisywać dane w dowolnym miejscu, potok nie może sprzątać ani śledzić tych danych. Musisz zarządzać swoimi zasadami zarządzania lub przechowywania danych dla docelowych lokalizacji, do których zapisujesz.
  • Metryki w mikropartii: potoki zbierają metryki streamingu, ale niektóre scenariusze mogą powodować niekompletne lub nietypowe metryki podczas korzystania z operacji ForEachBatch. Jest to spowodowane podstawową elastycznością narzędzia ForEachBatch, co sprawia, że śledzenie przepływu danych i wierszy jest trudne dla systemu.
  • Obsługa zapisu w wielu miejscach docelowych bez wielu odczytów: niektórzy klienci mogą użyć narzędzia ForEachBatch do odczytu ze źródła raz, a następnie zapisu w wielu miejscach docelowych. Aby to osiągnąć, należy dołączyć df.persist lub df.cache do funkcji ForEachBatch. Korzystając z tych opcji, usługa Azure Databricks podejmie próbę gotowości danych tylko raz. Bez tych opcji zapytanie spowoduje wiele operacji odczytu. Nie jest to uwzględnione w poniższych przykładach kodu.
  • Używanie z usługą Databricks Connect: jeśli potok działa w usłudze Databricks Connect, foreachBatch funkcje zdefiniowane przez użytkownika muszą być serializowalne i nie mogą używać funkcji dbutils. Potok zgłasza ostrzeżenia, jeśli wykryje nieseryfikowalny UDF, ale nie kończy się niepowodzeniem potoku.
  • Logika niemożliwa do serializacji: Kod odwołujący się do obiektów lokalnych, klas lub nieprzypisalnych zasobów może przerwać konteksty usługi Databricks Connect. Używaj czystych modułów języka Python i upewnij się, że odwołania (na przykład dbutils) nie są używane, jeśli wymagane jest korzystanie z Databricks Connect.

Przykłady

Podstawowy przykład składni

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Używanie przykładowych danych dla prostego potoku

W tym przykładzie użyto przykładu NYC Taxi. Przyjęto założenie, że administrator obszaru roboczego włączył wykaz publicznych zestawów danych usługi Databricks. Dla odbiornika zmień my_catalog.my_schema na katalog i schemat, do którego masz dostęp.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Zapisywanie w wielu miejscach docelowych

W tym przykładzie dane są zapisywane w wielu miejscach docelowych. Demonstruje użycie elementów txnVersion i txnAppId, aby zapisy w tabelach Delta Lake były idempotentne. Aby uzyskać szczegółowe informacje, zobacz Idempotent table writes in foreachBatch.

Załóżmy, że zapisujemy w dwóch tabelach, table_a i table_b, i załóżmy, że w ramach partii zapis do table_a zakończy się powodzeniem, podczas gdy zapis do table_b zakończy się niepowodzeniem. Gdy partia zostanie ponownie uruchomiona, para (txnVersion, txnAppId) umożliwi funkcji Delta zignorowanie zduplikowanego zapisu do table_a, a tylko zapisanie partii na table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Korzystanie z spark.sql()

Możesz użyć spark.sql() w ujściu ForEachBatch, jak w poniższym przykładzie.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Najczęściej zadawane pytania (FAQ)

Czy mogę użyć dbutils w danych wyjściowych ForEachBatch?

Jeśli planujesz uruchamianie potoku w środowisku innego niż Databricks Connect, dbutils może działać. Jeśli jednak używasz narzędzia Databricks Connect, dbutils nie jest dostępny w ramach funkcji foreachBatch . Potok danych może zgłaszać ostrzeżenia, jeśli wykryje użycie dbutils, aby pomóc uniknąć awarii.

Czy można używać wielu przepływów z jednym ujściem ForEachBatch?

Tak. Można zdefiniować wiele przepływów (z @dp.append_flow), które mają taką samą nazwę ujścia, ale każda z nich obsługuje własne punkty kontrolne.

Czy potok obsługuje przechowywanie danych lub oczyszczanie dla mojego miejsca docelowego?

Nie. Ponieważ odbiornik ForEachBatch może zapisywać dane w dowolnej lokalizacji lub systemie, proces nie może automatycznie zarządzać ani usuwać danych w tym miejscu docelowym. Te operacje należy obsługiwać w ramach niestandardowego kodu lub procesów zewnętrznych.

Jak rozwiązywać problemy z błędami serializacji lub błędami w funkcji ForEachBatch?

Przejrzyj dzienniki sterowników klastra lub dzienniki zdarzeń pipeline'u. W przypadku problemów z serializacji związanych z programem Spark Connect sprawdź, czy funkcja zależy tylko od obiektów języka Python z możliwością serializacji i nie odwołuje się do niedozwolonych obiektów (takich jak otwarte dojścia plików lub dbutils).