Comparteix via


Evolución del esquema en el almacén de estado

En este artículo se proporciona información general sobre la evolución del esquema en el almacén de estados y ejemplos de tipos de cambios de esquema admitidos.

¿Qué es la evolución del esquema en el almacén de estado?

La evolución del esquema hace referencia a la capacidad de una aplicación para controlar los cambios en el esquema de los datos.

Azure Databricks admite la evolución del esquema en el almacén de estado de RocksDB para aplicaciones de Structured Streaming que usan transformWithState.

La evolución del esquema proporciona flexibilidad para el desarrollo y facilidad de mantenimiento. Use la evolución del esquema para adaptar el modelo de datos o los tipos de datos del almacén de estado sin perder información de estado ni requerir el reprocesamiento completo de los datos históricos.

Requisitos

Debe establecer el formato de codificación del almacén de estado en Avro para usar la evolución del esquema. Para establecer esto para la sesión actual, ejecute lo siguiente:

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

La evolución del esquema solo se admite para operaciones con estado que usan transformWithState o transformWithStateInPandas. Estos operadores y las API y clases relacionadas tienen los siguientes requisitos:

  • Disponible en Databricks Runtime 16.2 y versiones posteriores.
  • El sistema debe usar el modo de acceso dedicado o sin aislamiento.
  • Debe usar el proveedor del almacén de estado de RocksDB. Databricks recomienda habilitar RocksDB como parte de la configuración de proceso.
  • transformWithStateInPandas admite el modo de acceso estándar en Databricks Runtime 16.3 y versiones posteriores.

Para habilitar el proveedor de almacén de estado de RocksDB para la sesión actual, ejecute lo siguiente:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Patrones de evolución de esquema admitidos en el almacén de estado

Databricks admite los siguientes patrones de evolución de esquemas para operaciones con estado de Structured Streaming.

Modelo Description
Ampliación de tipos Cambie los tipos de datos de tipos más restrictivos a menos restrictivos.
Agregar campos Agregue nuevos campos al esquema de las variables de almacén de estado existentes.
Quitar campos Quite los campos existentes del esquema o de una variable de almacén de estado.
Reordenación de campos Reordenar campos en una variable.
Adición de variables de estado Agregue una nueva variable de estado a una aplicación.
Eliminación de variables de estado Quite una variable de estado existente de una aplicación.

¿Cuándo se produce la evolución del esquema?

La evolución del esquema en el almacén de estado es el resultado de actualizar el código que define tu aplicación con estado persistente. Debido a esto, se aplican las siguientes declaraciones:

  • La evolución del esquema no se produce automáticamente como resultado de los cambios de esquema en los datos de origen de la consulta.
  • La evolución del esquema solo se produce cuando se implementa una nueva versión de la aplicación. Dado que solo una versión de una consulta de streaming se puede ejecutar simultáneamente, debe reiniciar el trabajo de streaming para evolucionar el esquema de las variables de estado.
  • El código define explícitamente todas las variables de estado y establece el esquema para todas las variables de estado.
    • En Scala, se usa un Encoder para especificar el esquema de cada variable.
    • En Python, se crea explícitamente un esquema como .StructType

Patrones de evolución de esquema no admitidos

No se admiten los siguientes patrones de evolución de esquema:

  • Cambio de nombre de campo: no se admite el cambio de nombre de los campos porque los campos coinciden por nombre. Si se intenta cambiar el nombre de un campo, se quita el campo y se agrega un nuevo campo. Esta operación no produce un error ya que se permiten quitar y agregar campos, pero los valores del campo original no se transfieren al nuevo campo.

  • Cambios de nombre o tipo de clave: no se puede cambiar el nombre o el tipo de clave en las variables de estado de mapa.

  • Reducción de tipo No se admiten las operaciones de reducción de tipo, también conocidas como conversión descendente. Estas operaciones pueden dar lugar a una pérdida de datos. A continuación se muestran ejemplos de operaciones de restricción de tipos no admitidos:

    • double no se puede restringir a float, longo int
    • float no se puede restringir a long ni int
    • long no se puede restringir a int

Ampliación de tipos en el almacén de estado

Puede ampliar los tipos de datos primitivos a tipos más serviciales. Se admiten los siguientes cambios de ampliación de tipos:

  • int se puede promover a long, floato double
  • long se puede promover a float o double
  • float se puede promover a double
  • string se puede promover a bytes
  • bytes se puede promover a string

Los valores existentes se muestran como el nuevo tipo. Por ejemplo, 12 se convierte en 12.00.

Ejemplo de ampliación de tipos con transformWithState

Scala

// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV2(value.toLong))
      value
    }
  }
}

Pitón

class IntStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with Integer field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to integer and update state
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class LongStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with Long field (type widening)
        state_schema = StructType([
            StructField("value1", LongType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to long and update state
            value = pdf["value"].iloc[0]
            # When reading state written with IntStateProcessor,
            # it will be automatically converted to Long
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

Añadir campos a los valores del almacén de estado

Puede agregar nuevos campos al esquema de los valores de almacén de estado existentes.

Al leer los datos escritos con el esquema antiguo, el codificador avro devuelve datos para los campos agregados codificados de forma nativa como null.

Python siempre interpreta estos valores como None. Scala tiene un comportamiento predeterminado diferente en función del tipo del campo. Databricks recomienda implementar lógica para asegurarse de que Scala no impute los valores de los datos que faltan. Consulte Valores predeterminados para los campos agregados a la variable de estado.

Ejemplos de adición de nuevos campos con transformWithState

Scala

// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1),
      // it will be automatically converted to StateV2(1, null)
      val currentState = state.get()
      // Now update with both fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Pitón

class StateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class StateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Later schema with additional fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Read current state
            current_state = self.state.get()
            # When reading state written with StateV1(1),
            # it will be automatically converted to StateV2(1, None)
            value1 = current_state[0]
            value2 = current_state[1]

            # Now update with both fields populated
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Eliminar campos para almacenar valores de estado

Puede quitar campos del esquema de una variable existente. Al leer datos con el esquema antiguo, se omiten los campos presentes en los datos antiguos, pero no en el nuevo esquema.

Ejemplos de cómo quitar campos de variables de estado

Scala

// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1)
      val currentState = state.get()
      state.update(StateV2(value.toInt))
      value
    }
  }
}

Pitón

class RemoveFieldsOriginalProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with multiple fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class RemoveFieldsReducedProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with field removed
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
            # it will be automatically converted to just (1,)
            current_state = self.state.get()
            value1 = current_state[0]

            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]]
        })

Reordenar campos en una variable de estado

Puede reordenar campos en una variable de estado, incluido cuando se agregan o quitan campos existentes. Los campos de las variables de estado coinciden por nombre, no por posición.

Ejemplos de reordenación de campos en una variable de estado

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2("metadata-1", 1)
      val currentState = state.get()
      state.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Pitón

class OrderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with fields in original order
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ReorderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with reordered fields
        state_schema = StructType([
            StructField("value2", StringType(), True),
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
            # it will be automatically converted to ("metadata-1", 1)
            current_state = self.state.get()
            value2 = current_state[0]
            value1 = current_state[1]

            self.state.update((f"new-metadata-{value}", int(value)))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value2": [current_state[0]],
            "value1": [current_state[1]]
        })

Adición de una variable de estado a una aplicación con estado

También podemos agregar variables de estado entre ejecuciones de consulta.

Nota: Este patrón no requiere un codificador avro y es compatible con todas las transformWithState aplicaciones.

Ejemplo de adición de una variable de estado a una aplicación con estado

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Pitón

class MultiStateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single state variable
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

Quitar una variable de estado de una aplicación con estado

Además de quitar campos, también puede quitar variables de estado entre ejecuciones de consulta.

