Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Penting
Fitur ini ada dalam Pratinjau Umum di Databricks Runtime 16.2 ke atas.
Anda dapat membangun aplikasi streaming menggunakan operator stateful kustom untuk menerapkan solusi berlatensi rendah dan hampir real-time yang menggunakan logika stateful yang fleksibel. Operator kustom stateful memungkinkan kasus penggunaan dan pola operasional baru yang tidak tersedia melalui pemrosesan Streaming Terstruktur tradisional.
Nota
Databricks merekomendasikan penggunaan fungsionalitas Streaming Terstruktur bawaan untuk operasi stateful yang didukung seperti agregasi, deduplikasi, dan gabungan streaming. Lihat Apa itu streaming stateful?.
Databricks merekomendasikan penggunaan transformWithState atas operator warisan untuk transformasi status arbitrer. Untuk dokumentasi tentang operator warisan flatMapGroupsWithState dan mapGroupsWithState, lihat operator stateful warisan di .
Persyaratan
Operator transformWithState dan API dan kelas terkait memiliki persyaratan berikut:
- Tersedia di Databricks Runtime 16.2 ke atas.
- Komputasi harus menggunakan mode akses khusus atau tanpa isolasi, kecuali mode akses standar didukung untuk Python (
transformWithStateInPandas) di Databricks Runtime 16.3 ke atas, dan untuk Scala (transformWithState) di Databricks Runtime 17.3 ke atas. - Anda harus menggunakan penyedia penyimpanan status RocksDB. Databricks merekomendasikan untuk mengaktifkan RocksDB sebagai bagian dari konfigurasi komputasi.
Nota
Untuk mengaktifkan penyedia penyimpanan status RocksDB untuk sesi saat ini, jalankan hal berikut:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Apa itu transformWithState?
Operator transformWithState menerapkan prosesor kustom dengan status khusus pada kueri Terstruktur Streaming. Anda harus menerapkan prosesor stateful kustom untuk menggunakan transformWithState. Streaming Terstruktur mencakup API untuk membangun prosesor 'stateful' Anda menggunakan Python, Scala, atau Java.
Anda menggunakan transformWithState untuk menerapkan logika kustom ke kunci pengelompokan untuk rekaman yang diproses secara bertahap dengan Streaming Terstruktur. Berikut ini menjelaskan desain tingkat tinggi:
- Tentukan satu atau beberapa variabel status.
- Informasi status dipertahankan untuk setiap kunci pengelompokan dan dapat diakses untuk setiap variabel status sesuai dengan logika yang ditentukan pengguna.
- Untuk setiap batch mikro yang diproses, semua rekaman untuk kunci tersedia sebagai iterator.
- Gunakan handel bawaan untuk mengontrol kapan dan bagaimana rekaman dipancarkan berdasarkan timer dan kondisi yang ditentukan pengguna.
- Nilai status mendukung definisi time-to-live (TTL) individual, memungkinkan fleksibilitas dalam mengelola kedaluwarsa status dan ukuran status.
Karena transformWithState mendukung evolusi skema di penyimpanan status, Anda dapat melakukan iterasi dan pembaruan aplikasi produksi Tanpa kehilangan informasi status historis atau perlu memproses ulang catatan, memberikan fleksibilitas untuk pengembangan dan kemudahan pemeliharaan. Lihat Evolusi skema di penyimpanan status.
Penting
PySpark menggunakan operator transformWithStateInPandas alih-alih transformWithState. Dokumentasi Azure Databricks menggunakan transformWithState untuk menjelaskan fungsionalitas untuk implementasi Python dan Scala.
Implementasi Scala dan Python dari transformWithState dan API terkait berbeda karena spesifik bahasa tetapi menyediakan fungsionalitas yang sama. Lihat contoh khusus bahasa dan dokumentasi API untuk bahasa pemrograman pilihan Anda.
Pemrosesan bawaan
Anda menerapkan logika inti untuk aplikasi stateful kustom Anda dengan menerapkan handler menggunakan handle bawaan.
- Pegangan menyediakan metode untuk berinteraksi dengan nilai status dan timer, memproses catatan masuk, dan memancarkan catatan.
- Handler menentukan logika berbasis peristiwa kustom Anda.
Handle untuk setiap tipe status diimplementasikan berdasarkan struktur data yang mendasar, tetapi masing-masing berisi fungsionalitas untuk mengambil, menyimpan, memperbarui, dan menghapus rekaman.
Handler diimplementasikan berdasarkan peristiwa yang diamati dalam rekaman input atau timer, menggunakan semantik berikut:
- Tentukan handler menggunakan metode
handleInputRowsuntuk mengontrol bagaimana data diproses, status diperbarui, dan rekaman dipancarkan untuk setiap batch mikro rekaman yang diproses untuk kunci pengelompokan. Lihat Mengelola baris input. - Tentukan handler menggunakan metode
handleExpiredTimeruntuk menggunakan ambang batas berbasis waktu untuk menjalankan logika apakah rekaman tambahan diproses untuk kunci pengelompokan atau tidak. Lihat peristiwa terjadwal Program.
Tabel berikut ini memiliki perbandingan perilaku fungsi yang didukung oleh handler ini:
| Perilaku | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Mendapatkan, menempatkan, memperbarui, atau menghapus nilai status | Ya | Ya |
| Membuat atau menghapus timer | Ya | Ya |
| Memancarkan rekaman | Ya | Ya |
| Melakukan iterasi atas rekaman dalam batch mikro saat ini | Ya | Tidak |
| Logika pemicu berdasarkan waktu yang berlalu | Tidak | Ya |
Anda dapat menggabungkan handleInputRows dan handleExpiredTimer untuk menerapkan logika kompleks sesuai kebutuhan.
Misalnya, Anda dapat menerapkan aplikasi yang menggunakan handleInputRows untuk memperbarui nilai status untuk setiap mikro-batch dan mengatur timer 10 detik di masa mendatang. Jika tidak ada rekaman tambahan yang diproses, Anda dapat menggunakan handleExpiredTimer untuk memancarkan nilai saat ini di penyimpanan status. Jika rekaman baru diproses untuk kunci pengelompokan, Anda dapat menghapus timer yang ada dan mengatur timer baru.
Jenis status kustom
Anda dapat menerapkan beberapa objek status dalam satu operator stateful. Nama yang Anda berikan untuk setiap objek status bertahan di penyimpanan status, yang dapat Anda akses dengan pembaca penyimpanan status. Jika objek status Anda menggunakan StructType, Anda memberikan nama untuk setiap bidang dalam struktur saat meneruskan skema. Nama-nama ini juga dapat dilihat saat membaca penyimpanan status. Lihat Membaca informasi status Streaming Terstruktur.
Fungsionalitas yang disediakan oleh kelas dan operator bawaan dimaksudkan untuk memberikan fleksibilitas dan ekstensibilitas, dan pilihan implementasi harus diberi tahu oleh logika lengkap yang perlu dijalankan aplikasi Anda. Misalnya, Anda mungkin menerapkan logika yang hampir identik menggunakan ValueState yang dikelompokkan menurut bidang user_id dan session_id atau MapState yang dikelompokkan menurut user_id di mana session_id adalah kunci untuk MapState. Dalam hal ini, MapState mungkin merupakan implementasi yang disukai jika logika perlu mengevaluasi kondisi di beberapa session_id.
Bagian berikut menjelaskan jenis status yang didukung oleh transformWithState.
ValueState
Untuk setiap kunci pengelompokan, ada nilai terkait.
Status nilai dapat mencakup jenis kompleks, seperti struct atau tuple. Saat memperbarui ValueState, Anda menerapkan logika untuk menggantikan seluruh nilai. TTL untuk status nilai direset saat nilai diperbarui tetapi tidak diatur ulang jika kunci sumber yang cocok dengan ValueState diproses tanpa memperbarui ValueStateyang disimpan.
ListState
Untuk setiap kunci pengelompokan, ada daftar terkait.
Status daftar adalah kumpulan nilai, yang masing-masing dapat menyertakan jenis kompleks. Setiap nilai dalam daftar memiliki TTL sendiri. Anda dapat menambahkan item ke daftar dengan menyisipkan item individual, menambahkan daftar item, atau menggantikan seluruh daftar dengan put. Hanya operasi put dianggap sebagai pembaruan untuk mengatur ulang TTL.
MapState
Untuk setiap kunci pengelompokan, ada peta terkait. Map dalam Apache Spark berfungsi setara dengan dict di Python.
Penting
Kunci pengelompokan menjelaskan bidang yang ditentukan dalam klausa GROUP BY kueri Streaming Terstruktur. Keadaan peta berisi sejumlah pasangan kunci-nilai secara sembarang untuk kunci pengelompokan.
Misalnya, jika Anda mengelompokkan menurut user_id dan ingin menentukan peta untuk setiap session_id, kunci pengelompokan Anda user_id dan kunci di peta Anda session_id.
Keadaan peta merupakan kumpulan kunci berbeda yang masing-masing memetakan ke nilai yang dapat berupa tipe kompleks. Setiap pasangan kunci-nilai dalam peta memiliki TTL sendiri. Anda dapat memperbarui nilai kunci tertentu atau menghapus kunci dan nilainya. Anda dapat mengembalikan nilai individual menggunakan kuncinya, mencantumkan semua kunci, mencantumkan semua nilai, atau mengembalikan iterator untuk bekerja dengan kumpulan lengkap pasangan kunci-nilai di peta.
Menginisialisasi variabel status kustom
Saat menginisialisasi StatefulProcessor, Anda membuat variabel lokal untuk setiap objek status yang memungkinkan Anda berinteraksi dengan objek status dalam logika kustom Anda. Variabel status didefinisikan dan diinisialisasi dengan mengambil alih metode init bawaan di kelas StatefulProcessor.
Anda menentukan jumlah objek status yang sewenang-wenang menggunakan metode getValueState, getListState, dan getMapState saat menginisialisasi StatefulProcessorAnda .
Setiap objek status harus memiliki yang berikut:
- Nama unik
- Skema yang ditentukan
- Di Python, skema ditentukan secara eksplisit.
- Di Scala, teruskan
Encoderuntuk menentukan skema status.
Anda juga dapat memberikan durasi time-to-live (TTL) opsional dalam milidetik. Jika menerapkan status peta, Anda harus memberikan definisi skema terpisah untuk kunci peta dan nilai.
Nota
Logika tentang bagaimana informasi status dikueri, diperbarui, dan dipancarkan ditangani secara terpisah. Lihat gunakan variabel status Anda.
Contoh aplikasi stateful
Berikut ini menunjukkan sintaks dasar untuk menentukan dan menggunakan prosesor berstatus kustom dengan transformWithState, termasuk contoh variabel status untuk setiap jenis yang didukung. Untuk lebih banyak contoh, lihat Contoh aplikasi stateful.
Nota
Python menggunakan tuple untuk semua interaksi dengan nilai status. Ini berarti kode Python harus meneruskan nilai menggunakan tuples saat menggunakan operasi seperti put dan update dan harus bisa menangani tuples saat menggunakan get.
Misalnya, jika skema untuk status nilai Anda hanyalah bilangan bulat tunggal, Anda akan menerapkan kode seperti berikut:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Ini juga berlaku untuk item dalam ListState atau nilai dalam MapState.
Phyton
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
PySpark menyertakan kelas StatefulProcessorHandle untuk menyediakan akses ke fungsi yang mengontrol bagaimana kode Python yang ditentukan pengguna Berinteraksi dengan informasi status. Anda harus selalu mengimpor dan meneruskan StatefulProcessorHandle ke variabel handle saat menginisialisasi StatefulProcessor.
Variabel handle mengikat variabel lokal di kelas Python Anda ke variabel status.
Nota
Scala menggunakan metode getHandle.
Tentukan status awal
Anda dapat memberikan status awal secara opsional untuk digunakan dengan mikro-batch pertama. Ini dapat berguna saat memigrasikan alur kerja yang ada ke aplikasi kustom baru, meningkatkan operator stateful untuk mengubah skema atau logika Anda, atau memperbaiki kegagalan yang tidak dapat diperbaiki secara otomatis dan memerlukan intervensi manual.
Nota
Gunakan pembaca penyimpanan status untuk mengkueri informasi status dari titik pemeriksaan yang sudah ada. Lihat Membaca informasi status Streaming Terstruktur.
Jika Anda mengonversi tabel Delta yang sudah ada ke aplikasi stateful, baca tabel menggunakan spark.read.table("table_name") dan berikan DataFrame yang dihasilkan. Anda dapat secara opsional memilih atau memodifikasi bidang agar sesuai dengan aplikasi stateful baru Anda.
Anda memberikan status awal menggunakan DataFrame dengan skema kunci pengelompokan yang sama dengan baris input.
Nota
Python menggunakan handleInitialState untuk menentukan status awal saat menentukan StatefulProcessor. Scala menggunakan kelas yang berbeda StatefulProcessorWithInitialState.
Menggunakan variabel status Anda
Objek status yang didukung menyediakan metode untuk mendapatkan status, memperbarui informasi status yang ada, atau menghapus status saat ini. Setiap jenis status yang didukung memiliki implementasi metode unik yang sesuai dengan struktur data yang diterapkan.
Setiap kunci pengelompokan yang diamati memiliki informasi status khusus.
- Rekaman dipancarkan berdasarkan logika yang Anda terapkan dan menggunakan skema output yang Anda tentukan. Lihat Mengeluarkan rekaman.
- Anda dapat mengakses nilai di penyimpanan status menggunakan pembaca
statestore. Pembaca ini memiliki fungsionalitas batch dan tidak ditujukan untuk beban kerja latensi rendah. Lihat Membaca informasi status Streaming Terstruktur. - Logika yang ditentukan menggunakan
handleInputRowshanya diaktifkan jika terdapat rekaman untuk kunci di dalam mikro-batch. Lihat Mengelola baris input. - Gunakan
handleExpiredTimeruntuk menerapkan logika berbasis waktu yang tidak bergantung pada pengamatan rekaman untuk dijalankan. Lihat peristiwa terjadwal Program.
Nota
Objek status diisolasi dengan mengelompokkan kunci dengan implikasi berikut:
- Nilai status tidak dapat terpengaruh oleh rekaman yang terkait dengan kunci pengelompokan yang berbeda.
- Anda tidak dapat menerapkan logika yang bergantung pada perbandingan nilai atau pembaruan status di antara kunci pengelompokan.
Anda dapat membandingkan nilai dalam kunci pengelompokan. Gunakan MapState untuk menerapkan logika dengan kunci kedua yang dapat digunakan logika kustom Anda. Misalnya, pengelompokan dengan user_id dan menggunakan MapState sebagai kunci menggunakan alamat IP akan memungkinkan Anda menerapkan logika yang melacak sesi pengguna yang simultan.
Pertimbangan tingkat lanjut untuk bekerja dengan status
Menulis ke variabel status memicu penulisan ke RocksDB. Untuk performa yang dioptimalkan, Databricks merekomendasikan pemrosesan semua nilai di iterator untuk kunci tertentu dan melakukan pembaruan dalam satu penulisan jika memungkinkan.
Nota
Pembaruan status memiliki ketahanan terhadap kesalahan. Jika tugas terhenti sebelum mikro-batch selesai diproses, nilai dari mikro-batch yang terakhir sukses digunakan saat diulang.
Nilai status tidak memiliki default bawaan. Jika logika Anda memerlukan pembacaan informasi status yang ada, gunakan metode exists saat menerapkan logika Anda.
Nota
MapState variabel memiliki fungsionalitas tambahan untuk memeriksa kunci individual atau mencantumkan semua kunci untuk menerapkan logika untuk status null.
Memancarkan rekaman
Logika yang ditentukan pengguna mengontrol cara transformWithState memancarkan rekaman. Rekaman dipancarkan per kunci pengelompokan.
Aplikasi stateful kustom tidak membuat asumsi tentang bagaimana informasi status digunakan saat menentukan cara mengeluarkan rekaman, dan jumlah rekaman yang dikembalikan untuk kondisi tertentu bisa tidak ada, satu, atau banyak.
Anda menerapkan logika untuk memancarkan rekaman menggunakan handleInputRows atau handleExpiredTimer. Lihat Menangani baris input dan Peristiwa yang dijadwalkan dalam Program.
Nota
Anda dapat menerapkan beberapa nilai status dan menentukan beberapa kondisi untuk memancarkan rekaman, tetapi semua rekaman yang dipancarkan harus menggunakan skema yang sama.
Phyton
Di Python, Anda menentukan skema output menggunakan kata kunci outputStructType saat memanggil transformWithStateInPandas.
Anda menghasilkan rekaman menggunakan objek DataFrame pandas dan yield.
Opsional, Anda dapat yield DataFrame kosong. Saat dikombinasikan dengan mode output update, memancarkan DataFrame kosong memperbarui nilai untuk kunci pengelompokan menjadi null.
Scala
Di Scala, Anda mengeluarkan rekaman menggunakan objek Iterator. Skema output berasal dari rekaman yang dipancarkan.
Anda dapat secara opsional memancarkan Iteratorkosong. Saat dikombinasikan dengan mode output update, mengeluarkan Iterator kosong memperbarui nilai untuk kunci pengelompokan agar menjadi null (tidak ada nilai).
Menangani baris input
Gunakan metode handleInputRows untuk menentukan logika tentang bagaimana rekaman yang diamati dalam kueri streaming Anda berinteraksi dengan dan memperbarui nilai status. Handler yang Anda tentukan dengan metode handleInputRows berjalan setiap kali rekaman diproses melalui kueri Streaming Terstruktur Anda.
Untuk sebagian besar aplikasi stateful yang diterapkan dengan transformWithState, logika inti didefinisikan menggunakan handleInputRows.
Untuk setiap pembaruan mikro-batch yang diproses, semua catatan dalam mikro-batch untuk kunci pengelompokan tertentu dapat diakses menggunakan iterator. Logika yang ditentukan pengguna dapat berinteraksi dengan semua rekaman dari mikrobatch saat ini dan nilai dalam statestore.
Program acara terjadwal
Anda dapat menggunakan timer untuk menerapkan logika kustom berdasarkan waktu yang berlalu dari kondisi tertentu.
Anda bekerja dengan penghitung waktu dengan menerapkan metode handleExpiredTimer.
Dalam kunci pengelompokan, timer diidentifikasi secara unik oleh tanda waktunya.
Ketika timer kedaluwarsa, hasilnya ditentukan oleh logika yang diterapkan dalam aplikasi Anda. Pola umum meliputi:
- Memancarkan informasi yang disimpan dalam variabel status.
- Membersihkan informasi status yang tersimpan.
- Membuat timer baru.
Timer yang sudah kedaluwarsa tetap aktif bahkan jika tidak ada rekaman untuk kunci yang berkaitan diproses dalam mikro-batch.
Tentukan model waktu
Saat meneruskan StatefulProcessor Anda ke transformWithState, Anda harus menentukan model waktu. Opsi berikut ini didukung:
ProcessingTimeEventTime-
NoTimeatauTimeMode.None()
Menentukan NoTime berarti timer tidak didukung untuk prosesor Anda.
Nilai timer bawaan
Databricks menyarankan agar tidak mengaktifkan jam sistem dalam aplikasi stateful kustom Anda, karena ini dapat menyebabkan upaya ulang yang tidak terpercaya pada kegagalan tugas. Gunakan metode kelas TimerValues ketika harus mengakses waktu pemrosesan atau tanda penanda air:
TimerValues |
Deskripsi |
|---|---|
getCurrentProcessingTimeInMs |
Mengembalikan tanda waktu waktu pemrosesan untuk batch saat ini dalam milidetik sejak epoch. |
getCurrentWatermarkInMs |
Mengembalikan stempel waktu tanda air untuk batch saat ini dalam milidetik sejak epoch. |
Nota
Waktu pemrosesan menjelaskan durasi pemrosesan mikro-batch dengan Apache Spark. Banyak sumber streaming, seperti Kafka, juga termasuk waktu pemrosesan sistem.
Tanda air pada kueri streaming sering didefinisikan sesuai waktu peristiwa atau waktu pemrosesan sumber streaming. Lihat Menerapkan marka air untuk mengontrol ambang batas pemrosesan data.
Tanda air dan jendela dapat digunakan dalam kombinasi dengan transformWithState. Anda dapat menerapkan fungsionalitas serupa dalam aplikasi stateful kustom Anda dengan memanfaatkan fungsionalitas TTL, timer, dan MapState atau ListState.
Apa itu State Time to Live (TTL)?
Nilai status yang digunakan oleh transformWithState masing-masing mendukung spesifikasi time to live (TTL) opsional. Ketika TTL habis masa berlakunya, nilai dikeluarkan dari penyimpanan status. TTL hanya berinteraksi dengan nilai di penyimpanan status, yang berarti Anda dapat menerapkan logika untuk mengeluarkan informasi status, tetapi Anda tidak dapat langsung memicu logika karena TTL mengeluarkan nilai status.
Penting
Jika Anda tidak menerapkan TTL, Anda harus menangani pengeluaran status menggunakan logika lain untuk menghindari pertumbuhan status tanpa akhir.
TTL diberlakukan untuk setiap nilai status, dengan aturan yang berbeda untuk setiap jenis status.
- Variabel state ditentukan oleh kunci pengelompokan.
- Untuk objek
ValueState, hanya satu nilai yang disimpan per kunci pengelompokan. TTL berlaku untuk nilai ini. - Untuk objek
ListState, daftar dapat berisi banyak nilai. TTL berlaku untuk setiap nilai dalam daftar secara independen. - Untuk objek
MapState, setiap kunci peta memiliki nilai status terkait. TTL berlaku secara independen untuk setiap pasangan kunci-nilai dalam peta.
Untuk semua jenis status, TTL mengatur ulang jika informasi status diperbarui.
Nota
Meskipun TTL dibatasi ke nilai individual dalam ListState, satu-satunya cara untuk memperbarui nilai dalam daftar adalah dengan menggunakan metode put untuk menimpa keseluruhan isi variabel ListState.
Apa perbedaan antara timer dan TTL?
Ada beberapa tumpang tindih antara timer dan time to live (TTL) untuk variabel status, tetapi timer menyediakan serangkaian fitur yang lebih luas daripada TTL.
TTL menghapus informasi status yang belum diperbarui selama periode yang ditentukan oleh pengguna. Ini memungkinkan pengguna untuk mencegah pertumbuhan status yang tidak dicentang dan menghapus entri status kedaluarsa. Karena peta dan daftar menerapkan TTL untuk setiap nilai, Anda dapat menerapkan fungsi yang hanya mempertimbangkan nilai status yang telah diperbarui baru-baru ini dengan mengatur TTL.
Timer memungkinkan Anda menentukan logika kustom di luar penyingkiran status, termasuk mengeluarkan catatan. Anda dapat secara opsional menggunakan timer untuk menghapus informasi status untuk nilai status tertentu, dengan fleksibilitas tambahan untuk memancarkan nilai atau memicu logika kondisional lainnya berdasarkan timer.