Samouczek: uruchamianie równoległego obciążenia w usłudze Azure Batch przy użyciu interfejsu API Python

Usługa Azure Batch umożliwia wydajne uruchamianie równoległych zadań wsadowych oraz zadań wsadowych obliczeń o wysokiej wydajności na platformie Azure. W tym samouczku przedstawiono przykład uruchamiania równoległego obciążenia za pomocą usługi Azure Batch i języka Python. Poznasz prosty przepływ pracy aplikacji usługi Azure Batch i sposób pracy programowej z zasobami usług Azure Batch i Storage.

  • Uwierzytelnianie przy użyciu kont usługi Batch i usługi Storage.
  • Przekazywanie plików wejściowych do usługi Storage.
  • Utwórz pulę węzłów obliczeniowych, aby uruchomić aplikację.
  • Utwórz zadanie i zadania przetwarzania plików wejściowych.
  • Monitorowanie wykonywania zadań.
  • Pobieranie plików wyjściowych.

W tym samouczku przekonwertujesz pliki multimedialne MP4 na format MP3, równolegle przy użyciu narzędzia open source ffmpeg .

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto platformy Azure.

Wymagania wstępne

Logowanie się do platformy Azure

Zaloguj się w witrynie Azure Portal.

Uzyskiwanie poświadczeń konta

W tym przykładzie należy podać poświadczenia dla kont usług Batch i Storage. Najprościej jest uzyskać wymagane poświadczenia w witrynie Azure Portal. (Te poświadczenia możesz również uzyskać za pomocą interfejsów API platformy Azure lub narzędzi wiersza polecenia).

  1. Wybierz pozycję Wszystkie konta usługi>Batch, a następnie wybierz nazwę konta usługi Batch.

  2. Aby wyświetlić poświadczenia usługi Batch, wybierz pozycję Klucze. Skopiuj wartości z pól Konto usługi Batch, Adres URL i Podstawowy klucz dostępu do edytora tekstów.

  3. Aby wyświetlić nazwę i klucze konta magazynu, wybierz pozycję Konto magazynu. Skopiuj wartości z pól Nazwa konta usługi Storage i Klucz1 do edytora tekstów.

Pobieranie i uruchamianie przykładowej aplikacji

Pobieranie przykładowej aplikacji

Pobierz lub sklonuj przykładową aplikację z usługi GitHub. Aby sklonować repozytorium przykładowej aplikacji za pomocą klienta Git, użyj następującego polecenia:

git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git

Przejdź do katalogu zawierającego batch_python_tutorial_ffmpeg.py pliku.

W środowisku Python zainstaluj wymagane pakiety przy użyciu menedżera pip.

pip install -r requirements.txt

Użyj edytora kodu, aby otworzyć config.py pliku. Zaktualizuj ciągi poświadczeń konta usługi Batch i konta magazynu, podając wartości unikatowe dla Twoich kont. Na przykład:

_BATCH_ACCOUNT_NAME = 'yourbatchaccount'
_BATCH_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxE+yXrRvJAqT9BlXwwo1CwF+SwAYOxxxxxxxxxxxxxxxx43pXi/gdiATkvbpLRl3x14pcEQ=='
_BATCH_ACCOUNT_URL = 'https://yourbatchaccount.yourbatchregion.batch.azure.com'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'
_STORAGE_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxy4/xxxxxxxxxxxxxxxxfwpbIC5aAWA8wDu+AFXZB827Mt9lybZB1nUcQbQiUrkPtilK5BQ=='

Uruchom aplikację

Aby uruchomić skrypt:

python batch_python_tutorial_ffmpeg.py

Po uruchomieniu aplikacji przykładowej dane wyjściowe w konsoli będą wyglądać mniej więcej następująco. W czasie wykonywania nastąpi wstrzymanie operacji w momencie wyświetlenia komunikatu Monitoring all tasks for 'Completed' state, timeout in 00:30:00... podczas uruchamiania węzłów obliczeniowych puli.

Sample start: 11/28/2018 3:20:21 PM

Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....

Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742

Przejdź do konta usługi Batch w witrynie Azure Portal, aby monitorować pulę, węzły obliczeniowe, zadanie i zadania podrzędne. Aby na przykład wyświetlić mapę cieplną węzłów obliczeniowych w puli, wybierz pozycję Pule>LinuxFfmpegPool.

Podczas wykonywania zadań podrzędnych mapa cieplna może wyglądać następująco:

Screenshot of Pool heat map.

Typowy czas wykonywania wynosi mniej więcej 5 minut w przypadku uruchomienia aplikacji w konfiguracji domyślnej. Tworzenie puli zajmuje najwięcej czasu.

Pobieranie plików wyjściowych

Przy użyciu witryny Azure Portal można pobrać wyjściowe pliki MP3 wygenerowane przez zadania ffmpeg.

  1. Kliknij pozycję Wszystkie usługi>Konta usługi Storage i kliknij nazwę odpowiedniego konta usługi Storage.
  2. Kliknij pozycję Obiekty Blob>dane wyjściowe.
  3. Kliknij prawym przyciskiem myszy jeden z wyjściowych plików MP3, a następnie kliknij polecenie Pobierz. Postępuj zgodnie z monitami wyświetlanymi w przeglądarce, aby otworzyć lub zapisać plik.

Download output file

Mimo że nie pokazano tego w tym przykładzie, pliki można również pobrać programowo z węzłów obliczeniowych lub z kontenera magazynu.

Przeglądanie kodu

W poniższych sekcjach przykładowa aplikacja została podzielona na kroki wykonywane w celu przetworzenia obciążenia w usłudze Batch. Podczas czytania dalszej części tego artykułu obserwuj kod języka Python, ponieważ nie omówiono tu wszystkich wierszy kodu z próbki.

Uwierzytelnianie klientów obiektów blob i usługi Batch

Aby nawiązać interakcję z kontem magazynu, aplikacja używa pakietu azure-storage-blob do utworzenia obiektu BlockBlobService.

blob_client = azureblob.BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

Aplikacja tworzy obiekt BatchServiceClient na potrzeby tworzenia pul, zadań i zadań podrzędnych w usłudze Batch oraz zarządzania nimi. Klient usługi Batch w przykładzie korzysta z uwierzytelniania za pomocą klucza wspólnego. Usługa Batch obsługuje również uwierzytelnianie za pośrednictwem identyfikatora Entra firmy Microsoft w celu uwierzytelniania poszczególnych użytkowników lub aplikacji nienadzorowanej.

credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                             _BATCH_ACCOUNT_KEY)

batch_client = batch.BatchServiceClient(
    credentials,
    base_url=_BATCH_ACCOUNT_URL)

Przekazywanie plików wejściowych

W aplikacji odwołanie blob_client jest używane do utworzenia kontenera magazynu dla plików wejściowych w formacie MP4 oraz kontenera dla danych wyjściowych zadań podrzędnych. Następnie wywołuje upload_file_to_container funkcję w celu przekazania plików MP4 w lokalnym katalogu InputFiles do kontenera. Pliki w magazynie są definiowane jako obiekty ResourceFile usługi Batch, które następnie mogą być pobierane przez tę usługę do węzłów obliczeniowych.

blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
input_file_paths = []

for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
    for filename in files:
        if filename.endswith(".mp4"):
            input_file_paths.append(os.path.abspath(
                os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

Tworzenie puli węzłów obliczeniowych

Następnie w przykładzie tworzona jest pula węzłów obliczeniowych na koncie usługi Batch z wywołaniem funkcji create_pool. Ta zdefiniowana funkcja określa liczbę węzłów, rozmiar maszyny wirtualnej i konfigurację puli za pomocą klasy PoolAddParameter usługi Batch. W tym miejscu obiekt VirtualMachineConfiguration określa obraz ImageReference z systemem Ubuntu Server 20.04 LTS opublikowany w witrynie Azure Marketplace. Usługa Batch obsługuje szeroki zakres obrazów maszyn wirtualnych z witryny Azure Marketplace oraz niestandardowe obrazy maszyn wirtualnych.

Liczba węzłów i rozmiar maszyny wirtualnej są ustawiane przy użyciu zdefiniowanych stałych. Usługa Batch obsługuje dedykowane węzły i węzły typu spot, a w pulach można używać obu tych węzłów. Węzły dedykowane są zarezerwowane dla Twojej puli. Węzły typu spot są oferowane w obniżonej cenie od nadwyżkowej pojemności maszyny wirtualnej na platformie Azure. Węzły typu spot stają się niedostępne, jeśli platforma Azure nie ma wystarczającej pojemności. Przykład domyślnie tworzy pulę zawierającą tylko pięć węzłów typu Spot o rozmiarze Standard_A1_v2.

Oprócz fizycznych właściwości węzłów konfiguracja puli zawiera również obiekt StartTask. Funkcja StartTask jest wykonywana w każdym węźle, gdy tylko ten węzeł zostanie dołączony do puli, oraz za każdym razem, gdy węzeł będzie uruchamiany ponownie. W tym przykładzie obiekt StartTask uruchamia polecenia powłoki Bash w celu zainstalowania pakietu ffmpeg i jego zależności w węzłach.

Metoda pool.add przesyła pulę do usługi Batch.

new_pool = batch.models.PoolAddParameter(
    id=pool_id,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="Canonical",
            offer="UbuntuServer",
            sku="20.04-LTS",
            version="latest"
        ),
        node_agent_sku_id="batch.node.ubuntu 20.04"),
    vm_size=_POOL_VM_SIZE,
    target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
    target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
    start_task=batchmodels.StartTask(
        command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
        wait_for_success=True,
        user_identity=batchmodels.UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin)),
    )
)
batch_service_client.pool.add(new_pool)

Tworzenie zadania

Zadanie usługi Batch określa pulę, w której będą uruchamiane zadania podrzędne, wraz z ustawieniami opcjonalnymi, takimi jak priorytet i harmonogram pracy. Przykładowa aplikacja tworzy zadanie z wywołaniem create_job. Ta zdefiniowana funkcja tworzy zadanie w puli za pomocą klasy JobAddParameter. Metoda job.add przesyła pulę do usługi Batch. Początkowo zadanie nie zawiera zadań podrzędnych.

job = batch.models.JobAddParameter(
    id=job_id,
    pool_info=batch.models.PoolInformation(pool_id=pool_id))

batch_service_client.job.add(job)

Tworzenie zadań

Aplikacja tworzy zadania podrzędne w ramach zadania przy użyciu wywołania funkcji add_tasks. Ta zdefiniowana funkcja tworzy listę obiektów zadań podrzędnych przy użyciu klasy TaskAddParameter. Każde zadanie podrzędne uruchamia narzędzie ffmpeg w celu przetworzenia wejściowego obiektu resource_files za pomocą parametru command_line. Narzędzie ffmpeg zostało już zainstalowane na wszystkich węzłach podczas tworzenia puli. Tutaj wiersz polecenia jest używany do uruchomienia narzędzia ffmpeg w celu przekonwertowania każdego z plików wejściowych w formacie MP4 (wideo) na format MP3 (audio).

Przykładowa aplikacja tworzy obiekt OutputFile dla pliku MP3 po uruchomieniu wiersza polecenia. Pliki wyjściowe z każdego zadania podrzędnego (w tym przypadku jeden plik) są przekazywane do kontenera na połączonym koncie magazynu przy użyciu właściwości output_files w tym zadaniu podrzędnym.

Następnie aplikacja dodaje zadania podrzędne do zadania za pomocą metody task.add_collection, która tworzy kolejkę zadań podrzędnych do uruchomienia w węzłach obliczeniowych.

tasks = list()

for idx, input_file in enumerate(input_files):
    input_file_path = input_file.file_path
    output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
    command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
        input_file_path, output_file_path)
    tasks.append(batch.models.TaskAddParameter(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file],
        output_files=[batchmodels.OutputFile(
            file_pattern=output_file_path,
            destination=batchmodels.OutputFileDestination(
                container=batchmodels.OutputFileBlobContainerDestination(
                    container_url=output_container_sas_url)),
            upload_options=batchmodels.OutputFileUploadOptions(
                upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
    )
    )
batch_service_client.task.add_collection(job_id, tasks)

Monitorowanie podzadań

Po dodaniu zadań podrzędnych do zadania usługa Batch automatycznie dodaje je do kolejki i planuje ich wykonanie w węzłach obliczeniowych powiązanej puli. Na podstawie określonych przez użytkownika ustawień usługa Batch obsługuje dodawanie zadań podrzędnych do kolejki, ich planowanie, ponawianie prób ich wykonania oraz inne czynności administracyjne.

Istnieje wiele podejść do wykonywania monitorowania podzadań. Funkcja wait_for_tasks_to_complete w tym przykładzie zawiera obiekt TaskState używany do monitorowania zadań podrzędnych pod kątem określonego stanu, w tym przypadku ukończenia, w wybranym limicie czasu.

while datetime.datetime.now() < timeout_expiration:
    print('.', end='')
    sys.stdout.flush()
    tasks = batch_service_client.task.list(job_id)

    incomplete_tasks = [task for task in tasks if
                        task.state != batchmodels.TaskState.completed]
    if not incomplete_tasks:
        print()
        return True
    else:
        time.sleep(1)
...

Czyszczenie zasobów

Po wykonaniu zadań podrzędnych aplikacja automatycznie usuwa utworzony wejściowy kontener magazynu, a opcjonalnie także pulę i zadanie usługi Batch. Dla obu klas JobOperations i PoolOperations klienta BatchClient istnieją metody usuwania, które są wywoływane, jeśli potwierdzisz usunięcie. Mimo że nie są naliczane opłaty za same zadania i zadania podrzędne, są naliczane opłaty za węzły obliczeniowe. W związku z tym zaleca się przydzielanie pul stosownie do potrzeb. W przypadku usunięcia puli usuwane są również wszystkie dane wyjściowe zadań podrzędnych w węzłach. Pliki wejściowe i wyjściowe pozostają jednak na koncie magazynu.

Gdy grupa zasobów, konto usługi Batch i konto magazynu nie będą już potrzebne, usuń je. Aby to zrobić w witrynie Azure Portal, wybierz grupę zasobów dla konta usługi Batch i wybierz pozycję Usuń grupę zasobów.

Następne kroki

W tym samouczku zawarto informacje na temat wykonywania następujących czynności:

  • Uwierzytelnianie przy użyciu kont usługi Batch i usługi Storage.
  • Przekazywanie plików wejściowych do usługi Storage.
  • Utwórz pulę węzłów obliczeniowych, aby uruchomić aplikację.
  • Utwórz zadanie i zadania przetwarzania plików wejściowych.
  • Monitorowanie wykonywania zadań.
  • Pobieranie plików wyjściowych.

Aby uzyskać więcej przykładów użycia interfejsu API języka Python do planowania i przetwarzania obciążeń usługi Batch, zobacz Przykłady języka Python usługi Batch w witrynie GitHub.