次の方法で共有


リアルタイム モードを設定する

このページでは、Structured Streaming でリアルタイム モード クエリを実行するために必要な前提条件と構成について説明します。 詳細なチュートリアルについては、「 チュートリアル: リアルタイム ストリーミング ワークロードを実行する」を参照してください。 リアルタイム モードの概念については、「 構造化ストリーミングのリアルタイム モード」を参照してください。

前提 条件

リアルタイム モードを使用するには、次の要件を満たすようにコンピューティングを構成する必要があります。

  • クラシック コンピューティングでは専用アクセス モードを使用します。 標準アクセス モード、Lakeflow Spark 宣言パイプライン、およびサーバーレス クラスターはサポートされていません。
  • Databricks Runtime 16.4 LTS 以降を使用します。
  • 自動スケールをオフにします。
  • Photon をオフにします。
  • spark.databricks.streaming.realTimeMode.enabledtrue に設定します。
  • 中断を回避するためにスポット インスタンスをオフにします。

クラシック コンピューティングの作成と構成の手順については、「 コンピューティング構成リファレンス」を参照してください。

クエリの構成

リアルタイム モードでクエリを実行するには、リアルタイム トリガーを有効にする必要があります。 リアルタイム トリガーは、更新モードでのみサポートされます。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

コンピューティングのサイズ設定

コンピューティングに十分なタスク スロットがある場合は、コンピューティング リソースごとに 1 つのリアルタイム ジョブを実行できます。

低待機時間モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージのタスク数以上である必要があります。

スロット計算の例

パイプラインの種類 コンフィギュレーション 必要なスロット
単一ステージステートレス (Kafka ソース + シンク) maxPartitions = 8 8 スロット
2段階のステートフル (Kafkaソース + シャッフル) maxPartitions = 8、シャッフル パーティション = 20 28 スロット (8 + 20)
3段階 (Kafka ソース + シャッフル + リパーティション) maxPartitions = 8、各 20 の 2 つのシャッフル ステージ 48 スロット (8 + 20 + 20)

maxPartitions設定しない場合は、Kafka トピックのパーティションの数を使用します。

その他のリソース