次の方法で共有


構造化ストリーミングのリアルタイム モード

リアルタイム モードは、構造化ストリーミングのトリガーの種類であり、エンドツーエンドの待機時間が 5 ミリ秒という非常に短い待機時間のデータ処理を可能にします。 不正行為の検出、リアルタイムのパーソナル化、インスタントな意思決定システムなど、ストリーミング データへの即時対応を必要とする運用ワークロードには、リアルタイム モードを使用します。

リアルタイム モードは、Databricks Runtime 16.4 LTS 以降で使用できます。 詳細なセットアップ手順については、「 リアルタイム モードの使用を開始する」を参照してください。 コード例については、 リアルタイム モードの例を参照してください。

リアルタイム モードとは

運用ワークロードと分析ワークロード

ストリーミング ワークロードは、分析ワークロードと運用ワークロードに大きく分けることができます。

  • 分析ワークロードでは、データインジェストと変換が使用されます。通常は medallion アーキテクチャに従います (たとえば、ブロンズ、シルバー、ゴールドの各テーブルにデータを取り込みます)。
  • 運用ワークロードは、リアルタイム データを使用し、ビジネス ロジックを適用し、ダウンストリームのアクションまたは決定をトリガーします。

運用ワークロードの例を次に示します。

  • 異常な場所、大きなトランザクション サイズ、迅速な支出パターンなどの要因に基づいて、不正行為スコアがしきい値を超えた場合に、クレジット カード トランザクションをリアルタイムでブロックまたはフラグ設定します。
  • クリックストリーム データで、ユーザーがジーンズを 5 分間閲覧していることを示すプロモーション メッセージを配信し、今後 15 分間に購入した場合は 25% 割引を提供します。

一般に、運用ワークロードは、2 秒未満のエンド ツー エンド待機時間の必要性によって特徴付けられます。 これは、Apache Spark 構造化ストリーミングのリアルタイム モードで実現できます。

リアルタイム モードで低待機時間を実現する方法

リアルタイム モードでは、次の方法で実行アーキテクチャが向上します。

  • 長時間バッチを実行する (既定値は 5 分) では、システムはソースでデータが利用可能になり次第、そのデータを処理します。
  • クエリのすべてのステージを同時にスケジュールする。 これには、使用可能なタスク スロットの数が、バッチ内のすべてのステージのタスクの数以上である必要があります。
  • データが生成されるとすぐに、ストリーミングシャッフルを使用してステージ間で渡します。

バッチの処理が終了し、次のバッチが開始される前に、Structured Streaming チェックポイントが進行し、メトリックが発行されます。 バッチ期間は、チェックポイント処理の頻度に影響します。

  • 長いバッチ: チェックポイント処理の頻度が低くなります。つまり、障害発生時の再生時間が長くなり、メトリックの可用性が遅れることを意味します。
  • 短いバッチ: チェックポイント処理の頻度が高くなり、待機時間に影響する可能性があります。

Databricks では、ターゲット ワークロードに対してリアルタイム モードをベンチマークして、適切なトリガー間隔を見つけることをお勧めします。

リアルタイム モードを使用する場合

ユース ケースで必要な場合は、リアルタイム モードを選択します。

  • 秒未満の待機時間: リアルタイムでトランザクションをブロックする必要がある不正検出システムなど、ミリ秒以内にデータに応答する必要があるアプリケーション。
  • 運用上の意思決定: リアルタイムのオファー、アラート、通知など、受信データに基づいて即時アクションをトリガーするシステム。
  • 継続的処理: 定期的なバッチではなく、到着するとすぐにデータを処理する必要があるワークロード。

次の場合は、マイクロバッチ モード (既定の構造化ストリーミング トリガー) を使用します。

  • 分析処理: ETL パイプライン、データ変換、およびメダリオン アーキテクチャの実装。待機時間の要件は秒単位または分単位で測定されます。
  • コストの最適化: リアルタイム モードでは専用のコンピューティング リソースが必要であるため、サブ秒の待機時間が不要なワークロード。
  • チェックポイントの頻度は重要です。より高速な回復のために、より頻繁なチェックポイント処理の恩恵を受けるアプリケーション。

要件と構成

リアルタイム モードには、コンピューティングのセットアップとクエリの構成に固有の要件があります。 このセクションでは、リアルタイム モードを使用するために必要な前提条件と構成手順について説明します。

前提条件

