Konfigurieren strukturierter Streaming-Triggerintervalle

Apache Spark Structured Streaming verarbeitet Daten inkrementell. Triggerintervalle steuern, wie häufig strukturiertes Streaming nach neuen Daten sucht. Sie können Triggerintervalle für die Nahezu-Echtzeit-Verarbeitung, für geplante Datenbankaktualisierungen oder die Batchverarbeitung aller neuen Daten für einen Tag oder eine Woche konfigurieren.

Da Was ist Auto Loader? strukturiertes Streaming zum Laden von Daten verwendet, bietet Ihnen das Verständnis, wie Trigger funktionieren, die größtmögliche Flexibilität, um die Kosten zu kontrollieren, während Daten mit der gewünschten Häufigkeit aufgenommen werden.

Wichtig

Azure Databricks empfiehlt, einen Triggermodus festzulegen, der Latenz und Kosten für Ihren Anwendungsfall ausgleicht. Andernfalls werden möglicherweise unerwartete Speicherkosten von Ihrem Cloudanbieter angezeigt. Details finden Sie unter "Steuern der Cloudspeicherkosten ".

Übersicht über Triggermodi

In der folgenden Tabelle sind die in strukturiertem Streaming verfügbaren Triggermodi zusammengefasst:

Trigger-Modus Syntaxbeispiel (Python) Am besten geeignet für
Nicht angegeben (Standard) N/A Allgemeines Streaming mit 3-5 Sekunden Latenz. Entspricht dem ProcessingTime-Trigger mit 0 ms-Intervallen. Die Datenstromverarbeitung wird kontinuierlich ausgeführt, solange neue Daten eintreffen.
Bearbeitungsdauer .trigger(processingTime='10 seconds') Ausgleich von Kosten und Leistung. Reduziert den Aufwand, indem verhindert wird, dass das System zu häufig nach Daten sucht.
Jetzt verfügbar .trigger(availableNow=True) Geplante inkrementelle Batchverarbeitung. Verarbeitet so viele Daten wie verfügbar zum Zeitpunkt, zu dem der Streamingauftrag ausgelöst wird.
Echtzeitmodus .trigger(realTime='5 minutes') Betriebsbelastungen mit ultraniedriger Latenz, die eine Verarbeitung unterhalb einer Sekunde erfordern, wie beispielsweise Betrugserkennung oder Echtzeitpersonalisierung. Öffentliche Vorschau. "5 Minuten" gibt die Länge eines Mikrobatches an. Verwenden Sie 5 Minuten, um den Aufwand pro Batch zu minimieren, z. B. die Abfragekompilierung.
Fortlaufend .trigger(continuous='1 second') Nicht unterstützt. Dies ist ein experimentelles Feature, das in Spark OSS enthalten ist. Verwenden Sie stattdessen den Echtzeitmodus.

:::note Serverless-Berechnung

Bei serverlosem Computing werden nur Trigger.AvailableNow() und Trigger.Once() unterstützt. Databricks empfiehlt Trigger.AvailableNow().

Verwenden Sie für kontinuierliches Streaming auf serverlosen Berechnungen den modus "Triggered vs. continuous pipeline mode " im fortlaufenden Modus.

Siehe Streaming-Einschränkungen.

:::

processingTime: Zeitbasierte Triggerintervalle

Strukturiertes Streaming bezieht sich auf zeitbasierte Triggerintervalle als "Feste Intervall-Mikrobatches". Geben Sie mithilfe des processingTime-Schlüsselworts eine Zeitdauer als Zeichenfolge an, z. B. .trigger(processingTime='10 seconds').

Die Konfiguration dieses Intervalls bestimmt, wie oft das System Überprüfungen durchführt, um festzustellen, ob neue Daten eingegangen sind. Konfigurieren Sie Ihre Verarbeitungszeit, um die Latenzanforderungen und die Geschwindigkeit, mit der Daten in der Datenquelle eintreffen, auszugleichen.

AvailableNow: Inkrementelle Batchverarbeitung

Wichtig

In Databricks Runtime 11.3 LTS und höher Trigger.Once ist veraltet. Verwenden Sie Trigger.AvailableNow für alle inkrementellen Batchverarbeitungsworkloads.

