このページには、Azure Databricksのジョブを使用して構造化ストリーミング ワークロードをスケジュールするための推奨事項が含まれています。
Databricks では、常に次を構成することをお勧めします。
-
displayやcountなど、結果を返すノートブックから不要なコードを削除します。 - All-Purpose Compute を使用して構造化ストリーミング ワークロードを実行しないでください。 ストリームをジョブとしてスケジュールする際は、Jobs Compute を使用してください。
-
Continuousモードを使用してジョブをスケジュールします。 これは、Azure Databricks ジョブのスケジューリング機能を指し、構造化ストリーミングのトリガー間隔ではありません。 - 構造化ストリーミング ジョブのコンピューティングに対して自動スケールを有効にしないでください。
一部のワークロードには、次の利点があります。
Azure Databricksでは、構造化ストリーミング ワークロードの運用インフラストラクチャを管理する複雑さを軽減するために、Lakeflow Spark 宣言型パイプラインが導入されました。 Databricks では、新しい構造化ストリーミング パイプラインに Lakeflow Spark 宣言パイプラインを使用することをお勧めします。 「Lakeflow Spark 宣言型パイプライン」を参照してください。
注
コンピューティングの自動スケールには、構造化ストリーミング ワークロードのクラスター サイズのスケールダウンに制限があります。 Databricks では、ストリーミング ワークロード用に拡張された自動スケーリングを備えた Lakeflow Spark 宣言型パイプラインを使用することをお勧めします。 自動スケールを使用した Lakeflow Spark 宣言パイプラインのクラスター使用率の最適化に関するページを参照してください。
:::note サーバーレス コンピューティング
サーバーレス コンピューティングでは、 Trigger.AvailableNow() と Trigger.Once() のみがサポートされます。 Databricks では Trigger.AvailableNow() を推奨しています。
サーバーレス コンピューティングでの継続的ストリーミングには、継続的パイプライン モードを使用します。トリガーされたパイプライン モードではなく、連続モードで行ってください。
ストリーミングの制限事項を参照してください。
:::
失敗が予想されるストリーミング ワークロードを設計する
Databricks では、失敗時に自動的に再起動するように、ストリーミング ジョブを設定することを推奨しています。 スキーマの進化を含む一部の機能では、Structured Streaming ワークロードを自動的に再試行するように構成する必要があります。 「障害時にストリーミング クエリを再起動するように、構造化ストリーミング ジョブを構成する」を参照してください。
foreachBatch のような一部の操作では、1 回限りの保証ではなく、少なくとも 1 回の保証が提供されます。 これらの操作を行う際には、処理パイプラインが冪等性を持つことを確認してください。
foreachBatch を使用した任意のデータ シンクへの書き込みに関するページを参照してください。
注
クエリが再起動すると、前回の実行中に予定されたマイクロバッチが処理されます。 メモリ不足エラーが原因でジョブが失敗した場合、またはマイクロバッチのサイズが大きいためにジョブを手動で取り消した場合は、マイクロバッチを正常に処理するためにコンピューティングのスケールアップが必要になる場合があります。
実行間で構成を変更した場合、これらの構成は計画された最初の新しいバッチに適用されます。 「構造化ストリーミング クエリの変更後に復旧する」を参照してください。
ジョブはいつリトライされますか?
Azure Databricks ジョブの一部として複数のタスクをスケジュールできます。 継続的トリガーを使用してジョブを構成する場合、タスク間の依存関係を設定することはできません。
次のいずれかの方法を使用して、1 つのジョブで複数のストリームをスケジュールすることができます。
- 複数のタスク: 継続的トリガーを使用してストリーミング ワークロードを実行する複数のタスクを含むジョブを定義します。
- 複数のクエリ: 1 つのタスクのソース コードで複数のストリーミング クエリを定義します。
これらの戦略を組み合わせることもできます。 次の表では、これらの方法を比較します。
| 戦略 | 複数のタスク | 複数のクエリ |
|---|---|---|
| コンピューティングはどのように共有されますか? | Databricks では、各ストリーミング タスクに適したサイズの Jobs Compute をデプロイすることをお勧めします。 必要に応じて、タスク間でコンピューティングを共有できます。 | すべてのクエリで同じコンピューティングが共有されます。 必要に応じて、 スケジューラ プールにクエリを割り当てることができます。 |
| 再試行はどのように処理されますか? | すべてのタスクが失敗した場合にのみ、ジョブが再試行されます。 | クエリが失敗すると、タスクは再試行します。 |
障害時にストリーミング クエリを再起動するように、構造化ストリーミング ジョブを構成する
Databricks では、継続的トリガーを使用して、すべてのストリーミング ワークロードを構成することをお勧めします。 「ジョブを継続的に実行する」を参照してください。
継続的トリガーの既定の動作は次のとおりです。
- ジョブの複数の同時実行を阻止します。
- 前の実行が失敗したときに新しい実行を開始します。
- 再試行にエクスポネンシャル バックオフを使用します。
Databricks では、ワークフローをスケジュールするときに、All-Purpose Compute ではなく Jobs Compute を常に使用することをお勧めします。 ジョブが失敗して再試行すると、新しいコンピューティング リソースがデプロイされます。
注
Databricks では、 streamingQuery.awaitTermination() や spark.streams.awaitAnyTermination()は使用しないことをお勧めします。
awaitTermination()を使用するタイミングを参照してください。
使用するタイミング awaitTermination()
streamingQuery.awaitTermination() 現在のスレッドをストリーミングクエリが終了するまでブロックします spark.streams.awaitAnyTermination() 。 これらの関数を使用するかどうかは、実行環境によって異なります。
Databricks ジョブの場合は、 streamingQuery.awaitTermination() または spark.streams.awaitAnyTermination()を使用しないでください。 これらの関数は、ストリーミング クエリがアクティブな場合にジョブ サービスによって実行の完了が自動的に防止されるため、必要ありません。 どちらの関数もノートブック のセルが完了するのをブロックし、ジョブ サービスがストリーミング クエリを追跡するのを防ぎます。これにより、バックログ メトリックとジョブ通知が中断されます。
次の場合は、 awaitTermination() を使用します。
| 利用シーン | 行動 |
|---|---|
| 万能コンピューティング上の対話型ノートブック |
awaitTermination() は、セルの実行を維持し、クエリの状態を観察し、ノートブックの出力でエラーが発生することを確認します。 |
| ローカル環境と開発環境 | Spark プログラムをローカルで実行すると、メイン スレッドが完了するとプロセスが終了します。
awaitTermination()を呼び出して、ストリーミング クエリが完了または失敗するまでプログラムを維持します。 |
| ドライバーへのエラー伝達 |
awaitTermination()しないと、ジョブ以外のコンテキストでのストリーミング クエリエラーが呼び出し元のスレッドに伝達されない可能性があります。 クエリは警告なしに失敗する可能性があり、エラーの検出と診断が困難になります。 ドライバーでクエリ例外を再スローするためには、awaitTermination()を呼び出します。 |
複数のストリーミング クエリにスケジューラ プールを使用する
同じソース コードから複数のストリーミング クエリを実行するときに、クエリにコンピューティング容量を割り当てるスケジューラ プールを構成できます。
既定では、ノートブックで開始されたクエリはすべて、同じ公平なスケジュール プールで実行されます。 ノートブック内のすべてのストリーミング クエリからトリガーによって生成された Apache Spark ジョブは、“先入れ先出し”(FIFO) 順に順番に実行されます。 これにより、クラスター リソースを効率的に共有しないので、クエリで不要な遅延が発生する可能性があります。
スケジューラ プールを使用すると、コンピューティング リソースを共有する構造化ストリーミング クエリを宣言できます。
次の例では、専用プールに query1 を割り当てますが、 query2 と query3 はスケジューラ プールを共有します。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
注
ローカル プロパティの構成は、ストリーミング クエリを開始するノートブック セルと同じである必要があります。
Apache Fair Scheduler プールの詳細については、 Apache Fair Scheduler のドキュメントを参照してください。