Aracılığıyla paylaş


Paralel çalıştırma adımlarını SDK v2'ye yükseltme

SDK v2'de "Paralel çalıştırma adımı" olarak parallel jobiş kavramında birleştirilir. Paralel iş, güçlü çok düğümlü işlem kümelerine yinelenen görevler dağıtarak kullanıcıların iş yürütmelerini hızlandırmalarını sağlamak için aynı hedefi korur. Paralel çalıştırma adımının yanı sıra, v2 paralel işi ek avantajlar sağlar:

  • Kullanıcının paralel işiniz için birden çok özel giriş ve çıkış tanımlamasına olanak tanıyan esnek arabirim. Giriş betiğinizde içeriklerini kullanmak veya yönetmek için bunları diğer adımlarla bağlayabilirsiniz
  • v2 data asset kavramını kullanarak giriş olarak değiştirilen Dataset giriş şemasını basitleştirin. Yerel dosyalarınızı veya blob dizin URI'nizi paralel işe giriş olarak kolayca kullanabilirsiniz.
  • Daha güçlü özellikler yalnızca v2 paralel işlerinde geliştiriliyor. Örneğin, yinelenen çabayı kaydetmek için başarılı sonucu yeniden kullanarak başarısız veya işlenmemiş mini toplu işlemleri işlemeye devam etmek için başarısız/iptal edilmiş paralel işi sürdürebilirsiniz.

Geçerli sdk v1 paralel çalıştırma adımınızı v2'ye yükseltmek için

  • ve değerini v1'de değiştirerek ParallelRunConfigParallelRunStep paralel iş oluşturmak için kullanınparallel_run_function.
  • v1 işlem hattınızı v2'ye yükseltin. Ardından v2 işlem hattınızda bir adım olarak v2 paralel işinizi çağırabilirsiniz. İşlem hattı yükseltmesi hakkında ayrıntılı bilgi için bkz. İşlem hattını v1'den v2'ye yükseltme.

Not: Kullanıcı giriş betiği v1 paralel çalıştırma adımı ile v2 paralel işi arasında uyumludur. Böylece paralel çalıştırma işinizi yükseltirken aynı entry_script.py'yi kullanmaya devam edebilirsiniz.

Bu makalede SDK v1 ve SDK v2'deki senaryoların karşılaştırması yer alır. Aşağıdaki örneklerde, bir işlem hattı işinde giriş verilerini tahmin etmek için paralel bir iş oluşturacağız. Paralel iş oluşturmayı ve hem SDK v1 hem de SDK v2 için işlem hattı işinde kullanmayı öğreneceksiniz.

Önkoşullar

Paralel adım oluşturma

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

İşlem hattında paralel adım kullanma

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

SDK v1 ve SDK v2'deki temel işlevleri eşleme

SDK v1'deki İşlevsellik SDK v2'de kaba eşleme
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Çıkış
veri kümesi as_mount Giriş

Paralel iş yapılandırmaları ve ayarlar eşlemesi

SDK v1 SDK v2 Description
ParallelRunConfig.environment parallel_run_function.task.environment Eğitim işinin çalıştırılacağı ortam.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Birden çok düğümde paralel olarak çalıştırılacak kullanıcı betiği.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Bu paralel işte yoksayılabilir başarısız mini toplu iş sayısı. Başarısız olan mini toplu işin sayısı bu eşikten yüksekse paralel iş başarısız olarak işaretlenir.

"-1", varsayılan sayıdır ve paralel iş sırasında başarısız olan tüm mini toplu işleri yoksaymak anlamına gelir.
ParallelRunConfig.output_action parallel_run_function.append_row_to Her mini toplu iş çalıştırmasından tüm dönüşleri toplayarak bu dosyaya aktarın. ${{outputs ifadesini kullanarak paralel işin çıkışlarından birine başvurabilir.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count İşlem hedefi tarafından kullanılan isteğe bağlı örnek veya düğüm sayısı. Varsayılan değer 1'tir.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Her işlem örneğinin sahip olduğu maksimum paralellik.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Girişi bölmek için her mini toplu işlemin boyutunu tanımlayın.

input_data bir klasör veya dosya kümesiyse, bu sayı her mini toplu iş için dosya sayısını tanımlar. Örneğin, 10, 100.

input_data öğesinden mltabletablo verileriyse, bu sayı her mini toplu iş için yakın fiziksel boyutu tanımlar. Varsayılan birim Bayt'tır ve değer 100 kb, 100 mb gibi bir dize kabul edebilir.
ParallelRunConfig.source_directory parallel_run_function.task.code Kaynak kodu gösteren yerel veya uzak yol.
ParallelRunConfig.description parallel_run_function.description Paralelin kolay açıklaması
ParallelRunConfig.logging_level parallel_run_function.logging_level 'Günlüğe kaydetme' içinde tanımlanan günlük düzeyi adı dizesi. Olası değerler 'UYARI', 'BİlGİ' ve 'HATA AYıKLAMA' değerleridir. (isteğe bağlı, varsayılan değer 'BİlGİ'dir.) Bu değer PipelineParameter aracılığıyla ayarlanabilir.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Özel run() işlevini yürütmek için saniye olarak zaman aşımı. Yürütme süresi bu eşikten yüksekse, mini toplu iş durdurulacak ve yeniden denemeyi tetikleyecek başarısız bir mini toplu iş olarak işaretlenecektir.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Mini toplu işlem başarısız olduğunda veya zaman aşımı olduğunda yeniden deneme sayısı. Tüm yeniden denemeler başarısız olursa, mini toplu işlem mini_batch_error_threshold hesaplama tarafından sayılamadı olarak işaretlenir.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Ayarıyla birlikte append_row_to .
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Bu paralel işte yoksayılabilir başarısız mini toplu iş sayısı. Başarısız olan mini toplu işin sayısı bu eşikten yüksekse paralel iş başarısız olarak işaretlenir.

"-1", varsayılan sayıdır ve paralel iş sırasında başarısız olan tüm mini toplu işleri yoksaymak anlamına gelir.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments kümesi
--allowed_failed_percent
"allowed_failed_count" ayarına benzer, ancak bu ayar mini toplu iş hata sayısı yerine başarısız olan mini toplu işlem yüzdesini kullanır.

Bu ayarın aralığı [0, 100] şeklindedir. "100", varsayılan sayıdır ve paralel iş sırasında başarısız olan tüm mini toplu işleri yoksaymak anlamına gelir.
ParallelRunConfig.partition_keys Geliştirme aşamasında.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Ortam değişkenlerinin adlarını ve değerlerini içeren bir sözlük. Bu ortam değişkenleri, kullanıcı betiğinin yürütüldüğü işlemde ayarlanır.
ParallelRunStep.name parallel_run_function.name Oluşturulan paralel işin veya bileşenin adı.
ParallelRunStep.inputs parallel_run_function.inputs Bu paralel tarafından kullanılan girişlerin diktesi.
-- parallel_run_function.input_data Bölünecek ve paralel olarak işlenecek verileri bildirme
ParallelRunStep.output parallel_run_function.outputs Bu paralel işin çıkışları.
ParallelRunStep.side_inputs parallel_run_function.inputs ile inputsbirlikte tanımlanır.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Paralel görevin bağımsız değişkenleri.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Paralelin aynı girişte aynı çıkışı döndüreceğini belirtin.

Sonraki adımlar

Daha fazla bilgi için buradaki belgelere bakın: