次の方法で共有


DLT パイプラインで更新を実行する

この記事では、パイプラインの更新について説明し、更新プログラムをトリガーする方法について詳しく説明します。

パイプラインの更新とは

パイプラインを作成し、実行する準備ができたら、"更新" を開始します。 パイプラインの更新では、次のことが実行されます。

  • 正しい構成でクラスターを起動します。
  • 定義されているすべてのテーブルとビューを検出し、無効な列名、不足している依存関係、構文エラーなどの分析エラーを確認します。
  • 使用可能な最新のデータを含むテーブルおよびビューを作成または更新します。

有効な更新プログラムを使用するとテーブルの作成や更新を待たずに、パイプラインのソース コードで問題を確認できます。 この機能は、パイプライン内の誤ったテーブル名や列名などのエラーをすばやく見つけて修正できるため、パイプラインを開発またはテストするときに役立ちます。

パイプラインの更新はどのようにトリガーされますか?

パイプラインの更新を開始するには、次のいずれかのオプションを使用します。

トリガーの更新 詳細
手動 パイプライン UI、パイプラインの一覧、またはパイプラインにアタッチされたノートブックから、パイプラインの更新を手動でトリガーできます。 「 パイプラインの更新を手動でトリガーすると「ノートブックで DLT パイプラインを開発およびデバッグする」を参照してください。
スケジュール済み ジョブを使用して、パイプラインの更新をスケジュールできます。 ジョブ については、DLT パイプライン タスクのを参照してください。
プログラム的な サード パーティ製のツール、API、および CLI を使用して、プログラムによって更新プログラムをトリガーできます。 ワークフローとパイプライン API での DLT パイプラインの実行に関する記事を参照してください。

パイプラインの更新を手動でトリガーする

パイプラインの更新を手動でトリガーするには、次のいずれかのオプションを使用します。

  • パイプラインの詳細ページの [DLT 開始アイコン ] ボタンをクリックします。
  • パイプラインの一覧で、アクション列のRight Arrow Icon右矢印アイコンをクリックします。

手動でトリガーされるパイプライン更新の既定の動作は、パイプラインで定義されているすべてのデータセットを更新することです。

パイプライン更新セマンティクス

次の表では、具体化されたビューとストリーミング テーブルの既定の更新、完全更新、およびリセット チェックポイントの動作について説明します。

更新の種類 具体化されたビュー ストリーミング テーブル
更新 (既定値) 定義クエリの現在の結果を反映するように結果を更新します。 ストリーミング テーブルとフローで定義されたロジックを使用して、新しいレコードを処理します。
完全更新 定義クエリの現在の結果を反映するように結果を更新します。 ストリーミング テーブルからデータをクリアし、フローから状態情報 (チェックポイント) をクリアし、データ ソースからすべてのレコードを再処理します。
ストリーミング フロー チェックポイントをリセットする 具体化されたビューには適用されません。 フローから状態情報 (チェックポイント) をクリアしますが、ストリーミング テーブルからデータを消去せず、データ ソースからのすべてのレコードを再処理します。

既定では、パイプライン内のすべての具体化されたビューとストリーミング テーブルは、更新ごとに更新されます。 必要に応じて、次の機能を使用して、更新プログラムからテーブルを省略できます。

  • 更新テーブルの選択: 更新を実行する前に、この UI を使用して具体化されたビューとストリーミング テーブルを追加または削除します。 "選択したテーブルのパイプライン更新を開始するには、を参照してください。"
  • 失敗したテーブルの更新: 失敗した具体化されたビューとストリーミング テーブル (ダウンストリームの依存関係を含む) の更新を開始します。 失敗したテーブルについては、「パイプラインの更新を開始する」を参照してください。

これらの機能はどちらも、既定の更新セマンティクスまたは完全更新をサポートしています。 必要に応じて、[更新 テーブルの選択] ダイアログを使用して、失敗したテーブルの更新を実行するときに追加のテーブルを除外できます。

ストリーミング テーブルの場合は、関連付けられているストリーミング テーブルのデータではなく、選択したフローのストリーミング チェックポイントをクリアすることを選択できます。 選択したフローのチェックポイントをクリアするには、Databricks REST API を使用して更新を開始します。 選択的ストリーミング フローのチェックポイントをクリアするには、パイプラインの更新を開始するを参照してください

完全更新を使用する必要がありますか?

Databricks では、必要な場合にのみ完全更新を実行することをお勧めします。 完全更新では、データセットを定義するロジックを使用して、指定したデータ ソースのすべてのレコードが常に再処理されます。 完全更新を完了するための時間とリソースは、ソース データのサイズに関連付けられます。

具体化されたビューは、既定の更新と完全更新のどちらを使用した場合でも、同じ結果を返します。 ストリーミング テーブルで完全更新を使用すると、すべての状態処理とチェックポイント情報がリセットされ、入力データが使用できなくなった場合にレコードが削除される可能性があります。

Databricks では、入力データ ソースに、テーブルまたはビューの目的の状態を再作成するために必要なデータが含まれている場合にのみ、完全な更新が推奨されます。 入力ソース データが使用できなくなった次のシナリオと、完全な更新を実行した結果を考えてみましょう。

