Partager via


Azure OpenAI pour le Big Data

Le service Azure OpenAI peut être utilisé pour résoudre un grand nombre de tâches en langage naturel via l’invite de l’API d’achèvement. Pour faciliter la mise à l’échelle de vos flux de travail d’invite à partir de quelques exemples vers des jeux de données volumineux d’exemples, nous avons intégré le service Azure OpenAI à la bibliothèque SynapseML d’apprentissage automatique distribué. Cette intégration facilite l’utilisation de l’infrastructure de calcul distribuée Apache Spark pour traiter des millions d’invites avec le service OpenAI. Ce tutoriel montre comment appliquer des modèles de langage volumineux à une échelle distribuée à l’aide d’Azure OpenAI et d’Azure Synapse Analytics.

Prérequis

Les principaux prérequis pour ce démarrage rapide incluent une ressource Azure OpenAI fonctionnelle et un cluster Apache Spark avec SynapseML installé.

Importer ce guide en tant que notebook

L’étape suivante consiste à ajouter ce code à votre cluster Spark. Vous pouvez soit créer un bloc-notes dans votre plateforme Spark et copier le code dans ce bloc-notes pour exécuter la démo. Ou téléchargez le bloc-notes et importez-le dans Synapse Analytics

  1. Télécharger cette démo sous forme de notebook (sélectionner Raw, puis enregistrer le fichier)
  2. Importez le notebook dans l’espace de travail Synapse ou, si vous utilisez Fabric, importez-le dans l’espace de travail Fabric.
  3. Installez SynapseML sur votre cluster. Consultez les instructions d’installation de Synapse en bas du site web SynapseML. Si vous utilisez Fabric, consultez le Guide d’installation. Cela nécessite de coller une cellule supplémentaire en haut du bloc-notes que vous avez importé.
  4. Connectez votre ordinateur portable à un cluster et suivez, modifiez et exécutez les cellules.

Remplissez les informations de service.

Ensuite, modifiez la cellule du notebook pour pointer vers votre service. Définissez en particulier les variables service_name, deployment_name, location et key pour qu’elles correspondent à celles de votre service OpenAI :

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

Créer un jeu de données d’invites

Ensuite, créez un dataframe composé d’une série de lignes, avec une invite par ligne.

Vous pouvez également charger des données directement à partir d'ADLS ou d'autres bases de données. Pour plus d'informations sur le chargement et la préparation des dataframes Spark, consultez le guide de chargement des données Apache Spark.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code thats",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

Créer le client Apache Spark OpenAICompletion

Pour appliquer le service OpenAI Completion à votre dataframe que vous avez créé, créez un objet OpenAICompletion, qui sert de client distribué. Les paramètres du service peuvent être définis avec une valeur unique ou par une colonne du dataframe avec les setters appropriés sur l’objet OpenAICompletion. Ici, on passe maxTokens à 200. Un jeton est autour de quatre caractères, et cette limite s’applique à la somme de l’invite et au résultat. Nous définissons également le paramètre promptCol avec le nom de la colonne d’invite dans le dataframe.

from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Transformer le dataframe avec le client OpenAICompletion

Une fois que vous disposez du dataframe et du client d’achèvement, vous pouvez transformer votre jeu de données d’entrée et ajouter une colonne appelée completions avec toutes les informations ajoutées par le service. Pour plus de facilité, sélectionnez uniquement le texte.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

Votre sortie doit ressembler à ceci. Le texte d’achèvement sera différent de l’exemple.

prompt error text
Bonjour, je m’appelle null Makaveli J'ai dix-huit ans et je veux être rappeur quand je serai grand J'aime écrire et faire de la musique Je viens de Los Angeles, CA
Le meilleur code est le code qui est null compréhensible Ceci est une déclaration subjective, et il n'y a pas de réponse définitive.
SynapseML est null Un algorithme de Machine Learning capable d’apprendre à prédire le résultat futur des événements.

Autres exemples d’utilisation

Génération d'incorporations de texte

En plus de compléter le texte, nous pouvons également intégrer du texte à utiliser dans des algorithmes en aval ou des architectures de récupération de vecteurs. La création d'incorporations vous permet de rechercher et de récupérer des documents dans de grandes collections et peut être utilisée lorsque l'ingénierie rapide n'est pas suffisante pour la tâche. Pour plus d'informations sur l'utilisation de OpenAIEmbedding, consultez notre guide d'intégration.

from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

Fin de la conversation

Des modèles tels que ChatGPT et GPT-4 sont capables de compréhension les conversations instantanées au lieu d'invites uniques. Le transformateur OpenAIChatCompletion expose cette fonctionnalité à grande échelle.

from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Améliorer le débit avec le traitement par lots de requêtes

L'exemple fait plusieurs requêtes au service, une pour chaque invite. Pour effectuer plusieurs invites dans une seule requête, utilisez le mode par lots. Tout d'abord, dans l'objet OpenAI Completion, au lieu de définir la colonne Prompt sur "Prompt", spécifiez "batch Prompt" pour la colonne BatchPrompt. Pour ce faire, créez un dataframe avec une liste d’invites par ligne.

Au moment de la rédaction de cet article, il existe une limite de 20 invites dans une seule requête et une limite stricte de 2 048 « jetons », soit environ 1 500 mots.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

Ensuite, nous créons l'objet OpenAICompletion. Au lieu de définir la colonne d’invite, définissez la colonne batchPrompt si votre colonne est de type Array[String].

batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Dans l’appel à transformer, une requête est ensuite effectuée par ligne. Puisqu’il y a plusieurs invites dans une seule ligne, chaque requête est envoyée avec toutes les invites de cette ligne. Les résultats contiennent une ligne pour chaque ligne de la requête.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Utilisation d'un minidoseur automatique

Si vos données sont en colonne, vous pouvez les transposer en ligne en utilisant FixedMiniBatcherTransformer de SynapseML.

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Ingénierie d’invite pour la traduction

Le service Azure OpenAI peut résoudre de nombreuses tâches de langage naturel différentes par le biais de l’ingénierie d’invite. Voici un exemple d’invite de traduction de langage de programmation :

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

Demander la réponse aux questions

Ici, nous demandons à GPT-3 de répondre aux questions générales :

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))