Problems with Azure Assistant API - Streaming.

2025-04-01T07:24:00.3966667+00:00

Hello, I am currently trying out the Azure Assistant API with the "AzureOpenAI" client from the OpenAI Python package. We previously used the OpenAI API, where streaming worked as it should, but I am experiencing some problems when using it in Azure.

The problem is that the Assistant does not really stream the answer. Once the run reaches the "in progress" status, the assistant starts answering, but instead of streaming, it sends everything in one chunk.

The setup I have:

  • Version: 2025-02-01-preview.
  • Async filter (for content filtering, supposedly it would give smoother streaming).
  • GPT-4o.

I tried the EventHandler from the documentation, but it did not work.

Right now I am using:

self.client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    tools=[{"type": "file_search"}],
)

And when streaming the text chunks:

if event.__class__.__name__ == "ThreadMessageDelta":
    if hasattr(event, "data") and event.data and hasattr(event.data, "delta"):
        content = getattr(event.data.delta, "content", None)
        if content:
            for part in content:
                text_chunk = part.text.value
                response_text += text_chunk
                if callable(stream_handler):
                    stream_handler(text_chunk)

But as it works now, I create the run, it queues and loads, and then everything arrives at once, being streamed extremely fast.
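One way to confirm the "everything at once" behavior is to record the gap between consecutive chunks. The following is only a minimal sketch in plain Python: `timed_chunks` and `summarize_gaps` are hypothetical helper names, not part of the OpenAI package, and the sketch assumes the text chunks have already been pulled out of the delta events as strings.

```python
import time
from typing import Callable, Dict, Iterable, Iterator, List, Tuple


def timed_chunks(
    chunks: Iterable[str],
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[Tuple[float, str]]:
    """Yield (seconds_since_previous_chunk, chunk) for every chunk.

    The first gap is the wait between starting iteration and the first chunk.
    """
    previous = clock()
    for chunk in chunks:
        now = clock()
        yield now - previous, chunk
        previous = now


def summarize_gaps(gaps: List[float]) -> Dict[str, float]:
    """Summarize arrival gaps: chunk count, initial wait, largest later gap."""
    if not gaps:
        return {"chunks": 0, "first_wait": 0.0, "max_later_gap": 0.0}
    later = gaps[1:]
    return {
        "chunks": len(gaps),
        "first_wait": gaps[0],
        "max_later_gap": max(later) if later else 0.0,
    }
```

Wrapping the chunk source in `timed_chunks` and passing the collected gaps to `summarize_gaps` should show a long `first_wait` followed by near-zero later gaps if the response is buffered and then flushed in one burst. Gaps spread across the whole run would point to genuine token-by-token streaming.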

Azure OpenAI Service

1 answer

  1. Manas Mohanty 5,620 Reputation points Microsoft External Staff Moderator
    2025-04-03T09:09:10.4266667+00:00

    Hi Victoria Ovedie Chruickshank Langø

    Thank you for sharing the sample code.

    I was able to reproduce the very fast stream issue with the "stream" command. It behaves better with the "create_and_stream" command (which is deprecated, though).

    Key changes

    with self.client.beta.threads.runs.create_and_stream(
        thread_id=self.thread.id,
        assistant_id=self.assistant.id,
        tools=[{"type": "file_search"}],
    ) as event_handler:

    Full code

    import logging
    import time

    from openai import AzureOpenAI
    
    # Initialize Azure OpenAI client
    client = AzureOpenAI(
        azure_endpoint="https://<resourcename>.openai.azure.com/",
        api_key="<api_key>",
        api_version="2024-05-01-preview",
    )
    
    # Create an assistant instance
    assistant = client.beta.assistants.create(
        model="gpt-4o-mini",  # Replace with your model deployment name
        name="Assistant93",
        instructions="",
        tools=[{"type": "file_search"}],
        tool_resources={"file_search": {"vector_store_ids": ["vs_Ep07C9sRiSomVYRvJDWWmCjT"]}},
        temperature=1,
        top_p=1
    )
    
    # Create a new thread
    thread = client.beta.threads.create()
    
    
    # Add a user question to the thread
    message = client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content="hi, please summarize the pdf",  # Replace this with your prompt
    )
    
    class AzureOpenAIStreaming:
        def __init__(self, client, assistant, thread, promo=None):
            self.client = client
            self.assistant = assistant
            self.thread = thread
            self.promo = promo
            self.logger = logging.getLogger(__name__)
    
        def stream_response(self, stream_handler=None):
            response_text = ""
            local_start = time.time()
    
            try:
                self.logger.debug(f"Initiating Azure run for thread {self.thread.id}.")
    
                with self.client.beta.threads.runs.create_and_stream(
                    thread_id=self.thread.id,
                    assistant_id=self.assistant.id,
                    tools=[{"type": "file_search"}],
                ) as event_handler:
    
                    for event in event_handler:
                        event_name = event.__class__.__name__
    
                        if event_name == "ThreadMessageInProgress":
                            self.logger.debug("Azure run started streaming.")
    
                        elif event_name == "ThreadRunQueued":
                            self.logger.debug("Azure run is queued.")
    
                        elif event_name == "ThreadRunInProgress":
                            self.logger.debug("Azure run is in progress.")
    
                        elif event_name == "ThreadRunCompleted":
                            duration_sec = time.time() - local_start
                            self.logger.info(f"Azure run finished in {duration_sec:.2f} seconds.")
    
                            if self.promo:
                                self.promo.setGaugeValue("openai_run_duration", duration_sec)
    
                            return response_text
    
                        elif event_name == "ThreadMessageDelta":
                            if hasattr(event, "data") and event.data and hasattr(event.data, "delta"):
                                content = getattr(event.data.delta, "content", None)
                                if content:
                                    for part in content:
                                        text_chunk = part.text.value
                                        print("text_chunk:", text_chunk)
                                        response_text += text_chunk
                                        if callable(stream_handler):
                                            stream_handler(text_chunk)
    
                        elif event_name == "ThreadRunStepDelta":
                            if hasattr(event, "data") and event.data and hasattr(event.data, "delta"):
                                step_details = getattr(event.data.delta, "step_details", None)
                                if step_details and step_details.tool_calls:
                                    for call in step_details.tool_calls:
                                        if call.type == "code_interpreter" and call.code_interpreter:
                                            self.logger.debug("Code interpreter tool was called.")
                                        elif call.type == "file_search":
                                            self.logger.debug("File search tool was called.")
    
            except Exception as e:
                self.logger.error(f"An error occurred during streaming: {e}")
    
            return response_text
    
    
    # Initialize the streaming instance
    azure_streaming = AzureOpenAIStreaming(client, assistant, thread)
    
    # Execute streaming response function
    response = azure_streaming.stream_response()
    
    # Print the response if needed
    print("Final Response:", response)
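The delta handling above reads `part.text.value` on every content part, which would raise if a part carries no text (for example an image part). The following is only a sketch of a more defensive extraction. `text_chunks_from_delta` is a hypothetical helper name, and the sketch assumes that text parts expose `type == "text"` together with a `text.value` string.

```python
from typing import Any, List


def text_chunks_from_delta(event: Any) -> List[str]:
    """Return the text fragments carried by one message-delta event.

    Missing attributes and non-text parts are skipped instead of raising.
    """
    data = getattr(event, "data", None)
    delta = getattr(data, "delta", None)
    content = getattr(delta, "content", None) or []

    chunks: List[str] = []
    for part in content:
        # Assumption: text parts are tagged with type == "text".
        if getattr(part, "type", None) != "text":
            continue
        value = getattr(getattr(part, "text", None), "value", None)
        if value:
            chunks.append(value)
    return chunks
```

Inside the "ThreadMessageDelta" branch, the loop body could then iterate over `text_chunks_from_delta(event)` and pass each string to `stream_handler`, so a single unexpected part does not abort the whole stream through the surrounding `except` block.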
    
    
    

    Hope it helps.

    Thank you
