この記事では、パイプラインの更新について説明し、更新プログラムをトリガーする方法について詳しく説明します。
パイプラインの更新とは
パイプラインを作成し、実行する準備ができたら、"更新" を開始します。 パイプラインの更新では、次のことが実行されます。
- 正しい構成でクラスターを起動します。
- 定義されているすべてのテーブルとビューを検出し、無効な列名、不足している依存関係、構文エラーなどの分析エラーを確認します。
- 使用可能な最新のデータを含むテーブルおよびビューを作成または更新します。
有効な更新プログラムを使用するとテーブルの作成や更新を待たずに、パイプラインのソース コードで問題を確認できます。 この機能は、パイプライン内の誤ったテーブル名や列名などのエラーをすばやく見つけて修正できるため、パイプラインを開発またはテストするときに役立ちます。
パイプラインの更新はどのようにトリガーされますか?
パイプラインの更新を開始するには、次のいずれかのオプションを使用します。
トリガーの更新 | 詳細 |
---|---|
手動 | パイプライン UI、パイプラインの一覧、またはパイプラインにアタッチされたノートブックから、パイプラインの更新を手動でトリガーできます。 「 パイプラインの更新を手動でトリガーする 」 と「ノートブックで DLT パイプラインを開発およびデバッグする」を参照してください。 |
スケジュール済み | ジョブを使用して、パイプラインの更新をスケジュールできます。 ジョブ については、DLT パイプライン タスクのを参照してください。 |
プログラム的な | サード パーティ製のツール、API、および CLI を使用して、プログラムによって更新プログラムをトリガーできます。 ワークフローとパイプライン API での DLT パイプラインの実行に関する記事を参照してください。 |
パイプラインの更新を手動でトリガーする
パイプラインの更新を手動でトリガーするには、次のいずれかのオプションを使用します。
- パイプラインの詳細ページの
] ボタンをクリックします。
- パイプラインの一覧で、アクション列の
右矢印アイコンをクリックします。
注
手動でトリガーされるパイプライン更新の既定の動作は、パイプラインで定義されているすべてのデータセットを更新することです。
パイプライン更新セマンティクス
次の表では、具体化されたビューとストリーミング テーブルの既定の更新、完全更新、およびリセット チェックポイントの動作について説明します。
更新の種類 | 具体化されたビュー | ストリーミング テーブル |
---|---|---|
更新 (既定値) | 定義クエリの現在の結果を反映するように結果を更新します。 | ストリーミング テーブルとフローで定義されたロジックを使用して、新しいレコードを処理します。 |
完全更新 | 定義クエリの現在の結果を反映するように結果を更新します。 | ストリーミング テーブルからデータをクリアし、フローから状態情報 (チェックポイント) をクリアし、データ ソースからすべてのレコードを再処理します。 |
ストリーミング フロー チェックポイントをリセットする | 具体化されたビューには適用されません。 | フローから状態情報 (チェックポイント) をクリアしますが、ストリーミング テーブルからデータを消去せず、データ ソースからのすべてのレコードを再処理します。 |
既定では、パイプライン内のすべての具体化されたビューとストリーミング テーブルは、更新ごとに更新されます。 必要に応じて、次の機能を使用して、更新プログラムからテーブルを省略できます。
- 更新テーブルの選択: 更新を実行する前に、この UI を使用して具体化されたビューとストリーミング テーブルを追加または削除します。 "選択したテーブルのパイプライン更新を開始するには、を参照してください。"
- 失敗したテーブルの更新: 失敗した具体化されたビューとストリーミング テーブル (ダウンストリームの依存関係を含む) の更新を開始します。 失敗したテーブルについては、「
パイプラインの更新を開始する」を参照してください。
これらの機能はどちらも、既定の更新セマンティクスまたは完全更新をサポートしています。 必要に応じて、[更新 テーブルの選択] ダイアログを使用して、失敗したテーブルの更新を実行するときに追加のテーブルを除外できます。
ストリーミング テーブルの場合は、関連付けられているストリーミング テーブルのデータではなく、選択したフローのストリーミング チェックポイントをクリアすることを選択できます。 選択したフローのチェックポイントをクリアするには、Databricks REST API を使用して更新を開始します。 選択的ストリーミング フローのチェックポイントをクリアするには、パイプラインの更新を開始するを参照してください。
完全更新を使用する必要がありますか?
Databricks では、必要な場合にのみ完全更新を実行することをお勧めします。 完全更新では、データセットを定義するロジックを使用して、指定したデータ ソースのすべてのレコードが常に再処理されます。 完全更新を完了するための時間とリソースは、ソース データのサイズに関連付けられます。
具体化されたビューは、既定の更新と完全更新のどちらを使用した場合でも、同じ結果を返します。 ストリーミング テーブルで完全更新を使用すると、すべての状態処理とチェックポイント情報がリセットされ、入力データが使用できなくなった場合にレコードが削除される可能性があります。
Databricks では、入力データ ソースに、テーブルまたはビューの目的の状態を再作成するために必要なデータが含まれている場合にのみ、完全な更新が推奨されます。 入力ソース データが使用できなくなった次のシナリオと、完全な更新を実行した結果を考えてみましょう。
データ ソース | 入力データが存在しない理由 | 完全更新の結果 |
---|---|---|
カフカ | 短い保存期間のしきい値 | Kafka ソースに存在しなくなったレコードは、ターゲット テーブルから削除されます。 |
オブジェクト ストレージ内のファイル | ライフサイクル ポリシー | ソース ディレクトリに存在しなくなったデータ ファイルは、ターゲット テーブルから削除されます。 |
テーブル内のレコード | コンプライアンスのために削除済み | ソース テーブルに存在するレコードのみが処理されます。 |
テーブルまたはビューで完全な更新が実行されないようにするには、テーブルのプロパティ pipelines.reset.allowed
を false
に設定します。 DLT テーブルのプロパティを参照してください。 追加フローを使って、完全な更新の必要なしに、データを既存のストリーミング テーブルに追加することもできます。
選択したテーブルのパイプライン更新を開始する
必要に応じて、パイプライン内の選択したテーブルのデータのみを再処理できます。 たとえば、開発中は、1 つのテーブルのみを変更し、テスト時間を短縮したり、パイプラインの更新が失敗したり、 失敗したテーブルのみを更新したりします。
注
選択的更新は、トリガーされたパイプラインでのみ使用できます。
選択したテーブルのみを更新する更新プログラムを開始するには、[パイプラインの詳細] ページで次の手順を実行します。
[更新するテーブルの選択] をクリックして更新します。 [更新するテーブルの選択] ダイアログボックスが表示されます。
[更新するテーブルの選択] ボタンが表示されない場合は、Pipeline の詳細ページに最新の更新プログラムが表示され、更新が完了したことを確認します。 たとえば、更新に失敗したために DAG が最新の更新プログラムに対して表示されない場合、 更新のテーブルの選択 ボタンは表示されません。
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。
[選択の更新] をクリックします。
注
選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。
選択したテーブルに対してすでに取り込まれているデータを再処理するには、「更新を選択」ボタンの横にある「青い下向き矢印」をクリックし、「完全更新を選択」をクリックします。
失敗したテーブルのパイプライン更新を開始する
パイプライン グラフ内の 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみに関する更新を開始できます。
注
除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。
失敗したテーブルを更新するには、[パイプラインの詳細] ページで [失敗したテーブルの更新] をクリックします。
選択した失敗テーブルのみを更新するには
[失敗したテーブルの更新] ボタンの横にある をクリックし、[更新するテーブルの選択] をクリックします。 [更新するテーブルの選択] ダイアログボックスが表示されます。
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。
[選択の更新] をクリックします。
注
選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。
選択したテーブルに対してすでに取り込まれているデータを再処理するには、「更新を選択」ボタンの横にある「青い下向き矢印」をクリックし、「完全更新を選択」をクリックします。
パイプラインの更新を開始して、選択的ストリーミング フローのチェックポイントをクリアする
必要に応じて、既に取り込まれたデータをクリアすることなく、パイプライン内の選択したストリーミング フローのデータを再処理できます。
注
選択されていないフローは、 REFRESH 更新を使用して実行されます。 full_refresh_selection
またはrefresh_selection
を指定して、他のテーブルを選択的に更新することもできます。
選択したストリーミング チェックポイントを更新するための更新を開始するには、DLT REST API で 更新 要求を使用します。 次の例では、 curl
コマンドを使用して updates
要求を呼び出してパイプラインの更新を開始します。
curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": [<streaming flow1>, <streaming flow 2>...]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates
テーブルの更新を待たずにパイプラインにエラーがないかどうかを確認する
重要
DLT Validate
更新機能は パブリック プレビュー段階です。
完全な更新を実行せずにパイプラインのソース コードが有効であるかどうかを確認するには、Validate を使用します。 Validate
更新では、パイプラインで定義されたデータセットとフローの定義が解決されますが、データセットの具体化または発行は行われません。 検証中に見つかったエラー (間違ったテーブル名や列名など) は、UI で報告されます。
Validate
更新を実行するには、パイプラインの詳細ページでの横にある開始をクリックして、Validateをクリックします。
Validate
の更新が完了すると、イベント ログにはValidate
更新にのみ関連するイベントが表示され、DAG にはメトリックは表示されません。 エラーが検出された場合、詳細をイベント ログで確認できます。
最新の Validate
更新に関する結果のみが表示されます。 Validate
更新が、最後に実行された更新である場合は、[更新履歴] でそれを選択すると、結果を確認できます。 その結果は、Validate
更新の後に別の更新が実行されると UI で確認できなくなります。
開発モードと実稼働モード
開発モードと実稼働モードを切り替えることによって、パイプラインの実行を最適化できます。 パイプライン UI の ボタンを使用して、これら 2 つのモードを切り替えます。 既定では、パイプラインは開発モードで実行されます。
開発モードでパイプラインを実行すると、DLT システムによって次の処理が実行されます。
- クラスターを再利用して、再起動のオーバーヘッドを回避します。 既定では、開発モードが有効になっている場合、クラスターは 2 時間実行されます。 これは、
pipelines.clusterShutdown.delay
の設定で変更できます。 - エラーを即座に検出して修正できるように、パイプラインの再試行を無効にします。
実稼働モードでは、DLT システムは次の処理を行います。
- メモリ リークや古い資格情報など、特定の回復可能なエラーが発生した場合にクラスターを再起動します。
- クラスターの起動に失敗するなど、特定のエラーが発生した場合に実行を再試行します。
注
開発モードと実稼働モードの切り替えで制御されるのは、クラスターとパイプラインの実行動作のみです。 発行テーブル用のカタログ内のストレージの場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。