Tutorial: como criar um mecanismo de pesquisa personalizado e um sistema de respostas a perguntas
Nesse tutorial, aprenda a indexar e consultar dados grandes carregados de um cluster Spark. Você vai configurar um Jupyter Notebook que executa as seguintes ações:
- Carregar vários formulários (faturas) em um dataframe em uma sessão do Apache Spark
- Analisá-los para determinar seus recursos
- Combinar a saída resultante em uma estrutura de dados tabular
- Gravar a saída em um índice de pesquisa hospedado no Azure Cognitive Search
- Explorar e pesquisar o conteúdo criado
1 - Configurar dependências
Começamos importando pacotes e conectando-se aos recursos do Azure usados neste fluxo de trabalho.
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"
2 - Carregar dados no Spark
Esse código carrega poucos arquivos externos de uma conta de armazenamento do Azure usada para fins de demonstração. Os arquivos são várias faturas e são lidos em um dataframe.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
[prefix, postfix] = blob.split("@")
container = prefix.split("/")[-1]
split_postfix = postfix.split("/")
account = split_postfix[0]
filepath = "/".join(split_postfix[1:])
return "https://{}/{}/{}".format(account, container, filepath)
df2 = (
spark.read.format("binaryFile")
.load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
.select("path")
.limit(10)
.select(udf(blob_to_url, StringType())("path").alias("url"))
.cache()
)
display(df2)
3 - Aplicar o reconhecimento de formulários
Esse código carrega o transformador AnalyzeInvoices e passa uma referência ao dataframe que contém as faturas. Ele chama o modelo de fatura pré-criado do Analisador de Formulários do Azure.
from synapse.ml.cognitive import AnalyzeInvoices
analyzed_df = (
AnalyzeInvoices()
.setSubscriptionKey(cognitive_key)
.setLocation(cognitive_location)
.setImageUrlCol("url")
.setOutputCol("invoices")
.setErrorCol("errors")
.setConcurrency(5)
.transform(df2)
.cache()
)
display(analyzed_df)
4 - Simplificar a saída de reconhecimento de formulário
Esse código usa FormOntologyLearner, um transformador que analisa a saída de transformadores do Reconhecimento de Formulários (para Inteligência de Documentos de IA do Azure) e infere uma estrutura de dados tabular. A saída de AnalyzeInvoices é dinâmica e varia de acordo com os recursos detectados em seu conteúdo.
FormOntologyLearner estende o utilitário do transformador AnalyzeInvoices procurando por padrões que podem ser usados para criar uma estrutura de dados tabular. Organizar a saída em várias colunas e linhas torna a análise de downstream mais simples.
from synapse.ml.cognitive import FormOntologyLearner
organized_df = (
FormOntologyLearner()
.setInputCol("invoices")
.setOutputCol("extracted")
.fit(analyzed_df)
.transform(analyzed_df)
.select("url", "extracted.*")
.cache()
)
display(organized_df)
Com nosso bom dataframe tabular, podemos mesclar as tabelas aninhadas encontradas nos formulários com alguns SparkSQL
from pyspark.sql.functions import explode, col
itemized_df = (
organized_df.select("*", explode(col("Items")).alias("Item"))
.drop("Items")
.select("Item.*", "*")
.drop("Item")
)
display(itemized_df)
5 - Adicionar traduções
Esse código carrega Translate, um transformador que chama o serviço de Tradutor nos serviços de IA do Azure. O texto original, que está em inglês na coluna "Description", é traduzido para vários idiomas. Toda a saída é consolidada na matriz "output.translations".
from synapse.ml.cognitive import Translate
translated_df = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_location)
.setTextCol("Description")
.setErrorCol("TranslationError")
.setOutputCol("output")
.setToLanguage(["zh-Hans", "fr", "ru", "cy"])
.setConcurrency(5)
.transform(itemized_df)
.withColumn("Translations", col("output.translations")[0])
.drop("output", "TranslationError")
.cache()
)
display(translated_df)
6 - Traduzir produtos para emojis com OpenAI 🤯
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
emoji_df = (
prompter.transform(translated_df)
.withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(emoji_df.select("Description", "Emoji"))
7 - Inferir o continente de endereços do fornecedor com o OpenAI
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
continent_df = (
prompter.setOutputCol("Continent")
.setPromptTemplate(continent_template)
.transform(emoji_df)
.withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(continent_df.select("VendorAddress", "Continent"))
8 - Criar um índice do Azure Search para os Formulários
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
(
continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
.withColumn("SearchAction", lit("upload"))
.writeToAzureSearch(
subscriptionKey=search_key,
actionCol="SearchAction",
serviceName=search_service,
indexName=search_index,
keyCol="DocID",
)
)
9 - Experimentar uma consulta de pesquisa
import requests
search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
search_service, search_index
)
requests.post(
search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
10 - Criar um chatbot que pode usar o Azure Search como uma ferramenta 🧠🔧
import json
import openai
openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
return f"""
Given the search engine above, what would you search for to answer the following question?
Question: "{question}"
Please output a json in the form of {{"query": "example_query"}}
"""
def search_result_prompt(query):
search_results = requests.post(
search_url, json={"search": query}, headers={"api-key": search_key}
).json()
return f"""
You previously ran a search for "{query}" which returned the following results:
{search_results}
You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.
"""
def prompt_gpt(messages):
response = openai.ChatCompletion.create(
engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
)
return response["choices"][0]["message"]["content"]
def custom_chatbot(question):
while True:
try:
query = json.loads(
prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "user", "content": search_query_prompt(question)},
]
)
)["query"]
return prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "system", "content": search_result_prompt(query)},
{"role": "user", "content": question},
]
)
except Exception as e:
raise e
11 - Fazendo uma pergunta ao nosso chatbot
custom_chatbot("What did Luke Diaz buy?")
12 - Um checagem dupla rápida
display(
continent_df.where(col("CustomerName") == "Luke Diaz")
.select("Description")
.distinct()
)