Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo se proporcionan ejemplos de varios tipos de tareas que puede agregar a trabajos de Lakeflow en Conjuntos de recursos de Databricks. Consulte ¿Qué son los conjuntos de recursos de Databricks?
La mayoría de los tipos de tareas de trabajo tienen parámetros específicos de la tarea entre sus valores admitidos, pero también puede definir parámetros de trabajo que se pasan a las tareas. Se admiten referencias de valores dinámicos para los parámetros de trabajo, lo que permite pasar valores específicos de la ejecución del trabajo entre tareas. Consulte ¿Qué es una referencia de valores dinámicos?.
También puede invalidar la configuración de la tarea. Consulte Invalidar la configuración de tareas de trabajo en conjuntos de recursos de Databricks.
Importante
No se recomienda configurar el campo de trabajo git_source
en source
y el campo de tarea GIT
para los paquetes, ya que es posible que las rutas de acceso relativas locales no apunten al mismo contenido en el repositorio de Git. Las agrupaciones esperan que un trabajo implementado tenga los mismos archivos que la copia local desde donde se implementó.
En su lugar, clone el repositorio localmente y configure el proyecto de agrupación dentro de este repositorio, de modo que el origen de las tareas sea el área de trabajo.
Sugerencia
Para generar rápidamente la configuración de recursos para un trabajo existente mediante el Databricks CLI, puede usar el comando bundle generate job
. Vea comandos de agrupación.
Tarea de cuaderno
Esta tarea se usa para ejecutar un cuaderno. Consulte Tarea de cuaderno para trabajos.
En el ejemplo siguiente se agrega una tarea de cuaderno a un trabajo y se establece un parámetro de trabajo denominado my_job_run_id
. La ruta de acceso del cuaderno que se va a implementar es relativa al archivo de configuración en el que se declara esta tarea. La tarea obtiene el cuaderno de su ubicación implementada en el área de trabajo de Azure Databricks.
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: '{{job.run_id}}'
Para establecer asignaciones adicionales para esta tarea, consulte tasks > notebook_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.
Tarea de script de Python
Esta tarea se usa para ejecutar un archivo de Python.
En el ejemplo siguiente se agrega una tarea de script de Python a un trabajo. La ruta de acceso del archivo de Python que se va a implementar es relativa al archivo de configuración en el que se declara esta tarea. La tarea obtiene el archivo de Python de su ubicación implementada en el área de trabajo de Azure Databricks.
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
Para establecer asignaciones adicionales para esta tarea, consulte tasks > spark_python_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte también Tarea de script de Python para trabajos.
Tarea de paquete wheel de Python
Esta tarea se usa para ejecutar un archivo wheel de Python.
En el ejemplo siguiente se agrega una tarea de paquete wheel de Python a un trabajo. La ruta de acceso del archivo de paquete wheel de Python que se va a implementar es relativa al archivo de configuración en el que se declara esta tarea. Consulte Dependencias de la biblioteca de conjuntos de recursos de Databricks.
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
Para establecer asignaciones adicionales para esta tarea, consulte tasks > python_wheel_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte también Crear un archivo wheel de Python mediante conjuntos de recursos de Databricks y la tarea de paquete wheel de Python para trabajos.
Tarea JAR
Esta tarea se usa para ejecutar un archivo JAR. Puede hacer referencia a bibliotecas JAR locales o a las de un área de trabajo, un volumen de catálogo de Unity o una ubicación de almacenamiento en la nube externa. Consulte Dependencias de la biblioteca de conjuntos de recursos de Databricks.
Para obtener más información sobre cómo compilar e implementar archivos JAR de Scala en un clúster habilitado para catálogos de Unity en modo de acceso estándar, consulte Deploy Scala JAR on Unity Catalog clusters (Implementación de JRS de Scala en clústeres de catálogo de Unity).
En el ejemplo siguiente se agrega una tarea JAR a un trabajo. La ruta de acceso del archivo JAR se dirige a la ubicación del volumen especificada.
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
Para establecer asignaciones adicionales para esta tarea, consulte tasks > spark_jar_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte Tarea JAR para trabajos.
Tarea de archivo SQL
Esta tarea se usa para ejecutar un archivo SQL ubicado en un área de trabajo o en un repositorio de Git remoto.
En el ejemplo siguiente se agrega una tarea de archivo SQL a un trabajo. Esta tarea de archivo SQL usa el almacenamiento SQL especificado para ejecutar el archivo SQL especificado.
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
Para obtener el identificador de un almacén de SQL, abra la página de configuración de SQL Warehouse y, a continuación, copie el identificador que se encuentra entre paréntesis después del nombre del almacén en el campo Nombre de la pestaña Información general .
Para establecer asignaciones adicionales para esta tarea, consulte tasks > sql_task > file
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte Tarea SQL para trabajos.
Tarea de canalización
Usas esta tarea para ejecutar una canalización. Consulte Pipelines declarativas de Lakeflow.
En el ejemplo siguiente se agrega una tarea de canalización a un trabajo. Esta tarea ejecuta la canalización especificada.
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
Para obtener el ID de una canalización, abra la canalización en el área de trabajo y copie el valor ID de canalización en la pestaña Detalles de canalización de la página de configuración de la canalización.
Para establecer asignaciones adicionales para esta tarea, consulte tasks > pipeline_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte Tarea de canalización para trabajos.
Tarea del panel de control
Esta tarea se usa para actualizar un panel y enviar una instantánea a los suscriptores. Para obtener más información sobre la tarea del panel de control, consulte Tarea del panel de control para trabajos.
En el ejemplo siguiente se agrega una tarea del panel de control a un trabajo. Cuando se ejecuta el trabajo, se actualiza el panel con el identificador especificado.
resources:
jobs:
my-dashboard-job:
name: my-dashboard-job
tasks:
- task_key: my-dashboard-task
dashboard_task:
dashboard_id: 11111111-1111-1111-1111-111111111111
Para obtener mapeos adicionales que puede establecer para esta tarea, consulte tasks > dashboard_task
en el payload de la solicitud de la operación de creación del trabajo, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, expresada en formato YAML.
tarea dbt
Esta tarea se usa para ejecutar uno o varios comandos dbt. Vea Conexión a dbt Cloud.
En el ejemplo siguiente se agrega una tarea dbt a un trabajo. Esta tarea dbt usa el almacenamiento SQL especificado para ejecutar los comandos dbt especificados.
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- 'dbt deps'
- 'dbt seed'
- 'dbt run'
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: 'dbt-databricks>=1.0.0,<2.0.0'
Para obtener el identificador de un almacén de SQL, abra la página de configuración de SQL Warehouse y, a continuación, copie el identificador que se encuentra entre paréntesis después del nombre del almacén en el campo Nombre de la pestaña Información general .
Para establecer asignaciones adicionales para esta tarea, consulte tasks > dbt_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML. Consulte tarea dbt para trabajos.
Databricks Asset Bundles también incluye una plantilla de proyecto dbt-sql
que define un trabajo con una tarea dbt, así como perfiles de dbt para trabajos dbt implementados. Para obtener información sobre las plantillas de conjuntos de activos de Databricks, consulte Plantillas de agrupación predeterminadas.
Tarea de condición If/else
condition_task
permite agregar una tarea con lógica condicional if/else al trabajo. La tarea evalúa una condición que se puede usar para controlar la ejecución de otras tareas. La tarea de condición no requiere que un clúster se ejecute y no admita reintentos ni notificaciones. Para obtener más información sobre la tarea if/else, vea Agregar lógica de bifurcación a un trabajo con la tarea If/else.
El ejemplo siguiente contiene una tarea de condición y una tarea de cuaderno, donde la tarea del cuaderno solo se ejecuta si el número de reparaciones de trabajos es inferior a 5.
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: '{{job.repair_count}}'
right: '5'
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: 'true'
notebook_task:
notebook_path: ../src/notebook.ipynb
Para establecer asignaciones adicionales para esta tarea, consulte tasks > condition_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.
Para cada tarea
for_each_task
permite agregar una tarea con un bucle for each al trabajo. La tarea ejecuta una tarea anidada para cada entrada proporcionada. Para obtener más información sobre for_each_task
, vea Usar una For each
tarea para ejecutar otra tarea en un bucle.
En el ejemplo siguiente se agrega un for_each_task
a un trabajo, donde hace un bucle sobre los valores de otra tarea y los procesa.
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: '{{tasks.generate_countries_list.values.countries}}'
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
Para establecer asignaciones adicionales para esta tarea, consulte tasks > for_each_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.
Ejecutar tarea de trabajo
Esta tarea se usa para ejecutar otro trabajo.
El ejemplo siguiente contiene una tarea de trabajo de ejecución en el segundo trabajo que ejecuta el primer trabajo.
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: '13.3.x-scala2.12'
node_type_id: 'i3.xlarge'
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
En este ejemplo, se utiliza una sustitución para recuperar el identificador del trabajo que se va a ejecutar. Para obtener el identificador de un trabajo de la interfaz de usuario, abra el trabajo en el área de trabajo y copie el ID del valor ID de trabajo en la pestaña Detalles del trabajo de la página de configuración del trabajo.
Para establecer asignaciones adicionales para esta tarea, consulte tasks > run_job_task
en la carga de solicitud de la operación de creación de trabajos, tal como se define en POST /api/2.1/jobs/create en la referencia de la API REST, en formato YAML.