Paralel çalıştırma adımlarını SDK v2'ye yükseltme
SDK v2'de "Paralel çalıştırma adımı" olarak parallel job
iş kavramında birleştirilir. Paralel iş, güçlü çok düğümlü işlem kümelerine yinelenen görevler dağıtarak kullanıcıların iş yürütmelerini hızlandırmalarını sağlamak için aynı hedefi korur. Paralel çalıştırma adımının yanı sıra, v2 paralel işi ek avantajlar sağlar:
- Kullanıcının paralel işiniz için birden çok özel giriş ve çıkış tanımlamasına olanak tanıyan esnek arabirim. Giriş betiğinizde içeriklerini kullanmak veya yönetmek için bunları diğer adımlarla bağlayabilirsiniz
- v2
data asset
kavramını kullanarak giriş olarak değiştirilenDataset
giriş şemasını basitleştirin. Yerel dosyalarınızı veya blob dizin URI'nizi paralel işe giriş olarak kolayca kullanabilirsiniz. - Daha güçlü özellikler yalnızca v2 paralel işlerinde geliştiriliyor. Örneğin, yinelenen çabayı kaydetmek için başarılı sonucu yeniden kullanarak başarısız veya işlenmemiş mini toplu işlemleri işlemeye devam etmek için başarısız/iptal edilmiş paralel işi sürdürebilirsiniz.
Geçerli sdk v1 paralel çalıştırma adımınızı v2'ye yükseltmek için
- ve değerini v1'de değiştirerek
ParallelRunConfig
ParallelRunStep
paralel iş oluşturmak için kullanınparallel_run_function
. - v1 işlem hattınızı v2'ye yükseltin. Ardından v2 işlem hattınızda bir adım olarak v2 paralel işinizi çağırabilirsiniz. İşlem hattı yükseltmesi hakkında ayrıntılı bilgi için bkz. İşlem hattını v1'den v2'ye yükseltme.
Not: Kullanıcı giriş betiği v1 paralel çalıştırma adımı ile v2 paralel işi arasında uyumludur. Böylece paralel çalıştırma işinizi yükseltirken aynı entry_script.py'yi kullanmaya devam edebilirsiniz.
Bu makalede SDK v1 ve SDK v2'deki senaryoların karşılaştırması yer alır. Aşağıdaki örneklerde, bir işlem hattı işinde giriş verilerini tahmin etmek için paralel bir iş oluşturacağız. Paralel iş oluşturmayı ve hem SDK v1 hem de SDK v2 için işlem hattı işinde kullanmayı öğreneceksiniz.
Önkoşullar
- SDK v2 ortamınızı hazırlama: Python için Azure Machine Learning SDK v2'yi yükleme
- SDK v2 işlem hattının temelini anlama: Python SDK v2 ile Azure Machine Learning işlem hattı oluşturma
Paralel adım oluşturma
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
İşlem hattında paralel adım kullanma
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
SDK v1 ve SDK v2'deki temel işlevleri eşleme
SDK v1'deki İşlevsellik | SDK v2'de kaba eşleme |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Çıkış |
veri kümesi as_mount | Giriş |
Paralel iş yapılandırmaları ve ayarlar eşlemesi
SDK v1 | SDK v2 | Description |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Eğitim işinin çalıştırılacağı ortam. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Birden çok düğümde paralel olarak çalıştırılacak kullanıcı betiği. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Bu paralel işte yoksayılabilir başarısız mini toplu iş sayısı. Başarısız olan mini toplu işin sayısı bu eşikten yüksekse paralel iş başarısız olarak işaretlenir. "-1", varsayılan sayıdır ve paralel iş sırasında başarısız olan tüm mini toplu işleri yoksaymak anlamına gelir. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Her mini toplu iş çalıştırmasından tüm dönüşleri toplayarak bu dosyaya aktarın. ${{outputs ifadesini kullanarak paralel işin çıkışlarından birine başvurabilir.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | İşlem hedefi tarafından kullanılan isteğe bağlı örnek veya düğüm sayısı. Varsayılan değer 1'tir. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Her işlem örneğinin sahip olduğu maksimum paralellik. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Girişi bölmek için her mini toplu işlemin boyutunu tanımlayın. input_data bir klasör veya dosya kümesiyse, bu sayı her mini toplu iş için dosya sayısını tanımlar. Örneğin, 10, 100. input_data öğesinden mltable tablo verileriyse, bu sayı her mini toplu iş için yakın fiziksel boyutu tanımlar. Varsayılan birim Bayt'tır ve değer 100 kb, 100 mb gibi bir dize kabul edebilir. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Kaynak kodu gösteren yerel veya uzak yol. |
ParallelRunConfig.description | parallel_run_function.description | Paralelin kolay açıklaması |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | 'Günlüğe kaydetme' içinde tanımlanan günlük düzeyi adı dizesi. Olası değerler 'UYARI', 'BİlGİ' ve 'HATA AYıKLAMA' değerleridir. (isteğe bağlı, varsayılan değer 'BİlGİ'dir.) Bu değer PipelineParameter aracılığıyla ayarlanabilir. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Özel run() işlevini yürütmek için saniye olarak zaman aşımı. Yürütme süresi bu eşikten yüksekse, mini toplu iş durdurulacak ve yeniden denemeyi tetikleyecek başarısız bir mini toplu iş olarak işaretlenecektir. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Mini toplu işlem başarısız olduğunda veya zaman aşımı olduğunda yeniden deneme sayısı. Tüm yeniden denemeler başarısız olursa, mini toplu işlem mini_batch_error_threshold hesaplama tarafından sayılamadı olarak işaretlenir. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Ayarıyla birlikte append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Bu paralel işte yoksayılabilir başarısız mini toplu iş sayısı. Başarısız olan mini toplu işin sayısı bu eşikten yüksekse paralel iş başarısız olarak işaretlenir. "-1", varsayılan sayıdır ve paralel iş sırasında başarısız olan tüm mini toplu işleri yoksaymak anlamına gelir. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments kümesi--allowed_failed_percent |
"allowed_failed_count" ayarına benzer, ancak bu ayar mini toplu iş hata sayısı yerine başarısız olan mini toplu işlem yüzdesini kullanır. Bu ayarın aralığı [0, 100] şeklindedir. "100", varsayılan sayıdır ve paralel iş sırasında başarısız olan tüm mini toplu işleri yoksaymak anlamına gelir. |
ParallelRunConfig.partition_keys | Geliştirme aşamasında. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Ortam değişkenlerinin adlarını ve değerlerini içeren bir sözlük. Bu ortam değişkenleri, kullanıcı betiğinin yürütüldüğü işlemde ayarlanır. |
ParallelRunStep.name | parallel_run_function.name | Oluşturulan paralel işin veya bileşenin adı. |
ParallelRunStep.inputs | parallel_run_function.inputs | Bu paralel tarafından kullanılan girişlerin diktesi. |
-- | parallel_run_function.input_data | Bölünecek ve paralel olarak işlenecek verileri bildirme |
ParallelRunStep.output | parallel_run_function.outputs | Bu paralel işin çıkışları. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | ile inputs birlikte tanımlanır. |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Paralel görevin bağımsız değişkenleri. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Paralelin aynı girişte aynı çıkışı döndüreceğini belirtin. |
Sonraki adımlar
Daha fazla bilgi için buradaki belgelere bakın:
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin