Samouczek: tworzenie niestandardowej wyszukiwarki i systemu odpowiadania na pytania
Z tego samouczka dowiesz się, jak indeksować i wykonywać zapytania dotyczące dużych danych załadowanych z klastra Spark. Skonfiguruj notes Jupyter Notebook, który wykonuje następujące czynności:
- Ładowanie różnych formularzy (faktur) do ramki danych w sesji platformy Apache Spark
- Analizowanie ich w celu określenia ich funkcji
- Złóż wynikowe dane wyjściowe do struktury danych tabelarycznych
- Zapisywanie danych wyjściowych w indeksie wyszukiwania hostowanym w usłudze Azure Cognitive Search
- Eksplorowanie i wykonywanie zapytań o utworzoną zawartość
1 — Konfigurowanie zależności
Zaczynamy od zaimportowania pakietów i nawiązania połączenia z zasobami platformy Azure używanymi w tym przepływie pracy.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"
2 — Ładowanie danych do platformy Spark
Ten kod ładuje kilka plików zewnętrznych z konta usługi Azure Storage, które jest używane do celów demonstracyjnych. Pliki są różnymi fakturami i są odczytywane w ramce danych.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
[prefix, postfix] = blob.split("@")
container = prefix.split("/")[-1]
split_postfix = postfix.split("/")
account = split_postfix[0]
filepath = "/".join(split_postfix[1:])
return "https://{}/{}/{}".format(account, container, filepath)
df2 = (
spark.read.format("binaryFile")
.load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
.select("path")
.limit(10)
.select(udf(blob_to_url, StringType())("path").alias("url"))
.cache()
)
display(df2)
3 — Stosowanie rozpoznawania formularzy
Ten kod ładuje transformator AnalyzeInvoices i przekazuje odwołanie do ramki danych zawierającej faktury. Wywołuje ona wstępnie utworzony model faktury analizatora azure Forms Analyzer.
from synapse.ml.cognitive import AnalyzeInvoices
analyzed_df = (
AnalyzeInvoices()
.setSubscriptionKey(cognitive_key)
.setLocation(cognitive_location)
.setImageUrlCol("url")
.setOutputCol("invoices")
.setErrorCol("errors")
.setConcurrency(5)
.transform(df2)
.cache()
)
display(analyzed_df)
4 — Uproszczenie danych wyjściowych rozpoznawania formularzy
Ten kod używa klasy FormOntologyLearner, transformatora, który analizuje dane wyjściowe funkcji przekształcania rozpoznawania formularzy (dla analizy dokumentów sztucznej inteligencji platformy Azure) i wnioskuje strukturę danych tabelarycznych. Dane wyjściowe funkcji AnalyzeInvoices są dynamiczne i różnią się w zależności od funkcji wykrytych w zawartości.
FormOntologyLearner rozszerza narzędzie przekształcania AnalyzeInvoices, wyszukując wzorce, których można użyć do utworzenia struktury danych tabelarycznych. Organizowanie danych wyjściowych w wielu kolumnach i wierszach ułatwia analizę podrzędną.
from synapse.ml.cognitive import FormOntologyLearner
organized_df = (
FormOntologyLearner()
.setInputCol("invoices")
.setOutputCol("extracted")
.fit(analyzed_df)
.transform(analyzed_df)
.select("url", "extracted.*")
.cache()
)
display(organized_df)
Dzięki naszej ładnej ramce danych tabelarycznych możemy spłaszczać zagnieżdżone tabele znajdujące się w formularzach przy użyciu niektórych baz danych SparkSQL
from pyspark.sql.functions import explode, col
itemized_df = (
organized_df.select("*", explode(col("Items")).alias("Item"))
.drop("Items")
.select("Item.*", "*")
.drop("Item")
)
display(itemized_df)
5 — Dodawanie tłumaczeń
Ten kod ładuje funkcję Translate— transformator, który wywołuje usługę Azure AI Translator w usługach Azure AI. Oryginalny tekst, który jest w języku angielskim w kolumnie "Opis", jest tłumaczony maszynowo na różne języki. Wszystkie dane wyjściowe są konsolidowane w tablicy "output.translations".
from synapse.ml.cognitive import Translate
translated_df = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_location)
.setTextCol("Description")
.setErrorCol("TranslationError")
.setOutputCol("output")
.setToLanguage(["zh-Hans", "fr", "ru", "cy"])
.setConcurrency(5)
.transform(itemized_df)
.withColumn("Translations", col("output.translations")[0])
.drop("output", "TranslationError")
.cache()
)
display(translated_df)
6 — Tłumaczenie produktów na emoji za pomocą interfejsu OpenAI 🤯
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
emoji_df = (
prompter.transform(translated_df)
.withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(emoji_df.select("Description", "Emoji"))
7 — Wnioskowanie kontynentu adresu dostawcy za pomocą interfejsu OpenAI
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
continent_df = (
prompter.setOutputCol("Continent")
.setPromptTemplate(continent_template)
.transform(emoji_df)
.withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(continent_df.select("VendorAddress", "Continent"))
8 — Tworzenie indeksu usługi Azure Search dla formularzy
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
(
continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
.withColumn("SearchAction", lit("upload"))
.writeToAzureSearch(
subscriptionKey=search_key,
actionCol="SearchAction",
serviceName=search_service,
indexName=search_index,
keyCol="DocID",
)
)
9 — Wypróbuj zapytanie wyszukiwania
import requests
search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
search_service, search_index
)
requests.post(
search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
10 — Tworzenie czatbota, który może używać usługi Azure Search jako narzędzia 🧠🔧
import json
import openai
openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
return f"""
Given the search engine above, what would you search for to answer the following question?
Question: "{question}"
Please output a json in the form of {{"query": "example_query"}}
"""
def search_result_prompt(query):
search_results = requests.post(
search_url, json={"search": query}, headers={"api-key": search_key}
).json()
return f"""
You previously ran a search for "{query}" which returned the following results:
{search_results}
You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.
"""
def prompt_gpt(messages):
response = openai.ChatCompletion.create(
engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
)
return response["choices"][0]["message"]["content"]
def custom_chatbot(question):
while True:
try:
query = json.loads(
prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "user", "content": search_query_prompt(question)},
]
)
)["query"]
return prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "system", "content": search_result_prompt(query)},
{"role": "user", "content": question},
]
)
except Exception as e:
raise e
11 — Zadaj naszemu czatbotowi pytanie
custom_chatbot("What did Luke Diaz buy?")
12 — Szybkie sprawdzanie podwójne
display(
continent_df.where(col("CustomerName") == "Luke Diaz")
.select("Description")
.distinct()
)