Uaktualnianie kroku przebiegu równoległego do zestawu SDK w wersji 2
W zestawie SDK w wersji 2 "Krok przebiegu równoległego" jest konsolidowany w ramach koncepcji zadania jako parallel job
. Zadanie równoległe utrzymuje ten sam cel, aby umożliwić użytkownikom przyspieszenie wykonywania zadań przez dystrybucję powtarzających się zadań w zaawansowanych klastrach obliczeniowych z wieloma węzłami. W ramach kroku przebiegu równoległego zadanie równoległe w wersji 2 zapewnia dodatkowe korzyści:
- Interfejs elastyczny, który umożliwia użytkownikowi definiowanie wielu niestandardowych danych wejściowych i wyjściowych dla zadania równoległego. Możesz połączyć je z innymi krokami, aby używać ich zawartości lub zarządzać nią w skrypicie wejściowym
- Uprość schemat wejściowy, który zastępuje
Dataset
jako dane wejściowe przy użyciu koncepcji w wersji 2data asset
. Identyfikator URI lokalnego pliku lub katalogu obiektów blob można łatwo użyć jako danych wejściowych do zadania równoległego. - Bardziej zaawansowane funkcje są opracowywane tylko w ramach zadań równoległych w wersji 2. Na przykład wznowienie nieudanego/anulowanego zadania równoległego w celu kontynuowania przetwarzania zakończonych niepowodzeniem lub nieprzetworzonych minisadów przez ponowne użycie pomyślnego wyniku w celu zaoszczędzenia zduplikowanych wysiłków.
Aby uaktualnić bieżący krok uruchamiania równoległego zestawu SDK w wersji 1 do wersji 2, musisz wykonać następujące czynności:
- Użyj polecenia
parallel_run_function
, aby utworzyć zadanie równoległe, zastępującParallelRunConfig
element iParallelRunStep
w wersji 1. - Uaktualnij potok w wersji 1 do wersji 2. Następnie wywołaj zadanie równoległe w wersji 2 jako krok w potoku w wersji 2. Zobacz , jak uaktualnić potok z wersji 1 do wersji 2 , aby uzyskać szczegółowe informacje na temat uaktualniania potoku.
Uwaga: Skrypt wprowadzania użytkownika jest zgodny z krokiem równoległego uruchamiania w wersji 1 i równoległym zadaniem w wersji 2. Możesz więc nadal używać tych samych entry_script.py podczas uaktualniania równoległego zadania uruchamiania.
W tym artykule przedstawiono porównanie scenariuszy w zestawie SDK w wersji 1 i zestawu SDK w wersji 2. W poniższych przykładach utworzymy zadanie równoległe w celu przewidywania danych wejściowych w zadaniu potoków. Zobaczysz, jak utworzyć zadanie równoległe i jak używać go w zadaniu potoku dla zestawu SDK w wersji 1 i zestawu SDK w wersji 2.
Wymagania wstępne
- Przygotowywanie środowiska zestawu SDK w wersji 2: instalowanie zestawu Azure Machine Learning SDK w wersji 2 dla języka Python
- Omówienie podstawy potoku zestawu SDK w wersji 2: Jak utworzyć potok usługi Azure Machine Learning przy użyciu zestawu Python SDK w wersji 2
Tworzenie kroku równoległego
Zestaw SDK w wersji 1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
Zestaw SDK w wersji 2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Używanie kroku równoległego w potoku
Zestaw SDK w wersji 1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
Zestaw SDK w wersji 2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mapowanie kluczowych funkcji zestawu SDK w wersji 1 i zestawu SDK w wersji 2
Funkcje w zestawie SDK w wersji 1 | Przybliżone mapowanie w zestawie SDK w wersji 2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Wyjście |
as_mount zestawu danych | Dane wejściowe |
Równoległe konfiguracje zadań i mapowanie ustawień
Zestaw SDK w wersji 1 | Zestaw SDK w wersji 2 | opis |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Środowisko, w których zostanie uruchomione zadanie szkoleniowe. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Skrypt użytkownika, który będzie uruchamiany równolegle w wielu węzłach. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Liczba zakończonych niepowodzeniem minisadów, które można zignorować w tym zadaniu równoległym. Jeśli liczba nieudanych minisadów wsadowych jest większa niż ten próg, zadanie równoległe zostanie oznaczone jako niepowodzenie. "-1" jest liczbą domyślną, co oznacza ignorowanie wszystkich zakończonych niepowodzeniem minisadów podczas zadania równoległego. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Agregowanie wszystkich zwraca z każdego przebiegu mini-batch i wyprowadza je do tego pliku. Może odwoływać się do jednego z danych wyjściowych zadania równoległego przy użyciu wyrażenia ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Opcjonalna liczba wystąpień lub węzłów używanych przez docelowy obiekt obliczeniowy. Wartość domyślna to 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Maksymalna równoległość, jaką ma każde wystąpienie obliczeniowe. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Zdefiniuj rozmiar każdej mini-partii, aby podzielić dane wejściowe. Jeśli input_data jest folderem lub zestawem plików, ta liczba definiuje liczbę plików dla każdej minisadowej partii. Na przykład 10, 100. Jeśli input_data to dane tabelaryczne z mltable klasy , ta liczba definiuje rozmiar fizyczny proporcjonalny dla każdej minisadowej partii. Domyślna jednostka to Bajt, a wartość może akceptować ciąg, taki jak 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Ścieżka lokalna lub zdalna wskazująca kod źródłowy. |
ParallelRunConfig.description | parallel_run_function.description | Przyjazny opis równoległości |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Ciąg nazwy poziomu rejestrowania, który jest zdefiniowany w ciągu "rejestrowania". Możliwe wartości to "WARNING", "INFO" i "DEBUG". (opcjonalnie wartość domyślna to "INFO". Tę wartość można ustawić za pomocą parametru PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Limit czasu w sekundach na wykonywanie niestandardowej funkcji run(). Jeśli czas wykonywania jest wyższy niż ten próg, mini-partia zostanie przerwana i oznaczona jako nieudana minisada w celu wyzwolenia ponawiania próby. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_ponownych prób | Liczba ponownych prób w przypadku niepowodzenia lub przekroczenia limitu czasu dla mini-partii. Jeśli wszystkie ponawianie prób zakończy się niepowodzeniem, mini-partia zostanie oznaczona jako nie powiodła się w wyniku obliczenia mini_batch_error_threshold. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Połączone z ustawieniem append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Liczba zakończonych niepowodzeniem minisadów, które można zignorować w tym zadaniu równoległym. Jeśli liczba nieudanych minisadów wsadowych jest większa niż ten próg, zadanie równoległe zostanie oznaczone jako niepowodzenie. "-1" jest liczbą domyślną, co oznacza ignorowanie wszystkich zakończonych niepowodzeniem minisadów podczas zadania równoległego. |
ParallelRunConfig.allowed_failed_percent | zestaw parallel_run_function.task.program_arguments --allowed_failed_percent |
Podobnie jak "allowed_failed_count", ale to ustawienie używa procentu nieudanych minisadów zamiast liczby błędów mini-partii. Zakres tego ustawienia to [0, 100]. "100" to domyślna liczba, co oznacza ignorowanie wszystkich zakończonych niepowodzeniem minisadów podczas zadania równoległego. |
ParallelRunConfig.partition_keys | W ramach rozwoju. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Słownik nazw zmiennych środowiskowych i wartości. Te zmienne środowiskowe są ustawiane w procesie wykonywania skryptu użytkownika. |
ParallelRunStep.name | parallel_run_function.name | Nazwa utworzonego zadania równoległego lub składnika. |
ParallelRunStep.inputs | parallel_run_function.inputs | Dykt danych wejściowych używanych przez ten równoległy. |
-- | parallel_run_function.input_data | Deklarowanie danych do podzielenia i przetworzenia z użyciem równoległego |
ParallelRunStep.output | parallel_run_function.outputs | Dane wyjściowe tego zadania równoległego. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Zdefiniowano razem z inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argumenty zadania równoległego. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Określ, czy równoległe dane wyjściowe będą zwracać te same dane wejściowe. |
Następne kroki
Aby uzyskać więcej informacji, zobacz dokumentację tutaj: