この記事では、状態ストアでのスキーマの進化の概要と、サポートされているスキーマ変更の種類の例について説明します。
状態ストアでのスキーマの進化とは
スキーマの進化とは、データのスキーマの変更を処理するアプリケーションの機能を指します。
Azure Databricks では、 transformWithStateを使用する Structured Streaming アプリケーションの RocksDB ステート ストアでのスキーマの進化がサポートされています。
スキーマの進化により、開発の柔軟性とメンテナンスの容易さが実現します。 スキーマの進化を使用して、状態情報を失ったり履歴データの完全な再処理を必要としたりすることなく、状態ストアのデータ モデルまたはデータ型を調整します。
Requirements
スキーマの進化を使用するには、状態ストアのエンコード形式を Avro に設定する必要があります。 現在のセッションに対してこれを設定するには、次を実行します。
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
スキーマの進化は、 transformWithState または transformWithStateInPandasを使用するステートフル操作でのみサポートされます。 これらの演算子と関連する API とクラスには、次の要件があります。
- Databricks Runtime 16.2 以降で使用できます。
- コンピューティングでは、専用または分離なしのアクセス モードを使用する必要があります。
- RocksDB ステート ストア プロバイダーを使用する必要があります。 Databricks では、コンピューティング構成の一部として RocksDB を有効にすることをお勧めします。
-
transformWithStateInPandasは、Databricks Runtime 16.3 以降で標準アクセス モードをサポートしています。
現在のセッションに対して RocksDB ステート ストア プロバイダーを有効にするには、次を実行します。
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
状態ストアでサポートされているスキーマの進化パターン
Databricks では、ステートフルな構造化ストリーミング操作に対して、次のスキーマ進化パターンがサポートされています。
| パターン | Description |
|---|---|
| 型の拡張 | データ型をより制限の厳しい型から制限の緩い型に変更します。 |
| フィールドの追加 | 既存の状態ストア変数のスキーマに新しいフィールドを追加します。 |
| フィールドの削除 | スキーマまたは状態ストア変数から既存のフィールドを削除します。 |
| フィールドの並べ替え | 変数内のフィールドの順序を変更します。 |
| 状態変数の追加 | 新しい状態変数をアプリケーションに追加します。 |
| 状態変数の削除 | アプリケーションから既存の状態変数を削除します。 |
スキーマの進化はいつ発生しますか?
状態ストアでのスキーマの進化は、ステートフル アプリケーションを定義するコードを更新した結果です。 このため、次のステートメントが適用されます。
- スキーマの進化は、クエリのソース データのスキーマ変更の結果として自動的には行われません。
- スキーマの進化は、アプリケーションの新しいバージョンがデプロイされている場合にのみ発生します。 同時に実行できるストリーミング クエリのバージョンは 1 つだけなので、状態変数のスキーマを進化させるには、ストリーミング ジョブを再起動する必要があります。
- コードでは、すべての状態変数を明示的に定義し、すべての状態変数のスキーマを設定します。
- Scala では、
Encoderを使用して各変数のスキーマを指定します。 - Python では、
StructTypeとしてスキーマを明示的に構築します。
- Scala では、
サポートされていないスキーマの進化パターン
次のスキーマ進化パターンはサポートされていません。
フィールド名の変更: フィールドの名前が一致するため、フィールドの名前変更はサポートされていません。 フィールドの名前を変更しようとすると、フィールドを削除し、新しいフィールドを追加することによって処理されます。 この操作では、フィールドの削除と追加は許可されますが、元のフィールドの値は新しいフィールドに引き継がれないので、エラーは発生しません。
キーの名前変更または型の変更: マップ状態変数でキーの名前または型を変更することはできません。
型の縮小 型の絞り込み操作 ( ダウンキャストとも呼ばれます) はサポートされていません。 これらの操作により、データが失われる可能性があります。 サポートされていない型の縮小操作の例を次に示します。
-
doubleをfloat、long、またはintに絞り込むことはできません。 -
floatをlongまたはintに変換できません。 -
longに絞り込むことができません。int
-
状態ストアで拡大を入力する
プリミティブ データ型をより多くの対応型に拡大できます。 次に示す型の拡大変更がサポートされています。
-
intは、long、float、またはdoubleに昇進できます。 -
longは、floatまたはdoubleに昇格できます。 -
floatに昇格できます。double -
stringに昇格できます。bytes -
bytesに昇格できます。string
既存の値は、新しい型としてアップキャストされます。 たとえば、12 が 12.00になります。
型拡大の例 transformWithState
Scala
// Initial run with Integer field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with Long field (type widening)
case class StateV2(value1: Long)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV2(value.toLong))
value
}
}
}
Python
class IntStateProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with Integer field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to integer and update state
value = pdf["value"].iloc[0]
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class LongStateProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with Long field (type widening)
state_schema = StructType([
StructField("value1", LongType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to long and update state
value = pdf["value"].iloc[0]
# When reading state written with IntStateProcessor,
# it will be automatically converted to Long
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
状態ストアの値にフィールドを追加する
既存の状態ストア値のスキーマに新しいフィールドを追加できます。
古いスキーマで書き込まれたデータを読み取ると、Avro エンコーダーは、 nullとしてネイティブにエンコードされた追加されたフィールドのデータを返します。
Python では、これらの値は常に Noneとして解釈されます。 Scala の既定の動作は、フィールドの種類によって異なります。 Databricks では、Scala が欠損データの値を補完しないようにロジックを実装することをお勧めします。
状態変数に追加されたフィールドの既定値を参照してください。
を使用して新しいフィールドを追加する例 transformWithState
Scala
// Initial run with single field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with additional field
case class StateV2(value1: Integer, value2: String)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1),
// it will be automatically converted to StateV2(1, null)
val currentState = state.get()
// Now update with both fields populated
state.update(StateV2(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class StateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class StateV2Processor(StatefulProcessor):
def init(self, handle):
# Later schema with additional fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Read current state
current_state = self.state.get()
# When reading state written with StateV1(1),
# it will be automatically converted to StateV2(1, None)
value1 = current_state[0]
value2 = current_state[1]
# Now update with both fields populated
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
状態ストア値を格納するためのフィールドを削除します。
既存の変数のスキーマからフィールドを削除できます。 古いスキーマでデータを読み取る場合、古いデータには存在するが、新しいスキーマには存在しないフィールドは無視されます。
状態変数からフィールドを削除する例
Scala
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with field removed
case class StateV2(value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1)
val currentState = state.get()
state.update(StateV2(value.toInt))
value
}
}
}
Python
class RemoveFieldsOriginalProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with multiple fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class RemoveFieldsReducedProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with field removed
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
# it will be automatically converted to just (1,)
current_state = self.state.get()
value1 = current_state[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]]
})
状態変数のフィールドの順序を変更する
既存のフィールドを追加または削除する場合を含め、状態変数のフィールドの順序を変更できます。 状態変数のフィールドは、位置ではなく名前で照合されます。
状態変数のフィールドの並べ替えの例
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2("metadata-1", 1)
val currentState = state.get()
state.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class OrderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with fields in original order
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ReorderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with reordered fields
state_schema = StructType([
StructField("value2", StringType(), True),
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
# it will be automatically converted to ("metadata-1", 1)
current_state = self.state.get()
value2 = current_state[0]
value1 = current_state[1]
self.state.update((f"new-metadata-{value}", int(value)))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value2": [current_state[0]],
"value1": [current_state[1]]
})
ステートフル アプリケーションに状態変数を追加する
クエリ実行の間に状態変数を追加することもできます。
注: このパターンは Avro エンコーダーを必要とせず、すべての transformWithState アプリケーションでサポートされています。
ステートフル アプリケーションに状態変数を追加する例
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
case class StateV2(value1: String, value2: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class MultiStateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single state variable
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
ステートフル アプリケーションから状態変数を削除する
フィールドを削除するだけでなく、クエリ実行の間に状態変数を削除することもできます。
注: このパターンは Avro エンコーダーを必要とせず、すべての transformWithState アプリケーションでサポートされています。
ステートフル アプリケーションへの状態変数の削除の例
Scala
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
value
}
}
}
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
// delete old state variable that we no longer need
getHandle.deleteIfExists("testState2")
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
class RemoveStateVarProcessor(StatefulProcessor):
def init(self, handle):
# Only use one state variable and delete the other
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
# Delete old state variable that we no longer need
handle.deleteIfExists("testState2")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
状態変数に追加されたフィールドの既定値
既存の状態変数に新しいフィールドを追加すると、古いスキーマを使用して書き込まれた状態変数の動作は次のようになります。
- Avro エンコーダーは、追加されたフィールドの
null値を返します。 - Python は、これらの値をすべてのデータ型の
Noneに変換します。 - Scala の既定の動作は、データ型によって異なります。
- 参照型は
nullを返します。 - プリミティブ型は既定値を返します。これはプリミティブ型によって異なります。 たとえば、
0型のintや、false型のboolなどがあります。
- 参照型は
スキーマの進化によって追加されたフィールドにフラグを設定する組み込みの機能やメタデータはありません。 前のスキーマに存在しなかったフィールドに対して返される null 値を処理するロジックを実装する必要があります。
Scala の場合、 Option[<Type>]を使用することで既定値の補完を回避できます。これは、型の既定値を使用する代わりに、 None として欠損値を返します。
スキーマの進化により None 型の値が返される状況を正しく処理するロジックを実装する必要があります。
状態変数に追加されたフィールドの既定値の例
Scala
// Example demonstrating how null defaults work in schema evolution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// Reading from state
val currentState = state.get()
// Showing how null defaults work for different types
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
println(s"Current state: $currentState")
// For primitive types like Long, the UnsafeRow default for null is 0
val longValue = if (currentState.value3 == 0L) {
println("The value3 field is the default value (0)")
100L // Set a real value now
} else {
currentState.value3
}
// Now update with all fields populated
state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
value
}
}
}
Python
class NullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ExpandedNullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Evolution: Adding new fields with null/default values
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True),
StructField("value3", LongType(), True),
StructField("value4", IntegerType(), True),
StructField("value5", BooleanType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Reading from state
current_state = self.state.get()
# Showing how null defaults work in Python
# When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
# it will be automatically converted to (1, "metadata-1", None, None, None)
# In Python, both primitive and reference types will be None
value1 = current_state[0]
value2 = current_state[1]
value3 = current_state[2] # Will be None when evolved from older schema
value4 = current_state[3] # Will be None when evolved from older schema
value5 = current_state[4] # Will be None when evolved from older schema
# Check if value3 is None
if value3 is None:
print("The value3 field is None (default value for evolution)")
value3 = 100 # Set a real value now
# Now update with all fields populated
self.state.update((
value1,
value2,
value3,
value4 if value4 is not None else 42,
value5 if value5 is not None else True
))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]],
"value3": [current_state[2]],
"value4": [current_state[3]],
"value5": [current_state[4]]
})
制限事項
次の表では、スキーマの進化の変更に関する既定の制限について説明します。
| Description | 既定の制限 | オーバーライドする Spark の構成 |
|---|---|---|
| 状態変数のスキーマの進化。 クエリの再起動に複数のスキーマ変更を適用すると、1 つのスキーマの進化としてカウントされます。 | 16 | spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold |
| ストリーミング クエリのスキーマの進化。 クエリの再起動に複数のスキーマ変更を適用すると、1 つのスキーマの進化としてカウントされます。 | 128 | spark.sql.streaming.stateStore.maxNumStateSchemaFiles |
状態変数のスキーマの進化のトラブルシューティングを行う場合は、次の詳細を慎重に検討してください。
- スキーマの進化では、一部のパターンはサポートされていません。 サポートされていないスキーマの進化パターンを参照してください。
- スキーマの進化には、
transformWithStateのすべての要件があり、Avro エンコード形式が必要です。 要件を参照してください。 - スキーマの進化をもたらすコード変更をデプロイするには、ストリーミング クエリを再起動する必要があります。 「スキーマの進化はいつ発生するか」を参照してください。