Compartir a través de


Tutorial: Ejecución de una carga de trabajo paralela con Azure Batch mediante la API de Python

Use Azure Batch para ejecutar aplicaciones de informática de alto rendimiento (HPC) en paralelo y a gran escala, de manera eficaz en Azure. En este tutorial se explica un ejemplo de Python para ejecutar una carga de trabajo paralela mediante Batch. Aprenderá lo que es un flujo de trabajo de aplicación de Batch común y a interactuar con los recursos de Batch y Storage mediante programación.

  • Autenticarse en las cuentas de Batch y Storage.
  • Cargar archivos de entrada en Storage.
  • Crear un grupo de nodos de proceso para ejecutar una aplicación.
  • Crear un proceso de trabajo y asignar tareas para procesar archivos de entrada.
  • Supervise la ejecución de las tareas.
  • Recuperación de archivos de salida.

En este tutorial, convertiremos archivos multimedia MP4 a formato MP3 en paralelo con la herramienta de código de abierto ffmpeg.

Si no tiene una cuenta de Azure, cree una cuenta gratuita antes de comenzar.

Prerrequisitos

Inicio de sesión en Azure

Inicie sesión en Azure Portal.

Obtención de credenciales de la cuenta

En este ejemplo, debe especificar las credenciales de las cuentas de Batch y Storage. Azure Portal es una manera sencilla de obtener las credenciales necesarias. (También se pueden obtener estas credenciales mediante las API de Azure o las herramientas de línea de comandos).

  1. Seleccione Todos los servicios>Cuentas de Batch y, después, seleccione el nombre de la cuenta de Batch.

  2. Para ver las credenciales de Batch, haga clic en Claves. Copie los valores de cuenta de Batch, URL y Clave de acceso principal en un editor de texto.

  3. Para ver el nombre y las claves de la cuenta de almacenamiento, seleccione Cuenta de almacenamiento. Copie los valores de Nombre de la cuenta de almacenamiento y Key1 en un editor de texto.

Descarga y ejecución de la aplicación de ejemplo

Descarga de la aplicación de ejemplo

Descargue o clone la aplicación de ejemplo desde GitHub. Para clonar el repositorio de la aplicación de ejemplo con un cliente de Git, use el siguiente comando:

git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git

Vaya al directorio que contiene el archivo batch_python_tutorial_ffmpeg.py.

En el entorno de Python, instale los paquetes necesarios mediante pip.

pip install -r requirements.txt

Use un editor de código para abrir el archivo config.py. Actualice las cadenas de credenciales de la cuenta de Storage y de Batch con los valores únicos correspondientes. Por ejemplo:

_BATCH_ACCOUNT_NAME = 'yourbatchaccount'
_BATCH_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxE+yXrRvJAqT9BlXwwo1CwF+SwAYOxxxxxxxxxxxxxxxx43pXi/gdiATkvbpLRl3x14pcEQ=='
_BATCH_ACCOUNT_URL = 'https://yourbatchaccount.yourbatchregion.batch.azure.com'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'
_STORAGE_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxy4/xxxxxxxxxxxxxxxxfwpbIC5aAWA8wDu+AFXZB827Mt9lybZB1nUcQbQiUrkPtilK5BQ=='

Ejecución de la aplicación

Para ejecutar el script:

python batch_python_tutorial_ffmpeg.py

Al ejecutar la aplicación de ejemplo, la salida de la consola es similar a la siguiente. Durante la ejecución, se experimenta una pausa en Monitoring all tasks for 'Completed' state, timeout in 00:30:00... mientras se inician los nodos de ejecución del grupo.

Sample start: 11/28/2018 3:20:21 PM

Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....

Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742

Vaya a la cuenta de Batch en Azure Portal para supervisar el grupo, los nodos de ejecución, el trabajo y las tareas. Por ejemplo, para ver un mapa térmico de los nodos de proceso del grupo, seleccione Grupos>LinuxFfmpegPool.

Con las tareas en ejecución, el mapa térmico es similar al siguiente:

Captura de pantalla del mapa térmico de la piscina.

El tiempo de ejecución típico es de aproximadamente 5 minutos cuando se ejecuta la aplicación en su configuración predeterminada. La creación del pool es lo que más tiempo lleva.

Recuperación de archivos de salida

