Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Von Bedeutung
Dieses Feature befindet sich in der öffentlichen Vorschau in Databricks Runtime 16.2 und höher.
Sie können Streaminganwendungen mit benutzerdefinierten Zustandsoperatoren erstellen, um Lösungen mit geringer Latenz und nahezu Echtzeitlösungen zu implementieren, die beliebige Zustandslogik verwenden. Benutzerdefinierte zustandsbehaftete Operatoren ermöglichen neue operationelle Anwendungsfälle und Muster, die mit herkömmlicher strukturierter Streaming-Verarbeitung nicht realisierbar sind.
Hinweis
Databricks empfiehlt die Verwendung integrierter Structured Streaming-Funktionen für unterstützte zustandsbehaftete Vorgänge wie Aggregationen, Deduplizierung und Streaming-Verknüpfungen. Siehe Was ist zustandsbehaftetes Streaming?.
Databricks empfiehlt die Verwendung von transformWithState
anstelle von Legacyoperatoren für beliebige Zustandstransformationen. Dokumentation zu Legacy- flatMapGroupsWithState
und mapGroupsWithState
Operatoren finden Sie unter Legacy-Operatoren mit willkürlichen Zustandsoperatoren.
Anforderungen
Der transformWithState
-Operator und die zugehörigen APIs und Klassen haben die folgenden Anforderungen:
- Verfügbar in Databricks Runtime 16.2 und höher.
- Compute muss den dedizierten oder nicht isolierten Zugriffsmodus verwenden.
- Sie müssen den RocksDB-Statusspeicheranbieter verwenden. Databricks empfiehlt die Aktivierung von RocksDB als Teil der Computekonfiguration.
-
transformWithStateInPandas
unterstützt den Standardzugriffsmodus in Databricks Runtime 16.3 und höher.
Hinweis
Führen Sie die folgenden Schritte aus, um den RocksDB-Statusspeicheranbieter für die aktuelle Sitzung zu aktivieren:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Was ist transformWithState?
Der transformWithState
Operator wendet einen benutzerdefinierten zustandsbehafteten Prozessor auf eine Strukturierte Streaming-Abfrage an. Sie müssen einen benutzerdefinierten zustandsbehafteten Prozessor implementieren, um transformWithState
verwenden zu können. Strukturiertes Streaming umfasst APIs zum Erstellen Ihres zustandsbehafteten Prozessors mit Python, Scala oder Java.
Sie verwenden transformWithState
, um benutzerdefinierte Logik auf einen Gruppierungsschlüssel für Datensätze anzuwenden, die mit strukturiertem Streaming inkrementell verarbeitet werden. Im Folgenden wird das allgemeine Design beschrieben:
- Definieren Sie eine oder mehrere Statusvariablen.
- Statusinformationen werden für jeden Gruppierungsschlüssel beibehalten und können für jede Zustandsvariable gemäß benutzerdefinierter Logik aufgerufen werden.
- Für jeden verarbeiteten Mikrobatch sind alle Datensätze für den Schlüssel als Iterator verfügbar.
- Verwenden Sie integrierte Handles, um zu steuern, wann und wie Datensätze basierend auf Zeitgebern und benutzerdefinierten Bedingungen ausgegeben werden.
- Zustandswerte unterstützen individuelle Time-to-Live-(TTL-)Definitionen und ermöglichen Flexibilität bei der Verwaltung von Zustandsablauf und -größe.
transformWithState
unterstützt die Schemaentwicklung im Zustandsspeicher, sodass Sie Ihre Produktionsanwendungen durchlaufen und aktualisieren können, ohne dass historische Zustandsinformationen verloren gehen oder Datensätze neu verarbeitet werden müssen, was Flexibilität für die Entwicklung und einfache Wartung bietet. Siehe Schemaentwicklung im Zustandsspeicher.
Von Bedeutung
PySpark verwendet den Operator transformWithStateInPandas
anstelle von transformWithState
. In der Dokumentation zu Azure Databricks wird transformWithState
verwendet, um die Funktionalität für Python- und Scala-Implementierungen zu beschreiben.
Die Implementierungen von transformWithState
in Scala und Python sowie die zugehörigen APIs unterscheiden sich aufgrund von sprachspezifischen Unterschieden, bieten jedoch dieselbe Funktionalität. Weitere Informationen finden Sie in sprachspezifischen Beispielen und API-Dokumentationen für Ihre bevorzugte Programmiersprache.
Integrierte Verarbeitungshandles
Sie implementieren die Kernlogik für Ihre benutzerdefinierte Zustandsanwendung, indem Sie Handler mithilfe integrierter Handles implementieren.
- Handles stellen die Methoden für die Interaktion mit Zustandswerten und Zeitgebern bereit, verarbeiten eingehende Datensätze und emittieren Datensätze.
- Die Handler definieren Ihre benutzerdefinierte ereignisgesteuerte Logik.
Handler für jeden Statustyp werden basierend auf der zugrunde liegenden Datenstruktur implementiert, enthalten jedoch Funktionen zum Abrufen, Einfügen, Aktualisieren und Löschen von Datensätzen.
Handler werden basierend auf ereignissen implementiert, die in Eingabedatensätzen oder Timern beobachtet werden, wobei die folgende Semantik verwendet wird:
- Definieren Sie einen Handler mithilfe der
handleInputRows
Methode, um zu steuern, wie Daten verarbeitet werden, der Zustand aktualisiert wird, und Datensätze werden für jeden Mikrobatch von Datensätzen ausgegeben, die für den Gruppierungsschlüssel verarbeitet werden. Siehe Behandeln von Eingabezeilen. - Definieren Sie einen Handler mithilfe der
handleExpiredTimer
Methode, um zeitbasierte Schwellenwerte zum Ausführen von Logik zu verwenden, unabhängig davon, ob zusätzliche Datensätze für den Gruppierungsschlüssel verarbeitet werden. Siehe Programmzeitgesteuerte Ereignisse.
Die folgende Tabelle enthält einen Vergleich der funktionalen Verhaltensweisen, die von diesen Handlern unterstützt werden:
Verhalten | handleInputRows |
handleExpiredTimer |
---|---|---|
Abrufen, Platzieren, Aktualisieren oder Löschen von Zustandswerten | Ja | Ja |
Erstellen oder Löschen eines Timers | Ja | Ja |
Emittieren von Datensätzen | Ja | Ja |
Durchlaufen von Datensätzen im aktuellen Mikrobatch | Ja | Nein |
Triggerlogik basierend auf verstrichener Zeit | Nein | Ja |
Sie können handleInputRows
und handleExpiredTimer
kombinieren, um bei Bedarf komplexe Logik zu implementieren.
Sie können beispielsweise eine Anwendung implementieren, die handleInputRows
verwendet, um die Zustandswerte für jeden Mikro-Batch zu aktualisieren und einen Timer für die nächsten 10 Sekunden zu setzen. Wenn keine weiteren Datensätze verarbeitet werden, können Sie mit handleExpiredTimer
die aktuellen Werte im Statusspeicher ausgeben. Wenn neue Datensätze für den Gruppierungsschlüssel verarbeitet werden, können Sie den vorhandenen Timer löschen und einen neuen Timer festlegen.
Benutzerdefinierte Zustandstypen
Sie können mehrere Zustandsobjekte in einem einzelnen Zustandsoperator implementieren. Die Namen, die Sie jedem Zustandsobjekt zuordnen, bleiben im Zustandsspeicher erhalten, auf den Sie mit dem Statusspeicherleser zugreifen können. Wenn Ihr Zustandsobjekt ein StructType
verwendet, geben Sie Namen für jedes Feld in der Struktur an, während Sie das Schema übergeben. Diese Namen sind auch beim Lesen des Zustandsspeichers sichtbar. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.
Die von integrierten Klassen und Operatoren bereitgestellte Funktionalität soll Flexibilität und Erweiterbarkeit bieten, und Die Implementierungsoptionen sollten durch die vollständige Logik informiert werden, die Ihre Anwendung ausführen muss. Sie können z. B. nahezu identische Logik implementieren, indem Sie eine ValueState
anwenden, die nach den Feldern user_id
und session_id
oder nach MapState
gruppiert ist, wobei user_id
der Schlüssel für session_id
ist. In diesem Fall ist eine MapState
möglicherweise die bevorzugte Implementierung, wenn die Logik Bedingungen für mehrere session_id
Werte auswerten muss.
In den folgenden Abschnitten werden die Zustandstypen beschrieben, die von transformWithState
unterstützt werden.
ValueState
Für jeden Gruppierungsschlüssel gibt es einen zugeordneten Wert.
Ein Wertstatus kann komplexe Typen enthalten, z. B. eine Struktur oder ein Tupel. Wenn Sie eine ValueState
Datei aktualisieren, implementieren Sie logik, um den gesamten Wert zu ersetzen. Die TTL für einen Wertstatus wird zurückgesetzt, wenn der Wert aktualisiert wird, aber sie wird nicht zurückgesetzt, wenn ein Quellschlüssel, der einem ValueState
entspricht, verarbeitet wird, ohne die gespeicherten ValueState
zu aktualisieren.
ListState
Für jeden Gruppierungsschlüssel gibt es eine zugeordnete Liste.
Ein Listenstatus ist eine Sammlung von Werten, von denen jeder komplexe Typen enthalten kann. Jeder Wert in einer Liste verfügt über eine eigene TTL. Sie können einer Liste Elemente hinzufügen, indem Sie einzelne Elemente anfügen, eine Liste mit Elementen anfügen oder die gesamte Liste mit einer put
überschreiben. Nur der Put-Vorgang wird als Aktualisierung betrachtet, um die TTL zurückzusetzen.
MapState
Für jeden Gruppierungsschlüssel gibt es eine verknüpfte Abbildung. Karten sind das funktionale Apache Spark-Äquivalent zu einem Python-Diktat.
Von Bedeutung
Gruppierungsschlüssel beschreiben die in der GROUP BY
Klausel der Strukturierten Streaming-Abfrage angegebenen Felder. Map-Zustände enthalten eine beliebige Anzahl von Schlüssel-Wert-Paaren für einen Gruppierungsschlüssel.
Wenn Sie z. B. nach user_id
gruppieren und eine Karte für session_id
definieren möchten, ist user_id
Ihr Gruppierungsschlüssel und der Schlüssel in Ihrer Zuordnung session_id
.
Ein Kartenstatus ist eine Sammlung unterschiedlicher Schlüssel, die jeweils einem Wert zugeordnet sind, der komplexe Typen enthalten kann. Jedes Schlüssel-Wert-Paar in einer Karte verfügt über eine eigene TTL. Sie können den Wert eines bestimmten Schlüssels aktualisieren oder einen Schlüssel und dessen Wert entfernen. Sie können einen einzelnen Wert mithilfe des Schlüssels zurückgeben, alle Schlüssel auflisten, alle Werte auflisten oder einen Iterator zurückgeben, um mit dem vollständigen Satz von Schlüsselwertpaaren in der Karte zu arbeiten.
Initialisieren einer benutzerdefinierten Zustandsvariable
Beim Initialisieren des StatefulProcessor
Objekts erstellen Sie eine lokale Variable für jedes Zustandsobjekt, mit dem Sie mit Zustandsobjekten in Ihrer benutzerdefinierten Logik interagieren können. Zustandsvariablen werden definiert und initialisiert, indem die integrierte init
Methode in der StatefulProcessor
Klasse überschrieben wird.
Sie definieren eine beliebige Anzahl von Zustandsobjekten durch die Methoden getValueState
, getListState
und getMapState
während der Initialisierung Ihres StatefulProcessor
-Objekts.
Jedes Zustandsobjekt muss folgendes aufweisen:
- Ein eindeutiger Name
- Ein angegebenes Schema
- In Python wird das Schema explizit angegeben.
- Übergeben Sie in Scala ein
Encoder
Zustandsschema, um das Statusschema anzugeben.
Sie können auch eine optionale, zu spezifizierende TTL-Dauer (Time-to-Live) in Millisekunden angeben. Wenn Sie einen Kartenzustand implementieren, müssen Sie eine separate Schemadefinition für die Kartenschlüssel und die Werte bereitstellen.
Hinweis
Die Logik für das Abfragen, Aktualisieren und Ausgeben von Zustandsinformationen wird separat behandelt. Weitere Informationen finden Sie unter Verwenden der Statusvariablen.
Beispiel einer zustandsbehafteten Anwendung
Im Folgenden wird die grundlegende Syntax zum Definieren und Verwenden eines benutzerdefinierten Zustandsprozessors mit transformWithState
veranschaulicht, einschließlich Beispielzustandsvariablen für jeden unterstützten Typ. Weitere Beispiele finden Sie unter Beispiel für zustandsbehaftete Anwendungen.
Hinweis
Python verwendet Tupel für alle Interaktionen mit Zustandswerten. Dies bedeutet, dass Python-Code Werte mithilfe von Tupeln übergeben sollte, wenn Vorgänge wie put
und update
verwendet werden und erwartet, dass Tupel bei der Verwendung von get
behandelt werden.
Wenn das Schema für den Wertstatus beispielsweise nur eine einzelne ganze Zahl ist, würden Sie Code wie folgt implementieren:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Dies gilt auch für Elemente in einem ListState
oder Werten in einem MapState
.
Python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
PySpark enthält die StatefulProcessorHandle
Klasse, um Zugriff auf Funktionen bereitzustellen, die steuern, wie Ihr benutzerdefinierter Python-Code mit Zustandsinformationen interagiert. Sie müssen immer die StatefulProcessorHandle
an die handle
-Variable importieren und übergeben, wenn Sie eine StatefulProcessor
-Variable initialisieren.
Die handle
Variable verknüpft die lokale Variable in Ihrer Python-Klasse mit der Zustandsvariable.
Hinweis
Scala verwendet die getHandle
Methode.
Angeben des Anfangszustands
Optional können Sie einen Anfangszustand für die Verwendung mit dem ersten Mikrobatch bereitstellen. Dies kann hilfreich sein, wenn Sie einen vorhandenen Workflow zu einer neuen benutzerdefinierten Anwendung migrieren, einen zustandsbehafteten Operator aktualisieren, um Ihr Schema oder Ihre Logik zu ändern, oder ein Fehler zu reparieren, der nicht automatisch repariert werden kann und einen manuellen Eingriff erfordert.
Hinweis
Verwenden Sie den Statusspeicherleser, um Statusinformationen von einem vorhandenen Prüfpunkt abzufragen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.
Wenn Sie eine vorhandene Delta-Tabelle in eine zustandsbehaftete Anwendung konvertieren, lesen Sie die Tabelle, und spark.read.table("table_name")
übergeben Sie den resultierenden DataFrame. Sie können optional Felder auswählen oder ändern, um ihrer neuen zustandsbehafteten Anwendung zu entsprechen.
Sie stellen einen Anfangszustand mithilfe eines DataFrame mit demselben Gruppierungsschlüsselschema wie die Eingabezeilen bereit.
Hinweis
Python verwendet handleInitialState
um den Anfangszustand beim Definieren eines StatefulProcessor
anzugeben. Scala verwendet die unterschiedliche Klasse StatefulProcessorWithInitialState
.
Verwenden Sie Ihre Statusvariablen
Unterstützte Statusobjekte stellen Methoden zum Abrufen des Zustands, Aktualisieren vorhandener Statusinformationen oder Löschen des aktuellen Zustands bereit. Jeder unterstützte Zustandstyp verfügt über eine eindeutige Implementierung von Methoden, die der implementierten Datenstruktur entsprechen.
Jeder beobachtete Gruppierungsschlüssel weist dedizierte Statusinformationen auf.
- Datensätze werden basierend auf der Logik ausgegeben, die Sie implementieren, und dem Ausgabeschema, das Sie angeben. Siehe Emit-Datensätze.
- Sie können mithilfe des
statestore
Readers auf Werte im Zustandsspeicher zugreifen. Dieser Reader verfügt über Batchfunktionen und ist nicht für Workloads mit geringer Latenz vorgesehen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen. - Die mit
handleInputRows
angegebene Logik wird nur aktiviert, wenn Datensätze für den Schlüssel in einem Mikro-Batch vorhanden sind. Siehe Behandeln von Eingabezeilen. - Verwenden Sie
handleExpiredTimer
, um eine zeitbasierte Logik zu implementieren, die nicht darauf angewiesen ist, dass Datensätze beobachtet werden, um ausgelöst zu werden. Siehe Programmzeitgesteuerte Ereignisse.
Hinweis
Statusobjekte werden durch Gruppieren von Schlüsseln mit den folgenden Auswirkungen isoliert:
- Zustandswerte können nicht von Datensätzen beeinflusst werden, die einem anderen Gruppierungsschlüssel zugeordnet sind.
- Sie können keine Logik implementieren, die vom Vergleichen von Werten oder dem Aktualisieren des Zustands über Gruppierungsschlüssel abhängt.
Sie können Werte in einem Gruppierungsschlüssel vergleichen. Verwenden Sie MapState
um Logik mit einem zweiten Schlüssel zu implementieren, den Ihre benutzerdefinierte Logik verwenden kann. Sie können beispielsweise die Gruppierung nach user_id
und die Verwendung der IP-Adresse zur Schlüsselung Ihrer MapState
implementieren, um Logik zu realisieren, die gleichzeitige Benutzersitzungen nachverfolgt.
Erweiterte Überlegungen zum Arbeiten mit Zustand
Das Schreiben in eine Zustandsvariable löst einen Schreibvorgang in RocksDB aus. Für eine optimierte Leistung empfiehlt Databricks, alle Werte im Iterator für einen bestimmten Schlüssel zu verarbeiten und Aktualisierungen in einem einzigen Schreibvorgang nach Möglichkeit durchzuführen.
Hinweis
Zustandsupdates sind fehlertolerant. Wenn ein Vorgang abstürzt, bevor eine Mikrobatchverarbeitung abgeschlossen ist, wird der Wert aus dem letzten erfolgreichen Mikrobatch beim Wiederholen verwendet.
Statuswerte weisen keine integrierten Standardwerte auf. Wenn Ihre Logik das Lesen vorhandener Statusinformationen erfordert, verwenden Sie die exists
Methode, während Sie Ihre Logik implementieren.
Hinweis
MapState
Variablen verfügen über zusätzliche Funktionen, um nach einzelnen Schlüsseln zu suchen oder alle Schlüssel auflisten, um Logik für den NULL-Zustand zu implementieren.
Emittieren von Datensätzen
Benutzerdefinierte Logik steuert, wie transformWithState
Datensätze ausgegeben werden. Datensätze werden pro Gruppierungsschlüssel ausgegeben.
Benutzerdefinierte zustandsbehaftete Anwendungen machen keine Annahmen darüber, wie Zustandsinformationen verwendet werden, wenn bestimmt wird, wie Datensätze ausgegeben werden, und die zurückgegebene Anzahl von Datensätzen für eine bestimmte Bedingung kann keine, eine oder viele sein.
Sie implementieren Logik zur Ausgabe von Datensätzen entweder handleInputRows
oder handleExpiredTimer
. Siehe Behandeln von Eingabezeilen und Zeitgesteuerten Ereignissen des Programms.
Hinweis
Sie können mehrere Zustandswerte implementieren und mehrere Bedingungen für das Ausstellen von Datensätzen definieren, aber alle ausgegebenen Datensätze sollten dasselbe Schema verwenden.
Python
In Python definieren Sie Ihr Ausgabeschema mithilfe des outputStructType
Schlüsselworts beim Aufrufen transformWithStateInPandas
.
Sie geben Datensätze mit einem pandas DataFrame-Objekt und yield
aus.
Optional können yield
Sie einen leeren DataFrame verwenden. Bei Kombination mit update
dem Ausgabemodus aktualisiert das Ausstrahlen eines leeren DataFrames die Werte für den Gruppierungsschlüssel auf Null.
Scala
In Scala geben Sie Datensätze mithilfe eines Iterator
Objekts aus. Das Schema der Ausgabe wird von emittierten Datensätzen abgeleitet.
Optional können Sie ein leeres Iterator
Emittieren. Bei Kombination mit update
dem Ausgabemodus aktualisiert das Ausgeben eines leeren Iterator
Werts die Werte für den Gruppierungsschlüssel auf NULL.
Eingabezeilen bearbeiten
Verwenden Sie die handleInputRows
Methode, um die Logik für die Interaktion von Datensätzen in Ihrer Streamingabfrage mit Zustandswerten zu definieren und diese zu aktualisieren. Der Handler, den Sie mit der handleInputRows
Methode definieren, wird jedes Mal ausgeführt, wenn datensätze über die Strukturierte Streaming-Abfrage verarbeitet werden.
Für die meisten zustandsbehafteten Anwendungen, die mit transformWithState
implementiert werden, wird die Kernlogik mit handleInputRows
definiert.
Für jede verarbeitete Aktualisierung eines Mikrobatches sind alle Datensätze des Mikrobatches für einen gegebenen Gruppierungsschlüssel über einen Iterator verfügbar. Benutzerdefinierte Logik kann mit allen Datensätzen aus dem aktuellen Mikrobatch und den Werten im Statestore interagieren.
Zeitgesteuerte Ereignisse des Programms
Sie können Zeitgeber verwenden, um benutzerdefinierte Logik basierend auf verstrichener Zeit aus einer angegebenen Bedingung zu implementieren.
Sie arbeiten mit Zeitgebern, indem Sie eine handleExpiredTimer
Methode implementieren.
Innerhalb eines Gruppierungsschlüssels werden Zeitgeber durch ihren Zeitstempel eindeutig identifiziert.
Wenn ein Timer abläuft, wird das Ergebnis durch die in Ihrer Anwendung implementierte Logik bestimmt. Zu den gängigen Mustern gehören:
- Das Ausgeben von in einer Zustandsvariable gespeicherten Informationen.
- Entfernen gespeicherter Zustandsinformationen.
- Erstellen eines neuen Zeitgebers.
Abgelaufene Timer werden ausgelöst, auch wenn keine Datensätze für ihren zugeordneten Schlüssel in einem Mikrobatch verarbeitet werden.
Angeben des Zeitmodells
Beim Weitergeben Ihrer StatefulProcessor
an transformWithState
müssen Sie das Zeitmodell angeben. Die folgenden Optionen werden unterstützt:
ProcessingTime
EventTime
-
NoTime
oderTimeMode.None()
Die Angabe NoTime
bedeutet, dass Timer für den Prozessor nicht unterstützt werden.
Integrierte Zeitgeberwerte
Databricks empfiehlt, die Systemuhr in Ihrer benutzerdefinierten zustandsbehafteten Anwendung nicht zu verwenden, da dies zu unzuverlässigen Wiederholungen bei Vorgangsfehlern führen kann. Verwenden Sie die Methoden in der TimerValues
Klasse, wenn Sie auf die Verarbeitungszeit oder das Wasserzeichen zugreifen müssen:
TimerValues |
BESCHREIBUNG |
---|---|
getCurrentProcessingTimeInMs |
Gibt den Zeitstempel der Verarbeitungszeit für den aktuellen Batch in Millisekunden seit der Epoche zurück. |
getCurrentWatermarkInMs |
Gibt den Zeitstempel des Wasserzeichens für die aktuelle Charge in Millisekunden seit der Epoche zurück. |
Hinweis
Die Verarbeitungszeit beschreibt die Zeit, zu der der Mikrobatch von Apache Spark verarbeitet wird. Viele Streamingquellen, wie z. B. Kafka, umfassen auch die Systemverarbeitungszeit.
Wasserzeichen für Streamingabfragen werden häufig anhand der Ereigniszeit oder der Verarbeitungszeit der Streamingquelle definiert. Siehe Anwenden von Wasserzeichen zum Steuern von Schwellenwerten für die Datenverarbeitung.
Sowohl Wasserzeichen als auch Fenster können in Kombination mit transformWithState
verwendet werden. Sie können ähnliche Funktionsweise in Ihrer benutzerdefinierten zustandsbehafteten Anwendung implementieren, indem Sie TTL, Timer und MapState
oder ListState
Funktionalität verwenden.
Was ist die Lebensdauer eines Status (TTL)?
Die von den einzelnen Systemen verwendeten transformWithState
Zustandswerte unterstützen eine optionale Lebensdauer-Spezifikation (TTL). Wenn TTL abläuft, wird der Wert aus dem Statusspeicher entfernt. TTL interagiert nur mit Werten im Zustandsspeicher, d. h. Sie können Logik zum Entziehen von Zustandsinformationen implementieren, aber Sie können Logik nicht direkt auslösen, wenn TTL Zustandswerte ausscheidet.
Von Bedeutung
Wenn Sie TTL nicht implementieren, müssen Sie die Zustandsräumung mit einer anderen Logik behandeln, um endloses Zustandswachstum zu verhindern.
TTL wird für jeden Zustandswert mit unterschiedlichen Regeln für jeden Zustandstyp erzwungen.
- Zustandsvariablen sind an bestimmte Gruppenschlüssel gebunden.
- Bei
ValueState
Objekten wird pro Gruppierungsschlüssel nur ein einzelner Wert gespeichert. TTL gilt für diesen Wert. - Bei
ListState
Objekten kann die Liste viele Werte enthalten. TTL gilt für jeden Wert in einer Liste unabhängig. - Für
MapState
Objekte weist jeder Kartenschlüssel einen zugeordneten Statuswert auf. TTL gilt unabhängig für jedes Schlüsselwertpaar in einer Karte.
Für alle Statustypen setzt TTL zurück, wenn die Statusinformationen aktualisiert werden.
Hinweis
Während TTL auf einzelne Werte in einem ListState
Bereich festgelegt ist, besteht die einzige Möglichkeit zum Aktualisieren eines Werts in einer Liste darin, die put
Methode zum Überschreiben des gesamten Inhalts der ListState
Variablen zu verwenden.
Was ist der Unterschied zwischen Timern und TTL?
Es gibt einige Überlappungen zwischen Timern und Zeit zum Leben (TTL) für Zustandsvariablen, aber Zeitgeber bieten einen breiteren Satz von Features als TTL.
TTL entfernt Statusinformationen, die für den vom Benutzer angegebenen Zeitraum nicht aktualisiert wurden. Auf diese Weise können Benutzer das Wachstum des ungeprüften Zustands verhindern und veraltete Zustandseinträge entfernen. Da Karten und Listen TTL für jeden Wert implementieren, können Sie Funktionen implementieren, die nur Zustandswerte berücksichtigen, die kürzlich aktualisiert wurden, indem Sie TTL festlegen.
Timer ermöglichen es Ihnen, benutzerdefinierte Logik über die Zustandsräumung hinaus zu definieren, einschließlich des Ausgebens von Datensätzen. Sie können optional Timer verwenden, um Zustandsinformationen für einen bestimmten Zustandswert zu löschen, mit der zusätzlichen Flexibilität, Werte auszugeben oder andere bedingte Logik anhand des Timers auszulösen.