この記事では、Azure Databricks で構造化ストリーミングのトリガー間隔を構成する方法について説明します。
Apache Spark Structured Streaming は、データを段階的に処理します。 トリガー間隔は、構造化ストリーミングが新しいデータをチェックする頻度を制御します。 ほぼリアルタイムの処理、スケジュールされたデータベースの更新、1 日または 1 週間のすべての新しいデータのバッチ処理のトリガー間隔を構成できます。
自動ローダーとは、構造化ストリーミングを使用してデータを読み込むため、トリガーのしくみを理解することで、目的の頻度でデータを取り込みながらコストを制御する最大の柔軟性を提供します。
トリガー モードの概要
次の表は、構造化ストリーミングで使用できるトリガー モードをまとめたものです。
| トリガー モード | 構文の例 (Python) | 最適な対象者 |
|---|---|---|
| 未指定 (既定値) | N/A | 待機時間が 3 ~ 5 秒の汎用ストリーミング。 0 ミリ秒間隔の processingTime トリガーと同じです。 ストリーム処理は、新しいデータが到着する限り継続的に実行されます。 |
| 処理時間 | .trigger(processingTime='10 seconds') |
コストとパフォーマンスのバランスを取る。 システムでデータのチェックが頻繁に行われるのを防ぐことで、オーバーヘッドを軽減します。 |
| 今すぐ利用可能 | .trigger(availableNow=True) |
スケジュールされた増分バッチ処理。 ストリーミング ジョブがトリガーされた時点で使用可能な量のデータを処理します。 |
| リアルタイム モード | .trigger(realTime='5 minutes') |
不正行為の検出やリアルタイムのパーソナル化など、2 秒未満の処理を必要とする超低待機時間の運用ワークロード。 パブリック プレビュー。 '5 分' はマイクロバッチの長さを示します。 クエリのコンパイルなどのバッチごとのオーバーヘッドを最小限に抑えるには、5 分を使用します。 |
| 継続的 | .trigger(continuous='1 second') |
サポートされていません。 これは、Spark OSS に含まれる試験的な機能です。 代わりにリアルタイム モードを使用してください。 |
processingTime: 時間ベースのトリガー間隔
構造化ストリーミングは、時間ベースのトリガー間隔を "固定間隔マイクロバッチ" と言います。
processingTime キーワードを使用して、.trigger(processingTime='10 seconds') のように、期間を文字列として指定します。
この間隔の構成によって、新しいデータが到着したかどうかを確認するためにシステムがチェックを実行する頻度が決まります。 待機時間の要件とデータがソースに到達する速度のバランスを取るために処理時間を構成します。
AvailableNow: 増分バッチ処理
重要
Databricks Runtime 11.3 LTS 以降では、 Trigger.Once は非推奨です。 すべての増分バッチ処理ワークロードに対して Trigger.AvailableNow を使用します。
AvailableNow トリガー オプションでは、使用可能なすべてのレコードが増分バッチとして使用され、maxBytesPerTriggerなどのオプションを使用してバッチ サイズを構成できます。 サイズ設定オプションは、データ ソースによって異なります。
サポートされるデータ ソース
Azure Databricks では、多くの構造化ストリーミング ソースからの増分バッチ処理に Trigger.AvailableNow を使用できます。 次の表に、各データ ソースに必要な Databricks Runtime の最小サポート バージョンを示します。
| ソース | Databricks の最低ランタイム バージョン |
|---|---|
| ファイル ソース (JSON、Parquet など) | 9.1 LTS |
| Delta Lake | 10.4 LTS |
| 自動ローダー | 10.4 LTS |
| Apache Kafka | 10.4 LTS |
| キネシス | 13.1 |
realTime: 超低待機時間の運用ワークロード
重要
この機能は パブリック プレビュー段階です。
構造化ストリーミングのリアルタイム モードでは、末尾が 1 秒未満でエンドツーエンドの待機時間が実現され、一般的な場合は約 300 ミリ秒です。 リアルタイム モードを効果的に構成して使用する方法の詳細については、「 構造化ストリーミングのリアルタイム モード」を参照してください。
Apache Spark には、 継続的処理と呼ばれるトリガー間隔が追加されています。 このモードは、Spark 2.3 以降、試験段階として分類されています。 Azure Databricks では、このモードはサポートされていないか、推奨されていません。 待機時間の短いユース ケースでは、代わりにリアルタイム モードを使用してください。
注
このページの連続処理モードは、 Lakeflow Spark 宣言パイプラインでの連続処理とは無関係です。
実行間のトリガー間隔を変更する
同じチェックポイントを使用しながら、実行間のトリガー間隔を変更できます。
間隔を変更するときの動作
マイクロバッチの処理中に構造化ストリーミング ジョブが停止した場合、そのマイクロバッチは、新しいトリガー間隔が適用される前に完了する必要があります。 その結果、トリガー間隔を変更した後、以前に指定した設定でマイクロバッチ処理が行われる場合があります。 次に、移行時の予期される動作について説明します。
時間ベースの間隔から
AvailableNowへの切り替え: マイクロバッチは、使用可能なすべてのレコードを増分バッチとして処理する前に処理される可能性があります。AvailableNowから時間ベースの間隔への切り替え: 最後のAvailableNowジョブがトリガーされたときに使用可能なすべてのレコードに対して処理が続行される場合があります。 これは正しい動作です。
クエリエラーからの復旧
注
増分バッチに関連付けられているクエリエラーから復旧しようとしている場合、バッチを完了する必要があるため、トリガー間隔を変更してもこの問題は解決しません。 問題の解決を試みるために、バッチの処理に使用されるコンピューティング容量をスケールアップします。 まれに、新しいチェックポイントを使用してストリームを再起動することが必要になる場合があります。