次の方法で共有


例: 機能サービス エンドポイントをデプロイしてクエリを実行する

この記事では、特徴量提供エンドポイントをデプロイしてクエリを実行する方法をステップバイステップ プロセスで説明します。 この記事では、Databricks SDK を使用します。 一部の手順は、REST API または Databricks UI を使用して完了することもでき、それらのメソッドのドキュメントへの参照が含まれます。

この例では、場所 (緯度と経度) を含む都市のテーブルと、それらの都市からのユーザーの現在の距離を考慮したレコメンダー アプリがあります。 ユーザーの位置は絶えず変化するため、推論時にユーザーと各都市の間の距離を計算する必要があります。 このチュートリアルでは、Databricks Online Feature Store と Databricks Feature Serving を使用して、短い待機時間でこれらの計算を実行する方法について説明します。 完全なコード例については、「例のノートブック」を参照してください。

ステップ 1. ソース テーブルを作成する

ソース テーブルには、事前計算された特徴値が含まれており、主キーを持つ Unity Catalog 内の任意の Delta テーブルを指定できます。 この例では、テーブルに都市とその緯度および経度の一覧が含まれています。 主キーは destination_id です。 サンプル データを次に示します。

名前 destination_id (プライマリキー) 緯度 経度
テネシー州ナッシュビル 0 36.162663 -86.7816
ハワイ州ホノルル 1 21.309885 -157.85814
ネバダ州ラスベガス 2 36.171562 -115.1391
ニューヨーク (ニューヨーク州) 3 40.712776 -74.005974

ステップ 2. オンライン機能ストアを作成する

Databricks Online Feature Store の詳細については、「 Databricks Online Feature Stores」を参照してください。

from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

feature_table_name = f"{catalog_name}.{schema_name}.location_features"
function_name = f"{catalog_name}.{schema_name}.distance"


# Create the feature table
fe.create_table(
  name = feature_table_name,
  primary_keys="destination_id",
  df = destination_location_df,
  description = "Destination location features."
)


# Enable Change Data Feed to enable CONTINOUS and TRIGGERED publish modes

spark.sql(f"ALTER TABLE {feature_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = 'true')")

# Create an online store with specified capacity
online_store_name = f"{username}-online-store"

fe.create_online_store(
    name=online_store_name,
    capacity="CU_2"  # Valid options: "CU_1", "CU_2", "CU_4", "CU_8"
)

# Wait until the state is AVAILABLE
online_store = fe.get_online_store(name=online_store_name)
online_store.state

# Publish the table

published_table = fe.publish_table(
    online_store=online_store,
    source_table_name=feature_table_name,
    online_table_name=online_table_name
)

手順 3. Unity Catalog で関数を作成する

この例では、関数は、目的地 (位置が変化しない) からユーザー (位置が頻繁に変化し、推論時まで不明) までの距離を計算します。

# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"

spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)

# Earth's radius in kilometers
radius = 6371

# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c

return distance
$$""")

ステップ 4: Unity Catalog で特徴量仕様を作成する

特徴量仕様では、エンドポイントが提供する特徴量とそのルックアップ キーを指定します。 また、取得した特徴量とそのバインディングに適用するために必要な関数も指定します。 詳細については、「FeatureSpecの作成」を参照してください。

from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient

fe = FeatureEngineeringClient()

features=[
 FeatureLookup(
   table_name=feature_table_name,
   lookup_key="destination_id"
 ),
 FeatureFunction(
   udf_name=function_name,
   output_name="distance",
   input_bindings={
     "latitude": "latitude",
     "longitude": "longitude",
     "user_latitude": "user_latitude",
     "user_longitude": "user_longitude"
   },
 ),
]

feature_spec_name = f"main.on_demand_demo.travel_spec"

# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
 fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
 if "already exists" in str(e):
   pass
 else:
   raise e

ステップ 5: 特徴量提供エンドポイントを作成する

特徴量提供エンドポイントを作成するには、ここに示す UI (「[エンドポイントを作成する」)、REST API、または Databricks SDK を使用できます。

特徴量提供エンドポイントは、手順 4 で作成した feature_spec をパラメーターとして受け取ります。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

# Create endpoint
endpoint_name = "fse-location"

try:
 status = workspace.serving_endpoints.create_and_wait(
   name=endpoint_name,
   config = EndpointCoreConfigInput(
     served_entities=[
       ServedEntityInput(
         entity_name=feature_spec_name,
         scale_to_zero_enabled=True,
         workload_size="Small"
       )
     ]
   )
 )
 print(status)

# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)

ステップ 6. 特徴量提供エンドポイントのクエリを実行する

エンドポイントのクエリを実行する場合、主キーを指定し、必要に応じて、関数で使用されるコンテキスト データも指定します。 この例では、関数はユーザーの現在の位置 (緯度と経度) を入力として受け取ります。 ユーザーの場所は絶えず変化しているため、コンテキスト機能として推論時に関数に提供する必要があります。

エンドポイントのクエリを実行する場合も、UI (「UI を使用してエンドポイントのクエリを実行する」)、または REST API を使用できます。

わかりやすくするために、この例では、2 つの都市までの距離のみを計算します。 より現実的なシナリオでは、フィーチャ テーブル内の各場所からのユーザーの距離を計算して、推奨する都市を決定できます。

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
   endpoint=endpoint_name,
   inputs={
       "dataframe_records": [
           {"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
           {"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
       ]
   },
)

pprint(response)

ノートブックの例

手順の完全な例については、次のノートブックを参照してください。

オンライン ストアでサンプル ノートブックを提供する機能

ノートブックを入手

追加情報

特徴エンジニアリング Python API の使用方法の詳細ついては、リファレンス ドキュメントを参照してください。