Een functie implementeren en er query's op uitvoeren
In dit artikel wordt beschreven hoe u een functie implementeert en er query's op uitvoert in een stapsgewijze procedure. In dit artikel wordt de Databricks SDK gebruikt. Sommige stappen kunnen ook worden voltooid met behulp van de REST API of de Databricks-gebruikersinterface en bevatten verwijzingen naar de documentatie voor deze methoden.
In dit voorbeeld hebt u een table steden met hun locaties (breedtegraad en lengtegraad) en een aanbevolen app die rekening houdt met de huidige afstand van de gebruiker van die steden. Omdat de locatie van de gebruiker voortdurend verandert, moet de afstand tussen de gebruiker en elke stad worden berekend op het moment van deductie. In deze zelfstudie ziet u hoe u deze berekeningen met lage latentie uitvoert met behulp van Databricks Online Tables en Databricks Feature Serving. Zie voor de volledige set van de voorbeeldcode het voorbeeldnotebook.
Stap 1. De bron-table maken
De bron table bevat de vooraf samengestelde eigenschap values en kan elke Delta table in Unity Catalog zijn met een primaire sleutel. In dit voorbeeld bevat de table een list van steden met hun breedte- en lengtegraad. De primaire sleutel is destination_id
. Voorbeeldgegevens worden hieronder weergegeven.
naam | destination_id (pk) | breedtegraad | lengtegraad |
---|---|---|---|
Nashville, Tennessee | 0 | 36.162663 | -86.7816 |
Honolulu, Hawaï | 1 | 21.309885 | -157.85814 |
Las Vegas, Nevada | 2 | 36.171562 | -115.1391 |
New York, New York | 3 | 40.712776 | -74.005974 |
Stap 2. Een online table maken
Een online table is een alleen-lezen-kopie van een Delta-Table die is geoptimaliseerd voor online toegang. Zie voor meer informatie gebruik online tables voor real-time functielevering.
Als u een online-tablewilt maken, kunt u de gebruikersinterface gebruiken Een online-table maken met behulp van de gebruikersinterface, de REST APIof de Databricks SDK, zoals in het volgende voorbeeld:
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import mlflow
workspace = WorkspaceClient()
# Create an online table
feature_table_name = f"main.on_demand_demo.location_features"
online_table_name=f"main.on_demand_demo.location_features_online"
spec = OnlineTableSpec(
primary_key_columns=["destination_id"],
source_table_full_name = feature_table_name,
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
perform_full_copy=True)
# ignore "already exists" error
try:
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
pprint(workspace.online_tables.get(online_table_name))
Stap 3. Een functie maken in Unity Catalog
In dit voorbeeld berekent de functie de afstand tussen de bestemming (waarvan de locatie niet verandert) en de gebruiker (waarvan de locatie regelmatig verandert en niet bekend is tot de tijd van deductie).
# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"
spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)
# Earth's radius in kilometers
radius = 6371
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c
return distance
$$""")
Stap 4. Een functiespecificatie maken in Unity Catalog
De functiespecificatie geeft de functies op die het eindpunt dient en de bijbehorende zoeksleutels. Ook worden alle vereiste functies opgegeven die moeten worden toegepast op de opgehaalde functies met hun bindingen. Zie Een FeatureSpec maken voor meer informatie.
from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient
fe = FeatureEngineeringClient()
features=[
FeatureLookup(
table_name=feature_table_name,
lookup_key="destination_id"
),
FeatureFunction(
udf_name=function_name,
output_name="distance",
input_bindings={
"latitude": "latitude",
"longitude": "longitude",
"user_latitude": "user_latitude",
"user_longitude": "user_longitude"
},
),
]
feature_spec_name = f"main.on_demand_demo.travel_spec"
# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
Stap 5. Een eindpunt voor het leveren van functies maken
Als u een eindpunt voor het leveren van functies wilt maken, kunt u de gebruikersinterface gebruiken om een eindpunt, de REST API of de Databricks SDK te maken, die hier wordt weergegeven.
Het functie-eindpunt voor de dienst neemt het feature_spec
eindpunt dat u in stap 4 hebt gemaakt als een parameter.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
# Create endpoint
endpoint_name = "fse-location"
try:
status = workspace.serving_endpoints.create_and_wait(
name=endpoint_name,
config = EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name,
scale_to_zero_enabled=True,
workload_size="Small"
)
]
)
)
print(status)
# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)
Stap 6. Een query uitvoeren op het functie-eindpunt
Wanneer u een query uitvoert op het eindpunt, geeft u de primaire sleutel en eventueel eventuele contextgegevens op die door de functie worden gebruikt. In dit voorbeeld wordt de functie gebruikt als invoer voor de huidige locatie van de gebruiker (breedtegraad en lengtegraad). Omdat de locatie van de gebruiker voortdurend verandert, moet deze worden verstrekt aan de functie tijdens deductietijd als contextfunctie.
U kunt ook een query uitvoeren op het eindpunt met behulp van de gebruikersinterface een eindpunt opvragen met behulp van de gebruikersinterface of de REST API.
Ter vereenvoudiging berekent dit voorbeeld alleen de afstand tot twee steden. Een realistischer scenario kan de afstand van de gebruiker van elke locatie in de functie table berekenen om te bepalen welke steden moeten worden aanbevolen.
import mlflow.deployments
client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
endpoint=endpoint_name,
inputs={
"dataframe_records": [
{"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
{"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
]
},
)
pprint(response)
Voorbeeldnotitieblok
Bekijk dit notitieblok voor een volledige illustratie van de stappen:
Voorbeeldnotebook voor Feature Serving met de online tables
Aanvullende informatie
Zie de referentiedocumentatie voor meer informatie over het gebruik van de Python-API voor functie-engineering.