次の方法で共有


チュートリアル: リアルタイム ストリーミング ワークロードを実行する

リアルタイム モードを使用すると、エンドツーエンドの待機時間が 5 ミリ秒という超低待機時間のストリーミングが可能になり、不正行為の検出やリアルタイムのパーソナル化などの運用ワークロードに最適です。 このチュートリアルでは、簡単な例を使用して、初めてのリアルタイム ストリーミング クエリを設定する方法について説明します。

リアルタイム モード、使用するタイミング、およびサポートされている機能の概念については、「 構造化ストリーミングのリアルタイム モード」を参照してください。 構成要件については、「 リアルタイム モードの設定」を参照してください。

必要条件

開始する前に、「 リアルタイム モードのセットアップ」で指定した構成を使用するクラシック コンピューティング クラスターを作成するためのアクセス許可があることを確認します。 または、ワークスペース管理者に連絡して、リアルタイム モード クラスターを作成してください。

手順 1: ノートブックを作成する

ノートブックは、ストリーミング クエリを開発およびテストするための対話型環境を提供します。 このノートブックを使用して、リアルタイム クエリを記述し、結果の更新を継続的に確認します。

ノートブックを作成するには:

  1. サイドバーで[ 新規 ]をクリックし、[ ノートブック]アイコンをクリックします。ノートブック
  2. コンピューティング ドロップダウン メニューで、リアルタイム モード クラスターを選択します。
  3. 既定の言語として Python または Scala を選択します。

手順 2: リアルタイム モードのクエリを実行する

次のコードをコピーしてノートブック セルに貼り付けて実行します。 この例では、指定したレートで行を生成し、結果をリアルタイムで表示するレート ソースを使用します。

display トリガーを持つrealTime関数は、Databricks Runtime 17.1 以降で使用できます。

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

コードを実行すると、新しい行が生成されるとリアルタイムで更新されるテーブルが表示されます。 テーブルには、 timestamp 列と、各行でインクリメントされる value 列が表示されます。

コードについて

上記のコードは、リアルタイム ストリーミング クエリの重要なコンポーネントを示しています。 次の表では、キー パラメーターとその制御内容について説明します。

Python

パラメーター 説明
format("rate") 構成可能なレートで行を生成する組み込みソースであるレート ソースを使用します。 これは、外部依存関係のないテストに役立ちます。
numPartitions 生成されたデータのパーティションの数を設定します。
rowsPerSecond 1 秒あたりに生成される行の数を制御します。
realTime="5 minutes" リアルタイム モードを有効にします。 間隔は、クエリ チェックポイントの進行状況を指定します。 間隔が長いほど、チェックポイント処理の頻度は低くなりますが、障害後の復旧時間が長くなる可能性があります。
outputMode="update" リアルタイム モードでは、更新出力モードが必要です。

Scala

パラメーター 説明
format("rate") 構成可能なレートで行を生成する組み込みソースであるレート ソースを使用します。 これは、外部依存関係のないテストに役立ちます。
numPartitions 生成されたデータのパーティションの数を設定します。
rowsPerSecond 1 秒あたりに生成される行の数を制御します。
Trigger.RealTime() 既定のチェックポイント間隔でリアルタイム モードを有効にします。 Trigger.RealTime("5 minutes")などの間隔を指定することもできます。
OutputMode.Update() リアルタイム モードでは、更新出力モードが必要です。

手順 3: 結果を検証する

クエリを実行すると、 display 関数は、レート ソースが新しい行を生成するときにリアルタイムで更新するテーブルを作成します。 各行には次のものが含まれます。

  • レート ソースによって行が生成されたときのタイムスタンプ。
  • 新しい行ごとにインクリメントされる単調に増加するカウンター。

このテーブルは、待機時間を最小限に抑えて継続的に更新され、リアルタイム モードで使用可能になるとすぐにデータがどのように処理されるかを示します。 これは、リアルタイム モードの主な利点です。バッチ処理を待機するのではなく、データをすぐに表示して操作する機能です。

その他のリソース

最初のリアルタイム クエリを実行したので、これらのリソースを調べて、Kafka、Kinesis、およびその他のサポートされているソースを使用して実稼働ストリーミング アプリケーションを構築します。