次の方法で共有


リアルタイム モードを開始する

Important

この機能は パブリック プレビュー段階です

リアルタイム モードを使用すると、エンドツーエンドの待機時間が 5 ミリ秒という超低待機時間のストリーミングが可能になり、不正行為の検出やリアルタイムのパーソナル化などの運用ワークロードに最適です。 このチュートリアルでは、簡単な例を使用して、初めてのリアルタイム ストリーミング クエリを設定する方法について説明します。

リアルタイム モード、使用するタイミング、およびサポートされている機能の概念については、「 構造化ストリーミングのリアルタイム モード」を参照してください。

必要条件

クラシック コンピューティング作成特権がない場合は、ワークスペース管理者に問い合わせて、手順 1 の構成を使用してリアルタイム モード クラスターを作成してください。

手順 1: リアルタイム モードのクラシック コンピューティングを作成する

リアルタイム モードでは、超低待機時間を実現するために、特定のクラシック コンピューティング構成が必要です。 これらの設定により、タスクはすべてのステージで同時に実行され、データはバッチではなく、到着した時点で継続的に処理されます。

正しく構成されたクラシック コンピューティングを作成するには:

  1. Azure Databricks ワークスペースで、サイドバーの [ コンピューティング ] をクリックします。

  2. [コンピュートの作成] をクリックします。

  3. 名前を入力してください。

  4. Databricks Runtime 17.1 以降を選択します。

  5. Clear Photon アクセラレーション (リアルタイム モードでは Photon はサポートされていません)。

  6. [ 自動スケールを有効にする] をオフにします (リアルタイム モードでは、固定クラスター サイズが必要です)。

  7. [ 高度なパフォーマンス] で、[ スポット インスタンスの使用 ] をオフにします (スポット インスタンスによって中断が発生する可能性があります)。

  8. [ 詳細オプション ] をクリックして、追加の設定を展開します。

  9. [ アクセス モード] で、[ 専用] (旧称: シングル ユーザー) を選択します。

  10. Spark 構成で、次の構成を追加します。

    spark.databricks.streaming.realTimeMode.enabled true
    
  11. コンピュート作成をクリックします。

手順 2: ノートブックを作成する

ノートブックは、ストリーミング クエリを開発およびテストするための対話型環境を提供します。 このノートブックを使用して、リアルタイム クエリを記述し、結果の更新を継続的に確認します。

ノートブックを作成するには:

  1. サイドバーで[ 新規 ]をクリックし、[ ノートブック]をクリックします。
  2. コンピューティング ドロップダウン メニューで、手順 1 で作成したコンピューティングを選択します。
  3. 既定の言語として Python または Scala を選択します。

手順 3: リアルタイム モードのクエリを実行する

次のコードをコピーしてノートブック セルに貼り付けて実行します。 この例では、指定したレートで行を生成し、結果をリアルタイムで表示するレート ソースを使用します。

display トリガーを持つrealTime関数は、Databricks Runtime 17.1 以降で使用できます。

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

コードを実行すると、新しい行が生成されるとリアルタイムで更新されるテーブルが表示されます。 テーブルには、 timestamp 列と、各行でインクリメントされる value 列が表示されます。

コードについて

上記のコードは、リアルタイム ストリーミング クエリの重要なコンポーネントを示しています。 次の表では、キー パラメーターとその制御内容について説明します。

Python

パラメーター 説明
format("rate") 構成可能なレートで行を生成する組み込みソースであるレート ソースを使用します。 これは、外部依存関係のないテストに役立ちます。
numPartitions 生成されたデータのパーティションの数を設定します。
rowsPerSecond 1 秒あたりに生成される行の数を制御します。
realTime="5 minutes" リアルタイム モードを有効にします。 間隔は、クエリ チェックポイントの進行状況を指定します。 間隔が長いほど、チェックポイント処理の頻度は低くなりますが、障害後の復旧時間が長くなる可能性があります。
outputMode="update" リアルタイム モードでは、更新出力モードが必要です。

Scala

パラメーター 説明
format("rate") 構成可能なレートで行を生成する組み込みソースであるレート ソースを使用します。 これは、外部依存関係のないテストに役立ちます。
numPartitions 生成されたデータのパーティションの数を設定します。
rowsPerSecond 1 秒あたりに生成される行の数を制御します。
Trigger.RealTime() 既定のチェックポイント間隔でリアルタイム モードを有効にします。 Trigger.RealTime("5 minutes")などの間隔を指定することもできます。
OutputMode.Update() リアルタイム モードでは、更新出力モードが必要です。

あなたが見ているもの

クエリを実行すると、 display 関数は、レート ソースが新しい行を生成するときにリアルタイムで更新するテーブルを作成します。 各行には次のものが含まれます。

  • timestamp: レート ソースによって行が生成された時刻
  • value: 新しい行ごとにインクリメントされる単調に増加するカウンター

このテーブルは、待機時間を最小限に抑えて継続的に更新され、リアルタイム モードで使用可能になるとすぐにデータがどのように処理されるかを示します。 これは、リアルタイム モードの主な利点です。バッチ処理を待機するのではなく、データをすぐに表示して操作する機能です。

学習した内容

初めてのリアルタイム ストリーミング クエリが正常に設定され、実行されました。 これで、次の方法がわかります。

  • リアルタイム モードに必要な設定を使用してクラシック コンピューティングを構成する (専用クラスター、Photon が無効、自動スケールが無効、Spark 構成)
  • realTime トリガーを使用してリアルタイム処理を有効にする
  • 対話型の開発とテストに display 関数を使用する
  • 継続的な更新を監視して、クエリがリアルタイム モードで実行されていることを確認する

Kafka、Kinesis、およびその他のサポートされているソースを使用して、実稼働リアルタイム パイプラインを構築する準備ができました。 構造化ストリーミングの詳細については、「構造化ストリーミングの 概念」を参照してください。

次のステップ

最初のリアルタイム クエリを実行したので、次のリソースを調べて運用ストリーミング アプリケーションを構築します。