データ ソース 入力データが存在しない理由 完全更新の結果
カフカ 短い保存期間のしきい値 Kafka ソースに存在しなくなったレコードは、ターゲット テーブルから削除されます。
オブジェクト ストレージ内のファイル ライフサイクル ポリシー ソース ディレクトリに存在しなくなったデータ ファイルは、ターゲット テーブルから削除されます。
テーブル内のレコード コンプライアンスのために削除済み ソース テーブルに存在するレコードのみが処理されます。

テーブルまたはビューで完全な更新が実行されないようにするには、テーブルのプロパティ pipelines.reset.allowedfalse に設定します。 DLT テーブルのプロパティを参照してください。 追加フローを使って、完全な更新の必要なしに、データを既存のストリーミング テーブルに追加することもできます。

選択したテーブルのパイプライン更新を開始する

必要に応じて、パイプライン内の選択したテーブルのデータのみを再処理できます。 たとえば、開発中は、1 つのテーブルのみを変更し、テスト時間を短縮したり、パイプラインの更新が失敗したり、 失敗したテーブルのみを更新したりします。

選択的更新は、トリガーされたパイプラインでのみ使用できます。

選択したテーブルのみを更新する更新プログラムを開始するには、[パイプラインの詳細] ページで次の手順を実行します。

  1. [更新するテーブルの選択] をクリックして更新します。 [更新するテーブルの選択] ダイアログボックスが表示されます。

    [更新するテーブルの選択] ボタンが表示されない場合は、Pipeline の詳細ページに最新の更新プログラムが表示され、更新が完了したことを確認します。 たとえば、更新に失敗したために DAG が最新の更新プログラムに対して表示されない場合、 更新のテーブルの選択 ボタンは表示されません。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。

  3. [選択の更新] をクリックします。

    選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。

選択したテーブルに対してすでに取り込まれているデータを再処理するには、「更新を選択」ボタンの横にある「青い下向き矢印」をクリックし、「完全更新を選択」をクリックします。

失敗したテーブルのパイプライン更新を開始する

パイプライン グラフ内の 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみに関する更新を開始できます。

除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。

失敗したテーブルを更新するには、[パイプラインの詳細] ページで [失敗したテーブルの更新] をクリックします。

選択した失敗テーブルのみを更新するには

  1. Button Down[失敗したテーブルの更新] ボタンの横にある をクリックし、[更新するテーブルの選択] をクリックします。 [更新するテーブルの選択] ダイアログボックスが表示されます。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。

  3. [選択の更新] をクリックします。

    選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。

選択したテーブルに対してすでに取り込まれているデータを再処理するには、「更新を選択」ボタンの横にある「青い下向き矢印」をクリックし、「完全更新を選択」をクリックします。

パイプラインの更新を開始して、選択的ストリーミング フローのチェックポイントをクリアする

必要に応じて、既に取り込まれたデータをクリアすることなく、パイプライン内の選択したストリーミング フローのデータを再処理できます。

選択されていないフローは、 REFRESH 更新を使用して実行されます。 full_refresh_selectionまたはrefresh_selectionを指定して、他のテーブルを選択的に更新することもできます。

選択したストリーミング チェックポイントを更新するための更新を開始するには、DLT REST API で 更新 要求を使用します。 次の例では、 curl コマンドを使用して updates 要求を呼び出してパイプラインの更新を開始します。

curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": [<streaming flow1>, <streaming flow 2>...]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates

テーブルの更新を待たずにパイプラインにエラーがないかどうかを確認する

重要

DLT Validate 更新機能は パブリック プレビュー段階です

完全な更新を実行せずにパイプラインのソース コードが有効であるかどうかを確認するには、Validate を使用します。 Validate 更新では、パイプラインで定義されたデータセットとフローの定義が解決されますが、データセットの具体化または発行は行われません。 検証中に見つかったエラー (間違ったテーブル名や列名など) は、UI で報告されます。

Validate更新を実行するには、パイプラインの詳細ページで下向きの青いキャレットの横にある開始をクリックして、Validateをクリックします。

Validateの更新が完了すると、イベント ログにはValidate更新にのみ関連するイベントが表示され、DAG にはメトリックは表示されません。 エラーが検出された場合、詳細をイベント ログで確認できます。

最新の Validate 更新に関する結果のみが表示されます。 Validate 更新が、最後に実行された更新である場合は、[更新履歴] でそれを選択すると、結果を確認できます。 その結果は、Validate 更新の後に別の更新が実行されると UI で確認できなくなります。

開発モードと実稼働モード

開発モードと実稼働モードを切り替えることによって、パイプラインの実行を最適化できます。 パイプライン UI の DLT 環境トグル アイコン ボタンを使用して、これら 2 つのモードを切り替えます。 既定では、パイプラインは開発モードで実行されます。

開発モードでパイプラインを実行すると、DLT システムによって次の処理が実行されます。

  • クラスターを再利用して、再起動のオーバーヘッドを回避します。 既定では、開発モードが有効になっている場合、クラスターは 2 時間実行されます。 これは、pipelines.clusterShutdown.delay設定で変更できます。
  • エラーを即座に検出して修正できるように、パイプラインの再試行を無効にします。

実稼働モードでは、DLT システムは次の処理を行います。

  • メモリ リークや古い資格情報など、特定の回復可能なエラーが発生した場合にクラスターを再起動します。
  • クラスターの起動に失敗するなど、特定のエラーが発生した場合に実行を再試行します。

開発モードと実稼働モードの切り替えで制御されるのは、クラスターとパイプラインの実行動作のみです。 発行テーブル用のカタログ内のストレージの場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。