Aracılığıyla paylaş


Durum deposunda şema evrimi

Bu makalede, durum deposunda şema gelişimine genel bir bakış ve desteklenen şema değişiklikleri türlerine örnekler sağlanmaktadır.

Durum deposunda şema evrimi nedir?

Şema evrimi, bir uygulamanın veri şemasındaki değişiklikleri işleyebilmesini ifade eder.

transformWithState kullanan Yapılandırılmış Akış uygulamaları için Azure Databricks, RocksDB durum deposunda şema evrimini destekler.

Şema evrimi geliştirme ve bakım kolaylığı için esneklik sağlar. Durum bilgilerini kaybetmeden veya geçmiş verilerin tam olarak yeniden işlenmesini gerektirmeden durum deponuzdaki veri modelini veya veri türlerini uyarlamak için şema evrimini kullanın.

Gereksinimler

Şema evrimini kullanmak için durum deposu kodlama biçimini Avro olarak ayarlamanız gerekir. Bunu geçerli oturum için ayarlamak için aşağıdakileri çalıştırın:

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

Şema evrimi yalnızca transformWithState veya transformWithStateInPandas kullanan durum bilgisi olan işlemler için desteklenir. Bu işleçler ve ilgili API'ler ve sınıflar aşağıdaki gereksinimlere sahiptir:

  • Databricks Runtime 16.2 ve üzerinde kullanılabilir.
  • İşlem, ayrılmış veya yalıtımsız erişim modunu kullanmalıdır.
  • RocksDB durum deposu sağlayıcısını kullanmanız gerekir. Databricks, işlem yapılandırmasının bir parçası olarak RocksDB'nin etkinleştirilmesini önerir.
  • transformWithStateInPandas Databricks Runtime 16.3 ve üzeri sürümleri için standart erişim modunu destekler.

Geçerli oturumda RocksDB durum deposu sağlayıcısını etkinleştirmek için aşağıdakileri çalıştırın:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Durum deposunda desteklenen şema evrim desenleri

Databricks, durum bilgisi olan Yapılandırılmış Akış işlemleri için aşağıdaki şema evrim desenlerini destekler.

Desen Description
Tür genişletme Veri türlerini daha kısıtlayıcı olandan daha az kısıtlayıcı türlere değiştirin.
Alan ekleme Mevcut durum deposu değişkenlerinin şemasına yeni alanlar ekleyin.
Alanlar kaldırılıyor Şemadan veya durum deposu değişkeninden mevcut alanları kaldırın.
Alanları yeniden sıralama Değişkendeki alanları yeniden sıralama.
Durum değişkenleri ekleme Uygulamaya yeni bir durum değişkeni ekleyin.
Durum değişkenlerini kaldırma Bir uygulamadan mevcut durum değişkenlerini kaldırma.

Şema evrimi ne zaman gerçekleşir?

Durum deposunda şema evrimi, durum bilgisi olan uygulamanızı tanımlayan kodun güncelleştirilmesinin sonuçlarıdır. Bu nedenle, aşağıdaki deyimler geçerlidir:

  • Şema evrimi, sorgunun kaynak verilerindeki şema değişikliklerinin bir sonucu olarak otomatik olarak gerçekleşmez.
  • Şema evrimi yalnızca uygulamanın yeni bir sürümü dağıtıldığında gerçekleşir. Akış sorgusunun yalnızca bir sürümü aynı anda çalışabildiğinden, durum değişkenleri için şemayı geliştirmek için akış işinizi yeniden başlatmanız gerekir.
  • Kodunuz tüm durum değişkenlerini açıkça tanımlar ve tüm durum değişkenleri için şemayı ayarlar.
    • Scala'da, her değişkenin şemasını belirtmek için bir Encoder kullanırsınız.
    • Python'da açıkça olarak bir StructTypeşema oluşturursunuz.

Desteklenmeyen şema evrim desenleri

Aşağıdaki şema evrim desenleri desteklenmez:

  • Alanların adlarının değiştirilmesi: Alanlar isimlerine göre eşleştirildiğinden, alanların adlarını değiştirmek desteklenmez. Bir alanı yeniden adlandırma girişimi, alanı kaldırıp yeni bir alan ekleyerek işlenir. Bu işlem, alanların kaldırılmasına ve eklenmesine izin verilirken hataya neden olmaz, ancak özgün alandaki değerler yeni alana taşınmaz.

  • Anahtar yeniden adlandırma veya tür değişiklikleri: Eşleme durumu değişkenlerinde anahtarların adını veya türünü değiştiremezsiniz.

  • Tür daraltma işlemleri, aşağıya döküm olarak da bilinir ve desteklenmez. Bu işlemler veri kaybına neden olabilir. Aşağıda desteklenmeyen tür daraltma işlemlerine örnekler verilmiştir:

    • double, float, long veya int olarak daraltılamaz
    • float, long veya int olarak daraltılamaz.
    • long daraltılamaz int

Durum deposunda tür genişletme

Temel veri türlerini daha uzlaşmalı türlere genişletebilirsiniz. Aşağıdaki tür genişletme değişiklikleri desteklenir:

  • int, long, veya float olarak yükseltilebilirdouble
  • long, float veya double olarak yükseltilebilir
  • float yükseltilebilir double
  • string yükseltilebilir bytes
  • bytes yükseltilebilir string

Mevcut değerler yeni tür olarak yükseltilir. Örneğin 12, 12.00 olur.

Tür genişletme örneği transformWithState

Scala

// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV2(value.toLong))
      value
    }
  }
}

Piton

class IntStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with Integer field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to integer and update state
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class LongStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with Long field (type widening)
        state_schema = StructType([
            StructField("value1", LongType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to long and update state
            value = pdf["value"].iloc[0]
            # When reading state written with IntStateProcessor,
            # it will be automatically converted to Long
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

Durum deposu değerlerine alan ekleme

Mevcut durum deposu değerlerinin şemasına yeni alanlar ekleyebilirsiniz.

Eski şemayla yazılmış verileri okurken, Avro kodlayıcı, eklenen alanlar için yerel olarak null kodlanmış verileri döndürür.

Python bu değerleri her zaman olarak Noneyorumlar. Scala, alanın türüne bağlı olarak farklı varsayılan davranışlara sahiptir. Databricks, Scala'nın eksik veriler için değer ataması yapmamasını sağlamak amacıyla uygun bir mantık uygulanmasını önerir. Bkz. Durum değişkenine eklenen alanlar için varsayılan değerler.

ile yeni alan ekleme örnekleri transformWithState

Scala

// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1),
      // it will be automatically converted to StateV2(1, null)
      val currentState = state.get()
      // Now update with both fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Piton

class StateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class StateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Later schema with additional fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Read current state
            current_state = self.state.get()
            # When reading state written with StateV1(1),
            # it will be automatically converted to StateV2(1, None)
            value1 = current_state[0]
            value2 = current_state[1]

            # Now update with both fields populated
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Alanları durum deposu değerlerinden kaldır

Varolan bir değişkenin şemasından alanları kaldırabilirsiniz. Verileri eski şemayla okurken, eski verilerde bulunan ancak yeni şemada bulunmayan alanlar yoksayılır.

Durum değişkenlerinden alan kaldırma örnekleri

Scala

// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1)
      val currentState = state.get()
      state.update(StateV2(value.toInt))
      value
    }
  }
}

Piton

class RemoveFieldsOriginalProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with multiple fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class RemoveFieldsReducedProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with field removed
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
            # it will be automatically converted to just (1,)
            current_state = self.state.get()
            value1 = current_state[0]

            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]]
        })

Durum değişkenindeki alanları yeniden sıralama

Mevcut alanları eklerken veya kaldırırken de dahil olmak üzere durum değişkenindeki alanları yeniden sıralayabilirsiniz. Durum değişkenlerindeki alanlar konuma değil, ada göre eşleştirilir.

Durum değişkenindeki alanları yeniden sıralama örnekleri

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2("metadata-1", 1)
      val currentState = state.get()
      state.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Piton

class OrderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with fields in original order
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ReorderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with reordered fields
        state_schema = StructType([
            StructField("value2", StringType(), True),
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
            # it will be automatically converted to ("metadata-1", 1)
            current_state = self.state.get()
            value2 = current_state[0]
            value1 = current_state[1]

            self.state.update((f"new-metadata-{value}", int(value)))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value2": [current_state[0]],
            "value1": [current_state[1]]
        })

Durum bilgisi olan bir uygulamaya durum değişkeni ekleme

Sorgu çalıştırmaları arasında durum değişkenleri de ekleyebiliriz.

Not: Bu düzen avro kodlayıcı gerektirmez ve tüm transformWithState uygulamalar tarafından destekleniyor.

Durum bilgisi olan bir uygulamaya durum değişkeni ekleme örneği

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Piton

class MultiStateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single state variable
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