Die AvailableNow-Triggeroption verwendet alle verfügbaren Datensätze als inkrementellen Batch, mit der Möglichkeit, die Batchgröße mit Optionen wie maxBytesPerTrigger zu konfigurieren. Die Größenanpassungsoptionen variieren je nach Datenquelle.

Unterstützte Datenquellen

Azure Databricks unterstützt die Verwendung von Trigger.AvailableNow für die inkrementelle Batchverarbeitung aus vielen strukturierten Streamingquellen. Die folgende Tabelle enthält die mindestens unterstützte Databricks Runtime-Version, die für jede Datenquelle erforderlich ist:

`Source` Mindestversion von Databricks Runtime
Dateiquellen (JSON, Parquet usw.) 9.1 LTS
Delta Lake 10.4 LTS
Autoloader 10.4 LTS
Apache Kafka 10.4 LTS
Kinesis 13.1

realTime: Ultra-niedrige Latenz betriebliche Arbeitslasten

Der Echtzeitmodus für strukturiertes Streaming erreicht im Endbereich eine End-to-End-Latenz von unter 1 Sekunde und in gängigen Fällen etwa 300 ms. Weitere Informationen zum effektiven Konfigurieren und Verwenden des Echtzeitmodus finden Sie im Echtzeitmodus im strukturierten Streaming.

Apache Spark verfügt über ein zusätzliches Triggerintervall, das als fortlaufende Verarbeitung bezeichnet wird. Dieser Modus ist seit Spark 2.3 als experimentell klassifiziert. Azure Databricks unterstützt diesen Modus nicht oder empfiehlt diesen Modus nicht. Verwenden Sie stattdessen den Echtzeitmodus für Anwendungsfälle mit geringer Latenz.

Hinweis

Der fortlaufende Verarbeitungsmodus auf dieser Seite ist nicht mit der kontinuierlichen Verarbeitung in Lakeflow Spark Declarative Pipelines verknüpft.

Steuern der Cloudspeicherkosten

Wenn Sie keinen Triggermodus festlegen, legt Strukturiertes Streaming standardmäßig den Triggermodus auf processingTime und das Intervall fest 0, auf das alle paar Millisekunden nach neuen Daten gesucht wird. Dies kann ein hohes Volumen von Cloudspeicher-API-Aufrufen pro Tag generieren und zu unerwarteten Gebühren von Ihrem Cloudanbieter führen.

Azure Databricks empfiehlt, einen Triggermodus zu konfigurieren, der für Ihre Latenz- und Kostenanforderungen geeignet ist. Informationen zum Konfigurieren eines zeitbasierten Triggerintervalls finden Sie processingTime unter.

Ändern von Triggerintervallen zwischen Läufen

Sie können das Auslöseintervall zwischen den Ausführungen ändern, während Sie denselben Prüfpunkt verwenden.

Verhalten beim Ändern von Intervallen

Wenn eine Strukturierte Streaming-Abfrage beendet wird, während derzeit ein Mikrobatch verarbeitet wird, muss dieser Mikrobatch abgeschlossen werden, bevor das neue Triggerintervall angewendet wird. Nachdem Sie das Triggerintervall geändert haben, können Sie feststellen, dass ein Mikrobatch mit der zuvor angegebenen Konfiguration verarbeitet wird. Im Folgenden wird das erwartete Verhalten nach einem Übergang beschrieben:

  • Von zeitbasiertem Intervall bis: AvailableNow Ein Mikrobatch kann wie ein inkrementeller Batch verarbeitet werden, bevor alle verfügbaren Datensätze bearbeitet sind.
  • Von AvailableNow zeitbasiertem Intervall: Die Verarbeitung kann für alle Datensätze fortgesetzt werden, die verfügbar waren, als der letzte AvailableNow Auftrag ausgelöst wurde.

Behebung von Abfragefehlern

Wenn Sie versuchen, aus einem Abfragefehler mit einem inkrementellen Batch wiederherzustellen, löst eine Änderung des Triggerintervalls das Problem nicht. Der vorherige erfolglose Batch muss abgeschlossen werden, da für strukturiertes Streaming idempotente Mikrobatches erforderlich sind. Siehe Fehlertoleranzsemantik für Apache Spark.

Um den Fehler zu beheben, skalieren Sie die Rechenkapazität, indem Sie zum Beispiel die Anzahl der Arbeitsknoten erhöhen. In seltenen Fällen müssen Sie den Stream möglicherweise mit einem neuen Prüfpunkt neu starten.