Delta Live Tables パイプラインで更新を実行する

この記事では、Delta Live Tables パイプラインの更新とは何か、およびその実行方法について説明します。

パイプラインを作成し、実行する準備ができたら、"更新" を開始します。 パイプラインの更新では、次のことが実行されます。

  • 正しい構成でクラスターを起動します。
  • 定義されているすべてのテーブルとビューを検出して、無効な列名、依存関係の欠落、構文エラーなどの分析エラーがないかどうかを調べます。
  • 使用可能な最新のデータを含むテーブルおよびビューを作成または更新します。

[更新の検証] を使用して、テーブルの作成または更新を待たずにパイプラインのソース コードに問題がないかどうかを確認できます。 Validate 機能を使用すると、間違ったテーブル名や列名など、パイプライン内のエラーをすばやく見つけて修正できるため、パイプラインの開発またはテスト時に役立ちます。

パイプラインを作成する方法については、「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。

パイプラインの更新を開始する

Azure Databricks には、パイプラインの更新を開始するための次のようないくつかのオプションが用意されています。

  • Delta Live Tables UI には、次のオプションがあります。
    • パイプラインの詳細ページで Delta Live Tables の開始アイコン ボタンをクリックします。
    • パイプラインの一覧で、[アクション]列にある右矢印アイコン をクリックします。
  • ノートブックで更新を開始するには、ノートブック ツール バーの [Delta Live Tables > を開始] をクリックします。 「ノートブックから Delta Live Tables パイプラインを開くか実行する」を参照してください。
  • API または CLI を使用して、プログラムでパイプラインをトリガーできます。 「Delta Live Tables API ガイド」を参照してください。
  • Delta Live Tables UI またはジョブ UI を使用してパイプラインをジョブとしてスケジュールできます。 「パイプラインのスケジュールを設定する」を参照してください。

Delta Live Tables でテーブルおよびビューを更新する方法

更新されたテーブルとビュー、およびそれらのテーブルのビューの更新方法は、更新の種類によって異なります。

  • すべて更新: すべてのライブ テーブルは、入力データ ソースの現在の状態を反映するように更新されます。 すべてのストリーミング テーブルに対して、新しい行がテーブルに追加されます。
  • Full refresh all (すべて完全更新): すべてのライブ テーブルは、入力データ ソースの現在の状態を反映するように更新されます。 すべてのストリーミング テーブルに対して、Delta Live Tables は、各テーブルからのすべてのデータの消去と、ストリーミング ソースからのすべてのデータの読み込みを試みます。
  • 選択範囲を更新: refresh selection の動作は refresh all と同じですが、選択したテーブルのみを更新できます。 選択したライブ テーブルは、入力データ ソースの現在の状態を反映するように更新されます。 選択したストリーミング テーブルに対して、新しい行がテーブルに追加されます。
  • Full refresh selection (完全更新の選択): full refresh selection の動作は full refresh all と同じですが、選択したテーブルのみの完全更新を実行できます。 選択したライブ テーブルは、入力データ ソースの現在の状態を反映するように更新されます。 選択したストリーミング テーブルに対して、Delta Live Tables は、各テーブルからのすべてのデータの消去と、ストリーミング ソースからのすべてのデータの読み込みを試みます。

既存のライブ テーブルの場合、更新は具体化されたビューの SQL REFRESH と同じ動作になります。 新しいライブ テーブルの場合、動作は SQL CREATE 操作と同じです。

選択したテーブルのパイプライン更新を開始する

パイプライン内の選択したテーブルのデータのみを再処理することもできます。 たとえば、開発中は、1 つのテーブルのみを変更し、テスト時間を短縮したり、パイプラインの更新が失敗したり、 失敗したテーブルのみを更新したりします。

注意

選択的更新は、トリガーされたパイプラインでのみ使用できます。

