Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Prüfpunkte und Write-Ahead-Protokolle arbeiten zusammen, um Verarbeitungsgarantien für strukturierte Streaming-Workloads zu bieten. Der Prüfpunkt verfolgt die Informationen, die die Abfrage identifizieren, darunter Statusinformationen und verarbeitete Datensätze. Wenn Sie die Dateien in einem Prüfpunktverzeichnis löschen oder zu einem neuen Prüfpunkt-Speicherort wechseln, beginnt die nächste Ausführung der Abfrage von vorne.
Ein Prüfpunktverzeichnis enthält Folgendes:
- Offsets: Die Quell-Offsets, die in den einzelnen Mikrobatches verarbeitet werden. Auf diese Weise kann die Abfrage von genau der Stelle fortgesetzt werden, an der sie unterbrochen wurde, ohne Daten erneut zu verarbeiten.
- Commits: Ein Protokoll, welches Mikrobatches protokolliert, die an die Senke committet wurden, wodurch genau-einmal-Semantik ermöglicht wird.
-
Status: Für zustandsbehaftete Abfragen (Aggregationen, Streamstreamverknüpfungen, Deduplizierung und benutzerdefinierte Zustandsoperatoren wie
transformWithState), speichert der Prüfpunkt Metadaten über den Zustandsoperator, das Zustandsschema und den vom Statusspeicheranbieter verwalteten Inhalt des Prüfzustands. - Metadaten: Die eindeutige Abfrage-ID, die zum Identifizieren der Abfrage verwendet wird. Konfigurationseinstellungen werden als Teil des Offsetprotokolls gespeichert.
Jede Abfrage muss einen anderen Prüfpunkt-Speicherort haben. Mehrere Abfragen sollten sich niemals denselben Speicherort teilen.
Hinweis
In diesem Artikel werden strukturierte Streaming-Prüfpunkte für Streamingabfragen behandelt. Informationen zur Verwendung von DataFrame.checkpoint() mit Unity-Katalog-Volumes zum Abschneiden von Ausführungsplänen für nicht-streamingfähige DataFrames finden Sie unter DataFrame-Prüfpunkte in Volumes.
Aktivieren von Checkpointing für Structured Streaming-Abfragen
Sie müssen die Option checkpointLocation angeben, bevor Sie eine Streamingabfrage ausführen, wie im folgenden Beispiel veranschaulicht:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Hinweis
Einige Senken, z. B. die Ausgabe für display() in Notebooks und die Senke memory, generieren automatisch einen temporären Prüfpunkt-Speicherort, wenn Sie diese Option weglassen. Die temporären Prüfpunkt-Speicherorte bieten keine Garantien für Fehlertoleranz oder Datenkonsistenz und werden unter Umständen nicht ordnungsgemäß bereinigt. Databricks empfiehlt, für diese Senken stets einen Prüfpunkt-Speicherort anzugeben.
Wiederherstellen nach Änderungen in einer Abfrage für strukturiertes Streaming
Es gibt Einschränkungen, welche Änderungen in einer Stream-Abfrage zwischen Neustart vom gleichen Checkpoint-Standort aus zulässig sind.
Änderungen, die in der Regel einen neuen Prüfpunkt erfordern, umfassen die Anzahl oder den Typ der Eingabequellen, abonnierte Kafka-Themen oder AutoLadepfade, zustandsbehaftete Vorgangstypen, Zustandsschema und Ausgabesenkentyp.
Änderungen, die im Allgemeinen sicher sind, umfassen das Hinzufügen oder Entfernen von Filtern, das Ändern von Grenzwerten, Triggerintervallen und das Aktualisieren der benutzerdefinierten Funktionslogik innerhalb mapGroupsWithState, obwohl sich die Semantik ändern kann.
Im folgenden Abschnitt werden Änderungen beschrieben, die entweder nicht zulässig sind oder die Auswirkung der Änderung nicht gut definiert ist, wobei:
- Der Begriff zulässig bedeutet, dass Sie die angegebene Änderung durchführen können, aber ob die Semantik ihrer Auswirkung klar definiert ist, hängt von der Abfrage und der Änderung ab.
- Der Begriff nicht zulässig bedeutet, dass Sie die angegebene Änderung nicht durchführen sollten, da bei der neu gestarteten Abfrage wahrscheinlich unvorhersehbare Fehler auftreten.
-
sdfsteht für einen Streamingdatenrahmen oder ein Streamingdataset, der bzw. das mitsparkSession.readStreamgeneriert wurde.
Arten von Änderungen in strukturierten Streaming-Abfragen
Änderungen an der Anzahl oder art der Eingabequellen: Dies ist standardmäßig nicht zulässig, da Structured Streaming Quellen anhand ihrer Position im Abfrageplan identifiziert. Wenn Sie die Quellbenennung aktivieren, können Sie vorhandene Quellen neu anordnen und neue Quellen hinzufügen, ohne mit einem neuen Prüfpunkt zu beginnen. Siehe Streamingquellen mit Quellenevolution ändern.
Änderungen an den Parametern von Eingabequellen: Ob dies zulässig ist und ob die Semantik der Änderung klar definiert ist, hängt von der Quelle und der Abfrage ab, einschließlich Kontrollmechanismen wie
maxFilesPerTriggerodermaxOffsetsPerTrigger. Hier sind einige Beispiele:Das Hinzufügen, Löschen und Ändern von Ratenbegrenzungen ist zulässig:
spark.readStream.format("kafka").option("subscribe", "article")zu
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)Ausführliche Informationen finden Sie unter Configure Structured Streaming batch size on Azure Databricks
Änderungen an abonnierten Artikeln und Dateien sind im Allgemeinen nicht zulässig, da die Ergebnisse unvorhersehbar sind:
spark.readStream.format("kafka").option("subscribe", "article")inspark.readStream.format("kafka").option("subscribe", "newarticle")
Änderungen im Auslöserintervall: Sie können Trigger zwischen inkrementellen Batches und Zeitintervallen ändern. Siehe Änderungsauslöserintervalle zwischen den Läufen.
Änderungen am Typ der Ausgabesenke: Änderungen zwischen einigen bestimmten Senkenkombinationen sind zulässig. Dies muss von Fall zu Fall geprüft werden. Nachstehend finden Sie einige Beispiele:
- Datei-Sink zu Kafka-Sink ist zulässig. Für Kafka sind nur die neuen Daten sichtbar.
- Von Kafka-Senke zu Dateisenke ist nicht erlaubt.
- Die Änderung einer Kafka-Senke in eine foreach-Senke oder umgekehrt ist zulässig.
Änderungen an den Parametern der Ausgabesenke: Ob dies zulässig ist und ob die Semantik der Änderung klar definiert ist, hängt von der Senke und der Abfrage ab. Nachstehend finden Sie einige Beispiele:
- Änderungen am Ausgabeverzeichnis einer Dateisenke sind nicht zulässig:
sdf.writeStream.format("parquet").option("path", "/somePath")insdf.writeStream.format("parquet").option("path", "/anotherPath") - Änderungen am Ausgabethema sind zulässig:
sdf.writeStream.format("kafka").option("topic", "topic1")zusdf.writeStream.format("kafka").option("topic", "topic2") - Änderungen an der benutzerdefinierten foreach-Senke (d. h. dem
ForeachWriter-Code) sind zulässig, aber die Semantik der Änderung hängt vom Code ab.
- Änderungen am Ausgabeverzeichnis einer Dateisenke sind nicht zulässig:
Änderungen an Projektions-/Filter-/Abbildungsoperationen: In einigen Fällen zulässig. Beispiel:
- Das Hinzufügen/Löschen von Filtern ist zulässig:
sdf.selectExpr("a")insdf.where(...).selectExpr("a").filter(...). - Änderungen in Projektionen mit gleichem Ausgabeschema sind zulässig:
sdf.selectExpr("stringColumn AS json").writeStreaminsdf.select(to_json(...).as("json")).writeStream. - Änderungen in Projektionen mit unterschiedlichem Ausgabeschema sind bedingt zulässig:
sdf.selectExpr("a").writeStreaminsdf.selectExpr("b").writeStreamist nur zulässig, wenn die Ausgabesenke das Ändern des Schemas von"a"in"b"zulässt.
- Das Hinzufügen/Löschen von Filtern ist zulässig:
Änderungen an zustandsbehafteten Vorgängen: Bei einigen Vorgängen in Streamingabfragen müssen Zustandsdaten beibehalten werden, um das Ergebnis kontinuierlich zu aktualisieren. Beim strukturierten Streaming werden automatisch Prüfpunkte für die Zustandsdaten in einem fehlertoleranten Speicher erstellt (z. B. DBFS, Azure Blob Storage), und diese werden nach einem Neustart wiederhergestellt. Das setzt jedoch voraus, dass das Schema der Zustandsdaten bei allen Neustarts gleich bleibt. Das bedeutet, dass alle Änderungen (d. h. Hinzufügungen, Löschungen oder Schemaänderungen) an den zustandsbehafteten Vorgängen einer Streamingabfrage zwischen Neustarts nicht zulässig sind. Dies ist die Liste der zustandsbehafteten Vorgänge, deren Schema nicht zwischen Neustarts geändert werden sollte, um die Zustandswiederherstellung sicherzustellen:
-
Streaming-Aggregation: Zum Beispiel
sdf.groupBy("a").agg(...). Änderungen der Anzahl oder des Typs von Gruppierungsschlüsseln oder Aggregaten sind nicht zulässig. -
Streaming-Deduplizierung: Zum Beispiel
sdf.dropDuplicates("a"). Änderungen der Anzahl oder des Typs von Gruppierungsschlüsseln oder Aggregaten sind nicht zulässig. -
Stream-stream-Join: Zum Beispiel
sdf1.join(sdf2, ...)(d. h. beide Eingaben werden mitsparkSession.readStreamgeneriert). Änderungen am Schema oder an den für Equi-Joins verwendeten Spalten sind nicht zulässig. Änderungen am Jointyp (äußerer oder innerer) sind nicht zulässig. Andere Änderungen in der Joinbedingung sind ungenau definiert. -
Beliebiger zustandsbehafteter Vorgang: Zum Beispiel
sdf.groupByKey(...).mapGroupsWithState(...)odersdf.groupByKey(...).flatMapGroupsWithState(...). Änderungen am Schema des benutzerdefinierten Zustands und des Timeouttyps sind nicht zulässig. Änderungen innerhalb der Funktion für benutzerdefinierte Zustandszuordnung sind zulässig, aber die semantische Auswirkung der Änderung hängt von der benutzerdefinierten Logik ab. Wenn Sie Änderungen am Zustandsschema wirklich unterstützen möchten, können Sie Ihre komplexen Zustandsdatenstrukturen mithilfe eines Codierungs-/Decodierungsschemas, das die Schemamigration unterstützt, explizit in Bytes codieren/decodieren. Wenn Sie beispielsweise Ihren Status als Avro-codierte Bytes speichern, können Sie das Avro-State-Schema zwischen Abfrageneustarts ändern, da dies den binären Status wiederherstellt.
-
Streaming-Aggregation: Zum Beispiel
Von Bedeutung
Die zustandsbehafteten Operatoren dropDuplicates() und dropDuplicatesWithinWatermark() können aufgrund einer Zustandsschemakompatibilitätsprüfung beim Ändern zwischen Computezugriffsmodi nicht neu gestartet werden.
Der Wechsel zwischen dedizierten und keinen Isolationszugriffsmodi ist zulässig. Der Wechsel zwischen standard- und serverlosen Zugriffsmodi ist zulässig. Versuchen Sie nicht, zwischen anderen Zugriffsmoduskombinationen zu wechseln.
Um diesen Fehler zu vermeiden, ändern Sie nicht den Computezugriffsmodus für Streamingabfragen, die diese Operatoren enthalten.
Ändern von Streamingquellen mit Quellentwicklung
Standardmäßig identifiziert Structured Streaming Quellen anhand ihrer Position im Query-Plan, z. B. 0, 1, 2 usw. Jede Änderung an der Anzahl oder Reihenfolge der Eingabequellen hebt die Checkpoint-Kompatibilität auf und erfordert einen neuen Checkpoint. Mit der Quellentwicklung können Sie jeder Streamingquelle stabile, benutzerdefinierte Namen zuweisen, sodass Sie Quellen aus einer Abfrage neu anordnen, hinzufügen oder entfernen können, ohne den Prüfpunktstatus zu verlieren.
Die Quellentwicklung erfordert Databricks Runtime 18.2 und höher.
Erforderliche Konfiguration
Um die Quellentwicklung zu aktivieren, legen Sie zwei Spark-Konfigurationen fest:
-
spark.sql.streaming.queryEvolution.enableSourceEvolution: Wenntrue, müssen alle Streamingquellen in der Abfrage explizit mit der.name()API benannt werden. Der Standardwert istfalse. -
spark.sql.streaming.offsetLog.formatVersion: Muss auf2festgelegt werden, um das namensbasierte Offsetnachverfolgungsformat zu verwenden. Der Standardwert ist1.
Legen Sie beide Konfigurationen fest, bevor Sie die Streamingabfrage definieren:
spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")
Benennungsregeln
- Namen dürfen nur alphanumerische Zeichen und Unterstriche (
[a-zA-Z0-9_]+) enthalten. - Jeder Quellname muss innerhalb einer Abfrage eindeutig sein.
- Wenn die Quellentwicklung aktiviert ist, muss jede Streamingquelle einen Namen haben. Nicht benannte Quellen verursachen einen
UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENTFehler.
Neuanordnen, Hinzufügen und Entfernen von Quellen
Die folgenden Änderungen sind für abfrageübergreifende Neustarts mit demselben Prüfpunkt sicher:
- Neuanordnen von Quellen: Starten Sie die Abfrage mit einer anderen Reihenfolge von Quellen neu. Jede Quelle setzt anhand ihres Namens am zuletzt bestätigten Offset fort und ändert den Checkpoint-Status nicht.
- Neue Quellen hinzufügen: Starten Sie die Abfrage mit einer neuen Quelle neu. Neue Quellen werden von Beginn an verarbeitet, und vorhandene Quellen werden ab ihrem letzten Offset weiterverarbeitet.
- Quellen entfernen: Starten Sie die Abfrage ohne Quelle neu. Die Quelle wird dauerhaft aus dem Prüfpunkt entfernt. Eine entfernte Quelle kann nicht erneut mit demselben Namen hinzugefügt werden.
Example
Verwenden Sie .name() auf DataStreamReader, bevor Sie .load() oder .table() aufrufen:
Python
orders_us = (spark.readStream
.name("orders_us")
.table("catalog.schema.orders_us")
)
orders_eu = (spark.readStream
.name("orders_eu")
.table("catalog.schema.orders_eu")
)
all_orders = orders_us.union(orders_eu)
Scala
val ordersUS = spark.readStream
.name("orders_us")
.table("catalog.schema.orders_us")
val ordersEU = spark.readStream
.name("orders_eu")
.table("catalog.schema.orders_eu")
val allOrders = ordersUS.union(ordersEU)
Einschränkungen
- Die Quellbenennung erfordert einen neuen Prüfpunkt. Sie können die Quellentwicklung nicht mit einem vorhandenen Prüfpunkt aktivieren, der das V1-Offsetprotokollformat verwendet.
- Nach einem Upgrade auf das Offset-Log-Format V2 können Sie nicht wieder auf V1 zurückstufen. Siehe erforderliche Konfiguration.
- Quellnamen sind dauerhaft. Um eine Quelle umzubenennen, entfernen Sie sie, und fügen Sie sie dann mit einem neuen Namen hinzu. Die umbenannten Quellprozesse von Anfang an.