Puede usar Azure Portal para descargar los archivos MP3 de salida generados por las tareas de ffmpeg.

  1. Haga clic en Todos los servicios>Cuentas de almacenamiento y haga clic en el nombre de la cuenta de almacenamiento.
  2. Haga clic en Blobs>salida.
  3. Haga clic con el botón derecho en uno de los archivos MP3 de salida y, después, haga clic en Descargar. Para abrir o guardar el archivo, siga las indicaciones del explorador.

Descargar un archivo de salida

Aunque no se muestra en este ejemplo, los archivos también se pueden descargar mediante programación desde los nodos de proceso o desde el contenedor de almacenamiento.

Revisión del código

En las secciones siguientes se desglosan los pasos que lleva a cabo la aplicación de ejemplo para procesar una carga de trabajo en el servicio Batch. Consulte el código de Python mientras lee el resto de este artículo, ya que no se describe todas las líneas de código del ejemplo.

Autenticación de los clientes de Blob y Batch

Para interactuar con una cuenta de almacenamiento, la aplicación usa el paquete azure-storage-blob para crear un objeto BlockBlobService .

blob_client = azureblob.BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

La aplicación crea un objeto BatchServiceClient para crear y administrar grupos, trabajos y tareas en el servicio Batch. El cliente de Batch del ejemplo usa la autenticación de clave compartida. Batch también admite la autenticación a través de Microsoft Entra ID, para autenticar usuarios individuales o una aplicación desatendida.

credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                             _BATCH_ACCOUNT_KEY)

batch_client = batch.BatchServiceClient(
    credentials,
    base_url=_BATCH_ACCOUNT_URL)

Carga de archivos de entrada

La aplicación usa la blob_client referencia para crear un contenedor de almacenamiento para los archivos MP4 de entrada y un contenedor para la salida de la tarea. A continuación, llama a la upload_file_to_container función para cargar archivos MP4 en el directorio InputFiles local en el contenedor. Los archivos de almacenamiento se definen como objetos ResourceFile que Batch puede descargar después en nodos de ejecución.

blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
input_file_paths = []

for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
    for filename in files:
        if filename.endswith(".mp4"):
            input_file_paths.append(os.path.abspath(
                os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

Crear un grupo de nodos de cómputo

A continuación, el ejemplo crea un grupo de nodos de cómputo en la cuenta de Batch con una llamada a create_pool. Esta función definida usa la clase Batch PoolAddParameter para establecer el número de nodos, el tamaño de la máquina virtual y una configuración de grupo. Aquí, un objeto VirtualMachineConfiguration especifica una imageReference a una imagen de Ubuntu Server 20.04 LTS publicada en Azure Marketplace. Batch es compatible con una amplia gama de imágenes de máquina virtual de Azure Marketplace, así como con las imágenes de máquina virtual personalizadas.

El número de nodos y el tamaño de la máquina virtual se establecen mediante constantes definidas. Batch admite nodos dedicados y nodos Spot, y puedes usar cualquiera de ellos o ambos en tus grupos. Los nodos dedicados están reservados para el grupo. Los nodos de acceso puntual se ofrecen a precio reducido por la capacidad sobrante de las máquinas virtuales de Azure. Los nodos de acceso puntual no estarán disponibles si Azure no tiene suficiente capacidad. El ejemplo crea de forma predeterminada un grupo que contiene solo cinco nodos de spot de tamaño Standard_A1_v2.

Además de las propiedades del nodo físico, esta configuración del grupo incluye un objeto StartTask . StartTask se ejecuta en cada nodo a medida que ese nodo se une al grupo y cada vez que se reinicia un nodo. En este ejemplo, StartTask ejecuta comandos de shell de Bash para instalar el paquete ffmpeg y las dependencias en los nodos.

El método pool.add envía el grupo al servicio Batch.

new_pool = batch.models.PoolAddParameter(
    id=pool_id,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="Canonical",
            offer="UbuntuServer",
            sku="20.04-LTS",
            version="latest"
        ),
        node_agent_sku_id="batch.node.ubuntu 20.04"),
    vm_size=_POOL_VM_SIZE,
    target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
    target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
    start_task=batchmodels.StartTask(
        command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
        wait_for_success=True,
        user_identity=batchmodels.UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin)),
    )
)
batch_service_client.pool.add(new_pool)

Crear un trabajo

Un trabajo de Batch especifica un grupo en el que ejecutar tareas y valores de configuración opcionales, como la prioridad y la programación del trabajo. En el ejemplo se crea un trabajo con una llamada a create_job. Esta función definida usa la clase JobAddParameter para crear un trabajo en el grupo. El método job.add envía el grupo al servicio Batch. Inicialmente, el trabajo no tiene tareas.

job = batch.models.JobAddParameter(
    id=job_id,
    pool_info=batch.models.PoolInformation(pool_id=pool_id))

batch_service_client.job.add(job)

Crear tareas

La aplicación crea tareas en el trabajo con una llamada a add_tasks. Esta función definida crea una lista de objetos de tarea mediante la clase TaskAddParameter . Cada tarea ejecuta ffmpeg para procesar un objeto de entrada resource_files mediante un command_line parámetro . Al crear el grupo, ffmpeg se instaló previamente en todos los nodos. En este caso, la línea de comandos ejecuta ffmpeg para convertir los archivos MP4 de entrada (vídeo) en archivos MP3 (audio).

En el ejemplo se crea un objeto OutputFile para el archivo MP3 después de ejecutar la línea de comandos. Los archivos de salida de cada tarea (uno, en este caso) se cargan en un contenedor de la cuenta de almacenamiento vinculada mediante la propiedad de la output_files tarea.

A continuación, la aplicación agrega tareas al trabajo con el método task.add_collection , que los pone en cola para ejecutarse en los nodos de proceso.

tasks = list()

for idx, input_file in enumerate(input_files):
    input_file_path = input_file.file_path
    output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
    command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
        input_file_path, output_file_path)
    tasks.append(batch.models.TaskAddParameter(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file],
        output_files=[batchmodels.OutputFile(
            file_pattern=output_file_path,
            destination=batchmodels.OutputFileDestination(
                container=batchmodels.OutputFileBlobContainerDestination(
                    container_url=output_container_sas_url)),
            upload_options=batchmodels.OutputFileUploadOptions(
                upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
    )
    )
batch_service_client.task.add_collection(job_id, tasks)

Supervisión de tareas

Cuando se agregan tareas a un trabajo, Batch las pone automáticamente en cola y las programa para su ejecución en los nodos de proceso del grupo asociado. Según la configuración que especifique, Batch controla la administración de las colas, la programación, los reintentos y otras labores de administración de tareas.

Hay muchos enfoques para supervisar la ejecución de tareas. La wait_for_tasks_to_complete función de este ejemplo usa el objeto TaskState para supervisar las tareas de un estado determinado, en este caso el estado completado, dentro de un límite de tiempo.

while datetime.datetime.now() < timeout_expiration:
    print('.', end='')
    sys.stdout.flush()
    tasks = batch_service_client.task.list(job_id)

    incomplete_tasks = [task for task in tasks if
                        task.state != batchmodels.TaskState.completed]
    if not incomplete_tasks:
        print()
        return True
    else:
        time.sleep(1)
...

Limpieza de recursos

Después de ejecutar las tareas, la aplicación elimina automáticamente el contenedor de almacenamiento de entrada que creó y ofrece la opción de eliminar el grupo y el trabajo de Batch. Las clases JobOperations y PoolOperations de BatchClient tienen métodos de eliminación, a los que se llama en caso de que confirme la eliminación. Aunque no se cobran los trabajos y las tareas, sí se cobran los nodos de ejecución. Por consiguiente, se recomienda asignar los grupos solo según sea necesario. Al eliminar el grupo, todos los resultados de las tareas en los nodos también se eliminan. Sin embargo, los archivos de entrada y salida permanecen en la cuenta de almacenamiento.

Cuando ya no los necesite, elimine el grupo de recursos, la cuenta de Batch y la de Storage. Para ello, en Azure Portal, seleccione el grupo de recursos de la cuenta de Batch y elija Eliminar grupo de recursos.

Pasos siguientes

En este tutorial, ha aprendido a:

  • Autenticarse en las cuentas de Batch y Storage.
  • Cargar archivos de entrada en Storage.
  • Crear un grupo de nodos de proceso para ejecutar una aplicación.
  • Crear un proceso de trabajo y asignar tareas para procesar archivos de entrada.
  • Supervise la ejecución de las tareas.
  • Recuperación de archivos de salida.

Para obtener más ejemplos de uso de la API de Python para programar y procesar cargas de trabajo de Batch, consulte los ejemplos de Python de Batch en GitHub.