選択したテーブルのみを更新する更新プログラムを開始するには、[パイプラインの詳細] ページで次の手順を実行します。

  1. [更新するテーブルの選択] をクリックして更新します。 [更新するテーブルの選択] ダイアログボックスが表示されます。

    [更新のテーブルを選択する] ボタンが表示されない場合は、[Pipeline details] (パイプラインの詳細) ページに最新の更新が表示され、更新が完了していることを確認します。 最新の更新で DAG が表示されない場合 (更新に失敗したなどの理由で)、[更新のテーブルを選択する] ボタンは表示されません。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度そのテーブルをクリックします。

  3. [選択の更新] をクリックします。

    Note

    選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。

選択したテーブルに既に取り込まれたデータを再処理するには、[選択を更新] ボタンの横にある [Blue Down Caret] をクリックし、[フル リフレッシュ選択] をクイックします。

失敗したテーブルのパイプライン更新を開始する

パイプライン グラフ内の 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみに関する更新を開始できます。

Note

除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。

失敗したテーブルを更新するには、[パイプラインの詳細] ページで [失敗したテーブルの更新] をクリックします。

選択済みの失敗したテーブルのみを更新するには

  1. [失敗したテーブルの更新] ボタンの横にある [ボタン ダウン] をクリックし、[更新するテーブルを選択] をクリックします。 [更新するテーブルの選択] ダイアログボックスが表示されます。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度そのテーブルをクリックします。

  3. [選択の更新] をクリックします。

    Note

    選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。

選択したテーブルに既に取り込まれたデータを再処理するには、[選択を更新] ボタンの横にある [Blue Down Caret] をクリックし、[フル リフレッシュ選択] をクイックします。

テーブルの更新を待たずにパイプラインにエラーがないかどうかを確認する

重要

Delta Live Tables の Validate 更新機能は、パブリック プレビュー段階です。

完全な更新を実行せずにパイプラインのソース コードが有効であるかどうかを確認するには、Validate を使用します。 Validate 更新では、パイプラインで定義されたデータセットとフローの定義が解決されますが、データセットの具体化または発行は行われません。 検証中に見つかったエラー (間違ったテーブル名や列名など) は、UI で報告されます。

Validate 更新を実行するには、パイプラインの詳細ページで、[開始]の横にある 青い下向きキャレット をクリックし、[検証] をクリックします。

Validate 更新が完了すると、イベント ログに、Validate 更新に関連するイベントのみが示され、DAG にメトリックは表示されません。 エラーが検出された場合、詳細をイベント ログで確認できます。

最新の Validate 更新に関する結果のみが表示されます。 Validate 更新が、最後に実行された更新である場合は、[更新履歴] でそれを選択すると、結果を確認できます。 その結果は、Validate 更新の後に別の更新が実行されると UI で確認できなくなります。

継続的パイプライン実行とトリガーによるパイプライン実行

パイプラインでトリガーによる実行モードを使用する場合、更新の一部である各テーブルが、更新開始時に使用可能なデータに基づいて更新されるようにするために、パイプライン内のすべてのテーブルまたは選択したテーブルが正常に更新された後、システムは処理をいったん停止します。

パイプラインで 継続的実行を使用する場合、新しいデータがデータ ソースに到着すると、データは Delta Live Tables によって処理され、パイプライン全体のテーブルが最新の状態に維持されます。

実行モードは、計算されているテーブルの種類には依存しません。 具体化されたビューとストリーミング テーブルは両方とも、どちらの実行モードでも更新できます。 継続的実行モードで不要な処理を回避するために、パイプラインによって Delta 依存テーブルが自動的に監視され、それらの依存テーブルの内容が変更された場合にのみ更新が実行されます。

Note

Delta Live Tables ランタイムは、非 Delta データ ソース内の変更を検出できません。 テーブルは引き続き定期的に更新されますが、過剰な再計算によってクラスターで行われる増分処理が遅くならないように、既定のトリガー間隔は長くなっています。

データ パイプラインの実行モードの比較表

次の表は、これらの実行モードの違いを示しています。

トリガー済み 継続的
更新が停止するタイミング 完了すると自動的に停止する。 手動で停止されるまで継続的に実行される。
処理対象のデータ 更新開始時に使用可能なデータ。 構成済みのソースに到着したすべてのデータ。
最適なデータ更新要件 10 分ごと、1 時間ごと、または毎日実行されるデータ更新。 10 秒から数分までの間隔で実行する必要があるデータ更新。

トリガーされたパイプラインでは、パイプラインの実行に十分な時間だけクラスターが実行されるので、リソースの消費量とコストを削減できます。 ただし、新しいデータは、パイプラインがトリガーされるまで処理されません。 継続的パイプラインには、常に実行されているクラスターが必要であるため、コストは高くなりますが、処理の待機時間は短縮されます。

実行モードは、設定の [パイプライン モード] オプションを使用して構成できます。

パイプライン境界を選択する方法

Delta Live Tables パイプラインでは、単一のテーブル、依存リレーションシップを持つ多数のテーブル、リレーションシップのない多数のテーブル、または依存リレーションシップを持つテーブルの複数の独立フローに対する更新を処理できます。 このセクションでは、パイプラインの分割方法を決定するために役立つ考慮事項について説明します。

大規模な Delta Live Tables パイプラインには多くの利点があります。 コーディネートは次のとおりです。

  • クラスター リソースをより効率的に使用します。
  • ワークスペースの中のパイプラインの数を減らします。
  • ワークフロー オーケストレーションの複雑さを減らします。

処理パイプラインの分割方法に関する一般的な推奨事項としては、次のようなものがあります。

  • チームの境界で機能を分割します。 たとえば、データ チームはデータを変換するためのパイプラインを維持し、データ アナリストは変換されたデータを分析するパイプラインを維持します。
  • アプリケーション固有の境界で機能を分割して結合を減らし、共通機能の再利用を容易にします。

開発モードと実稼働モード

開発モードと実稼働モードを切り替えることによって、パイプラインの実行を最適化できます。 これら 2 つのモードを切り替えるには、Pipelines UI の Delta Live Tables 環境の切り替えアイコン ボタンを使用します。 既定では、パイプラインは開発モードで実行されます。

パイプラインを開発モードで実行する場合、Delta Live Tables システムによって次の処理が実行されます。

  • クラスターを再利用して、再起動のオーバーヘッドを回避します。 既定では、開発モードが有効になっている場合、クラスターは 2 時間実行されます。 これは、「コンピューティング設定を構成する」の pipelines.clusterShutdown.delay 設定を使用して変更できます。
  • エラーを即座に検出して修正できるように、パイプラインの再試行を無効にします。

運用モードの場合、Delta Live Tables システムによって次の処理が実行されます。

  • メモリ リークや古い資格情報など、特定の回復可能なエラーが発生した場合にクラスターを再起動します。
  • クラスターの起動エラーなど、特定のエラーが発生した場合に実行を再試行します。

注意

開発モードと実稼働モードの切り替えで制御されるのは、クラスターとパイプラインの実行動作のみです。 発行テーブル用のカタログ内のストレージの場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。

パイプラインのスケジュールを設定する

トリガーされるパイプラインを手動で開始することも、Azure Databricks ジョブを使用してスケジュールに基づいてパイプラインを実行することもできます。 Delta Live Tables UI で直接 1 つのパイプライン タスクを含むジョブを作成し、スケジュール設定できます。または、ジョブ UI のマルチタスク ワークフローにパイプライン タスクを追加できます。

Delta Live Tables UI で単一タスク ジョブとそのジョブのスケジュールを作成するには、次のようにします。

  1. [スケジュール] > [スケジュールの追加] をクリックします。 [スケジュール] ボタンが更新され、パイプラインが 1 つ以上のスケジュール済みジョブに含まれている場合、既存のスケジュールの数が表示されます (例: スケジュール (5))。
  2. [ジョブ名] フィールドに、ジョブの名前を入力します。
  3. [スケジュール][スケジュール済み] に設定します。
  4. 期間、開始時刻、タイム ゾーンを指定します。
  5. パイプラインの開始、成功、または失敗に関するアラートを受信するように、1 つ以上のメール アドレスを構成します。
  6. [作成] をクリックします。