次の方法で共有


ワークフローでパイプラインを実行する

Lakeflow ジョブ、Apache Flow、または Azure Data Factory を使用して、データ処理ワークフローの一部としてパイプラインを実行できます。

Jobs

Databricks ジョブ内の複数のタスクを調整して、データ処理ワークフローを実装できます。 ジョブにパイプラインを含めるには、ジョブの作成時に パイプライン タスクを使用します。 ジョブのパイプライン タスクを参照してください。

Apache エアフロー

Apache エアフロー は、データ ワークフローを管理およびスケジュールするためのオープン ソース ソリューションです。 エアフローは、ワークフローを操作の有向非環式グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、エアフローによってスケジュールと実行が管理されます。 Azure Databricks でのエアフローのインストールと使用の詳細については、「 Apache エアフローを使用した Lakeflow ジョブの調整」を参照してください。

エアフロー ワークフローの一部としてパイプラインを実行するには、 DatabricksSubmitRunOperator を使用します。

Requirements

Lakeflow Spark 宣言パイプラインのエアフロー サポートを使用するには、次のものが必要です。

Example

次の例では、識別子 8279d543-063c-4d63-9926-dae38e35ce8bを使用してパイプラインの更新をトリガーするエアフロー DAG を作成します。

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('ldp',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_IDをワークスペースへのエアフロー接続の識別子に置き換えます。

この例を airflow/dags ディレクトリに保存し、エアフロー UI を使用して DAG を 表示およびトリガー します。 パイプライン UI を使用して、パイプラインの更新の詳細を表示します。

Azure Data Factory

Lakeflow Spark 宣言パイプラインと Azure Data Factory には、それぞれ、障害が発生した場合の再試行回数を構成するためのオプションが含まれています。 パイプライン 、パイプラインを呼び出す Azure Data Factory アクティビティで再試行値が構成されている場合、再試行回数は、Azure Data Factory の再試行値にパイプラインの再試行値を乗算したものです。

たとえば、パイプラインの更新が失敗した場合、Lakeflow Spark 宣言パイプラインは既定で最大 5 回更新を再試行します。 Azure Data Factory の再試行が 3 に設定されていて、パイプラインで既定値の 5 回の再試行が使用されている場合、失敗したパイプラインは最大 15 回再試行される可能性があります。 パイプラインの更新が失敗したときの過剰な再試行を回避するために、Databricks では、パイプラインを構成するときの再試行回数またはパイプラインを呼び出す Azure Data Factory アクティビティの回数を制限することをお勧めします。

パイプラインの再試行構成を変更するには、パイプラインを構成するときに pipelines.numUpdateRetryAttempts 設定を使用します。

Azure Data Factory は、データ統合と変換ワークフローを調整できるクラウドベースの ETL サービスです。 Azure Data Factory では、 ノートブック、JAR タスク、Python スクリプトなど、ワークフロー内での Azure Databricks タスクの実行を直接サポートしています。 Azure Data Factory Web アクティビティからパイプライン REST API を呼び出すことによって、ワークフローにパイプラインを含めることもできます。 たとえば、Azure Data Factory からパイプラインの更新をトリガーするには、次のようにします。

  1. データ ファクトリを作成 するか、既存のデータ ファクトリを開きます。

  2. 作成が完了したら、データ ファクトリのページを開き、[ Azure Data Factory Studio を開く ] タイルをクリックします。 Azure Data Factory ユーザー インターフェイスが表示されます。

  3. Azure Data Factory Studio ユーザー インターフェイスの [新しい] ドロップダウン メニューから [パイプライン] を選択して、新しい Azure Data Factory パイプラインを作成します。

  4. [ アクティビティ ] ツールボックスで、[ 全般] を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 [ 設定] タブをクリックし、次の値を入力します。

    セキュリティのベスト プラクティスとして、自動化されたツール、システム、スクリプト、アプリを使用して認証する場合、Databricks では、ワークスペース ユーザーではなく サービス プリンシパル に属する個人用アクセス トークンを使用することをお勧めします。 サービス プリンシパルのトークンを作成するには、「サービス プリンシパルのトークンを管理する」をご覧ください。

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates

      <get-workspace-instance> を置き換えます。

      <pipeline-id>をパイプライン識別子に置き換えます。

    • 方法: ドロップダウン メニューから POST を選択します。

    • ヘッダー: [ + 新規] をクリックします。 [名前] テキスト ボックスに「Authorization」と入力します。 [ ] テキスト ボックスに「 Bearer <personal-access-token>」と入力します。

      <personal-access-token>を Azure Databricks 個人用アクセス トークンに置き換えます。

    • 本文: 追加の要求パラメーターを渡すには、パラメーターを含む JSON ドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、 {"full_refresh": "true"}。 追加の要求パラメーターがない場合は、空の波括弧 ({}) を入力します。

Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーの [デバッグ ] をクリックします。 エラーを含む実行の出力と状態は、Azure Data Factory パイプラインの [出力 ] タブに表示されます。 パイプライン UI を使用して、パイプラインの更新の詳細を表示します。

ヒント

一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。 パイプライン updates 要求は非同期であるため、要求は更新を開始した後、更新が完了する前に返されます。パイプラインの更新に依存する Azure Data Factory パイプライン内のタスクは、更新が完了するまで待機する必要があります。 更新の完了を待機するオプションは、Lakeflow Spark 宣言パイプラインの更新をトリガーする Web アクティビティの後に Until アクティビティ を追加することです。 Until アクティビティで次の操作を行います。

  1. Wait アクティビティを追加して、更新が完了するまで構成された秒数待機します。
  2. パイプライン更新の詳細要求を使用して更新の状態を取得する Wait アクティビティの後に Web アクティビティを追加します。 応答の state フィールドは、更新が完了したかどうかなど、更新プログラムの現在の状態を返します。
  3. state フィールドの値を使用して、Until アクティビティの終了条件を設定します。 変数 の設定アクティビティ を使用して、 state 値に基づいてパイプライン変数を追加し、終了条件にこの変数を使用することもできます。