Odzyskiwanie po niepowodzeniu zapytań przesyłania strumieniowego ze strukturą za pomocą przepływów pracy

Przesyłanie strumieniowe ze strukturą zapewnia odporność na uszkodzenia i spójność danych dla zapytań przesyłanych strumieniowo; korzystając z przepływów pracy usługi Azure Databricks, można łatwo skonfigurować zapytania przesyłania strumieniowego ze strukturą w celu automatycznego ponownego uruchomienia po awarii. Włączenie punktów kontrolnych dla zapytania przesyłania strumieniowego umożliwia ponowne uruchomienie zapytania po niepowodzeniu. Uruchomione ponownie zapytanie jest kontynuowane, gdy niepowodzenie zostało przerwane.

Włączanie tworzenia punktów kontrolnych dla zapytań przesyłania strumieniowego ze strukturą

Usługa Databricks zaleca, aby przed rozpoczęciem zapytania zawsze określać checkpointLocation opcję ścieżki magazynu w chmurze. Przykład:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Ta lokalizacja punktu kontrolnego zachowuje wszystkie podstawowe informacje identyfikujące zapytanie. Każde zapytanie musi mieć inną lokalizację punktu kontrolnego. Wiele zapytań nigdy nie powinno mieć tej samej lokalizacji. Aby uzyskać więcej informacji, zobacz Przewodnik programowania przesyłania strumieniowego ze strukturą.

Uwaga

Chociaż checkpointLocation jest to wymagane w przypadku większości typów ujścia danych wyjściowych, niektóre ujścia pamięci, mogą automatycznie generować tymczasową lokalizację punktu kontrolnego, gdy nie podasz checkpointLocationelementu . Te tymczasowe lokalizacje punktów kontrolnych nie zapewniają żadnej odporności na uszkodzenia ani gwarancji spójności danych i mogą nie zostać prawidłowo wyczyszczone. Unikaj potencjalnych pułapek, zawsze określając checkpointLocationelement .

Konfigurowanie zadań przesyłania strumieniowego ze strukturą w celu ponownego uruchamiania zapytań przesyłanych strumieniowo w przypadku niepowodzenia

Zadanie usługi Azure Databricks można utworzyć za pomocą notesu lub pliku JAR zawierającego zapytania przesyłania strumieniowego i skonfigurować je w taki sposób:

  • Zawsze używaj nowego klastra.
  • Zawsze ponów próbę po awarii.

Automatyczne ponowne uruchamianie po niepowodzeniu zadania jest szczególnie ważne podczas konfigurowania obciążeń przesyłania strumieniowego przy użyciu ewolucji schematu. Ewolucja schematu działa w usłudze Azure Databricks, zgłaszając oczekiwany błąd po wykryciu zmiany schematu, a następnie prawidłowo przetwarzając dane przy użyciu nowego schematu po ponownym uruchomieniu zadania. Usługa Databricks zaleca zawsze konfigurowanie zadań przesyłania strumieniowego zawierających zapytania z ewolucją schematu w celu automatycznego ponownego uruchamiania w przepływach pracy usługi Databricks.

Zadania mają ścisłą integrację z interfejsami API przesyłania strumieniowego ze strukturą i mogą monitorować wszystkie zapytania przesyłane strumieniowo aktywne w przebiegu. Ta konfiguracja gwarantuje, że jeśli którakolwiek część zapytania zakończy się niepowodzeniem, zadania automatycznie zakończą przebieg (wraz ze wszystkimi innymi zapytaniami) i uruchom nowy przebieg w nowym klastrze. Spowoduje to ponowne uruchomienie notesu lub kodu JAR i ponowne uruchomienie wszystkich zapytań. Jest to najbezpieczniejszy sposób powrotu do dobrego stanu.

Uwaga

  • Niepowodzenie w dowolnym z aktywnych zapytań przesyłania strumieniowego powoduje niepowodzenie aktywnego przebiegu i zakończenie wszystkich pozostałych zapytań przesyłania strumieniowego.
  • Nie musisz używać streamingQuery.awaitTermination() ani spark.streams.awaitAnyTermination() na końcu notesu. Zadania automatycznie uniemożliwiają ukończenie przebiegu, gdy zapytanie przesyłane strumieniowo jest aktywne.
  • Usługa Databricks zaleca używanie zadań zamiast %run i dbutils.notebook.run() podczas organizowania notesów przesyłania strumieniowego ze strukturą. Zobacz Uruchamianie notesu usługi Databricks z innego notesu.

Poniżej przedstawiono przykład zalecanej konfiguracji zadania.

  • Klaster: ustaw tę opcję zawsze, aby używać nowego klastra i używać najnowszej wersji platformy Spark (lub co najmniej w wersji 2.1). Zapytania uruchomione na platformie Spark 2.1 lub nowszym można odzyskać po uaktualnieniu wersji zapytań i platformy Spark.
  • Powiadomienia: ustaw tę opcję, jeśli chcesz otrzymywać powiadomienia e-mail dotyczące niepowodzeń.
  • Harmonogram: nie ustawiaj harmonogramu.
  • Limit czasu: nie ustawiaj limitu czasu. Zapytania przesyłane strumieniowo są uruchamiane przez nieokreślony czas.
  • Maksymalna liczba współbieżnych przebiegów: ustaw wartość 1. Jednocześnie musi istnieć tylko jedno wystąpienie każdego zapytania.
  • Ponawianie prób: ustaw wartość Nieograniczone.

Zobacz Tworzenie i uruchamianie zadań usługi Azure Databricks, aby zrozumieć te konfiguracje.

Odzyskiwanie po zmianach w zapytaniu przesyłania strumieniowego ze strukturą

Istnieją ograniczenia dotyczące zmian w zapytaniu przesyłanym strumieniowo między ponownymi uruchomieniami z tej samej lokalizacji punktu kontrolnego. Poniżej przedstawiono kilka zmian, które nie są dozwolone lub efekt zmiany nie jest dobrze zdefiniowany. Dla wszystkich z nich:

  • Termin dozwolony oznacza, że można wykonać określoną zmianę, ale czy semantyka jej efektu jest dobrze zdefiniowana, zależy od zapytania i zmiany.
  • Termin niedozwolony oznacza, że nie należy wykonywać określonej zmiany, ponieważ ponownie uruchomione zapytanie prawdopodobnie nie powiedzie się z nieprzewidywalnymi błędami.
  • sdf reprezentuje przesyłania strumieniowego ramki danych/zestawu danych wygenerowanego za pomocą polecenia sparkSession.readStream.

Typy zmian w zapytaniach przesyłania strumieniowego ze strukturą

  • Zmiany w liczbie lub typie (czyli innym źródle) źródeł wejściowych: nie jest to dozwolone.
  • Zmiany parametrów źródeł wejściowych: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od źródła i zapytania. Oto kilka przykładów:
    • Dodawanie, usuwanie i modyfikowanie limitów szybkości jest dozwolone:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      na wartość

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Zmiany w subskrybowanych artykułach i plikach są zwykle niedozwolone, ponieważ wyniki są nieprzewidywalne: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Zmiany w interwale wyzwalacza: wyzwalacze można zmieniać między partiami przyrostowymi i interwałami czasu. Zobacz Zmienianie interwałów wyzwalacza między przebiegami.
  • Zmiany typu ujścia danych wyjściowych: zmiany między kilkoma konkretnymi kombinacjami ujścia są dozwolone. Należy to zweryfikować na podstawie wielkości liter. Oto kilka przykładów:
    • Ujście pliku do ujścia platformy Kafka jest dozwolone. Platforma Kafka będzie widzieć tylko nowe dane.
    • Ujście platformy Kafka do ujścia plików nie jest dozwolone.
    • Ujście platformy Kafka zostało zmienione na foreach lub na odwrót jest dozwolone.
  • Zmiany parametrów ujścia danych wyjściowych: czy jest to dozwolone i czy semantyka zmiany jest dobrze zdefiniowana, zależy od ujścia i zapytania. Oto kilka przykładów:
    • Zmiany w katalogu wyjściowym ujścia pliku nie są dozwolone: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Zmiany w temacie wyjściowym są dozwolone: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Zmiany ujścia zdefiniowanego przez użytkownika (czyli ForeachWriter kodu) są dozwolone, ale semantyka zmiany zależy od kodu.
  • Zmiany w operacjach projekcji/filtru/mapowania: niektóre przypadki są dozwolone. Na przykład: .
    • Dodawanie/usuwanie filtrów jest dozwolone: sdf.selectExpr("a") do sdf.where(...).selectExpr("a").filter(...).
    • Zmiany w projekcjach z tym samym schematem wyjściowym są dozwolone: sdf.selectExpr("stringColumn AS json").writeStream do sdf.select(to_json(...).as("json")).writeStream.
    • Zmiany w projekcjach z innym schematem danych wyjściowych są warunkowo dozwolone: sdf.selectExpr("a").writeStream dozwolone sdf.selectExpr("b").writeStream jest tylko wtedy, gdy ujście danych wyjściowych zezwala na zmianę schematu z "a" na "b".
  • Zmiany w operacjach stanowych: niektóre operacje w zapytaniach przesyłania strumieniowego muszą obsługiwać dane stanu w celu ciągłego aktualizowania wyniku. Przesyłanie strumieniowe ze strukturą automatycznie określa dane stanu do magazynu odpornego na błędy (na przykład DBFS, Azure Blob Storage) i przywraca je po ponownym uruchomieniu. Zakłada się jednak, że schemat danych stanu pozostaje taki sam w przypadku ponownych uruchomień. Oznacza to, że wszelkie zmiany (czyli dodatki, usunięcia lub modyfikacje schematu) do stanowych operacji zapytania przesyłania strumieniowego nie są dozwolone między ponownymi uruchomieniami. Oto lista operacji stanowych, których schemat nie powinien być zmieniany między ponownymi uruchomieniami w celu zapewnienia odzyskiwania stanu:
    • Agregacja przesyłania strumieniowego: na przykład sdf.groupBy("a").agg(...). Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone.
    • Deduplikacja przesyłania strumieniowego: na przykład sdf.dropDuplicates("a"). Wszelkie zmiany w liczbie lub typie kluczy grupowania lub agregacji nie są dozwolone.
    • Sprzężenia strumienia strumienia: na przykład sdf1.join(sdf2, ...) (tj. oba dane wejściowe są generowane za pomocą polecenia sparkSession.readStream). Zmiany w kolumnach schematu lub łączenia równoczesnych są niedozwolone. Zmiany typu sprzężenia (zewnętrzne lub wewnętrzne) są niedozwolone. Inne zmiany w warunku sprzężenia są źle zdefiniowane.
    • Dowolna operacja stanowa: na przykład sdf.groupByKey(...).mapGroupsWithState(...) lub sdf.groupByKey(...).flatMapGroupsWithState(...). Wszelkie zmiany schematu stanu zdefiniowanego przez użytkownika i typ limitu czasu nie są dozwolone. Każda zmiana w funkcji mapowania stanu zdefiniowanego przez użytkownika jest dozwolona, ale semantyczny wpływ zmiany zależy od logiki zdefiniowanej przez użytkownika. Jeśli naprawdę chcesz obsługiwać zmiany schematu stanu, możesz jawnie kodować/dekodować złożone struktury danych stanu do bajtów przy użyciu schematu kodowania/dekodowania, który obsługuje migrację schematu. Jeśli na przykład zapiszesz stan jako bajty zakodowane w formacie Avro, możesz zmienić schemat Avro-state-między ponownym uruchomieniem zapytania, ponieważ spowoduje to przywrócenie stanu binarnego.