リアルタイム モードを使用するには、次の要件を満たす必要があります。

  • Databricks Runtime 16.4 LTS 以降: リアルタイム モードは、DBR 16.4 LTS 以降のバージョンでのみ使用できます。
  • 専用コンピューティング: 専用 (以前のシングル ユーザー) コンピューティングを使用する必要があります。 Standard (以前の共有)、Lakeflow Spark 宣言パイプライン、およびサーバーレス クラスターはサポートされていません。
  • 自動スケールなし: 自動スケールを無効にする必要があります。
  • フォトンなし: リアルタイム モードでは、Photon アクセラレーションはサポートされていません。
  • Spark の構成: spark.databricks.streaming.realTimeMode.enabledtrue に設定する必要があります。

コンピューティング構成

次の設定でコンピューティングを構成します。

  • spark.databricks.streaming.realTimeMode.enabledを Spark 構成でtrueに設定します。
  • 自動スケールを無効にします。
  • Photon アクセラレーションを無効にします。
  • コンピューティングが (標準、Lakeflow Spark 宣言パイプライン、またはサーバーレスではなく) 専用クラスターとして構成されていることを確認します。

リアルタイム モード用のコンピューティングの作成と構成の詳細な手順については、「リアルタイム モード の使用を開始する」を参照してください。

クエリの構成

リアルタイム モードでクエリを実行するには、リアルタイム トリガーを有効にする必要があります。 リアルタイム トリガーは、更新モードでのみサポートされます。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

コンピューティングのサイズ設定

コンピューティングに十分なタスク スロットがある場合は、コンピューティング リソースごとに 1 つのリアルタイム ジョブを実行できます。

低待機時間モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージのタスク数以上である必要があります。

スロット計算の例

パイプラインの種類 コンフィギュレーション 必要なスロット
単一ステージステートレス (Kafka ソース + シンク) maxPartitions = 8 8 スロット
2段階のステートフル (Kafkaソース + シャッフル) maxPartitions = 8、シャッフル パーティション = 20 28 スロット (8 + 20)
3段階 (Kafka ソース + シャッフル + リパーティション) maxPartitions = 8、各 20 の 2 つのシャッフル ステージ 48 スロット (8 + 20 + 20)

maxPartitions設定しない場合は、Kafka トピックのパーティションの数を使用します。

主な考慮事項

コンピューティングを構成するときは、次の点を考慮してください。

  • マイクロバッチ モードとは異なり、リアルタイム タスクはデータの待機中にアイドル状態を維持できるため、リソースの無駄を避けるためには適切なサイズ設定が不可欠です。
  • 次の調整により、目標使用率レベル (たとえば、50%) を目指します。
    • maxPartitions (Kafka の場合)
    • spark.sql.shuffle.partitions (シャッフル ステージの場合)
  • Databricks では、オーバーヘッドを削減するために、各タスクが複数の Kafka パーティションを処理するように、 maxPartitions を設定することをお勧めします。
  • 単純な 1 段階のジョブのワークロードに合わせて、ワーカーごとのタスク スロットを調整します。
  • シャッフルが多いジョブの場合、バックログを回避するためのシャッフルパーティションの最小数を実験で見つけ、その結果に基づいて調整してください。 十分なスロットがない場合、コンピューターはジョブをスケジュールしません。

Note

Databricks Runtime 16.4 LTS 以降では、すべてのリアルタイム パイプラインでチェックポイント v2 が使用されます。これにより、リアルタイムモードとマイクロバッチ モードをシームレスに切り替えることができます。

最適化手法

リアルタイム モードで待機時間を短縮するには、次の手法を使用します。

  • 非同期進行状況の追跡: オフセットへの書き込みとコミット ログを非同期スレッドに移動し、ステートレス クエリのバッチ間時間を短縮します。
  • 非同期状態チェックポイント処理: 状態のチェックポイント処理を待たずに、計算が完了するとすぐに次のマイクロバッチの処理を開始し、ステートフル クエリの待機時間を短縮します。

Note

どちらの手法も既定では有効になっていません。 個別に有効にする必要があります。

監視と可観測性

クエリ パフォーマンスの測定は、リアルタイム ワークロードに不可欠です。 リアルタイム モードでは、従来のバッチ期間メトリックは実際の待機時間を反映しないため、別の方法が必要です。

エンドツーエンドの待機時間はワークロード固有であり、ビジネス ロジックでのみ正確に測定できる場合があります。 たとえば、ソース タイムスタンプが Kafka で出力される場合、Kafka の出力タイムスタンプとソース タイムスタンプの差として待機時間を計算できます。

また、以下で説明する組み込みのメトリックと API を使用して、エンドツーエンドの待機時間を見積もることもできます。

組み込みのメトリック StreamingQueryProgress

