Upgrade des Schritts zur parallelen Ausführung auf SDK v2

In SDK v2 wird „Schritt zur parallelen Ausführung“ als parallel job in das Auftragskonzept integriert. Bei einem Parallelauftrag wird dasselbe Ziel beibehalten, sodass Benutzer die Ausführung ihrer Aufträge beschleunigen können, indem sie wiederkehrende Aufgaben auf leistungsstarke Computecluster mit mehreren Knoten verteilen. Zusätzlich zum Schritt zur parallelen Ausführung bietet ein Parallelauftrag in v2 zusätzliche Vorteile:

  • Flexible Oberfläche, auf der Benutzer mehrere benutzerdefinierte Ein- und Ausgaben für Ihren Parallelauftrag festlegen können. Sie können sie mit anderen Schritten verbinden, um ihre Inhalte in Ihrem Einstiegsskript zu nutzen oder zu verwalten.
  • Vereinfachen Sie das Eingabeschema, das Dataset als Eingabe durch das Konzept data asset von v2 ersetzt. Sie können ganz einfach Ihre lokalen Dateien oder den URI des Blobverzeichnisses als Eingaben in einen Parallelauftrag verwenden.
  • Leistungsfähigere Features sind nur für Parallelaufträge in v2 in der Entwicklung. Setzen Sie beispielsweise den fehlgeschlagenen/abgebrochenen Parallelauftrag fort, um die fehlgeschlagenen oder nicht verarbeiteten Minibatches weiter zu verarbeiten, indem Sie das erfolgreiche Ergebnis wiederverwenden, um sich so doppelten Aufwand zu ersparen.

Gehen Sie zum Upgrade Ihres aktuellen Schritts zur parallelen Ausführung von SDK v1 auf v2 wie folgt vor:

  • Verwenden Sie parallel_run_function, um einen Parallelauftrag zu erstellen, wobei Sie ParallelRunConfig und ParallelRunStep in v1 ersetzen.
  • Upgraden Sie Ihre v1-Pipeline zu v2. Rufen Sie dann Ihren v2-Parallelauftrag als Schritt in Ihrer v2-Pipeline auf. Weitere Informationen zum Pipelineupgrade finden Sie unter Upgraden von Pipelines von v1 zu v2.

Hinweis: Das Einstiegsskript ist beim Schritt zur parallelen Ausführung von v1 und v2 kompatibel. Sie können also die gleiche Datei „entry_script.py“ verwenden, wenn Sie Ihren Auftrag zur parallelen Ausführung upgraden.

Dieser Artikel enthält einen Vergleich der Szenarien in SDK v1 und SDK v2. In den folgenden Beispielen erstellen wir einen Parallelauftrag, um Eingabedaten in einem Pipelineauftrag vorherzusagen. Sie erfahren, wie Sie einen Parallelauftrag erstellen und ihn in einem Pipelineauftrag für SDK v1 und SDK v2 verwenden.

Voraussetzungen

Erstellen des parallelen Schritts

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Verwenden des parallelen Schritts in der Pipeline

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Zuordnung der wichtigsten Funktionen in SDK v1 und SDK v2

Funktionalität im SDK v1 Grobe Zuordnung in SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Ausgabe
dataset as_mount Input (Eingabe)

Konfigurationen des Parallelauftrags und Zuordnung von Einstellungen

SDK v1 SDK v2 BESCHREIBUNG
ParallelRunConfig.environment parallel_run_function.task.environment Umgebung, in der der Trainingsauftrag ausgeführt wird.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Das Benutzerskript, das parallel auf mehreren Knoten ausgeführt wird.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Die Anzahl der fehlerhaften Minibatches, die in diesem Parallelauftrag ignoriert werden könnten. Wenn die Anzahl der fehlerhaften Minibatches über diesem Schwellenwert liegt, wird der Parallelauftrag als fehlerhaft markiert.

