Udostępnij za pośrednictwem


Wdrażanie i wykonywanie zapytań względem punktu końcowego obsługującego funkcję

W tym artykule przedstawiono sposób wdrażania i wykonywania zapytań dotyczących punktu końcowego obsługującego funkcję w procesie krok po kroku. W tym artykule jest używany zestaw SDK usługi Databricks. Niektóre kroki można również wykonać przy użyciu interfejsu API REST lub interfejsu użytkownika usługi Databricks i zawierać odwołania do dokumentacji dla tych metod.

W tym przykładzie masz tabelę miast z ich lokalizacjami (szerokość geograficzna i długość geograficzna) oraz aplikacją polecającą, która uwzględnia bieżącą odległość użytkownika od tych miast. Ponieważ lokalizacja użytkownika stale się zmienia, odległość między użytkownikiem a każdym miastem musi być obliczana w czasie wnioskowania. W tym samouczku pokazano, jak wykonać te obliczenia z małym opóźnieniem przy użyciu tabel online usługi Databricks i obsługi funkcji usługi Databricks. Pełny zestaw przykładowego kodu można znaleźć w przykładowym notesie.

Krok 1. Tworzenie tabeli źródłowej

Tabela źródłowa zawiera wstępnie skompilowane wartości funkcji i może być dowolną tabelą delty w wykazie aparatu Unity z kluczem podstawowym. W tym przykładzie tabela zawiera listę miast z ich szerokością geograficzną i długością geograficzną. Kluczem podstawowym jest destination_id. Poniżej przedstawiono przykładowe dane.

name destination_id (pk) latitude długość geograficzna
Nashville, Tennessee 0 36.162663 -86.7816
Honolulu, Hawaje 1 21.309885 -157.85814
Las Vegas, Nevada 2 36.171562 -115.1391
Nowy Jork (stan Nowy Jork) 3 40.712776 -74.005974

Krok 2. Tworzenie tabeli online

Tabela online to kopia tabeli delta tylko do odczytu zoptymalizowana pod kątem dostępu online. Aby uzyskać więcej informacji, zobacz Używanie tabel online na potrzeby obsługi funkcji w czasie rzeczywistym.

Aby utworzyć tabelę online, możesz użyć interfejsu użytkownika Tworzenie tabeli online przy użyciu interfejsu użytkownika, interfejsu API REST lub zestawu SDK usługi Databricks, jak w poniższym przykładzie:

from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import mlflow

workspace = WorkspaceClient()

# Create an online table
feature_table_name = f"main.on_demand_demo.location_features"
online_table_name=f"main.on_demand_demo.location_features_online"

spec = OnlineTableSpec(
 primary_key_columns=["destination_id"],
 source_table_full_name = feature_table_name,
 run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
 perform_full_copy=True)

# ignore "already exists" error
try:
 online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
 if "already exists" in str(e):
   pass
 else:
   raise e

pprint(workspace.online_tables.get(online_table_name))

Krok 3. Tworzenie funkcji w wykazie aparatu Unity

W tym przykładzie funkcja oblicza odległość między miejscem docelowym (którego lokalizacja nie zmienia się) i użytkownikiem (którego lokalizacja zmienia się często i nie jest znana do czasu wnioskowania).

# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"

spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)

# Earth's radius in kilometers
radius = 6371

# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c

return distance
$$""")

Krok 4. Tworzenie specyfikacji funkcji w wykazie aparatu Unity

Specyfikacja funkcji określa funkcje, które obsługuje punkt końcowy i ich klucze wyszukiwania. Określa również wszystkie wymagane funkcje, które mają być stosowane do pobranych funkcji z ich powiązaniami. Aby uzyskać szczegółowe informacje, zobacz Tworzenie funkcjiSpec.

from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient

fe = FeatureEngineeringClient()

features=[
 FeatureLookup(
   table_name=feature_table_name,
   lookup_key="destination_id"
 ),
 FeatureFunction(
   udf_name=function_name,
   output_name="distance",
   input_bindings={
     "latitude": "latitude",
     "longitude": "longitude",
     "user_latitude": "user_latitude",
     "user_longitude": "user_longitude"
   },
 ),
]

feature_spec_name = f"main.on_demand_demo.travel_spec"

# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
 fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
 if "already exists" in str(e):
   pass
 else:
   raise e

Krok 5. Tworzenie punktu końcowego obsługującego funkcję

Aby utworzyć punkt końcowy obsługujący funkcję, możesz użyć interfejsu użytkownika Tworzenie punktu końcowego, interfejsu API REST lub zestawu SDK usługi Databricks, jak pokazano tutaj.

Punkt końcowy obsługujący funkcję przyjmuje feature_spec parametr utworzony w kroku 4 jako parametr.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

# Create endpoint
endpoint_name = "fse-location"

try:
 status = workspace.serving_endpoints.create_and_wait(
   name=endpoint_name,
   config = EndpointCoreConfigInput(
     served_entities=[
       ServedEntityInput(
         entity_name=feature_spec_name,
         scale_to_zero_enabled=True,
         workload_size="Small"
       )
     ]
   )
 )
 print(status)

# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)

Krok 6. Wykonywanie zapytań względem punktu końcowego obsługującego funkcję

Podczas wykonywania zapytań dotyczących punktu końcowego należy podać klucz podstawowy i opcjonalnie wszystkie dane kontekstowe używane przez funkcję. W tym przykładzie funkcja przyjmuje jako dane wejściowe bieżącej lokalizacji użytkownika (szerokość geograficzną i długość geograficzną). Ponieważ lokalizacja użytkownika stale się zmienia, należy ją udostępnić funkcji w czasie wnioskowania jako funkcję kontekstową.

Możesz również wykonać zapytanie dotyczące punktu końcowego przy użyciu interfejsu użytkownika Query an endpoint using the UI or the REST API (Wykonywanie zapytań dotyczących punktu końcowego przy użyciu interfejsu użytkownika lub interfejsu API REST).

Dla uproszczenia w tym przykładzie jest obliczana tylko odległość do dwóch miast. Bardziej realistyczny scenariusz może obliczyć odległość użytkownika od każdej lokalizacji w tabeli funkcji, aby określić, które miasta mają być zalecane.

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
   endpoint=endpoint_name,
   inputs={
       "dataframe_records": [
           {"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
           {"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
       ]
   },
)

pprint(response)

Przykładowy notes

Zobacz ten notes, aby uzyskać pełną ilustrację kroków:

Przykładowy notes obsługujący funkcję z tabelami online

Pobierz notes

Dodatkowe informacje

Aby uzyskać szczegółowe informacje na temat korzystania z interfejsu API języka Python inżynierii funkcji, zobacz dokumentację referencyjną.