ドライバー ログに自動的に記録される StreamingQueryProgress イベントには、次のメトリックが含まれています。 StreamingQueryListeneronQueryProgress()コールバック関数を使用してアクセスすることもできます。 QueryProgressEvent.json() またはtoString() には追加のリアルタイムモードのメトリックが含まれています。

  1. 処理待機時間 (processingLatencyMs)。 リアルタイム モードのクエリがレコードを読み取ったときと、クエリが次のステージまたはダウンストリームにレコードを書き込むまでの経過時間。 単一ステージ クエリの場合、これは E2E 待機時間と同じ期間を測定します。 システムは、タスクごとにこのメトリックを報告します。
  2. ソース キューの待機時間 (sourceQueuingLatencyMs)。 システムがメッセージ バスにレコードを書き込むまでの経過時間 (Kafka のログの追加時間など) と、リアルタイム モード クエリが最初にレコードを読み取る時間。 システムは、タスクごとにこのメトリックを報告します。
  3. E2E 待機時間 (e2eLatencyMs)。 システムがメッセージ バスにレコードを書き込み、リアルタイム モードクエリがレコードをダウンストリームに書き込むまでの時間。 システムは、すべてのタスクによって処理されたすべてのレコードにわたって、バッチごとにこのメトリックを集計します。

例えば次が挙げられます。

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Observe API を使用したカスタム待機時間の測定

Observe API は、別のジョブを起動せずに待機時間を測定するのに役立ちます。 ソース データの到着時間に近いソース タイムスタンプがある場合は、Observe API を使用して各バッチの待機時間を見積もることができます。 シンクに到達する前にタイムスタンプを渡します。

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

この例では、エントリを出力する前に現在のタイムスタンプが記録され、このタイムスタンプとレコードのソース タイムスタンプの差を計算することによって待機時間が見積もられます。 結果は進行状況レポートに含まれており、リスナーが利用できるように提供されます。 出力例を次に示します。

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

機能のサポートと制限事項

このセクションでは、サポートされている機能と、互換性のある環境、言語、ソース、シンク、演算子、特定の機能に関する特別な考慮事項など、リアルタイム モードの現在の制限について説明します。

サポートされている環境、言語、モード

サポートされている言語: リアルタイム モードでは、Scala、Java、Python がサポートされます。

サポートされているコンピューティングの種類:

コンピューティングの種類 Supported
専用 (以前: シングル ユーザー)
Standard (以前: 共有) ✓ (Python のみ)
Lakeflow Spark 宣言型パイプライン クラシック サポートしていません
Lakeflow Spark 宣言パイプライン サーバーレス サポートしていません
Serverless サポートしていません

サポートされている実行モード:

実行モード Supported
更新モード
追加モード サポートしていません
完全モード サポートしていません

ソースとシンクのサポート

ソースまたはシンク ソースとして シンクとして
Apache Kafka
Event Hubs (Kafka コネクタを使用)
Kinesis ✓ (EFO モードのみ) サポートしていません
AWS MSK サポートしていません
Delta サポートしていません サポートしていません
Google Pub/Sub (グーグルパブサブ) サポートしていません サポートしていません
Apache Pulsar サポートしていません サポートしていません
任意のシンク ( forEachWriterを使用) 適用なし

サポートされている演算子

Operators Supported
ステートレス操作
Selection
Projection
UDF
Scalaユーザー定義関数 (UDF) ✓ (いくつかの制限あり)
Python ユーザー定義関数(UDF) ✓ (いくつかの制限あり)
集計
sum
count
max
min
avg
集計関数
ウィンドウ処理
Tumbling
Sliding
Session サポートしていません
Deduplication (重複除去)
dropDuplicates ✓ (状態は無制限です)
ウォーターマーク内の重複を削除する サポートしていません
Stream - テーブル結合
ブロードキャスト テーブル (小さい必要があります)
ストリーム - ストリーム結合 サポートしていません
(フラット)MapGroupsWithState サポートしていません
transformWithState ✓ (いくつかの違いあり)
UNION ✓ (いくつかの制限あり)
forEach
forEachBatch サポートしていません
mapPartitions サポートされていません (制限を参照)

特別な考慮事項

一部の演算子と機能は、リアルタイム モードで使用する場合、特定の考慮事項や違いがあります。

transformWithState のリアルタイム モード

カスタムステートフル アプリケーションを構築するために、Databricks は Apache Spark Structured Streaming の API である transformWithState をサポートします。 API とコード スニペットの詳細については、「 カスタム ステートフル アプリケーションの構築 」を参照してください。

ただし、リアルタイム モードでの API の動作と、マイクロバッチ アーキテクチャを利用する従来のストリーミング クエリには、いくつかの違いがあります。

  • リアルタイム モードでは、各行に対して handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) メソッドが呼び出されます。
    • inputRows反復子は 1 つの値を返します。 マイクロバッチ モードでは、キーごとに 1 回呼び出され、 inputRows 反復子はマイクロ バッチ内のキーのすべての値を返します。
    • コードを記述するときは、この違いに注意する必要があります。
  • イベント時間タイマーは、リアルタイム モードではサポートされていません。
  • リアルタイム モードでは、データ到着に応じてタイマーの起動が遅れます。
    • タイマーが 10:00:00 にスケジュールされていても、データが到着しない場合、タイマーはすぐに起動しません。
    • データが 10:00:10 に到着すると、タイマーは 10 秒の遅延で起動します。
    • データが到着せず、実行時間の長いバッチが終了している場合は、バッチが終了する前にタイマーが起動します。

リアルタイム モードの Python UDF

Databricks では、Python ユーザー定義関数 (UDF) の大部分がリアルタイム モードでサポートされています。

カテゴリ UDF の種類 Supported
ステートレス Python スカラー UDF (ユーザー定義スカラー関数 - Python)
ステートレス 矢印スカラー UDF
ステートレス Pandas スカラー UDF (pandas ユーザー定義関数)
ステートレス Arrow 関数 (mapInArrow)
ステートレス Pandas 関数 (マップ)
ステートフル グループ化 (UDAF) transformWithState (Row インターフェイスのみ)
ステートフル グループ化 (UDAF) applyInPandasWithState サポートしていません
ステートフルでないグループ化 (UDAF) apply サポートしていません
ステートフルでないグループ化 (UDAF) applyInArrow サポートしていません
ステートレスグルーピング (UDAF) applyInPandas サポートしていません
テーブル関数 UDTF (Python ユーザー定義テーブル関数 (UDF)) サポートしていません
テーブル関数 UC UDF サポートしていません

Python UDF をリアルタイム モードで使用する場合は、いくつかの点を考慮する必要があります。

  • 待機時間を最小限に抑えるには、方向バッチ サイズ (spark.sql.execution.arrow.maxRecordsPerBatch) を 1 に構成します。
    • トレードオフ: この構成は、スループットを犠牲にして待機時間を最適化します。 ほとんどのワークロードでは、この設定をお勧めします。
    • バッチ サイズを増やすのは、入力ボリュームに対応するためにより高いスループットが必要な場合に限り、待機時間の増加の可能性を受け入れます。
  • Pandas UDF と関数は、矢印バッチ サイズが 1 の場合、うまく機能しません。
    • pandas UDF または関数を使用する場合は、矢印バッチ サイズを高い値 (100 以上など) に設定します。
    • これは待機時間が長くなることに注意してください。 Databricks では、可能であれば、Arrow UDF または関数を使用することをお勧めします。
  • pandas のパフォーマンスの問題により、transformWithState は Row インターフェイスでのみサポートされます。

Limitations

ソースの制限事項

Kinesis の場合、リアルタイム モードではポーリング モードはサポートされません。 さらに、頻繁な再パーティション分割は待機時間に悪影響を与える可能性があります。

共用体の制限事項

Union 演算子には、いくつかの制限があります。

  • リアルタイム モードでは、自己結合はサポートされていません。
    • Kafka: 同じソース データ フレーム オブジェクトと、そこから派生したデータ フレームを共用体で使用することはできません。 回避策: 同じソースから読み取るさまざまな DataFrame を使用します。
    • Kinesis: 同じ構成の同じ Kinesis ソースから派生したデータ フレームを結合することはできません。 回避策: 異なる DataFrame を使用するだけでなく、各 DataFrame に異なる 'consumerName' オプションを割り当てることができます。
  • リアルタイム モードでは、Union の前に定義されているステートフル演算子 ( aggregatededuplicatetransformWithStateなど) はサポートされていません。
  • リアルタイムモードでは、バッチソースとの結合はサポートされていません。

MapPartitions の制限事項

mapPartitions Scala と同様の Python API (mapInPandasmapInArrow) では、入力パーティション全体の反復子を受け取り、入力と出力の間に任意のマッピングを持つ出力全体の反復子を生成します。 これらの API は、出力全体をブロックすることでストリーミング Real-Time モードでパフォーマンスの問題を引き起こす可能性があるため、待機時間が長くなります。 これらの API のセマンティクスは、ウォーターマークの伝達を適切にサポートしていません。

同様の機能を実現するには、スカラー UDF と 複雑なデータ型の変換 または filter を組み合わせて使用します。

次のステップ

リアルタイム モードの内容と構成方法を理解したら、次のリソースを調べて、リアルタイム ストリーミング アプリケーションの実装を開始します。