Zelfstudie: Een aangepast zoekprogramma en vraag-antwoordsysteem maken
In deze zelfstudie leert u hoe u grote gegevens kunt indexeren en er query's op kunt uitvoeren die zijn geladen vanuit een Spark-cluster. U stelt een Jupyter Notebook in waarmee de volgende acties worden uitgevoerd:
- Verschillende formulieren (facturen) laden in een gegevensframe in een Apache Spark-sessie
- Ze analyseren om hun functies te bepalen
- De resulterende uitvoer samenstellen in een gegevensstructuur in tabelvorm
- De uitvoer schrijven naar een zoekindex die wordt gehost in Azure Cognitive Search
- Verkennen en query's uitvoeren op de inhoud die u hebt gemaakt
1 - Afhankelijkheden instellen
We beginnen met het importeren van pakketten en het maken van verbinding met de Azure-resources die in deze werkstroom worden gebruikt.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"
2 - Gegevens laden in Spark
Met deze code worden enkele externe bestanden geladen vanuit een Azure-opslagaccount dat wordt gebruikt voor demodoeleinden. De bestanden zijn verschillende facturen en ze worden in een gegevensframe gelezen.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
[prefix, postfix] = blob.split("@")
container = prefix.split("/")[-1]
split_postfix = postfix.split("/")
account = split_postfix[0]
filepath = "/".join(split_postfix[1:])
return "https://{}/{}/{}".format(account, container, filepath)
df2 = (
spark.read.format("binaryFile")
.load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
.select("path")
.limit(10)
.select(udf(blob_to_url, StringType())("path").alias("url"))
.cache()
)
display(df2)
3 - Formulierherkenning toepassen
Met deze code wordt de transformer AnalyzeInvoices geladen en wordt een verwijzing naar het gegevensframe met de facturen doorgegeven. Het noemt het vooraf gebouwde factuurmodel van Azure Forms Analyzer.
from synapse.ml.cognitive import AnalyzeInvoices
analyzed_df = (
AnalyzeInvoices()
.setSubscriptionKey(cognitive_key)
.setLocation(cognitive_location)
.setImageUrlCol("url")
.setOutputCol("invoices")
.setErrorCol("errors")
.setConcurrency(5)
.transform(df2)
.cache()
)
display(analyzed_df)
4 - Uitvoer van formulierherkenning vereenvoudigen
Deze code maakt gebruik van de FormOntologyLearner, een transformator waarmee de uitvoer van Form Recognizer-transformatoren (voor Azure AI Document Intelligence) wordt geanalyseerd en een gegevensstructuur in tabelvorm wordt afgeleid. De uitvoer van AnalyzeInvoices is dynamisch en varieert op basis van de functies die in uw inhoud zijn gedetecteerd.
FormOntologyLearner breidt het hulpprogramma van de AnalyzeInvoices-transformator uit door te zoeken naar patronen die kunnen worden gebruikt om een tabellaire gegevensstructuur te maken. Het ordenen van de uitvoer in meerdere kolommen en rijen zorgt voor eenvoudigere downstreamanalyse.
from synapse.ml.cognitive import FormOntologyLearner
organized_df = (
FormOntologyLearner()
.setInputCol("invoices")
.setOutputCol("extracted")
.fit(analyzed_df)
.transform(analyzed_df)
.select("url", "extracted.*")
.cache()
)
display(organized_df)
Met ons mooie gegevensframe in tabelvorm kunnen we de geneste tabellen in de formulieren plat maken met sparkSQL
from pyspark.sql.functions import explode, col
itemized_df = (
organized_df.select("*", explode(col("Items")).alias("Item"))
.drop("Items")
.select("Item.*", "*")
.drop("Item")
)
display(itemized_df)
5 - Vertalingen toevoegen
Met deze code wordt Vertalen geladen, een transformator die de Azure AI Vertalen-service aanroept in Azure AI-services. De oorspronkelijke tekst, die in het Engels in de kolom Beschrijving staat, wordt automatisch vertaald in verschillende talen. Alle uitvoer wordt samengevoegd in de matrix output.translations.
from synapse.ml.cognitive import Translate
translated_df = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_location)
.setTextCol("Description")
.setErrorCol("TranslationError")
.setOutputCol("output")
.setToLanguage(["zh-Hans", "fr", "ru", "cy"])
.setConcurrency(5)
.transform(itemized_df)
.withColumn("Translations", col("output.translations")[0])
.drop("output", "TranslationError")
.cache()
)
display(translated_df)
6 - Producten vertalen naar emoji's met OpenAI π€―
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: π¦π¦,
Light Bulb: π‘,
Three Peaches: πππ,
Two kitchen stoves: β¨οΈβ¨οΈ,
A red car: π,
A person and a cat: π§π,
A {Description}: """
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
emoji_df = (
prompter.transform(translated_df)
.withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(emoji_df.select("Description", "Emoji"))
7 - Het adres van de leverancier afleiden met OpenAI
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
continent_df = (
prompter.setOutputCol("Continent")
.setPromptTemplate(continent_template)
.transform(emoji_df)
.withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(continent_df.select("VendorAddress", "Continent"))
8 - Een Azure Search-index voor de formulieren maken
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
(
continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
.withColumn("SearchAction", lit("upload"))
.writeToAzureSearch(
subscriptionKey=search_key,
actionCol="SearchAction",
serviceName=search_service,
indexName=search_index,
keyCol="DocID",
)
)
9 - Een zoekquery uitproberen
import requests
search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
search_service, search_index
)
requests.post(
search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
10 - Een chatbot bouwen die Azure Search als hulpprogramma π§ kan gebruikenπ§
import json
import openai
openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
return f"""
Given the search engine above, what would you search for to answer the following question?
Question: "{question}"
Please output a json in the form of {{"query": "example_query"}}
"""
def search_result_prompt(query):
search_results = requests.post(
search_url, json={"search": query}, headers={"api-key": search_key}
).json()
return f"""
You previously ran a search for "{query}" which returned the following results:
{search_results}
You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.
"""
def prompt_gpt(messages):
response = openai.ChatCompletion.create(
engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
)
return response["choices"][0]["message"]["content"]
def custom_chatbot(question):
while True:
try:
query = json.loads(
prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "user", "content": search_query_prompt(question)},
]
)
)["query"]
return prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "system", "content": search_result_prompt(query)},
{"role": "user", "content": question},
]
)
except Exception as e:
raise e
11 - Onze chatbot een vraag stellen
custom_chatbot("What did Luke Diaz buy?")
12 - Een snelle controle
display(
continent_df.where(col("CustomerName") == "Luke Diaz")
.select("Description")
.distinct()
)