Udostępnij za pośrednictwem


Zagadnienia dotyczące produkcji związane ze strukturalnym przesyłaniem strumieniowym

Ta strona zawiera zalecenia dotyczące planowania obciążeń przesyłania strumieniowego ze strukturą przy użyciu zadań w Azure Databricks.

Usługa Databricks zaleca, aby zawsze konfigurować następujące elementy:

  • Usuń niepotrzebny kod z notatników, który może zwrócić wyniki, takie jak display i count.
  • Nie uruchamiaj obciążeń Structured Streaming przy użyciu obliczeń o ogólnym przeznaczeniu. Zawsze planuj strumienie jako zadania, korzystając z obliczeń zadaniowych.
  • Planowanie zadań przy użyciu Continuous trybu. Dotyczy to funkcji planowania zadań w Azure Databricks, a nie funkcji Structured Streaming dotyczącej interwału wyzwalania .
  • Nie włączaj automatycznego skalowania obliczeń dla zadań strukturalnego przesyłania strumieniowego.

Niektóre obciążenia korzystają z następujących elementów:

Azure Databricks wprowadził deklaratywne potoki Lakeflow Spark w celu zmniejszenia złożoności zarządzania infrastrukturą produkcyjną dla obciążeń strumieniowego przesyłania danych ze zdefiniowaną strukturą. Databricks zaleca używanie potoków deklaratywnych Lakeflow Spark dla nowych potoków Structured Streaming. Zobacz Potoki deklaratywne platformy Spark w usłudze Lakeflow.

Uwaga

Automatyczne skalowanie zasobów obliczeniowych ma ograniczenia dotyczące zmniejszania rozmiaru klastra dla obciążeń przetwarzania strumieniowego ze zdefiniowaną strukturą. Usługa Databricks zaleca używanie potoków deklaratywnych Lakeflow Spark z ulepszonym automatycznym skalowaniem dla obciążeń przesyłania strumieniowego. Zobacz Optymalizowanie wykorzystania klastra Lakeflow Spark Declarative Pipelines za pomocą automatycznego skalowania.

:::uwaga Bezserwerowe obliczenia

W przypadku obliczeń bezserwerowych obsługiwane są tylko funkcje Trigger.AvailableNow() i Trigger.Once() . Databricks zaleca Trigger.AvailableNow().

W przypadku ciągłego przesyłania strumieniowego na obliczeniach bezserwerowych użyj trybu wyzwalanego i ciągłego potoku w trybie ciągłym.

Zobacz Ograniczenia przesyłania strumieniowego.

:::

Projektuj obciążenia strumieniowe, aby uwzględniały możliwość awarii

Usługa Databricks zaleca zawsze konfigurowanie zadań streamingu do automatycznego restartu po awarii. Niektóre możliwości, w tym ewolucja schematu, wymagają skonfigurowania obciążeń przesyłania strumieniowego ze strukturą w celu automatycznego ponawiania prób. Zobacz Konfigurowanie zadań strumieniowania strukturalnego do ponownego uruchamiania zapytań strumieniowych w przypadku niepowodzenia.

Niektóre operacje, takie jak foreachBatch, zapewniają gwarancje co najmniej jednokrotne zamiast gwarancji dokładnie jednokrotnych. W przypadku tych operacji upewnij się, że potok przetwarzania jest idempotentny. Zobacz Używanie polecenia foreachBatch do zapisu w dowolnych odbiornikach danych.

Uwaga

Po ponownym uruchomieniu zapytania mikropartia przetwarzana podczas poprzedniego uruchomienia. Jeśli twoje zadanie nie powiodło się z powodu błędu braku pamięci lub ręcznie anulowałeś je z powodu zbyt dużego mikrosadowego przetwarzania, może być konieczne zwiększenie mocy obliczeniowej, aby pomyślnie przetworzyć mikrosadę.

Jeśli zmienisz konfiguracje między przebiegami, te konfiguracje zostaną zastosowane do pierwszej nowej partii zaplanowanej. Zobacz Odzyskiwanie po zmianach w zapytaniu Strukturowanego przesyłania strumieniowego.

Kiedy zadanie ponownie się uruchamia?

W ramach zadania Azure Databricks można zaplanować wiele zadań. Podczas konfigurowania zadania przy użyciu wyzwalacza ciągłego nie można ustawić zależności między zadaniami.

Możesz zdecydować się na zaplanowanie wielu strumieni w jednym zadaniu, korzystając z jednego z poniższych podejść.

  • Wiele zadań: Zdefiniuj zadanie obejmujące wiele zadań, które przetwarzają obciążenia strumieniowe przy użyciu wyzwalacza ciągłego.
  • Wiele zapytań: zdefiniuj wiele zapytań przesyłanych strumieniowo w kodzie źródłowym dla jednego zadania.

Można również połączyć te strategie. W poniższej tabeli porównaliśmy te podejścia.

Strategia Wiele zadań Wiele zapytań
Jak współużytkowane są zasoby obliczeniowe? Databricks zaleca wdrożenie zasobów obliczeniowych o odpowiednim rozmiarze do każdego zadania przesyłania strumieniowego. Opcjonalnie możesz udostępniać zasoby obliczeniowe między zadaniami. Wszystkie zapytania współdzielą te same obliczenia. Opcjonalnie można przypisywać zapytania do pul harmonogramu.
Jak są obsługiwane ponawianie prób? Wszystkie zadania muszą zakończyć się niepowodzeniem, zanim praca zostanie ponownie podjęta. Zadanie ponawia próbę, jeśli jakiekolwiek zapytanie zakończy się niepowodzeniem.

Konfigurowanie zadań przesyłania strumieniowego ze strukturą w celu ponownego uruchamiania zapytań przesyłanych strumieniowo w przypadku niepowodzenia

Databricks zaleca skonfigurowanie wszystkich obciążeń przesyłania strumieniowego za pomocą wyzwalacza ciągłego. Zobacz Uruchamianie zadań w sposób ciągły.

Wyzwalacz ciągły domyślnie ma następujące zachowanie:

  • Powstrzymuje więcej niż jednoczesne uruchomienie zadania.
  • Uruchamia nowy przebieg, gdy poprzedni przebieg zakończy się niepowodzeniem.
  • Używa wykładniczego odstępu dla ponownych prób.

Databricks zaleca zawsze używanie zasobów obliczeniowych dla zadań zamiast ogólnych zasobów obliczeniowych podczas planowania przepływów pracy. W przypadku niepowodzenia zadania i ponawiania próby nowe zasoby obliczeniowe są wdrażane.

Uwaga

Usługa Databricks zaleca, aby nie używać streamingQuery.awaitTermination() ani spark.streams.awaitAnyTermination(). Zobacz Kiedy używać awaitTermination().

Kiedy należy używać awaitTermination()

streamingQuery.awaitTermination() i spark.streams.awaitAnyTermination() blokują bieżący wątek do momentu zakończenia przesyłanego strumieniowo zapytania. To, czy używać tych funkcji, zależy od środowiska wykonawczego.

W przypadku zadań usługi Databricks nie należy używać streamingQuery.awaitTermination() lub spark.streams.awaitAnyTermination(). Te funkcje nie są niezbędne, ponieważ usługa Zadań automatycznie uniemożliwia zakończenie przebiegu, gdy zapytanie strumieniowe jest aktywne. Obie funkcje blokują wykonanie komórek notatnika i uniemożliwiają usłudze Jobs śledzenie zapytania przesyłania strumieniowego, co zakłóca metryki zadań w kolejce i powiadomienia o zadaniach.

Użyj awaitTermination() w następujących przypadkach:

Przypadek użycia Zachowanie
Interaktywne notebooki w uniwersalnych obliczeniach awaitTermination() utrzymuje działanie komórki, pozwala monitorować stan zapytania i zapewnia, że błędy są widoczne w danych wyjściowych notesu.
Środowiska lokalne i programistyczne Podczas lokalnego uruchamiania programu Spark proces kończy się po zakończeniu głównego wątku. Wywołaj awaitTermination() aby utrzymać działanie programu aż do zakończenia lub niepowodzenia zapytania strumieniowego.
Propagacja błędu do sterownika Bez awaitTermination() parametru niepowodzenie zapytania strumieniowego w kontekście niezwiązanym bezpośrednio z zadaniem może nie być propagowane do wątku wywołującego. Zapytanie może zakończyć się niepowodzeniem w trybie dyskretnym, co utrudnia wykrywanie i diagnozowanie błędów. Wywołanie awaitTermination() ponownie rzuca wyjątek zapytania na sterowniku.

Użyj pul harmonogramu dla wielu zapytań strumieniowych

Pule harmonogramu można skonfigurować tak, aby przypisywać pojemność obliczeniową do zapytań podczas uruchamiania wielu zapytań przesyłanych strumieniowo z tego samego kodu źródłowego.

Domyślnie wszystkie zapytania uruchomione w notesie są uruchamiane w tej samej puli równomiernego przydzielania zasobów. Zadania platformy Apache Spark generowane przez wyzwalacze ze wszystkich zapytań przesyłanych strumieniowo w notesie są uruchamiane jeden po drugim w kolejności "pierwszy na wejściu, pierwszy na wyjściu" (FIFO). Może to spowodować niepotrzebne opóźnienia w zapytaniach, ponieważ nie współdzielą zasobów klastra.

Pule harmonogramów umożliwiają deklarowanie, które ustrukturyzowane zapytania przesyłania strumieniowego współdzielą zasoby obliczeniowe.

Poniższy przykład przypisuje query1 do dedykowanej puli, natomiast query2 i query3 współdzielą wspólną pulę harmonogramu.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Uwaga

Konfiguracja lokalnych właściwości musi znajdować się w tej samej komórce notesu, w której rozpoczynasz zapytanie strumieniowe.

Aby uzyskać więcej informacji na temat pul harmonogramów fair platformy Apache, zobacz dokumentację usługi Apache Fair Scheduler.