Zagadnienia dotyczące produkcji związane ze strukturalnym przesyłaniem strumieniowym

Ta strona zawiera zalecenia dotyczące harmonogramowania obciążeń roboczych Structured Streaming za pomocą Lakeflow Jobs w usłudze Azure Databricks. Zobacz Zadania lakeflow.

Usługa Databricks zaleca, aby zawsze konfigurować następujące elementy:

  • Usuń niepotrzebny kod z notatników, który może zwrócić wyniki, takie jak display i count.
  • Nie uruchamiaj obciążeń Structured Streaming w środowisku obliczeniowym ogólnego przeznaczenia. Zawsze planuj strumienie jako zadania lakeflow przy użyciu obliczeń zadań.
  • Planowanie zadań Lakeflow w trybie Continuousmode. Dotyczy to funkcji planowania zadań w Azure Databricks, a nie funkcji Structured Streaming dotyczącej interwału wyzwalania .
  • Nie włączaj automatycznego skalowania mocy obliczeniowej dla zadań Structured Streaming.

Niektóre obciążenia korzystają z następujących elementów:

Azure Databricks wprowadził deklaratywne potoki Lakeflow Spark w celu zmniejszenia złożoności zarządzania infrastrukturą produkcyjną dla obciążeń strumieniowego przesyłania danych ze zdefiniowaną strukturą. Databricks zaleca używanie potoków deklaratywnych Lakeflow Spark dla nowych potoków Structured Streaming. Zobacz Potoki deklaratywne platformy Spark w usłudze Lakeflow.

Uwaga

Automatyczne skalowanie zasobów obliczeniowych ma ograniczenia dotyczące zmniejszania rozmiaru klastra dla obciążeń przetwarzania strumieniowego ze zdefiniowaną strukturą. Usługa Databricks zaleca używanie potoków deklaratywnych Lakeflow Spark z ulepszonym automatycznym skalowaniem dla obciążeń przesyłania strumieniowego. Zobacz Optymalizowanie wykorzystania klastra Lakeflow Spark Declarative Pipelines za pomocą automatycznego skalowania.

:::uwaga Bezserwerowe obliczenia

W przypadku obliczeń bezserwerowych obsługiwane są tylko funkcje Trigger.AvailableNow() i Trigger.Once() . Databricks zaleca Trigger.AvailableNow().

W przypadku ciągłego przesyłania strumieniowego na obliczeniach bezserwerowych użyj trybu wyzwalanego i ciągłego potoku w trybie ciągłym.

Zobacz Ograniczenia przesyłania strumieniowego.

:::

Projektuj obciążenia strumieniowe, aby uwzględniały możliwość awarii

Databricks zaleca, aby zawsze konfigurować zadania strumieniowe tak, aby były automatycznie uruchamiane ponownie po awarii. Niektóre funkcje, w tym ewolucja schematu, wymagają, aby obciążenia Structured Streaming były automatycznie ponawiane. Zobacz Konfigurowanie zadań strumieniowania strukturalnego do ponownego uruchamiania zapytań strumieniowych w przypadku niepowodzenia.

Niektóre operacje, takie jak foreachBatch, zapewniają gwarancje co najmniej jednokrotne zamiast gwarancji dokładnie jednokrotnych. W przypadku tych operacji upewnij się, że potok przetwarzania jest idempotentny. Zobacz Używanie polecenia foreachBatch do zapisu w dowolnych odbiornikach danych.

Uwaga

Po ponownym uruchomieniu zapytania mikropartia przetwarzana podczas poprzedniego uruchomienia. Jeśli twoje zadanie nie powiodło się z powodu błędu braku pamięci lub ręcznie anulowałeś je z powodu zbyt dużego mikrosadowego przetwarzania, może być konieczne zwiększenie mocy obliczeniowej, aby pomyślnie przetworzyć mikrosadę.

Jeśli zmienisz konfiguracje między przebiegami, te konfiguracje zostaną zastosowane do pierwszej nowej partii zaplanowanej. Zobacz Odzyskiwanie po zmianach w zapytaniu Strukturowanego przesyłania strumieniowego.

Kiedy zadanie ponownie się uruchamia?

W ramach zadania Azure Databricks można zaplanować wiele zadań. Podczas konfigurowania zadania przy użyciu wyzwalacza ciągłego nie można ustawić zależności między zadaniami.

Możesz zdecydować się na zaplanowanie wielu strumieni w jednym zadaniu, korzystając z jednego z poniższych podejść.

  • Wiele zadań: Zdefiniuj zadanie obejmujące wiele zadań, które przetwarzają obciążenia strumieniowe przy użyciu wyzwalacza ciągłego.
  • Wiele zapytań: zdefiniuj wiele zapytań przesyłanych strumieniowo w kodzie źródłowym dla jednego zadania.

Można również połączyć te strategie. W poniższej tabeli porównaliśmy te podejścia.

Strategia Wiele zadań Wiele zapytań
Jak współużytkowane są zasoby obliczeniowe? Usługa Databricks zaleca odpowiednie ustawianie rozmiaru zadań obliczeniowych dla każdego zadania przesyłania strumieniowego. Opcjonalnie możesz udostępniać zasoby obliczeniowe między zadaniami. Wszystkie zapytania współdzielą te same obliczenia. Opcjonalnie można przypisywać zapytania do pul harmonogramu.
Jak są obsługiwane ponawianie prób? Wszystkie zadania muszą zakończyć się niepowodzeniem, zanim praca zostanie ponownie podjęta. Zadanie ponawia próbę, jeśli jakiekolwiek zapytanie zakończy się niepowodzeniem.

Konfigurowanie zadań przesyłania strumieniowego ze strukturą w celu ponownego uruchamiania zapytań przesyłanych strumieniowo w przypadku niepowodzenia

Usługa Databricks zaleca skonfigurowanie wszystkich obciążeń przesyłania strumieniowego przy użyciu wyzwalacza ciągłego. Zobacz Uruchamianie zadań w sposób ciągły.

Wyzwalacz ciągły domyślnie ma następujące zachowanie:

  • Powstrzymuje więcej niż jednoczesne uruchomienie zadania.
  • Uruchamia nowy przebieg, gdy poprzedni przebieg zakończy się niepowodzeniem.
  • Używa wykładniczego odstępu dla ponownych prób.

Databricks zaleca zawsze używanie zasobów obliczeniowych dla zadań zamiast ogólnych zasobów obliczeniowych podczas planowania przepływów pracy. W przypadku niepowodzenia zadania i ponawiania próby nowe zasoby obliczeniowe są wdrażane.

Uwaga

Usługa Databricks zaleca, aby nie używać streamingQuery.awaitTermination() ani spark.streams.awaitAnyTermination(). Zobacz Kiedy używać awaitTermination().

Kiedy należy używać awaitTermination()

streamingQuery.awaitTermination() i spark.streams.awaitAnyTermination() blokują bieżący wątek do momentu zakończenia przesyłanego strumieniowo zapytania. To, czy używać tych funkcji, zależy od środowiska wykonawczego.

W przypadku zadań Lakeflow nie należy używać streamingQuery.awaitTermination() ani spark.streams.awaitAnyTermination(). Te funkcje nie są niezbędne, ponieważ usługa Zadań automatycznie uniemożliwia zakończenie przebiegu, gdy zapytanie strumieniowe jest aktywne. Obie funkcje blokują wykonanie komórek notatnika i uniemożliwiają usłudze Jobs śledzenie zapytania przesyłania strumieniowego, co zakłóca metryki zadań w kolejce i powiadomienia o zadaniach.

Użyj awaitTermination() w następujących przypadkach:

Przypadek użycia Zachowanie
Interaktywne notebooki w uniwersalnych obliczeniach awaitTermination() utrzymuje działanie komórki, pozwala monitorować stan zapytania i zapewnia, że błędy są widoczne w danych wyjściowych notesu.
Środowiska lokalne i programistyczne Podczas lokalnego uruchamiania programu Spark proces kończy się po zakończeniu głównego wątku. Wywołaj awaitTermination() aby utrzymać działanie programu aż do zakończenia lub niepowodzenia zapytania strumieniowego.
Propagacja błędu do sterownika Bez awaitTermination() parametru niepowodzenie zapytania strumieniowego w kontekście niezwiązanym bezpośrednio z zadaniem może nie być propagowane do wątku wywołującego. Zapytanie może zakończyć się niepowodzeniem w trybie dyskretnym, co utrudnia wykrywanie i diagnozowanie błędów. Wywołanie awaitTermination() ponownie powoduje zgłoszenie wyjątku zapytania w sterowniku.

Użyj pul harmonogramu dla wielu zapytań strumieniowych

Pule harmonogramu można skonfigurować tak, aby przypisywać pojemność obliczeniową do zapytań podczas uruchamiania wielu zapytań przesyłanych strumieniowo z tego samego kodu źródłowego.

Domyślnie wszystkie zapytania uruchomione w notesie są uruchamiane w tej samej puli równomiernego przydzielania zasobów. Zadania platformy Apache Spark generowane przez wyzwalacze ze wszystkich zapytań przesyłanych strumieniowo w notesie są uruchamiane jeden po drugim w kolejności "pierwszy na wejściu, pierwszy na wyjściu" (FIFO). Może to spowodować niepotrzebne opóźnienia w zapytaniach, ponieważ nie współdzielą zasobów klastra.

Pule harmonogramów umożliwiają deklarowanie, które ustrukturyzowane zapytania przesyłania strumieniowego współdzielą zasoby obliczeniowe.

Poniższy przykład przypisuje query1 do dedykowanej puli, natomiast query2 i query3 współdzielą wspólną pulę harmonogramu.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Uwaga

Konfiguracja lokalnych właściwości musi znajdować się w tej samej komórce notesu, w której rozpoczynasz zapytanie strumieniowe.

Aby uzyskać więcej informacji na temat pul harmonogramów fair platformy Apache, zobacz dokumentację usługi Apache Fair Scheduler.