Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
Bu özellik Databricks Runtime 16.2 ve üzeri Genel Önizleme'ndedir.
Rastgele durum bilgisi olan mantık kullanan düşük gecikme süreli ve neredeyse gerçek zamanlı çözümler uygulamak için özel durum bilgisi olan işleçler kullanarak akış uygulamaları oluşturabilirsiniz. Özel durumlu operatörler, geleneksel Yapılandırılmış Akış işleme aracılığıyla ulaşılamayan yeni operasyonel kullanım senaryolarının ve desenlerinin kilidini açar.
Not
Databricks, toplamalar, yinelenenleri kaldırma ve akış birleştirmeleri gibi durumsal desteklenen işlemler için yerleşik Yapılandırılmış Akış işlevselliğini kullanılmasını önerir. Bkz. Durumlu akış nedir?.
Databricks, rastgele durum dönüştürmeleri için eski işleçler üzerinden transformWithState kullanılmasını önerir. Eski flatMapGroupsWithState ve mapGroupsWithState işleçleri hakkında belgeler için bkz. Eski rastgele durum bilgisi olan işleçler.
Gereksinimler
transformWithState işleci ile ilgili API'ler ve sınıflar aşağıdaki gereksinimlere sahiptir:
- Databricks Runtime 16.2 ve üzerinde kullanılabilir.
- İşlem, yalıtımsız veya ayrılmış erişim modunu kullanmalıdır, ancak Databricks Runtime 16.3 ve üzeri sürümlerde Python (
transformWithStateInPandas) ve Databricks Runtime 17.3 ve üzeri sürümlerde Scala (transformWithState) için standart erişim modu desteklenmektedir. - RocksDB durum deposu sağlayıcısını kullanmanız gerekir. Databricks, işlem yapılandırmasının bir parçası olarak RocksDB'nin etkinleştirilmesini önerir.
Not
Geçerli oturumda RocksDB durum deposu sağlayıcısını etkinleştirmek için aşağıdakileri çalıştırın:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
transformWithState nedir?
transformWithState operatörü, Yapılandırılmış Akış sorgusuna özel bir durum bilgili işlemci uygular.
transformWithStatekullanmak için durum bilgisi olan özel bir işlemci uygulamanız gerekir. Yapılandırılmış Akış, Python, Scala veya Java kullanarak durum bilgisi olan işlemcinizi oluşturmaya yönelik API'leri içerir.
Yapılandırılmış Akış ile artımlı olarak işlenen kayıtlar için gruplandırma anahtarına özel mantık uygulamak için transformWithState kullanırsınız. Aşağıda üst düzey tasarım açıklanmaktadır:
- Bir veya daha fazla durum değişkeni tanımlayın.
- Durum bilgileri her gruplandırma anahtarı için kalıcıdır ve kullanıcı tanımlı mantığa göre her durum değişkeni için erişilebilir.
- İşlenen her mikro toplu işlem için anahtarın tüm kayıtları yineleyici olarak kullanılabilir.
- Yerleşik tutamaçları kullanarak süreölçerlere ve kullanıcı tanımlı koşullara göre kayıtların ne zaman ve nasıl yayınlandığını kontrol edin.
- Durum değerleri tek tek yaşam süresi (TTL) tanımlarını destekleyerek durum süre sonu ve durum boyutunu yönetme esnekliği sağlar.
transformWithState durum deposunda şema evrimini desteklediğinden, geçmiş durum bilgilerini kaybetmeden veya kayıtları yeniden işlemeye gerek kalmadan üretim uygulamalarınızı yineleyebilir ve güncelleştirebilir, geliştirme ve bakım kolaylığı için esneklik sağlayabilirsiniz. Bkz. durum deposunda şema evrimi.
Önemli
PySpark, transformWithStateInPandasyerine transformWithState işlecini kullanır. Azure Databricks belgelerinde hem Python hem de Scala uygulamalarının işlevselliğini açıklamak için transformWithState kullanılır.
transformWithState ve ilgili API'lerin Scala ve Python uygulamaları dil özelliklerine göre farklılık gösterir ancak aynı işlevselliği sağlar. Tercih ettiğiniz programlama dili için dile özgü örneklere ve API belgelerine bakın.
Yerleşik işleme tutamaçları
Yerleşik tanıtıcılarını kullanarak işleyicileri uygulayarak özel durum bilgisi taşıyan uygulamanızın temel mantığını uygularsınız.
- Kollar, durum değerleri ve zamanlayıcılarla etkileşime geçmek, gelen kayıtları işlemek ve kayıtları göndermek için yöntemler sağlar.
- İşleyiciler, özelleştirilmiş olay odaklı mantığınızı tanımlar.
Her bir durum türü için kollar, kullanılan veri yapısına göre uygulanır, ancak her biri kayıtları alma, yerleştirme, güncelleme ve silme işlevlerine sahiptir.
İşleyiciler, aşağıdaki semantik kullanılarak giriş kayıtlarında veya zamanlayıcılarda gözlemlenen olaylara göre uygulanır:
- Verilerin nasıl işlendiğini kontrol etmek, durumun güncellenmesi ve gruplama anahtarı için işlenen her mikro toplu kayıt işlemi sırasında kayıtların yayımlanması amacıyla
handleInputRowsyöntemini kullanarak bir işleyici tanımlayın. Bkz. giriş satırlarını işleme. - Gruplandırma anahtarı için ek kayıtların işlenip işlenmediğine bakılmaksızın mantığı çalıştırmak için zamana dayalı eşikleri kullanmak üzere
handleExpiredTimeryöntemini kullanarak bir işleyici tanımlayın. Bkz. program zamanlanmış olayları .
Aşağıdaki tabloda, bu işleyiciler tarafından desteklenen işlevsel davranışların karşılaştırması yer alır:
| Davranış | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Durum değerlerini alma, yerleştirme, güncelleştirme veya temizleme | Evet | Evet |
| Zamanlayıcı oluşturma veya silme | Evet | Evet |
| Kayıtları yayınla | Evet | Evet |
| Geçerli mikro iş kümesindeki kayıtlar üzerinde dolaş | Evet | Hayır |
| Geçen süreyi temel alan tetikleyici mantığı | Hayır | Evet |
karmaşık mantığı gerektiği gibi uygulamak için handleInputRows ve handleExpiredTimer birleştirebilirsiniz.
Örneğin, her mikro toplu iş için durum değerlerini güncelleştirmek için handleInputRows kullanan ve gelecekte 10 saniyelik bir süreölçer ayarlayan bir uygulama uygulayabilirsiniz. Başka kayıt işlenmezse, durum deposundaki geçerli değerleri çıkarmak için handleExpiredTimer kullanabilirsiniz. Gruplandırma anahtarı için yeni kayıtlar işlenirse, mevcut zamanlayıcıyı temizleyebilir ve yeni bir zamanlayıcı ayarlayabilirsiniz.
Özel durum türleri
Tek bir durum bilgisi olan işleçte birden çok durum nesnesi uygulayabilirsiniz. Her durum nesnesine verdiğiniz adlar, durum deposu okuyucusuyla erişebileceğiniz durum deposunda kalır. Durum nesneniz bir StructTypekullanıyorsa, şemayı geçirirken yapıdaki her alan için adlar sağlarsınız. Bu adlar, durum deposu okunurken de görülebilir. Bkz. Yapılandırılmış Akış durumu bilgilerini okuyun.
Yerleşik sınıflar ve işleçler tarafından sağlanan işlevsellik esneklik ve genişletilebilirlik sağlamaya yöneliktir ve uygulama seçimleri, uygulamanızın çalışması için gereken tüm mantık tarafından bilgilendirilmelidir. Örneğin, ValueState ve user_id alanlarına göre gruplandırılmış bir session_id veya MapState'in user_idanahtarı olduğu session_id'e göre gruplandırılmış bir MapState kullanarak neredeyse aynı mantığı uygulayabilirsiniz. Bu örnekte, mantığın birden çok MapStatekoşulları değerlendirmesi gerekiyorsa tercih edilen uygulama bir session_id olabilir.
Aşağıdaki bölümlerde, transformWithStatetarafından desteklenen durum türleri açıklanmaktadır.
ValueState
Her gruplandırma anahtarı için ilişkili bir değer vardır.
Değer durumu, yapı veya demet gibi karmaşık türler içerebilir. bir ValueStategüncelleştirdiğinizde, değerin tamamını değiştirmek için mantık uygularsınız. Değer durumu için TTL, değer güncelleştirildiğinde sıfırlanır, ancak ValueState ile eşleşen bir kaynak anahtar, depolanan ValueStategüncellenmeden işlendiğinde sıfırlanmaz.
ListState
Her gruplandırma anahtarı için ilişkili bir liste vardır.
Liste durumu, her biri karmaşık türler içerebilen bir değer koleksiyonudur. Listedeki her değerin kendi TTL'leri vardır. Tek tek öğeleri ekleyerek, bir öğe listesi ekleyerek veya putile listenin tamamının üzerine yazarak listeye öğe ekleyebilirsiniz. TTL'yi sıfırlamak için yalnızca put işlemi bir güncelleştirme olarak kabul edilir.
MapState
Her bir gruplandırma anahtarı için ilişkili bir harita vardır. Haritalar, Python diktesine eşdeğer Apache Spark işlevidir.
Önemli
Gruplandırma anahtarları, Yapılandırılmış Akış sorgusunun GROUP BY yan tümcesinde belirtilen alanları açıklar. Harita durumları, gruplandırma anahtarı için belirli olmayan sayıda anahtar-değer çifti barındırır.
Örneğin, user_id göre gruplandırıyorsanız ve her session_idiçin bir harita tanımlamak istiyorsanız, gruplandırma anahtarınız user_id ve haritanızdaki anahtar session_id.
Eşleme durumu, her biri karmaşık türler içerebilen bir değerle birebir eşleştirilmiş ayrı anahtarlardan oluşan bir koleksiyondur. Bir eşlemedeki her anahtar-değer çifti kendi TTL'sine sahiptir. Belirli bir anahtarın değerini güncelleştirebilir veya bir anahtarı ve değerini kaldırabilirsiniz. Anahtarını kullanarak tek bir değer döndürebilir, tüm anahtarları listeleyebilir, tüm değerleri listeleyebilir veya eşlemedeki tam anahtar-değer çiftleri kümesiyle çalışmak için bir yineleyici döndürebilirsiniz.
Özel durum değişkeni başlatma
StatefulProcessor'nizi başlatırken, her durum nesnesi için özel mantığınızdaki durum nesneleriyle etkileşim kurmanızı sağlayan bir yerel değişken oluşturursunuz. Durum değişkenleri, init sınıfındaki yerleşik StatefulProcessor yöntemi geçersiz kılınarak tanımlanır ve başlatılır.
getValueStatebaşlatırken getListState, getMapStateve StatefulProcessor yöntemlerini kullanarak rastgele miktarda durum nesnesi tanımlarsınız.
Her durum nesnesi aşağıdakilere sahip olmalıdır:
- Benzersiz bir ad
- Belirtilen şema
- Python'da şema açıkça belirtilir.
- Scala'da durum şemasını belirtmek için bir
Encodergeçirin.
Ayrıca milisaniye cinsinden isteğe bağlı bir yaşam süresi (TTL) süresi de sağlayabilirsiniz. Eşleme durumu uyguluyorsanız, eşleme anahtarları ve değerler için ayrı bir şema tanımı sağlamanız gerekir.
Not
Durum bilgilerinin sorgulanma, güncelleştirme ve yayma mantığı ayrı ayrı işlenir. bkz. durum değişkenlerinizi kullanma.
Durumlu örnek uygulama
Aşağıda, desteklenen her tür için örnek durum değişkenleri de dahil olmak üzere transformWithStateile özel durum bilgisi olan bir işlemci tanımlamak ve kullanmak için temel söz dizimi gösterilmektedir. Daha fazla örnek için bkz. örnek durum bilgisi olan uygulamalar.
Not
Python, durum değerleriyle tüm etkileşimler için tanımlama kümeleri kullanır. Bu, Python kodunun put ve update gibi işlemleri kullanırken tanımlama kümeleri kullanarak değerler geçirmesi ve getkullanılırken tanımlama kümelerini işlemeyi beklemesi gerektiği anlamına gelir.
Örneğin, değer durumunuz için şema yalnızca tek bir tamsayıysa, aşağıdaki gibi bir kod uygularsınız:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Bu, bir ListState içindeki öğeler veya MapStateiçindeki değerler için de geçerlidir.
Piton
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala programlama dili
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
PySpark, kullanıcı tanımlı Python kodunuzun durum bilgileriyle nasıl etkileşim kurduğunu denetleen işlevlere erişim sağlamak için StatefulProcessorHandle sınıfını içerir.
StatefulProcessorHandlebaşlatırken handle'ı her zaman içeri aktarmanız ve StatefulProcessor değişkenine geçirmeniz gerekir.
handle değişkeni Python sınıfınızdaki yerel değişkeni durum değişkeniyle bağlar.
Not
Scala getHandle yöntemini kullanır.
İlk durumu belirtin
İsteğe bağlı olarak, ilk mikro toplu işlemle kullanılacak bir başlangıç durumu sağlayabilirsiniz. Bu, mevcut bir iş akışını yeni bir özel uygulamaya geçirirken, şemanızı veya mantığınızı değiştirmek için durum bilgisi olan bir işleci yükseltirken veya otomatik olarak onarılamayan ve el ile müdahale gerektiren bir hatayı onarırken yararlı olabilir.
Not
Mevcut bir denetim noktasından durum bilgilerini sorgulamak için durum deposu okuyucusunu kullanın. Bkz. Yapılandırılmış Akış durumu bilgilerini okuyun.
Var olan bir Delta tablosunu durum bilgisi olan bir uygulamaya dönüştürüyorsanız, spark.read.table("table_name") kullanarak tabloyu okuyun ve sonuçta elde edilen DataFrame'i geçirin. Alanları, yeni durum bilgisi olan uygulamanızla tam uyumlu olacak şekilde isteğe bağlı olarak seçebilir veya değiştirebilirsiniz.
Giriş satırlarıyla aynı gruplandırma anahtarı şemasına sahip bir DataFrame kullanarak bir başlangıç durumu sağlarsınız.
Not
Python, handleInitialStatetanımlarken ilk durumu belirtmek için StatefulProcessor kullanır. Scala, StatefulProcessorWithInitialStateayrı sınıfını kullanır.
Durum değişkenlerinizi kullanma
Desteklenen durum nesneleri durum alma, mevcut durum bilgilerini güncelleştirme veya geçerli durumu temizleme yöntemleri sağlar. Desteklenen her durum türü, uygulanan veri yapısına karşılık gelen yöntemlerin benzersiz bir uygulamasına sahiptir.
Gözlemlenen her gruplandırma anahtarının ayrılmış durum bilgileri vardır.
- Kayıtlar, uyguladığınız mantığa göre ve belirttiğiniz çıkış şeması kullanılarak gönderilir. Bakınız yayma kayıtları.
-
statestoreokuyucuyu kullanarak durum deposundaki değerlere erişebilirsiniz. Bu okuyucu toplu iş işlevlerine sahiptir ve düşük gecikme süreli iş yükleri için tasarlanmamıştır. Bkz. Yapılandırılmış Akış durumu bilgilerini okuyun. -
handleInputRowskullanılarak belirtilen mantık, yalnızca anahtarın kayıtları bir mikro veri kümesinde mevcutsa tetiklenir. Bkz. giriş satırlarını işleme. - Zamana dayalı mantık kurgusu oluşturmak ve kayıtları gözlemlemeye bağımlı olmadan çalıştırmak için
handleExpiredTimerkullanın. Bkz. program zamanlanmış olayları .
Not
Durum nesneleri, anahtarları gruplandırarak yalıtılır ve aşağıdaki etkilere neden olabilir:
- Durum değerleri farklı bir gruplandırma anahtarıyla ilişkili kayıtlardan etkilenmez.
- Değerleri karşılaştırmaya veya gruplandırma anahtarları arasındaki durumu güncellemeye bağımlı olan bir mantığı uygulayamazsınız.
Gruplandırma anahtarındaki değerleri karşılaştırabilirsiniz. Özel mantığınızın kullanabileceği ikinci bir anahtarla mantık uygulamak için bir MapState kullanın. Örneğin, user_id göre gruplandırma ve IP adresini kullanarak MapState anahtarlama, eşzamanlı kullanıcı oturumlarını izleyen mantık uygulamanıza olanak tanır.
Durumla çalışırken dikkat edilmesi gereken gelişmiş hususlar
Durum değişkenine yazma işlemi RocksDB'ye yazma işlemi tetikler. En iyi duruma getirilmiş performans için Databricks, belirli bir anahtar için yineleyicideki tüm değerlerin işlenmesini ve mümkün olduğunda güncelleştirmelerin tek bir yazmada işlenmesini önerir.
Not
Durum bilgisi güncellemeleri hataya dayanıklıdır. Bir mikro toplu işin işlenmesi tamamlanmadan bir görev kilitlenirse, yeniden denemede son başarılı mikro toplu işin değeri kullanılır.
Durum değerlerinin yerleşik varsayılanları yoktur. Mantığınız mevcut durum bilgilerinin okunmasını gerektiriyorsa, mantığınızı uygularken exists yöntemini kullanın.
Not
MapState değişkenleri, tek tek anahtarları denetlemek veya null durum mantığı uygulamak için tüm anahtarları listelemek için ek işlevlere sahiptir.
Kayıtları yayınla
Kullanıcı tanımlı mantık, transformWithState kayıtları nasıl yaytığını denetler. Kayıtlar gruplandırma anahtarı başına gönderilir.
Özel durum bilgisi olan uygulamalar, kayıtların nasıl yayılacağı belirlenirken durum bilgilerinin nasıl kullanıldığı hakkında hiçbir varsayımda bulunmaz ve belirli bir koşul için döndürülen kayıt sayısı yok, bir veya daha fazla olabilir.
Kayıtları yaymak için handleInputRows veya handleExpiredTimerkullanarak bir mantık uygularsınız.
Giriş satırlarını işleme ve program zamanlanmış olayları bölümlerine bakın.
Not
Birden çok durum değeri uygulayabilir ve kayıtları yayma için birden çok koşul tanımlayabilirsiniz, ancak yayılan tüm kayıtların aynı şemayı kullanması gerekir.
Piton
Python'da, outputStructTypeçağırırken transformWithStateInPandas anahtar sözcüğünü kullanarak çıkış şemanızı tanımlarsınız.
Bir pandas DataFrame nesnesi kullanarak kayıtları oluşturur ve yield kullanırsınız.
İsteğe bağlı olarak boş bir DataFrame oluşturabilirsiniz yield.
update çıkış moduyla birleştirildiğinde, boş bir DataFrame yayım işlemi, gruplandırma anahtarının değerlerini null olarak günceller.
Scala programlama dili
Scala'da, Iterator nesnesi kullanarak kayıtları yayarsınız. Çıkışın şeması, yayılan kayıtlardan türetilir.
İsteğe bağlı olarak boş bir Iteratoryayabilirsiniz.
update çıkış moduyla birleştirildiğinde, boş bir Iterator yaymak gruplandırma anahtarının değerlerini null olarak güncelleştirir.
Giriş satırlarını işleme
Akış sorgunuzda gözlemlenen kayıtların durum değerleriyle etkileşim kurma ve güncelleştirme mantığını tanımlamak için handleInputRows yöntemini kullanın.
handleInputRows yöntemiyle tanımladığınız işleyici, Yapılandırılmış Akış sorgusu aracılığıyla her kayıt işlendiğinde çalışır.
transformWithStateile uygulanan durum bilgisi olan uygulamaların çoğunda, çekirdek mantık handleInputRowskullanılarak tanımlanır.
İşlenen her mikro toplu iş güncelleştirmesi için, belirli bir gruplandırma anahtarı için mikro toplu işteki tüm kayıtlar yineleyici kullanılarak kullanılabilir. Kullanıcı tanımlı mantık, geçerli mikrobatch ve statestore'daki değerlerden tüm kayıtlarla etkileşime geçebilir.
Program zamanlanmış olayları
Belirli bir koşuldan geçen süreye göre özel mantık uygulamak için zamanlayıcıları kullanabilirsiniz.
bir handleExpiredTimer yöntemi uygulayarak zamanlayıcılarla çalışırsınız.
Bir gruplandırma anahtarında zamanlayıcılar, zaman damgaları tarafından benzersiz olarak tanımlanır.
Süreölçerin süresi dolduğunda, sonuç uygulamanızda uygulanan mantık tarafından belirlenir. Yaygın desenler şunlardır:
- Durum değişkeninde depolanan bilgileri yayma.
- Depolanan durum bilgilerini silme.
- Yeni zamanlayıcı oluşturuluyor.
Süresi dolan zamanlayıcılar, ilişkili anahtarlarıyla ilgili mikro toplu işlemde herhangi bir kayıt işlenmese bile tetiklenir.
Zaman modelini belirtme
StatefulProcessor'ı transformWithState'e geçirirken, zaman modelini belirtmeniz gerekir. Aşağıdaki seçenekler desteklenir:
ProcessingTimeEventTime-
NoTimeveyaTimeMode.None()
NoTime belirtilmesi, zamanlayıcıların işlemciniz için desteklenmediği anlamına gelir.
Yerleşik zamanlayıcı değerleri
Databricks, durum bilgisi olan özel uygulamanızda sistem saatinin çağrılmasını önermemektedir, çünkü bu, görev hatası durumunda güvenilir olmayan yeniden denemelere yol açabilir. İşleme süresine veya filigrana erişmeniz gerektiğinde TimerValues sınıfındaki yöntemleri kullanın:
TimerValues |
Açıklama |
|---|---|
getCurrentProcessingTimeInMs |
İşlem süresinin zaman damgasını mevcut toplu işlem için Unix epoch'undan beri milisaniye cinsinden döndürür. |
getCurrentWatermarkInMs |
Geçerli toplu iş için filigranın zaman damgasını, Unix dönemi başlangıcından itibaren milisaniye cinsinden geri döndürüyor. |
Not
İşlem süresi, mikro toplu işlemin Apache Spark tarafından işlenme süresini açıklar. Kafka gibi birçok akış kaynağı sistem işleme süresini de içerir.
Akış sorgularında filigranlar genellikle olay zamanına veya akış kaynağının işleme süresine göre tanımlanır. veri işleme eşiklerini denetlemek için filigranları uygulama bölümüne bakın.
transformWithStateile hem filigranlar hem de pencereler birlikte kullanılabilir. TTL, zamanlayıcılar ve MapState veya ListState işlevselliğinden yararlanarak özel durum bilgisi olan uygulamanızda benzer işlevler uygulayabilirsiniz.
Durum yaşam süresi (TTL) nedir?
transformWithState tarafından kullanılan her durum değeri, isteğe bağlı bir yaşam süresi (TTL) belirtimini destekler. TTL'nin süresi bittiğinde, değer durum deposundan çıkarılır. TTL yalnızca durum deposundaki değerlerle etkileşim kurar, yani durum bilgilerini çıkarmak için mantık uygulayabilirsiniz, ancak TTL durum değerlerini çıkardıkça mantığı doğrudan tetikleyemezsiniz.
Önemli
TTL uygulamazsanız, durumun sonsuz şekilde büyümesini önlemek için başka bir mantık kullanarak durum temizleme işlemi yapmanız gerekir.
TTL her durum değeri için, her durum türüne göre farklı kurallarla uygulanır.
- Durum değişkenleri, gruplandırma anahtarları ile sınırlıdır.
-
ValueStatenesneler için gruplandırma anahtarı başına yalnızca tek bir değer depolanır. TTL bu değer için geçerlidir. -
ListStatenesneler için liste birçok değer içerebilir. TTL, listedeki her değere bağımsız olarak uygulanır. -
MapStatenesneler için her eşleme anahtarının ilişkili bir durum değeri vardır. TTL, bir eşlemedeki her anahtar-değer çiftine bağımsız olarak uygulanır.
Durum bilgileri güncelleştirilirse tüm durum türleri için TTL sıfırlanır.
Not
TTL'nin kapsamı bir ListStateiçindeki tek tek değerler olarak belirlenmiş olsa da, listedeki bir değeri güncelleştirmenin tek yolu, put değişkeninin tüm içeriğinin üzerine yazmak için ListState yöntemini kullanmaktır.
Zamanlayıcılar ile TTL arasındaki fark nedir?
Durum değişkenleri için süreölçerler ile yaşam süresi (TTL) arasında bazı çakışmalar vardır, ancak zamanlayıcılar TTL'den daha geniş bir özellik kümesi sağlar.
TTL, kullanıcı tarafından belirtilen süre boyunca güncelleştirilmemiş durum bilgilerini çıkartır. Bu, kullanıcıların denetlenmeyen durum büyümesini engellemesine ve eski durum girdilerini kaldırmasına olanak tanır. Haritalar ve listeler her değer için TTL uyguladığından, TTL'yi ayarlayarak yalnızca yakın zamanda güncelleştirilmiş olan durum değerlerini dikkate alan işlevler uygulayabilirsiniz.
Zamanlayıcılar, durum temizlemenin ötesinde kayıtları yayınlamak da dahil olmak üzere özel mantık tanımlamanıza olanak sağlar. İsteğe bağlı olarak, belirli bir durum değeri için durum bilgilerini temizlemek için zamanlayıcıları kullanabilirsiniz. Bu ek esneklik, değerleri yayma veya zamanlayıcıyı temel alan diğer koşullu mantığı tetikleme esnekliği sağlar.