Delen via


Schemaontwikkeling in het toestand opslagsysteem

Dit artikel bevat een overzicht van de ontwikkeling van schema's in het statusarchief en voorbeelden van typen ondersteunde schemawijzigingen.

Wat is schemaontwikkeling in de staatwinkel?

Schemaontwikkeling verwijst naar de mogelijkheid van een toepassing om wijzigingen in het schema van gegevens te verwerken.

Azure Databricks ondersteunt de ontwikkeling van schema's in de RocksDB-statusopslag voor structured streaming-toepassingen die gebruikmaken van transformWithState.

De ontwikkeling van schema's biedt flexibiliteit voor ontwikkeling en onderhoudsgemak. Gebruik schemaontwikkeling om het gegevensmodel of gegevenstypen in uw staatsarchief aan te passen zonder dat de statusgegevens verloren gaan of dat historische gegevens volledig opnieuw moeten worden verwerkt.

Requirements

U moet de coderingsindeling voor het statusarchief instellen op Avro om schemaontwikkeling te kunnen gebruiken. Voer het volgende uit om dit in te stellen voor de huidige sessie:

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

Schemaontwikkeling wordt alleen ondersteund voor stateful bewerkingen die gebruikmaken van transformWithState of transformWithStateInPandas. Deze operators en de bijbehorende API's en klassen hebben de volgende vereisten:

  • Beschikbaar in Databricks Runtime 16.2 en hoger.
  • Compute moet gebruikmaken van de toegewezen of geen isolatietoegangsmodus.
  • U moet de RocksDB state store-provider gebruiken. Databricks raadt aan om RocksDB in te schakelen als onderdeel van de rekenconfiguratie.
  • transformWithStateInPandas ondersteunt de standaardtoegangsmodus in Databricks Runtime 16.3 en hoger.

Voer het volgende uit om de provider van de rocksDB-statusopslag voor de huidige sessie in te schakelen:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Ondersteunde schemaontwikkelingspatronen in het statusarchief

Databricks ondersteunt de volgende schemaontwikkelingspatronen voor stateful gestructureerde streamingbewerkingen.

Patroon Description
Type breder maken Wijzig gegevenstypen van meer beperkend tot minder beperkende typen.
Velden toevoegen Voeg nieuwe velden toe aan het schema van bestaande statusopslagvariabelen.
Velden verwijderen Verwijder bestaande velden uit het schema of een statusarchiefvariabele.
Velden opnieuw ordenen De volgorde van velden in een variabele wijzigen.
Statusvariabelen toevoegen Voeg een nieuwe statusvariabele toe aan een toepassing.
Statusvariabelen verwijderen Verwijder een bestaande statusvariabele uit een toepassing.

Wanneer vindt de ontwikkeling van het schema plaats?

Schemaontwikkeling in het statusarchief resulteert in het bijwerken van de code die uw stateful toepassing definieert. Daarom zijn de volgende instructies van toepassing:

  • Schemaontwikkeling vindt niet automatisch plaats als gevolg van schemawijzigingen in de brongegevens voor de query.
  • Schemaontwikkeling vindt alleen plaats wanneer een nieuwe versie van de toepassing wordt geïmplementeerd. Omdat slechts één versie van een streamingquery tegelijkertijd kan worden uitgevoerd, moet u de streamingtaak opnieuw starten om het schema voor statusvariabelen te ontwikkelen.
  • Uw code definieert expliciet alle statusvariabelen en stelt het schema in voor alle statusvariabelen.
    • In Scala gebruikt u een Encoder om het schema voor elke variabele op te geven.
    • In Python maakt u expliciet een schema als een StructType.

Niet-ondersteunde patronen voor schemaontwikkeling

De volgende patronen voor schemaontwikkeling worden niet ondersteund:

  • Veldnaam wijzigen: het wijzigen van de naam van velden wordt niet ondersteund omdat velden overeenkomen met de naam. Het wijzigen van de naam van een veld wordt verwerkt door het veld te verwijderen en een nieuw veld toe te voegen. Deze bewerking resulteert niet in een fout omdat het verwijderen en toevoegen van velden is toegestaan, maar de waarden uit het oorspronkelijke veld worden niet overgedragen naar het nieuwe veld.

  • Sleutels hernoemen of typen wijzigen: U kunt de naam of het type van sleutels in kaarttoestandsvariabelen niet wijzigen.

  • Type versmalling Type-versmallingsbewerkingen, ook wel downcasting genoemd, worden niet ondersteund. Deze bewerkingen kunnen leiden tot gegevensverlies. Hier volgen voorbeelden van niet-ondersteunde bewerkingen voor het beperken van typen:

    • double kan niet worden beperkt tot float, longof int
    • float kan niet worden beperkt tot long of int
    • long kan niet worden beperkt tot int

