适用于大数据的 Azure OpenAI
Azure OpenAI 服务可用于通过提示完成 API 解决大量自然语言任务。 为了更轻松地将提示工作流从几个示例扩展到大型示例数据集,我们已将 Azure OpenAI 服务与分布式机器学习库 SynapseML 集成。 通过这种集成,可以轻松使用 Apache Spark 分布式计算框架,通过 OpenAI 服务处理数百万个提示。 本教程介绍如何使用 Azure OpenAI 和 Azure Synapse Analytics 在分布范围内应用大型语言模型。
先决条件
本快速入门的关键先决条件包括有效的 Azure OpenAI 资源以及安装了 SynapseML 的 Apache Spark 群集。
获取 Microsoft Fabric 订阅。 或者注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左侧的体验切换器切换到 Synapse 数据科学体验。
- 转到 Microsoft Fabric 中的数据科学体验。
- 创建新的笔记本。
- Azure OpenAI 资源:在创建资源之前请求访问 Azure OpenAI 服务
以笔记本的形式导入此指南
下一步是将此代码添加到 Spark 群集。 可以在 Spark 平台中创建笔记本并将代码复制到此笔记本中以运行演示。 或下载笔记本并将其导入 Synapse Analytics
- 将此演示下载为笔记本(选择“原始”,然后保存文件)
- 将笔记本导入 Synapse 工作区,或导入 Fabric 工作区(使用的是 Fabric)
- 在群集上安装 SynapseML。 请参阅 SynapseML 网站底部的 Synapse 安装说明。 如果使用的是 Fabric,请检查安装指南。 这需要在导入的笔记本顶部粘贴另一个单元格
- 将笔记本连接到群集,然后继续编辑并运行单元格。
填写服务信息
接下来,编辑笔记本中的单元格以指向服务。 尤其是,设置 service_name
、deployment_name
、location
和 key
变量,使其与 OpenAI 服务匹配:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
if running_on_synapse():
from notebookutils.visualization import display
# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"
key = find_secret(
"openai-api-key"
) # please replace this line with your key as a string
assert key is not None and service_name is not None
创建提示数据集
接下来,创建一个由一系列行组成的数据帧,每行有一个提示。
还可以直接从 ADLS 或其他数据库加载数据。 有关加载和准备 Spark 数据帧的详细信息,请参阅 Apache Spark 数据加载指南。
df = spark.createDataFrame(
[
("Hello my name is",),
("The best code is code thats",),
("SynapseML is ",),
]
).toDF("prompt")
创建 OpenAICompletion Apache Spark 客户端
要将 OpenAI Completion 服务应用于创建的数据帧,请创建用作分布式客户端的 OpenAICompletion 对象。 服务参数可以使用单个值设置,也可以使用 OpenAICompletion
对象上的适当资源库通过数据帧列设置。 在此,将 maxTokens
设置为 200。 令牌大约是四个字符,并且此限制适用于提示和结果的总和。 我们还使用数据帧中提示列的名称设置 promptCol
参数。
from synapse.ml.cognitive import OpenAICompletion
completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setPromptCol("prompt")
.setErrorCol("error")
.setOutputCol("completions")
)
使用 OpenAICompletion 客户端转换数据帧
完成数据帧和完成客户端后,可以转换输入数据集并添加一个名为 completions
的列,该列包含服务添加的所有信息。 为了简单起见,只选择文本。
from pyspark.sql.functions import col
completed_df = completion.transform(df).cache()
display(
completed_df.select(
col("prompt"),
col("error"),
col("completions.choices.text").getItem(0).alias("text"),
)
)
输出应类似于以下内容。 完成文本将与示例不同。
prompt | error | text |
---|---|---|
Hello my name is | Null | Makaveli,我十八岁了,长大后我想成为一名说唱歌手,我喜欢写作和制作音乐,我来自加利福尼亚洛杉矶 |
最好的代码就是易于理解的代码。 | Null | 这是一个主观的陈述,没有明确的答案。 |
SynapseML 是 | Null | 一种机器学习算法,能够学习如何预测事件的未来结果。 |
更多用法示例
生成文本嵌入
除了完成文本外,还可以嵌入文本,以用于下游算法或矢量检索体系结构。 通过创建嵌入,你可以从大型集合中搜索和检索文档,并且可以在提示工程不足以完成任务时使用。 有关使用 OpenAIEmbedding
的详细信息,请参阅嵌入指南。
from synapse.ml.cognitive import OpenAIEmbedding
embedding = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("prompt")
.setErrorCol("error")
.setOutputCol("embeddings")
)
display(embedding.transform(df))
聊天补全
ChatGPT 和 GPT-4 等模型能够理解聊天,而不是单个提示。 OpenAIChatCompletion
转换器可以大规模公开此功能。
from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *
def make_message(role, content):
return Row(role=role, content=content, name=role)
chat_df = spark.createDataFrame(
[
(
[
make_message(
"system", "You are an AI chatbot with red as your favorite color"
),
make_message("user", "Whats your favorite color"),
],
),
(
[
make_message("system", "You are very excited"),
make_message("user", "How are you today"),
],
),
]
).toDF("messages")
chat_completion = (
OpenAIChatCompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMessagesCol("messages")
.setErrorCol("error")
.setOutputCol("chat_completions")
)
display(
chat_completion.transform(chat_df).select(
"messages", "chat_completions.choices.message.content"
)
)
通过请求批处理提高吞吐量
以上示例向服务发出多个请求,每个提示一个。 若要在单个请求中完成多个提示,请使用批处理模式。 首先,在 OpenAICompletion 对象中,不要将 Prompt 列设置为“Prompt”,而是为 BatchPrompt 列指定“batchPrompt”。 为此,请创建一个数据帧,其中包含每行的提示列表。
截至本文执笔时,单个请求中的提示限制为 20 个,“令牌”限制为 2048 个(大约 1500 字)。
batch_df = spark.createDataFrame(
[
(["The time has come", "Pleased to", "Today stocks", "Here's to"],),
(["The only thing", "Ask not what", "Every litter", "I am"],),
]
).toDF("batchPrompt")
接下来,我们创建 OpenAICompletion 对象。 如果你的列为 Array[String]
类型,请设置 batchPrompt 列,而不是 prompt 列。
batch_completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setBatchPromptCol("batchPrompt")
.setErrorCol("error")
.setOutputCol("completions")
)
在要转换的调用中,请求将按行发出。 因为单行中有多个提示,所以每个请求都将与该行中的所有提示一并发送。 结果将包含请求中的每一行的行。
completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)
使用自动微型批处理程序
如果数据是列格式,可以使用 SynapseML 的 FixedMiniBatcherTransformer
将其转换为行格式。
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI
completed_autobatch_df = (
df.coalesce(
1
) # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
.mlTransform(FixedMiniBatchTransformer(batchSize=4))
.withColumnRenamed("prompt", "batchPrompt")
.mlTransform(batch_completion)
)
display(completed_autobatch_df)
翻译的提示工程
Azure OpenAI 服务可以通过提示工程解决许多不同的自然语言任务。 此处显示了语言翻译提示的示例:
translate_df = spark.createDataFrame(
[
("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
(
"French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
),
]
).toDF("prompt")
display(completion.transform(translate_df))
问题解答提示
在这里,我们提示 GPT-3 进行常识问题解答:
qa_df = spark.createDataFrame(
[
(
"Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
)
]
).toDF("prompt")
display(completion.transform(qa_df))