次の方法で共有


Databricks アセット バンドルのジョブにタスクを追加する

この記事では、Databricks アセット バンドルの Lakeflow ジョブに追加できるさまざまな種類のタスクの例を示します。 Databricks アセット バンドルとはを参照してください。

ほとんどのジョブ タスクの種類には、サポートされている設定の中でタスク固有のパラメーターがありますが、タスクに渡されるジョブ パラメーター 定義することもできます。 動的値参照は、ジョブ パラメーターでサポートされています。このパラメーターを使用すると、タスク間でジョブの実行に固有の値を渡すことができます。 「動的な値の参照とは」を参照してください

ジョブ タスクの設定をオーバーライドすることもできます。 Databricks アセット バンドルのジョブ タスク設定のオーバーライドを参照してください。

重要

ローカル相対パスが Git リポジトリ内の同じコンテンツを指していない可能性があるため、git_sourceに設定されたジョブ source フィールドとタスク GIT フィールドはバンドルには推奨されません。 バンドルでは、デプロイされたジョブのファイルが、デプロイ元のローカル コピーと同じであることを想定しています。

代わりに、リポジトリをローカルに複製し、タスクのソースがワークスペースになるように、このリポジトリ内にバンドル プロジェクトを設定します。

ヒント

Databricks CLI を使用して既存のジョブのリソース構成をすばやく生成するには、bundle generate job コマンドを使用できます。 バンドル コマンドを参照してください。

ノートブック タスク

このタスクを使用してノートブックを実行します。 「ジョブのノートブック タスク」を参照してください。

次の例では、ノートブック タスクをジョブに追加し、my_job_run_id という名前のジョブ パラメーターを設定します。 デプロイするノートブックのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 このタスクは、Azure Databricks ワークスペース内のデプロイされた場所からノートブックを取得します。

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > notebook_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。

Python スクリプト タスク

このタスクを使用して、Python ファイルを実行します。

次の例では、Python スクリプト タスクをジョブに追加します。 デプロイする Python ファイルのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 このタスクは、Azure Databricks ワークスペース内のデプロイされた場所から Python ファイルを取得します。

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > spark_python_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。 「ジョブの Python スクリプト タスク」も参照してください。

Python ホイール タスク

このタスクを使用して、Python ホイール ファイルを実行します。

次の例では、Python ホイール タスクをジョブに追加します。 デプロイする Python ホイール ファイルのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 Databricks アセット バンドル ライブラリの依存関係を参照してください。

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > python_wheel_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。 Databricks アセット バンドルを使用した Python ホイール ファイルのビルドジョブ用 Python ホイールタスクも参照してください。

JAR タスク

このタスクを使用して JAR を実行します。 ローカル JAR ライブラリ、またはワークスペース内のライブラリ、Unity カタログ ボリューム、または外部クラウド ストレージの場所を参照できます。 Databricks アセット バンドル ライブラリの依存関係を参照してください。

標準アクセス モードで Unity カタログ対応クラスターに Scala JAR ファイルをコンパイルしてデプロイする方法の詳細については、「 Unity カタログ クラスターに Scala JAR をデプロイする」を参照してください。

次の例では、JAR タスクをジョブに追加します。 JAR のパスは、指定されたボリュームの場所を指します。

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > spark_jar_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。 「ジョブの JAR タスク」を参照してください。

SQL ファイル タスク

このタスクを使用して、ワークスペースまたはリモート Git リポジトリ内にある SQL ファイルを実行します。

次の例では、SQL ファイル タスクをジョブに追加します。 この SQL ファイル タスクは、指定した SQL ウェアハウスを使用して、指定した SQL ファイルを実行します。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

SQL ウェアハウスの ID を取得するには、SQL ウェアハウスの設定ページを開き、[概要] タブの [名前] フィールドで、倉庫の名前の後にかっこで囲まれた ID をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > sql_task > file で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。 ジョブについては、SQL タスクを参照してください。

パイプラインのタスク

このタスクを使用してパイプラインを実行します。 「Lakeflow 宣言型パイプライン」を参照してください。

次の例では、パイプライン タスクをジョブに追加します。 このタスクは、指定されたパイプラインを実行します。

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

パイプラインの ID を取得するには、ワークスペースでパイプラインを開き、パイプラインの設定ページの [ パイプライン の詳細] タブで パイプライン ID の値をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > pipeline_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。 ジョブのパイプライン タスクを参照してください。

ダッシュボード タスク

このタスクを使用して、ダッシュボードを更新し、スナップショットをサブスクライバーに送信します。 ダッシュボード タスクの詳細については、「 ジョブのダッシュボード タスク」を参照してください。

次の例では、ダッシュボード タスクをジョブに追加します。 ジョブが実行されると、指定した ID のダッシュボードが更新されます。

resources:
  jobs:
    my-dashboard-job:
      name: my-dashboard-job
      tasks:
        - task_key: my-dashboard-task
          dashboard_task:
            dashboard_id: 11111111-1111-1111-1111-111111111111

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > dashboard_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。

dbt タスク

このタスクを使用して、1 つ以上の dbt コマンドを実行します。 dbt Cloud への接続を参照してください。

次の例では、dbt タスクをジョブに追加します。 この dbt タスクは、指定した SQL ウェアハウスを使用して、指定した dbt コマンドを実行します。

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

SQL ウェアハウスの ID を取得するには、SQL ウェアハウスの設定ページを開き、[概要] タブの [名前] フィールドで、倉庫の名前の後にかっこで囲まれた ID をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > dbt_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。 「ジョブの dbt タスク」を参照してください。

Databricks アセット バンドルには、dbt タスクを使用してジョブを定義する dbt-sql プロジェクト テンプレートと、デプロイされた dbt ジョブの dbt プロファイルも含まれています。 Databricks アセット バンドル テンプレートの詳細については、「既定のバンドル テンプレート 」を参照してください。

If/else 条件タスク

condition_task を使用すると、if/else 条件付きロジックを持つタスクをジョブに追加できます。 タスクは、他のタスクの実行を制御するために使用できる条件を評価します。 条件タスクは、クラスターの実行を必要とせず、再試行や通知をサポートしません。 if/else タスクの詳細については、「 If/else タスクを使用してジョブに分岐ロジックを追加する」を参照してください。

次の例には、条件タスクとノートブック タスクが含まれています。ノートブック タスクは、ジョブの修復の数が 5 未満の場合にのみ実行されます。

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > condition_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。

For each タスク

for_each_task を使用すると、for each ループを使用したタスクをジョブに追加できます。 このタスクは、指定されたすべての入力に対して入れ子になったタスクを実行します。 for_each_taskの詳細については、「For each タスクを使用して別のタスクをループで実行する」を参照してください。

次の例では、ジョブに for_each_task を追加します。ジョブは、別のタスクの値を順番にループし、それらを処理します。

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > for_each_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。

ジョブタスクを実行する

このタスクを使って、別のジョブを実行します。

次の例では、最初のジョブを実行する実行ジョブ タスクが、2 つめのジョブに含まれています。

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

この例では、 置換 を使用して、実行するジョブの ID を取得します。 UI からジョブの ID を取得するには、ワークスペースでジョブを開き、ジョブの設定ページの [ジョブ の詳細] タブの [ジョブ ID] の値から ID をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンスの tasks > run_job_task で定義されているジョブの作成操作の要求ペイロードの (YAML 形式で表される) を参照してください。