Condividi tramite


Come usare gli endpoint di streaming distribuiti da prompt flow

In prompt flow è possibile distribuire un flusso in un endpoint online gestito di Azure Machine Learning per l'inferenza in tempo reale.

Quando si utilizza l'endpoint inviando una richiesta, il comportamento predefinito è che l'endpoint online continuerà ad attendere fino a quando l'intera risposta non è pronta e quindi la invia al client. Ciò può causare un lungo ritardo per il client e un'esperienza utente insoddisfacente.

Per evitare questo problema, è possibile usare lo streaming quando si utilizzano gli endpoint. Una volta abilitato lo streaming, non è necessario attendere che l'intera risposta sia pronta. Il server invece invierà la risposta in blocchi man mano che vengono generati. Il client può quindi visualizzare la risposta progressivamente, con meno tempo di attesa e maggiore interattività.

Questo articolo descrive l'ambito di streaming, il funzionamento dello streaming e come usare gli endpoint di streaming.

Creare un flusso abilitato per lo streaming

Se si vuole usare la modalità di streaming, è necessario creare un flusso con un nodo che produce un generatore di stringhe come output del flusso. Un generatore di stringhe è un oggetto che può restituire una stringa alla volta quando richiesto. Per creare un generatore di stringhe, è possibile usare i tipi di nodi seguenti:

  • Nodo LLM: questo nodo usa un Large Language Model per generare risposte in linguaggio naturale in base all'input.

    {# Sample prompt template for LLM node #}
    
    system:
    You are a helpful assistant.
    
    user:
    {{question}}
    
  • Nodo Python: questo nodo consente di scrivere codice Python personalizzato in grado di produrre output di stringa. È possibile usare questo nodo per chiamare API o librerie esterne che supportano lo streaming. Ad esempio, è possibile usare questo codice per ripetere l'input parola per parola:

    from promptflow import tool
    
    # Sample code echo input by yield in Python tool node
    
    @tool
    def my_python_tool(paragraph: str) -> str:
        yield "Echo: "
        for word in paragraph.split():
            yield word + " "
    

Importante

Solo l'output dell'ultimo nodo del flusso può supportare lo streaming.

"Ultimo nodo" significai che l'output del nodo non viene utilizzato da altri nodi.

In questa guida si userà il flusso "Chat with Wikipedia" come esempio. Questo flusso elabora la domanda dell'utente, cerca in Wikipedia articoli pertinenti e risponde alla domanda con le informazioni degli articoli. Usa la modalità di streaming per mostrare lo stato di avanzamento della generazione di risposte.

Per informazioni su come creare un flusso di chat, vedere come sviluppare un flusso di chat in prompt flow.

Screenshot della chat con il flusso di Wikipedia.

Distribuire il flusso come endpoint online

Per usare la modalità di streaming, è necessario distribuire il flusso come endpoint online. In questo modo sarà possibile inviare richieste e ricevere risposte dal flusso in tempo reale.

Per informazioni su come distribuire il flusso come endpoint online, vedere Distribuire un flusso all'endpoint online per l'inferenza in tempo reale con l'interfaccia della riga di comando.

Nota

Eseguire la distribuzione con la versione dell'ambiente di runtime successiva alla versione 20230816.v10.

È possibile controllare la versione del runtime e aggiornarla nella relativa pagina dei dettagli.

Screenshot di Azure Machine Learning Studio che mostra l'ambiente di runtime.

Informazioni sul processo di streaming

Per gli endpoint online, il client e il server devono seguire principi specifici per la negoziazione del contenuto per usare la modalità di streaming:

La negoziazione del contenuto è simile a una conversazione tra il client e il server in merito al formato preferito dei dati che vogliono inviare e ricevere. Garantisce comunicazioni efficaci e l'accordo sul formato dei dati scambiati.

Per comprendere il processo di streaming, considerare i passaggi seguenti:

  • Prima di tutto, il client crea una richiesta HTTP con il tipo di supporto desiderato incluso nell'intestazione Accept. Il tipo di supporto indica al server il tipo di formato dati previsto dal client. È come il cliente comunicasse al server che è in cerca di un formato specifico per i dati che riceverà. Potrebbe trattarsi di JSON, testo o altro. Ad esempio, application/json indica una preferenza per i dati JSON, text/event-stream indica un desiderio di dati in streaming e */* indica che il client accetta qualsiasi formato di dati.

    Nota

    Se una richiesta non ha un'intestazione Accept o ha un'intestazione Accept vuota, significa che il client accetterà qualsiasi tipo di supporto in risposta. Il server lo considera come */*.

  • Il server risponde quindi in base al tipo di supporto specificato nell'intestazione Accept. È importante notare che il client potrebbe richiedere più tipi di supporti nell'intestazione Accept e il server deve considerare le relative funzionalità e le priorità di formato per determinare la risposta appropriata.

    • Prima di tutto, il server controlla se text/event-stream è specificato in modo esplicito nell'intestazione Accept:
      • Per un flusso abilitato per lo streaming, il server restituisce una risposta con Content-Type text/event-stream, che indica che i dati vengono trasmessi.
      • Per un flusso non abilitato per lo streaming, il server continua a verificare la presenza di altri tipi di supporto specificati nell'intestazione.
    • Se text/event-stream non è specificato, il server controlla se application/json o */* è specificato nell'intestazione Accept:
      • In questi casi, il server restituisce una risposta con Content-Type application/json, fornendo i dati in formato JSON.
    • Se l'intestazione Accept specifica altri tipi di supporti, ad esempio text/html:
      • Il server restituisce una risposta 424 con un codice di errore UserError del runtime di prompt flow e uno stato HTTP 406 del runtime, che indica che il server non può soddisfare la richiesta con il formato dati richiesto. Per altre informazioni, vedere Gestire gli errori.
  • Infine, il client controlla l'intestazione Content-Type della risposta. Se è impostata su text/event-stream, indica che i dati vengono trasmessi in streaming.

Esaminiamo in modo più approfondito il funzionamento del processo di streaming. I dati della risposta in modalità di streaming seguono il formato Server-Sent Event (SSE).

Il processo generale è il seguente:

0. Il client invia un messaggio al server

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Hello",
    "chat_history": []
}

Nota

L'intestazione Accept è impostata su text/event-stream per richiedere una risposta in streaming.

1. Il server invia la risposta in modalità di streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Hello"}

data: {"answer": "!"}

data: {"answer": " How"}

data: {"answer": " can"}

data: {"answer": " I"}

data: {"answer": " assist"}

data: {"answer": " you"}

data: {"answer": " today"}

data: {"answer": " ?"}

data: {"answer": ""}

Nota

Content-Type è impostato su text/event-stream; charset=utf-8, che indica che la risposta è un flusso di eventi.

Il client deve decodificare i dati della risposta come eventi SSE e visualizzarli in modo incrementale. Il server chiuderà la connessione HTTP dopo l'invio di tutti i dati.

Ogni evento di risposta è il delta dell'evento precedente. È consigliabile che il client tenga traccia dei dati uniti in memoria e li invii al server come cronologia della chat nella richiesta successiva.

2. Il client invia un altro messaggio di chat, insieme alla cronologia completa della chat, al server

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream

{
    "question": "Glad to know you!",
    "chat_history": [
        {
            "inputs": {
                "question": "Hello"
            },
            "outputs": {
                "answer": "Hello! How can I assist you today?"
            }
        }
    ]
}

3. Il server invia la risposta in modalità di streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"answer": ""}

data: {"answer": "Nice"}

data: {"answer": " to"}

data: {"answer": " know"}

data: {"answer": " you"}

data: {"answer": " too"}

data: {"answer": "!"}

data: {"answer": " Is"}

data: {"answer": " there"}

data: {"answer": " anything"}

data: {"answer": " I"}

data: {"answer": " can"}

data: {"answer": " help"}

data: {"answer": " you"}

data: {"answer": " with"}

data: {"answer": "?"}

data: {"answer": ""}

La chat continua quindi in modo simile.

Gestione degli errori

Il client deve prima controllare il codice di risposta HTTP. Vedere la tabella di codici di stato HTTP per i codici di errore comuni restituiti dagli endpoint online.

Se il codice di risposta è "Errore del modello 424", significa che l'errore è causato dal codice del modello. La risposta di errore di un modello prompt flow segue sempre questo formato:

{
  "error": {
    "code": "UserError",
    "message": "Media type text/event-stream in Accept header is not acceptable. Supported media type(s) - application/json",
  }
}
  • È sempre un dizionario JSON solo con una chiave "error" definita.
  • Il valore di "error" è un dizionario contenente "code", "message".
  • "code" definisce la categoria di errore. Attualmente, potrebbe essere "UserError" per gli input utente non valido e "SystemError" per gli errori all'interno del servizio.
  • "message" è una descrizione dell'errore. Può essere visualizzato all'utente finale.

Come utilizzare gli eventi SSE

Utilizzo con Python

In questo esempio viene usata la classe SSEClient. Questa classe non è una classe Python predefinita e deve essere installata separatamente. È possibile installarlo tramite pip:

