Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Den här artikeln innehåller en översikt över schemautvecklingen i tillståndslagret och exempel på typer av schemaändringar som stöds.
Vad är schemaversionering i tillståndslagringen?
Schemautveckling avser möjligheten för ett program att hantera ändringar i dataschemat.
Azure Databricks stöder schemautveckling i RocksDB-tillståndsarkivet för strukturerade direktuppspelningsprogram som använder transformWithState.
Schemautveckling ger flexibilitet för utveckling och enkelt underhåll. Använd schemautveckling för att anpassa datamodellen eller datatyperna i ditt tillståndslager utan att förlora tillståndsinformation eller kräva fullständig ombearbetning av historiska data.
Krav
Du måste ange kodningsformatet för tillståndsarkivet till Avro för att kunna använda schemautveckling. Om du vill ange detta för den aktuella sessionen kör du följande:
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
Schemautveckling stöds endast för tillståndsberoende operationer som använder transformWithState eller transformWithStateInPandas. Dessa operatorer och relaterade API:er och klasser har följande krav:
- Tillgänglig i Databricks Runtime 16.2 och senare.
- Beräkning måste använda dedikerat eller icke-isoleringsåtkomstläge.
- Du måste använda RocksDB-lagringsleverantören. Databricks rekommenderar att du aktiverar RocksDB som en del av beräkningskonfigurationen.
-
transformWithStateInPandasstöder standardåtkomstläge i Databricks Runtime 16.3 och senare.
Om du vill aktivera RocksDB-tillståndslagerprovidern för den aktuella sessionen kör du följande:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Schemautvecklingsmönster som stöds i tillståndslagret
Databricks stöder följande schemautvecklingsmönster för tillståndskänsliga åtgärder för strukturerad direktuppspelning.
| Mönster | Beskrivning |
|---|---|
| Typbreddning | Ändra datatyper från mer restriktiva till mindre restriktiva typer. |
| Lägga till fält | Lägg till nya fält i schemat för befintliga tillståndslagervariabler. |
| Ta bort fält | Ta bort befintliga fält från schemat eller en tillståndslagervariabel. |
| Ordna om fält | Ändra ordning på fält i en variabel. |
| Lägga till tillståndsvariabler | Lägg till en ny tillståndsvariabel i ett program. |
| Ta bort tillståndsvariabler | Ta bort en befintlig tillståndsvariabel från ett program. |
När sker schemautvecklingen?
Schemautvecklingen i tillståndsarkivet beror på att koden som definierar ditt tillståndskänsliga program uppdateras. På grund av detta gäller följande instruktioner:
- Schemautvecklingen sker inte automatiskt till följd av schemaändringar i källdata för frågan.
- Schemautvecklingen sker endast när en ny version av programmet distribueras. Eftersom endast en version av en direktuppspelningsfråga kan köras samtidigt måste du starta om strömningsjobbet för att utveckla schemat för tillståndsvariabler.
- Koden definierar uttryckligen alla tillståndsvariabler och anger schemat för alla tillståndsvariabler.
- I Scala använder du en
Encoderför att ange schemat för varje variabel. - I Python skapar du uttryckligen ett schema som en
StructType.
- I Scala använder du en
Schemautvecklingsmönster som inte stöds
Följande schemautvecklingsmönster stöds inte:
Omnamngivning av fält: Omnamngivning av fält stöds inte eftersom fält matchas efter namn. Försök att byta namn på ett fält hanteras genom att fältet tas bort och ett nytt fält läggs till. Den här åtgärden resulterar inte i ett fel eftersom det är tillåtet att ta bort och lägga till fält, men värdena från det ursprungliga fältet överförs inte till det nya fältet.
Kan ändra namn på nycklar eller ändra typ: Du kan inte ändra namn eller typ av nycklar i karttillståndsvariabler.
Typbegränsning typbegränsningsoperationer, även kallat nedåtgående typkonvertering, stöds inte. Dessa åtgärder kan leda till dataförlust. Följande är exempel på typsmalningsåtgärder som inte stöds:
-
doublekan inte begränsas tillfloat,longellerint -
floatkan inte begränsas tilllongellerint -
longkan inte begränsas tillint
-
Typbreddning i statuslagring
Du kan bredda primitiva datatyper till mer tillmötesgående typer. Följande typbreddningsändringar stöds:
-
intkan höjas tilllong,floatellerdouble -
longkan höjas upp tillfloatellerdouble -
floatkan höjas upp tilldouble -
stringkan höjas upp tillbytes -
byteskan höjas upp tillstring
Befintliga värden är upphöjda som en ny typ. Till exempel blir 1212.00.
Exempel på typbreddning med transformWithState
Scala
// Initial run with Integer field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with Long field (type widening)
case class StateV2(value1: Long)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV2(value.toLong))
value
}
}
}
Python
class IntStateProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with Integer field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to integer and update state
value = pdf["value"].iloc[0]
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class LongStateProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with Long field (type widening)
state_schema = StructType([
StructField("value1", LongType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to long and update state
value = pdf["value"].iloc[0]
# When reading state written with IntStateProcessor,
# it will be automatically converted to Long
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
Lägg till fält i statuslagervärden
Du kan lägga till nya fält i schemat för befintliga tillståndslagervärden.
När du läser data som skrivits med det gamla schemat returnerar Avro-kodaren data för tillagda fält som är internt kodade som null.
Python tolkar alltid dessa värden som None. Scala har olika standardbeteende beroende på typen för fältet. Databricks rekommenderar att du implementerar logik för att säkerställa att Scala inte imputerar värden för saknade data. Se Standardvärden för fält som har lagts till i tillståndsvariabeln.
Exempel på att lägga till nya fält med transformWithState
Scala
// Initial run with single field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with additional field
case class StateV2(value1: Integer, value2: String)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1),
// it will be automatically converted to StateV2(1, null)
val currentState = state.get()
// Now update with both fields populated
state.update(StateV2(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class StateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class StateV2Processor(StatefulProcessor):
def init(self, handle):
# Later schema with additional fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Read current state
current_state = self.state.get()
# When reading state written with StateV1(1),
# it will be automatically converted to StateV2(1, None)
value1 = current_state[0]
value2 = current_state[1]
# Now update with both fields populated
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
Ta bort fält för att ange lagringsvärden
Du kan ta bort fält från schemat för en befintlig variabel. När du läser data med det gamla schemat ignoreras fält som finns i gamla data men inte i det nya schemat.
Exempel på hur du tar bort fält från tillståndsvariabler
Scala
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with field removed
case class StateV2(value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1)
val currentState = state.get()
state.update(StateV2(value.toInt))
value
}
}
}
Python
class RemoveFieldsOriginalProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with multiple fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class RemoveFieldsReducedProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with field removed
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
# it will be automatically converted to just (1,)
current_state = self.state.get()
value1 = current_state[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]]
})
Ordna om fält i en tillståndsvariabel
Du kan ordna om fält i en tillståndsvariabel, inklusive när du lägger till eller tar bort befintliga fält. Fält i tillståndsvariabler matchas efter namn, inte position.
Exempel på omordning av fält i en tillståndsvariabel
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2("metadata-1", 1)
val currentState = state.get()
state.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class OrderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with fields in original order
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ReorderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with reordered fields
state_schema = StructType([
StructField("value2", StringType(), True),
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
# it will be automatically converted to ("metadata-1", 1)
current_state = self.state.get()
value2 = current_state[0]
value1 = current_state[1]
self.state.update((f"new-metadata-{value}", int(value)))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value2": [current_state[0]],
"value1": [current_state[1]]
})
Lägga till en tillståndsvariabel i ett tillståndskänsligt program
Vi kan också lägga till tillståndsvariabler mellan frågekörningar.
Obs: Det här mönstret kräver ingen Avro-kodare och stöds av alla transformWithState program.
Exempel på att lägga till en tillståndsvariabel i ett tillståndskänsligt program
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
case class StateV2(value1: String, value2: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class MultiStateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single state variable
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
Ta bort en tillståndsvariabel från ett tillståndskänsligt program
Förutom att ta bort fält kan du även ta bort tillståndsvariabler mellan frågekörningar.
Obs: Det här mönstret kräver ingen Avro-kodare och stöds av alla transformWithState program.
Exempel på hur du tar bort en tillståndsvariabel till ett tillståndskänsligt program
Scala
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
value
}
}
}
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
// delete old state variable that we no longer need
getHandle.deleteIfExists("testState2")
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
class RemoveStateVarProcessor(StatefulProcessor):
def init(self, handle):
# Only use one state variable and delete the other
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
# Delete old state variable that we no longer need
handle.deleteIfExists("testState2")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
Standardvärden för fält som läggs till i tillståndsvariabeln
När du lägger till nya fält i en befintlig tillståndsvariabel har tillståndsvariabler som skrivits med det gamla schemat följande beteende:
- Avro-kodaren returnerar ett
nullvärde för tillagda fält. - Python konverterar dessa värden till
Noneför alla datatyper. - Standardbeteendet för Scala skiljer sig åt beroende på datatyp:
- Referenstyper returnerar
null. - Primitiva typer returnerar ett standardvärde som skiljer sig beroende på den primitiva typen. Exempel är
0förinttyper ellerfalseförbooltyper.
- Referenstyper returnerar
Det finns inga inbyggda funktioner eller metadata som flaggar fältet som lagts till genom schemautveckling. Du måste implementera logik för att hantera null-värden som returneras för fält som inte fanns i ditt tidigare schema.
För Scala kan du undvika att imputera standardvärden med hjälp av Option[<Type>], som returnerar saknade värden som None i stället för att använda standardtypen.
Du måste implementera logik för att korrekt hantera situationer där None typvärden returneras på grund av schemaevolution.
Exempel på standardvärden för tillagda fält i en tillståndsvariabel
Scala
// Example demonstrating how null defaults work in schema evolution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// Reading from state
val currentState = state.get()
// Showing how null defaults work for different types
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
println(s"Current state: $currentState")
// For primitive types like Long, the UnsafeRow default for null is 0
val longValue = if (currentState.value3 == 0L) {
println("The value3 field is the default value (0)")
100L // Set a real value now
} else {
currentState.value3
}
// Now update with all fields populated
state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
value
}
}
}
Python
class NullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ExpandedNullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Evolution: Adding new fields with null/default values
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True),
StructField("value3", LongType(), True),
StructField("value4", IntegerType(), True),
StructField("value5", BooleanType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Reading from state
current_state = self.state.get()
# Showing how null defaults work in Python
# When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
# it will be automatically converted to (1, "metadata-1", None, None, None)
# In Python, both primitive and reference types will be None
value1 = current_state[0]
value2 = current_state[1]
value3 = current_state[2] # Will be None when evolved from older schema
value4 = current_state[3] # Will be None when evolved from older schema
value5 = current_state[4] # Will be None when evolved from older schema
# Check if value3 is None
if value3 is None:
print("The value3 field is None (default value for evolution)")
value3 = 100 # Set a real value now
# Now update with all fields populated
self.state.update((
value1,
value2,
value3,
value4 if value4 is not None else 42,
value5 if value5 is not None else True
))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]],
"value3": [current_state[2]],
"value4": [current_state[3]],
"value5": [current_state[4]]
})
Begränsningar
I följande tabell beskrivs standardgränser för schemautvecklingsändringar:
| Beskrivning | Standardgräns | Spark-konfiguration som ska åsidosättas |
|---|---|---|
| Schemautvecklingar för en tillståndsvariabel. Att tillämpa flera schemaändringar i en frågeomstart räknas som en enda schemautveckling. | 16 | spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold |
| Schemaevolutioner för strömningsfrågor. Att tillämpa flera schemaändringar i en frågeomstart räknas som en enda schemautveckling. | 128 | spark.sql.streaming.stateStore.maxNumStateSchemaFiles |
Överväg följande information noggrant när du felsöker schemautvecklingen för tillståndsvariabler:
- Vissa mönster stöds inte för schemautveckling. Se schemautvecklingsmönster som inte stöds.
- Schemautvecklingen har alla krav för
transformWithStateoch kräver Avro-kodningsformatet. Se Krav. - Du måste starta om en strömmande fråga för att distribuera kodändringar som resulterar i schemaändring. Se När sker schemautvecklingen?.