Compartir a través de


Procedimiento para usar puntos de conexión de streaming implementados desde el flujo de avisos

En el flujo de avisos, puede implementar el flujo en un punto de conexión en línea administrado de Azure Machine Learning para realizar inferencias en tiempo real.

Cuando se consume el punto de conexión enviando una solicitud, el comportamiento por defecto es que el punto de conexión en línea se mantendrá a la espera hasta que toda la respuesta esté lista, y luego la enviará de vuelta al cliente. Esto puede provocar un gran retraso para el cliente y una mala experiencia para el usuario.

Para evitarlo, puede utilizar streaming cuando consuma los puntos de conexión. Una vez activado el streaming, no tiene que esperar a que toda la respuesta esté lista. En su lugar, el servidor devolverá la respuesta en trozos a medida que se vayan generando. El cliente puede entonces mostrar la respuesta progresivamente, con menos tiempo de espera y más interactividad.

Este artículo describirá el alcance del streaming, cómo funciona y cómo consumir puntos de conexión de streaming.

Crear un flujo habilitado para streaming

Si desea utilizar el modo streaming, debe crear un flujo que tenga un nodo que produzca un generador de cadenas como salida del flujo. Un generador de cadenas es un objeto que puede devolver una cadena cada vez que se le solicite. Puede utilizar los siguientes tipos de nodos para crear un generador de cadenas:

  • Nodo LLM: Este nodo utiliza un gran modelo lingüístico para generar respuestas en lenguaje natural basadas en la entrada.

    {# Sample prompt template for LLM node #}
    
    system:
    You are a helpful assistant.
    
    user:
    {{question}}
    
  • Nodo Python: Este nodo le permite escribir código Python personalizado que puede producir salidas de cadena. Puede usar este nodo para llamar a API externas o librerías que soporten streaming. Por ejemplo, puede utilizar este código para hacer eco de la entrada palabra por palabra:

    from promptflow import tool
    
    # Sample code echo input by yield in Python tool node
    
    @tool
    def my_python_tool(paragraph: str) -> str:
        yield "Echo: "
        for word in paragraph.split():
            yield word + " "
    

Importante

Solo la salida del último nodo del flujo puede admitir streaming.

"Último nodo" significa que la salida del nodo no es consumida por otros nodos.

En esta guía, utilizaremos como ejemplo el flujo de muestra "Chat con Wikipedia". Este flujo procesa la pregunta del usuario, busca en Wikipedia los artículos pertinentes y responde a la pregunta con la información de los artículos. Utiliza el modo streaming para mostrar el progreso de la generación de respuestas.

Para saber cómo crear un flujo de chat, consulte cómo desarrollar un flujo de chat en flujo de avisos para crear un flujo de chat.

Captura de pantalla del chat con el flujo de Wikipedia.

Implementación del flujo como punto de conexión en línea

Para utilizar el modo streaming, debe implementar su flujo como un punto de conexión en línea. Esto le permitirá enviar solicitudes y recibir respuestas de su flujo en tiempo real.

Para saber cómo implementar su flujo como un punto de conexión en línea, consulte Implementación de un flujo en un punto de conexión en línea para la inferencia en tiempo real con CLI para implementar su flujo como un punto de conexión en línea.

Nota:

Implementar con una versión del entorno de ejecución posterior a la versión 20230816.v10.

Puede comprobar la versión de su tiempo de ejecución y actualizarlo en la página de detalles del tiempo de ejecución.

Captura de pantalla del Estudio de Azure Machine Learning mostrando el entorno de ejecución.

Reconocer el proceso de streaming

Cuando se dispone de un punto de conexión en línea, el cliente y el servidor deben seguir unos principios específicos de negociación de contenidos para utilizar el modo streaming:

La negociación de contenidos es como una conversación entre el cliente y el servidor sobre el formato preferido de los datos que quieren enviar y recibir. Garantiza una comunicación eficaz y un contrato sobre el formato de los datos intercambiados.

Para entender el proceso de streaming, considere los siguientes pasos:

  • En primer lugar, el cliente construye una petición HTTP con el tipo de medio deseado incluido en el encabezado Accept. El tipo de medio indica al servidor qué tipo de formato de datos espera el cliente. Es como si el cliente dijera: "Oye, estoy buscando un formato específico para los datos que me vas a enviar. Puede ser JSON, texto u otra cosa". Por ejemplo,application/json indica una preferencia por datos JSON, text/event-stream indica un deseo de transmisión de datos, y */* significa que el cliente acepta cualquier formato de datos.

    Nota:

    Si una solicitud carece de Accept encabezado o tiene un encabezado vacío Accept, implica que el cliente aceptará cualquier tipo de medio en respuesta. El servidor lo trata como */*.

  • A continuación, el servidor responde en función del tipo de medio especificado en el encabezado Accept. Es importante tener en cuenta que el cliente podría solicitar varios tipos de medios en el encabezado Accept, y el servidor debe considerar sus capacidades y prioridades de formato para determinar la respuesta adecuada.

    • En primer lugar, el servidor comprueba si text/event-stream se especifica explícitamente en el encabezado Accept:
      • Para un flujo habilitado para streaming, el servidor devuelve una respuesta con un Content-Type de text/event-stream, indicando que los datos se están transmitiendo en streaming.
      • Para un flujo no habilitado para flujo, el servidor procede a comprobar si hay otros tipos de medios especificados en la cabecera.
    • Si no se especifica text/event-stream, el servidor comprueba si se especifican application/json o */* en el encabezado Accept:
      • En estos casos, el servidor devuelve una respuesta con un Content-Type de application/json, proporcionando los datos en formato JSON.
    • Si el encabezado Accept especifica otros tipos de soporte físico, como text/html:
      • El servidor devuelve una respuesta 424 con un código de error UserError en tiempo de ejecución de flujo de avisos y un estado HTTP 406 en tiempo de ejecución, lo que indica que el servidor no puede satisfacer la solicitud con el formato de datos solicitado. Para más información, consulte la sección Errores de manipulación.
  • Por último, el cliente compruebe el encabezado de respuesta Content-Type. Si se establece en text/event-stream, indica que los datos se están transmitiendo.

Veamos cómo funciona el proceso de streaming. Los datos de respuesta en modo streaming siguen el formato de los eventos enviados por el servidor (SSE).

El proceso global funciona del siguiente modo:

0. El cliente envía un mensaje al servidor

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Hello",
    "chat_history": []
}

Nota:

El encabezado Accept se establece en text/event-stream para solicitar una respuesta de flujo.

1. El servidor devuelve la respuesta en modo streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Hello"}

data: {"answer": "!"}

data: {"answer": " How"}

data: {"answer": " can"}

data: {"answer": " I"}

data: {"answer": " assist"}

data: {"answer": " you"}

data: {"answer": " today"}

data: {"answer": " ?"}

data: {"answer": ""}

Nota:

El Content-Type se establece en text/event-stream; charset=utf-8, indicando que la respuesta es un flujo de eventos.

El cliente debe descodificar los datos de respuesta como eventos enviados por el servidor y mostrarlos de forma incremental. El servidor cerrará la conexión HTTP una vez enviados todos los datos.

Cada evento de respuesta es el delta del evento anterior. Se recomienda que el cliente mantenga un registro de los datos fusionados en memoria y los envíe de vuelta al servidor como historial de chat en la siguiente petición.

2. El cliente envía otro mensaje de chat, junto con el historial de chat completo, al servidor

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Glad to know you!",
    "chat_history": [
        {
            "inputs": {
                "question": "Hello"
            },
            "outputs": {
                "answer": "Hello! How can I assist you today?"
            }
        }
    ]
}

3. El servidor devuelve la respuesta en modo streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Nice"}

data: {"answer": " to"}

data: {"answer": " know"}

data: {"answer": " you"}

data: {"answer": " too"}

data: {"answer": "!"}

data: {"answer": " Is"}

data: {"answer": " there"}

data: {"answer": " anything"}

data: {"answer": " I"}

data: {"answer": " can"}

data: {"answer": " help"}

data: {"answer": " you"}

data: {"answer": " with"}

data: {"answer": "?"}

data: {"answer": ""}

El chat continúa de forma similar.

errores

El cliente debe comprobar primero el código de respuesta HTTP. Consulte la tabla de códigos de estado HTTP para conocer los códigos de error más comunes que devuelven los puntos de conexión en línea.

Si el código de respuesta es "424 Error de modelo", significa que el error está causado por el código del modelo. La respuesta de error de un modelo de flujo de avisos siempre sigue este formato:

{
  "error": {
    "code": "UserError",
    "message": "Media type text/event-stream in Accept header is not acceptable. Supported media type(s) - application/json",
  }
}
  • Siempre es un diccionario JSON con una única clave "error" definida.
  • El valor de "error" es un diccionario que contiene "code","message".
  • "código" define la categoría del error. Actualmente, podría ser "UserError" para entradas erróneas del usuario y "SystemError" para errores dentro del servicio.
  • "message" es una descripción del error. Puede mostrarse al usuario final.

Cómo consumir los eventos enviados por el servidor

Consumir con Python

En este uso de ejemplo, se usa la clase SSEClient. Esta clase no es una clase de Python integrada y debe instalarse por separado. Puede instalarse a través de pip:

pip install sseclient-py

Un ejemplo de uso sería:

import requests
from sseclient import SSEClient
from requests.exceptions import HTTPError

try:
    response = requests.post(url, json=body, headers=headers, stream=stream)
    response.raise_for_status()

    content_type = response.headers.get('Content-Type')
    if "text/event-stream" in content_type:
        client = SSEClient(response)
        for event in client.events():
            # Handle event, i.e. print to stdout
    else:
        # Handle json response

except HTTPError:
    # Handle exceptions

Consumir con JavaScript

Existen varias librerías para consumir eventos enviados por el servidor en JavaScript. Aquí se muestra una de ellas como ejemplo.

Un ejemplo de aplicación de chat con Python

Aquí se muestra un ejemplo de aplicación de chat escrita en Python.

Gif de una aplicación de chat de ejemplo usando Python.

Uso avanzado: salida de flujo híbrido y no híbrido

A veces, es posible que quiera obtener resultados tanto de flujo como de no flujo de una salida de flujo. Por ejemplo, en el flujo "Chat con Wikipedia", es posible que quiera obtener no solo la respuesta de LLM, sino también la lista de URL que el flujo ha buscado. Para ello, es necesario modificar el flujo para que emita una combinación de la respuesta de LLM de flujo y la lista de URL de no flujo.

En el flujo de ejemplo "Chatear con Wikipedia", la salida está conectada al nodo LLM augmented_chat. Para agregar la lista de URL a la salida, es necesario agregar un campo de salida con el nombre url y el valor ${get_wiki_url.output}.

Captura de pantalla del chat híbrido con el flujo de Wikipedia.

La salida del flujo será un campo no flujo como base y un campo flujo como delta. He aquí un ejemplo de solicitud y respuesta.

Uso avanzado: 0. El cliente envía un mensaje al servidor

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When was ChatGPT launched?",
    "chat_history": []
}

Uso anticipado - 1. El servidor devuelve la respuesta en modo streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=ChatGPT", "https://en.wikipedia.org/w/index.php?search=GPT-4"]}

data: {"answer": ""}

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " \n\n"}

...

data: {"answer": "PT"}

data: {"answer": ""}

Uso anticipado - 2. El cliente envía otro mensaje de chat, junto con el historial de chat completo, al servidor

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When did OpenAI announce GPT-4? How long is it between these two milestones?",
    "chat_history": [
        {
            "inputs": {
                "question": "When was ChatGPT launched?"
            },
            "outputs": {
                "url": [
                    "https://en.wikipedia.org/w/index.php?search=ChatGPT",
                    "https://en.wikipedia.org/w/index.php?search=GPT-4"
                ],
                "answer": "ChatGPT was launched on November 30, 2022. \n\nSOURCES: https://en.wikipedia.org/w/index.php?search=ChatGPT"
            }
        }
    ]
}

Uso anticipado - 3. El servidor devuelve la respuesta en modo streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=Generative pre-trained transformer ", "https://en.wikipedia.org/w/index.php?search=Microsoft "]}

data: {"answer": ""}

data: {"answer": "Open"}

data: {"answer": "AI"}

data: {"answer": " released"}

data: {"answer": " G"}

data: {"answer": "PT"}

data: {"answer": "-"}

data: {"answer": "4"}

data: {"answer": " in"}

data: {"answer": " March"}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "3"}

data: {"answer": "."}

data: {"answer": " Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " The"}

data: {"answer": " time"}

data: {"answer": " between"}

data: {"answer": " these"}

data: {"answer": " two"}

data: {"answer": " milestones"}

data: {"answer": " is"}

data: {"answer": " approximately"}

data: {"answer": " "}

data: {"answer": "3"}

data: {"answer": " months"}

data: {"answer": ".\n\n"}

...

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": ""}

Pasos siguientes