Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ten artykuł zawiera omówienie ewolucji schematu w przechowywaniu stanów oraz przykłady typów obsługiwanych zmian w schemacie.
Co to jest ewolucja schematu w repozytorium stanów?
Ewolucja schematu odnosi się do możliwości aplikacji do obsługi zmian w schemacie danych.
Usługa Azure Databricks obsługuje ewolucję schematu w magazynie stanów RocksDB dla aplikacji strukturalnego przesyłania strumieniowego, które używają transformWithState.
Ewolucja schematu zapewnia elastyczność programowania i łatwości konserwacji. Użyj ewolucji schematu, aby dostosować model danych lub typy danych w magazynie stanów bez utraty informacji o stanie lub konieczności pełnego ponownego przetwarzania danych historycznych.
Wymagania
Aby używać ewolucji schematu, należy ustawić format kodowania magazynu stanów na Avro. Aby ustawić tę opcję dla bieżącej sesji, uruchom następujące polecenie:
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
Ewolucja schematu jest obsługiwana tylko w przypadku operacji stanowych korzystających z transformWithState lub transformWithStateInPandas. Te operatory i powiązane interfejsy API i klasy mają następujące wymagania:
- Dostępne w środowisku Databricks Runtime 16.2 lub nowszym.
- Środowisko obliczeniowe musi używać trybu dostępu dedykowanego lub bez izolacji.
- Należy użyć dostawcy magazynu stanów bazy danych RocksDB. Usługa Databricks zaleca włączenie bazy danych RocksDB w ramach konfiguracji obliczeniowej.
-
transformWithStateInPandasobsługuje standardowy tryb dostępu w środowisku Databricks Runtime 16.3 lub nowszym.
Aby włączyć dostawcę magazynu stanów bazy danych RocksDB dla bieżącej sesji, uruchom następujące polecenie:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Obsługiwane wzorce ewolucji schematu w repozytorium stanów
Usługa Databricks obsługuje następujące wzorce ewolucji schematu dla stanowych operacji przesyłania strumieniowego ze strukturą.
| Wzorzec | Opis |
|---|---|
| Rozszerzanie typu | Zmień typy danych z bardziej restrykcyjnych na mniej restrykcyjne typy. |
| dodawanie pól | Dodaj nowe pola do schematu istniejących zmiennych magazynu stanów. |
| Usuwanie pól | Usuń istniejące pola ze schematu lub zmiennej magazynu stanów. |
| Zmienianie kolejności pól | Zmień kolejność pól w zmiennej. |
| dodawanie zmiennych stanu | Dodaj nową zmienną stanu do aplikacji. |
| Usuwanie zmiennych stanu | Usuń istniejącą zmienną stanu z aplikacji. |
Kiedy występuje ewolucja schematu?
Ewolucja schematu w magazynie stanów wynika z aktualizowania kodu definiującego aplikację stanową. W związku z tym obowiązują następujące stwierdzenia:
- Ewolucja schematu nie występuje automatycznie w wyniku zmian schematu w danych źródłowych zapytania.
- Ewolucja schematu występuje tylko wtedy, gdy zostanie wdrożona nowa wersja aplikacji. Ponieważ tylko jedna wersja zapytania przesyłania strumieniowego może być uruchamiana jednocześnie, należy ponownie uruchomić zadanie przesyłania strumieniowego, aby rozwinąć schemat zmiennych stanu.
- Kod jawnie definiuje wszystkie zmienne stanu i ustawia schemat dla wszystkich zmiennych stanu.
- W języku Scala użyjesz
Encoder, aby określić schemat dla każdej zmiennej. - W języku Python jawnie skonstruujesz schemat jako
StructType.
- W języku Scala użyjesz
Nieobsługiwane wzorce ewolucji schematu
Następujące wzorce ewolucji schematu nie są obsługiwane:
zmiana nazwy pola: zmiana nazw pól nie jest obsługiwana, ponieważ pola są zgodne z nazwą. Próba zmiany nazwy pola jest obsługiwana przez usunięcie pola i dodanie nowego pola. Ta operacja nie powoduje błędu, ponieważ usuwanie i dodawanie pól jest dozwolone, ale wartości z oryginalnego pola nie są przenoszone do nowego pola.
Zmiana nazw kluczy lub typów: nie można zmieniać nazw ani typów kluczy w zmiennych stanu w mapach.
Zawężanie typu Operacje zawężania typu, znane również jako rzutowanie w dół, nie są obsługiwane. Te operacje mogą spowodować utratę danych. Oto przykłady operacji zawężania typu, które nie są obsługiwane:
- nie można zawęzić
doubledofloat,longlubint - nie można zawęzić
floatdolonganiint - nie można zawęzić
longdoint
- nie można zawęzić
Rozszerzenie typu w sklepie stanu
Można rozszerzyć typy danych pierwotnych na bardziej pojemne typy. Obsługiwane są następujące zmiany rozszerzenia typów.
-
intmożna awansować dolong,floatlubdouble -
longmożna awansować dofloatlubdouble -
floatmożna awansować dodouble -
stringmożna awansować dobytes -
bytesmożna awansować dostring
Istniejące wartości są wyświetlane jako nowy typ. Na przykład 12 staje się 12.00.
Przykład rozszerzania typu przy użyciu transformWithState
Scala
// Initial run with Integer field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with Long field (type widening)
case class StateV2(value1: Long)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV2(value.toLong))
value
}
}
}
Python
class IntStateProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with Integer field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to integer and update state
value = pdf["value"].iloc[0]
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class LongStateProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with Long field (type widening)
state_schema = StructType([
StructField("value1", LongType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to long and update state
value = pdf["value"].iloc[0]
# When reading state written with IntStateProcessor,
# it will be automatically converted to Long
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
Dodawanie pól do wartości magazynu stanów
Można dodać nowe pola do schematu wartości przechowywanych w istniejącym magazynie stanów.
Podczas odczytywania danych zapisanych przy użyciu starego schematu koder Avro zwraca dane dla dodanych pól zakodowanych natywnie jako null.
Język Python zawsze interpretuje te wartości jako None. Język Scala ma inne domyślne zachowanie w zależności od typu pola. Usługa Databricks zaleca implementację logiki, aby upewnić się, że język Scala nie imputuje wartości brakujących danych. Zobacz Wartości domyślne dla pól dodanych do zmiennej stanu.
Przykłady dodawania nowych pól przy użyciu transformWithState
Scala
// Initial run with single field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with additional field
case class StateV2(value1: Integer, value2: String)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1),
// it will be automatically converted to StateV2(1, null)
val currentState = state.get()
// Now update with both fields populated
state.update(StateV2(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class StateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class StateV2Processor(StatefulProcessor):
def init(self, handle):
# Later schema with additional fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Read current state
current_state = self.state.get()
# When reading state written with StateV1(1),
# it will be automatically converted to StateV2(1, None)
value1 = current_state[0]
value2 = current_state[1]
# Now update with both fields populated
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
Usuń pola do przechowywania wartości stanu
Pola można usunąć ze schematu istniejącej zmiennej. Podczas odczytywania danych ze starym schematem pola obecne w starych danych, ale nie w nowym schemacie, są ignorowane.
Przykłady usuwania pól ze zmiennych stanu
Scala
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with field removed
case class StateV2(value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1)
val currentState = state.get()
state.update(StateV2(value.toInt))
value
}
}
}
Python
class RemoveFieldsOriginalProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with multiple fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class RemoveFieldsReducedProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with field removed
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
# it will be automatically converted to just (1,)
current_state = self.state.get()
value1 = current_state[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]]
})
Zmień kolejność pól w zmiennej stanu
Możesz zmienić kolejność pól w zmiennej stanu, w tym podczas dodawania lub usuwania istniejących pól. Pola w zmiennych stanu są dopasowywane według nazwy, a nie pozycji.
Przykłady zmiany kolejności pól w zmiennej stanu
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2("metadata-1", 1)
val currentState = state.get()
state.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class OrderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with fields in original order
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ReorderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with reordered fields
state_schema = StructType([
StructField("value2", StringType(), True),
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
# it will be automatically converted to ("metadata-1", 1)
current_state = self.state.get()
value2 = current_state[0]
value1 = current_state[1]
self.state.update((f"new-metadata-{value}", int(value)))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value2": [current_state[0]],
"value1": [current_state[1]]
})
Dodawanie zmiennej stanu do aplikacji stanowej
Możemy również dodać zmienne stanu między przebiegami zapytań.
Uwaga: ten wzorzec nie wymaga kodera Avro i obsługuje wszystkie aplikacje transformWithState.
Przykład dodawania zmiennej stanu do aplikacji stanowej
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
case class StateV2(value1: String, value2: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class MultiStateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single state variable
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
Usuwanie zmiennej stanu z aplikacji stanowej
Oprócz usuwania pól można również usunąć zmienne stanu między przebiegami zapytań.
Uwaga: ten wzorzec nie wymaga kodera Avro i jest obsługiwany przez wszystkie aplikacje transformWithState.
Przykład przenoszenia zmiennej stanu do aplikacji stanowej
Scala
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
value
}
}
}
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
// delete old state variable that we no longer need
getHandle.deleteIfExists("testState2")
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
class RemoveStateVarProcessor(StatefulProcessor):
def init(self, handle):
# Only use one state variable and delete the other
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
# Delete old state variable that we no longer need
handle.deleteIfExists("testState2")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
domyślne wartości pól dodanych do zmiennej stanu
Podczas dodawania nowych pól do istniejącej zmiennej stanu zmienne stanu zapisywane przy użyciu starego schematu mają następujące zachowanie:
- Koder Avro zwraca wartość
nulldla pól dodanych. - Język Python konwertuje te wartości na
Nonedla wszystkich typów danych. - Domyślne zachowanie języka Scala różni się od typu danych:
- Typy odwołań zwracają
null. - Typy pierwotne zwracają wartość domyślną, która różni się w zależności od typu pierwotnego. Przykłady obejmują
0dla typówintlubfalsedla typówbool.
- Typy odwołań zwracają
Nie ma wbudowanej funkcjonalności ani metadanych, które oznaczają, że elementy zostały dodane w wyniku ewolucji schematu. Należy zaimplementować logikę, aby obsługiwać wartości null zwracane dla pól, które nie istniały w poprzednim schemacie.
W przypadku języka Scala można uniknąć przypisania wartości domyślnych przy użyciu Option[<Type>], która zwraca brakujące wartości jako None zamiast używać domyślnego typu.
Należy zaimplementować logikę, aby prawidłowo obsługiwać sytuacje, w których wartości typu None są zwracane z powodu ewolucji schematu.
Przykład wartości domyślnych dla dodanych pól do zmiennej stanu
Scala
// Example demonstrating how null defaults work in schema evolution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// Reading from state
val currentState = state.get()
// Showing how null defaults work for different types
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
println(s"Current state: $currentState")
// For primitive types like Long, the UnsafeRow default for null is 0
val longValue = if (currentState.value3 == 0L) {
println("The value3 field is the default value (0)")
100L // Set a real value now
} else {
currentState.value3
}
// Now update with all fields populated
state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
value
}
}
}
Python
class NullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ExpandedNullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Evolution: Adding new fields with null/default values
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True),
StructField("value3", LongType(), True),
StructField("value4", IntegerType(), True),
StructField("value5", BooleanType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Reading from state
current_state = self.state.get()
# Showing how null defaults work in Python
# When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
# it will be automatically converted to (1, "metadata-1", None, None, None)
# In Python, both primitive and reference types will be None
value1 = current_state[0]
value2 = current_state[1]
value3 = current_state[2] # Will be None when evolved from older schema
value4 = current_state[3] # Will be None when evolved from older schema
value5 = current_state[4] # Will be None when evolved from older schema
# Check if value3 is None
if value3 is None:
print("The value3 field is None (default value for evolution)")
value3 = 100 # Set a real value now
# Now update with all fields populated
self.state.update((
value1,
value2,
value3,
value4 if value4 is not None else 42,
value5 if value5 is not None else True
))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]],
"value3": [current_state[2]],
"value4": [current_state[3]],
"value5": [current_state[4]]
})
Ograniczenia
W poniższej tabeli opisano domyślne limity zmian ewolucji schematu:
| Opis | Limit domyślny | Konfiguracja Spark do nadpisania |
|---|---|---|
| Ewolucje schematu dla zmiennej stanu. Stosowanie wielu zmian schematu w ponownym uruchomieniu zapytania jest liczone jako pojedyncza ewolucja schematu. | 16 | spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold |
| Ewolucje schematu dla zapytania strumieniowego. Stosowanie wielu zmian schematu w ponownym uruchomieniu zapytania jest liczone jako pojedyncza ewolucja schematu. | 128 | spark.sql.streaming.stateStore.maxNumStateSchemaFiles |
Podczas rozwiązywania problemów z ewolucją schematu dla zmiennych stanu należy dokładnie rozważyć następujące szczegóły:
- Niektóre wzorce nie są obsługiwane w przypadku ewolucji schematu. Zobacz Nieobsługiwane wzorce ewolucji schematu.
- Ewolucja schematu spełnia wszystkie wymagania
transformWithStatei wymaga formatu kodowania Avro. Zobacz Wymagania. - Aby wdrożyć zmiany kodu, które powodują ewolucję schematu, należy ponownie uruchomić zapytanie przesyłania strumieniowego. Zobacz Kiedy występuje ewolucja schematu?.