Freigeben über


Wiederherstellen bei Abfragefehlern beim strukturierten Streaming mit Workflows

Strukturiertes Streaming bietet Fehlertoleranz und Datenkonsistenz für Streaming-Abfragen; mit Azure Databricks-Workflows können Sie Ihre strukturierten Streaming-Abfragen einfach so konfigurieren, dass sie bei einem Fehler automatisch neu gestartet werden. Durch Aktivieren der Prüfpunkte für eine Streaming-Abfrage können Sie die Abfrage nach einem Fehler neu starten. Die neu gestartete Abfrage wird dort fortgesetzt, wo die fehlgeschlagene beendet wurde.

Aktivieren erweiterter Prüfpunkte für strukturierte Streaming-Abfragen

Databricks empfiehlt, immer die checkpointLocation Option für einen Cloudspeicher-Pfad anzugeben, bevor Sie die Abfrage starten. Beispiele:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

An diesem Prüfpunkt-Speicherort werden alle wichtigen Informationen aufbewahrt, die eine Abfrage identifizieren. Jede Abfrage muss einen anderen Prüfpunkt-Speicherort haben. Mehrere Abfragen sollten niemals denselben Speicherort haben. Weitere Informationen finden Sie im Programmierleitfaden für strukturiertes Streaming.

Hinweis

Während checkpointLocation für die meisten Arten von Ausgabesenken erforderlich ist, können einige Senken, z. B. Speichersenken, automatisch einen temporären Prüfpunkt-Speicherort generieren, wenn Sie checkpointLocation nicht angeben. Die temporären Prüfpunkt-Speicherorte bieten keine Garantien für Fehlertoleranz oder Datenkonsistenz und werden möglicherweise nicht ordnungsgemäß bereinigt. Vermeiden Sie potenzielle Probleme, indem Sie immer checkpointLocation angeben.

Konfigurieren von strukturierten Streaming-Aufträgen zum Neustarten von Streaming-Abfragen bei einem Fehler

Sie können einen Azure Databricks-Auftrag mit dem Notebook oder JAR-Code erstellen, das bzw. der Ihre Streamingabfragen enthält, und ihn für Folgendes konfigurieren:

  • Es wird immer ein neuer Cluster verwendet.
  • Bei einem Fehler erfolgt immer eine Wiederholung.

Das automatische Neustarten bei Auftragsfehlern ist besonders wichtig beim Konfigurieren von Streamingworkloads mit Schemaentwicklung. Die Schemaentwicklung funktioniert bei Azure Databricks, indem ein erwarteter Fehler ausgelöst wird, wenn eine Schemaänderung erkannt wird, und dann Daten ordnungsgemäß mithilfe des neuen Schemas verarbeitet werden, wenn der Auftrag neu gestartet wird. Databricks empfiehlt immer die Konfiguration von Streamingaufgaben, die Abfragen mit Schemaentwicklung enthalten, um automatisch in Databricks-Workflows neu zu starten.

Aufträge sind eng in APIs für strukturiertes Streaming integriert und können alle Streaming-Abfragen überwachen, die während einer Ausführung aktiv sind. Diese Konfiguration stellt sicher, dass Aufträge die Ausführung (zusammen mit allen anderen Abfragen) automatisch beenden und eine neue Ausführung in einem neuen Cluster starten, wenn bei einem Teil der Abfrage ein Fehler auftritt. Dadurch wird das Notebook oder der JAR-Code erneut ausgeführt und alle Abfragen erneut gestartet. Das ist die sicherste Methode, um zu einem guten Status zurückzukehren.

Hinweis

  • Ein Fehler in einer der aktiven Streamingabfragen führt zu einem Fehler in der aktiven Ausführung und zur Beendigung aller anderen Streamingabfragen.
  • Sie müssen streamingQuery.awaitTermination() oder spark.streams.awaitAnyTermination() nicht am Ende Ihres Notebooks verwenden. Aufträge verhindern automatisch, dass eine Ausführung abgeschlossen wird, wenn eine Streamingabfrage aktiv ist.
  • Databricks empfiehlt beim Orchestrieren von Notebooks mit strukturiertem Streaming die Verwendung von Aufträgen anstelle von %run und dbutils.notebook.run(). Weitere Informationen finden Sie unter Ausführen eines Databricks-Notebooks über ein anderes Notebook.

Nachfolgend finden Sie ein Beispiel für eine empfohlene Auftragskonfiguration.

  • Cluster: Legen Sie diese Einstellung immer auf die Verwendung eines neuen Clusters und der neuesten Spark-Version (oder mindestens Version 2.1) fest. Abfragen, die in Spark 2.1 und höher gestartet wurden, können nach Upgrades der Abfrage und der Spark-Version wiederhergestellt werden.
  • Benachrichtigungen: Legen Sie diese Einstellung fest, wenn Sie eine E-Mail-Benachrichtigung bei Fehlern wünschen.
  • Zeitplan: Legen Sie keinen Zeitplan fest.
  • Timeout: Legen Sie kein Timeout fest. Streamingabfragen werden auf unbestimmte Zeit ausgeführt.
  • Maximale Anzahl gleichzeitiger Ausführungen: Legen Sie diese Einstellung auf 1 fest. Es darf nur eine Instanz jeder Abfrage gleichzeitig aktiv sein.
  • Wiederholungen: Legen Sie die Einstellung auf Unbegrenzt fest.

Informationen zu diesen Konfigurationen finden Sie unter Erstellen und Ausführen von Azure Databricks-Aufträgen.

Wiederherstellen nach Änderungen in einer strukturierten Streaming-Abfrage

Es gibt Einschränkungen, welche Änderungen in einer Streamingabfrage zwischen Neustarts vom gleichen Punktspeicherort aus zulässig sind. Nachfolgend sind einige Änderungen angegeben, die entweder nicht zulässig oder deren Auswirkungen nicht klar definiert sind. Für alle gilt Folgendes:

  • Der Begriff zulässig bedeutet, dass Sie die angegebene Änderung durchführen können, aber ob die Semantik ihrer Auswirkung klar definiert ist, hängt von der Abfrage und der Änderung ab.
  • Der Begriff nicht zulässig bedeutet, dass Sie die angegebene Änderung nicht durchführen sollten, da bei der neu gestarteten Abfrage wahrscheinlich unvorhersehbare Fehler auftreten.
  • sdf steht für einen Streamingdatenrahmen oder ein Streamingdataset, der bzw. das mit sparkSession.readStream generiert wurde.

