Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
Bu özellik Genel Önizleme aşamasındadır.
Bu sayfa, durum bilgisi olmayan basit dönüştürmelerden özel durum yönetimiyle karmaşık durum bilgisi işlemeye kadar Yapılandırılmış Akış'taki gerçek zamanlı mod sorguları için çalışma kodu örnekleri sağlar. Yapılandırılmış Akış'ta gerçek zamanlı mod ile ilgili kavramlar ve yapılandırma için ve gerçek zamanlı mod kullanımı başlangıç kılavuzu için uygulamalı öğreticiye bakın.
Önkoşullar
Bu sayfadaki örnekleri çalıştırmak için şunlar gerekir:
- Yapılandırılan ve çalışan gerçek zamanlı mod kümesi. Adım adım kurulum yönergeleri için bkz. Gerçek zamanlı modu kullanmaya başlama .
- Databricks Runtime 16.4 LTS veya üzeri.
- Desteklenen akış kaynaklarına ve havuzlarına erişim:
- Kafka örnekleri için: Giriş/çıkış konularının yapılandırıldığı bir Kafka aracısı
- Kinesis örnekleri için: AWS kimlik bilgileri ve Gelişmiş Fan-Out (EFO) modu için yapılandırılmış bir Kinesis akışı
- Özel havuz örnekleri için: Yapılandırılan hedef veritabanı veya hizmet (sağlanan örnek için PostgreSQL)
- Yapılandırılmış Akış kavramları hakkında temel bilgiler. Akış konusunda yeniyseniz bkz. Yapılandırılmış Akış kavramları .
Uyarı
Örneklerde , broker_addressve input_topicgibi checkpoint_locationyer tutucu değerleri kullanılır. Kodu çalıştırmadan önce bunları gerçek yapılandırma değerlerinizle değiştirin.
Durum bilgisi olmayan sorgu örnekleri
Durumsuz sorgular, her bir kaydı bağımsız olarak işlerken kayıtlar arasında herhangi bir durum bilgisi korumaz. Bu sorgular genellikle daha basittir ve durum bilgisi olan sorgulardan daha düşük gecikme süresine sahiptir çünkü durum depolama alanını yönetmeleri veya arama gerçekleştirmeleri gerekmez. Dönüştürmeler, filtreleme, statik verilerle birleştirmeler ve yönlendirme işlemleri için durum bilgisi olmayan sorgular kullanın.
Kafka kaynağı-Kafka havuzu
Bu örnekte, bir Kafka kaynağından okur ve bir Kafka havuzuna yazarsınız.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Repartition
Bu örnekte, kafka kaynağından okuma, verileri 20 bölüme yeniden bölümleme ve kafka havuzuna yazma.
Geçerli uygulama sınırlaması nedeniyle, yeniden bölümlemesi kullanmadan önce Spark yapılandırmasını spark.sql.execution.sortBeforeRepartition olarak false ayarlamanız gerekir.
Python
# Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Stream-snapshot join (yalnızca yayın)
Bu örnekte Kafka'dan okur, verileri statik bir tabloyla birleştirir ve kafka havuzuna yazarsınız. Yalnızca statik tabloyu yayınlayan akış statik birleşimleri desteklenir; bu da statik tablonun belleğe sığması gerektiği anlamına gelir.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `static_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kinesis kaynağı-Kafka havuzu
Bu örnekte, bir Kinesis kaynağından okuyorsunuz ve Kafka havuzuna yazıyorsunuz.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("partitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
Bu örnekte, iki farklı konudan iki Kafka DataFrame'i birleştirilir ve bir Kafka havuzuna yazarsınız.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Durum bilgisi olan sorgu örnekleri
Durum bilgisi olan sorgular kayıtlarda durum bilgilerini korur ve yinelenenleri kaldırma, toplama ve pencereleme gibi işlemleri etkinleştirir. Bu sorgular, zaman içinde veya birden çok olayda izleme bilgileri gerektiren kullanım örnekleri için gereklidir. Gerçek zamanlı mod, mikro toplu iş moduyla aynı semantike sahip durum bilgisi olan işlemleri destekler, ancak daha düşük gecikme süresi için verileri sürekli işler. Durum bilgisi olan sorgular, durum bilgisi olmayan sorgulardan daha fazla bellek ve işlem kaynağı gerektirir çünkü durumu korumaları ve güncelleştirmeleri gerekir.
Deduplication
Bu örnekte kayıtları timestamp ve value sütunlarına göre yineleneceklerden ayıklayacaksınız.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Bu örnekte, kayıtları timestamp ve value kriterlerine göre gruplandırıp varlıklarını sayıyorsunuz.
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Birleştirme ile gruplama
Bu örnekte, önce iki farklı konudan iki Kafka DataFrame'i bir araya getirip bir toplama yaparsınız. Sonunda Kafka havuzuna yazarsınız.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
transformWithState
Bu örnekte, transformWithState kullanarak TTL (yaşam süresi) ile özel durumu korursunuz. İşlemci, her anahtar için görülen kayıt sayısını sayar.
Python
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Uyarı
Gerçek zamanlı mod ile Yapılandırılmış Akış'taki diğer yürütme modlarının StatefulProcessor içinde transformWithState nasıl çalıştığı arasında bir fark vardır. Bkz. TransformWithState'i gerçek zamanlı modda kullanma.
Geliştirme ve test
Etkileşimli geliştirme için ekran kullanma
gerçek zamanlı akış verilerini doğrudan bir not defterinde görselleştirmek için işlevini kullanabilirsiniz display . Bu, dış veri depoları veya üretim altyapısı kurmadan, gerçek zamanlı moddaki sorgularda etkileşimli geliştirme, test ve hata ayıklama için kullanışlıdır.
display Tetikleyicili realTime işlev Databricks Runtime 17.1 ve üzerinde kullanılabilir. Kafka veya özel havuzlarla üretime dağıtmadan önce sorgu mantığınızı ve veri dönüşümlerinizi doğrulamak için geliştirme sırasında kullanın display . ile displayhız kaynağının kullanıldığı eksiksiz bir örnek için bkz. Gerçek zamanlı modu kullanmaya başlama.
Görüntüleme hızı kaynağı
Bu örnekte, bir hız kaynağından okur ve akış DataFrame'i bir not defterinde görüntülersiniz.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
Özel havuz örnekleri
Yerleşik Yapılandırılmış Akış desteği olmayan hedeflere akış verileri yazmanız gerektiğinde, özel yazma mantığı uygulamak için kullanın foreachSink . Özel havuzlar, verilerin nasıl yazılması konusunda tam denetim sahibi olmanıza olanak tanıyarak herhangi bir veritabanı, API veya depolama sistemiyle tümleştirmenize olanak sağlar. Aşağıdaki örnekte, JDBC kullanarak postgreSQL veritabanına yazma işlemi gösterilmektedir.
foreachSink kullanarak PostgreSQL'e yazma
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Sonraki Adımlar
Bu gerçek zamanlı mod örneklerini incelediğinize göre, bilginizi derinleştirmeye ve üretime hazır akış uygulamaları oluşturmaya yönelik kaynaklar aşağıda verilmiştir:
- Gerçek zamanlı mod başvurusu - Gerçek zamanlı mod kavramları, yapılandırma seçenekleri, desteklenen özellikler ve sınırlamalar hakkında bilgi edinin
- Gerçek zamanlı modu kullanmaya başlama - Kümenizi yapılandırmak ve ilk gerçek zamanlı sorgunuzu çalıştırmak için adım adım yönergeleri izleyin
- Yapılandırılmış Akış kavramları - Databricks'te Yapılandırılmış Akış'ın temel kavramlarını öğrenin
- Durum bilgisi olan uygulamalar - transformWithState ve özel durum yönetimi desenlerine derinlemesine bakış