pip install sseclient-py

Un esempio di utilizzo è:

import requests
from sseclient import SSEClient
from requests.exceptions import HTTPError

try:
    response = requests.post(url, json=body, headers=headers, stream=stream)
    response.raise_for_status()

    content_type = response.headers.get('Content-Type')
    if "text/event-stream" in content_type:
        client = SSEClient(response)
        for event in client.events():
            # Handle event, i.e. print to stdout
    else:
        # Handle json response

except HTTPError:
    # Handle exceptions

Utilizzo con JavaScript

Sono disponibili diverse librerie per l'utilizzo di eventi SSE in JavaScript. Di seguito è riportato uno di essi come esempio.

App di chat di esempio con Python

Ecco un'app di chat di esempio scritta in Python.

Gif di un'app di chat di esempio con Python.

Utilizzo avanzato - Flusso ibrido e output di flusso non in streaming

In alcuni casi, potrebbe essere necessario ottenere i risultati in streaming e non in streaming dall'output di un flusso. Ad esempio, nel flusso "Chat with Wikipedia" si potrebbe voler ottenere non solo la risposta di LLM, ma anche l'elenco di URL cercati dal flusso. A tale scopo, è necessario modificare il flusso in modo che restituisca una combinazione di risposta LLM in streaming e un elenco di URL non in streaming.

Nel flusso di esempio "Chat with Wikipedia", l'output è connesso al nodo LLM augmented_chat. Per aggiungere l'elenco di URL all'output, è necessario aggiungere un campo di output con il nome url e il valore ${get_wiki_url.output}.

Screenshot della chat ibrida con il flusso di Wikipedia.

L'output del flusso sarà un campo non di streaming come base e un campo di streaming come delta. Ecco un esempio di richiesta e risposta.

Utilizzo avanzato - 0. Il client invia un messaggio al server

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When was ChatGPT launched?",
    "chat_history": []
}

Utilizzo avanzato -1. Il server invia la risposta in modalità di streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=ChatGPT", "https://en.wikipedia.org/w/index.php?search=GPT-4"]}

data: {"answer": ""}

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " \n\n"}

...

data: {"answer": "PT"}

data: {"answer": ""}

Utilizzo avanzato - 2. Il client invia un altro messaggio di chat, insieme alla cronologia completa della chat, al server

POST https://<your-endpoint>.inference.ml.azure.com/score
Content-Type: application/json
Authorization: Bearer <key or token of your endpoint>
Accept: text/event-stream
{
    "question": "When did OpenAI announce GPT-4? How long is it between these two milestones?",
    "chat_history": [
        {
            "inputs": {
                "question": "When was ChatGPT launched?"
            },
            "outputs": {
                "url": [
                    "https://en.wikipedia.org/w/index.php?search=ChatGPT",
                    "https://en.wikipedia.org/w/index.php?search=GPT-4"
                ],
                "answer": "ChatGPT was launched on November 30, 2022. \n\nSOURCES: https://en.wikipedia.org/w/index.php?search=ChatGPT"
            }
        }
    ]
}

Utilizzo avanzato -3. Il server invia la risposta in modalità di streaming

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Transfer-Encoding: chunked

data: {"url": ["https://en.wikipedia.org/w/index.php?search=Generative pre-trained transformer ", "https://en.wikipedia.org/w/index.php?search=Microsoft "]}

data: {"answer": ""}

data: {"answer": "Open"}

data: {"answer": "AI"}

data: {"answer": " released"}

data: {"answer": " G"}

data: {"answer": "PT"}

data: {"answer": "-"}

data: {"answer": "4"}

data: {"answer": " in"}

data: {"answer": " March"}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "3"}

data: {"answer": "."}

data: {"answer": " Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": " was"}

data: {"answer": " launched"}

data: {"answer": " on"}

data: {"answer": " November"}

data: {"answer": " "}

data: {"answer": "30"}

data: {"answer": ","}

data: {"answer": " "}

data: {"answer": "202"}

data: {"answer": "2"}

data: {"answer": "."}

data: {"answer": " The"}

data: {"answer": " time"}

data: {"answer": " between"}

data: {"answer": " these"}

data: {"answer": " two"}

data: {"answer": " milestones"}

data: {"answer": " is"}

data: {"answer": " approximately"}

data: {"answer": " "}

data: {"answer": "3"}

data: {"answer": " months"}

data: {"answer": ".\n\n"}

...

data: {"answer": "Chat"}

data: {"answer": "G"}

data: {"answer": "PT"}

data: {"answer": ""}

Passaggi successivi