Freigeben über


Tutorial 3: Aktivieren der wiederkehrenden Materialisierung und Ausführen von Batchrückschlüssen

In dieser Tutorialreihe erfahren Sie, wie Features nahtlos alle Phasen des Lebenszyklus für maschinelles Lernen integrieren: Prototyperstellung, Training und Operationalisierung.

Im ersten Tutorial wurde gezeigt, wie Sie zunächst eine Featuresatzspezifikation mit benutzerdefinierten Transformationen erstellen und anschließend den Featuresatz verwenden, um Trainingsdaten zu generieren, die Materialisierung zu aktivieren und einen Abgleich auszuführen. In zweiten Tutorial wurde gezeigt, wie Sie die Materialisierung aktivieren und einen Abgleich durchführen. Darüber hinaus wurde gezeigt, wie Sie mit Funktionen experimentieren, um die Modellleistung zu verbessern.

In diesem Tutorial werden folgende Punkte erläutert:

  • Aktivieren der wiederkehrenden Materialisierung für den transactions-Featuresatz
  • Ausführen einer Batchrückschlusspipeline für das registrierte Modell

Voraussetzungen

Bevor Sie mit diesem Tutorial fortfahren, müssen Sie das erste und das zweite Tutorial der Reihe abschließen.

Einrichten

  1. Konfigurieren Sie das Spark-Notebook für Azure Machine Learning.

    Sie können zum Ausführen dieses Tutorials ein neues Notebook erstellen und die Anweisungen Schritt für Schritt ausführen. Sie können auch das bestehende Notebook mit dem Namen 3. Aktivieren der wiederkehrenden Materialisierung und Ausführen von Batchrückschlüssen öffnen und ausführen. Dieses Notebook und alle Notebooks dieser Reihe finden Sie im Verzeichnis featurestore_sample/notebooks. Sie können aus sdk_only oder sdk_and_cli auswählen. Lassen Sie dieses Dokument geöffnet, und sehen Sie dort nach, um Links zur Dokumentation und weitere Erläuterungen zu erhalten.

    1. Wählen Sie im oberen Menü in der Dropdownliste Compute die Option Serverloses Spark Compute unter Serverlose Spark von Azure Machine Learning aus.

    2. Konfigurieren der Sitzung:

      1. Wählen Sie in der oberen Statusleiste Sitzung konfigurieren aus.
      2. Wählen Sie die Registerkarte Python-Pakete.
      3. Wählen Sie Conda-Datei hochladen aus.
      4. Kopieren Sie die Datei azureml-examples/sdk/python/featurestore-sample/project/env/online.yml von Ihrem lokalen Computer.
      5. Optional: Erhöhen Sie das Sitzungstimeout (die Leerlaufzeit), um häufige Wiederholungen der Überprüfung von Voraussetzungen zu vermeiden.
  2. Starten Sie die Spark-Sitzung.

    # run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
    print("start spark session")
  3. Richten Sie das Stammverzeichnis für die Beispiele ein.

    import os
    
    # please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to).
    # You can find the name from the directory structure in the left nav
    root_dir = "./Users/<your_user_alias>/featurestore_sample"
    
    if os.path.isdir(root_dir):
        print("The folder exists.")
    else:
        print("The folder does not exist. Please create or fix the path")
  4. Richten Sie die CLI ein.

    Nicht zutreffend.


  1. Initialisieren Sie den CRUD-Client (Erstellen, Lesen, Aktualisieren und Löschen) des Projektarbeitsbereichs.

    Das Tutorialnotebook wird über diesen aktuellen Arbeitsbereich ausgeführt.

    ### Initialize the MLClient of this project workspace
    import os
    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]
    
    # connect to the project workspace
    ws_client = MLClient(
        AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name
    )
  2. Initialisieren Sie die Variablen für den Feature Store.

    Stellen Sie sicher, dass Sie den Werte featurestore_name aktualisieren, um widerzuspiegeln, was Sie im ersten Tutorial erstellt haben.

    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    # feature store
    featurestore_name = (
        "<FEATURESTORE_NAME>"  # use the same name from part #1 of the tutorial
    )
    featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    
    # feature store ml client
    fs_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        featurestore_subscription_id,
        featurestore_resource_group_name,
        featurestore_name,
    )
  3. Initialisieren Sie den SDK-Client für den Feature Store.

    # feature store client
    from azureml.featurestore import FeatureStoreClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    featurestore = FeatureStoreClient(
        credential=AzureMLOnBehalfOfCredential(),
        subscription_id=featurestore_subscription_id,
        resource_group_name=featurestore_resource_group_name,
        name=featurestore_name,
    )

Aktivieren der wiederkehrenden Materialisierung für den Transaktionsfeaturesatz

Im zweiten Tutorial haben Sie die Materialisierung aktiviert und den Abgleich für den transactions-Featuresatz durchgeführt. Der Abgleich ist ein bedarfsgesteuerter, einmaliger Vorgang, bei dem Featurewerte berechnet und im Materialisierungsspeicher abgelegt werden.

Um einen Rückschluss für das Modell in der Produktion durchführen zu können, sollten Sie wiederkehrende Materialisierungsaufträge einrichten, um den Materialisierungsspeicher aktuell zu halten. Diese Aufträge werden gemäß benutzerdefinierten Zeitplänen ausgeführt. Der Zeitplan für wiederkehrende Aufträge funktioniert wie folgt:

  • Intervall- und Häufigkeitswerte dienen zum Definieren eines Fensters. Die folgenden Werte definieren beispielsweise ein Zeitfenster von drei Stunden:

    • interval = 3
    • frequency = Hour
  • Das erste Fenster beginnt mit dem in RecurrenceTrigger festgelegten start_time-Wert usw.

  • Der erste wiederkehrende Auftrag wird am Anfang des nächsten Fensters nach dem Aktualisierungszeitpunkt übermittelt.

  • Spätere wiederkehrende Aufträge werden bei jedem Fenster nach dem ersten Auftrag übermittelt.

Wie bereits in vorherigen Tutorials erläutert, werden bei der Abfrage von Features standardmäßig die materialisierten Daten verwendet, sobald die Daten materialisiert wurden (Abgleich/wiederkehrende Materialisierung).

from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger

transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")

# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
    interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)

print(fs_poller.result())

(Optional) Speichern der YAML-Datei der Featuresatzressource

Sie verwenden die aktualisierten Einstellungen, um die YAML-Datei zu speichern.

## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")

Ausführen der Batchrückschlusspipeline

Der Batchrückschluss umfasst diese Schritte:

  1. Sie verwenden die gleiche integrierte Featureabrufkomponente für den Featureabruf, die in der Trainingspipeline im dritten Tutorial behandelt wird. Für das Pipelinetraining haben Sie eine Featureabrufspezifikation als Komponenteneingabe bereitgestellt. Für Batchrückschlüsse übergeben Sie das registrierte Modell als Eingabe. Die Komponente sucht im Modellartefakt nach der Featureabrufspezifikation.

    Außerdem enthielten die Beobachtungsdaten für das Training die Zielvariable. Die Daten für die Batchrückschlussbeobachtung enthalten jedoch nicht die Zielvariable. Im Featureabrufschritt werden die Beobachtungsdaten mit den Features verknüpft und die Daten für den Batchrückschluss ausgegeben.

  2. Die Pipeline verwendet die Eingabedaten für den Batchrückschluss aus dem vorherigen Schritt, wendet den Rückschluss auf das Modell an und fügt den vorhergesagten Wert als Ausgabe an.

    Hinweis

    In diesem Beispiel wird ein Auftrag für Batchrückschluss verwendet. Sie können auch Batchendpunkte in Azure Machine Learning verwenden.

    from azure.ai.ml import load_job  # will be used later
    
    # set the batch inference  pipeline path
    batch_inference_pipeline_path = (
        root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml"
    )
    batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path)
    
    # run the training pipeline
    batch_inference_pipeline_job = ws_client.jobs.create_or_update(
        batch_inference_pipeline_definition
    )
    
    # stream the run logs
    ws_client.jobs.stream(batch_inference_pipeline_job.name)

Überprüfen der Ausgabedaten des Batchrückschlusses

In der Pipelineansicht:

  1. Wählen Sie inference_step auf der Karte outputs aus.

  2. Kopieren Sie den Wert im Feld Data. Der Wert sollte etwa wie folgt aussehen: azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1.

  3. Fügen Sie den Wert des Felds Data in die folgende Zelle ein, wobei Sie den Namen und die Version getrennt angeben. Das letzte Zeichen ist die Version, vor der ein Doppelpunkt (:) steht.

  4. Beachten Sie die Spalte predict_is_fraud, die von der Pipeline für den Backrückschluss generiert wurde.

    Da Sie in den Ausgaben der Batchrückschlusspipeline (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml) keine name- oder version-Werte für outputs von inference_step angegeben haben, hat das System eine nicht verfolgte Datenressource mit einer GUID als Namen und 1 als Version erstellt. In dieser Zelle wird der Datenpfad von der Ressource abgeleitet und dann angezeigt.

    inf_data_output = ws_client.data.get(
        name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction",
        version="1",
    )
    inf_output_df = spark.read.parquet(inf_data_output.path + "data/*.parquet")
    display(inf_output_df.head(5))

Bereinigen

Das fünfte Tutorial der Reihe beschreibt, wie Sie die Ressourcen löschen.

Nächste Schritte