Asynchrone Zustandsprüfpunkte für zustandsbehaftete Abfragen

Hinweis

Verfügbar in Databricks Runtime 10.4 LTS und höher.

Asynchrone Zustandsprüfpunkte erhalten eine Exactly-Once-Garantie („Genau einmal“) für Streamingabfragen aufrecht, aber können beim strukturierten Streaming die Gesamtlatenz für einige zustandsbehaftete Workloads mit Engpässen bei Zustandsaktualisierungen verringern. Dies wird dadurch erreicht, dass mit der Verarbeitung des nächsten Mikrobatches begonnen wird, sobald die Berechnung des vorherigen Mikrobatches abgeschlossen ist, ohne dass auf den Abschluss der Zustandsprüfpunkte gewartet wird. In der folgenden Tabelle werden die Zielkonflikte für synchrone und asynchrone Prüfpunkte verglichen:

Merkmal Synchrone Prüfpunkte Asynchrone Prüfpunkte
Latency Höhere Latenz für jeden Mikrobatch. Verringerte Latenz, da sich Mikrobatches überlappen können.
Neu starten Schnelle Wiederherstellung, da nur der letzte Batch erneut ausgeführt werden muss. Größere Verzögerung beim Neustart, da möglicherweise mehrere Mikrobatches neu gestartet werden müssen.

Im Folgenden sind Merkmale von Streamingaufträgen aufgeführt, die von asynchronen Zustandsprüfpunkten profitieren können:

  • Der Auftrag weist einen oder mehrere zustandsbehaftete Vorgänge auf (z. B. Aggregation, flatMapGroupsWithState, mapGroupsWithState, stream-stream-Joins).
  • Die Latenz von Zustandsprüfpunkten trägt wesentlich zur Gesamtlatenz bei der Batchausführung bei. Diese Informationen finden Sie in den StreamingQueryProgress-Ereignissen. Diese Ereignisse sind auch in log4j-Protokollen im Spark-Treiber zu finden. Nachfolgend sehen Sie ein Beispiel für den Fortschritt einer Streamingabfrage und die Ermittlung der Auswirkungen des Zustandsprüfpunkts auf die Gesamtlatenz bei der Batchausführung.
    • {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
      }
      
    • Analyse der Latenz von Zustandsprüfpunkten beim oben angegebenen Abfragefortschrittsereignis

      • Die Batchdauer (durationMs.triggerDuration) beträgt ca. 547 Sekunden.
      • Die Latenz des Zustandsspeichercommits (stateOperations[0].commitTimeMs) beträgt ca. 3.186 Sekunden. Die Commitlatenz wird für alle Aufgaben aggregiert, die einen Zustandsspeicher enthalten. In diesem Fall gibt es 64 solcher Aufgaben (stateOperators[0].numShufflePartitions).
      • Jede Aufgabe, die den Zustandsoperator enthält, hat durchschnittlich 50 Sekunden (3.186/64) für den Prüfpunkte benötigt. Dies ist eine zusätzliche Latenz, die zur Batchdauer beiträgt. Wenn alle 64 Aufgaben gleichzeitig ausgeführt werden, macht der Prüfpunktschritt ungefähr 9 % (50 Sek./547 Sek.) der Batchdauer aus. Der Prozentsatz wird noch höher, wenn die maximale Anzahl gleichzeitiger Aufgaben kleiner als 64 ist.

Aktivieren asynchroner Zustandsprüfpunkte

Sie müssen für asynchrone Zustandsprüfpunkte den RocksDB-basierten Zustandsspeicher verwenden. Legen Sie die folgenden Konfigurationen fest:


spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

Einschränkungen und Anforderungen für asynchrone Prüfpunkte

Hinweis

Die automatische Computeskalierung hat Einschränkungen beim Herunterskalieren der Clustergröße für strukturierten Streaming-Workloads. Databricks empfiehlt die Verwendung von Delta Live-Tabellen mit erweiterter automatischer Skalierung für Streaming-Workloads. Weitere Informationen finden Sie unter Optimieren der Clusternutzung von Delta Live Tables-Pipelines mit erweiterter automatischer Skalierung.

  • Jeder Fehler bei einem asynchronen Prüfpunkt in einem oder mehreren Speichern führt zu einem Fehler bei der Abfrage. Im synchronen Prüfpunktmodus wird der Prüfpunkt als Teil der Aufgabe ausgeführt, und Spark wiederholt die Aufgabe mehrmals, bevor ein Fehler für die Abfrage ausgegeben wird. Dieser Mechanismus ist bei asynchronen Zustandsprüfpunkten nicht vorhanden. Bei Verwendung der Databricks-Auftragswiederholungen können bei solchen Fehlern jedoch automatisch Wiederholungen ausgeführt werden.
  • Asynchrone Prüfpunkterstellung funktioniert am besten, wenn die Speicherorte zwischen den Mikrobatch-Ausführungen nicht geändert werden. Eine Größenanpassung des Clusters funktioniert in Kombination mit asynchronen Zustandsprüfpunkten möglicherweise nicht gut, da die Zustandsspeicherinstanz ggf. neu verteilt wird, wenn Knoten im Rahmen einer Änderung der Clustergröße hinzugefügt oder gelöscht werden.
  • Asynchrone Zustandsprüfpunkte werden nur in der Implementierung des RocksDB-Zustandsspeicheranbieters unterstützt. Die Standardimplementierung des In-Memory-Zustandsspeichers unterstützt diese nicht.