Självstudie 5: Utveckla en funktionsuppsättning med en anpassad källa
Med ett Azure Mašinsko učenje hanterat funktionsarkiv kan du identifiera, skapa och operationalisera funktioner. Funktioner fungerar som bindväv i maskininlärningslivscykeln, från prototypfasen, där du experimenterar med olika funktioner. Den livscykeln fortsätter till driftsättningsfasen, där du distribuerar dina modeller och härledningsstegen letar upp funktionsdata. Mer information om funktionslager finns i begrepp för funktionslager.
Del 1 i den här självstudieserien visade hur du skapar en funktionsuppsättningsspecifikation med anpassade transformeringar, aktiverar materialisering och utför en återfyllnad. Del 2 visade hur du experimenterar med funktioner i experimenterings- och träningsflödena. I del 3 förklaras återkommande materialisering för funktionsuppsättningen transactions
och hur du kör en pipeline för batchinferens på den registrerade modellen. I del 4 beskrivs hur du kör batchinferens.
I den här självstudien kommer du att
- Definiera logiken för att läsa in data från en anpassad datakälla.
- Konfigurera och registrera en funktionsuppsättning som ska användas från den här anpassade datakällan.
- Testa den registrerade funktionsuppsättningen.
Förutsättningar
Kommentar
I den här självstudien används en Azure Mašinsko učenje notebook-fil med serverlös Spark Compute.
- Se till att du slutför de tidigare självstudierna i den här serien. Den här självstudien återanvänder funktionsarkivet och andra resurser som skapats i de tidigare självstudierna.
Konfigurera
I den här självstudien används SDK:et för Python-funktionsarkivets kärnor (azureml-featurestore
). Python SDK används för att skapa, läsa, uppdatera och ta bort (CRUD), på funktionslager, funktionsuppsättningar och funktionsarkiventiteter.
Du behöver inte uttryckligen installera dessa resurser för den här självstudien, eftersom filen omfattar dem i de konfigurationsinstruktioner som visas här conda.yml
.
Konfigurera Azure Mašinsko učenje Spark-notebook-filen
Du kan skapa en ny notebook-fil och köra anvisningarna i den här självstudien steg för steg. Du kan också öppna och köra den befintliga notebook-filen featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Håll den här självstudien öppen och se den för dokumentationslänkar och mer förklaring.
På den översta menyn går du till listrutan Beräkning och väljer Serverlös Spark Compute under Azure Mašinsko učenje Serverless Spark.
Konfigurera sessionen:
- Välj Konfigurera session i det översta statusfältet.
- Välj fliken Python-paket , s
- Välj Ladda upp Conda-fil.
- Ladda upp den conda.yml fil som du laddade upp i den första självstudien.
- Du kan också öka tidsgränsen för sessionen (inaktivitetstid) för att undvika frekventa nödvändiga omkörningar.
Konfigurera rotkatalogen för exemplen
Den här kodcellen konfigurerar rotkatalogen för exemplen. Det tar cirka 10 minuter att installera alla beroenden och starta Spark-sessionen.
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
Initiera CRUD-klienten för funktionslagringsarbetsytan
MLClient
Initiera för funktionsbutikens arbetsyta för att täcka åtgärderna för att skapa, läsa, uppdatera och ta bort (CRUD) på arbetsytan för funktionsarkivet.
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
Initiera SDK-klienten för funktionsarkivets kärnor
Som tidigare nämnts använder den här självstudien Python-funktionsarkivets SDK (azureml-featurestore
). Den här initierade SDK-klienten omfattar åtgärder för att skapa, läsa, uppdatera och ta bort (CRUD) i funktionslager, funktionsuppsättningar och funktionslagerentiteter.
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
Anpassad källdefinition
Du kan definiera din egen källinläsningslogik från alla datalagringar som har en anpassad källdefinition. Implementera en UDF-klass (source processor user-defined function) (CustomSourceTransformer
i den här självstudien) för att använda den här funktionen. Den här klassen ska definiera en __init__(self, **kwargs)
funktion och en process(self, start_time, end_time, **kwargs)
funktion. Ordlistan kwargs
anges som en del av funktionsuppsättningens specifikationsdefinition. Den här definitionen skickas sedan till UDF. Parametrarna start_time
och end_time
beräknas och skickas till UDF-funktionen.
Det här är exempelkod för källprocessorns UDF-klass:
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
Skapa en funktionsuppsättningsspecifikation med en anpassad källa och experimentera med den lokalt
Skapa nu en funktionsuppsättningsspecifikation med en anpassad källdefinition och använd den i utvecklingsmiljön för att experimentera med funktionsuppsättningen. Den självstudiekursanteckningsbok som är kopplad till Serverless Spark Compute fungerar som utvecklingsmiljö.
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
Definiera sedan ett funktionsfönster och visa funktionsvärdena i det här funktionsfönstret.
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
Exportera som en funktionsuppsättningsspecifikation
Om du vill registrera funktionsuppsättningsspecifikationen med funktionsarkivet sparar du först specifikationen i ett visst format. Granska specifikationen för den genererade funktionsuppsättningen transactions_custom_source
. Öppna den här filen från filträdet för att se specifikationen: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
.
Specifikationen har följande element:
features
: En lista över funktioner och deras datatyper.index_columns
: Anslutningsnycklarna som krävs för att komma åt värden från funktionsuppsättningen.
Mer information om specifikationen finns i Förstå entiteter på toppnivå i det hanterade funktionsarkivet och CLI-funktionsuppsättningens YAML-schema.
Funktionsuppsättningens specifikationspersistence ger en annan fördel: funktionsuppsättningsspecifikationen kan källkontrolleras.
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
Registrera transaktionsfunktionen med funktionsarkivet
Använd den här koden för att registrera en funktionsuppsättningstillgång som lästs in från en anpassad källa med funktionsarkivet. Du kan sedan återanvända den tillgången och enkelt dela den. Registrering av en funktionsuppsättningstillgång erbjuder hanterade funktioner, inklusive versionshantering och materialisering.
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
Hämta den registrerade funktionsuppsättningen och skriv ut relaterad information.
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
Testa funktionsgenerering från registrerad funktionsuppsättning
to_spark_dataframe()
Använd funktionen för funktionsuppsättningen för att testa funktionsgenereringen från den registrerade funktionsuppsättningen och visa funktionerna.
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
Du bör kunna hämta den registrerade funktionsuppsättningen som en Spark-dataram och sedan visa den. Nu kan du använda de här funktionerna för en punkt-i-tid-koppling med observationsdata och de efterföljande stegen i din maskininlärningspipeline.
Rensa
Om du har skapat en resursgrupp för självstudien kan du ta bort resursgruppen, som tar bort alla resurser som är associerade med den här självstudien. Annars kan du ta bort resurserna individuellt:
- Om du vill ta bort funktionsarkivet öppnar du resursgruppen i Azure-portalen, väljer funktionsarkivet och tar bort det.
- Den användartilldelade hanterade identiteten (UAI) som tilldelats arbetsytan för funktionsarkivet tas inte bort när vi tar bort funktionsarkivet. Om du vill ta bort UAI följer du de här anvisningarna.
- Om du vill ta bort ett offlinearkiv av lagringskontotyp öppnar du resursgruppen i Azure-portalen, väljer den lagring som du skapade och tar bort den.
- Om du vill ta bort en Azure Cache for Redis-instans öppnar du resursgruppen i Azure-portalen, väljer den instans som du skapade och tar bort den.