Important
Lakeflow Spark 宣言型パイプラインのリアルタイム モードは、プレビュー チャネルの Databricks Runtime 18.1.2 で パブリック プレビュー段階にあります。
リアルタイム モードでは、超低待機時間のデータ処理が可能になり、エンド ツー エンドの待機時間は 5 ミリ秒と短くなります。 不正行為の検出やリアルタイムのパーソナル化など、ストリーミング データへの即時対応を必要とする運用ワークロードには、リアルタイム モードを使用します。
リアルタイム モードは、パイプラインの外部の構造化ストリーミングでも直接使用できます。 構造化ストリーミングのリアルタイム モードを参照してください。
リアルタイム モードで低待機時間を実現する方法
リアルタイム モードは、次の 3 つの重要な方法で標準の連続処理とは異なります。
- 実行時間の長いバッチ: システムは、実行時間の長いバッチ内でソースで使用できるようになるとデータを処理します (既定値は 5 分です)。
- 同時ステージ スケジューリング: すべてのクエリ ステージが同時にスケジュールされます。 コンピューティング リソースには、すべてのステージを同時にカバーするのに十分な使用可能なタスク スロットが必要です。 「 コンピューティングのサイズ設定」を参照してください。
- ストリーミング シャッフル: データは、ダウンストリーム ステージを開始する前にアップストリーム ステージが完了するのを待つのではなく、生成されるとすぐにステージ間で渡されます。
チェックポイント間隔 ( pipelines.trigger.interval を使用して構成) は、永続的ストレージに状態とソースのオフセットを保持する頻度を制御します。 間隔が長いほどチェックポイント処理のオーバーヘッドは軽減されますが、失敗後の復旧時間が長くなり、メトリックの報告が遅れます。 間隔を短くすると持続性が向上しますが、オーバーヘッドが増加します。
リアルタイム モードと継続的パイプライン
リアルタイム モードは、特殊な種類の連続トリガーです。 継続的モードは引き続き必要です。リアルタイム モードでは、フロー レベルの待機時間の最適化が上に追加されます。 リアルタイム モードを使用するには、パイプラインを最初に連続モードで実行する必要があります。 その後、リアルタイム モードでは、フロー レベルで追加の最適化が適用され、標準の連続処理を超える 2 秒未満の待機時間が実現されます。
リアルタイム モードを有効にするには、次の 3 つの構成手順が必要です。
- パイプラインを連続モードに設定します。
- パイプライン レベルでリアルタイム モードを有効にします。
- リアルタイムの更新フローを定義します。
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | SDP プレビュー チャネルの 18.1.2 |
| コンピューティングの種類 | クラシック コンピューティングまたはサーバーレス |
リアルタイム モードを構成する
手順 1: パイプラインを連続モードに設定する
パイプライン設定で、[ パイプライン モード ] を [継続的] に設定するか、パイプライン JSON で設定します。
{
"continuous": true
}
手順 2: パイプライン レベルでリアルタイム モードを有効にする
パイプラインの設定で、[ 詳細] > Spark 構成の Spark 構成に次のキーを追加します。
spark.databricks.streaming.realTimeMode.enabled = true
これは、パイプライン JSON で設定することもできます。
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
手順 3: リアルタイム更新フローを定義する
リアルタイム モードでは、更新フローが必要です。
dp.create_sink()を使用して出力先を定義し、次に@dp.update_flowデコレーターを使用して、pipelines.triggerを"RealTime"に設定し、targetでシンクを指定します。
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
フロー レベルの構成パラメーター:
| パラメーター | 必須 | デフォルト | Description |
|---|---|---|---|
pipelines.trigger |
はい | — | このフローのリアルタイム モードを有効にするには、 "RealTime" に設定します。 |
pipelines.trigger.interval |
いいえ | "5 minutes" |
チェックポイント間隔。 状態とオフセットをコミットする頻度を制御します。 値を短くすると回復性が向上します。長い値を使用すると、オーバーヘッドが削減されます。 |
コード例
Kafka から Kafka へ
Kafka トピックから読み取り、Kafka 出力ターゲットに書き込みます。
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
ブロードキャスト結合を使用したエンリッチ
静的参照テーブルに対して Kafka ストリームを結合します。 ブロードキャスト (ストリームから静的) の結合のみがサポートされます。 ストリーム間結合は、リアルタイム モードではサポートされていません。
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
集計
ステートフル groupByを使用して、キーによってイベントをカウントします。 ステートフル操作の入力パーティション数と一致するように spark.sql.shuffle.partitions を設定します。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
サポートされているソースとシンク
| コネクタ | ソースとして | シンクとして | 注記 |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Kafka 互換インターフェイスを使用します。 |
| Azure Event Hubs (Kafka コネクタ) | ✓ | ✓ | Kafka 互換インターフェイスを使用します。 |
| Amazon Kinesis | ✓ | サポートしていません | EFO(Enhanced Fan-Out)モード専用です。 |
| Delta | サポートしていません | サポートしていません | — |
計算リソースのサイズ設定
コンピューティングに十分なタスク スロットがある場合は、コンピューティング リソースごとに 1 つのリアルタイム パイプラインを実行できます。 使用可能なタスク スロットは、すべてのクエリ ステージのすべてのタスクをカバーする必要があります。
| パイプラインの種類 | Configuration | 必要なタスク スロット |
|---|---|---|
| 単一ステージステートレス (Kafka ソース + シンク) |
maxPartitions = 8 |
8 |
| 2段階のステートフル (Kafkaソース + シャッフル) |
maxPartitions = 8、シャッフル パーティション = 20 |
28 (8 + 20) |
| 3段階(Kafkaソース + 2回のシャッフル) |
maxPartitions = 8、各 20 の 2 つのシャッフル ステージ |
48 (8 + 20 + 20) |
maxPartitions設定しない場合は、Kafka トピックのパーティションの数を使用します。
オペレーターのサポート
| カテゴリ | Operator | Supported |
|---|---|---|
| ステートレス | 選択、射影 | ✓ |
| UDFs | Scalaユーザー定義関数 (UDF) | ✓ (制限あり) |
| UDFs | Python ユーザー定義関数(UDF) | ✓ (制限あり) |
| 集計 | 合計、件数、最大、最小、平均 | ✓ |
| Windowing | タンブリング、スライディング | ✓ |
| Windowing | Session | サポートしていません |
| 重複除去 (Deduplication) | dropDuplicates |
✓ (無制限の状態) |
| 重複除去 (Deduplication) | dropDuplicatesWithinWatermark |
サポートしていません |
| Joins | ブロードキャスト テーブルの結合 | ✓ |
| Joins | ストリーム間結合 | サポートしていません |
| Custom | transformWithState |
✓ (動作の違いあり) |
| Custom | union |
✓ (制限あり) |
| Custom | forEach |
サポートしていません |
| Custom | flatMapGroupsWithState |
サポートしていません |
| Custom | mapPartitions |
サポートしていません |
| Custom | forEachBatch |
サポートしていません |
transformWithState リアルタイムモード
transformWithState は、マイクロバッチ処理とは次の違いがあり、リアルタイム モードでサポートされています。
-
handleInputRowsは、バッチごとにキーごとに 1 回ではなく、行ごとに 1 回呼び出されます。inputRows反復子は、呼び出しごとに 1 つの値を生成します。 - イベント時間タイマーはサポートされていません。 処理時間タイマーは、データが到着していない場合に実行時間の長いバッチが終了したときに発生します。
-
transformWithStateInPandasはサポートされません。
リアルタイム モードでの Pandas UDFs
pandas UDF で待機時間を最小限に抑えるには、 spark.sql.execution.arrow.maxRecordsPerBatch を 1 に設定します。 これにより、スループットを犠牲にして待機時間が最適化されます。 スループットも重要な場合は、この値を 100 以上に設定します。
リアルタイム モードのパフォーマンスを監視する
リアルタイム モードでは、StreamingQueryProgress フィールドのlatenciesで待機時間メトリックが公開されます。 これらのメトリックには、 StreamingQueryListener 経由でアクセスするか、ストリーミング クエリの lastProgress プロパティを調べます。
| メトリック | Description |
|---|---|
processingLatencyMs |
レコードがフローによって読み取られ、フローによって完全に処理されるまでの時間 |
sourceQueuingLatencyMs |
レコードがメッセージ バスに正常に書き込まれるまでの時間 (Kafka のログの追加時刻など) と、フローによって最初に読み取られた時刻 |
e2eLatencyMs |
レコードがソースで生成されたときからフローによって完全に処理されたときまでのエンドツーエンドの待機時間の合計 |
各メトリックは、p50、p90、p95、および p99 パーセンタイルとして報告されます。
制限事項
パイプラインごとに 1 つのリアルタイム フローを使用することをお勧めします。 複数のフローが許可されますが、フロー間のタスク スロットの競合により待機時間が長くなります。
演算子とソースの制限事項の完全な一覧については、 リアルタイム モードの制限事項を参照してください。