Tutorial: Erstellen einer benutzerdefinierten Suchmaschine und eines Frage-Antwort-Systems
In diesem Tutorial erfahren Sie, wie Sie umfangreiche Daten, die aus einem Spark-Cluster geladen wurden, indizieren und abfragen. Sie richten ein Jupyter Notebook ein, das die folgenden Aktionen ausführt:
- Laden Sie verschiedene Formulare (Rechnungen) in einen Datenrahmen in einer Apache Spark-Sitzung
- Analysieren sie, um ihre Eigenschaften zu bestimmen
- Zusammenführen der resultierenden Ausgabe in eine tabellarische Datenstruktur
- Schreiben der Ausgabe in einen in Azure Cognitive Search gehosteten Suchindex
- Erkunden und Abfragen der von Ihnen erstellten Inhalte
1. Einrichten von Abhängigkeiten
Zunächst importieren wir Pakete und stellen eine Verbindung zu den Azure-Ressourcen her, die in diesem Workflow verwendet werden.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"
2. Laden von Daten in Spark
Dieser Code lädt ein paar externe Dateien aus einem Azure-Speicherkonto, das zu Demozwecken verwendet wird. Die Dateien sind verschiedene Rechnungen und werden in einen Datenrahmen eingelesen.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
[prefix, postfix] = blob.split("@")
container = prefix.split("/")[-1]
split_postfix = postfix.split("/")
account = split_postfix[0]
filepath = "/".join(split_postfix[1:])
return "https://{}/{}/{}".format(account, container, filepath)
df2 = (
spark.read.format("binaryFile")
.load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
.select("path")
.limit(10)
.select(udf(blob_to_url, StringType())("path").alias("url"))
.cache()
)
display(df2)
3. Anwenden der Formularerkennung
Dieser Code lädt den AnalyzeInvoices-Transformer und übergibt einen Verweis auf den Datenrahmen, der die Rechnungen enthält. Es ruft das vorgefertigte Rechnungsmodell von Azure Forms Analyzer auf.
from synapse.ml.cognitive import AnalyzeInvoices
analyzed_df = (
AnalyzeInvoices()
.setSubscriptionKey(cognitive_key)
.setLocation(cognitive_location)
.setImageUrlCol("url")
.setOutputCol("invoices")
.setErrorCol("errors")
.setConcurrency(5)
.transform(df2)
.cache()
)
display(analyzed_df)
4. Vereinfachen der Ausgabe der Formularerkennung
Dieser Code verwendet FormOntologyLearner, einen Transformator, der die Ausgabe von Transformatoren der Formularerkennung (für Azure KI Dokument Intelligenz) analysiert und daraus eine tabellarische Datenstruktur ableitet. Die Ausgabe von AnalyzeInvoices ist dynamisch und variiert je nach den in Ihren Inhalten erkannten Funktionen.
FormOntologyLearner erweitert den Nutzen des Transformers AnalyzeInvoices, indem es nach Mustern sucht, die zum Erstellen einer tabellarischen Datenstruktur verwendet werden können. Das Organisieren der Ausgabe in mehreren Spalten und Zeilen ermöglicht eine einfachere Downstreamanalyse.
from synapse.ml.cognitive import FormOntologyLearner
organized_df = (
FormOntologyLearner()
.setInputCol("invoices")
.setOutputCol("extracted")
.fit(analyzed_df)
.transform(analyzed_df)
.select("url", "extracted.*")
.cache()
)
display(organized_df)
Der übersichtliche tabellarische Dataframe bietet die Möglichkeit, die geschachtelten Tabellen in den Formularen mit SparkSQL zu vereinfachen.
from pyspark.sql.functions import explode, col
itemized_df = (
organized_df.select("*", explode(col("Items")).alias("Item"))
.drop("Items")
.select("Item.*", "*")
.drop("Item")
)
display(itemized_df)
5. Hinzufügen von Übersetzungen
Dieser Code lädt Übersetzen, einen Transformer, der den Azure AI Translator-Dienst in Azure AI Services aufruft. Der englische Originaltext in der Spalte „Beschreibung“ wird maschinell in verschiedene Sprachen übersetzt. Die gesamte Ausgabe wird im Array „output.translations“ konsolidiert.
from synapse.ml.cognitive import Translate
translated_df = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_location)
.setTextCol("Description")
.setErrorCol("TranslationError")
.setOutputCol("output")
.setToLanguage(["zh-Hans", "fr", "ru", "cy"])
.setConcurrency(5)
.transform(itemized_df)
.withColumn("Translations", col("output.translations")[0])
.drop("output", "TranslationError")
.cache()
)
display(translated_df)
6. Übersetzen von Produkten in Emojis mit OpenAI 🤯
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
emoji_df = (
prompter.transform(translated_df)
.withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(emoji_df.select("Description", "Emoji"))
7. Ableiten des Kontinents der Lieferantenadressen mit OpenAI
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
continent_df = (
prompter.setOutputCol("Continent")
.setPromptTemplate(continent_template)
.transform(emoji_df)
.withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(continent_df.select("VendorAddress", "Continent"))
8. Erstellen eines Azure Search-Index für die Formulare
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
(
continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
.withColumn("SearchAction", lit("upload"))
.writeToAzureSearch(
subscriptionKey=search_key,
actionCol="SearchAction",
serviceName=search_service,
indexName=search_index,
keyCol="DocID",
)
)
9. Testen einer Suchabfrage
import requests
search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
search_service, search_index
)
requests.post(
search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
10. Erstellen eines Chatbots, der Azure Search als Tool verwenden kann 🧠 🔧
import json
import openai
openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
return f"""
Given the search engine above, what would you search for to answer the following question?
Question: "{question}"
Please output a json in the form of {{"query": "example_query"}}
"""
def search_result_prompt(query):
search_results = requests.post(
search_url, json={"search": query}, headers={"api-key": search_key}
).json()
return f"""
You previously ran a search for "{query}" which returned the following results:
{search_results}
You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.
"""
def prompt_gpt(messages):
response = openai.ChatCompletion.create(
engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
)
return response["choices"][0]["message"]["content"]
def custom_chatbot(question):
while True:
try:
query = json.loads(
prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "user", "content": search_query_prompt(question)},
]
)
)["query"]
return prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "system", "content": search_result_prompt(query)},
{"role": "user", "content": question},
]
)
except Exception as e:
raise e
11. Stellen einer Frage an den Chatbot
custom_chatbot("What did Luke Diaz buy?")
12. Durchführen einer schnellen Gegenprüfung
display(
continent_df.where(col("CustomerName") == "Luke Diaz")
.select("Description")
.distinct()
)