Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Sie können DataFrame-Vorgänge oder SQL-Tabellenwertfunktionen verwenden, um Strukturierte Streaming-Statusdaten und Metadaten abzufragen. Verwenden Sie diese Funktionen, um Zustandsinformationen für strukturierte Streaming-Zustandsabfragen zu beobachten, die für die Überwachung und das Debuggen nützlich sein können.
Sie benötigen Lesezugriff auf den Prüfpunktpfad für eine Streamingabfrage, um Zustandsdaten oder Metadaten abzufragen. Die in diesem Artikel beschriebenen Funktionen bieten schreibgeschützten Zugriff auf Zustandsdaten und Metadaten. Sie können nur die Batchlesesemantik zum Abfragen von Zustandsinformationen verwenden.
Hinweis
Sie können keine Statusinformationen für Lakeflow Spark Declarative Pipelines, Streamingtabellen oder materialisierte Ansichten abfragen. Sie können keine Statusinformationen mit serverlosem Computing oder Computing abfragen, das im Standardzugriffsmodus konfiguriert ist.
Anforderungen
- Verwenden Sie eine der folgenden Berechnungskonfigurationen:
- Databricks Runtime 16.3 und höher auf einem Rechner, der mit dem Standardzugriffsmodus konfiguriert ist.
- Databricks Runtime 14.3 LTS und höher auf Compute-Ressourcen mit dediziertem oder keinem Isolationszugriffsmodus.
- Lesezugriff auf den Prüfpunktpfad, der von der Streamingabfrage verwendet wird.
Lesen des Zustandsspeichers für strukturiertes Streaming
Sie können Zustandsspeicherinformationen für strukturierte Streaming-Abfragen lesen, die in unterstützten Databricks Runtime ausgeführt werden. Verwenden Sie die folgende Syntax:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Optionen und Schema der Statusleser-API
Eine vollständige Liste der statestore Formatoptionen finden Sie im Statusspeicher.
Die Ausgabedaten haben das folgende Schema:
| Spalte | Typ | Beschreibung |
|---|---|---|
key |
Struct (weiterer Typ, der vom Zustandsschlüssel abgeleitet wurde) | Der Schlüssel für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt. |
value |
Struct (weiterer Typ, der vom Zustandswert abgeleitet wurde) | Der Wert für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt. |
partition_id |
Ganze Zahl | Die Partition des Zustandsprüfpunkts, die den Datensatz des zustandsbehafteten Operatoren enthält. |
In Databricks Runtime 16.4 LTS und höher weist die Ausgabedaten beim Festlegen der readChangeFeed Option auf truedas folgende Schema auf:
| Spalte | Typ | Beschreibung |
|---|---|---|
batch_id |
Lang | Die Batch-ID, zu der die Statusänderung gehört. |
change_type |
Schnur | Der vom Batch angewendete Änderungstyp: update für Einfügungen und Aktualisierungen, delete für Löschungen. |
key |
Struct (weiterer Typ, der vom Zustandsschlüssel abgeleitet wurde) | Der Schlüssel für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt. |
value |
Struct (weiterer Typ, der vom Zustandswert abgeleitet wurde) | Der Wert für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt.
null für Datensätze, bei denen es sich bei change_type um delete handelt. |
partition_id |
Ganze Zahl | Die Partition des Zustandsprüfpunkts, die den Datensatz des zustandsbehafteten Operatoren enthält. |
Siehe read_statestore Tabellenwertfunktion.
Statusänderungen von Structured Streaming auslesen
Verfügbar auf Databricks Runtime 16.4 LTS und höher. Um zu lesen, wie sich der Zustand zwischen Mikrobatches verändert, anstatt den vollständigen Zustand für ein einzelnes Mikrobatch anzuzeigen, legen Sie readChangeFeed auf true fest und geben Sie changeStartBatchId an. Optional können Sie angeben changeEndBatchId. Eine vollständige Liste der Optionen finden Sie im Statusspeicher.
Um beispielsweise Zustandsänderungen ab Batch 2 bis zum neuesten bestätigten Batch zu lesen:
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
Das Ausgabeschema enthält zusätzliche batch_id Und change_type Spalten. Das vollständige Schema finden Sie unter Statusleser-API-Optionen und -Schema.
Lesen von strukturierten Streamingzustandsmetadaten
Verfügbar auf Databricks Runtime 14.3 LTS oder höher. Sie können Statusmetadaten für Strukturierte Streaming-Abfragen lesen:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Die zurückgegebenen Daten haben das folgende Schema:
| Spalte | Typ | Beschreibung |
|---|---|---|
operatorId |
Ganze Zahl | Die ganzzahlige ID des zustandsbehafteten Streamingoperatoren. |
operatorName |
Schnur | Name des zustandsbehafteten Streamingoperatoren. |
stateStoreName |
Schnur | Name des Zustandsspeichers des Operators. |
numPartitions |
Ganze Zahl | Anzahl der Partitionen des Zustandsspeichers. |
minBatchId |
Lang | Die niedrigste Batch-ID, die für den Abfragezustand verfügbar ist. |
maxBatchId |
Lang | Die höchste Batch-ID, die für den Abfragezustand verfügbar ist. |
Hinweis
Die von minBatchId und maxBatchId bereitgestellten Batch-ID-Werte spiegeln den Zustand zum Zeitpunkt wider, zu dem der Prüfpunkt erstellt wurde. Alte Batches werden bei der Ausführung von Mikrobatches automatisch bereinigt, so dass nicht garantiert werden kann, dass der hier angegebene Wert noch verfügbar ist.
Siehe read_state_metadata Tabellenwertfunktion.
Beispiel: Eine Seite einer Stream-Stream-Verknüpfung abfragen
Verwenden Sie die folgende Syntax, um die linke Seite einer Stream-Stream-Verknüpfung abzufragen:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Beispiel: Abfragestatusspeicher für Datenstrom mit mehreren zustandsbehafteten Operatoren
In diesem Beispiel wird der Statusmetadatenleser verwendet, um Metadatendetails einer Streamingabfrage mit mehreren zustandsbehafteten Operatoren zu sammeln, und verwendet dann die Metadatenergebnisse als Optionen für den Statusleser.
Der Statusmetadatenleser verwendet den Prüfpunktpfad als einzige Option, wie im folgenden Syntaxbeispiel gezeigt:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Die folgende Tabelle stellt eine Beispielausgabe der Zustandsspeichermetadaten dar:
| operatorId | operatorName | Zustandspeichername | AnzahlPartitionen | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | Standardeinstellung | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | Standardeinstellung | 200 | 0 | 13 |
Um Ergebnisse für den dedupeWithinWatermark Operator zu erhalten, fragen Sie den Statusleser mit der operatorId Option ab, wie im folgenden Beispiel gezeigt:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);