Sdílet prostřednictvím


Kurz 5: Vývoj sady funkcí s vlastním zdrojem

Azure Machine Učení spravované uložiště funkcí umožňuje zjišťovat, vytvářet a zprovozňovat funkce. Funkce slouží jako spojovací tkáně v životním cyklu strojového učení, počínaje fází vytváření prototypů, kde experimentujete s různými funkcemi. Tento životní cyklus pokračuje ve fázi operacionalizace, kde nasazujete modely, a kroky odvozování vyhledávají data funkcí. Další informace o úložištích funkcí najdete v tématu Koncepty úložiště funkcí.

1. část této série kurzů ukázala, jak vytvořit specifikaci sady funkcí s vlastními transformacemi, povolit materializaci a provést backfill. Část 2 ukázala, jak experimentovat s funkcemi v tocích experimentování a trénování. Část 3 vysvětlila opakující se materializaci sady transactions funkcí a ukázala, jak spustit kanál dávkového odvozování v registrovaném modelu. Část 4 popisuje, jak spustit dávkové odvození.

V tomto kurzu budete

  • Definujte logiku pro načtení dat z vlastního zdroje dat.
  • Nakonfigurujte a zaregistrujte sadu funkcí pro využívání z tohoto vlastního zdroje dat.
  • Otestujte zaregistrovanou sadu funkcí.

Požadavky

Poznámka:

Tento kurz používá poznámkový blok Azure Machine Učení s bezserverovým výpočetním prostředím Spark.

  • Ujistěte se, že jste dokončili předchozí kurzy v této sérii. Tento kurz znovu používá úložiště funkcí a další prostředky vytvořené v předchozích kurzech.

Nastavení

V tomto kurzu se používá základní sada SDK (azureml-featurestore) úložiště funkcí Pythonu. Sada Python SDK se používá k operacím vytváření, čtení, aktualizace a odstraňování (CRUD) v úložištích funkcí, sadách funkcí a entitách úložiště funkcí.

Tyto prostředky pro účely tohoto kurzu nemusíte explicitně instalovat, protože v zde uvedených pokynech k nastavení je conda.yml soubor pokrývá.

Konfigurace poznámkového bloku Spark Učení Azure Machine

Můžete vytvořit nový poznámkový blok a krok za krokem spustit pokyny v tomto kurzu. Můžete také otevřít a spustit existující poznámkový blok featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Nechte si tento kurz otevřený a projděte si ho, kde najdete odkazy na dokumentaci a další vysvětlení.

  1. V horní nabídce v rozevíracím seznamu Compute vyberte v části Azure Machine Učení Bezserverový Spark výpočetní prostředkybez serveru.

  2. Konfigurace relace:

    1. Na horním stavovém řádku vyberte Konfigurovat relaci .
    2. Výběr karty balíčků Pythonu, s
    3. Vyberte Nahrát soubor Conda.
    4. Nahrajte soubor conda.yml, který jste nahráli v prvním kurzu.
    5. Volitelně můžete zvýšit časový limit relace (dobu nečinnosti), abyste se vyhnuli častým opakovaným spuštěním požadavků.

Nastavení kořenového adresáře pro ukázky

Tato buňka kódu nastaví kořenový adresář pro ukázky. K instalaci všech závislostí a spuštění relace Sparku potřebuje asi 10 minut.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

Inicializace klienta CRUD pracovního prostoru úložiště funkcí

Inicializuje pracovní MLClient prostor úložiště funkcí, aby zahrnoval operace vytvoření, čtení, aktualizace a odstranění (CRUD) v pracovním prostoru úložiště funkcí.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Inicializace základního klienta sady SDK úložiště funkcí

Jak už bylo zmíněno dříve, v tomto kurzu se používá základní sada SDK (azureml-featurestore) úložiště funkcí Pythonu. Tento inicializovaný klient sady SDK se zabývá operacemi vytvoření, čtení, aktualizace a odstranění (CRUD) v úložištích funkcí, sadách funkcí a entitách úložiště funkcí.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Definice vlastního zdroje

Můžete definovat vlastní logiku načítání zdroje z libovolného úložiště dat, které má vlastní definici zdroje. Implementujte třídu uživatelem definované funkce zdrojového procesoru (CustomSourceTransformer V tomto kurzu) pro použití této funkce. Tato třída by měla definovat __init__(self, **kwargs) funkci a process(self, start_time, end_time, **kwargs) funkci. Slovník kwargs se dodává jako součást definice specifikace sady funkcí. Tato definice se pak předá definovanému uživatelem. Parametry start_time se end_time počítají a předají funkci definované uživatelem.

Toto je ukázkový kód pro třídu UDF zdrojového procesoru:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Vytvoření specifikace sady funkcí s vlastním zdrojem a místní experimentování s ní

Teď vytvořte specifikaci sady funkcí s vlastní definicí zdroje a použijte ji ve vývojovém prostředí k experimentování se sadou funkcí. Poznámkový blok kurzu připojený k bezserverovému výpočetnímu prostředí Sparku slouží jako vývojové prostředí.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Dále definujte okno funkce a zobrazte hodnoty funkcí v tomto okně funkce.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Export jako specifikace sady funkcí

Pokud chcete zaregistrovat specifikaci sady funkcí v úložišti funkcí, nejprve tuto specifikaci uložte v určitém formátu. Zkontrolujte vygenerovanou transactions_custom_source specifikaci sady funkcí. Otevřete tento soubor ze stromu souborů a prohlédněte si specifikaci: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

Specifikace má tyto prvky:

  • features: Seznam funkcí a jejich datových typů.
  • index_columns: Klíče spojení potřebné pro přístup k hodnotám ze sady funkcí.

Další informace o specifikaci najdete v tématu Principy entit nejvyšší úrovně ve schématu YAML v sadě funkcí spravované uložiště funkcí a rozhraní příkazového řádku (v2).

Trvalost specifikace sady funkcí nabízí další výhodu: specifikace sady funkcí může být řízena zdrojem.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

Registrace sady funkcí transakce v úložišti funkcí

Tento kód slouží k registraci prostředku sady funkcí načteného z vlastního zdroje v úložišti funkcí. Tento prostředek pak můžete znovu použít a snadno ho sdílet. Registrace prostředku sady funkcí nabízí spravované funkce, včetně správy verzí a materializace.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Získejte zaregistrovanou sadu funkcí a vytiskněte související informace.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Testování generování funkcí z registrované sady funkcí

to_spark_dataframe() Pomocí funkce sady funkcí otestujte generování funkcí z registrované sady funkcí a zobrazte funkce. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

Měli byste být schopni úspěšně načíst zaregistrovanou sadu funkcí jako datový rámec Sparku a pak ji zobrazit. Tyto funkce teď můžete použít pro spojení k určitému bodu v čase s daty pozorování a následné kroky v kanálu strojového učení.

Vyčištění

Pokud jste pro tento kurz vytvořili skupinu prostředků, můžete odstranit tuto skupinu prostředků, která odstraní všechny prostředky přidružené k tomuto kurzu. Jinak můžete prostředky odstranit jednotlivě:

  • Pokud chcete odstranit úložiště funkcí, otevřete skupinu prostředků na webu Azure Portal, vyberte úložiště funkcí a odstraňte ho.
  • Spravovaná identita přiřazená uživatelem přiřazená k pracovnímu prostoru úložiště funkcí se při odstranění úložiště funkcí neodstraní. Pokud chcete rozhraní UAI odstranit, postupujte podle těchto pokynů.
  • Pokud chcete odstranit offline úložiště typu účet úložiště, otevřete skupinu prostředků na webu Azure Portal, vyberte úložiště, které jste vytvořili, a odstraňte ho.
  • Pokud chcete odstranit instanci Azure Cache for Redis, otevřete skupinu prostředků na webu Azure Portal, vyberte instanci, kterou jste vytvořili, a odstraňte ji.

Další kroky