部署和查询特征服务终结点

本文逐步介绍如何部署和查询特征服务终结点。 本文使用 Databricks SDK。 某些步骤也可以使用 REST API 或 Databricks UI 来完成,其中提供了这些方法的参考文档。

在此示例中,有一个包含位置(纬度和经度)的城市表,以及一个考虑用户与这些城市的当前距离的推荐器应用。 由于用户的位置不断变化,因此在推理时必须计算用户与每个城市的距离。 本教程将演示如何使用 Databricks 联机表和 Databricks 特征服务以较低的延迟执行这些计算。 有关完整示例代码,请参阅示例笔记本

步骤 1. 创建源表

源表包含预先计算的特征值,可以是 Unity Catalog 中具有主键的任何 Delta 表。 在此示例中,该表包含城市及其纬度和经度的列表。 主键是 destination_id。 示例数据如下所示。

name destination_id (pk) latitude longitude
Nashville, Tennessee 0 36.162663 -86.7816
Honolulu, Hawaii 1 21.309885 -157.85814
内华达州拉斯维加斯 2 36.171562 -115.1391
纽约州,纽约市 3 40.712776 -74.005974

步骤 2. 创建联机表

联机表是 Delta 表的只读副本,已针对联机访问进行优化。 有关详细信息,请参阅为实时特征服务使用联机表

若要创建联机表,可以使用 UI(参阅使用 UI 创建联机表)、REST API 或 Databricks SDK,如以下示例所示:

from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import mlflow

workspace = WorkspaceClient()

# Create an online table
feature_table_name = f"main.on_demand_demo.location_features"
online_table_name=f"main.on_demand_demo.location_features_online"

spec = OnlineTableSpec(
 primary_key_columns=["destination_id"],
 source_table_full_name = feature_table_name,
 run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
 perform_full_copy=True)

# ignore "already exists" error
try:
 online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
 if "already exists" in str(e):
   pass
 else:
   raise e

pprint(workspace.online_tables.get(online_table_name))

步骤 3. 在 Unity Catalog 中创建函数

在此示例中,该函数将计算目标(其位置不会改变)和用户(其位置经常变化并且在推理时才知道)之间的距离。

# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"

spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)

# Earth's radius in kilometers
radius = 6371

# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c

return distance
$$""")

步骤 4. 在 Unity Catalog 中创建特征规范

特征规范指定终结点提供的特征及其查找键。 它还指定应用于检索到的特征及其绑定的任何所需函数。 有关详细信息,请参阅创建 FeatureSpec

from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient

fe = FeatureEngineeringClient()

features=[
 FeatureLookup(
   table_name=feature_table_name,
   lookup_key="destination_id"
 ),
 FeatureFunction(
   udf_name=function_name,
   output_name="distance",
   input_bindings={
     "latitude": "latitude",
     "longitude": "longitude",
     "user_latitude": "user_latitude",
     "user_longitude": "user_longitude"
   },
 ),
]

feature_spec_name = f"main.on_demand_demo.travel_spec"

# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
 fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
 if "already exists" in str(e):
   pass
 else:
   raise e

步骤 5。 创建特征服务终结点

若要创建特征服务终结点,可以使用 UI(创建终结点)、REST API 或 Databricks SDK,如下所示。

特征服务终结点采用步骤 4 中创建的 feature_spec 作为参数。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

# Create endpoint
endpoint_name = "fse-location"

try:
 status = workspace.serving_endpoints.create_and_wait(
   name=endpoint_name,
   config = EndpointCoreConfigInput(
     served_entities=[
       ServedEntityInput(
         entity_name=feature_spec_name,
         scale_to_zero_enabled=True,
         workload_size="Small"
       )
     ]
   )
 )
 print(status)

# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)

步骤 6。 查询特征服务终结点

查询终结点时,需要提供主键以及(可选)函数使用的任何上下文数据。 在此示例中,该函数采用用户的当前位置(纬度和经度)作为输入。 由于用户的位置不断变化,因此必须在推理时将其作为上下文特征提供给函数。

也可以使用 UI(参阅使用 UI 查询终结点)或 REST API 来查询终结点。

为简单起见,本示例仅计算前往两个城市的距离。 更真实的方案可能会计算用户与特征表中每个位置的距离,以确定要推荐哪些城市。

import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
   endpoint=endpoint_name,
   inputs={
       "dataframe_records": [
           {"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
           {"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
       ]
   },
)

pprint(response)

示例笔记本

请参阅此笔记本以获取步骤的完整说明:

使用联机表的功能服务示例笔记本

获取笔记本

其他信息

有关使用特征工程 Python API 的详细信息,请参阅参考文档