AdventureWorks 数据集的 AI 技能示例(预览)
本文介绍如何设置 AI 技能,并将 Lakehouse 用作数据源。 为了说明该过程,我们首先创建一个湖屋,然后向其添加数据。 之后,我们将创建 AI 技能,并将该湖屋配置为其数据源。 如果已有 Power BI 语义模型(具有必要的读/写权限)、仓库或 KQL 数据库,可以在创建 AI 技能以添加数据源后执行相同的步骤。 虽然此处显示的步骤侧重于湖屋,但过程与其他数据源类似,你只需根据特定选择进行调整。
重要说明
此功能目前处于预览状态。
先决条件
- 付费 F64 或更高版本的 Fabric 容量资源
- AI 技能租户切换已启用。
- Copilot 租户切换已启用。
- AI 的跨地理位置处理已启用。
- AI 的跨地理存储 功能已启用。
- 仓库、湖屋、Power BI 语义模型和包含数据的 KQL 数据库。
- 已为 Power BI 语义模型数据源启用通过 XMLA 终结点的 Power BI 语义模型租户切换。
使用 AdventureWorksLH 创建湖屋
首先,创建湖屋并使用必要的数据进行填充。
如果在湖仓(或仓库)中已有 AdventureWorksLH 实例,则可以跳过此步骤。 如果没有,可以利用 Fabric 笔记本中的以下说明来使用数据填充湖屋。
在要创建 AI 技能的工作区中创建新的笔记本。
在“资源管理器”窗格左侧,选择“+ 数据源”。 使用此选项,可以添加现有的湖屋或创建新湖屋。 为清楚起见,请创建新湖屋并为其分配名称。
在顶部单元格中,添加以下代码片段:
import pandas as pd from tqdm.auto import tqdm base = "https://synapseaisolutionsa.blob.core.windows.net/public/AdventureWorks" # load list of tables df_tables = pd.read_csv(f"{base}/adventureworks.csv", names=["table"]) for table in (pbar := tqdm(df_tables['table'].values)): pbar.set_description(f"Uploading {table} to lakehouse") # download df = pd.read_parquet(f"{base}/{table}.parquet") # save as lakehouse table spark.createDataFrame(df).write.mode('overwrite').saveAsTable(table)
选择“全部运行”。
几分钟后,系统将对湖屋填充必要的数据。
创建 AI 技能
若要创建新的 AI 技能,请导航到工作区,然后选择“+ 新建项”按钮,如以下屏幕截图所示:
在“所有项”选项卡中,搜索 AI 技能 以找到适当的选项。 选择后,系统会提示你提供 AI 技能的名称,如以下屏幕截图所示:
输入名称后,请继续执行以下步骤,使 AI 技能符合特定要求。
选择数据
选择在上一步中创建的湖屋,然后选择“添加”。 将 Lakehouse 添加为数据源后,AI 技能页左侧 资源管理器 窗格会显示 lakehouse 名称。 选择该湖屋以查看所有可用的表。 使用复选框选择要提供给 AI 的表。 对于此方案,请选择以下表:
dimcustomer
dimdate
dimgeography
dimproduct
dimproductcategory
dimpromotion
dimreseller
dimsalesterritory
factinternetsales
cactresellersales
提供说明
若要添加 AI 说明,请选择 AI 说明 按钮,打开右侧的“AI 说明”窗格。 可以添加以下说明。
AdventureWorksLH
数据源包含来自三个表的信息:
dimcustomer
,用于获取详细的客户人口统计数据和联系信息dimdate
,用于与日期相关的数据 - 例如日历和会计信息dimgeography
,用于地理详细信息,包括城市名称和国家/地区代码。
使用此数据源进行涉及客户详细信息、基于时间的事件和地理位置的查询和分析。
提供示例
若要添加示例查询,请选择 示例查询 按钮打开右侧的示例查询窗格。 此窗格提供用于为所有受支持的数据源添加或编辑示例查询的选项。 对于每个数据源,可以选择 添加或编辑示例查询 输入相关示例,如以下屏幕截图所示:
在这里,应为创建的 Lakehouse 数据源添加示例查询。
Question: Calculate the average percentage increase in sales amount for repeat purchases for every zipcode. Repeat purchase is a purchase subsequent to the first purchase (the average should always be computed relative to the first purchase)
SELECT AVG((s.SalesAmount - first_purchase.SalesAmount) / first_purchase.SalesAmount * 100) AS AvgPercentageIncrease
FROM factinternetsales s
INNER JOIN dimcustomer c ON s.CustomerKey = c.CustomerKey
INNER JOIN dimgeography g ON c.GeographyKey = g.GeographyKey
INNER JOIN (
SELECT *
FROM (
SELECT
CustomerKey,
SalesAmount,
OrderDate,
ROW_NUMBER() OVER (PARTITION BY CustomerKey ORDER BY OrderDate) AS RowNumber
FROM factinternetsales
) AS t
WHERE RowNumber = 1
) first_purchase ON s.CustomerKey = first_purchase.CustomerKey
WHERE s.OrderDate > first_purchase.OrderDate
GROUP BY g.PostalCode;
Question: Show the monthly total and year-to-date total sales. Order by year and month.
SELECT
Year,
Month,
MonthlySales,
SUM(MonthlySales) OVER (PARTITION BY Year ORDER BY Year, Month ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS CumulativeTotal
FROM (
SELECT
YEAR(OrderDate) AS Year,
MONTH(OrderDate) AS Month,
SUM(SalesAmount) AS MonthlySales
FROM factinternetsales
GROUP BY YEAR(OrderDate), MONTH(OrderDate)
) AS t
注释
Power BI 语义模型数据源目前不支持添加示例查询/问题对。
测试和修改 AI 技能
现在,你已配置 AI 技能、添加了 AI 说明,并为 Lakehouse 提供了示例查询,接下来可以通过提问和接收答案来与之交互。 继续测试时,可以添加更多示例并优化说明,以进一步提高 AI 技能的性能。 与同事协作,收集反馈,并根据他们的输入,确保提供的示例查询和说明符合他们想要提问的问题类型。
以编程方式使用 AI 技能
可以在 Fabric 笔记本中以编程方式使用 AI 技能。 要确定 AI 技能是否具有已发布的 URL 值,请选择“设置”,如以下屏幕截图所示:
在发布 AI 技能之前,它没有已发布的 URL 值,如以下屏幕截图所示:
验证 AI 技能的性能后,可以决定发布它,以便与想要对数据执行 Q&A 的同事共享。 在这种情况下,请选择“发布”,如以下屏幕截图所示:
此时会显示 AI 技能的已发布 URL,如以下屏幕截图所示:
然后,可以复制已发布 URL 并将其用于 Fabric 笔记本。 通过此方式,可以通过在 Fabric 笔记本中调用 AI 技能 API 来查询 AI 技能。 将复制的 URL 粘贴到此代码片段中。 然后,将问题替换为与 AI 技能相关的任何查询。 此示例使用 \<generic published URL value\>
作为 URL。
%pip install "openai==1.14.1"
%pip install httpx==0.27.2
import requests
import json
import pprint
import typing as t
import time
import uuid
from openai import OpenAI
from openai._exceptions import APIStatusError
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given
from synapse.ml.mlflow import get_mlflow_env_config
from sempy.fabric._token_provider import SynapseTokenProvider
base_url = "https://<generic published base URL value>"
question = "What datasources do you have access to?"
configs = get_mlflow_env_config()
# Create OpenAI Client
class FabricOpenAI(OpenAI):
def __init__(
self,
api_version: str ="2024-05-01-preview",
**kwargs: t.Any,
) -> None:
self.api_version = api_version
default_query = kwargs.pop("default_query", {})
default_query["api-version"] = self.api_version
super().__init__(
api_key="",
base_url=base_url,
default_query=default_query,
**kwargs,
)
def _prepare_options(self, options: FinalRequestOptions) -> None:
headers: dict[str, str | Omit] = (
{**options.headers} if is_given(options.headers) else {}
)
options.headers = headers
headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
if "Accept" not in headers:
headers["Accept"] = "application/json"
if "ActivityId" not in headers:
correlation_id = str(uuid.uuid4())
headers["ActivityId"] = correlation_id
return super()._prepare_options(options)
# Pretty printing helper
def pretty_print(messages):
print("---Conversation---")
for m in messages:
print(f"{m.role}: {m.content[0].text.value}")
print()
fabric_client = FabricOpenAI()
# Create assistant
assistant = fabric_client.beta.assistants.create(model="not used")
# Create thread
thread = fabric_client.beta.threads.create()
# Create message on thread
message = fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
# Create run
run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)
# Wait for run to complete
while run.status == "queued" or run.status == "in_progress":
run = fabric_client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
print(run.status)
time.sleep(2)
# Print messages
response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
pretty_print(response)
# Delete thread
fabric_client.beta.threads.delete(thread_id=thread.id)