Durum bilgisi olan bir uygulamadan durum değişkenlerini kaldırma

Alanları kaldırmaya ek olarak, sorgu çalıştırmaları arasında durum değişkenlerini de kaldırabilirsiniz.

Not: Bu desen Avro kodlayıcı gerektirmez ve tüm transformWithState uygulamalar tarafından desteklenir.

Durum bilgisi olan bir uygulamaya durum değişkeni kaldırma örneği

Scala

case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
      value
    }
  }
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    // delete old state variable that we no longer need
    getHandle.deleteIfExists("testState2")
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Piton

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

class RemoveStateVarProcessor(StatefulProcessor):
    def init(self, handle):
        # Only use one state variable and delete the other
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

        # Delete old state variable that we no longer need
        handle.deleteIfExists("testState2")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Durum değişkenine eklenen alanlar için varsayılan değerler

Var olan bir durum değişkenine yeni alanlar eklediğinizde, eski şema kullanılarak yazılan durum değişkenleri aşağıdaki davranışa sahiptir:

  • Avro kodlayıcı, eklenen alanlar için bir null değer döndürür.
  • Python bu değerleri tüm veri türleri için olarak None dönüştürür.
  • Scala varsayılan davranışı veri türüne göre farklılık gösterir:
    • Referans türleri döner null.
    • Temel türler, temel türe göre farklılık gösteren varsayılan bir değer döndürür. Örnekler, 0 türler için int veya false türler için bool şeklinde verilebilir.

Şema evrimi aracılığıyla eklenen alana bayrak ekleyen yerleşik bir işlev veya meta veri yoktur. Önceki şemanızda bulunmayan alanlar için döndürülen null değerleri işlemek için mantık uygulamanız gerekir.

Scala için, türün varsayılanını kullanmak yerine Option[<Type>] döndüren None kullanarak varsayılan değer atanmaktan kaçınabilirsiniz.

Şema evrimi nedeniyle tür değerlerinin döndürüldüğü None durumları doğru şekilde işlemek için mantık uygulamanız gerekir.

Durum değişkenine eklenen alanlar için varsayılan değerler örneği

Scala

// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // Reading from state
      val currentState = state.get()

      // Showing how null defaults work for different types
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
      println(s"Current state: $currentState")

      // For primitive types like Long, the UnsafeRow default for null is 0
      val longValue = if (currentState.value3 == 0L) {
        println("The value3 field is the default value (0)")
        100L // Set a real value now
      } else {
        currentState.value3
      }

      // Now update with all fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
      value
    }
  }
}

Piton

class NullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ExpandedNullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Evolution: Adding new fields with null/default values
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True),
            StructField("value3", LongType(), True),
            StructField("value4", IntegerType(), True),
            StructField("value5", BooleanType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Reading from state
            current_state = self.state.get()

            # Showing how null defaults work in Python
            # When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
            # it will be automatically converted to (1, "metadata-1", None, None, None)
            # In Python, both primitive and reference types will be None

            value1 = current_state[0]
            value2 = current_state[1]
            value3 = current_state[2]  # Will be None when evolved from older schema
            value4 = current_state[3]  # Will be None when evolved from older schema
            value5 = current_state[4]  # Will be None when evolved from older schema

            # Check if value3 is None
            if value3 is None:
                print("The value3 field is None (default value for evolution)")
                value3 = 100  # Set a real value now

            # Now update with all fields populated
            self.state.update((
                value1,
                value2,
                value3,
                value4 if value4 is not None else 42,
                value5 if value5 is not None else True
            ))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]],
            "value3": [current_state[2]],
            "value4": [current_state[3]],
            "value5": [current_state[4]]
        })

Sınırlamalar

Aşağıdaki tabloda şema evrim değişiklikleri için varsayılan sınırlar açıklanmaktadır:

Description Varsayılan limit Geçersiz kılınacak Spark yapılandırması
Durum değişkeni için şema evrimleri. Sorgu yeniden başlatma işlemine birden çok şema değişikliği uygulamak tek bir şema evrimi olarak sayılır. 16 spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold
Akış sorgusu için şema evrimleri. Sorgu yeniden başlatma işlemine birden çok şema değişikliği uygulamak tek bir şema evrimi olarak sayılır. 128 spark.sql.streaming.stateStore.maxNumStateSchemaFiles

Durum değişkenleri için şema evrimi sorunlarını giderirken aşağıdaki ayrıntıları dikkatle göz önünde bulundurun: