Reuse SpeechRecognizer and stream for multiple audio streams?

LeetGPT 60 Reputation points
2024-04-04T15:16:16.24+00:00

Hi team, is there a best practice for reusing a SpeechRecognizer to stream-recognize user audio? In our application, we know when the user starts and stops talking, so we can signal the speech recognizer accordingly.

The reason I want to reuse the SpeechRecognizer is that, in load testing against production, creating a separate SpeechRecognizer per stream consumes a lot of CPU and memory for the WebSocket connections.

I am trying out the code below, but it doesn't work: the SpeechRecognizer seems to wait for the stream to close before it stops. However, I don't want to close the stream, since that would require recreating another connection later on.

import os
import random

import azure.cognitiveservices.speech as speechsdk
from gevent.event import Event

class StreamingSession:
    def __init__(self):
        self.transcription = ""
        self.closed = False
        self.recognition_done = Event()
        self.stream = speechsdk.audio.PushAudioInputStream()

        self.__register()
        self.speech_recognizer.start_continuous_recognition_async()

    def reset(self):
        self.transcription = ""
        self.closed = False
        self.recognition_done.clear()
        self.speech_recognizer.start_continuous_recognition_async()

    def add_chunk(self, chunk):
        if not self.closed:
            self.stream.write(chunk)

    def get_transcription(self):
        if not self.closed:
            self.recognition_done.wait()
            self.speech_recognizer.stop_continuous_recognition_async()
        return self.transcription

    def __register(self):
        # Initialize the speech configuration using environment variables
        # (assumes AZURE_STT_KEYS holds one or more comma-separated subscription keys)
        self.speech_keys = os.environ.get('AZURE_STT_KEYS', '').split(',')
        selected_key = random.choice(self.speech_keys)
        self.speech_config = speechsdk.SpeechConfig(subscription=selected_key,
                                                    region=os.environ.get('AZURE_STT_REGION'))
        self.audio_config = speechsdk.audio.AudioConfig(stream=self.stream)
        self.speech_recognizer = speechsdk.SpeechRecognizer(speech_config=self.speech_config, audio_config=self.audio_config)
        self.__connect_callbacks()

    def __connect_callbacks(self):
        self.speech_recognizer.recognized.connect(self.__recognized_callback)
        self.speech_recognizer.session_stopped.connect(self.__session_stopped_callback)

    def __recognized_callback(self, evt):
        self.transcription += evt.result.text

    def __session_stopped_callback(self, evt):
        self.recognition_done.set()
        self.closed = True
        print("Session is stopped")

    def __stream_recognize(self):
        self.recognition_done.wait()
        self.speech_recognizer.stop_continuous_recognition_async()


Ideally we want to reuse the same SpeechRecognizer for the same user throughout the session to recognize multiple audio streams. If that's not an option, we may have to create a new SpeechRecognizer for every audio stream, which is not performant; when we clean up by disconnecting all callbacks, those operations take a long time (> 10 s). Please advise! Thank you!
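For reference, a stripped-down sketch of that per-utterance fallback (a brand-new recognizer per audio stream, torn down once the stream ends) looks roughly like the code below. The environment variable names and the helper function are placeholders; the disconnect calls at the end are the cleanup that takes > 10 s for us.

import os
import threading

import azure.cognitiveservices.speech as speechsdk

def recognize_one_utterance(chunks):
    # Fresh recognizer per utterance, torn down afterwards (the fallback we want to avoid).
    # AZURE_STT_KEY / AZURE_STT_REGION are placeholder environment variable names.
    speech_config = speechsdk.SpeechConfig(subscription=os.environ["AZURE_STT_KEY"],
                                           region=os.environ["AZURE_STT_REGION"])
    stream = speechsdk.audio.PushAudioInputStream()
    audio_config = speechsdk.audio.AudioConfig(stream=stream)
    recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config,
                                            audio_config=audio_config)

    pieces = []
    done = threading.Event()

    def on_recognized(evt):
        if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
            pieces.append(evt.result.text)

    recognizer.recognized.connect(on_recognized)
    recognizer.session_stopped.connect(lambda evt: done.set())

    recognizer.start_continuous_recognition_async().get()
    for chunk in chunks:
        stream.write(chunk)
    stream.close()   # closing the stream lets the recognizer drain and stop the session
    done.wait()
    recognizer.stop_continuous_recognition_async().get()

    # This tear-down is the part that takes a long time under load.
    recognizer.recognized.disconnect_all()
    recognizer.session_stopped.disconnect_all()
    return "".join(pieces)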


Accepted answer
  1. dupammi 6,815 Reputation points Microsoft Vendor
    2024-04-06T03:27:48.1833333+00:00

    Hi @LeetGPT

    Thank you for the follow-up question regarding a Python example.

    Unfortunately, I couldn't find direct Python versions of the C# and Java samples for reusing a recognizer. However, I attempted to achieve similar functionality by pooling SpeechSynthesizer instances in Python.

    Below is my attempted conversion, based on the C# and Java samples referenced above; you can enhance it to meet your requirements:

    import asyncio
    import time
    from azure.cognitiveservices.speech import SpeechSynthesizer, SpeechConfig, AudioDataStream, ResultReason
    
    class SynthesizerPool:
        def __init__(self, synthesizer_generator, initial_capacity=2, maximum_retained_capacity=100):
            self._synthesizer_generator = synthesizer_generator
            self._synthesizer_stack = []
            self._maximum_retained_capacity = maximum_retained_capacity
    
            print(f"Create initial {initial_capacity} synthesizer and warm up")
            for _ in range(initial_capacity):
                item = self._synthesizer_generator()
                # Remove the line below to avoid warming up with "1"
                # self._warm_up_synthesizer(item)
                self._put(item)
    
            print("Pool created!")
    
        # Remove this method since we are not warming up the synthesizer anymore
        # def _warm_up_synthesizer(self, synthesizer):
        #     synthesizer.speak_text_async("1").get()
    
        def _put(self, item):
            if len(self._synthesizer_stack) < self._maximum_retained_capacity:
                print("put synthesizer to pool.")
                self._synthesizer_stack.append(item)
            else:
                # Pool is full; don't retain the extra synthesizer. The Python Speech SDK
                # has no explicit dispose(), so it is garbage-collected once unreferenced.
                print("drop a synthesizer (pool is full).")
    
        def get(self):
            if self._synthesizer_stack:
                return self._synthesizer_stack.pop()
            else:
                print("create a brand new synthesizer...")
                return self._synthesizer_generator()
    
    class SynthesisServer:
        def __init__(self, subscription, region, voice_name, output_format, concurrency):
            self._pool = SynthesizerPool(lambda: SpeechSynthesizer(SpeechConfig(subscription=subscription, region=region)), concurrency)
            self._voice_name = voice_name
            self._output_format = output_format
            self._latency_list = []
            self._processing_time_list = []
    
        def _synthesizing_event(self, event_args):
            # The synthesizing event fires for every audio chunk received; record the
            # first-byte latency only once per request.
            if not self._first_byte_recorded:
                self._first_byte_recorded = True
                print(f"First byte latency: {time.time() - self._start}")
                self._latency_list.append((time.time() - self._start) * 1000)
    
        def _synthesize(self, synthesizer, text):
            self._start = time.time()
            self._first_byte_recorded = False
            synthesizer.synthesizing.connect(self._synthesizing_event)

            # start_speaking_text_async returns as soon as synthesis has started, so the
            # audio can be read from the result stream while it is still being produced.
            result = synthesizer.start_speaking_text_async(text).get()
            if result.reason == ResultReason.SynthesizingAudioStarted:
                total_size = 0
                audio_data_stream = AudioDataStream(result)
                while True:
                    buffer = bytearray(4096)
                    filled_size = audio_data_stream.read_data(buffer)
                    if filled_size > 0:
                        print(f"{filled_size} bytes received. Handle the data buffer here")
                        total_size += filled_size
                    else:
                        break
    
                if total_size > 0:
                    self._processing_time_list.append((time.time() - self._start) * 1000)
    
                synthesizer.synthesizing.disconnect_all()  # EventSignal has no per-callback disconnect in Python
                self._pool._put(synthesizer)  # return the synthesizer to the pool for reuse
    
        async def synthesize(self, text):
            synthesizer = self._pool.get()
            self._synthesize(synthesizer, text)
    
        def dump_stats(self):
            if self._latency_list:
                self._latency_list.sort()
                print("Average latency {:.2f} ms".format(sum(self._latency_list) / len(self._latency_list)))
                print("Max latency {:.2f} ms".format(max(self._latency_list)))
                print("Min latency {:.2f} ms".format(min(self._latency_list)))
                print("90% latency {:.2f} ms".format(self._latency_list[min(int(len(self._latency_list) * 0.9), len(self._latency_list) - 1)]))
                print("95% latency {:.2f} ms".format(self._latency_list[min(int(len(self._latency_list) * 0.95), len(self._latency_list) - 1)]))
    
                self._processing_time_list.sort()
                print("\nAverage processing time {:.2f} ms".format(sum(self._processing_time_list) / len(self._processing_time_list)))
                print("Max processing time {:.2f} ms".format(max(self._processing_time_list)))
                print("Min processing time {:.2f} ms".format(min(self._processing_time_list)))
                print("90% processing time {:.2f} ms".format(self._processing_time_list[min(int(len(self._processing_time_list) * 0.9), len(self._processing_time_list) - 1)]))
                print("95% processing time {:.2f} ms".format(self._processing_time_list[min(int(len(self._processing_time_list) * 0.95), len(self._processing_time_list) - 1)]))
            else:
                print("Something wrong! No latency data!")
    
    async def main():
        subscription_key = "YOUR_KEY"
        region = "YOUR_REGION"
        concurrency = 1  # Limiting concurrent tasks to match the capacity of the synthesizer pool
    
        server = SynthesisServer(subscription_key, region,
                                 "en-US-AvaNeural", None, concurrency)
    
        text = "today is a nice day."
        tasks = [server.synthesize(text) for _ in range(3)]
        await asyncio.gather(*tasks)
    
        server.dump_stats()
        print("Press the Enter key to exit.")
        input()
    
    if __name__ == "__main__":
        # nest_asyncio is only needed when an event loop is already running (e.g. in a notebook).
        import nest_asyncio
        nest_asyncio.apply()
        asyncio.run(main())
    
    

    The proposed solution involves using an object pool to optimize resource usage.

    You can refactor the Python code to meet your specific use case.
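
    For your scenario, a rough, untested sketch of how the same pooling idea could map onto SpeechRecognizer with a PushAudioInputStream is shown below. The class names and environment-variable handling are only illustrative, and whether stop_continuous_recognition completes for you without closing the push stream is exactly the behaviour you will need to verify:

    import os

    import azure.cognitiveservices.speech as speechsdk

    class RecognizerSlot:
        # One pooled recognizer together with its (still open) push stream.
        def __init__(self, speech_config):
            self.stream = speechsdk.audio.PushAudioInputStream()
            audio_config = speechsdk.audio.AudioConfig(stream=self.stream)
            self.recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config,
                                                         audio_config=audio_config)

    class RecognizerPool:
        # Hands out pre-built recognizer slots and takes them back after each audio stream.
        def __init__(self, speech_config, initial_capacity=2, max_retained=100):
            self._speech_config = speech_config
            self._max_retained = max_retained
            self._stack = [RecognizerSlot(speech_config) for _ in range(initial_capacity)]

        def get(self):
            # Reuse an idle slot when one is available; otherwise build a new one.
            return self._stack.pop() if self._stack else RecognizerSlot(self._speech_config)

        def put(self, slot):
            # Retain up to max_retained slots; extra slots are simply dropped and garbage-collected.
            if len(self._stack) < self._max_retained:
                self._stack.append(slot)

    # Example usage (environment variable names are placeholders):
    # config = speechsdk.SpeechConfig(subscription=os.environ["AZURE_STT_KEY"],
    #                                 region=os.environ["AZURE_STT_REGION"])
    # pool = RecognizerPool(config)
    # slot = pool.get()
    # slot.recognizer.start_continuous_recognition_async().get()
    # ...feed slot.stream.write(chunk) while the user is talking...
    # slot.recognizer.stop_continuous_recognition_async().get()
    # pool.put(slot)  # return the slot without closing slot.stream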

    I hope this helps. Thank you.

    1 person found this answer helpful.