適用於巨量數據的 Azure OpenAI
Azure OpenAI 服務可用來透過提示完成 API 來解決大量自然語言工作。 為了更輕鬆地將提示工作流程從幾個範例調整為大型範例數據集,我們已將 Azure OpenAI 服務與分散式機器學習連結庫 SynapseML 整合。 透過這項整合,您將可輕鬆使用 Apache Spark 分散式運算架構來處理 OpenAI 服務的數百萬個提示。 本教學課程示範如何使用 Azure OpenAI 和 Azure Synapse Analytics,以分散式規模套用大型語言模型。
必要條件
本快速入門的主要必要條件包括運作中的 Azure OpenAI 資源,以及已安裝 SynapseML 的 Apache Spark 叢集。
取得 Microsoft Fabric 訂用帳戶。 或者,註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左側的體驗切換器,切換至 Synapse 資料科學 體驗。
- 移至 Microsoft Fabric 中的 資料科學 體驗。
- 建立 新的筆記本。
- Azure OpenAI 資源:在建立資源之前要求存取 Azure OpenAI 服務
匯入此指南作為筆記本
下一個步驟是將此程式碼新增至您的 Spark 叢集。 您可以在Spark平臺中建立筆記本,並將程式代碼複製到此筆記本,以執行示範。 或下載筆記本,並將其匯入 Synapse Analytics
- 將此示範下載為筆記本 (選取 [原始],然後儲存盤案)
- 將筆記本 匯入 Synapse 工作區 ,或使用網狀架構匯入至網 狀架構工作區
- 在您的叢集上安裝 SynapseML。 請參閱 SynapseML 網站底部的 Synapse 安裝指示。 如果您使用 Fabric,請參閱 安裝指南。 這需要貼上您匯入筆記本頂端的額外儲存格。
- 將您的筆記本連線到叢集,然後跟著,編輯和執行數據格。
填入服務資訊
接著,編輯筆記本中的儲存格以指向您的服務。 特別是設定 service_name
、 deployment_name
、 location
和 key
變數,使其符合您的 OpenAI 服務:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
if running_on_synapse():
from notebookutils.visualization import display
# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"
key = find_secret(
"openai-api-key"
) # please replace this line with your key as a string
assert key is not None and service_name is not None
建立提示的資料集
接下來,建立一個資料框架,內含一系列的資料列,每個資料列各有一個提示。
您也可以直接從 ADLS 或其他資料庫載入數據。 如需載入和準備Spark數據框架的詳細資訊,請參閱 Apache Spark資料載入指南。
df = spark.createDataFrame(
[
("Hello my name is",),
("The best code is code thats",),
("SynapseML is ",),
]
).toDF("prompt")
建立 OpenAICompletion Apache Spark 用戶端
若要將 OpenAI 完成服務套用至您所建立的數據框架,請建立 OpenAICompletion 物件,此物件可作為分散式用戶端。 服務的參數可用單一值來設定,或透過在 OpenAICompletion
物件上具有適當 setter 的資料框架資料行來設定。 我們在這裡將 設定 maxTokens
為 200。 權杖大約是四個字元,而此限制適用於提示和結果的總和。 我們也會使用資料框架中的提示資料行名稱來設定 promptCol
參數。
from synapse.ml.cognitive import OpenAICompletion
completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setPromptCol("prompt")
.setErrorCol("error")
.setOutputCol("completions")
)
使用 OpenAICompletion 用戶端轉換數據框架
完成數據框架和完成客戶端之後,您可以轉換輸入數據集,並使用服務新增的所有資訊來新增名為 completions
的數據行。 只要選取文字即可簡單起見。
from pyspark.sql.functions import col
completed_df = completion.transform(df).cache()
display(
completed_df.select(
col("prompt"),
col("error"),
col("completions.choices.text").getItem(0).alias("text"),
)
)
您的輸出看起來應該像這樣。 完成文字會與範例不同。
prompt | error | text |
---|---|---|
您好,我的名字是 | null | 馬卡維利我十八歲了, 我想成為說唱歌手, 當我長大後, 我喜歡寫作和製作音樂, 我來自洛杉磯, CA |
最好的程式代碼是程序代碼 | null | 可理解這是一個主觀陳述,沒有明確的答案。 |
SynapseML 是 | null | 一種機器學習演算法,能夠學習如何預測事件未來的結果。 |
更多使用範例
產生文字內嵌
除了完成文字之外,我們也可以內嵌文字以用於下游演算法或向量擷取架構。 建立內嵌可讓您從大型集合搜尋和擷取檔,而且可在提示工程不足以處理工作時使用。 如需使用 OpenAIEmbedding
的詳細資訊,請參閱我們的 內嵌指南。
from synapse.ml.cognitive import OpenAIEmbedding
embedding = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("prompt")
.setErrorCol("error")
.setOutputCol("embeddings")
)
display(embedding.transform(df))
聊天完成
ChatGPT 和 GPT-4 等模型能夠瞭解聊天,而不是單一提示。 轉換器會 OpenAIChatCompletion
大規模公開這項功能。
from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *
def make_message(role, content):
return Row(role=role, content=content, name=role)
chat_df = spark.createDataFrame(
[
(
[
make_message(
"system", "You are an AI chatbot with red as your favorite color"
),
make_message("user", "Whats your favorite color"),
],
),
(
[
make_message("system", "You are very excited"),
make_message("user", "How are you today"),
],
),
]
).toDF("messages")
chat_completion = (
OpenAIChatCompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMessagesCol("messages")
.setErrorCol("error")
.setOutputCol("chat_completions")
)
display(
chat_completion.transform(chat_df).select(
"messages", "chat_completions.choices.message.content"
)
)
透過要求批次來改善輸送量
此範例會向服務提出數個要求,每個提示各一個。 若要在單一要求中完成多個提示,請使用批次模式。 首先,在OpenAICompletion物件中,不要將 Prompt資料行設定為 “Prompt”,而是指定 BatchPrompt 資料行的 “batchPrompt”。 為此,請建立一個資料框架,內含每個資料列的提示清單。
截至本文撰寫,單一要求中目前有 20 個提示的限制,而硬性限製為 2048 個「令牌」,或大約 1500 個字。
batch_df = spark.createDataFrame(
[
(["The time has come", "Pleased to", "Today stocks", "Here's to"],),
(["The only thing", "Ask not what", "Every litter", "I am"],),
]
).toDF("batchPrompt")
接下來,我們會建立 OpenAICompletion 物件。 如果您的資料行屬於 Array[String]
類型,請不要設定提示資料行,而應設定 batchPrompt 資料行。
batch_completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setBatchPromptCol("batchPrompt")
.setErrorCol("error")
.setOutputCol("completions")
)
在轉換的呼叫中,每個數據列都會提出要求。 由於單一數據列中有多個提示,因此每個要求都會以該數據列中的所有提示傳送。 結果包含要求中每個數據列的數據列。
completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)
使用自動迷你屠宰場
如果您的資料採用資料行格式,您可以使用 SynapseML 的 FixedMiniBatcherTransformer
將其轉置為資料列格式。
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI
completed_autobatch_df = (
df.coalesce(
1
) # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
.mlTransform(FixedMiniBatchTransformer(batchSize=4))
.withColumnRenamed("prompt", "batchPrompt")
.mlTransform(batch_completion)
)
display(completed_autobatch_df)
翻譯的提示工程
Azure OpenAI 服務可以透過提示工程來解決 許多不同的自然語言工作。 以下顯示語言翻譯提示的範例:
translate_df = spark.createDataFrame(
[
("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
(
"French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
),
]
).toDF("prompt")
display(completion.transform(translate_df))
問題解答的提示
在這裡,我們會提示 GPT-3 進行一般知識問題解答:
qa_df = spark.createDataFrame(
[
(
"Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
)
]
).toDF("prompt")
display(completion.transform(qa_df))