Nota: Este patrón no requiere un codificador Avro y es compatible con todas las transformWithState aplicaciones.

Ejemplo de eliminación de una variable de estado en una aplicación con estado

Scala

case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
      value
    }
  }
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    // delete old state variable that we no longer need
    getHandle.deleteIfExists("testState2")
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Pitón

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

class RemoveStateVarProcessor(StatefulProcessor):
    def init(self, handle):
        # Only use one state variable and delete the other
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

        # Delete old state variable that we no longer need
        handle.deleteIfExists("testState2")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Valores predeterminados para los campos agregados a la variable de estado

Al agregar nuevos campos a una variable de estado existente, las variables de estado escritas mediante el esquema anterior tienen el siguiente comportamiento:

  • El codificador Avro devuelve un null valor para los campos agregados.
  • Python convierte estos valores en None para todos los tipos de datos.
  • El comportamiento predeterminado de Scala difiere según el tipo de datos:
    • Los tipos de referencia devuelven null.
    • Los tipos primitivos devuelven un valor predeterminado, que difiere en función del tipo primitivo. Algunos ejemplos son 0 para int los tipos o false para bool los tipos.

No hay ninguna funcionalidad o metadatos integrados que marquen el campo como agregado a través de la evolución del esquema. Debe implementar lógica para controlar los valores NULL devueltos para los campos que no existían en el esquema anterior.

Para Scala, puede evitar la imputación de valores predeterminados mediante Option[<Type>], que devuelve valores que faltan como None en lugar de usar el tipo predeterminado.

Debe implementar lógica para gestionar adecuadamente situaciones en las que None se devuelven valores de tipo debido a la evolución del esquema.

Ejemplo de valores predeterminados para campos agregados a una variable de estado

Scala

// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // Reading from state
      val currentState = state.get()

      // Showing how null defaults work for different types
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
      println(s"Current state: $currentState")

      // For primitive types like Long, the UnsafeRow default for null is 0
      val longValue = if (currentState.value3 == 0L) {
        println("The value3 field is the default value (0)")
        100L // Set a real value now
      } else {
        currentState.value3
      }

      // Now update with all fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
      value
    }
  }
}

Pitón

class NullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ExpandedNullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Evolution: Adding new fields with null/default values
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True),
            StructField("value3", LongType(), True),
            StructField("value4", IntegerType(), True),
            StructField("value5", BooleanType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Reading from state
            current_state = self.state.get()

            # Showing how null defaults work in Python
            # When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
            # it will be automatically converted to (1, "metadata-1", None, None, None)
            # In Python, both primitive and reference types will be None

            value1 = current_state[0]
            value2 = current_state[1]
            value3 = current_state[2]  # Will be None when evolved from older schema
            value4 = current_state[3]  # Will be None when evolved from older schema
            value5 = current_state[4]  # Will be None when evolved from older schema

            # Check if value3 is None
            if value3 is None:
                print("The value3 field is None (default value for evolution)")
                value3 = 100  # Set a real value now

            # Now update with all fields populated
            self.state.update((
                value1,
                value2,
                value3,
                value4 if value4 is not None else 42,
                value5 if value5 is not None else True
            ))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]],
            "value3": [current_state[2]],
            "value4": [current_state[3]],
            "value5": [current_state[4]]
        })

Limitaciones

En la tabla siguiente se describen los límites predeterminados para los cambios de evolución del esquema:

Description Límite predeterminado Configuración de Spark que se va a invalidar
Evolución del esquema de una variable de estado. La aplicación de varios cambios de esquema en un reinicio de consulta cuenta como una única evolución del esquema. 16 spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold
Evolución del esquema de la consulta de streaming. La aplicación de varios cambios de esquema en un reinicio de consulta cuenta como una única evolución del esquema. 128 spark.sql.streaming.stateStore.maxNumStateSchemaFiles

Tenga en cuenta los detalles siguientes cuidadosamente al solucionar problemas de evolución del esquema para las variables de estado: