Freigeben über


Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung

Von Bedeutung

Dieses Feature befindet sich in der öffentlichen Vorschau in Databricks Runtime 16.2 und höher.

Sie können Streaminganwendungen mit benutzerdefinierten Zustandsoperatoren erstellen, um Lösungen mit geringer Latenz und nahezu Echtzeitlösungen zu implementieren, die beliebige Zustandslogik verwenden. Benutzerdefinierte zustandsbehaftete Operatoren ermöglichen neue operationelle Anwendungsfälle und Muster, die mit herkömmlicher strukturierter Streaming-Verarbeitung nicht realisierbar sind.

Hinweis

Databricks empfiehlt die Verwendung integrierter Structured Streaming-Funktionen für unterstützte zustandsbehaftete Vorgänge wie Aggregationen, Deduplizierung und Streaming-Verknüpfungen. Siehe Was ist zustandsbehaftetes Streaming?.

Databricks empfiehlt die Verwendung von transformWithState anstelle von Legacyoperatoren für beliebige Zustandstransformationen. Dokumentation zu Legacy- flatMapGroupsWithState und mapGroupsWithState Operatoren finden Sie unter Legacy-Operatoren mit willkürlichen Zustandsoperatoren.

Anforderungen

Der transformWithState-Operator und die zugehörigen APIs und Klassen haben die folgenden Anforderungen:

  • Verfügbar in Databricks Runtime 16.2 und höher.
  • Compute muss den dedizierten oder nicht isolierten Zugriffsmodus verwenden.
  • Sie müssen den RocksDB-Statusspeicheranbieter verwenden. Databricks empfiehlt die Aktivierung von RocksDB als Teil der Computekonfiguration.
  • transformWithStateInPandas unterstützt den Standardzugriffsmodus in Databricks Runtime 16.3 und höher.

Hinweis

Führen Sie die folgenden Schritte aus, um den RocksDB-Statusspeicheranbieter für die aktuelle Sitzung zu aktivieren:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Was ist transformWithState?

Der transformWithState Operator wendet einen benutzerdefinierten zustandsbehafteten Prozessor auf eine Strukturierte Streaming-Abfrage an. Sie müssen einen benutzerdefinierten zustandsbehafteten Prozessor implementieren, um transformWithState verwenden zu können. Strukturiertes Streaming umfasst APIs zum Erstellen Ihres zustandsbehafteten Prozessors mit Python, Scala oder Java.

Sie verwenden transformWithState, um benutzerdefinierte Logik auf einen Gruppierungsschlüssel für Datensätze anzuwenden, die mit strukturiertem Streaming inkrementell verarbeitet werden. Im Folgenden wird das allgemeine Design beschrieben:

  • Definieren Sie eine oder mehrere Statusvariablen.
  • Statusinformationen werden für jeden Gruppierungsschlüssel beibehalten und können für jede Zustandsvariable gemäß benutzerdefinierter Logik aufgerufen werden.
  • Für jeden verarbeiteten Mikrobatch sind alle Datensätze für den Schlüssel als Iterator verfügbar.
  • Verwenden Sie integrierte Handles, um zu steuern, wann und wie Datensätze basierend auf Zeitgebern und benutzerdefinierten Bedingungen ausgegeben werden.
  • Zustandswerte unterstützen individuelle Time-to-Live-(TTL-)Definitionen und ermöglichen Flexibilität bei der Verwaltung von Zustandsablauf und -größe.

transformWithState unterstützt die Schemaentwicklung im Zustandsspeicher, sodass Sie Ihre Produktionsanwendungen durchlaufen und aktualisieren können, ohne dass historische Zustandsinformationen verloren gehen oder Datensätze neu verarbeitet werden müssen, was Flexibilität für die Entwicklung und einfache Wartung bietet. Siehe Schemaentwicklung im Zustandsspeicher.

Von Bedeutung

PySpark verwendet den Operator transformWithStateInPandas anstelle von transformWithState. In der Dokumentation zu Azure Databricks wird transformWithState verwendet, um die Funktionalität für Python- und Scala-Implementierungen zu beschreiben.

Die Implementierungen von transformWithState in Scala und Python sowie die zugehörigen APIs unterscheiden sich aufgrund von sprachspezifischen Unterschieden, bieten jedoch dieselbe Funktionalität. Weitere Informationen finden Sie in sprachspezifischen Beispielen und API-Dokumentationen für Ihre bevorzugte Programmiersprache.

Integrierte Verarbeitungshandles

Sie implementieren die Kernlogik für Ihre benutzerdefinierte Zustandsanwendung, indem Sie Handler mithilfe integrierter Handles implementieren.

  • Handles stellen die Methoden für die Interaktion mit Zustandswerten und Zeitgebern bereit, verarbeiten eingehende Datensätze und emittieren Datensätze.
  • Die Handler definieren Ihre benutzerdefinierte ereignisgesteuerte Logik.

Handler für jeden Statustyp werden basierend auf der zugrunde liegenden Datenstruktur implementiert, enthalten jedoch Funktionen zum Abrufen, Einfügen, Aktualisieren und Löschen von Datensätzen.

Handler werden basierend auf ereignissen implementiert, die in Eingabedatensätzen oder Timern beobachtet werden, wobei die folgende Semantik verwendet wird:

  • Definieren Sie einen Handler mithilfe der handleInputRows Methode, um zu steuern, wie Daten verarbeitet werden, der Zustand aktualisiert wird, und Datensätze werden für jeden Mikrobatch von Datensätzen ausgegeben, die für den Gruppierungsschlüssel verarbeitet werden. Siehe Behandeln von Eingabezeilen.
  • Definieren Sie einen Handler mithilfe der handleExpiredTimer Methode, um zeitbasierte Schwellenwerte zum Ausführen von Logik zu verwenden, unabhängig davon, ob zusätzliche Datensätze für den Gruppierungsschlüssel verarbeitet werden. Siehe Programmzeitgesteuerte Ereignisse.

Die folgende Tabelle enthält einen Vergleich der funktionalen Verhaltensweisen, die von diesen Handlern unterstützt werden:

Verhalten handleInputRows handleExpiredTimer
Abrufen, Platzieren, Aktualisieren oder Löschen von Zustandswerten Ja Ja
Erstellen oder Löschen eines Timers Ja Ja
Emittieren von Datensätzen Ja Ja
Durchlaufen von Datensätzen im aktuellen Mikrobatch Ja Nein
Triggerlogik basierend auf verstrichener Zeit Nein Ja

Sie können handleInputRows und handleExpiredTimer kombinieren, um bei Bedarf komplexe Logik zu implementieren.

Sie können beispielsweise eine Anwendung implementieren, die handleInputRows verwendet, um die Zustandswerte für jeden Mikro-Batch zu aktualisieren und einen Timer für die nächsten 10 Sekunden zu setzen. Wenn keine weiteren Datensätze verarbeitet werden, können Sie mit handleExpiredTimer die aktuellen Werte im Statusspeicher ausgeben. Wenn neue Datensätze für den Gruppierungsschlüssel verarbeitet werden, können Sie den vorhandenen Timer löschen und einen neuen Timer festlegen.

Benutzerdefinierte Zustandstypen

Sie können mehrere Zustandsobjekte in einem einzelnen Zustandsoperator implementieren. Die Namen, die Sie jedem Zustandsobjekt zuordnen, bleiben im Zustandsspeicher erhalten, auf den Sie mit dem Statusspeicherleser zugreifen können. Wenn Ihr Zustandsobjekt ein StructType verwendet, geben Sie Namen für jedes Feld in der Struktur an, während Sie das Schema übergeben. Diese Namen sind auch beim Lesen des Zustandsspeichers sichtbar. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.

Die von integrierten Klassen und Operatoren bereitgestellte Funktionalität soll Flexibilität und Erweiterbarkeit bieten, und Die Implementierungsoptionen sollten durch die vollständige Logik informiert werden, die Ihre Anwendung ausführen muss. Sie können z. B. nahezu identische Logik implementieren, indem Sie eine ValueState anwenden, die nach den Feldern user_id und session_id oder nach MapState gruppiert ist, wobei user_id der Schlüssel für session_id ist. In diesem Fall ist eine MapState möglicherweise die bevorzugte Implementierung, wenn die Logik Bedingungen für mehrere session_idWerte auswerten muss.

In den folgenden Abschnitten werden die Zustandstypen beschrieben, die von transformWithState unterstützt werden.

ValueState

Für jeden Gruppierungsschlüssel gibt es einen zugeordneten Wert.

Ein Wertstatus kann komplexe Typen enthalten, z. B. eine Struktur oder ein Tupel. Wenn Sie eine ValueStateDatei aktualisieren, implementieren Sie logik, um den gesamten Wert zu ersetzen. Die TTL für einen Wertstatus wird zurückgesetzt, wenn der Wert aktualisiert wird, aber sie wird nicht zurückgesetzt, wenn ein Quellschlüssel, der einem ValueState entspricht, verarbeitet wird, ohne die gespeicherten ValueState zu aktualisieren.

ListState

Für jeden Gruppierungsschlüssel gibt es eine zugeordnete Liste.

Ein Listenstatus ist eine Sammlung von Werten, von denen jeder komplexe Typen enthalten kann. Jeder Wert in einer Liste verfügt über eine eigene TTL. Sie können einer Liste Elemente hinzufügen, indem Sie einzelne Elemente anfügen, eine Liste mit Elementen anfügen oder die gesamte Liste mit einer putüberschreiben. Nur der Put-Vorgang wird als Aktualisierung betrachtet, um die TTL zurückzusetzen.

MapState

Für jeden Gruppierungsschlüssel gibt es eine verknüpfte Abbildung. Karten sind das funktionale Apache Spark-Äquivalent zu einem Python-Diktat.

Von Bedeutung

Gruppierungsschlüssel beschreiben die in der GROUP BY Klausel der Strukturierten Streaming-Abfrage angegebenen Felder. Map-Zustände enthalten eine beliebige Anzahl von Schlüssel-Wert-Paaren für einen Gruppierungsschlüssel.

Wenn Sie z. B. nach user_id gruppieren und eine Karte für session_id definieren möchten, ist user_id Ihr Gruppierungsschlüssel und der Schlüssel in Ihrer Zuordnung session_id.

Ein Kartenstatus ist eine Sammlung unterschiedlicher Schlüssel, die jeweils einem Wert zugeordnet sind, der komplexe Typen enthalten kann. Jedes Schlüssel-Wert-Paar in einer Karte verfügt über eine eigene TTL. Sie können den Wert eines bestimmten Schlüssels aktualisieren oder einen Schlüssel und dessen Wert entfernen. Sie können einen einzelnen Wert mithilfe des Schlüssels zurückgeben, alle Schlüssel auflisten, alle Werte auflisten oder einen Iterator zurückgeben, um mit dem vollständigen Satz von Schlüsselwertpaaren in der Karte zu arbeiten.

Initialisieren einer benutzerdefinierten Zustandsvariable

Beim Initialisieren des StatefulProcessorObjekts erstellen Sie eine lokale Variable für jedes Zustandsobjekt, mit dem Sie mit Zustandsobjekten in Ihrer benutzerdefinierten Logik interagieren können. Zustandsvariablen werden definiert und initialisiert, indem die integrierte init Methode in der StatefulProcessor Klasse überschrieben wird.

Sie definieren eine beliebige Anzahl von Zustandsobjekten durch die Methoden getValueState, getListState und getMapState während der Initialisierung Ihres StatefulProcessor-Objekts.

Jedes Zustandsobjekt muss folgendes aufweisen:

  • Ein eindeutiger Name
  • Ein angegebenes Schema
    • In Python wird das Schema explizit angegeben.
    • Übergeben Sie in Scala ein Encoder Zustandsschema, um das Statusschema anzugeben.

Sie können auch eine optionale, zu spezifizierende TTL-Dauer (Time-to-Live) in Millisekunden angeben. Wenn Sie einen Kartenzustand implementieren, müssen Sie eine separate Schemadefinition für die Kartenschlüssel und die Werte bereitstellen.

Hinweis

Die Logik für das Abfragen, Aktualisieren und Ausgeben von Zustandsinformationen wird separat behandelt. Weitere Informationen finden Sie unter Verwenden der Statusvariablen.

Beispiel einer zustandsbehafteten Anwendung

Im Folgenden wird die grundlegende Syntax zum Definieren und Verwenden eines benutzerdefinierten Zustandsprozessors mit transformWithState veranschaulicht, einschließlich Beispielzustandsvariablen für jeden unterstützten Typ. Weitere Beispiele finden Sie unter Beispiel für zustandsbehaftete Anwendungen.

Hinweis

Python verwendet Tupel für alle Interaktionen mit Zustandswerten. Dies bedeutet, dass Python-Code Werte mithilfe von Tupeln übergeben sollte, wenn Vorgänge wie put und update verwendet werden und erwartet, dass Tupel bei der Verwendung von get behandelt werden.

Wenn das Schema für den Wertstatus beispielsweise nur eine einzelne ganze Zahl ist, würden Sie Code wie folgt implementieren:

current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0]  # Extracts the first item in the tuple
new_value = current_value + 1           # Calculate a new value
value_state.update((new_value,))        # Pass the new value formatted as a tuple

Dies gilt auch für Elemente in einem ListState oder Werten in einem MapState.

Python

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)

class SimpleCounterProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    value_state_schema = StructType([StructField("count", IntegerType(), True)])
    list_state_schema = StructType([StructField("count", IntegerType(), True)])
    self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
    self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
    # Schema can also be defined using strings and SQL DDL syntax
    self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    count = 0
    for pdf in rows:
      list_state_rows = [(120,), (20,)] # A list of tuples
      self.list_state.put(list_state_rows)
      self.list_state.appendValue((111,))
      self.list_state.appendList(list_state_rows)
      pdf_count = pdf.count()
      count += pdf_count.get("value")
    self.value_state.update((count,)) # Count is passed as a tuple
    iter = self.list_state.get()
    list_state_value = next(iter1)[0]
    value = count
    user_key = ("user_key",)
    if self.map_state.exists():
      if self.map_state.containsKey(user_key):
        value += self.map_state.getValue(user_key)[0]
    self.map_state.updateValue(user_key, (value,)) # Value is a tuple
    yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=SimpleCounterProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
)

Scala

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      Encoders.scalaLong, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      Encoders.scalaInt, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}

val q = spark
        .readStream
        .format("delta")
        .load("$srcDeltaTableDir")
        .as[(String, String)]
        .groupByKey(x => x._1)
        .transformWithState(
            new SimpleCounterProcessor(),
            TimeMode.None(),
            OutputMode.Update(),
        )
        .writeStream...

StatefulProcessorHandle

PySpark enthält die StatefulProcessorHandle Klasse, um Zugriff auf Funktionen bereitzustellen, die steuern, wie Ihr benutzerdefinierter Python-Code mit Zustandsinformationen interagiert. Sie müssen immer die StatefulProcessorHandle an die handle-Variable importieren und übergeben, wenn Sie eine StatefulProcessor-Variable initialisieren.

Die handle Variable verknüpft die lokale Variable in Ihrer Python-Klasse mit der Zustandsvariable.

Hinweis

Scala verwendet die getHandle Methode.

Angeben des Anfangszustands

Optional können Sie einen Anfangszustand für die Verwendung mit dem ersten Mikrobatch bereitstellen. Dies kann hilfreich sein, wenn Sie einen vorhandenen Workflow zu einer neuen benutzerdefinierten Anwendung migrieren, einen zustandsbehafteten Operator aktualisieren, um Ihr Schema oder Ihre Logik zu ändern, oder ein Fehler zu reparieren, der nicht automatisch repariert werden kann und einen manuellen Eingriff erfordert.

Hinweis

Verwenden Sie den Statusspeicherleser, um Statusinformationen von einem vorhandenen Prüfpunkt abzufragen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.

Wenn Sie eine vorhandene Delta-Tabelle in eine zustandsbehaftete Anwendung konvertieren, lesen Sie die Tabelle, und spark.read.table("table_name") übergeben Sie den resultierenden DataFrame. Sie können optional Felder auswählen oder ändern, um ihrer neuen zustandsbehafteten Anwendung zu entsprechen.

Sie stellen einen Anfangszustand mithilfe eines DataFrame mit demselben Gruppierungsschlüsselschema wie die Eingabezeilen bereit.

Hinweis

Python verwendet handleInitialState um den Anfangszustand beim Definieren eines StatefulProcessor anzugeben. Scala verwendet die unterschiedliche Klasse StatefulProcessorWithInitialState.

Verwenden Sie Ihre Statusvariablen

Unterstützte Statusobjekte stellen Methoden zum Abrufen des Zustands, Aktualisieren vorhandener Statusinformationen oder Löschen des aktuellen Zustands bereit. Jeder unterstützte Zustandstyp verfügt über eine eindeutige Implementierung von Methoden, die der implementierten Datenstruktur entsprechen.

Jeder beobachtete Gruppierungsschlüssel weist dedizierte Statusinformationen auf.

  • Datensätze werden basierend auf der Logik ausgegeben, die Sie implementieren, und dem Ausgabeschema, das Sie angeben. Siehe Emit-Datensätze.
  • Sie können mithilfe des statestore Readers auf Werte im Zustandsspeicher zugreifen. Dieser Reader verfügt über Batchfunktionen und ist nicht für Workloads mit geringer Latenz vorgesehen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.
  • Die mit handleInputRows angegebene Logik wird nur aktiviert, wenn Datensätze für den Schlüssel in einem Mikro-Batch vorhanden sind. Siehe Behandeln von Eingabezeilen.
  • Verwenden Sie handleExpiredTimer, um eine zeitbasierte Logik zu implementieren, die nicht darauf angewiesen ist, dass Datensätze beobachtet werden, um ausgelöst zu werden. Siehe Programmzeitgesteuerte Ereignisse.

Hinweis

Statusobjekte werden durch Gruppieren von Schlüsseln mit den folgenden Auswirkungen isoliert:

  • Zustandswerte können nicht von Datensätzen beeinflusst werden, die einem anderen Gruppierungsschlüssel zugeordnet sind.
  • Sie können keine Logik implementieren, die vom Vergleichen von Werten oder dem Aktualisieren des Zustands über Gruppierungsschlüssel abhängt.

Sie können Werte in einem Gruppierungsschlüssel vergleichen. Verwenden Sie MapState um Logik mit einem zweiten Schlüssel zu implementieren, den Ihre benutzerdefinierte Logik verwenden kann. Sie können beispielsweise die Gruppierung nach user_id und die Verwendung der IP-Adresse zur Schlüsselung Ihrer MapState implementieren, um Logik zu realisieren, die gleichzeitige Benutzersitzungen nachverfolgt.

Erweiterte Überlegungen zum Arbeiten mit Zustand

Das Schreiben in eine Zustandsvariable löst einen Schreibvorgang in RocksDB aus. Für eine optimierte Leistung empfiehlt Databricks, alle Werte im Iterator für einen bestimmten Schlüssel zu verarbeiten und Aktualisierungen in einem einzigen Schreibvorgang nach Möglichkeit durchzuführen.

Hinweis

Zustandsupdates sind fehlertolerant. Wenn ein Vorgang abstürzt, bevor eine Mikrobatchverarbeitung abgeschlossen ist, wird der Wert aus dem letzten erfolgreichen Mikrobatch beim Wiederholen verwendet.

Statuswerte weisen keine integrierten Standardwerte auf. Wenn Ihre Logik das Lesen vorhandener Statusinformationen erfordert, verwenden Sie die exists Methode, während Sie Ihre Logik implementieren.

Hinweis

MapState Variablen verfügen über zusätzliche Funktionen, um nach einzelnen Schlüsseln zu suchen oder alle Schlüssel auflisten, um Logik für den NULL-Zustand zu implementieren.

Emittieren von Datensätzen

Benutzerdefinierte Logik steuert, wie transformWithState Datensätze ausgegeben werden. Datensätze werden pro Gruppierungsschlüssel ausgegeben.

Benutzerdefinierte zustandsbehaftete Anwendungen machen keine Annahmen darüber, wie Zustandsinformationen verwendet werden, wenn bestimmt wird, wie Datensätze ausgegeben werden, und die zurückgegebene Anzahl von Datensätzen für eine bestimmte Bedingung kann keine, eine oder viele sein.

Sie implementieren Logik zur Ausgabe von Datensätzen entweder handleInputRows oder handleExpiredTimer. Siehe Behandeln von Eingabezeilen und Zeitgesteuerten Ereignissen des Programms.

Hinweis

Sie können mehrere Zustandswerte implementieren und mehrere Bedingungen für das Ausstellen von Datensätzen definieren, aber alle ausgegebenen Datensätze sollten dasselbe Schema verwenden.

Python

In Python definieren Sie Ihr Ausgabeschema mithilfe des outputStructType Schlüsselworts beim Aufrufen transformWithStateInPandas.

Sie geben Datensätze mit einem pandas DataFrame-Objekt und yield aus.

Optional können yield Sie einen leeren DataFrame verwenden. Bei Kombination mit update dem Ausgabemodus aktualisiert das Ausstrahlen eines leeren DataFrames die Werte für den Gruppierungsschlüssel auf Null.

Scala

In Scala geben Sie Datensätze mithilfe eines Iterator Objekts aus. Das Schema der Ausgabe wird von emittierten Datensätzen abgeleitet.

Optional können Sie ein leeres IteratorEmittieren. Bei Kombination mit update dem Ausgabemodus aktualisiert das Ausgeben eines leeren Iterator Werts die Werte für den Gruppierungsschlüssel auf NULL.

Eingabezeilen bearbeiten

Verwenden Sie die handleInputRows Methode, um die Logik für die Interaktion von Datensätzen in Ihrer Streamingabfrage mit Zustandswerten zu definieren und diese zu aktualisieren. Der Handler, den Sie mit der handleInputRows Methode definieren, wird jedes Mal ausgeführt, wenn datensätze über die Strukturierte Streaming-Abfrage verarbeitet werden.

Für die meisten zustandsbehafteten Anwendungen, die mit transformWithState implementiert werden, wird die Kernlogik mit handleInputRows definiert.

Für jede verarbeitete Aktualisierung eines Mikrobatches sind alle Datensätze des Mikrobatches für einen gegebenen Gruppierungsschlüssel über einen Iterator verfügbar. Benutzerdefinierte Logik kann mit allen Datensätzen aus dem aktuellen Mikrobatch und den Werten im Statestore interagieren.

Zeitgesteuerte Ereignisse des Programms

Sie können Zeitgeber verwenden, um benutzerdefinierte Logik basierend auf verstrichener Zeit aus einer angegebenen Bedingung zu implementieren.

Sie arbeiten mit Zeitgebern, indem Sie eine handleExpiredTimer Methode implementieren.

Innerhalb eines Gruppierungsschlüssels werden Zeitgeber durch ihren Zeitstempel eindeutig identifiziert.

Wenn ein Timer abläuft, wird das Ergebnis durch die in Ihrer Anwendung implementierte Logik bestimmt. Zu den gängigen Mustern gehören:

  • Das Ausgeben von in einer Zustandsvariable gespeicherten Informationen.
  • Entfernen gespeicherter Zustandsinformationen.
  • Erstellen eines neuen Zeitgebers.

Abgelaufene Timer werden ausgelöst, auch wenn keine Datensätze für ihren zugeordneten Schlüssel in einem Mikrobatch verarbeitet werden.

Angeben des Zeitmodells

Beim Weitergeben Ihrer StatefulProcessor an transformWithState müssen Sie das Zeitmodell angeben. Die folgenden Optionen werden unterstützt:

  • ProcessingTime
  • EventTime
  • NoTime oder TimeMode.None()

Die Angabe NoTime bedeutet, dass Timer für den Prozessor nicht unterstützt werden.

Integrierte Zeitgeberwerte

Databricks empfiehlt, die Systemuhr in Ihrer benutzerdefinierten zustandsbehafteten Anwendung nicht zu verwenden, da dies zu unzuverlässigen Wiederholungen bei Vorgangsfehlern führen kann. Verwenden Sie die Methoden in der TimerValues Klasse, wenn Sie auf die Verarbeitungszeit oder das Wasserzeichen zugreifen müssen:

TimerValues BESCHREIBUNG
getCurrentProcessingTimeInMs Gibt den Zeitstempel der Verarbeitungszeit für den aktuellen Batch in Millisekunden seit der Epoche zurück.
getCurrentWatermarkInMs Gibt den Zeitstempel des Wasserzeichens für die aktuelle Charge in Millisekunden seit der Epoche zurück.

Hinweis

Die Verarbeitungszeit beschreibt die Zeit, zu der der Mikrobatch von Apache Spark verarbeitet wird. Viele Streamingquellen, wie z. B. Kafka, umfassen auch die Systemverarbeitungszeit.

Wasserzeichen für Streamingabfragen werden häufig anhand der Ereigniszeit oder der Verarbeitungszeit der Streamingquelle definiert. Siehe Anwenden von Wasserzeichen zum Steuern von Schwellenwerten für die Datenverarbeitung.

Sowohl Wasserzeichen als auch Fenster können in Kombination mit transformWithState verwendet werden. Sie können ähnliche Funktionsweise in Ihrer benutzerdefinierten zustandsbehafteten Anwendung implementieren, indem Sie TTL, Timer und MapState oder ListState Funktionalität verwenden.

Was ist die Lebensdauer eines Status (TTL)?

Die von den einzelnen Systemen verwendeten transformWithState Zustandswerte unterstützen eine optionale Lebensdauer-Spezifikation (TTL). Wenn TTL abläuft, wird der Wert aus dem Statusspeicher entfernt. TTL interagiert nur mit Werten im Zustandsspeicher, d. h. Sie können Logik zum Entziehen von Zustandsinformationen implementieren, aber Sie können Logik nicht direkt auslösen, wenn TTL Zustandswerte ausscheidet.

Von Bedeutung

Wenn Sie TTL nicht implementieren, müssen Sie die Zustandsräumung mit einer anderen Logik behandeln, um endloses Zustandswachstum zu verhindern.

TTL wird für jeden Zustandswert mit unterschiedlichen Regeln für jeden Zustandstyp erzwungen.

  • Zustandsvariablen sind an bestimmte Gruppenschlüssel gebunden.
  • Bei ValueState Objekten wird pro Gruppierungsschlüssel nur ein einzelner Wert gespeichert. TTL gilt für diesen Wert.
  • Bei ListState Objekten kann die Liste viele Werte enthalten. TTL gilt für jeden Wert in einer Liste unabhängig.
  • Für MapState Objekte weist jeder Kartenschlüssel einen zugeordneten Statuswert auf. TTL gilt unabhängig für jedes Schlüsselwertpaar in einer Karte.

Für alle Statustypen setzt TTL zurück, wenn die Statusinformationen aktualisiert werden.

Hinweis

Während TTL auf einzelne Werte in einem ListStateBereich festgelegt ist, besteht die einzige Möglichkeit zum Aktualisieren eines Werts in einer Liste darin, die put Methode zum Überschreiben des gesamten Inhalts der ListState Variablen zu verwenden.

Was ist der Unterschied zwischen Timern und TTL?

Es gibt einige Überlappungen zwischen Timern und Zeit zum Leben (TTL) für Zustandsvariablen, aber Zeitgeber bieten einen breiteren Satz von Features als TTL.

TTL entfernt Statusinformationen, die für den vom Benutzer angegebenen Zeitraum nicht aktualisiert wurden. Auf diese Weise können Benutzer das Wachstum des ungeprüften Zustands verhindern und veraltete Zustandseinträge entfernen. Da Karten und Listen TTL für jeden Wert implementieren, können Sie Funktionen implementieren, die nur Zustandswerte berücksichtigen, die kürzlich aktualisiert wurden, indem Sie TTL festlegen.

Timer ermöglichen es Ihnen, benutzerdefinierte Logik über die Zustandsräumung hinaus zu definieren, einschließlich des Ausgebens von Datensätzen. Sie können optional Timer verwenden, um Zustandsinformationen für einen bestimmten Zustandswert zu löschen, mit der zusätzlichen Flexibilität, Werte auszugeben oder andere bedingte Logik anhand des Timers auszulösen.