Azure OpenAI para Big Data

Você pode usar o serviço Azure OpenAI para resolver muitas tarefas de linguagem natural solicitando a API de conclusão. Para facilitar a escala dos fluxos de trabalho de elaboração de prompts de alguns exemplos para conjuntos de dados amplos, o serviço Azure OpenAI é integrado à biblioteca de machine learning distribuída SynapseML. Usando essa integração, você pode usar a estrutura de computação distribuída do Apache Spark para processar milhões de prompts com o serviço OpenAI. Este tutorial mostra como aplicar modelos de linguagem grandes em uma escala distribuída usando Azure OpenAI e Microsoft Fabric.

Pré-requisitos

Os principais pré-requisitos para este início rápido incluem um recurso openai Azure funcionando e um cluster Apache Spark com SynapseML instalado.

  • Obtenha uma assinatura Microsoft Fabric. Ou inscreva-se para uma avaliação gratuita Microsoft Fabric.

  • Faça login no Microsoft Fabric.

  • Alterne para o Fabric usando o alternador de experiências no canto inferior esquerdo da página inicial.

    Captura de tela que mostra a seleção de Fabric no menu do alternador de experiência.

Importar este guia como um notebook

A próxima etapa é adicionar esse código ao cluster do Spark. Você pode criar um notebook em sua plataforma Spark e copiar o código nesse notebook para executar a demonstração. Ou baixe o notebook e importe-o para o Synapse Analytics.

  1. Baixar esta demo como um notebook (selecione Raw e salve o arquivo)
  2. Importe o notebook para o workspace do Synapse ou, se estiver usando o Fabric, imprte para o workspace do Fabric
  3. Instale o SynapseML no seu cluster. Consulte as instruções de instalação do Synapse na parte inferior do site do SynapseML. Se você estiver usando o Fabric, verifique o Guia de Instalação. Esta etapa requer colar uma célula extra na parte superior do notebook que você importou.
  4. Conecte seu notebook a um cluster e siga o passo a passo, editando e executando as células.

Preencha as informações do serviço

Agora, edite a célula no notebook para referenciar seu serviço. Configure as variáveis service_name, deployment_name, location e key para corresponder ao serviço OpenAI.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-4.1-mini"
deployment_name_embeddings = "text-embedding-3-small"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

Criar uma coleção de dados de prompts

Em seguida, crie um dataframe que consiste em uma série de linhas, com um prompt por linha.

Você também pode carregar dados diretamente do ADLS ou de outros bancos de dados. Para obter mais informações sobre como carregar e preparar dataframes do Spark, consulte o Guia de carregamento de dados do Apache Spark.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code that's",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

Criar o cliente OpenAIPrompt Apache Spark

Para aplicar o serviço Azure OpenAI ao seu dataframe, crie um objeto OpenAIPrompt, que atua como um cliente distribuído. Defina os parâmetros de serviço com um único valor ou com uma coluna de dataframe, usando os setters apropriados no objeto OpenAIPrompt. Neste exemplo, defina maxTokens como 200. Um token tem cerca de quatro caracteres e esse limite se aplica à soma do prompt e do resultado. Configure o parâmetro promptCol com o nome da coluna de prompt no dataframe.

from synapse.ml.services.openai import OpenAIPrompt

completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Transformar o dataframe usando o cliente OpenAIPrompt

Depois de criar o dataframe e o cliente de prompt, transforme o conjunto de dados de entrada e adicione uma coluna nomeada completions com todas as informações que o serviço adiciona. Selecione apenas o texto para simplificar.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

A saída deve ter uma aparência semelhante a essa. O texto de conclusão é diferente do exemplo.

prompt error text
Olá, meu nome é nulo Makaveli tenho 18 anos e quero ser rapper quando crescer adoro escrever e fazer música que sou de Los Angeles, CA
O melhor código é o código que é nulo compreensível Esta é uma declaração subjetiva, e não há uma resposta definitiva.
SynapseML é nulo Um algoritmo de aprendizado de máquina que é capaz de aprender a prever o resultado futuro dos eventos.

Mais exemplos de uso

Gerando inserções de texto

Além de concluir o texto, você também pode inserir texto para uso em algoritmos downstream ou arquiteturas de recuperação de vetor. Ao criar inserções, você pode pesquisar e recuperar documentos de grandes coleções. Use esta abordagem quando a engenharia de prompts não for suficiente para a tarefa. Para obter mais informações sobre como usar OpenAIEmbedding, consulte o guia de inserção.

from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

Conclusão do chat

Modelos como GPT-4o e GPT-4.1 entendem chats em vez de prompts únicos. O transformador OpenAIChatCompletion expõe essa funcionalidade em escala.

from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "What's your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Melhore a eficiência de processamento agrupando requisições

O exemplo acima faz várias solicitações para o serviço, uma para cada prompt. Para concluir vários prompts em uma única solicitação, use o modo de lote. Primeiro, no objeto OpenAIPrompt, em vez de definir a coluna Prompt como "Prompt", especifique "batchPrompt" para a coluna BatchPrompt. Para fazer isso, crie um dataframe com uma lista de prompts por linha.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

Em seguida, crie o OpenAIPrompt objeto. Em vez de configurar a coluna de prompt, configure a coluna batchPrompt se a sua coluna for do tipo Array[String].

batch_completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Na chamada para transformação, uma solicitação é feita por linha. Como cada linha contém vários prompts, cada solicitação envia todos os prompts nessa linha. Os resultados conterão uma linha para cada linha na solicitação.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Usando um minibatcher automático

Se os dados estiverem no formato de coluna, você poderá transpor para o formato de linha usando o SynapseML.FixedMiniBatcherTransformer

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Engenharia de prompt para tradução

O serviço Azure OpenAI pode resolver muitas tarefas de linguagem natural diferentes por meio de prompt engineering. Este exemplo mostra a solicitação de tradução de idioma:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

Prompt para resposta de perguntas

Este exemplo solicita ao modelo que responda a perguntas de conhecimento geral.

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))