Compartir a través de


Tutorial 3: habilitar materialización recurrente y ejecutar inferencia por lotes

Esta serie de tutoriales muestra cómo las características integran a la perfección todas las fases del ciclo de vida del aprendizaje automático: creación de prototipos, entrenamiento y operacionalización.

El primer tutorial mostraba cómo crear una especificación de conjunto de características con transformaciones personalizadas, y después usar ese conjunto de características para generar datos de entrenamiento, habilitar la materialización y realizar una reposición. El segundo tutorial mostró cómo habilitar la materialización y realizar un reposición. También se mostró cómo experimentar con características, como una manera de mejorar el rendimiento del modelo.

Este tutorial explica cómo realizar lo siguiente:

  • Habilitar la materialización recurrente para el conjunto de características transactions.
  • Ejecutar una canalización de inferencia por lotes en el modelo registrado.

Requisitos previos

Antes de continuar con este tutorial, asegúrese de completar el primero y el segundo de la serie.

Configurar

  1. Configure el cuaderno de Spark de Azure Machine Learning.

    Para ejecutar este tutorial, puede crear un cuaderno y ejecutar las instrucciones paso a paso. También puede abrir y ejecutar el cuaderno denominado 3. Habilitación de la materialización recurrente y ejecución de la inferencia por lotes. Puede encontrar ese cuaderno y todos los cuadernos de esta serie en el directorio featurestore_sample/notebooks. Puede elegir sdk_only o sdk_and_cli. Mantenga este tutorial abierto y consúltelo para obtener vínculos de documentación y explicaciones más detalladas.

    1. En el menú superior, en la lista desplegable Proceso, seleccione Proceso de Spark sin servidor en Spark sin servidor de Azure Machine Learning.

    2. Configuración de la sesión:

      1. Seleccione Configurar sesión en el panel de navegación superior.
      2. Seleccione la pestaña Paquetes de Python.
      3. Seleccione Cargar archivo de Conda.
      4. Seleccione el archivo azureml-examples/sdk/python/featurestore-sample/project/env/online.yml de la máquina local.
      5. De manera opcional, aumente el tiempo de espera de la sesión (tiempo de inactividad) para evitar repeticiones frecuentes de ejecución de requisitos previos.
  2. Inicie la sesión de Spark.

    # run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
    print("start spark session")
  3. Configure el directorio raíz para los ejemplos.

    import os
    
    # please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to).
    # You can find the name from the directory structure in the left nav
    root_dir = "./Users/<your_user_alias>/featurestore_sample"
    
    if os.path.isdir(root_dir):
        print("The folder exists.")
    else:
        print("The folder does not exist. Please create or fix the path")
  4. Configuración de la CLI.

    No aplicable.


  1. Inicialice el cliente CRUD del área de trabajo del proyecto (crear, leer, actualizar y eliminar).

    El cuaderno del tutorial se ejecuta desde esta área de trabajo actual.

    ### Initialize the MLClient of this project workspace
    import os
    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]
    
    # connect to the project workspace
    ws_client = MLClient(
        AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name
    )
  2. Inicialice las variables del almacén de características.

    Asegúrese de actualizar el valor featurestore_name para reflejar lo que ha creado en el primer tutorial.

    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    # feature store
    featurestore_name = (
        "<FEATURESTORE_NAME>"  # use the same name from part #1 of the tutorial
    )
    featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    
    # feature store ml client
    fs_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        featurestore_subscription_id,
        featurestore_resource_group_name,
        featurestore_name,
    )
  3. Inicialice el cliente del SDK del almacén de características.

    # feature store client
    from azureml.featurestore import FeatureStoreClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    featurestore = FeatureStoreClient(
        credential=AzureMLOnBehalfOfCredential(),
        subscription_id=featurestore_subscription_id,
        resource_group_name=featurestore_resource_group_name,
        name=featurestore_name,
    )

Habilitación de la materialización recurrente en el conjunto de características de transacciones

En el segundo tutorial, ha habilitado la materialización y ha realizado la reposición en el conjunto de características transactions. La reposición es una operación a petición y de una sola vez que calcula y coloca los valores de características en el almacén de materialización.

Para manipular la inferencia del modelo en producción, es posible que desee configurar trabajos de materialización recurrente para mantener actualizado el almacén de materialización. Estos trabajos se ejecutan en programaciones definidas por el usuario. La programación de trabajos recurrente funciona de esta manera:

  • Los valores de intervalo y frecuencia definen una ventana. Por ejemplo, los siguientes valores definen una ventana de tres horas:

    • interval = 3
    • frequency = Hour
  • La primera ventana comienza en el valor start_time definido en RecurrenceTrigger y así sucesivamente.

  • El primer trabajo recurrente se envía al principio de la siguiente ventana después de la hora de actualización.

  • Los trabajos recurrentes posteriores se envían en cada ventana después del primer trabajo.

Como se explicó en tutoriales anteriores, después de que se materializan los datos (reposición o materialización recurrente), la recuperación de características usa los datos materializados de forma predeterminada.

from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger

transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")

# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
    interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)

print(fs_poller.result())

(Opcional) Guarde el archivo YAML para el recurso de conjunto de características

Use la configuración actualizada para guardar el archivo YAML.

## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")

Ejecución de la canalización de inferencia por lotes

La inferencia por lotes tiene estos pasos:

  1. Use el mismo componente de recuperación de características integrado para la recuperación de características que usó en la canalización de entrenamiento (que se describe en el tercer tutorial). Para la canalización de entrenamiento, se proporciona una especificación de recuperación de características como entrada de componente. Para la inferencia por lotes, se pasa el modelo registrado como entrada. El componente busca la especificación de recuperación de características en el artefacto del modelo.

    Además, para el entrenamiento los datos de observación tenían la variable de destino. Sin embargo, los datos de observación de inferencia por lotes no tienen la variable de destino. El paso de recuperación de características combina los datos de observación con las características y genera los datos para la inferencia por lotes.

  2. La canalización usa los datos de entrada de la inferencia por lotes del paso anterior, ejecuta la inferencia en el modelo y anexa el valor previsto como salida.

    Nota:

    En este ejemplo, usa un trabajo para la inferencia por lotes. También puede usar puntos de conexión por lotes en Azure Machine Learning.

    from azure.ai.ml import load_job  # will be used later
    
    # set the batch inference  pipeline path
    batch_inference_pipeline_path = (
        root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml"
    )
    batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path)
    
    # run the training pipeline
    batch_inference_pipeline_job = ws_client.jobs.create_or_update(
        batch_inference_pipeline_definition
    )
    
    # stream the run logs
    ws_client.jobs.stream(batch_inference_pipeline_job.name)

Inspección de los datos de salida para la inferencia por lotes

En la vista de canalización:

  1. Seleccione inference_step en la tarjeta outputs.

  2. Copie el valor del campo Data. Tiene un aspecto similar a azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1.

  3. Pegue el valor del campo Data en la siguiente celda, con valores independientes de nombre y versión. El último carácter es la versión, precedida de dos puntos (:).

  4. Observe la columna predict_is_fraud generada por la canalización de inferencia por lotes.

    Debido a que no ha proporcionado los valores name o version para outputs de inference_step en las salidas de la canalización de inferencia por lotes (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml), el sistema ha creado un recurso de datos sin seguimiento con un GUID como el valor "name" y 1 como el valor "version". En esta celda, usted deriva y, después, se muestra la ruta de acceso de datos desde el recurso.

    inf_data_output = ws_client.data.get(
        name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction",
        version="1",
    )
    inf_output_df = spark.read.parquet(inf_data_output.path + "data/*.parquet")
    display(inf_output_df.head(5))

Limpieza

En el quinto tutorial de la serie se describe cómo eliminar los recursos.

Pasos siguientes