Sdílet prostřednictvím


Kurz: Vytvoření vlastního vyhledávacího webu a systému pro odpovědi na otázky

V tomto kurzu se dozvíte, jak indexovat a dotazovat velká data načtená z clusteru Spark. Nastavili jste poznámkový blok Jupyter, který provádí následující akce:

  • Načtení různých formulářů (faktur) do datového rámce v relaci Apache Sparku
  • Analyzujte je a zjistěte jejich funkce.
  • Sestavení výsledného výstupu do tabulkové datové struktury
  • Zápis výstupu do indexu vyhledávání hostovaného ve službě Azure Cognitive Search
  • Prozkoumání obsahu, který jste vytvořili, a dotazování na obsah, který jste vytvořili

1. Nastavení závislostí

Začneme importem balíčků a připojením k prostředkům Azure používaným v tomto pracovním postupu.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"

translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"

search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"

openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"

2. Načtení dat do Sparku

Tento kód načte několik externích souborů z účtu úložiště Azure, který se používá pro ukázkové účely. Soubory jsou různé faktury a čtou se do datového rámce.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (
    spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache()
)

display(df2)

3. Použití rozpoznávání formulářů

Tento kód načte transformátor AnalyzeInvoices a předá odkaz na datový rámec obsahující faktury. Volá předdefinovaný model faktury služby Azure Forms Analyzer.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (
    AnalyzeInvoices()
    .setSubscriptionKey(cognitive_key)
    .setLocation(cognitive_location)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache()
)

display(analyzed_df)

4. Zjednodušení výstupu rozpoznávání formulářů

Tento kód používá FormOntologyLearner, transformátor, který analyzuje výstup transformátorů Rozpoznávání formulářů (pro Azure AI Document Intelligence) a odvodí tabulkovou datovou strukturu. Výstup funkce AnalyzeInvoices je dynamický a liší se v závislosti na funkcích zjištěných v obsahu.

FormOntologyLearner rozšiřuje nástroj AnalyzeInvoices transformer hledáním vzorů, které lze použít k vytvoření tabulkové datové struktury. Uspořádání výstupu do více sloupců a řádků usnadňuje podřízenou analýzu.

from synapse.ml.cognitive import FormOntologyLearner

organized_df = (
    FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*")
    .cache()
)

display(organized_df)

Pomocí našeho pěkného tabulkového datového rámce můžeme zploštět vnořené tabulky nalezené ve formulářích pomocí některých SparkSQL.

from pyspark.sql.functions import explode, col

itemized_df = (
    organized_df.select("*", explode(col("Items")).alias("Item"))
    .drop("Items")
    .select("Item.*", "*")
    .drop("Item")
)

display(itemized_df)

5. Přidání překladů

Tento kód načte transformátor, který volá službu Azure AI Translator ve službách Azure AI. Původní text, který je v angličtině ve sloupci Popis, je strojově přeložen do různých jazyků. Veškerý výstup je konsolidovaný do pole output.translations.

from synapse.ml.cognitive import Translate

translated_df = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_location)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache()
)

display(translated_df)

6. Překlad produktů na emoji pomocí OpenAI 🤯

from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split

emoji_template = """ 
  Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
  
  Two Ducks: 🦆🦆,
  Light Bulb: 💡,
  Three Peaches: 🍑🍑🍑,
  Two kitchen stoves: ♨️♨️,
  A red car: 🚗,
  A person and a cat: 🧍🐈,
  A {Description}: """

prompter = (
    OpenAIPrompt()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_name)
    .setUrl(openai_url)
    .setMaxTokens(5)
    .setPromptTemplate(emoji_template)
    .setErrorCol("error")
    .setOutputCol("Emoji")
)

emoji_df = (
    prompter.transform(translated_df)
    .withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(emoji_df.select("Description", "Emoji"))

7. Odvození kontinentu adresy dodavatele s OpenAI

continent_template = """
Which continent does the following address belong to? 

Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica. 

Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.

Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""

continent_df = (
    prompter.setOutputCol("Continent")
    .setPromptTemplate(continent_template)
    .transform(emoji_df)
    .withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(continent_df.select("VendorAddress", "Continent"))

8. Vytvoření indexu služby Azure Search pro formuláře

from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit

(
    continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    )
)

9. Vyzkoušení vyhledávacího dotazu

import requests

search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
    search_service, search_index
)
requests.post(
    search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()

10. Vytvoření chatovacího robota, který může používat Azure Search jako nástroj 🧠🔧

import json
import openai

openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"

chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:

{continent_df.columns}

If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""


def search_query_prompt(question):
    return f"""
Given the search engine above, what would you search for to answer the following question?

Question: "{question}"

Please output a json in the form of {{"query": "example_query"}}
"""


def search_result_prompt(query):
    search_results = requests.post(
        search_url, json={"search": query}, headers={"api-key": search_key}
    ).json()
    return f"""

You previously ran a search for "{query}" which returned the following results:

{search_results}

You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem. 
"""


def prompt_gpt(messages):
    response = openai.ChatCompletion.create(
        engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
    )
    return response["choices"][0]["message"]["content"]


def custom_chatbot(question):
    while True:
        try:
            query = json.loads(
                prompt_gpt(
                    [
                        {"role": "system", "content": chat_context_prompt},
                        {"role": "user", "content": search_query_prompt(question)},
                    ]
                )
            )["query"]

            return prompt_gpt(
                [
                    {"role": "system", "content": chat_context_prompt},
                    {"role": "system", "content": search_result_prompt(query)},
                    {"role": "user", "content": question},
                ]
            )
        except Exception as e:
            raise e

11 - Položení našeho chatovacího robota na otázku

custom_chatbot("What did Luke Diaz buy?")

12 . Rychlá dvojitá kontrola

display(
    continent_df.where(col("CustomerName") == "Luke Diaz")
    .select("Description")
    .distinct()
)