教程:构建自定义搜索引擎和问答系统
本教程介绍如何对从 Spark 群集加载的大型数据编制索引并进行查询。 你将设置一个执行以下操作的 Jupyter Notebook:
- 在 Apache Spark 会话中将各种表单(发票)加载到数据帧
- 分析这些表单以确定其特征
- 将生成的输出组合到表格数据结构中
- 将输出写入 Azure 认知搜索中托管的搜索索引
- 浏览和查询所创建的内容
1 - 设置依赖项
首先导入包并连接到此工作流中使用的 Azure 资源。
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
# Each service key below is looked up by name through find_secret; substitute
# your own keys, regions, and resource names before running the notebook.
cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your translator api key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your azure search api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
# Endpoint URL derived from the Azure OpenAI resource name above.
openai_url = f"https://{openai_service_name}.openai.azure.com/"
2 - 将数据加载到 Spark 中
此代码从用于演示目的的 Azure 存储帐户加载一些外部文件。 这些文件是各种发票,可读入数据帧。
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
    """Convert a ``scheme://container@account/path`` blob path to an HTTPS URL.

    Args:
        blob: Blob path whose container name precedes the first "@" and whose
            account host and file path follow it, e.g.
            ``wasbs://container@account.blob.core.windows.net/dir/file.pdf``.

    Returns:
        The string ``https://<account>/<container>/<path>``.

    Raises:
        ValueError: If ``blob`` contains no "@" separator.
    """
    # Split on the first "@" only, so file names that themselves contain "@"
    # no longer break the two-way unpacking.
    prefix, postfix = blob.split("@", 1)
    container = prefix.split("/")[-1]
    # Everything up to the first "/" is the account host; the rest is the path.
    account, _, filepath = postfix.partition("/")
    return f"https://{account}/{container}/{filepath}"
# Wrap blob_to_url as a string-typed Spark UDF so it can be applied per row.
_to_url = udf(blob_to_url, StringType())
# Read the demo invoices with the binaryFile reader; only "path" is needed.
_invoice_paths = (
    spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
)
# Keep ten rows, turn each path into a URL, and cache the small result.
df2 = _invoice_paths.limit(10).select(_to_url("path").alias("url")).cache()
display(df2)
3 - 应用表单识别
此代码加载 AnalyzeInvoices 转换器,并将包含发票的数据帧传递给它。 它调用 Azure 表单识别器 (Form Recognizer) 的预生成发票模型。
from synapse.ml.cognitive import AnalyzeInvoices
# Configure the invoice analyzer: it reads image URLs from "url", writes
# results to "invoices" and per-row errors to "errors".
_invoice_analyzer = (
    AnalyzeInvoices()
    .setSubscriptionKey(cognitive_key)
    .setLocation(cognitive_location)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
)
# Run the analyzer over the invoice URLs and cache the output for later steps.
analyzed_df = _invoice_analyzer.transform(df2).cache()
display(analyzed_df)
4 - 简化表单识别输出
此代码使用 FormOntologyLearner,这是一个分析表单识别器转换器(适用于 Azure AI 文档智能)的输出并推断表格数据结构的转换器。 AnalyzeInvoices 的输出是动态的,具体因内容中检测到的特征而异。
FormOntologyLearner 通过查找可用于创建表格数据结构的模式扩展了 AnalyzeInvoices 转换器的实用性。 将输出组织为多列和多行可以简化下游分析。
from synapse.ml.cognitive import FormOntologyLearner
# Learn a tabular layout from the "invoices" column, then apply it to the
# same data; the result lands in the "extracted" struct column.
_ontology_learner = (
    FormOntologyLearner().setInputCol("invoices").setOutputCol("extracted")
)
_ontology_model = _ontology_learner.fit(analyzed_df)
# Flatten the struct so every extracted field becomes a top-level column.
organized_df = (
    _ontology_model.transform(analyzed_df).select("url", "extracted.*").cache()
)
display(organized_df)
借助美观的表格数据帧,我们可以使用 SparkSQL 平展表单中的嵌套表
from pyspark.sql.functions import explode, col
# One output row per entry of the nested "Items" array.
_exploded_items = organized_df.select(
    "*", explode(col("Items")).alias("Item")
).drop("Items")
# Promote the fields of each item struct to columns, then drop the struct.
itemized_df = _exploded_items.select("Item.*", "*").drop("Item")
display(itemized_df)
5 - 添加翻译
此代码加载 Translate,这是一个调用 Azure AI 服务中的 Azure AI 翻译服务的转换器。 “Description”(说明)列中的英文原文会被机器翻译成多种语言。 所有输出都合并到“output.translations”数组中。
from synapse.ml.cognitive import Translate
# Configure the translator: source text comes from "Description" and is
# translated into the four listed target languages.
_translator = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_location)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
)
# Keep the first element of "output.translations" as the "Translations" column.
_with_translations = _translator.transform(itemized_df).withColumn(
    "Translations", col("output.translations")[0]
)
# The raw output and error columns are no longer needed downstream.
translated_df = _with_translations.drop("output", "TranslationError").cache()
display(translated_df)
6 - 使用 OpenAI 将产品翻译为表情符号 🤯
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
# Few-shot prompt for turning an item name into emoji. Every example ends with
# a comma, which the later cell uses to cut the completion at the first comma.
# NOTE(review): {Description} is presumably filled from the "Description"
# column by OpenAIPrompt - confirm against the SynapseML documentation.
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """
# OpenAIPrompt transformer bound to the Azure OpenAI deployment configured
# above; it writes completions to "Emoji" and errors to "error". The same
# object is reconfigured and reused by the continent cell further down.
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
# Run the emoji prompt over every translated row.
_emoji_raw = prompter.transform(translated_df)
# Keep only the text before the first comma, with surrounding spaces removed.
_first_emoji = trim(split(col("Emoji"), ",").getItem(0))
emoji_df = (
    _emoji_raw.withColumn("Emoji", _first_emoji).drop("error", "prompt").cache()
)
display(emoji_df.select("Description", "Emoji"))
7 - 使用 OpenAI 推断供应商地址所在洲
# Few-shot prompt asking which continent an address belongs to. The answer is
# requested to end with a comma, which the next cell uses to cut the completion.
# NOTE(review): {VendorAddress} is presumably filled from the "VendorAddress"
# column by OpenAIPrompt - confirm against the SynapseML documentation.
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
# Reuse the existing prompter, now writing to "Continent" with the continent
# prompt template.
_continent_prompter = prompter.setOutputCol("Continent").setPromptTemplate(
    continent_template
)
# Keep only the text before the first comma, with surrounding spaces removed.
_first_answer = trim(split(col("Continent"), ",").getItem(0))
continent_df = (
    _continent_prompter.transform(emoji_df)
    .withColumn("Continent", _first_answer)
    .drop("error", "prompt")
    .cache()
)
display(continent_df.select("VendorAddress", "Continent"))
8 - 为表单创建 Azure 搜索索引
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
# Add a string document key and an "upload" action column to every row.
_search_docs = continent_df.withColumn(
    "DocID", monotonically_increasing_id().cast("string")
).withColumn("SearchAction", lit("upload"))
# Write the rows to the configured Azure Search index, keyed by "DocID".
_search_docs.writeToAzureSearch(
    subscriptionKey=search_key,
    actionCol="SearchAction",
    serviceName=search_service,
    indexName=search_index,
    keyCol="DocID",
)
9 - 试用搜索查询
import requests
# Search endpoint of the index created above.
search_url = f"https://{search_service}.search.windows.net/indexes/{search_index}/docs/search?api-version=2019-05-06"
# Sample query: search the index for "door" and show the JSON response.
_door_query = {"search": "door"}
_auth_headers = {"api-key": search_key}
requests.post(search_url, json=_door_query, headers=_auth_headers).json()
10 - 构建可将 Azure 搜索用作工具的聊天机器人 🧠🔧
import json
import openai
# Point the openai client at the Azure OpenAI resource configured earlier.
openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"
# System prompt shared by every chatbot call. It is an f-string, so the list of
# continent_df column names is interpolated once, when this statement runs;
# the doubled braces render as literal braces around the JSON example.
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
    """Build the user prompt that asks the model for a search query.

    Args:
        question: The end-user question; inserted verbatim between double
            quotes.

    Returns:
        A prompt string that starts and ends with a newline, restates the
        question, and asks for a JSON reply of the form
        ``{"query": "example_query"}``.
    """
    # The prompt is joined from explicit lines so the function's own
    # indentation can never leak into the text sent to the model.
    prompt_lines = [
        "",
        "Given the search engine above, what would you search for to answer the following question?",
        f'Question: "{question}"',
        'Please output a json in the form of {"query": "example_query"}',
        "",
    ]
    return "\n".join(prompt_lines)
def search_result_prompt(query):
    """Run ``query`` against the search index and wrap the hits in a prompt.

    Args:
        query: Search text posted to the index as the ``search`` field.

    Returns:
        A prompt string that quotes the query, embeds the decoded JSON
        response, and instructs the model to answer from those results.
    """
    response = requests.post(
        search_url,
        json={"search": query},
        headers={"api-key": search_key},
    )
    search_results = response.json()
    # Leading and trailing empty entries give the prompt its surrounding newlines.
    prompt_lines = [
        "",
        f'You previously ran a search for "{query}" which returned the following results:',
        f"{search_results}",
        'You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.',
        "",
    ]
    return "\n".join(prompt_lines)
def prompt_gpt(messages):
    """Send chat ``messages`` to the configured deployment and return the reply.

    Args:
        messages: List of ``{"role": ..., "content": ...}`` chat messages.

    Returns:
        The ``content`` of the first choice in the completion response.
    """
    completion = openai.ChatCompletion.create(
        engine=openai_deployment_name,
        messages=messages,
        max_tokens=None,
        top_p=0.95,
    )
    first_choice = completion["choices"][0]
    return first_choice["message"]["content"]
def custom_chatbot(question):
    """Answer ``question`` with the help of one search-engine lookup.

    The model is first asked which search query would help; that query is run
    through ``search_result_prompt`` and the results are supplied as extra
    system context for the final answer.

    Args:
        question: The end-user question.

    Returns:
        The model's final answer text.

    Raises:
        Exception: Any error from the model call, the search request, or from
            decoding the model's JSON reply propagates unchanged.
    """
    # The original wrapped this in `while True` with `except ...: raise e`;
    # every path returned or re-raised on the first pass, so the loop never
    # retried. The straight-line form has the same behavior.
    system_message = {"role": "system", "content": chat_context_prompt}
    query_reply = prompt_gpt(
        [
            system_message,
            {"role": "user", "content": search_query_prompt(question)},
        ]
    )
    # The model is instructed to reply with {"query": "..."}.
    query = json.loads(query_reply)["query"]
    return prompt_gpt(
        [
            system_message,
            {"role": "system", "content": search_result_prompt(query)},
            {"role": "user", "content": question},
        ]
    )
11 - 向聊天机器人提问
# Ask the chatbot a sample question about a customer in the indexed invoices.
custom_chatbot("What did Luke Diaz buy?")
12 - 快速执行双重检查
# Cross-check the chatbot's answer against the source data: list the distinct
# item descriptions on rows whose customer is "Luke Diaz".
_luke_rows = continent_df.where(col("CustomerName") == "Luke Diaz")
display(_luke_rows.select("Description").distinct())