Lesen von strukturierten Streamingzustandsinformationen

Sie können DataFrame-Vorgänge oder SQL-Tabellenwertfunktionen verwenden, um Strukturierte Streaming-Statusdaten und Metadaten abzufragen. Verwenden Sie diese Funktionen, um Zustandsinformationen für strukturierte Streaming-Zustandsabfragen zu beobachten, die für die Überwachung und das Debuggen nützlich sein können.

Sie benötigen Lesezugriff auf den Prüfpunktpfad für eine Streamingabfrage, um Zustandsdaten oder Metadaten abzufragen. Die in diesem Artikel beschriebenen Funktionen bieten schreibgeschützten Zugriff auf Zustandsdaten und Metadaten. Sie können nur die Batchlesesemantik zum Abfragen von Zustandsinformationen verwenden.

Hinweis

Sie können keine Statusinformationen für Lakeflow Spark Declarative Pipelines, Streamingtabellen oder materialisierte Ansichten abfragen. Sie können keine Statusinformationen mit serverlosem Computing oder Computing abfragen, das im Standardzugriffsmodus konfiguriert ist.

Anforderungen

  • Verwenden Sie eine der folgenden Berechnungskonfigurationen:
    • Databricks Runtime 16.3 und höher auf einem Rechner, der mit dem Standardzugriffsmodus konfiguriert ist.
    • Databricks Runtime 14.3 LTS und höher auf Compute-Ressourcen mit dediziertem oder keinem Isolationszugriffsmodus.
  • Lesezugriff auf den Prüfpunktpfad, der von der Streamingabfrage verwendet wird.

Lesen des Zustandsspeichers für strukturiertes Streaming

Sie können Zustandsspeicherinformationen für strukturierte Streaming-Abfragen lesen, die in unterstützten Databricks Runtime ausgeführt werden. Verwenden Sie die folgende Syntax:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Optionen und Schema der Statusleser-API

Eine vollständige Liste der statestore Formatoptionen finden Sie im Statusspeicher.

Die Ausgabedaten haben das folgende Schema:

Spalte Typ Beschreibung
key Struct (weiterer Typ, der vom Zustandsschlüssel abgeleitet wurde) Der Schlüssel für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt.
value Struct (weiterer Typ, der vom Zustandswert abgeleitet wurde) Der Wert für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt.
partition_id Ganze Zahl Die Partition des Zustandsprüfpunkts, die den Datensatz des zustandsbehafteten Operatoren enthält.

In Databricks Runtime 16.4 LTS und höher weist die Ausgabedaten beim Festlegen der readChangeFeed Option auf truedas folgende Schema auf:

Spalte Typ Beschreibung
batch_id Lang Die Batch-ID, zu der die Statusänderung gehört.
change_type Schnur Der vom Batch angewendete Änderungstyp: update für Einfügungen und Aktualisierungen, delete für Löschungen.
key Struct (weiterer Typ, der vom Zustandsschlüssel abgeleitet wurde) Der Schlüssel für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt.
value Struct (weiterer Typ, der vom Zustandswert abgeleitet wurde) Der Wert für einen zustandsbehafteten Operatordatensatz im Zustandsprüfpunkt. null für Datensätze, bei denen es sich bei change_type um delete handelt.
partition_id Ganze Zahl Die Partition des Zustandsprüfpunkts, die den Datensatz des zustandsbehafteten Operatoren enthält.

Siehe read_statestore Tabellenwertfunktion.

Statusänderungen von Structured Streaming auslesen

Verfügbar auf Databricks Runtime 16.4 LTS und höher. Um zu lesen, wie sich der Zustand zwischen Mikrobatches verändert, anstatt den vollständigen Zustand für ein einzelnes Mikrobatch anzuzeigen, legen Sie readChangeFeed auf true fest und geben Sie changeStartBatchId an. Optional können Sie angeben changeEndBatchId. Eine vollständige Liste der Optionen finden Sie im Statusspeicher.

Um beispielsweise Zustandsänderungen ab Batch 2 bis zum neuesten bestätigten Batch zu lesen:

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

Das Ausgabeschema enthält zusätzliche batch_id Und change_type Spalten. Das vollständige Schema finden Sie unter Statusleser-API-Optionen und -Schema.

Lesen von strukturierten Streamingzustandsmetadaten

Verfügbar auf Databricks Runtime 14.3 LTS oder höher. Sie können Statusmetadaten für Strukturierte Streaming-Abfragen lesen:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Die zurückgegebenen Daten haben das folgende Schema:

Spalte Typ Beschreibung
operatorId Ganze Zahl Die ganzzahlige ID des zustandsbehafteten Streamingoperatoren.
operatorName Schnur Name des zustandsbehafteten Streamingoperatoren.
stateStoreName Schnur Name des Zustandsspeichers des Operators.
numPartitions Ganze Zahl Anzahl der Partitionen des Zustandsspeichers.
minBatchId Lang Die niedrigste Batch-ID, die für den Abfragezustand verfügbar ist.
maxBatchId Lang Die höchste Batch-ID, die für den Abfragezustand verfügbar ist.

Hinweis

Die von minBatchId und maxBatchId bereitgestellten Batch-ID-Werte spiegeln den Zustand zum Zeitpunkt wider, zu dem der Prüfpunkt erstellt wurde. Alte Batches werden bei der Ausführung von Mikrobatches automatisch bereinigt, so dass nicht garantiert werden kann, dass der hier angegebene Wert noch verfügbar ist.

Siehe read_state_metadata Tabellenwertfunktion.

Beispiel: Eine Seite einer Stream-Stream-Verknüpfung abfragen

Verwenden Sie die folgende Syntax, um die linke Seite einer Stream-Stream-Verknüpfung abzufragen:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Beispiel: Abfragestatusspeicher für Datenstrom mit mehreren zustandsbehafteten Operatoren

In diesem Beispiel wird der Statusmetadatenleser verwendet, um Metadatendetails einer Streamingabfrage mit mehreren zustandsbehafteten Operatoren zu sammeln, und verwendet dann die Metadatenergebnisse als Optionen für den Statusleser.

Der Statusmetadatenleser verwendet den Prüfpunktpfad als einzige Option, wie im folgenden Syntaxbeispiel gezeigt:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Die folgende Tabelle stellt eine Beispielausgabe der Zustandsspeichermetadaten dar:

operatorId operatorName Zustandspeichername AnzahlPartitionen minBatchId maxBatchId
0 stateStoreSave Standardeinstellung 200 0 13
1 dedupeWithinWatermark Standardeinstellung 200 0 13

Um Ergebnisse für den dedupeWithinWatermark Operator zu erhalten, fragen Sie den Statusleser mit der operatorId Option ab, wie im folgenden Beispiel gezeigt:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);