Udostępnij za pośrednictwem


Uaktualnianie kroku przebiegu równoległego do zestawu SDK w wersji 2

W zestawie SDK w wersji 2 "Krok przebiegu równoległego" jest konsolidowany w ramach koncepcji zadania jako parallel job. Zadanie równoległe utrzymuje ten sam cel, aby umożliwić użytkownikom przyspieszenie wykonywania zadań przez dystrybucję powtarzających się zadań w zaawansowanych klastrach obliczeniowych z wieloma węzłami. W ramach kroku przebiegu równoległego zadanie równoległe w wersji 2 zapewnia dodatkowe korzyści:

  • Interfejs elastyczny, który umożliwia użytkownikowi definiowanie wielu niestandardowych danych wejściowych i wyjściowych dla zadania równoległego. Możesz połączyć je z innymi krokami, aby używać ich zawartości lub zarządzać nią w skrypicie wejściowym
  • Uprość schemat wejściowy, który zastępuje Dataset jako dane wejściowe przy użyciu koncepcji w wersji 2 data asset . Identyfikator URI lokalnego pliku lub katalogu obiektów blob można łatwo użyć jako danych wejściowych do zadania równoległego.
  • Bardziej zaawansowane funkcje są opracowywane tylko w ramach zadań równoległych w wersji 2. Na przykład wznowienie nieudanego/anulowanego zadania równoległego w celu kontynuowania przetwarzania zakończonych niepowodzeniem lub nieprzetworzonych minisadów przez ponowne użycie pomyślnego wyniku w celu zaoszczędzenia zduplikowanych wysiłków.

Aby uaktualnić bieżący krok uruchamiania równoległego zestawu SDK w wersji 1 do wersji 2, musisz wykonać następujące czynności:

  • Użyj polecenia parallel_run_function , aby utworzyć zadanie równoległe, zastępując ParallelRunConfig element i ParallelRunStep w wersji 1.
  • Uaktualnij potok w wersji 1 do wersji 2. Następnie wywołaj zadanie równoległe w wersji 2 jako krok w potoku w wersji 2. Zobacz , jak uaktualnić potok z wersji 1 do wersji 2 , aby uzyskać szczegółowe informacje na temat uaktualniania potoku.

Uwaga: Skrypt wprowadzania użytkownika jest zgodny z krokiem równoległego uruchamiania w wersji 1 i równoległym zadaniem w wersji 2. Możesz więc nadal używać tych samych entry_script.py podczas uaktualniania równoległego zadania uruchamiania.

W tym artykule przedstawiono porównanie scenariuszy w zestawie SDK w wersji 1 i zestawu SDK w wersji 2. W poniższych przykładach utworzymy zadanie równoległe w celu przewidywania danych wejściowych w zadaniu potoków. Zobaczysz, jak utworzyć zadanie równoległe i jak używać go w zadaniu potoku dla zestawu SDK w wersji 1 i zestawu SDK w wersji 2.

Wymagania wstępne

Tworzenie kroku równoległego

  • Zestaw SDK w wersji 1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • Zestaw SDK w wersji 2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Używanie kroku równoległego w potoku

  • Zestaw SDK w wersji 1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • Zestaw SDK w wersji 2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mapowanie kluczowych funkcji zestawu SDK w wersji 1 i zestawu SDK w wersji 2

Funkcje w zestawie SDK w wersji 1 Przybliżone mapowanie w zestawie SDK w wersji 2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Wyjście
as_mount zestawu danych Dane wejściowe

Równoległe konfiguracje zadań i mapowanie ustawień

Zestaw SDK w wersji 1 Zestaw SDK w wersji 2 opis
ParallelRunConfig.environment parallel_run_function.task.environment Środowisko, w których zostanie uruchomione zadanie szkoleniowe.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Skrypt użytkownika, który będzie uruchamiany równolegle w wielu węzłach.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Liczba zakończonych niepowodzeniem minisadów, które można zignorować w tym zadaniu równoległym. Jeśli liczba nieudanych minisadów wsadowych jest większa niż ten próg, zadanie równoległe zostanie oznaczone jako niepowodzenie.

