Delen via


Zelfstudie 5: Een functieset ontwikkelen met een aangepaste bron

Met een door Azure Machine Learning beheerd functiearchief kunt u functies detecteren, maken en operationeel maken. Functies fungeren als het bindweefsel in de machine learning-levenscyclus, vanaf de prototypefase, waar u experimenteer met verschillende functies. Deze levenscyclus gaat verder met de operationele fase, waar u uw modellen implementeert en deductiestappen zoeken naar de functiegegevens. Zie de concepten van het functiearchief voor meer informatie over functiearchieven.

Deel 1 van deze reeks zelfstudies liet zien hoe u een specificatie van een functieset maakt met aangepaste transformaties, materialisatie inschakelt en een backfill uitvoert. Deel 2 liet zien hoe u kunt experimenteren met functies in de experimenten en trainingsstromen. Deel 3 legde terugkerende materialisatie uit voor de transactions functieset en liet zien hoe u een batchdeductiepijplijn kunt uitvoeren op het geregistreerde model. In deel 4 wordt beschreven hoe batchdeductie moet worden uitgevoerd.

In deze zelfstudie gaat u

  • Definieer de logica voor het laden van gegevens uit een aangepaste gegevensbron.
  • Configureer en registreer een functieset die moet worden gebruikt vanuit deze aangepaste gegevensbron.
  • Test de geregistreerde functieset.

Vereisten

Notitie

In deze zelfstudie wordt gebruikgemaakt van een Azure Machine Learning-notebook met Serverless Spark Compute.

  • Zorg ervoor dat u de vorige zelfstudies in deze reeks voltooit. In deze zelfstudie worden functieopslag en andere resources die in deze eerdere zelfstudies zijn gemaakt, opnieuw gebruikt.

Instellingen

In deze zelfstudie wordt gebruikgemaakt van de Python Feature Store Core SDK (azureml-featurestore). De Python SDK wordt gebruikt voor cruD-bewerkingen (create, read, update en delete), in functiearchieven, onderdelensets en entiteiten voor het onderdelenarchief.

U hoeft deze resources niet expliciet te installeren voor deze zelfstudie, omdat in de hier weergegeven installatie-instructies het conda.yml bestand deze behandelt.

Het Azure Machine Learning Spark-notebook configureren

U kunt een nieuw notebook maken en de instructies in deze zelfstudie stapsgewijs uitvoeren. U kunt ook het bestaande notebook openen en uitvoeren featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Houd deze zelfstudie open en raadpleeg deze voor documentatiekoppelingen en meer uitleg.

  1. Selecteer in het bovenste menu in de vervolgkeuzelijst Compute serverloze Spark Compute onder Serverloze Spark van Azure Machine Learning.

  2. Configureer de sessie:

    1. Selecteer Sessie configureren in de bovenste statusbalk.
    2. Selecteer het tabblad Python-pakketten , s
    3. Selecteer Conda-bestand uploaden.
    4. Upload het conda.yml bestand dat u in de eerste zelfstudie hebt geüpload.
    5. Verhoog eventueel de time-out van de sessie (niet-actieve tijd) om frequente vereisten opnieuw uit te voeren.

De hoofdmap voor de voorbeelden instellen

In deze codecel wordt de hoofdmap voor de voorbeelden ingesteld. Het duurt ongeveer 10 minuten om alle afhankelijkheden te installeren en de Spark-sessie te starten.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

Initialiseer de CRUD-client van de werkruimte van het functiearchief

Initialiseer de werkruimte voor het MLClient onderdelenarchief om de CRUD-bewerkingen (Create, Read, Update en Delete) in de werkruimte voor het onderdelenarchief te behandelen.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Initialiseer de core SDK-client voor de functieopslag

Zoals eerder vermeld, maakt deze zelfstudie gebruik van de Kern-SDK voor python-onderdelenopslag (azureml-featurestore). Deze geïnitialiseerde SDK-client behandelt cruD-bewerkingen (create, read, update en delete) in functiearchieven, onderdelensets en entiteiten voor het onderdelenarchief.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Aangepaste brondefinitie

U kunt uw eigen bronlaadlogica definiëren vanuit elke gegevensopslag met een aangepaste brondefinitie. Implementeer een door de gebruiker gedefinieerde functieklasse (UDF) van een bronprocessor (CustomSourceTransformer in deze zelfstudie) om deze functie te gebruiken. Deze klasse moet een __init__(self, **kwargs) functie en een process(self, start_time, end_time, **kwargs) functie definiëren. De kwargs woordenlijst wordt geleverd als onderdeel van de definitie van de functiesetspecificatie. Deze definitie wordt vervolgens doorgegeven aan de UDF. De start_time parameters end_time worden berekend en doorgegeven aan de UDF-functie.

Dit is voorbeeldcode voor de UDF-klasse van de bronprocessor:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Een specificatie voor een functieset maken met een aangepaste bron en er lokaal mee experimenteren

Maak nu een specificatie van een functieset met een aangepaste brondefinitie en gebruik deze in uw ontwikkelomgeving om te experimenteren met de functieset. Het zelfstudienotebook dat is gekoppeld aan Serverloze Spark Compute , fungeert als de ontwikkelomgeving.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Definieer vervolgens een functievenster en geef de functiewaarden weer in dit functievenster.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Exporteren als een specificatie van een functieset

Als u de specificatie van de functieset wilt registreren bij het functiearchief, slaat u die specificatie eerst op in een specifieke indeling. Controleer de specificatie van de gegenereerde transactions_custom_source functieset. Open dit bestand vanuit de bestandsstructuur om de specificatie te zien: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

De specificatie heeft deze elementen:

  • features: Een lijst met functies en hun gegevenstypen.
  • index_columns: De joinsleutels die vereist zijn voor toegang tot waarden uit de functieset.

Zie Informatie over entiteiten op het hoogste niveau in het beheerde functiearchief en het YAML-schema van de CLI-functieset (v2) voor meer informatie over de specificatie.

De persistentie van de functiesetspecificatie biedt een ander voordeel: de specificatie van de onderdelenset kan worden beheerd door de bron.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

De transactiefunctieset registreren bij het functiearchief

Gebruik deze code om een onderdelensetasset te registreren die is geladen vanuit een aangepaste bron bij het functiearchief. U kunt die asset vervolgens opnieuw gebruiken en deze eenvoudig delen. Registratie van een functiesetasset biedt beheerde mogelijkheden, waaronder versiebeheer en materialisatie.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Haal de geregistreerde functieset op en druk gerelateerde informatie af.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Het genereren van functies testen op een geregistreerde onderdelenset

Gebruik de to_spark_dataframe() functie van de onderdelenset om het genereren van functies uit de geregistreerde onderdelenset te testen en de functies weer te geven. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

U moet de geregistreerde functieset kunnen ophalen als een Spark-dataframe en deze vervolgens weergeven. U kunt deze functies nu gebruiken voor een punt-in-time-join met observatiegegevens en de volgende stappen in uw machine learning-pijplijn.

Opschonen

Als u een resourcegroep hebt gemaakt voor de zelfstudie, kunt u die resourcegroep verwijderen, waardoor alle resources die aan deze zelfstudie zijn gekoppeld, worden verwijderd. Anders kunt u de resources afzonderlijk verwijderen:

  • Als u het functiearchief wilt verwijderen, opent u de resourcegroep in Azure Portal, selecteert u het functiearchief en verwijdert u deze.
  • De door de gebruiker toegewezen beheerde identiteit (UAI) die is toegewezen aan de werkruimte van het onderdeelarchief, wordt niet verwijderd wanneer we het functiearchief verwijderen. Volg deze instructies om de UAI te verwijderen.
  • Als u een offlinearchief van het opslagaccounttype wilt verwijderen, opent u de resourcegroep in Azure Portal, selecteert u de opslag die u hebt gemaakt en verwijdert u deze.
  • Als u een Azure Cache voor Redis exemplaar wilt verwijderen, opent u de resourcegroep in Azure Portal, selecteert u het exemplaar dat u hebt gemaakt en verwijdert u deze.

Volgende stappen