중요합니다
이 기능은 공개 미리보기 단계에 있습니다.
이 페이지에서는 간단한 상태 비저장 변환에서 사용자 지정 상태 관리를 사용한 복잡한 상태 저장 처리에 이르기까지 구조적 스트리밍의 실시간 모드 쿼리에 대한 작업 코드 예제를 제공합니다. 개념 및 구성 에 대한 구조적 스트리밍의 실시간 모드 를 참조하고 실습 자습서의 실시간 모드를 시작 하세요.
사전 요구 사항
이 페이지에서 예제를 실행하려면 다음이 필요합니다.
- 실시간 모드 클러스터가 구성되고 실행됩니다. 실시간 모드 시작을 위한 단계별 설정 지침은 참조하세요.
- Databricks Runtime 16.4 LTS 이상.
- 지원되는 스트리밍 원본 및 싱크에 대한 액세스:
- Kafka 예제의 경우: 입력/출력 토픽이 구성된 Kafka broker
- Kinesis 예제: AWS 자격 증명 및 EFO(고급 Fan-Out) 모드로 구성된 Kinesis 스트림
- 사용자 지정 싱크 예제의 경우: 구성된 대상 데이터베이스 또는 서비스(제공된 예제의 경우 PostgreSQL)
- 구조적 스트리밍 개념에 대한 기본적인 숙지. 스트리밍 을 접하는 경우 구조적 스트리밍 개념을 참조하세요.
메모
이 예제에서는 자리 표시자 값(예: broker_address, input_topic및 checkpoint_location.)을 사용합니다. 코드를 실행하기 전에 이러한 값을 실제 구성 값으로 바꿉 있습니다.
무상태 쿼리 예제
상태 비지정 쿼리는 레코드 간에 상태를 유지하지 않고 각 레코드를 독립적으로 처리합니다. 이러한 쿼리는 일반적으로 더 간단하고 상태 저장 쿼리보다 대기 시간이 짧습니다. 상태 스토리지를 관리하거나 조회를 수행할 필요가 없기 때문입니다. 변환, 필터링, 정적 데이터 조인 및 라우팅 작업에는 상태 저장되지 않은 쿼리를 사용합니다.
Kafka 소스에서 Kafka 싱크로
이 예제에서는 Kafka 원본에서 읽고 Kafka 싱크에 씁니다.
파이썬
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
다시 분할
이 예제에서는 Kafka 원본에서 읽고, 데이터를 20개의 파티션으로 다시 분할하고, Kafka 싱크에 씁니다.
현재 구현 제한으로 인해 재분할을 사용하기 전에 Spark 구성 요소 spark.sql.execution.sortBeforeRepartition을 false로 설정해야 합니다.
파이썬
# Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
스트림 스냅샷 조인(브로드캐스트만 해당)
이 예제에서는 Kafka에서 읽고, 데이터를 정적 테이블과 조인하고, Kafka 싱크에 씁니다. 정적 테이블을 브로드캐스트하는 스트림 정적 조인만 지원됩니다. 즉, 정적 테이블이 메모리에 맞아야 합니다.
파이썬
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `static_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kafka 싱크에 대한 Kinesis 소스
이 예제에서는 Kinesis 원본에서 읽고 Kafka 싱크에 씁니다.
파이썬
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("partitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
이 예제에서는 서로 다른 두 항목의 두 Kafka DataFrame을 통합하고 Kafka 싱크에 씁니다.
파이썬
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
상태 저장 쿼리 예제
상태 저장 쿼리는 레코드 간에 상태 정보를 유지하여 중복 제거, 집계 및 윈도우링을 비롯한 작업을 수행할 수 있도록 합니다. 이러한 쿼리는 시간이 지남에 따라 또는 여러 이벤트에서 정보를 추적해야 하는 사용 사례에 필수적입니다. 실시간 모드는 마이크로 배치 모드와 동일한 의미 체계를 사용하여 상태 저장 작업을 지원하지만 대기 시간을 낮추기 위해 데이터를 지속적으로 처리합니다. 상태 저장 쿼리는 상태를 유지 관리하고 업데이트해야 하므로 상태 비지정 쿼리보다 더 많은 메모리 및 컴퓨팅 리소스가 필요합니다.
Deduplication
이 예제에서는 timestamp 열 및 value 열에 따라 레코드의 중복을 제거합니다.
파이썬
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
이 예제에서는 레코드를 그룹화하여 timestampvalue발생 횟수를 계산합니다.
파이썬
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
집계와 통합된 구조체
이 예제에서는 먼저 두 개의 서로 다른 항목에서 두 개의 Kafka DataFrame을 통합한 다음 집계를 수행합니다. 결국 Kafka 데이터 저장소에 기록합니다.
파이썬
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
transformWithState
이 예제에서는 TTL(수명 시간)을 사용하여 사용자 지정 상태를 유지합니다 transformWithState. 프로세서는 각 키에 대해 표시되는 레코드 수를 계산합니다.
파이썬
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
메모
구조적 스트리밍에서 실시간 모드가 StatefulProcessor을/를 실행하는 방식과 다른 실행 모드에서 transformWithState을/를 실행하는 방식에는 차이가 있습니다.
실시간 모드에서 transformWithState 사용을 참조하세요.
개발 및 테스트
대화형 개발에 디스플레이 사용
이 함수를 display 사용하여 Notebook에서 직접 실시간 스트리밍 데이터를 시각화할 수 있습니다. 이는 외부 싱크 또는 프로덕션 인프라를 설정하지 않고 실시간 모드 쿼리를 대화형으로 개발, 테스트 및 디버깅하는 데 유용합니다.
트리거가 display 있는 realTime 함수는 Databricks Runtime 17.1 이상에서 사용할 수 있습니다. 개발 중에 Kafka 또는 사용자 지정 싱크를 사용하여 프로덕션에 배포하기 전에 쿼리 논리 및 데이터 변환을 확인하는 데 사용합니다 display . 속도 원본 display을 사용하는 전체 예제는 실시간 모드 시작을 참조하세요.
표시 속도 원본
이 예제에서는 속도 원본에서 데이터를 읽고 Notebook에 스트리밍되는 DataFrame을 표시합니다.
파이썬
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
사용자 지정 싱크 예제
기본 제공 구조적 스트리밍 지원이 없는 대상에 스트리밍 데이터를 작성해야 하는 경우 사용자 지정 쓰기 논리를 구현하는 데 사용합니다 foreachSink . 사용자 지정 싱크를 사용하면 데이터를 작성하는 방법을 완전히 제어할 수 있으므로 모든 데이터베이스, API 또는 스토리지 시스템과 통합할 수 있습니다. 다음 예제에서는 JDBC를 사용하여 PostgreSQL 데이터베이스에 쓰는 방법을 보여 줍니다.
foreachSink를 사용하여 PostgreSQL에 쓰기
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
다음 단계
이제 이러한 실시간 모드 예제를 살펴보았으므로 지식을 심화하고 프로덕션 준비 스트리밍 애플리케이션을 빌드하기 위한 리소스는 다음과 같습니다.
- 실시간 모드 참조 - 실시간 모드 개념, 구성 옵션, 지원되는 기능 및 제한 사항에 대해 알아봅니다.
- 실시간 모드 시작 - 단계별 지침에 따라 클러스터를 구성하고 첫 번째 실시간 쿼리를 실행합니다.
- 구조적 스트리밍 개념 - Databricks에서 구조적 스트리밍의 기본 개념 알아보기
- 상태 저장 애플리케이션 - transformWithState 및 사용자 지정 상태 관리 패턴에 대한 심층 분석