次の方法で共有


ビッグ データ用の Azure OpenAI

Azure OpenAI サービスを使用すると、入力候補 API を要求することで、多数の自然言語タスクを解決できます。 プロンプト ワークフローをいくつかの例から大規模なデータセットの例に簡単にスケーリングできるように、Azure OpenAI サービスと分散機械学習ライブラリ SynapseML を統合しました。 この統合により、Apache Spark 分散コンピューティング フレームワークを使用して、OpenAI サービスで何百万ものプロンプトを処理することが容易になります。 このチュートリアルでは、Azure Open AI と Azure Synapse Analytics を使用して、大規模な言語モデルを分散スケールで適用する方法について説明します。

前提条件

このクイックスタートの重要な前提条件として、機能する Azure OpenAI リソースと、SynapseML がインストールされた Apache Spark クラスターがあります。

このガイドをノートブックとしてインポートする

次の手順では、このコードを Spark クラスターに追加します。 Spark プラットフォームでノートブックを作成し、このノートブックにコードをコピーしてデモを実行できます。 または、ノートブックをダウンロードして Synapse Analytics にインポートすることもできます

  1. このデモをノートブックとしてダウンロードします ([未加工] を選択してからファイルを保存します)
  2. ノートブックを Synapse ワークスペースにインポートするか、Fabric を使用している場合は、Fabric ワークスペースにインポートします
  3. クラスターに SynapseML をインストールします。 SynapseML Web サイトの下部にある Synapse のインストール手順を参照してください。 Fabric を使用している場合は、「インストール ガイド」をチェックしてください。 このためには、インポートしたノートブックの上部に別のセルを貼り付ける必要があります。
  4. ノートブックをクラスターに接続して手順を進め、セルを編集して実行します。

サービス情報を入力する

次に、ノートブック内のセルを編集して、サービスをポイントします。 具体的には、service_namedeployment_namelocationkey 変数を OpenAI Service のそれらと対応するように設定します。

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

プロンプトのデータセットを作成する

次に、一連の行で構成され、行ごとに 1 つのプロンプトがあるデータフレームを作成します。

ADLS またはその他のデータベースから直接データを読み込むこともできます。 Spark データフレームの読み込みと準備の詳細については、Apache Spark データ読み込みガイドを参照してください。

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code thats",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

OpenAICompletion Apache Spark クライアントを作成する

作成したデータフレームに OpenAI Completion サービスを適用するには、分散クライアントとして機能する OpenAICompletion オブジェクトを作成します。 サービスのパラメーターは、単一の値で設定することも、OpenAICompletion オブジェクト上の適切なセッターを含むデータフレームの列で設定することもできます。 ここでは、maxTokens を200 に設定します。 トークンは 4 文字ぐらいで、この制限はプロンプトと結果の合計に適用されます。 また、データフレームのプロンプト列の名前を使用して promptCol パラメーターを設定します。

from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

OpenAICompletion クライアントを使用してデータフレームを変換する

データフレームと入力候補クライアントが完了したら、入力データセットを変換し、サービスが追加するすべての情報と共に completions という列を追加できます。 わかりやすくするために、テキストのみを選択します。

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

出力は次のようになります。 完了テキストはサンプルとは異なります。

prompt error text
Hello my name is (こんにちは、私の名前は) null マカヴェリです 私は 18 歳で、大きくなったらラッパーになりたいです 文章を書いたり音楽を作ったりするのが大好きです カリフォルニア州ロサンゼルス出身です
最適なコードとは、 null 理解可能なものです これは主観的な説明であり、明確な答えはありません。
SynapseML is (SynapseM は、) null A machine learning algorithm that is able to learn how to predict the future outcome of events.(イベントの将来の結果を予測する方法を学習できる機械学習アルゴリズムです)

使用例の詳細

テキスト埋め込みの生成

テキストの補完の他に、ダウンストリーム アルゴリズムやベクター取得アーキテクチャで使用するテキストを埋め込むこともできます。 埋め込みを作成すると、大規模なコレクションからドキュメントを検索および取得できるようになります。また、プロンプト エンジニアリングがタスクに不十分な場合に利用できます。 OpenAIEmbedding の使用の詳細については、「埋め込みガイド」を参照してください。

from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

チャットの完了

ChatGPT や GPT-4 などのモデルは、単一のプロンプトではなくチャットを理解できます。 OpenAIChatCompletion トランスフォーマーが、この機能を大規模に公開します。

from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

要求のバッチ処理を使用してスループットを向上させる

この例では、プロンプトごとに 1 つずつ、サービスに対して複数の要求を行います。 1 つの要求で複数のプロンプトを完了すために、バッチ モードを使用します。 まず、OpenAICompletion オブジェクトで Prompt 列を "Prompt" に設定する代わりに、BatchPrompt 列に "batchPrompt" を指定します。 そのためには、行ごとにプロンプトの一覧を含むデータフレームを作成します。

執筆時点で、1 つの要求に 20 件のプロンプトという制限があり、2048 個の "トークン" (約 1500 ワード) というハード制限があります。

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

次に、OpenAICompletion オブジェクトを作成します。 prompt 列を設定するのではなく、列の型が Array[String] の場合は、batchPrompt 列を設定します。

batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

変換の呼び出しでは、行ごとに要求が行われます。 1 つの行に複数のプロンプトがあるため、各要求がその行のすべてのプロンプトと共に送信されます。 結果には、要求の各行の行が含まれます。

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

自動ミニバッチ機能の使用

データが列形式の場合は、SynapseML の FixedMiniBatcherTransformer を使用して行形式に入れ替えることができます。

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

翻訳のための迅速なエンジニアリング

Azure OpenAI サービスは、プロンプト エンジニアリングを通じて、さまざまな自然言語タスクを解決できます。 言語翻訳を求めるプロンプトの例を次に示します。

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

質問の回答を求めるプロンプト

ここでは、GPT-3 に一般的な知識に関する質問への回答を求めます。

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))