Compartir a través de


Adición de tareas a trabajos en conjuntos de recursos de Databricks

En este artículo se proporcionan ejemplos de varios tipos de tareas que puede agregar a trabajos de Lakeflow en Conjuntos de recursos de Databricks. Consulte ¿Qué son los conjuntos de recursos de Databricks?

La mayoría de los tipos de tareas de trabajo tienen parámetros específicos de la tarea entre sus valores admitidos, pero también puede definir parámetros de trabajo que se pasan a las tareas. Se admiten referencias de valores dinámicos para los parámetros de trabajo, lo que permite pasar valores específicos de la ejecución del trabajo entre tareas. Consulte ¿Qué es una referencia de valores dinámicos?.

También puede invalidar la configuración de la tarea. Consulte Invalidar la configuración de tareas de trabajo en conjuntos de recursos de Databricks.

Importante

No se recomienda configurar el campo de trabajo git_source en source y el campo de tarea GIT para los paquetes, ya que es posible que las rutas de acceso relativas locales no apunten al mismo contenido en el repositorio de Git. Las agrupaciones esperan que un trabajo implementado tenga los mismos archivos que la copia local desde donde se implementó.

En su lugar, clone el repositorio localmente y configure el proyecto de agrupación dentro de este repositorio, de modo que el origen de las tareas sea el área de trabajo.

Sugerencia

Para generar rápidamente la configuración de recursos para un trabajo existente mediante el Databricks CLI, puede usar el comando bundle generate job. Vea comandos de agrupación.

Tarea de cuaderno

Esta tarea se usa para ejecutar un cuaderno. Consulte Tarea de cuaderno para trabajos.

En el ejemplo siguiente se agrega una tarea de cuaderno a un trabajo y se establece un parámetro de trabajo denominado my_job_run_id. La ruta de acceso del cuaderno que se va a implementar es relativa al archivo de configuración en el que se declara esta tarea. La tarea obtiene el cuaderno de su ubicación implementada en el área de trabajo de Azure Databricks.

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

Para establecer asignaciones adicionales para esta tarea, consulte tasks > notebook_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.

Tarea de script de Python

Esta tarea se usa para ejecutar un archivo de Python.

En el ejemplo siguiente se agrega una tarea de script de Python a un trabajo. La ruta de acceso del archivo de Python que se va a implementar es relativa al archivo de configuración en el que se declara esta tarea. La tarea obtiene el archivo de Python de su ubicación implementada en el área de trabajo de Azure Databricks.

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

Para establecer asignaciones adicionales para esta tarea, consulte tasks > spark_python_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte también Tarea de script de Python para trabajos.

Tarea de paquete wheel de Python

Esta tarea se usa para ejecutar un archivo wheel de Python.

En el ejemplo siguiente se agrega una tarea de paquete wheel de Python a un trabajo. La ruta de acceso del archivo de paquete wheel de Python que se va a implementar es relativa al archivo de configuración en el que se declara esta tarea. Consulte Dependencias de la biblioteca de conjuntos de recursos de Databricks.

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

Para establecer asignaciones adicionales para esta tarea, consulte tasks > python_wheel_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte también Crear un archivo wheel de Python mediante conjuntos de recursos de Databricks y la tarea de paquete wheel de Python para trabajos.

Tarea JAR

Esta tarea se usa para ejecutar un archivo JAR. Puede hacer referencia a bibliotecas JAR locales o a las de un área de trabajo, un volumen de catálogo de Unity o una ubicación de almacenamiento en la nube externa. Consulte Dependencias de la biblioteca de conjuntos de recursos de Databricks.

Para obtener más información sobre cómo compilar e implementar archivos JAR de Scala en un clúster habilitado para catálogos de Unity en modo de acceso estándar, consulte Deploy Scala JAR on Unity Catalog clusters (Implementación de JRS de Scala en clústeres de catálogo de Unity).

En el ejemplo siguiente se agrega una tarea JAR a un trabajo. La ruta de acceso del archivo JAR se dirige a la ubicación del volumen especificada.

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

Para establecer asignaciones adicionales para esta tarea, consulte tasks > spark_jar_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte Tarea JAR para trabajos.

Tarea de archivo SQL

Esta tarea se usa para ejecutar un archivo SQL ubicado en un área de trabajo o en un repositorio de Git remoto.

En el ejemplo siguiente se agrega una tarea de archivo SQL a un trabajo. Esta tarea de archivo SQL usa el almacenamiento SQL especificado para ejecutar el archivo SQL especificado.

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

Para obtener el identificador de un almacén de SQL, abra la página de configuración de SQL Warehouse y, a continuación, copie el identificador que se encuentra entre paréntesis después del nombre del almacén en el campo Nombre de la pestaña Información general .

Para establecer asignaciones adicionales para esta tarea, consulte tasks > sql_task > file en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte Tarea SQL para trabajos.

Tarea de canalización

Usas esta tarea para ejecutar una canalización. Consulte Pipelines declarativas de Lakeflow.

En el ejemplo siguiente se agrega una tarea de canalización a un trabajo. Esta tarea ejecuta la canalización especificada.

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

Para obtener el ID de una canalización, abra la canalización en el área de trabajo y copie el valor ID de canalización en la pestaña Detalles de canalización de la página de configuración de la canalización.

Para establecer asignaciones adicionales para esta tarea, consulte tasks > pipeline_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte Tarea de canalización para trabajos.

Tarea del panel de control

Esta tarea se usa para actualizar un panel y enviar una instantánea a los suscriptores. Para obtener más información sobre la tarea del panel de control, consulte Tarea del panel de control para trabajos.

En el ejemplo siguiente se agrega una tarea del panel de control a un trabajo. Cuando se ejecuta el trabajo, se actualiza el panel con el identificador especificado.

resources:
  jobs:
    my-dashboard-job:
      name: my-dashboard-job
      tasks:
        - task_key: my-dashboard-task
          dashboard_task:
            dashboard_id: 11111111-1111-1111-1111-111111111111

Para obtener mapeos adicionales que puede establecer para esta tarea, consulte tasks > dashboard_task en el payload de la solicitud de la operación de creación del trabajo, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, expresada en formato YAML.

tarea dbt

Esta tarea se usa para ejecutar uno o varios comandos dbt. Vea Conexión a dbt Cloud.

En el ejemplo siguiente se agrega una tarea dbt a un trabajo. Esta tarea dbt usa el almacenamiento SQL especificado para ejecutar los comandos dbt especificados.

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

Para obtener el identificador de un almacén de SQL, abra la página de configuración de SQL Warehouse y, a continuación, copie el identificador que se encuentra entre paréntesis después del nombre del almacén en el campo Nombre de la pestaña Información general .

Para establecer asignaciones adicionales para esta tarea, consulte tasks > dbt_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte tarea dbt para trabajos.

Databricks Asset Bundles también incluye una plantilla de proyecto dbt-sql que define un trabajo con una tarea dbt, así como perfiles de dbt para trabajos dbt implementados. Para obtener información sobre las plantillas de conjuntos de activos de Databricks, consulte Plantillas de agrupación predeterminadas.

Tarea de condición If/else

condition_task permite agregar una tarea con lógica condicional if/else al trabajo. La tarea evalúa una condición que se puede usar para controlar la ejecución de otras tareas. La tarea de condición no requiere que un clúster se ejecute y no admita reintentos ni notificaciones. Para obtener más información sobre la tarea if/else, vea Agregar lógica de bifurcación a un trabajo con la tarea If/else.

El ejemplo siguiente contiene una tarea de condición y una tarea de cuaderno, donde la tarea del cuaderno solo se ejecuta si el número de reparaciones de trabajos es inferior a 5.

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

Para establecer asignaciones adicionales para esta tarea, consulte tasks > condition_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.

Para cada tarea

for_each_task permite agregar una tarea con un bucle for each al trabajo. La tarea ejecuta una tarea anidada para cada entrada proporcionada. Para obtener más información sobre for_each_task, vea Usar una For each tarea para ejecutar otra tarea en un bucle.

En el ejemplo siguiente se agrega un for_each_task a un trabajo, donde hace un bucle sobre los valores de otra tarea y los procesa.

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

Para establecer asignaciones adicionales para esta tarea, consulte tasks > for_each_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.

Ejecutar tarea de trabajo

Esta tarea se usa para ejecutar otro trabajo.

El ejemplo siguiente contiene una tarea de trabajo de ejecución en el segundo trabajo que ejecuta el primer trabajo.

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

En este ejemplo, se utiliza una sustitución para recuperar el identificador del trabajo que se va a ejecutar. Para obtener el identificador de un trabajo de la interfaz de usuario, abra el trabajo en el área de trabajo y copie el ID del valor ID de trabajo en la pestaña Detalles del trabajo de la página de configuración del trabajo.

Para establecer asignaciones adicionales para esta tarea, consulte tasks > run_job_task en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.