Hi Victoria Ovedie Chruickshank Langø
Thank you for sharing the sample code.
I was able to reproduce the very-fast-stream issue with the `stream` command, but it works better with the
"create_and_stream" command (a deprecated command, though).
Key changes
with self.client.beta.threads.runs.create_and_stream( thread_id=self.thread.id, assistant_id=self.assistant.id, tools=[{"type": "file_search"}], ) as event_handler:
Full code
import os
import json
import requests
import time
import logging
from openai import AzureOpenAI
# Initialize Azure OpenAI client.
# NOTE: endpoint and api_key below are placeholders — replace before running.
client = AzureOpenAI(
azure_endpoint="https://<resourcename>.openai.azure.com/", api_key="<api_key>",
api_version="2024-05-01-preview"
)
# Create an assistant instance wired to an existing file_search vector store.
assistant = client.beta.assistants.create(
model="gpt-4o-mini", # Replace with your model deployment name
name="Assistant93",
instructions="",
tools=[{"type": "file_search"}],
# Vector store id of the uploaded file(s) to search — replace with your own.
tool_resources={"file_search": {"vector_store_ids": ["vs_Ep07C9sRiSomVYRvJDWWmCjT"]}},
temperature=1,
top_p=1
)
# Create a new thread to hold the conversation.
thread = client.beta.threads.create()
# Add a user question to the thread; the run below will answer it.
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="hi, please summarize the pdf " # Replace this with your prompt
)
class AzureOpenAIStreaming:
    """Runs an Azure OpenAI Assistants run on a thread and streams the reply text."""

    def __init__(self, client, assistant, thread, promo=None):
        """
        Args:
            client: AzureOpenAI client instance.
            assistant: Assistant object; only ``.id`` is read.
            thread: Thread object; only ``.id`` is read.
            promo: Optional metrics helper exposing ``setGaugeValue(name, value)``;
                presumably a Prometheus-style gauge wrapper — TODO confirm.
        """
        self.client = client
        self.assistant = assistant
        self.thread = thread
        self.promo = promo
        self.logger = logging.getLogger(__name__)

    def stream_response(self, stream_handler=None):
        """Start a run on the thread and stream the assistant's answer.

        Args:
            stream_handler: Optional callable invoked with each text chunk
                as it arrives.

        Returns:
            str: The accumulated response text. If an error occurs mid-stream,
            whatever text was received before the failure (possibly "").
        """
        response_text = ""
        local_start = time.time()
        try:
            self.logger.debug("Initiating Azure run for thread %s.", self.thread.id)
            # NOTE(review): create_and_stream is deprecated in newer SDKs in
            # favor of runs.stream(), but per the report above it streams at a
            # steadier pace, so it is kept deliberately.
            with self.client.beta.threads.runs.create_and_stream(
                thread_id=self.thread.id,
                assistant_id=self.assistant.id,
                tools=[{"type": "file_search"}],
            ) as event_handler:
                for event in event_handler:
                    # Events are dispatched by class name rather than isinstance
                    # to avoid importing every SDK event type.
                    event_name = event.__class__.__name__
                    if event_name == "ThreadMessageInProgress":
                        self.logger.debug("Azure run started streaming.")
                    elif event_name == "ThreadRunQueued":
                        self.logger.debug("Azure run is queued.")
                    elif event_name == "ThreadRunInProgress":
                        self.logger.debug("Azure run is in progress.")
                    elif event_name == "ThreadRunCompleted":
                        duration_sec = time.time() - local_start
                        self.logger.info("Azure run finished in %.2f seconds.", duration_sec)
                        if self.promo:
                            self.promo.setGaugeValue("openai_run_duration", duration_sec)
                        return response_text
                    elif event_name == "ThreadMessageDelta":
                        delta = getattr(getattr(event, "data", None), "delta", None)
                        content = getattr(delta, "content", None) if delta else None
                        if content:
                            for part in content:
                                # Guard: delta parts are not guaranteed to be
                                # text (e.g. image parts); the original
                                # part.text.value would raise AttributeError
                                # and abort the whole stream.
                                text_obj = getattr(part, "text", None)
                                text_chunk = getattr(text_obj, "value", None)
                                if text_chunk is None:
                                    continue
                                print("text_chunk:", text_chunk)
                                response_text += text_chunk
                                if callable(stream_handler):
                                    stream_handler(text_chunk)
                    elif event_name == "ThreadRunStepDelta":
                        delta = getattr(getattr(event, "data", None), "delta", None)
                        step_details = getattr(delta, "step_details", None) if delta else None
                        # tool_calls may be absent on some step deltas; guard it.
                        tool_calls = getattr(step_details, "tool_calls", None)
                        if tool_calls:
                            for call in tool_calls:
                                if call.type == "code_interpreter" and getattr(call, "code_interpreter", None):
                                    self.logger.debug("Code interpreter tool was called.")
                                elif call.type == "file_search":
                                    self.logger.debug("File search tool was called.")
        except Exception as e:
            # Deliberate best-effort: streaming failures must not crash the
            # caller; log and fall through to return the partial text.
            self.logger.error("An error occurred during streaming: %s", e)
        return response_text
# Initialize the streaming instance with the client/assistant/thread created above.
azure_streaming = AzureOpenAIStreaming(client, assistant, thread)
# Execute the streaming run; chunks are printed as they arrive and the
# full text is returned once the run completes.
response = azure_streaming.stream_response()
# Print the accumulated response if needed.
print("Final Response:", response)
Hope it helps.
Thank you