Typeverruiming in de statusopslag

U kunt primitieve gegevenstypen breder maken voor meer accommoderende typen. De volgende type-uitbreidingen worden ondersteund:

  • int kan worden gepromoveerd tot long, floatof double
  • long kan worden gepromoveerd naar float of double
  • float kan worden gepromoveerd tot double
  • string kan worden gepromoveerd tot bytes
  • bytes kan worden gepromoveerd tot string

Bestaande waarden worden upcast als het nieuwe type. Zo wordt 12 gewijzigd in 12.00.

Voorbeeld van type widening met transformWithState

Scala

// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV2(value.toLong))
      value
    }
  }
}

Python

class IntStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with Integer field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to integer and update state
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class LongStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with Long field (type widening)
        state_schema = StructType([
            StructField("value1", LongType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to long and update state
            value = pdf["value"].iloc[0]
            # When reading state written with IntStateProcessor,
            # it will be automatically converted to Long
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

Velden toevoegen aan waarden voor statusopslag

U kunt nieuwe velden toevoegen aan het schema van bestaande statusopslagwaarden.

Bij het lezen van gegevens die zijn geschreven met het oude schema, retourneert de Avro-encoder gegevens voor toegevoegde velden die systeemeigen zijn gecodeerd als null.

Python interpreteert deze waarden altijd als None. Scala heeft een ander standaardgedrag, afhankelijk van het type voor het veld. Databricks raadt u aan logica te implementeren om ervoor te zorgen dat Scala geen waarden inlegt voor ontbrekende gegevens. Zie De standaardwaarden voor velden die zijn toegevoegd aan de statusvariabele.

Voorbeelden van het toevoegen van nieuwe velden met transformWithState

Scala

// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1),
      // it will be automatically converted to StateV2(1, null)
      val currentState = state.get()
      // Now update with both fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Python

class StateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class StateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Later schema with additional fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Read current state
            current_state = self.state.get()
            # When reading state written with StateV1(1),
            # it will be automatically converted to StateV2(1, None)
            value1 = current_state[0]
            value2 = current_state[1]

            # Now update with both fields populated
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Velden verwijderen voor statusopslagwaarden

U kunt velden verwijderen uit het schema van een bestaande variabele. Wanneer u gegevens leest met het oude schema, worden velden die aanwezig zijn in de oude gegevens, maar niet in het nieuwe schema genegeerd.

Voorbeelden van het verwijderen van velden uit statusvariabelen

Scala

// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1)
      val currentState = state.get()
      state.update(StateV2(value.toInt))
      value
    }
  }
}

Python

class RemoveFieldsOriginalProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with multiple fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class RemoveFieldsReducedProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with field removed
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
            # it will be automatically converted to just (1,)
            current_state = self.state.get()
            value1 = current_state[0]

            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]]
        })

Velden in een statusvariabele opnieuw ordenen

U kunt velden in een statusvariabele opnieuw ordenen, bijvoorbeeld wanneer u bestaande velden toevoegt of verwijdert. Velden in statusvariabelen worden op naam vergeleken, niet op positie.

Voorbeelden van het opnieuw ordenen van velden in een statusvariabele

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2("metadata-1", 1)
      val currentState = state.get()
      state.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Python

class OrderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with fields in original order
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ReorderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with reordered fields
        state_schema = StructType([
            StructField("value2", StringType(), True),
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
            # it will be automatically converted to ("metadata-1", 1)
            current_state = self.state.get()
            value2 = current_state[0]
            value1 = current_state[1]

            self.state.update((f"new-metadata-{value}", int(value)))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value2": [current_state[0]],
            "value1": [current_state[1]]
        })

Een statusvariabele toevoegen aan een stateful toepassing

We kunnen ook statusvariabelen toevoegen tussen queryuitvoeringen.

Opmerking: dit patroon vereist geen Avro-encoder en wordt ondersteund door alle transformWithState toepassingen.

Voorbeeld van het toevoegen van een statusvariabele aan een stateful toepassing

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Python

class MultiStateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single state variable
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

Een toestandsvariabele verwijderen uit een stateful applicatie

Naast het verwijderen van velden kunt u ook statusvariabelen tussen queryuitvoeringen verwijderen.

Opmerking: dit patroon vereist geen Avro-encoder en wordt ondersteund door alle transformWithState toepassingen.

Voorbeeld van het verwijderen van een statusvariabele in een stateful toepassing

Scala

case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
      value
    }
  }
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    // delete old state variable that we no longer need
    getHandle.deleteIfExists("testState2")
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Python

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

class RemoveStateVarProcessor(StatefulProcessor):
    def init(self, handle):
        # Only use one state variable and delete the other
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

        # Delete old state variable that we no longer need
        handle.deleteIfExists("testState2")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Standaardwaarden voor velden die zijn toegevoegd aan statusvariabele

Wanneer u nieuwe velden toevoegt aan een bestaande statusvariabele, hebben statusvariabelen die zijn geschreven met behulp van het oude schema het volgende gedrag:

  • De Avro-encoder retourneert een null waarde voor toegevoegde velden.
  • Python converteert deze waarden naar None alle gegevenstypen.
  • Standaardgedrag scala verschilt per gegevenstype:
    • Verwijzingstypen retourneren null.
    • Primitieve typen retourneren een standaardwaarde, die verschilt op basis van het primitieve type. Voorbeelden zijn onder andere 0 voor int typen of false voor bool typen.

Er is geen ingebouwde functionaliteit of metagegevens die het veld markeren als toegevoegd via schema-evolutie. U moet logica implementeren om null-waarden te verwerken die worden geretourneerd voor velden die niet in uw vorige schema bestonden.

Voor Scala kunt u voorkomen dat standaardwaarden worden toegerekend door Option[<Type>] te gebruiken. Dit retourneert ontbrekende waarden als None in plaats van de standaardwaarde van het type.

U moet logica implementeren om situaties waarin None typewaarden worden geretourneerd, correct te verwerken vanwege de evolutie van het schema.

Voorbeeld van standaardwaarden voor toegevoegde velden aan een statusvariabele

Scala

// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // Reading from state
      val currentState = state.get()

      // Showing how null defaults work for different types
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
      println(s"Current state: $currentState")

      // For primitive types like Long, the UnsafeRow default for null is 0
      val longValue = if (currentState.value3 == 0L) {
        println("The value3 field is the default value (0)")
        100L // Set a real value now
      } else {
        currentState.value3
      }

      // Now update with all fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
      value
    }
  }
}

Python

class NullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ExpandedNullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Evolution: Adding new fields with null/default values
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True),
            StructField("value3", LongType(), True),
            StructField("value4", IntegerType(), True),
            StructField("value5", BooleanType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Reading from state
            current_state = self.state.get()

            # Showing how null defaults work in Python
            # When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
            # it will be automatically converted to (1, "metadata-1", None, None, None)
            # In Python, both primitive and reference types will be None

            value1 = current_state[0]
            value2 = current_state[1]
            value3 = current_state[2]  # Will be None when evolved from older schema
            value4 = current_state[3]  # Will be None when evolved from older schema
            value5 = current_state[4]  # Will be None when evolved from older schema

            # Check if value3 is None
            if value3 is None:
                print("The value3 field is None (default value for evolution)")
                value3 = 100  # Set a real value now

            # Now update with all fields populated
            self.state.update((
                value1,
                value2,
                value3,
                value4 if value4 is not None else 42,
                value5 if value5 is not None else True
            ))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]],
            "value3": [current_state[2]],
            "value4": [current_state[3]],
            "value5": [current_state[4]]
        })

Beperkingen

In de volgende tabel worden standaardlimieten beschreven voor wijzigingen in de ontwikkeling van schema's:

Description Standaardlimiet Over te schrijven Spark-configuratie
Schemaontwikkelingen voor een statusvariabele. Het toepassen van meerdere schemawijzigingen in het opnieuw opstarten van een query telt als één schemaontwikkeling. 16 spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold
Schemaontwikkelingen voor de streamingquery. Het toepassen van meerdere schemawijzigingen in het opnieuw opstarten van een query telt als één schemaontwikkeling. 128 spark.sql.streaming.stateStore.maxNumStateSchemaFiles

Houd rekening met de volgende details bij het oplossen van problemen met de ontwikkeling van schema's voor statusvariabelen: