Azure OpenAI для больших данных
Службу Azure OpenAI можно использовать для решения большого количества задач, связанных с естественным языком, путем запроса API завершения. Чтобы упростить масштабирование рабочих процессов с помощью нескольких примеров для больших наборов данных, мы интегрируем службу Azure OpenAI с распределенной библиотекой машинного обучения SynapseML. Эта интеграция упрощает использование платформы распределенных вычислений Apache Spark для обработки миллионов запросов с помощью службы OpenAI. В этом руководстве показано, как применять большие языковые модели в распределенном масштабе с помощью Azure OpenAI и Azure Synapse Analytics.
Основные предварительные требования для этого краткого руководства включают рабочий ресурс Azure OpenAI и кластер Apache Spark с установленным SynapseML.
Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.
Войдите в Microsoft Fabric.
Используйте переключатель интерфейса в левой части домашней страницы, чтобы перейти на интерфейс Synapse Обработка и анализ данных.
- Перейдите к интерфейсу Обработка и анализ данных в Microsoft Fabric.
- Создайте записную книжку.
- Ресурс Azure OpenAI: запрос доступа к службе Azure OpenAI перед созданием ресурса
Следующим шагом станет добавление этого кода в кластер Spark. Вы можете создать записную книжку на платформе Spark и скопировать код в эту записную книжку, чтобы запустить демонстрацию. Или скачайте записную книжку и импортируйте ее в Synapse Analytics
- Скачайте эту демонстрацию в виде записной книжки (выберите "Необработанный", а затем сохраните файл)
- Импорт записной книжки в рабочую область Synapse или при импорте Структуры в рабочую область Fabric
- Установите SynapseML в кластере. См. инструкции по установке Synapse в нижней части веб-сайта SynapseML. Если вы используете Fabric, ознакомьтесь с руководством по установке. Для этого требуется вставка дополнительной ячейки в верхней части импортированной записной книжки.
- Подключите записную книжку к кластеру и следуйте инструкциям, редактированию и запуску ячеек.
Затем измените ячейку в записной книжке, чтобы она указывала на службу. В частности, задайте service_name
deployment_name
location
переменные и key
переменные для сопоставления с службой OpenAI:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
if running_on_synapse():
from notebookutils.visualization import display
# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"
key = find_secret(
"openai-api-key"
) # please replace this line with your key as a string
assert key is not None and service_name is not None
Далее создайте кадр данных, состоящий из ряда строк, с одним запросом для каждой строки.
Вы также можете загружать данные непосредственно из ADLS или других баз данных. Дополнительные сведения о загрузке и подготовке кадров данных Spark см. в руководстве по загрузке данных Apache Spark.
df = spark.createDataFrame(
[
("Hello my name is",),
("The best code is code thats",),
("SynapseML is ",),
]
).toDF("prompt")
Чтобы применить службу завершения OpenAI к созданному кадру данных, создайте объект OpenAICompletion, который служит распределенным клиентом. Параметры службы можно задать либо как отдельные значения, либо как столбец кадра данных с помощью соответствующих методов задания объекта OpenAICompletion
. Здесь мы устанавливаем maxTokens
значение 200. Токен состоит примерно из четырех символов, и это ограничение применяется к сумме длины запроса и результата. Мы также присвоим параметру promptCol
имя столбца запроса в кадре данных.
from synapse.ml.cognitive import OpenAICompletion
completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setPromptCol("prompt")
.setErrorCol("error")
.setOutputCol("completions")
)
После завершения кадра данных и клиента завершения можно преобразовать входной набор данных и добавить столбец, который вызывается completions
со всеми сведениями, которые добавляет служба. Выделите просто текст для простоты.
from pyspark.sql.functions import col
completed_df = completion.transform(df).cache()
display(
completed_df.select(
col("prompt"),
col("error"),
col("completions.choices.text").getItem(0).alias("text"),
)
)
Выходные данные должны выглядеть примерно так. Текст завершения будет отличаться от примера.
prompt | error | text |
---|---|---|
Привет, меня зовут | null | Макавали я восемнадцать лет, и я хочу быть рэпером, когда я люблю писать и делать музыку я из Лос-Анджелеса, ЦС |
Лучший код — это код, который является | null | понятно, это субъективная инструкция, и нет окончательного ответа. |
SynapseML — это | null | алгоритм машинного обучения, который позволяет обучать модели прогнозированию будущих исходов событий. |
Помимо завершения текста, мы также можем внедрить текст для использования в подчиненных алгоритмах или архитектурах извлечения векторов. Создание внедрения позволяет выполнять поиск и извлечение документов из больших коллекций и использовать их, если для задачи недостаточно разработки запросов. Дополнительные сведения об использовании OpenAIEmbedding
см. в нашем руководстве по внедрению.
from synapse.ml.cognitive import OpenAIEmbedding
embedding = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("prompt")
.setErrorCol("error")
.setOutputCol("embeddings")
)
display(embedding.transform(df))
Такие модели, как ChatGPT и GPT-4, могут понимать чаты вместо отдельных запросов. Преобразователь OpenAIChatCompletion
предоставляет эту функцию в масштабе.
from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *
def make_message(role, content):
return Row(role=role, content=content, name=role)
chat_df = spark.createDataFrame(
[
(
[
make_message(
"system", "You are an AI chatbot with red as your favorite color"
),
make_message("user", "Whats your favorite color"),
],
),
(
[
make_message("system", "You are very excited"),
make_message("user", "How are you today"),
],
),
]
).toDF("messages")
chat_completion = (
OpenAIChatCompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMessagesCol("messages")
.setErrorCol("error")
.setOutputCol("chat_completions")
)
display(
chat_completion.transform(chat_df).select(
"messages", "chat_completions.choices.message.content"
)
)
В этом примере выполняется несколько запросов к службе, по одному для каждого запроса. Чтобы выполнить сразу несколько запросов, используйте пакетный режим. Во-первых, в объекте OpenAICompletion вместо задания столбца "Запрос" укажите "batchPrompt" для столбца BatchPrompt. Для этого создайте кадр данных со списком запросов на строку.
По состоянию на этот текст в настоящее время существует ограничение в 20 запросов в одном запросе, и жесткое ограничение 2048 "токенов", или примерно 1500 слов.
batch_df = spark.createDataFrame(
[
(["The time has come", "Pleased to", "Today stocks", "Here's to"],),
(["The only thing", "Ask not what", "Every litter", "I am"],),
]
).toDF("batchPrompt")
Затем мы создадим объект OpenAICompletion. Вместо задания столбца запроса задайте столбец batchPrompt, если он имеет тип Array[String]
.
batch_completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setBatchPromptCol("batchPrompt")
.setErrorCol("error")
.setOutputCol("completions")
)
В вызове преобразования запрос будет выполнен на каждую строку. Так как в одной строке есть несколько запросов, каждый запрос отправляется со всеми запросами в этой строке. Результаты содержат строку для каждой строки в запросе.
completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)
Если данные имеют формат столбца, их можно транспонировать в формат строки с помощью средства SynapseML FixedMiniBatcherTransformer
.
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI
completed_autobatch_df = (
df.coalesce(
1
) # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
.mlTransform(FixedMiniBatchTransformer(batchSize=4))
.withColumnRenamed("prompt", "batchPrompt")
.mlTransform(batch_completion)
)
display(completed_autobatch_df)
Служба Azure OpenAI может решать множество различных задач, связанных с естественным языком, посредством проектирования запросов. Вот пример запроса на перевод языка:
translate_df = spark.createDataFrame(
[
("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
(
"French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
),
]
).toDF("prompt")
display(completion.transform(translate_df))
Здесь мы запрашиваем в GPT-3 ответы на вопросы общего характера:
qa_df = spark.createDataFrame(
[
(
"Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
)
]
).toDF("prompt")
display(completion.transform(qa_df))