Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Artikel ini menyediakan gambaran umum evolusi skema di penyimpanan status dan contoh jenis perubahan skema yang didukung.
Apa itu evolusi skema di penyimpanan status?
Evolusi skema mengacu pada kemampuan aplikasi untuk menangani perubahan pada skema data.
Azure Databricks mendukung evolusi skema di penyimpanan status RocksDB untuk aplikasi Streaming Terstruktur yang menggunakan transformWithState.
Evolusi skema memberikan fleksibilitas untuk pengembangan dan kemudahan pemeliharaan. Gunakan evolusi skema untuk menyesuaikan model data atau jenis data di penyimpanan status Anda tanpa kehilangan informasi status atau memerlukan pemrosesan ulang penuh data historis.
Persyaratan
Anda harus mengatur format pengodean penyimpanan status ke Avro untuk menggunakan evolusi skema. Untuk mengatur ini untuk sesi saat ini, jalankan hal berikut:
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
Evolusi skema hanya didukung untuk operasi stateful yang menggunakan transformWithState atau transformWithStateInPandas. Operator dan API dan kelas terkait ini memiliki persyaratan berikut:
- Tersedia di Databricks Runtime 16.2 ke atas.
- Komputasi harus menggunakan mode akses khusus atau tanpa isolasi.
- Anda harus menggunakan penyedia penyimpanan status RocksDB. Databricks merekomendasikan untuk mengaktifkan RocksDB sebagai bagian dari konfigurasi komputasi.
-
transformWithStateInPandasmendukung mode akses standar di Databricks Runtime 16.3 ke atas.
Untuk mengaktifkan penyedia penyimpanan status RocksDB untuk sesi saat ini, jalankan hal berikut:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Pola-pola evolusi skema yang didukung pada penyimpanan status.
Databricks mendukung pola evolusi skema berikut untuk operasi Streaming Terstruktur yang stateful.
| Pola | Deskripsi |
|---|---|
| Perluasan tipe | Ubah jenis data dari jenis yang lebih ketat menjadi kurang ketat. |
| Menambahkan bidang | Tambahkan bidang baru ke skema variabel penyimpanan status yang ada. |
| Menghapus bidang | Hapus bidang yang ada dari skema atau variabel penyimpanan status. |
| Menyusun ulang bidang | Menyusun ulang bidang dalam variabel. |
| Menambahkan variabel status | Tambahkan variabel status baru ke aplikasi. |
| Menghapus variabel status | Hapus variabel status yang ada dari aplikasi. |
Kapan evolusi skema terjadi?
Evolusi skema di penyimpanan status menghasilkan pembaruan kode yang menentukan aplikasi stateful Anda. Karena itu, pernyataan berikut berlaku:
- Evolusi skema tidak terjadi secara otomatis sebagai akibat dari perubahan skema dalam data sumber untuk kueri.
- Evolusi skema hanya terjadi ketika versi baru aplikasi disebarkan. Karena hanya satu versi kueri streaming yang dapat berjalan secara bersamaan, Anda harus memulai ulang pekerjaan streaming untuk mengembangkan skema untuk variabel status.
- Kode Anda secara eksplisit mendefinisikan semua variabel status dan mengatur skema untuk semua variabel status.
- Di Scala, Anda menggunakan
Encoderuntuk menentukan skema untuk setiap variabel. - Di Python, Anda secara eksplisit membuat skema sebagai
StructType.
- Di Scala, Anda menggunakan
pola evolusi skema yang tidak didukung
Pola evolusi skema berikut tidak didukung:
Penggantian nama bidang: Penggantian nama bidang tidak didukung karena bidang dicocokkan berdasarkan nama. Mencoba mengganti nama bidang ditangani dengan menghapus bidang dan menambahkan bidang baru. Operasi ini tidak mengakibatkan kesalahan karena menghapus dan menambahkan bidang diizinkan, tetapi nilai dari bidang asli tidak dibawa ke bidang baru.
Mungkin penggantian nama kunci atau jenis perubahan: Anda tidak dapat mengubah nama atau jenis kunci dalam variabel status peta.
Operasi penyempitan tipe, juga dikenal sebagai downcasting, tidak didukung. Operasi ini dapat mengakibatkan kehilangan data. Berikut ini adalah contoh operasi penyempitan jenis yang tidak didukung:
-
doubletidak dapat dipersempit kefloat,long, atauint -
floattidak dapat dipersempit kelongatauint -
longtidak dapat dipersempit keint
-
Pelebaran tipe di penyimpanan status
Anda dapat memperlebar jenis data primitif ke jenis yang lebih akomodatif. Perubahan pelebaran jenis berikut didukung:
-
intdapat dipromosikan kelong,float, ataudouble -
longdapat dipromosikan kefloatataudouble -
floatdapat dipromosikan kedouble -
stringdapat dipromosikan kebytes -
bytesdapat dipromosikan kestring
Nilai yang ada diubah atau ditingkatkan ke tipe data baru. Misalnya, 12 menjadi 12.00.
Contoh pelebaran tipe dengan transformWithState
Scala
// Initial run with Integer field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with Long field (type widening)
case class StateV2(value1: Long)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV2(value.toLong))
value
}
}
}
Python
class IntStateProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with Integer field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to integer and update state
value = pdf["value"].iloc[0]
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class LongStateProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with Long field (type widening)
state_schema = StructType([
StructField("value1", LongType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to long and update state
value = pdf["value"].iloc[0]
# When reading state written with IntStateProcessor,
# it will be automatically converted to Long
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
Tambahkan bidang ke nilai penyimpanan status
Anda dapat menambahkan bidang baru ke skema nilai penyimpanan status yang ada.
Saat membaca data yang ditulis dengan skema lama, encoder Avro mengembalikan data untuk bidang tambahan yang dikodekan secara asli sebagai null.
Python selalu menafsirkan nilai-nilai ini sebagai None. Scala memiliki perilaku default yang berbeda tergantung pada jenis untuk bidang . Databricks merekomendasikan penerapan logika untuk memastikan bahwa Scala tidak mengimplementasikan nilai untuk data yang hilang. Lihat nilai default untuk bidang yang ditambahkan ke variabel status.
Contoh penambahan bidang baru dengan transformWithState
Scala
// Initial run with single field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with additional field
case class StateV2(value1: Integer, value2: String)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1),
// it will be automatically converted to StateV2(1, null)
val currentState = state.get()
// Now update with both fields populated
state.update(StateV2(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class StateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class StateV2Processor(StatefulProcessor):
def init(self, handle):
# Later schema with additional fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Read current state
current_state = self.state.get()
# When reading state written with StateV1(1),
# it will be automatically converted to StateV2(1, None)
value1 = current_state[0]
value2 = current_state[1]
# Now update with both fields populated
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
Hapus kolom dari penyimpanan nilai status
Anda dapat menghapus bidang dari skema variabel yang ada. Saat membaca data dengan skema lama, bidang yang ada di data lama tetapi tidak dalam skema baru diabaikan.
Contoh menghapus bidang dari variabel status
Scala
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with field removed
case class StateV2(value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1)
val currentState = state.get()
state.update(StateV2(value.toInt))
value
}
}
}
Python
class RemoveFieldsOriginalProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with multiple fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class RemoveFieldsReducedProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with field removed
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
# it will be automatically converted to just (1,)
current_state = self.state.get()
value1 = current_state[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]]
})
Urus ulang bidang dalam variabel status
Anda dapat menyusun ulang bidang dalam variabel status, termasuk saat Anda menambahkan atau menghapus bidang yang ada. Bidang dalam variabel status dicocokkan berdasarkan nama, bukan posisi.
Contoh menyusun ulang bidang dalam variabel status
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2("metadata-1", 1)
val currentState = state.get()
state.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class OrderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with fields in original order
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ReorderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with reordered fields
state_schema = StructType([
StructField("value2", StringType(), True),
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
# it will be automatically converted to ("metadata-1", 1)
current_state = self.state.get()
value2 = current_state[0]
value1 = current_state[1]
self.state.update((f"new-metadata-{value}", int(value)))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value2": [current_state[0]],
"value1": [current_state[1]]
})
Menambahkan variabel status ke aplikasi stateful
Kita juga dapat menambahkan variabel status di antara eksekusi kueri.
Catatan: Pola ini tidak memerlukan encoder Avro dan didukung oleh semua aplikasi transformWithState.
Contoh penambahan variabel status ke aplikasi stateful
Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
case class StateV2(value1: String, value2: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
Python
class MultiStateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single state variable
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
Menghapus variabel status dari aplikasi stateful
Selain menghapus bidang, Anda juga dapat menghapus variabel status di antara kueri yang dijalankan.
Catatan: Pola ini tidak memerlukan encoder Avro dan didukung oleh semua aplikasi transformWithState.
Contoh menghapus variabel status ke aplikasi stateful
Scala
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
value
}
}
}
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
// delete old state variable that we no longer need
getHandle.deleteIfExists("testState2")
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
Python
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
class RemoveStateVarProcessor(StatefulProcessor):
def init(self, handle):
# Only use one state variable and delete the other
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
# Delete old state variable that we no longer need
handle.deleteIfExists("testState2")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
Nilai default untuk bidang yang ditambahkan ke variabel status
Saat Anda menambahkan bidang baru ke variabel status yang ada, variabel status yang ditulis menggunakan skema lama memiliki perilaku berikut:
- Encoder Avro mengembalikan nilai
nulluntuk bidang tambahan. - Python mengonversi nilai-nilai ini ke
Noneuntuk semua jenis data. - Perilaku default Scala berbeda menurut jenis data:
- Jenis referensi mengembalikan
null. - Jenis primitif mengembalikan nilai default, yang berbeda berdasarkan jenis primitif. Contohnya termasuk
0untuk jenisintataufalseuntuk jenisbool.
- Jenis referensi mengembalikan
Tidak ada fungsionalitas atau metadata bawaan yang menandai bidang seperti yang ditambahkan melalui evolusi skema. Anda harus menerapkan logika untuk menangani nilai null yang dikembalikan untuk bidang yang tidak ada dalam skema Anda sebelumnya.
Untuk Scala, Anda dapat menghindari imputasi nilai default dengan menggunakan Option[<Type>], yang mengembalikan nilai yang hilang sebagai None alih-alih menggunakan default jenis.
Anda harus menerapkan logika untuk menangani situasi dengan benar di mana nilai jenis None dikembalikan karena evolusi skema.
Contoh nilai default untuk bidang yang ditambahkan ke variabel status
Scala
// Example demonstrating how null defaults work in schema evolution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// Reading from state
val currentState = state.get()
// Showing how null defaults work for different types
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
println(s"Current state: $currentState")
// For primitive types like Long, the UnsafeRow default for null is 0
val longValue = if (currentState.value3 == 0L) {
println("The value3 field is the default value (0)")
100L // Set a real value now
} else {
currentState.value3
}
// Now update with all fields populated
state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
value
}
}
}
Python
class NullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ExpandedNullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Evolution: Adding new fields with null/default values
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True),
StructField("value3", LongType(), True),
StructField("value4", IntegerType(), True),
StructField("value5", BooleanType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Reading from state
current_state = self.state.get()
# Showing how null defaults work in Python
# When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
# it will be automatically converted to (1, "metadata-1", None, None, None)
# In Python, both primitive and reference types will be None
value1 = current_state[0]
value2 = current_state[1]
value3 = current_state[2] # Will be None when evolved from older schema
value4 = current_state[3] # Will be None when evolved from older schema
value5 = current_state[4] # Will be None when evolved from older schema
# Check if value3 is None
if value3 is None:
print("The value3 field is None (default value for evolution)")
value3 = 100 # Set a real value now
# Now update with all fields populated
self.state.update((
value1,
value2,
value3,
value4 if value4 is not None else 42,
value5 if value5 is not None else True
))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]],
"value3": [current_state[2]],
"value4": [current_state[3]],
"value5": [current_state[4]]
})
Keterbatasan
Tabel berikut menjelaskan batas default untuk perubahan evolusi skema:
| Deskripsi | Batas bawaan | Konfigurasi Spark untuk menggantikan |
|---|---|---|
| Evolusi skema untuk variabel status. Menerapkan beberapa perubahan skema dalam penghidupkan ulang kueri dihitung sebagai evolusi skema tunggal. | 16 | spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold |
| Evolusi skema untuk kueri streaming. Menerapkan beberapa perubahan skema dalam penghidupkan ulang kueri dihitung sebagai evolusi skema tunggal. | 128 | spark.sql.streaming.stateStore.maxNumStateSchemaFiles |
Pertimbangkan detail berikut dengan hati-hati saat memecahkan masalah evolusi skema untuk variabel status:
- Beberapa pola tidak didukung untuk evolusi skema. Lihat pola evolusi skema yang tidak didukung.
- Evolusi skema memenuhi semua persyaratan
transformWithStatedan memerlukan format pengodean Avro. Lihat Persyaratan. - Anda harus memulai ulang kueri streaming untuk menyebarkan perubahan kode yang mengakibatkan evolusi skema. Lihat Kapan evolusi skema terjadi?.