"-1" jest liczbą domyślną, co oznacza ignorowanie wszystkich zakończonych niepowodzeniem minisadów podczas zadania równoległego.
ParallelRunConfig.output_action parallel_run_function.append_row_to Agregowanie wszystkich zwraca z każdego przebiegu mini-batch i wyprowadza je do tego pliku. Może odwoływać się do jednego z danych wyjściowych zadania równoległego przy użyciu wyrażenia ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Opcjonalna liczba wystąpień lub węzłów używanych przez docelowy obiekt obliczeniowy. Wartość domyślna to 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Maksymalna równoległość, jaką ma każde wystąpienie obliczeniowe.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Zdefiniuj rozmiar każdej mini-partii, aby podzielić dane wejściowe.

Jeśli input_data jest folderem lub zestawem plików, ta liczba definiuje liczbę plików dla każdej minisadowej partii. Na przykład 10, 100.

Jeśli input_data to dane tabelaryczne z mltableklasy , ta liczba definiuje rozmiar fizyczny proporcjonalny dla każdej minisadowej partii. Domyślna jednostka to Bajt, a wartość może akceptować ciąg, taki jak 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Ścieżka lokalna lub zdalna wskazująca kod źródłowy.
ParallelRunConfig.description parallel_run_function.description Przyjazny opis równoległości
ParallelRunConfig.logging_level parallel_run_function.logging_level Ciąg nazwy poziomu rejestrowania, który jest zdefiniowany w ciągu "rejestrowania". Możliwe wartości to "WARNING", "INFO" i "DEBUG". (opcjonalnie wartość domyślna to "INFO". Tę wartość można ustawić za pomocą parametru PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Limit czasu w sekundach na wykonywanie niestandardowej funkcji run(). Jeśli czas wykonywania jest wyższy niż ten próg, mini-partia zostanie przerwana i oznaczona jako nieudana minisada w celu wyzwolenia ponawiania próby.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_ponownych prób Liczba ponownych prób w przypadku niepowodzenia lub przekroczenia limitu czasu dla mini-partii. Jeśli wszystkie ponawianie prób zakończy się niepowodzeniem, mini-partia zostanie oznaczona jako nie powiodła się w wyniku obliczenia mini_batch_error_threshold.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Połączone z ustawieniem append_row_to .
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Liczba zakończonych niepowodzeniem minisadów, które można zignorować w tym zadaniu równoległym. Jeśli liczba nieudanych minisadów wsadowych jest większa niż ten próg, zadanie równoległe zostanie oznaczone jako niepowodzenie.

"-1" jest liczbą domyślną, co oznacza ignorowanie wszystkich zakończonych niepowodzeniem minisadów podczas zadania równoległego.
ParallelRunConfig.allowed_failed_percent zestaw parallel_run_function.task.program_arguments
--allowed_failed_percent
Podobnie jak "allowed_failed_count", ale to ustawienie używa procentu nieudanych minisadów zamiast liczby błędów mini-partii.

Zakres tego ustawienia to [0, 100]. "100" to domyślna liczba, co oznacza ignorowanie wszystkich zakończonych niepowodzeniem minisadów podczas zadania równoległego.
ParallelRunConfig.partition_keys W ramach rozwoju.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Słownik nazw zmiennych środowiskowych i wartości. Te zmienne środowiskowe są ustawiane w procesie wykonywania skryptu użytkownika.
ParallelRunStep.name parallel_run_function.name Nazwa utworzonego zadania równoległego lub składnika.
ParallelRunStep.inputs parallel_run_function.inputs Dykt danych wejściowych używanych przez ten równoległy.
-- parallel_run_function.input_data Deklarowanie danych do podzielenia i przetworzenia z użyciem równoległego
ParallelRunStep.output parallel_run_function.outputs Dane wyjściowe tego zadania równoległego.
ParallelRunStep.side_inputs parallel_run_function.inputs Zdefiniowano razem z inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Argumenty zadania równoległego.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Określ, czy równoległe dane wyjściowe będą zwracać te same dane wejściowe.

Następne kroki

Aby uzyskać więcej informacji, zobacz dokumentację tutaj: