Udostępnij przez


Konfigurowanie interwałów wyzwalacza strumieniowania ustrukturyzowanego

W tym artykule wyjaśniono, jak skonfigurować interwały wyzwalaczy dla przesyłania strumieniowego ze strukturą w usłudze Azure Databricks.

Strukturalne przesyłanie strumieniowe Apache Spark przetwarza dane przyrostowo. Interwały wyzwalaczy kontrolują częstotliwość sprawdzania przesyłania strumieniowego ze strukturą pod kątem nowych danych. Interwały wyzwalaczy można skonfigurować na potrzeby przetwarzania niemal w czasie rzeczywistym, zaplanowanego odświeżania bazy danych lub przetwarzania wsadowego wszystkich nowych danych przez dzień lub tydzień.

Ponieważ co to jest funkcja automatycznego ładowania? używa przesyłania strumieniowego ze strukturą do ładowania danych, zrozumienie, jak działają wyzwalacze, zapewnia największą elastyczność kontrolowania kosztów podczas pozyskiwania danych z żądaną częstotliwością.

Omówienie trybów wyzwalacza

W poniższej tabeli podsumowano tryby uruchamiania dostępne w strumieniowaniu ze strukturą.

Tryb wyzwalacza Przykład składni (Python) Najlepsze dla
Nieokreślony (ustawienie domyślne) N/A Przesyłanie strumieniowe ogólnego przeznaczenia z opóźnieniem 3–5 sekund. Odpowiednik wyzwalacza processingTime z interwałami 0 ms. Przetwarzanie strumienia jest uruchamiane w sposób ciągły, o ile pojawią się nowe dane.
Czas przetwarzania .trigger(processingTime='10 seconds') Równoważenie kosztów i wydajności. Zmniejsza obciążenie, uniemożliwiając systemowi zbyt częste sprawdzanie danych.
Dostępne teraz .trigger(availableNow=True) Zaplanowane przyrostowe przetwarzanie wsadowe. Przetwarza tyle danych, ile jest dostępnych w momencie uruchomienia zadania streamingu.
Tryb czasu rzeczywistego .trigger(realTime='5 minutes') Obciążenia operacyjne o bardzo małych opóźnieniach wymagające przetwarzania podrzędnego, takie jak wykrywanie oszustw lub personalizacja w czasie rzeczywistym. Publiczna wersja zapoznawcza. Długość mikropartii wynosi "5 minut". Użyj 5 minut, aby zminimalizować obciążenie na partię, takie jak kompilacja zapytań.
Ciągły .trigger(continuous='1 second') Niewspierane. Jest to funkcja eksperymentalna zawarta w systemie operacyjnym Spark. Zamiast tego użyj trybu czasu rzeczywistego.

processingTime: harmonogram czasowy wyzwalaczy

Strukturalne przesyłanie strumieniowe odnosi się do interwałów wyzwalaczy opartych na czasie jako "mikropartie o stałym przedziale czasowym". Używając słowa kluczowego processingTime , określ czas trwania jako ciąg, taki jak .trigger(processingTime='10 seconds').

Konfiguracja tego interwału określa, jak często system wykonuje kontrole, aby sprawdzić, czy pojawiły się nowe dane. Skonfiguruj czas przetwarzania, aby zrównoważyć wymagania dotyczące opóźnienia i szybkość, z jaką dane docierają do źródła.

AvailableNow: Przyrostowe przetwarzanie wsadowe

Ważne

W środowisku Databricks Runtime 11.3 LTS i późniejszych wersjach Trigger.Once jest przestarzały. Użyj Trigger.AvailableNow do wszystkich obciążeń przetwarzania wsadowego przyrostowego.

Opcja AvailableNow wyzwalacza przetwarza wszystkie dostępne rekordy jako partię przyrostową, z możliwością skonfigurowania rozmiaru tej partii za pomocą opcji takich jak maxBytesPerTrigger. Opcje określania rozmiaru różnią się w zależności od źródła danych.

Obsługiwane źródła danych

Usługa Azure Databricks obsługuje używanie Trigger.AvailableNow do przetwarzania przyrostowego wsadowego z wielu źródeł strumieniowania o strukturze. Poniższa tabela zawiera minimalną obsługiwaną wersję środowiska Databricks Runtime wymaganą dla każdego źródła danych:

Źródło Minimalna wersja środowiska Databricks Runtime
Źródła plików (JSON, Parquet itp.) 9.1 Długoterminowe Wsparcie (LTS)
Delta Lake 10.4 Długoterminowe Wsparcie (LTS)
Automatyczny ładownik 10.4 Długoterminowe Wsparcie (LTS)
Apache Kafka 10.4 Długoterminowe Wsparcie (LTS)
Kineza 13,1

RealTime: obciążenia operacyjne o bardzo małych opóźnieniach

Ważne

Ta funkcja jest dostępna w publicznej wersji testowej.

Tryb czasu rzeczywistego dla Structured Streaming osiąga opóźnienie od końca do końca poniżej 1 sekundy, a w typowych przypadkach około 300 ms. Aby uzyskać więcej informacji na temat efektywnego konfigurowania i używania trybu czasu rzeczywistego, zobacz Tryb czasu rzeczywistego w Strukturalnym Przesyłaniu Strumieniowym.

Platforma Apache Spark ma dodatkowy interwał wyzwalacza znany jako Przetwarzanie ciągłe. Ten tryb został sklasyfikowany jako eksperymentalny od platformy Spark 2.3. Usługa Azure Databricks nie obsługuje ani nie zaleca tego trybu. Zamiast tego należy używać trybu czasu rzeczywistego w przypadku przypadków użycia o małych opóźnieniach.

Uwaga

Tryb ciągłego przetwarzania na tej stronie nie jest związany z ciągłym przetwarzaniem w Deklaratywnych Potokach Lakeflow Spark.

Zmień interwały wyzwalacza między przebiegami

Można zmieniać czas pomiędzy uruchomieniami przy użyciu tego samego punktu kontrolnego.

Zachowanie podczas zmieniania interwałów

Jeśli zadanie strumieniowe z użyciem struktury zostanie zatrzymane podczas przetwarzania mikropartii, ta mikropartia musi zostać ukończona, zanim nowy interwał wyzwalacza zostanie zastosowany. W związku z tym możesz obserwować przetwarzanie mikropartii z wcześniej określonymi ustawieniami po zmianie interwału wyzwalacza. Poniżej opisano oczekiwane zachowanie podczas przechodzenia:

  • Przejście z interwału opartego na czasie do AvailableNow: partia mikrosadowa może rozpocząć przetwarzanie przed wszystkimi dostępnymi rekordami, jako przyrostowa partia.

  • Przejście z AvailableNow na interwał czasowy: przetwarzanie może być kontynuowane dla wszystkich rekordów, które były dostępne w momencie wyzwolenia ostatniego AvailableNow zadania. Jest to oczekiwane zachowanie.

Odzyskiwanie po błędach zapytań

Uwaga

Jeśli próbujesz odzyskać sprawność po niepowodzeniu zapytania skojarzonego z partią przyrostową, zmiana interwału wyzwalacza nie rozwiąże tego problemu, ponieważ partia musi być nadal ukończona. Przeskaluj w górę pojemność obliczeniową używaną do przetwarzania partii, aby spróbować rozwiązać ten problem. W rzadkich przypadkach może być konieczne ponowne uruchomienie strumienia przy użyciu nowego punktu kontrolnego.