Important
この機能は パブリック プレビュー段階です。
このページでは、構造化ストリーミングのトリガーの種類であるリアルタイム モードについて説明します。これにより、エンドツーエンドの待機時間が 5 ミリ秒という非常に短い待機時間のデータ処理が可能になります。 このモードは、ストリーミング データへの即時応答を必要とする運用ワークロード向けに設計されています。
リアルタイム モードは、Databricks Runtime 16.4 LTS 以降で使用できます。
運用ワークロード
ストリーミング ワークロードは、分析ワークロードと運用ワークロードに大きく分けることができます。
- 分析ワークロードでは、データインジェストと変換が使用されます。通常は medallion アーキテクチャに従います (たとえば、ブロンズ、シルバー、ゴールドの各テーブルにデータを取り込みます)。
- 運用ワークロードは、リアルタイム データを使用し、ビジネス ロジックを適用し、ダウンストリームのアクションまたは決定をトリガーします。
運用ワークロードの例を次に示します。
- 異常な場所、大きなトランザクション サイズ、迅速な支出パターンなどの要因に基づいて、不正行為スコアがしきい値を超えた場合に、クレジット カード トランザクションをリアルタイムでブロックまたはフラグ設定します。
- クリックストリーム データで、ユーザーがジーンズを 5 分間閲覧していることを示すプロモーション メッセージを配信し、今後 15 分間に購入した場合は 25% 割引を提供します。
一般に、運用ワークロードは、秒未満のエンド ツー エンド待機時間が必要な場合に特徴付けられます。 これは、Apache Spark 構造化ストリーミングのリアルタイム モードで実現できます。
リアルタイム モードで低待機時間を実現する方法
リアルタイム モードでは、次の方法で実行アーキテクチャが向上します。
- 実行時間の長いバッチ (既定値は 5 分) を実行します。このバッチでは、ソースで使用できるようになるとデータが処理されます。
- クエリのすべてのステージが同時にスケジュールされます。 これには、使用可能なタスク スロットの数が、バッチ内のすべてのステージのタスクの数以上である必要があります。
- データは、ストリーミング シャッフルを使用して生成されるとすぐにステージ間で渡されます。
バッチの処理が終了し、次のバッチが開始される前に、Structured Streaming チェックポイントが進行し、最後のバッチのメトリックが使用可能になります。 バッチが長い場合、これらのアクティビティの頻度が低くなり、失敗した場合の再生が長くなり、メトリックの可用性が遅れる可能性があります。 一方、バッチが小さい場合、これらのアクティビティはより頻繁になり、待機時間に影響を与える可能性があります。 Databricks では、ターゲット ワークロードと要件に対してリアルタイム モードをベンチマークして、適切なトリガー間隔を見つけることをお勧めします。
クラスターの構成
構造化ストリーミングでリアルタイム モードを使用するには、クラシック Lakeflow ジョブを構成する必要があります。
Azure Databricks ワークスペースで、左上隅にある [ 新規 ] をクリックします。 [ その他 ] を選択し、[ クラスター] をクリックします。
[Photon アクセラレーション] を消去します。
自動スケールの有効化を解除する。
[ 高度なパフォーマンス] で、[ スポット インスタンスの使用] をオフにしてください。
[ 詳細 モードと アクセス モード] で、[ 手動 ] をクリックし、[ 専用] (以前の場合: シングル ユーザー) を選択します。
[Spark] で、Spark 構成の下に次のように入力します。
spark.databricks.streaming.realTimeMode.enabled trueCreate をクリックしてください。
クラスター サイズの要件
クラスターに十分なタスク スロットがある場合は、クラスターごとに 1 つのリアルタイム ジョブを実行できます。
低待機時間モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージのタスク数以上である必要があります。
スロット計算の例
単一ステージのステートレス パイプライン (Kafka ソース + シンク):
maxPartitions = 8 の場合は、少なくとも 8 つのスロットが必要です。 maxPartitions が設定されていない場合は、Kafka トピック パーティションの数を使用します。
2 ステージのステートフル パイプライン (Kafka ソース + シャッフル):
maxPartitions = 8、シャッフル パーティション = 20 の場合は、8 + 20 = 28 スロットが必要です。
3 ステージのパイプライン (Kafka ソース + シャッフル + 再パーティション分割):
maxPartitions = 8、2 つのシャッフル ステージがそれぞれ 20 の場合は、8 + 20 + 20 = 48 スロットが必要です。
主な考慮事項
クラスターを構成するときは、次の事項を考慮してください。
- マイクロバッチ モードとは異なり、リアルタイム タスクはデータの待機中にアイドル状態を維持できるため、リソースの無駄を避けるためには適切なサイズ設定が不可欠です。
- 次の調整により、目標使用率レベル (例: 50%) を目指します。
-
maxPartitions(Kafka の場合) -
spark.sql.shuffle.partitions(シャッフル ステージの場合)
-
- Databricks では、オーバーヘッドを削減するために、各タスクが複数の Kafka パーティションを処理するように maxPartitions を設定することをお勧めします。
- 単純な 1 段階のジョブのワークロードに合わせて、ワーカーごとのタスク スロットを調整します。
- シャッフルが多いジョブの場合、バックログを回避するためのシャッフルパーティションの最小数を実験で見つけ、その結果に基づいて調整してください。 クラスターに十分なスロットがない場合、ジョブはスケジュールされません。
Note
Databricks Runtime 16.4 LTS 以降では、すべてのリアルタイム パイプラインでチェックポイント v2 が使用されます。これにより、リアルタイムモードとマイクロバッチ モードをシームレスに切り替えることができます。
クエリの構成
待機時間の短いモードを使用してクエリを実行するように指定するには、リアルタイム トリガーを有効にする必要があります。 さらに、リアルタイム トリガーは更新モードでのみサポートされます。 例えば次が挙げられます。
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
以前は、エンド ツー エンドのクエリ待機時間はバッチ期間に密接に関連付けられていたため、バッチ期間はクエリ待機時間の適切なインジケーターでした。 ただし、この方法はリアルタイム モードでは適用されなくなり、待機時間を測定するための別の方法が必要になります。 エンドツーエンドの待機時間はワークロード固有であり、ビジネス ロジックでのみ正確に測定できる場合があります。 たとえば、ソース タイムスタンプが Kafka で出力される場合、Kafka の出力タイムスタンプとソース タイムスタンプの差として待機時間を計算できます。
ストリーミング プロセス中に収集された部分的な情報に基づいて、複数の方法でエンドツーエンドの待機時間を見積もることができます。
StreamingQueryProgress を使用する
ドライバー ログに自動的に記録される StreamingQueryProgress イベントには、次のメトリックが含まれています。
StreamingQueryListenerのonQueryProgress()コールバック関数を使用してアクセスすることもできます。
QueryProgressEvent.json() またはtoString() には追加のリアルタイムモードのメトリックが含まれています。
- 処理待機時間 (processingLatencyMs)。 リアルタイム モード クエリがレコードを読み取ってから、次のステージまたはダウンストリームに書き込まれるまでの経過時間。 単一ステージ クエリの場合、これは E2E 待機時間と同じ期間を測定します。 このメトリックは、タスクごとに報告されます。
- ソース キューの待機時間 (sourceQueuingLatencyMs)。 レコードがメッセージ バスに正常に書き込まれるまでの時間 (Kafka のログの追加時間など) と、レコードがリアルタイム モード クエリによって最初に読み取られた時間。 このメトリックは、タスクごとに報告されます。
- E2E 待機時間 (e2eLatencyMs)。 レコードがメッセージ バスに正常に書き込まれるときから、リアルタイム モード クエリによってレコードがダウンストリームに書き込まれるまでの時間。 このメトリックは、すべてのタスクによって処理されるすべてのレコードにわたってバッチごとに集計されます。
例えば次が挙げられます。
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
ジョブで Observe API を使用する
Observe API は、別のジョブを起動せずに待機時間を測定するのに役立ちます。 ソース データの到着時間に近いソース タイムスタンプがあり、シンクに到達する前に渡される場合、またはタイムスタンプを渡す方法が見つかる場合は、Observe API を使用して各バッチの待機時間を見積もることができます。
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
この例では、エントリを出力する前に現在のタイムスタンプが記録され、このタイムスタンプとレコードのソース タイムスタンプの差を計算することによって待機時間が見積もられます。 結果は進行状況レポートに含まれており、リスナーが利用できるように提供されます。 出力例を次に示します。
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
サポートされる内容
Environments
| クラスターの種類 | Supported |
|---|---|
| 専用 (以前: シングル ユーザー) | Yes |
| Standard (以前: 共有) | No |
| Lakeflow Spark 宣言型パイプライン クラシック | No |
| Lakeflow Spark 宣言パイプライン サーバーレス | No |
| Serverless | No |
Languages
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
実行モード
| 実行モード | Supported |
|---|---|
| 更新モード | Yes |
| 追加モード | No |
| 完全モード | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (Kafka コネクタを使用) | Yes |
| Kinesis | はい (EFO モードのみ) |
| Google Pub/Sub (グーグルパブサブ) | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Eventhub (Kafka コネクタを使用) | Yes |
| Kinesis | No |
| Google Pub/Sub (グーグルパブサブ) | No |
| Apache Pulsar | No |
| 任意のシンク (forEachWriter を使用) | Yes |
Operators
| Operators | Supported |
|---|---|
| ステートレス操作 | |
|
Yes |
|
Yes |
| UDFs | |
|
はい (いくつかの制限あり) |
|
はい (いくつかの制限あり) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| 集計関数 | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
はい (状態は無制限です) |
|
No |
| ストリーム - テーブル結合 | |
|
Yes |
| ストリーム - ストリーム結合 | No |
| (フラット)MapGroupsWithState | No |
| transformWithState | はい (いくつかの違いがあります) |
| UNION | はい (いくつかの制限あり) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | いいえ (制限を参照) |
transformWithState をリアルタイム モードで使用する
カスタムステートフル アプリケーションを構築するために、Databricks は Apache Spark Structured Streaming の API である transformWithState をサポートします。 API とコード スニペットの詳細については、「 カスタム ステートフル アプリケーションの構築 」を参照してください。
ただし、リアルタイム モードでの API の動作と、マイクロバッチ アーキテクチャを利用する従来のストリーミング クエリには、いくつかの違いがあります。
- リアルタイム モードの
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)のメソッドは、各行に対して呼び出されます。-
inputRows反復子は 1 つの値を返します。 マイクロ バッチ モードでは、キーごとに 1 回呼び出され、inputRows反復子はマイクロ バッチ内のキーのすべての値を返します。 - コードを記述するときは、この違いを認識する必要があります。
-
- イベント時間タイマーは、リアルタイム モードではサポートされていません。
- リアルタイム モードでは、データ到着に応じてタイマーの起動が遅れます。 それ以外の場合、データがない場合は、実行時間の長いバッチの最後に発生します。 たとえば、10:00:00 にタイマーが作動するはずで、同時にデータが到着しない場合は、タイマーは作動しません。 代わりに、データが 10:00:10 に到着した場合、タイマーは 10秒後に起動されます。 または、データが到着せず、実行時間の長いバッチが終了している場合は、実行時間の長いバッチを終了する前にタイマーを実行します。
Python UDF
Databricks では、Python ユーザー定義関数 (UDF) の大部分がリアルタイム モードでサポートされています。
| UDF の種類 | Supported |
|---|---|
| ステートレス UDF | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| ステートフル グループ化 UDF (UDAF) | |
|
Yes |
|
No |
| 非ステートフル グループ化 UDF (UDAF) | |
|
No |
|
No |
|
No |
| テーブル関数 | |
|
No |
| UC UDF | No |
Python UDF をリアルタイム モードで使用する場合は、いくつかの点を考慮する必要があります。
- 待機時間を最小限に抑えるには、方向バッチ サイズ (spark.sql.execution.arrow.maxRecordsPerBatch) を 1 に構成します。
- トレードオフ: この構成は、スループットを犠牲にして待機時間を最適化します。 ほとんどのワークロードでは、この設定をお勧めします。
- バッチ サイズを増やすのは、入力ボリュームに対応するためにより高いスループットが必要な場合に限り、待機時間の増加の可能性を受け入れます。
- Pandas UDF と関数は、矢印バッチ サイズが 1 の場合、うまく機能しません。
- pandas UDF または関数を使用する場合は、矢印バッチ サイズを高い値 (100 以上など) に設定します。
- これは待機時間が長くなることに注意してください。 Databricks では、可能であれば、方向 UDF または関数を使用することをお勧めします。
- pandas のパフォーマンスの問題により、transformWithState は
Rowインターフェイスでのみサポートされます。
最適化手法
| 手法 | 既定で有効 |
|---|---|
| 非同期進行状況の追跡: 書き込みをオフセット ログに移動し、ログをコミットして非同期スレッドに移動し、2 つのマイクロバッチ間のバッチ間時間を短縮します。 これは、ステートレス ストリーミング クエリの待機時間を短縮するのに役立ちます。 | No |
| 非同期状態のチェックポイント処理: 状態のチェックポイント処理を待たずに、前のマイクロバッチの計算が完了するとすぐに次のマイクロバッチの処理を開始することで、ステートフル ストリーミング クエリの待機時間を短縮するのに役立ちます。 | No |
Limitations
ソースの制限
Kinesis の場合、ポーリング モードはサポートされていません。 さらに、頻繁な再パーティション分割は待機時間に悪影響を与える可能性があります。
共用体の制限
Union の場合、いくつかの制限があります。
- 自己結合はサポートされていません。
- Kafka: 同じソース データ フレーム オブジェクトと、そこから派生したデータ フレームを共用体で使用することはできません。 回避策: 同じソースから読み取るさまざまなデータフレームを使用します。
- Kinesis: 同じ構成の同じ Kinesis ソースから派生したデータ フレームを結合することはできません。 回避策: 異なるデータフレームを使用するだけでなく、各 DataFrame に異なる 'consumerName' オプションを割り当てることができます。
- 結合の前に定義されているステートフル演算子 (
aggregate、deduplicate、transformWithStateなど) はサポートされていません。 - バッチ データ ソースとの結合はサポートされていません。
MapPartitions の制限事項
mapPartitions Scala と同様の Python API (mapInPandas、 mapInArrow) では、入力パーティション全体の反復子を受け取り、入力と出力の間に任意のマッピングを持つ出力全体の反復子を生成します。 これらの API は、出力全体をブロックすることでストリーミング Real-Time モードでパフォーマンスの問題を引き起こす可能性があるため、待機時間が長くなります。 これらの API のセマンティクスは、ウォーターマークの伝達を適切にサポートしていません。
同様の機能を実現するには、スカラー UDF と 複雑なデータ型の変換 または filter を組み合わせて使用します。
Examples
次の例は、サポートされているクエリを示しています。
ステートレス クエリ
単一または複数ステージのステートレス クエリがサポートされます。
Kafka ソースから Kafka シンクへの接続
この例では、Kafka ソースから読み取り、Kafka シンクに書き込みます。
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
パーティション
この例では、Kafka ソースから読み取り、データを 20 個のパーティションに再パーティション分割し、Kafka シンクに書き込みます。
再パーティションを使用する前に、Spark 構成 spark.sql.execution.sortBeforeRepartition を false に設定します。
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
ストリーム・スナップショット・ジョイン (ブロードキャストのみ)
この例では、Kafka から読み取り、データを静的テーブルと結合し、Kafka シンクに書き込みます。 静的テーブルをブロードキャストするストリーム静的結合のみがサポートされていることに注意してください。つまり、静的テーブルはメモリに収まる必要があります。
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kinesis ソースから Kafka シンク
この例では、Kinesis ソースから読み取り、Kafka シンクに書き込みます。
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
この例では、2 つの異なるトピックから 2 つの Kafka DataFrame を結合し、Kafka シンクに書き込みます。
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
ステートフル クエリ
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
集合と集約
この例では、最初に 2 つの異なるトピックから 2 つの Kafka DataFrame を結合してから、集計を行います。 最後に、Kafka シンクに書き込みます。
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
構造化ストリーミングのリアルタイム モードとその他の実行モードでは、StatefulProcessorでtransformWithStateを実行する方法に違いがあります。
リアルタイム モードでの transformWithState の使用を参照してください
TransformWithState (PySpark,Row インターフェイス)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
構造化ストリーミングのリアルタイム モードとその他の実行モードでは、StatefulProcessorでtransformWithStateを実行する方法に違いがあります。
リアルタイム モードでの transformWithState の使用を参照してください
Sinks
foreachSink を使用して Postgres に書き込む
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
この機能は、Databricks Runtime 17.1 以降で使用できます。
レート ソースの表示
この例では、レート ソースから読み取り、ストリーミング DataFrame をノートブックに表示します。
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())