Tutorial 3: Enable recurrent materialization and run batch inference
This tutorial series shows how features seamlessly integrate all phases of the machine learning lifecycle: prototyping, training, and operationalization.
The first tutorial showed how to create a feature set specification with custom transformations, and then use that feature set to generate training data, enable materialization, and perform a backfill. The second tutorial showed how to enable materialization, and perform a backfill. It also showed how to experiment with features, as a way to improve model performance.
This tutorial explains how to:
- Enable recurrent materialization for the
transactions
feature set. - Run a batch inference pipeline on the registered model.
Prerequisites
Before you proceed with this tutorial, be sure to complete the first and second tutorials in the series.
Set up
Configure the Azure Machine Learning Spark notebook.
To run this tutorial, you can create a new notebook and execute the instructions step by step. You can also open and run the existing notebook named 3. Enable recurrent materialization and run batch inference. You can find that notebook, and all the notebooks in this series, in the featurestore_sample/notebooks directory. You can choose sdk_only or sdk_and_cli. Keep this tutorial open and refer to it for documentation links and more explanation.
In the Compute dropdown list in the top nav, select Serverless Spark Compute under Azure Machine Learning Serverless Spark.
Configure the session:
- Select Configure session in the top status bar.
- Select the Python packages tab.
- Select Upload conda file.
- Select the
azureml-examples/sdk/python/featurestore-sample/project/env/online.yml
file from your local machine. - Optionally, increase the session time-out (idle time) to avoid frequent prerequisite reruns.
Start the Spark session.
# run this cell to start the spark session (any code block will start the session ). This can take around 10 mins. print("start spark session")
Set up the root directory for the samples.
import os # please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to). # You can find the name from the directory structure in the left nav root_dir = "./Users/<your_user_alias>/featurestore_sample" if os.path.isdir(root_dir): print("The folder exists.") else: print("The folder does not exist. Please create or fix the path")
Set up the CLI.
Not applicable.
Initialize the project workspace CRUD (create, read, update, and delete) client.
The tutorial notebook runs from this current workspace.
### Initialize the MLClient of this project workspace import os from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"] project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"] # connect to the project workspace ws_client = MLClient( AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name )
Initialize the feature store variables.
Be sure to update the
featurestore_name
value, to reflect what you created in the first tutorial.from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential # feature store featurestore_name = ( "<FEATURESTORE_NAME>" # use the same name from part #1 of the tutorial ) featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"] # feature store ml client fs_client = MLClient( AzureMLOnBehalfOfCredential(), featurestore_subscription_id, featurestore_resource_group_name, featurestore_name, )
Initialize the feature store SDK client.
# feature store client from azureml.featurestore import FeatureStoreClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential featurestore = FeatureStoreClient( credential=AzureMLOnBehalfOfCredential(), subscription_id=featurestore_subscription_id, resource_group_name=featurestore_resource_group_name, name=featurestore_name, )
Enable recurrent materialization on the transactions feature set
In the second tutorial, you enabled materialization and performed backfill on the transactions
feature set. Backfill is an on-demand, one-time operation that computes and places feature values in the materialization store.
To handle inference of the model in production, you might want to set up recurrent materialization jobs to keep the materialization store up to date. These jobs run on user-defined schedules. The recurrent job schedule works this way:
Interval and frequency values define a window. For example, the following values define a three-hour window:
interval
=3
frequency
=Hour
The first window starts at the
start_time
value defined inRecurrenceTrigger
, and so on.The first recurrent job is submitted at the start of the next window after the update time.
Later recurrent jobs are submitted at every window after the first job.
As explained in earlier tutorials, after data is materialized (backfill or recurrent materialization), feature retrieval uses the materialized data by default.
from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger
transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")
# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)
fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())
(Optional) Save the YAML file for the feature set asset
You use the updated settings to save the YAML file.
## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")
Run the batch inference pipeline
The batch inference has these steps:
You use the same built-in feature retrieval component for feature retrieval that you used in the training pipeline (covered in the third tutorial). For pipeline training, you provided a feature retrieval specification as a component input. For batch inference, you pass the registered model as the input. The component looks for the feature retrieval specification in the model artifact.
Additionally, for training, the observation data had the target variable. However, the batch inference observation data doesn't have the target variable. The feature retrieval step joins the observation data with the features and outputs the data for batch inference.
The pipeline uses the batch inference input data from previous step, runs inference on the model, and appends the predicted value as output.
Note
You use a job for batch inference in this example. You can also use batch endpoints in Azure Machine Learning.
from azure.ai.ml import load_job # will be used later # set the batch inference pipeline path batch_inference_pipeline_path = ( root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml" ) batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path) # run the training pipeline batch_inference_pipeline_job = ws_client.jobs.create_or_update( batch_inference_pipeline_definition ) # stream the run logs ws_client.jobs.stream(batch_inference_pipeline_job.name)
Inspect the output data for batch inference
In the pipeline view:
Select
inference_step
in theoutputs
card.Copy the
Data
field value. It looks something likeazureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1
.Paste the
Data
field value in the following cell, with separate name and version values. The last character is the version, preceded by a colon (:
).Note the
predict_is_fraud
column that the batch inference pipeline generated.In the batch inference pipeline (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml) outputs, because you didn't provide
name
orversion
values foroutputs
ofinference_step
, the system created an untracked data asset with a GUID as the name value and1
as the version value. In this cell, you derive and then display the data path from the asset.inf_data_output = ws_client.data.get( name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction", version="1", ) inf_output_df = spark.read.parquet(inf_data_output.path + "data/*.parquet") display(inf_output_df.head(5))
Clean up
The fifth tutorial in the series describes how to delete the resources.
Next steps
- Learn about feature store concepts and top-level entities in managed feature store.
- Learn about identity and access control for managed feature store.
- View the troubleshooting guide for managed feature store.
- View the YAML reference.