Arten von Änderungen in strukturierten Streaming-Abfragen

  • Änderungen der Anzahl oder des Typs (d. h. eine unterschiedliche Quelle) von Eingabequellen: Dies ist nicht zulässig.
  • Änderungen an den Parametern von Eingabequellen: Ob dies zulässig ist und ob die Semantik der Änderung klar definiert ist, hängt von der Quelle und der Abfrage ab. Nachstehend finden Sie einige Beispiele:
    • Das Hinzufügen, Löschen und Ändern von Ratenbegrenzungen ist zulässig:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      zu

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Änderungen an abonnierten Artikeln und Dateien sind im Allgemeinen nicht zulässig, da die Ergebnisse unvorhersehbar sind: spark.readStream.format("kafka").option("subscribe", "article") in spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Änderungen im Auslöserintervall: Sie können Trigger zwischen inkrementellen Batches und Zeitintervallen ändern. Siehe Ändern von Auslöserintervallen zwischen Ausführungen.
  • Änderungen am Typ der Ausgabesenke: Änderungen zwischen einigen bestimmten Senkenkombinationen sind zulässig. Dies muss von Fall zu Fall geprüft werden. Nachstehend finden Sie einige Beispiele:
    • Dateisenke in Kafka-Senke ist zulässig. Für Kafka sind nur die neuen Daten sichtbar.
    • Kafka-Senke in Dateisenke ist nicht zulässig.
    • Die Änderung einer Kafka-Senke in eine foreach-Senke oder umgekehrt ist zulässig.
  • Änderungen an den Parametern der Ausgabesenke: Ob dies zulässig ist und ob die Semantik der Änderung klar definiert ist, hängt von der Senke und der Abfrage ab. Nachstehend finden Sie einige Beispiele:
    • Änderungen am Ausgabeverzeichnis einer Dateisenke sind nicht zulässig: sdf.writeStream.format("parquet").option("path", "/somePath") in sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Änderungen am Ausgabethema sind zulässig: sdf.writeStream.format("kafka").option("topic", "topic1") zu sdf.writeStream.format("kafka").option("topic", "topic2")
    • Änderungen an der benutzerdefinierten foreach-Senke (d. h. dem ForeachWriter-Code) sind zulässig, aber die Semantik der Änderung hängt vom Code ab.
  • Änderungen an Projektions-/Filter-/Zuordnungsvorgängen: Einige Fälle sind zulässig. Beispiel:
    • Das Hinzufügen/Löschen von Filtern ist zulässig: sdf.selectExpr("a") in sdf.where(...).selectExpr("a").filter(...).
    • Änderungen in Projektionen mit gleichem Ausgabeschema sind zulässig: sdf.selectExpr("stringColumn AS json").writeStream in sdf.select(to_json(...).as("json")).writeStream.
    • Änderungen in Projektionen mit unterschiedlichem Ausgabeschema sind bedingt zulässig: sdf.selectExpr("a").writeStream in sdf.selectExpr("b").writeStream ist nur zulässig, wenn die Ausgabesenke das Ändern des Schemas von "a" in "b" zulässt.
  • Änderungen an zustandsbehafteten Vorgängen: Bei einigen Vorgängen in Streamingabfragen müssen Zustandsdaten beibehalten werden, um das Ergebnis kontinuierlich zu aktualisieren. Beim strukturierten Streaming werden automatisch Prüfpunkte für die Zustandsdaten in einem fehlertoleranten Speicher erstellt (z. B. DBFS, Azure Blob Storage), und diese werden nach einem Neustart wiederhergestellt. Das setzt jedoch voraus, dass das Schema der Zustandsdaten bei allen Neustarts gleich bleibt. Das bedeutet, dass alle Änderungen (d. h. Hinzufügungen, Löschungen oder Schemaänderungen) an den zustandsbehafteten Vorgängen einer Streamingabfrage zwischen Neustarts nicht zulässig sind. Es folgt eine Liste der zustandsbehafteten Vorgänge, deren Schema zwischen Neustarts nicht geändert werden sollte, um die Zustandswiederherstellung zu gewährleisten:
    • Streamingaggregation: Zum Beispiel sdf.groupBy("a").agg(...). Änderungen der Anzahl oder des Typs von Gruppierungsschlüsseln oder Aggregaten sind nicht zulässig.
    • Streamingdeduplizierung: Zum Beispiel sdf.dropDuplicates("a"). Änderungen der Anzahl oder des Typs von Gruppierungsschlüsseln oder Aggregaten sind nicht zulässig.
    • Stream-stream-Join: Zum Beispiel sdf1.join(sdf2, ...) (d. h. beide Eingaben werden mit sparkSession.readStream generiert). Änderungen des Schemas oder der Spalten mit Equi-Join sind nicht zulässig. Änderungen am Jointyp (äußerer oder innerer) sind nicht zulässig. Andere Änderungen in der Joinbedingung sind falsch definiert.
    • Beliebiger zustandsbehafteter Vorgang: Zum Beispielsdf.groupByKey(...).mapGroupsWithState(...) oder sdf.groupByKey(...).flatMapGroupsWithState(...). Änderungen am Schema des benutzerdefinierten Zustands und des Timeouttyps sind nicht zulässig. Änderungen innerhalb der Funktion für benutzerdefinierte Zustandszuordnung sind zulässig, aber die semantische Auswirkung der Änderung hängt von der benutzerdefinierten Logik ab. Wenn Sie Änderungen am Zustandsschema wirklich unterstützen möchten, können Sie Ihre komplexen Zustandsdatenstrukturen mithilfe eines Codierungs-/Decodierungsschemas, das die Schemamigration unterstützt, explizit in Bytes codieren/decodieren. Wenn Sie beispielsweise Ihren Status als Avro-codierte Bytes speichern, können Sie das Avro-State-Schema zwischen Abfrageneustarts ändern, da dies den binären Status wiederherstellt.