Megosztás a következőn keresztül:


5. oktatóanyag: Szolgáltatáskészlet fejlesztése egyéni forrással

Az Azure Machine Tanulás kezelt funkciótár lehetővé teszi a funkciók felderítését, létrehozását és üzembe helyezése. A funkciók a gépi tanulási életciklus kötőszöveteként szolgálnak, kezdve a prototípuskészítési fázistól, ahol különböző funkciókkal kísérletezik. Ez az életciklus továbbra is az üzemeltetési fázisig tart, ahol üzembe helyezi a modelleket, és a következtetési lépések megkeresik a funkcióadatokat. A funkciótárolókkal kapcsolatos további információkért tekintse meg a funkciótár fogalmait.

Az oktatóanyag-sorozat 1. része bemutatta, hogyan hozhat létre egy funkciókészlet-specifikációt egyéni átalakításokkal, hogyan engedélyezheti a materializálást, és hogyan végezhet visszatöltést. A 2. rész bemutatta, hogyan kísérletezhet a kísérletezési és betanítási folyamatok funkcióival. A 3. rész elmagyarázta a transactions funkciókészlet ismétlődő materializálását, és bemutatta, hogyan futtathat kötegelt következtetési folyamatot a regisztrált modellen. A 4. rész a kötegelt következtetés futtatását ismertette.

Ebben az oktatóanyagban

  • Definiálja az egyéni adatforrásból származó adatok betöltéséhez használt logikát.
  • Konfiguráljon és regisztráljon egy szolgáltatáskészletet, amely ebből az egyéni adatforrásból származik.
  • Tesztelje a regisztrált funkciókészletet.

Előfeltételek

Feljegyzés

Ez az oktatóanyag egy Azure Machine Tanulás-jegyzetfüzetet használ kiszolgáló nélküli Spark Compute használatával.

  • Győződjön meg arról, hogy elvégezte a sorozat korábbi oktatóanyagait. Ez az oktatóanyag újra felhasználja a funkciótárat és a korábbi oktatóanyagokban létrehozott egyéb erőforrásokat.

Beállítás

Ez az oktatóanyag a Python szolgáltatástároló alapvető SDK-ját (azureml-featurestore) használja. A Python SDK a CRUD-műveletek létrehozására, olvasására, frissítésére és törlésére, funkciótárolókon, szolgáltatáskészleteken és szolgáltatástár-entitásokon való létrehozására, olvasására, frissítésére és törlésére szolgál.

Ehhez az oktatóanyaghoz nem kell explicit módon telepítenie ezeket az erőforrásokat, mert az itt látható beállítási utasításokban a conda.yml fájl ismerteti őket.

Az Azure Machine Tanulás Spark-jegyzetfüzet konfigurálása

Létrehozhat egy új jegyzetfüzetet, és lépésről lépésre végrehajthatja az oktatóanyag utasításait. Megnyithatja és futtathatja a meglévő jegyzetfüzetet featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Tartsa nyitva ezt az oktatóanyagot, és tekintse meg a dokumentáció hivatkozásait és további magyarázatait.

  1. A felső menü Compute legördülő listájában válassza a Kiszolgáló nélküli Spark Compute lehetőséget az Azure Machine Tanulás Kiszolgáló nélküli Spark alatt.

  2. Konfigurálja a munkamenetet:

    1. Válassza a Munkamenet konfigurálása lehetőséget a felső állapotsoron.
    2. Válassza ki a Python-csomagok lapot,
    3. Válassza a Conda-fájl feltöltése lehetőséget.
    4. Töltse fel az első oktatóanyagban feltöltött conda.yml fájlt.
    5. Ha szeretné, növelje a munkamenet időtúllépési idejét (üresjárati idő), hogy elkerülje a gyakori előfeltétel-újrafuttatásokat.

A minták gyökérkönyvtárának beállítása

Ez a kódcella beállítja a minták gyökérkönyvtárát. Az összes függőség telepítéséhez és a Spark-munkamenet elindításához körülbelül 10 perc szükséges.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

A szolgáltatástár-munkaterület CRUD-ügyfélének inicializálása

Inicializálja a MLClient szolgáltatástár-munkaterületet a szolgáltatástár-munkaterület létrehozási, olvasási, frissítési és törlési (CRUD) műveleteinek lefedéséhez.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

A szolgáltatástároló alapvető SDK-ügyfélének inicializálása

Ahogy korábban említettük, ez az oktatóanyag a Python szolgáltatástároló alapvető SDK-ját (azureml-featurestore) használja. Ez az inicializált SDK-ügyfél a szolgáltatástárolókon, szolgáltatáskészleteken és szolgáltatástár-entitásokon végzett létrehozási, olvasási, frissítési és törlési (CRUD) műveleteket ismerteti.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Egyéni forrásdefiníció

A saját forrásbetöltési logikát bármely olyan adattárból meghatározhatja, amely egyéni forrásdefinícióval rendelkezik. A funkció használatához implementáljon egy forrásfeldolgozó által definiált UDF-osztályt (CustomSourceTransformer ebben az oktatóanyagban). Ennek az osztálynak meg kell határoznia egy függvényt __init__(self, **kwargs) és egy függvényt process(self, start_time, end_time, **kwargs) . A kwargs szótár a funkciókészlet specifikációs definíciójának részeként van megadva. Ezt a definíciót ezután átadjuk a UDF-nek. A start_time rendszer kiszámítja és end_time átadja a paramétereket az UDF függvénynek.

Ez a forrásprocesszor UDF-osztályának mintakódja:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Szolgáltatáskészlet-specifikáció létrehozása egyéni forrással, és kísérletezés helyileg

Most hozzon létre egy szolgáltatáskészlet-specifikációt egy egyéni forrásdefinícióval, és használja a fejlesztői környezetben a funkciókészlettel való kísérletezéshez. A kiszolgáló nélküli Spark Compute-hez csatolt oktatójegyzetfüzet a fejlesztési környezetként szolgál.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Ezután definiáljon egy funkcióablakot, és jelenítse meg a funkcióértékeket ebben a funkcióablakban.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Exportálás funkciókészlet-specifikációként

Ha regisztrálni szeretné a funkciókészlet specifikációját a szolgáltatástárolóban, először mentse a specifikációt egy adott formátumban. Tekintse át a létrehozott transactions_custom_source funkciókészlet specifikációját. Nyissa meg ezt a fájlt a fájlfáról a specifikáció megtekintéséhez: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

A specifikáció a következő elemeket tartalmazza:

  • features: A funkciók és adattípusaik listája.
  • index_columns: A funkciókészlet értékeinek eléréséhez szükséges illesztőkulcsok.

A specifikációval kapcsolatos további információkért tekintse meg a kezelt funkciótár és a CLI (v2) szolgáltatáskészlet YAML-sémájának legfelső szintű entitásait ismertető témakört.

A funkciókészlet specifikációinak megőrzése egy másik előnyt is kínál: a funkciókészlet specifikációja szabályozható forrásként.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

A tranzakciós funkciókészlet regisztrálása a funkciótárolóban

Ezzel a kóddal regisztrálhat egy egyéni forrásból betöltött funkciókészlet-objektumot a funkciótárolóban. Ezután újra felhasználhatja az objektumot, és egyszerűen megoszthatja azt. A szolgáltatáskészlet-objektumok regisztrálása felügyelt képességeket kínál, beleértve a verziószámozást és a materializálást.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Szerezze be a regisztrált szolgáltatáskészletet, és nyomtassa ki a kapcsolódó információkat.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Szolgáltatáslétrehozás tesztelése regisztrált szolgáltatáskészletből

to_spark_dataframe() A funkciókészlet függvényével tesztelheti a regisztrált szolgáltatáskészletből származó szolgáltatásgenerációt, és megjelenítheti a funkciókat. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

Képesnek kell lennie a regisztrált szolgáltatáskészlet Spark-adatkeretként való beolvasására, majd megjelenítésére. Ezeket a funkciókat mostantól használhatja a megfigyelési adatokkal és a gépi tanulási folyamat későbbi lépéseivel való időponthoz kötött csatlakozáshoz.

A fölöslegessé vált elemek eltávolítása

Ha létrehozott egy erőforráscsoportot az oktatóanyaghoz, törölheti azt az erőforráscsoportot, amely törli az oktatóanyaghoz társított összes erőforrást. Ellenkező esetben egyenként törölheti az erőforrásokat:

  • A szolgáltatástár törléséhez nyissa meg az erőforráscsoportot az Azure Portalon, jelölje ki a funkciótárolót, és törölje azt.
  • A szolgáltatástár-munkaterülethez hozzárendelt felhasználó által hozzárendelt felügyelt identitás (UAI) nem törlődik a funkciótároló törlésekor. Az UAI törléséhez kövesse az alábbi utasításokat.
  • A tárfiók típusú offline tároló törléséhez nyissa meg az erőforráscsoportot az Azure Portalon, válassza ki a létrehozott tárat, és törölje azt.
  • Az Azure Cache for Redis-példány törléséhez nyissa meg az erőforráscsoportot az Azure Portalon, jelölje ki a létrehozott példányt, és törölje azt.

Következő lépések