Öğretici: Python API’si kullanarak Azure Batch ile paralel iş yükü çalıştırma

Büyük ölçekli paralel ve yüksek performanslı bilgi işlem (HPC) toplu işlerini Azure’da verimli bir şekilde çalıştırmak için Azure Batch’i kullanın. Bu öğreticide, Batch kullanarak paralel iş yükü çalıştırmaya ilişkin bir Python örneği açıklanmaktadır. Genel bir Batch uygulaması iş akışı hakkında bilgi alacak ve Batch ve Depolama kaynakları ile programlı olarak etkileşimde bulunmayı öğreneceksiniz.

  • Batch ve Depolama hesaplarıyla kimlik doğrulaması.
  • Giriş dosyalarını Depolama yükleyin.
  • Bir uygulamayı çalıştırmak için işlem düğümleri havuzu oluşturun.
  • Giriş dosyalarını işlemek için bir iş ve görevler oluşturun.
  • Görev yürütmeyi izleyin.
  • Çıkış dosyalarını alın.

Bu öğreticide, ffmpeg açık kaynak aracını kullanarak MP4 medya dosyalarını paralel olarak MP3 biçimine dönüştürebilirsiniz.

Azure aboneliğiniz yoksa başlamadan önce birücretsiz Azure hesabı oluşturun.

Önkoşullar

Azure'da oturum açma

Azure Portal’ında oturum açın.

Hesap kimlik bilgilerini alma

Bu örnekte, Batch ve depolama hesaplarınız için kimlik bilgileri sağlamanız gerekir. Gerekli kimlik bilgilerini almanın kolay yolu Azure portalındadır. (Bu kimlik bilgilerini ayrıca Azure API'lerini veya komut satırı araçlarını kullanarak da alabilirsiniz.)

  1. Tüm hizmetler>Batch hesapları'nı ve ardından Batch hesabınızın adını seçin.

  2. Batch kimlik bilgilerini görmek için Anahtarlar'ı seçin. Batch hesabı, URL ve Birincil erişim anahtarı değerlerini metin düzenleyiciye kopyalayın.

  3. Depolama hesap adını ve anahtarlarını görmek için Depolama hesabı seçin. Depolama hesabı adı ve Key1 değerlerini bir metin düzenleyiciye kopyalayın.

Örnek uygulamayı indirme ve çalıştırma

Örnek uygulamayı indirme

GitHub’dan örnek uygulamayı indirin veya kopyalayın. Örnek uygulama deposunu bir Git istemcisi ile kopyalamak için aşağıdaki komutu kullanın:

git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git

batch_python_tutorial_ffmpeg.py dosyasını içeren dizine gidin.

Python ortamınızda pip kullanarak gerekli paketleri yükleyin.

pip install -r requirements.txt

Config.py dosyasını açmak için bir kod düzenleyicisi kullanın. Batch ve depolama hesabı kimlik bilgilerini, hesaplarınıza özel değerlerle güncelleştirin. Örneğin:

_BATCH_ACCOUNT_NAME = 'yourbatchaccount'
_BATCH_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxE+yXrRvJAqT9BlXwwo1CwF+SwAYOxxxxxxxxxxxxxxxx43pXi/gdiATkvbpLRl3x14pcEQ=='
_BATCH_ACCOUNT_URL = 'https://yourbatchaccount.yourbatchregion.batch.azure.com'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'
_STORAGE_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxy4/xxxxxxxxxxxxxxxxfwpbIC5aAWA8wDu+AFXZB827Mt9lybZB1nUcQbQiUrkPtilK5BQ=='

Uygulamayı çalıştırma

Betiği çalıştırmak için:

python batch_python_tutorial_ffmpeg.py

Örnek uygulamayı çalıştırdığınızda, konsol çıktısı aşağıdakine benzer. Yürütme sırasında, havuzun işlem düğümleri başlatıldığı sırada Monitoring all tasks for 'Completed' state, timeout in 00:30:00... konumunda bir duraklama yaşarsınız.

Sample start: 11/28/2018 3:20:21 PM

Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....

Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742

Havuz, işlem düğümleri, iş ve görevleri izlemek için Azure portalında Batch hesabınıza gidin. Örneğin, havuzunuzdaki işlem düğümlerinin ısı haritasını görmek için Havuzlar>LinuxFmpegPool'u seçin.

Görevler çalıştırılırken ısı haritası aşağıdakine benzer:

Screenshot of Pool heat map.

Uygulama varsayılan yapılandırmasında çalıştırıldığında tipik yürütme süresi yaklaşık 5 dakikadır. En uzun süreyi havuz oluşturma işlemi alır.

Çıkış dosyalarını alma

ffmpeg görevleri tarafından oluşturulan çıkış MP3 dosyalarını indirmek için Azure portalını kullanabilirsiniz.

  1. Tüm hizmetler>Depolama hesapları seçeneğine ve sonra da depolama hesabınızın adına tıklayın.
  2. Bloblar>çıkış seçeneğine tıklayın.
  3. Çıkış MP3 dosyalarından birine sağ tıklayın ve ardından İndir’e tıklayın. Dosyayı açmak veya kaydetmek için tarayıcınızdaki yönergeleri izleyin.

Download output file

Bu örnekte gösterilmese de dosyaları programlamayla işlem düğümlerinden veya depolama kapsayıcısından indirebilirsiniz.

Kodu gözden geçirin

Aşağıdaki bölümlerde örnek uygulama, Batch hizmetinde iş yükünü işlemeyi gerçekleştiren adımlara ayrılmıştır. Örnekteki her bir kod satırı ele alınmadığı için, bu makalenin geri kalanını okurken Python koduna bakın.

Blob ve Batch istemcilerinde kimlik doğrulaması

Bir depolama hesabıyla etkileşimde bulunmak için uygulama, azure-storage-blob paketini kullanarak bir BlockBlobService nesnesi oluşturur.

blob_client = azureblob.BlockBlobService(
    account_name=_STORAGE_ACCOUNT_NAME,
    account_key=_STORAGE_ACCOUNT_KEY)

Uygulama, Batch hizmetinde havuz, iş ve görevleri oluşturup yönetmek üzere bir BatchServiceClient nesnesi oluşturur. Örnekteki Batch istemcisi, paylaşılan anahtar kimlik doğrulaması kullanır. Batch ayrıca tek tek kullanıcıların veya katılımsız bir uygulamanın kimliğini doğrulamak için Microsoft Entra Kimliği aracılığıyla kimlik doğrulamasını da destekler.

credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                             _BATCH_ACCOUNT_KEY)

batch_client = batch.BatchServiceClient(
    credentials,
    base_url=_BATCH_ACCOUNT_URL)

Giriş dosyalarını karşıya yükleme

Uygulama, giriş MP4 dosyaları için bir depolama kapsayıcısı ve görev çıkışı için bir kapsayıcı oluşturmak üzere blob_client başvurusunu kullanır. Ardından yerel InputFiles dizinindeki MP4 dosyalarını kapsayıcıya yüklemek için işlevini çağırırupload_file_to_container. Depolama alanındaki dosyalar, Batch hizmetinin daha sonra işlem düğümlerine indirebileceği Batch ResourceFile nesneleri olarak tanımlanır.

blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
input_file_paths = []

for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
    for filename in files:
        if filename.endswith(".mp4"):
            input_file_paths.append(os.path.abspath(
                os.path.join(folder, filename)))

# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
    upload_file_to_container(blob_client, input_container_name, file_path)
    for file_path in input_file_paths]

İşlem düğümleri havuzu oluşturma

Ardından örnek, create_pool çağrısıyla Batch hesabında bir işlem düğümü havuzu oluşturur. Bu tanımlı işlev; düğüm sayısını, VM boyutunu ve havuz yapılandırmasını ayarlamak üzere Batch PoolAddParameter sınıfını kullanır. Burada VirtualMachineConfiguration nesnesi, Azure Market yayımlanan bir Ubuntu Server 20.04 LTS görüntüsüne ImageReference belirtir. Batch, Azure Market’te çok çeşitli VM görüntülerinin yanı sıra özel VM görüntülerini destekler.

Düğüm sayısı ve VM boyutu, tanımlı sabitler kullanılarak ayarlanır. Batch ayrılmış düğümleri ve Spot düğümleri destekler ve havuzlarınızda ya da her ikisini birden kullanabilirsiniz. Adanmış düğümler, havuzunuz için ayrılmıştır. Spot düğümler, Azure'daki fazla VM kapasitesinden daha düşük bir fiyata sunulur. Azure yeterli kapasiteye sahip değilse spot düğümler kullanılamaz duruma gelir. Örnek varsayılan olarak Standard_A1_v2 boyutu yalnızca beş Spot düğümü içeren bir havuz oluşturur.

Fiziksel düğüm özelliklerine ek olarak, bu havuz yapılandırması bir StartTask nesnesi içerir. StartTask, her düğümü havuza katıldığında ve her yeniden başlatıldığında yürütecektir. Bu örnekte StartTask, ffmpeg paketini ve bağımlılıkları düğümlere yüklemek için Bash kabuk komutları çalıştırır.

Pool.add yöntemi, havuzu Batch hizmetine gönderir.

new_pool = batch.models.PoolAddParameter(
    id=pool_id,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="Canonical",
            offer="UbuntuServer",
            sku="20.04-LTS",
            version="latest"
        ),
        node_agent_sku_id="batch.node.ubuntu 20.04"),
    vm_size=_POOL_VM_SIZE,
    target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
    target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
    start_task=batchmodels.StartTask(
        command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
        wait_for_success=True,
        user_identity=batchmodels.UserIdentity(
            auto_user=batchmodels.AutoUserSpecification(
                scope=batchmodels.AutoUserScope.pool,
                elevation_level=batchmodels.ElevationLevel.admin)),
    )
)
batch_service_client.pool.add(new_pool)

İş oluşturma

Bir Batch işi, üzerinde görevlerin çalıştırılacağı bir havuz ve iş için öncelik ile zamanlama gibi isteğe bağlı ayarları belirtir. Örnek, create_job çağrısıyla bir iş oluşturur. Bu tanımlı işlev, havuzunuzda bir iş oluşturmak üzere JobAddParameter sınıfını kullanır. Job.add yöntemi, havuzu Batch hizmetine gönderir. Başlangıçta iş hiçbir görev içermez.

job = batch.models.JobAddParameter(
    id=job_id,
    pool_info=batch.models.PoolInformation(pool_id=pool_id))

batch_service_client.job.add(job)

Görev oluşturma

Uygulama, add_tasks çağrısıyla iş içinde görevler oluşturur. Bu tanımlı işlev, TaskAddParameter sınıfını kullanarak görev nesnelerinin bir listesini oluşturur. Her görev, bir command_line parametresi kullanarak giriş resource_files nesnesini işlemek üzere ffmpeg çalıştırır. ffmpeg, daha önce havuz oluşturulduğunda her bir düğüme yüklenmiştir. Burada komut satırı, her bir giriş MP4 (video) dosyasını bir MP3 (ses) dosyasına dönüştürmek için ffmpeg çalıştırır.

Örnek, komut satırını çalıştırdıktan sonra MP3 dosyası için bir OutputFile nesnesi oluşturur. Her bir görevin çıkış dosyaları (bu örnekte bir tane), görevin output_files özelliği kullanılarak bağlı depolama hesabındaki bir kapsayıcıya yüklenir.

Sonra uygulama, task.add_collection yöntemi ile görevleri işe ekler ve işlem düğümleri üzerinde çalışmak üzere kuyruğa alır.

tasks = list()

for idx, input_file in enumerate(input_files):
    input_file_path = input_file.file_path
    output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
    command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
        input_file_path, output_file_path)
    tasks.append(batch.models.TaskAddParameter(
        id='Task{}'.format(idx),
        command_line=command,
        resource_files=[input_file],
        output_files=[batchmodels.OutputFile(
            file_pattern=output_file_path,
            destination=batchmodels.OutputFileDestination(
                container=batchmodels.OutputFileBlobContainerDestination(
                    container_url=output_container_sas_url)),
            upload_options=batchmodels.OutputFileUploadOptions(
                upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
    )
    )
batch_service_client.task.add_collection(job_id, tasks)

Görevleri izleme

Bir işe görevler eklendiğinde, Batch bu görevleri ilişkili havuzdaki işlem düğümleri üzerinde yürütülmek üzere otomatik olarak kuyruğa alır ve zamanlar. Belirttiğiniz ayarlara göre, Batch tüm kuyruğa alma, zamanlama, yeniden deneme görevlerini ve diğer görev yönetimi sorumluluklarını yerine getirir.

Görevin yürütülüşünün izlenmesi için birçok yaklaşım vardır. Bu örnekteki wait_for_tasks_to_complete işlevi, belirli bir durum (bu örnekte tamamlanmış durum) için bir süre boyunca görevleri izlemek üzere TaskState nesnesini kullanır.

while datetime.datetime.now() < timeout_expiration:
    print('.', end='')
    sys.stdout.flush()
    tasks = batch_service_client.task.list(job_id)

    incomplete_tasks = [task for task in tasks if
                        task.state != batchmodels.TaskState.completed]
    if not incomplete_tasks:
        print()
        return True
    else:
        time.sleep(1)
...

Kaynakları temizleme

Görevleri çalıştırdıktan sonra, uygulama kendi oluşturduğu giriş depolama kapsayıcısını otomatik olarak siler ve Batch havuzu ve işini silme seçeneğini sunar. BatchClient'ın JobOperations ve PoolOperations sınıflarının her ikisi de, silmeyi onaylamanız durumunda çağrılan silme yöntemleri içerir. İşlerin ve görevlerin kendileri için sizden ücret alınmasa da işlem düğümleri için ücret alınır. Bu nedenle, havuzları yalnızca gerektiğinde ayırmanız önerilir. Havuzu sildiğinizde düğümler üzerindeki tüm görev çıkışları silinir. Ancak, giriş ve çıkış dosyaları depolama hesabında kalır.

Kaynak grubunu, Batch hesabını ve depolama hesabını artık gerekli değilse silin. Azure portalında bunu yapmak için Batch hesabının kaynak grubunu seçin ve Kaynak grubunu sil'i seçin.

Sonraki adımlar

Bu öğreticide, şunların nasıl yapıldığını öğrendiniz:

  • Batch ve Depolama hesaplarıyla kimlik doğrulaması.
  • Giriş dosyalarını Depolama yükleyin.
  • Bir uygulamayı çalıştırmak için işlem düğümleri havuzu oluşturun.
  • Giriş dosyalarını işlemek için bir iş ve görevler oluşturun.
  • Görev yürütmeyi izleyin.
  • Çıkış dosyalarını alın.

Batch iş yüklerini zamanlamak ve işlemek için Python API'sini kullanma hakkında daha fazla örnek için GitHub'da Batch Python örneklerine bakın.