Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule omówiono używanie funkcji foreachBatch
przesyłania strumieniowego ze strukturą do zapisywania danych wyjściowych zapytania przesyłania strumieniowego do źródeł danych, które nie mają istniejącego ujścia przesyłania strumieniowego.
Wzorzec streamingDF.writeStream.foreachBatch(...)
kodu umożliwia zastosowanie funkcji wsadowych do danych wyjściowych każdej mikrosadowej zapytania przesyłania strumieniowego. Funkcje używane z foreachBatch
przyjmują dwa parametry:
- Ramka danych zawierająca dane wyjściowe mikrosadowej.
- Unikatowy identyfikator mikrosadowej partii.
Operacje scalania usługi Delta Lake należy używać foreachBatch
w przesyłania strumieniowego ze strukturą. Zobacz Upsert z zapytań przesyłanych strumieniowo przez foreachBatch
.
Stosowanie dodatkowych operacji ramki danych
Wiele operacji ramek danych i zestawów danych nie jest obsługiwanych w przypadku przesyłania strumieniowego ramek danych, ponieważ platforma Spark nie obsługuje generowania planów przyrostowych w tych przypadkach. Za pomocą foreachBatch()
tych operacji można zastosować niektóre operacje na danych wyjściowych mikrosadowych. Na przykład można użyć foreachBatch()
i operacji SQL MERGE INTO
, aby zapisać wyniki strumieniowych agregacji do tabeli Delta w trybie aktualizacji. Zobacz więcej szczegółów w MERGE INTO.
Ważne
-
foreachBatch()
zapewnia tylko co najmniej jednokrotne gwarancje zapisu. Można jednak użyć argumentubatchId
przekazywanego do funkcji jako sposobu deduplikacji danych wyjściowych i uzyskania gwarancji dokładnie jednokrotnego wykonania. W obu przypadkach konieczne będzie samodzielne wnioskowanie o semantyce kompleksowej. -
foreachBatch()
nie działa w trybie ciągłego przetwarzania, ponieważ zasadniczo opiera się na mikrosadowym wykonywaniu zapytania przesyłania strumieniowego. Jeśli zapisujesz dane w trybie ciągłym, użyjforeach()
zamiast tego. - W przypadku używania
foreachBatch
z operatorem stanowym ważne jest całkowite użycie każdej partii przed zakończeniem przetwarzania. Zobacz Całkowite zużycie każdej wsadowej ramki danych
Pustą ramkę danych można wywołać za pomocą foreachBatch()
elementu , a kod użytkownika musi być odporny, aby umożliwić właściwą operację. Przykład można znaleźć tutaj:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Zmiany zachowania w foreachBatch
środowisku Databricks Runtime 14.0
W środowisku Databricks Runtime 14.0 lub nowszym w środowisku obliczeniowym skonfigurowanym ze standardowym trybem dostępu obowiązują następujące zmiany zachowania:
-
print()
polecenia zapisują dane wyjściowe w dziennikach sterowników. - Nie można uzyskać dostępu do modułu
dbutils.widgets
podrzędnego wewnątrz funkcji. - Wszystkie pliki, moduły lub obiekty, do których odwołuje się funkcja, muszą być serializowalne i dostępne na platformie Spark.
Ponowne używanie istniejących źródeł danych wsadowych
Za pomocą programu foreachBatch()
można użyć istniejących modułów zapisywania danych wsadowych do ujścia danych, które mogą nie obsługiwać przesyłania strumieniowego ze strukturą. Oto kilka przykładów:
Wiele innych źródeł danych wsadowych może być używanych z programu foreachBatch()
. Zobacz Łączenie ze źródłami danych i usługami zewnętrznymi.
Zapisywanie w wielu lokalizacjach
Jeśli musisz napisać dane wyjściowe zapytania przesyłania strumieniowego w wielu lokalizacjach, usługa Databricks zaleca używanie wielu składników zapisywania przesyłania strumieniowego ze strukturą w celu uzyskania najlepszej równoległości i przepływności.
Służy foreachBatch
do zapisywania w wielu ujściach serializuje wykonywanie zapisów przesyłanych strumieniowo, co może zwiększyć opóźnienie dla każdej mikrosadowej partii.
Jeśli używasz foreachBatch
do zapisu w wielu tabelach Delta, zobacz Idempotentny zapis do tabel w foreachBatch
.
Całkowicie zużywaj każdą ramkę danych wsadowych
Jeśli używasz operatorów stanowych (na przykład przy użyciu dropDuplicatesWithinWatermark
), każda iteracja wsadowa musi korzystać z całej ramki danych lub ponownie uruchomić zapytanie. Jeśli nie przetwarzasz całego DataFrame, zapytanie przesyłania strumieniowego zakończy się niepowodzeniem przy próbie przetworzenia kolejnej partii danych.
Może się to zdarzyć w kilku przypadkach. W poniższych przykładach pokazano, jak naprawić zapytania, które nie używają poprawnie ramki danych.
Celowo przy użyciu podzbioru partii
Jeśli interesuje Cię tylko podzbiór danych, możesz użyć kodu takiego jak poniżej.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
W tym przypadku batch_df.show(2)
obsługuje tylko dwa pierwsze elementy w partii, co jest oczekiwane, ale jeśli istnieje więcej elementów, muszą zostać przetworzone. Poniższy kod używa pełnej ramki danych.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
do_nothing
W tym miejscu funkcja dyskretnie ignoruje resztę ramki danych.
Obsługa błędu w pakiecie
Podczas uruchamiania foreachBatch
procesu może wystąpić błąd. Możesz mieć kod, taki jak poniższy (w tym przypadku przykład celowo zgłasza błąd, aby pokazać problem).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Radzenie sobie z błędem (i jego dyskretne ignorowanie) może spowodować, że reszta partii nie zostanie przetworzona. Istnieją dwie opcje obsługi tej sytuacji.
Najpierw można ponownie wywołać błąd, który przekazuje go do warstwy aranżacji, aby ponowić próbę wsadu. Może to rozwiązać ten błąd, jeśli jest to problem przejściowy lub zgłosić go zespołowi operacyjnemu, aby spróbować ręcznie rozwiązać problem. W tym celu zmień partial_func
kod tak, aby wyglądał następująco:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Drugą opcją, jeśli chcesz wyłapać wyjątek i zignorować pozostałą część partii, jest modyfikacja kodu na poniższy.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Ten kod używa funkcji do_nothing
do cichego ignorowania pozostałej części serii.