Tutorial 5: Develop a feature set with a custom source

An Azure Machine Learning managed feature store lets you discover, create, and operationalize features. Features serve as the connective tissue in the machine learning lifecycle, starting from the prototyping phase, where you experiment with various features. That lifecycle continues to the operationalization phase, where you deploy your models, and inference steps look up the feature data. For more information about feature stores, see feature store concepts.

Part 1 of this tutorial series showed how to create a feature set specification with custom transformations, enable materialization and perform a backfill. Part 2 showed how to experiment with features in the experimentation and training flows. Part 3 explained recurrent materialization for the transactions feature set, and showed how to run a batch inference pipeline on the registered model. Part 4 described how to run batch inference.

In this tutorial, you'll

  • Define the logic to load data from a custom data source.
  • Configure and register a feature set to consume from this custom data source.
  • Test the registered feature set.

Prerequisites

Note

This tutorial uses an Azure Machine Learning notebook with Serverless Spark Compute.

  • Make sure you complete the previous tutorials in this series. This tutorial reuses feature store and other resources created in those earlier tutorials.

Set up

This tutorial uses the Python feature store core SDK (azureml-featurestore). The Python SDK is used for create, read, update, and delete (CRUD) operations, on feature stores, feature sets, and feature store entities.

You don't need to explicitly install these resources for this tutorial, because in the set-up instructions shown here, the conda.yml file covers them.

Configure the Azure Machine Learning Spark notebook

You can create a new notebook and execute the instructions in this tutorial step by step. You can also open and run the existing notebook featurestore_sample/notebooks/sdk_only/5. Develop a feature set with custom source.ipynb. Keep this tutorial open and refer to it for documentation links and more explanation.

  1. On the top menu, in the Compute dropdown list, select Serverless Spark Compute under Azure Machine Learning Serverless Spark.

  2. Configure the session:

    1. Select Configure session in the top status bar.
    2. Select the Python packages tab, s
    3. Select Upload Conda file.
    4. Upload the conda.yml file that you uploaded in the first tutorial.
    5. Optionally, increase the session time-out (idle time) to avoid frequent prerequisite reruns.

Set up the root directory for the samples

This code cell sets up the root directory for the samples. It needs about 10 minutes to install all dependencies and start the Spark session.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

Initialize the CRUD client of the feature store workspace

Initialize the MLClient for the feature store workspace, to cover the create, read, update, and delete (CRUD) operations on the feature store workspace.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Initialize the feature store core SDK client

As mentioned earlier, this tutorial uses the Python feature store core SDK (azureml-featurestore). This initialized SDK client covers create, read, update, and delete (CRUD) operations on feature stores, feature sets, and feature store entities.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Custom source definition

You can define your own source loading logic from any data storage that has a custom source definition. Implement a source processor user-defined function (UDF) class (CustomSourceTransformer in this tutorial) to use this feature. This class should define an __init__(self, **kwargs) function, and a process(self, start_time, end_time, **kwargs) function. The kwargs dictionary is supplied as a part of the feature set specification definition. This definition is then passed to the UDF. The start_time and end_time parameters are calculated and passed to the UDF function.

This is sample code for the source processor UDF class:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Create a feature set specification with a custom source, and experiment with it locally

Now, create a feature set specification with a custom source definition, and use it in your development environment to experiment with the feature set. The tutorial notebook attached to Serverless Spark Compute serves as the development environment.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Next, define a feature window, and display the feature values in this feature window.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Export as a feature set specification

To register the feature set specification with the feature store, first save that specification in a specific format. Review the generated transactions_custom_source feature set specification. Open this file from the file tree to see the specification: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

The specification has these elements:

  • features: A list of features and their datatypes.
  • index_columns: The join keys required to access values from the feature set.

To learn more about the specification, see Understanding top-level entities in managed feature store and CLI (v2) feature set YAML schema.

Feature set specification persistence offers another benefit: the feature set specification can be source controlled.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

Register the transaction feature set with the feature store

Use this code to register a feature set asset loaded from custom source with the feature store. You can then reuse that asset, and easily share it. Registration of a feature set asset offers managed capabilities, including versioning and materialization.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Obtain the registered feature set, and print related information.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Test feature generation from registered feature set

Use the to_spark_dataframe() function of the feature set to test the feature generation from the registered feature set, and display the features. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

You should be able to successfully fetch the registered feature set as a Spark dataframe, and then display it. You can now use these features for a point-in-time join with observation data, and the subsequent steps in your machine learning pipeline.

Clean up

If you created a resource group for the tutorial, you can delete that resource group, which deletes all the resources associated with this tutorial. Otherwise, you can delete the resources individually:

  • To delete the feature store, open the resource group in the Azure portal, select the feature store, and delete it.
  • The user-assigned managed identity (UAI) assigned to the feature store workspace is not deleted when we delete the feature store. To delete the UAI, follow these instructions.
  • To delete a storage account-type offline store, open the resource group in the Azure portal, select the storage that you created, and delete it.
  • To delete an Azure Cache for Redis instance, open the resource group in the Azure portal, select the instance that you created, and delete it.

Next steps