适用于大数据的 Azure OpenAI

Azure OpenAI 服务可用于通过提示完成 API 解决大量自然语言任务。 为了更轻松地将提示工作流从几个示例扩展到大型示例数据集,我们已将 Azure OpenAI 服务与分布式机器学习库 SynapseML 集成。 通过这种集成,可以轻松使用 Apache Spark 分布式计算框架,通过 OpenAI 服务处理数百万个提示。 本教程介绍如何使用 Azure OpenAI 和 Azure Synapse Analytics 在分布范围内应用大型语言模型。

先决条件

本快速入门的关键先决条件包括有效的 Azure OpenAI 资源以及安装了 SynapseML 的 Apache Spark 群集。

以笔记本的形式导入此指南

下一步是将此代码添加到 Spark 群集。 可以在 Spark 平台中创建笔记本并将代码复制到此笔记本中以运行演示。 或下载笔记本并将其导入 Synapse Analytics

  1. 将此演示下载为笔记本(选择“原始”,然后保存文件
  2. 将笔记本导入 Synapse 工作区,或导入 Fabric 工作区(使用的是 Fabric)
  3. 在群集上安装 SynapseML。 请参阅 SynapseML 网站底部的 Synapse 安装说明。 如果使用的是 Fabric,请检查安装指南。 这需要在导入的笔记本顶部粘贴另一个单元格
  4. 将笔记本连接到群集,然后继续编辑并运行单元格。

填写服务信息

接下来,编辑笔记本中的单元格以指向服务。 尤其是,设置 service_namedeployment_namelocationkey 变量,使其与 OpenAI 服务匹配:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

创建提示数据集

接下来,创建一个由一系列行组成的数据帧,每行有一个提示。

还可以直接从 ADLS 或其他数据库加载数据。 有关加载和准备 Spark 数据帧的详细信息,请参阅 Apache Spark 数据加载指南

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code thats",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

创建 OpenAICompletion Apache Spark 客户端

要将 OpenAI Completion 服务应用于创建的数据帧,请创建用作分布式客户端的 OpenAICompletion 对象。 服务参数可以使用单个值设置,也可以使用 OpenAICompletion 对象上的适当资源库通过数据帧列设置。 在此,将 maxTokens 设置为 200。 令牌大约是四个字符,并且此限制适用于提示和结果的总和。 我们还使用数据帧中提示列的名称设置 promptCol 参数。

from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

使用 OpenAICompletion 客户端转换数据帧

完成数据帧和完成客户端后,可以转换输入数据集并添加一个名为 completions 的列,该列包含服务添加的所有信息。 为了简单起见,只选择文本。

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

输出应类似于以下内容。 完成文本将与示例不同。

prompt error text
Hello my name is Null Makaveli,我十八岁了,长大后我想成为一名说唱歌手,我喜欢写作和制作音乐,我来自加利福尼亚洛杉矶
最好的代码就是易于理解的代码。 Null 这是一个主观的陈述,没有明确的答案。
SynapseML 是 Null 一种机器学习算法,能够学习如何预测事件的未来结果。

更多用法示例

生成文本嵌入

除了完成文本外,还可以嵌入文本,以用于下游算法或矢量检索体系结构。 通过创建嵌入,你可以从大型集合中搜索和检索文档,并且可以在提示工程不足以完成任务时使用。 有关使用 OpenAIEmbedding 的详细信息,请参阅嵌入指南

from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

聊天补全

ChatGPT 和 GPT-4 等模型能够理解聊天,而不是单个提示。 OpenAIChatCompletion 转换器可以大规模公开此功能。

from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

通过请求批处理提高吞吐量

以上示例向服务发出多个请求,每个提示一个。 若要在单个请求中完成多个提示,请使用批处理模式。 首先,在 OpenAICompletion 对象中,不要将 Prompt 列设置为“Prompt”,而是为 BatchPrompt 列指定“batchPrompt”。 为此,请创建一个数据帧,其中包含每行的提示列表。

截至本文执笔时,单个请求中的提示限制为 20 个,“令牌”限制为 2048 个(大约 1500 字)。

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

接下来,我们创建 OpenAICompletion 对象。 如果你的列为 Array[String] 类型,请设置 batchPrompt 列,而不是 prompt 列。

batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

在要转换的调用中,请求将按行发出。 因为单行中有多个提示,所以每个请求都将与该行中的所有提示一并发送。 结果将包含请求中的每一行的行。

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

使用自动微型批处理程序

如果数据是列格式,可以使用 SynapseML 的 FixedMiniBatcherTransformer 将其转换为行格式。

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

翻译的提示工程

Azure OpenAI 服务可以通过提示工程解决许多不同的自然语言任务。 此处显示了语言翻译提示的示例:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

问题解答提示

在这里,我们提示 GPT-3 进行常识问题解答:

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))