実稼働ワークロード用に自動ローダーを構成する
Databricks では、実稼働環境で自動ローダーを実行するためのストリーミングのベスト プラクティスに従うことをお勧めします。
Databricks では、増分データ インジェストに Delta Live Tables で自動ローダーを使用することが推奨されています。 Delta Live Tables は、Apache Spark Structured Streaming の機能を拡張し、数行の宣言型 Python または SQL を記述するだけで、次のような実稼働品質のデータ パイプラインをデプロイできるようにします。
自動ローダーの監視
自動ローダーによって検出されたファイルのクエリ
Note
cloud_files_state
関数は、Databricks Runtime 11.3 LTS 以降で使用できます。
自動ローダーは、ストリームの状態を検査するための SQL API を提供します。 cloud_files_state
関数を使用して、自動ローダー ストリームによって検出されたファイルに関するメタデータを見つけることができます。 cloud_files_state
に対してクエリを実行するだけで、自動ローダー ストリームに関連付けられているチェックポイントの場所が提供されます。
SELECT * FROM cloud_files_state('path/to/checkpoint');
ストリームの更新をリッスンする
自動ローダー ストリームをさらに監視するために、Databricks では、Apache Spark のストリーミング クエリ リスナー インターフェイスの使用を推奨します。
バッチごとに自動ローダーからストリーミング クエリ リスナーにメトリックが報告されます。 バックログに存在するファイルの数と、バックログのサイズを、ストリーミング クエリの進行状況ダッシュボードにある [生データ] タブの numFilesOutstanding
メトリックと numBytesOutstanding
メトリックで表示できます。
{
"sources" : [
{
"description" : "CloudFilesSource[/path/to/source]",
"metrics" : {
"numFilesOutstanding" : "238",
"numBytesOutstanding" : "163939124006"
}
}
]
}
Databricks Runtime 10.4 LTS 以降では、ファイル通知モードを使用する場合、メトリックには、AWS と Azure の approximateQueueSize
として、クラウド キューに存在するファイル イベントの概数も含まれます。
コストに関する考慮事項
自動ローダーを実行する場合、コストの主なソースは、コンピューティング リソースとファイル検出のコストになります。
Databricks では、コンピューティング コストを削減するために、短い待機時間の要件がない限り、継続的に自動ローダーを実行する代わりに Trigger.AvailableNow
を使用したバッチ ジョブとして、自動ローダーのスケジュールを設定する Databricks ジョブを使用することをお勧めします。 「構造化ストリーミングのトリガー間隔を構成する」を参照してください。
ファイル検出のコストは、ディレクトリ一覧表示モードのストレージ アカウントでの LIST 操作数の形や、サブスクリプション サービス、およびファイル通知モードのキュー サービスでの API 要求数の形で現れることがあります。 ファイル検出コストを削減するために、Databricks では次のように推奨しています。
- ディレクトリ一覧モードで継続的に自動ローダーを実行するときには
ProcessingTime
トリガーを提供する - 可能な場合に増分一覧 (非推奨) を利用するために、字句順でストレージ アカウントにファイルをアップロードするように設計する
- 増分一覧が可能でない場合にファイル通知を利用する
- リソース タグを使用して、自動ローダーで作成されたリソースにタグを付け、コストを追跡する
Trigger.AvailableNow とレート制限の使用
Note
Databricks Runtime 10.4 LTS 以降で使用できます。
自動ローダーは、Trigger.AvailableNow
を使用して、Databricks ジョブでバッチ ジョブとして実行するスケジュールを設定できます。 AvailableNow
トリガーは、クエリの開始時刻の前に到着したすべてのファイルを処理するように、自動ローダーに指示します。 ストリームの開始後にアップロードされた新しいファイルは、次のトリガーまで無視されます。
Trigger.AvailableNow
を使用すると、ファイルの検出はデータ処理と非同期に行われ、データはレート制限付きの複数のマイクロバッチ間で処理できます。 自動ローダーでは、既定で、マイクロバッチごとに最大 1,000 個のファイルが処理されます。 cloudFiles.maxFilesPerTrigger
と cloudFiles.maxBytesPerTrigger
を構成して、マイクロバッチで処理する必要があるファイル数またはバイト数を構成できます。 ファイル制限はハード制限ですが、バイト制限はソフト制限です。つまり、指定された maxBytesPerTrigger
より多くのバイトが処理されることがあります。 両方のオプションが一緒に指定されている場合、自動ローダーでは、いずれかの制限に達するために必要な数のファイルを処理します。
イベントの保持
自動ローダーは、RocksDB を使用してチェックポイントの場所で検出されたファイルを追跡し、厳密に 1 回のインジェスト保証を提供します。 Databricks では、インジェスト ストリームが大量になるとき、あるいはその有効期間が長いときは例外なくcloudFiles.maxFileAge
オプションを使用することを強くお勧めしています。 このオプションを使用すると、チェックポイントの場所からイベントが有効期限切れとなり、自動ローダーの起動時間が短縮されます。 起動時間は自動ローダーの実行毎に分単位まで増加することがあります。ソース ディレクトリに格納されるファイルの最大有効期間に上限があるとき、不要なコストが増えることになります。 cloudFiles.maxFileAge
に設定できる最小値は "14 days"
です。 RocksDB での削除は廃棄標識エントリとして表示されます。そのため、イベントの有効期限が切れるとストレージの使用量が一時的に増加し、その後横ばいになり始めると想定されます。
警告
cloudFiles.maxFileAge
は大量のデータセットのためのコスト制御メカニズムとして提供されます。 cloudFiles.maxFileAge
のチューニングがアグレッシブすぎると、重複取り込みやファイル欠如など、データ品質の問題を引き起こすことがあります。 そのため、Databricks は cloudFiles.maxFileAge
に 90 日間などの控えめな設定を推奨しています。同等のデータ インジェスト ソリューションもこのくらいを推奨しています。
cloudFiles.maxFileAge
オプションをチューニングしようとすると、処理されていないファイルが自動ローダーによって無視されたり、既に処理されたファイルの有効期限が切れて再処理され、データが重複する可能性があります。 次に、cloudFiles.maxFileAge
を選択する際の考慮事項をいくつか示します。
- 長い時間が経過した後にストリームが再起動された場合、キューからプルされたファイル通知イベントのうち、
cloudFiles.maxFileAge
より前のものは無視されます。 同様に、ディレクトリ一覧を使用する場合、ダウンタイム中に表示された可能性がある、cloudFiles.maxFileAge
より前のファイルは無視されます。 - ディレクトリ一覧モードを使用し、
cloudFiles.maxFileAge
を使用する場合は (例:"1 month"
に設定)、ストリームを停止し、cloudFiles.maxFileAge
を"2 months"
に設定してストリームを再起動すると、1 か月より古く、2 か月より新しいファイルは再処理されます。
ストリームを初めて開始するときにこのオプションを設定する場合は、cloudFiles.maxFileAge
より前のデータは取り込まれないため、前のデータを取り込む場合、ストリームを初めて開始するときにこのオプションを設定しないでください。 ただし、後続の実行では、このオプションを設定してください。
cloudFiles.backfillInterval を使って定期的なバックフィルをトリガーする
自動ローダーを使用すると、特定の間隔 (たとえば、1 日は 1 日に 1 回、1 週は 1 週間に 1 回) で非同期のバックフィルをトリガーできます。 ファイル イベント通知システムは、アップロードされたすべてのファイルが 100% 配信されることを保証するものではなく、ファイル イベントの待機時間に関する厳格な SLA を提供するものでもありません。 Databricks では、データの完全性が要件になる場合、cloudFiles.backfillInterval
オプションを使用することで自動ローダーによる定期的なバックフィルをトリガーし、所定の SLA の範囲内ですべてのファイルが検出されるように保証することを推奨しています。 定期的なバックフィルをトリガーしても、重複は発生しません。