„-1“ ist der Standardwert, der bedeutet, dass alle fehlerhaften Minibatches bei Parallelaufträgen ignoriert werden.
ParallelRunConfig.output_action parallel_run_function.append_row_to Aggregieren Sie alle Rückgaben aus jedem ausgeführten Minibatch, und geben Sie sie in dieser Datei aus. Kann mithilfe des Ausdrucks ${{outputs.<output_name>}} auf eine der Ausgaben eines Parallelauftrags verweisen
ParallelRunConfig.node_count parallel_run_function.instance_count Optionale Anzahl von Instanzen oder Knoten, die vom Computeziel verwendet werden. Der Standardwert lautet 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Die maximale Parallelität der einzelnen Compute-Instanzen.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definieren Sie die Größe der einzelnen Minibatches, um die Eingabe aufzuteilen.

Wenn input_data ein Ordner oder eine Menge von Dateien ist, bestimmt diese Zahl die Anzahl der Dateien für jeden Minibatch. Beispiel: 10, 100.

Wenn es sich bei input_data um Tabellendaten aus mltable handelt, bestimmt diese Zahl die ungefähre physische Größe der einzelnen Minibatches. Die Standardeinheit ist Byte, und für den Wert ist eine Zeichenfolge wie 100 KB, 100 MB möglich.
ParallelRunConfig.source_directory parallel_run_function.task.code Ein lokaler oder Remotepfad, der auf den Quellcode zeigt.
ParallelRunConfig.description parallel_run_function.description Eine benutzerfreundliche Beschreibung des Parallelauftrags
ParallelRunConfig.logging_level parallel_run_function.logging_level Eine Zeichenfolge mit dem Namen des Protokolliergrads, der in „logging“ definiert ist. Mögliche Werte sind „WARNING“, „INFO“ und „DEBUG“. (Optional, Standardwert ist INFO.) Dieser Wert kann über PipelineParameter festgelegt werden.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Das Timeout in Sekunden für die Ausführung der benutzerdefinierten run()-Funktion. Wenn die Ausführungszeit diesen Schwellenwert überschreitet, wird der Minibatch abgebrochen und als fehlerhafter Minibatch markiert, um eine Wiederholung auszulösen.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Die Anzahl der Wiederholungsversuche, wenn ein Minibatch fehlerhaft ist oder ein Timeout auftritt. Wenn alle Wiederholungsversuche fehlerhaft sind, wird der Minibatch als fehlerhaft markiert und mit der Berechnung von mini_batch_error_threshold gezählt.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Kombiniert mit der Einstellung append_row_to.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Die Anzahl der fehlerhaften Minibatches, die in diesem Parallelauftrag ignoriert werden könnten. Wenn die Anzahl der fehlerhaften Minibatches über diesem Schwellenwert liegt, wird der Parallelauftrag als fehlerhaft markiert.

„-1“ ist der Standardwert, der bedeutet, dass alle fehlerhaften Minibatches bei Parallelaufträgen ignoriert werden.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
Ähnlich wie allowed_failed_count, aber für diese Einstellung wird der Prozentsatz fehlerhafter Minibatches anstelle der Anzahl verwendet.

Der Bereich dieser Einstellung ist [0, 100]. 100 ist der Standardwert, der bedeutet, dass alle fehlerhaften Minibatches bei Parallelaufträgen ignoriert werden.
ParallelRunConfig.partition_keys In der Entwicklung.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Ein Wörterbuch mit Umgebungsvariablennamen und Werten. Diese Umgebungsvariablen werden für den Prozess festgelegt, in dem das Benutzerskript ausgeführt wird.
ParallelRunStep.name parallel_run_function.name Name des erstellten Parallelauftrags oder der erstellten Komponente.
ParallelRunStep.inputs parallel_run_function.inputs Ein Wörterbuch mit Eingaben, die von diesem Parallelauftrag verwendet werden.
-- parallel_run_function.input_data Deklarieren der Daten, die aufgeteilt und mit dem Parallelauftrag verarbeitet werden sollen
ParallelRunStep.output parallel_run_function.outputs Die Ausgabe dieses Parallelauftrags.
ParallelRunStep.side_inputs parallel_run_function.inputs Wird zusammen mit inputs definiert.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Die Argumente der parallelen Aufgabe.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Geben Sie an, ob der Parallelauftrag bei gleicher Eingabe die gleiche Ausgabe liefern soll.

Nächste Schritte

Weitere Informationen finden Sie in folgender Dokumentation: