Important
この機能は パブリック プレビュー段階です。
リアルタイム モードを使用すると、エンドツーエンドの待機時間が 5 ミリ秒という超低待機時間のストリーミングが可能になり、不正行為の検出やリアルタイムのパーソナル化などの運用ワークロードに最適です。 このチュートリアルでは、簡単な例を使用して、初めてのリアルタイム ストリーミング クエリを設定する方法について説明します。
リアルタイム モード、使用するタイミング、およびサポートされている機能の概念については、「 構造化ストリーミングのリアルタイム モード」を参照してください。
必要条件
- クラシック コンピューティングを作成するためのアクセス許可がある。
- Databricks Runtime 17.1 以降 (リアルタイム モードで
display関数を使用するために必要)。
注
クラシック コンピューティング作成特権がない場合は、ワークスペース管理者に問い合わせて、手順 1 の構成を使用してリアルタイム モード クラスターを作成してください。
手順 1: リアルタイム モードのクラシック コンピューティングを作成する
リアルタイム モードでは、超低待機時間を実現するために、特定のクラシック コンピューティング構成が必要です。 これらの設定により、タスクはすべてのステージで同時に実行され、データはバッチではなく、到着した時点で継続的に処理されます。
正しく構成されたクラシック コンピューティングを作成するには:
Azure Databricks ワークスペースで、サイドバーの [ コンピューティング ] をクリックします。
[コンピュートの作成] をクリックします。
名前を入力してください。
Databricks Runtime 17.1 以降を選択します。
Clear Photon アクセラレーション (リアルタイム モードでは Photon はサポートされていません)。
[ 自動スケールを有効にする] をオフにします (リアルタイム モードでは、固定クラスター サイズが必要です)。
[ 高度なパフォーマンス] で、[ スポット インスタンスの使用 ] をオフにします (スポット インスタンスによって中断が発生する可能性があります)。
[ 詳細オプション ] をクリックして、追加の設定を展開します。
[ アクセス モード] で、[ 専用] (旧称: シングル ユーザー) を選択します。
Spark 構成で、次の構成を追加します。
spark.databricks.streaming.realTimeMode.enabled trueコンピュート作成をクリックします。
手順 2: ノートブックを作成する
ノートブックは、ストリーミング クエリを開発およびテストするための対話型環境を提供します。 このノートブックを使用して、リアルタイム クエリを記述し、結果の更新を継続的に確認します。
ノートブックを作成するには:
- サイドバーで[ 新規 ]をクリックし、[ ノートブック]をクリックします。
- コンピューティング ドロップダウン メニューで、手順 1 で作成したコンピューティングを選択します。
- 既定の言語として Python または Scala を選択します。
手順 3: リアルタイム モードのクエリを実行する
次のコードをコピーしてノートブック セルに貼り付けて実行します。 この例では、指定したレートで行を生成し、結果をリアルタイムで表示するレート ソースを使用します。
注
display トリガーを持つrealTime関数は、Databricks Runtime 17.1 以降で使用できます。
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
コードを実行すると、新しい行が生成されるとリアルタイムで更新されるテーブルが表示されます。 テーブルには、 timestamp 列と、各行でインクリメントされる value 列が表示されます。
コードについて
上記のコードは、リアルタイム ストリーミング クエリの重要なコンポーネントを示しています。 次の表では、キー パラメーターとその制御内容について説明します。
Python
| パラメーター | 説明 |
|---|---|
format("rate") |
構成可能なレートで行を生成する組み込みソースであるレート ソースを使用します。 これは、外部依存関係のないテストに役立ちます。 |
numPartitions |
生成されたデータのパーティションの数を設定します。 |
rowsPerSecond |
1 秒あたりに生成される行の数を制御します。 |
realTime="5 minutes" |
リアルタイム モードを有効にします。 間隔は、クエリ チェックポイントの進行状況を指定します。 間隔が長いほど、チェックポイント処理の頻度は低くなりますが、障害後の復旧時間が長くなる可能性があります。 |
outputMode="update" |
リアルタイム モードでは、更新出力モードが必要です。 |
Scala
| パラメーター | 説明 |
|---|---|
format("rate") |
構成可能なレートで行を生成する組み込みソースであるレート ソースを使用します。 これは、外部依存関係のないテストに役立ちます。 |
numPartitions |
生成されたデータのパーティションの数を設定します。 |
rowsPerSecond |
1 秒あたりに生成される行の数を制御します。 |
Trigger.RealTime() |
既定のチェックポイント間隔でリアルタイム モードを有効にします。
Trigger.RealTime("5 minutes")などの間隔を指定することもできます。 |
OutputMode.Update() |
リアルタイム モードでは、更新出力モードが必要です。 |
あなたが見ているもの
クエリを実行すると、 display 関数は、レート ソースが新しい行を生成するときにリアルタイムで更新するテーブルを作成します。 各行には次のものが含まれます。
- timestamp: レート ソースによって行が生成された時刻
- value: 新しい行ごとにインクリメントされる単調に増加するカウンター
このテーブルは、待機時間を最小限に抑えて継続的に更新され、リアルタイム モードで使用可能になるとすぐにデータがどのように処理されるかを示します。 これは、リアルタイム モードの主な利点です。バッチ処理を待機するのではなく、データをすぐに表示して操作する機能です。
学習した内容
初めてのリアルタイム ストリーミング クエリが正常に設定され、実行されました。 これで、次の方法がわかります。
- リアルタイム モードに必要な設定を使用してクラシック コンピューティングを構成する (専用クラスター、Photon が無効、自動スケールが無効、Spark 構成)
-
realTimeトリガーを使用してリアルタイム処理を有効にする - 対話型の開発とテストに
display関数を使用する - 継続的な更新を監視して、クエリがリアルタイム モードで実行されていることを確認する
Kafka、Kinesis、およびその他のサポートされているソースを使用して、実稼働リアルタイム パイプラインを構築する準備ができました。 構造化ストリーミングの詳細については、「構造化ストリーミングの 概念」を参照してください。
次のステップ
最初のリアルタイム クエリを実行したので、次のリソースを調べて運用ストリーミング アプリケーションを構築します。
- リアルタイム モードの例 - Kafka のソースとシンク、ステートフル クエリ、集計、カスタム シンクの作業コード例
- リアルタイム モード リファレンス - クラスターのサイズ設定、サポートされるオペレーター、監視、および機能の制限について説明します
- ステートフル ストリーミング アプリケーション - 重複除去、集計、ウィンドウ化のための状態管理をストリーミング クエリに追加する
-
高度な状態管理 - time to Live (TTL) と複雑なロジックを使用したカスタム ステートフル処理に
transformWithStateを使用する