Hi @LeetGPT
Thank you for the follow-up question regarding a Python example.
Unfortunately, I couldn't find direct Python versions of the C# and Java code samples for reusing the SpeechSynthesizer. However, I attempted to achieve similar functionality by managing SpeechSynthesizer instances in Python.
Below is my attempted conversion, based on the C# and Java code above; you can enhance it to meet your requirements:
import asyncio
import concurrent.futures
import time

from azure.cognitiveservices.speech import (
    AudioDataStream,
    ResultReason,
    SpeechConfig,
    SpeechSynthesizer,
)
class SynthesizerPool:
    """A LIFO pool that reuses synthesizer objects across requests.

    Reusing synthesizers avoids paying connection/setup cost per request.
    NOTE(review): not thread-safe (neither was the original) — add a lock
    if the pool is shared across threads.
    """

    def __init__(self, synthesizer_generator, initial_capacity=2, maximum_retained_capacity=100):
        """Create the pool.

        Args:
            synthesizer_generator: zero-argument callable that builds a new
                synthesizer.
            initial_capacity: number of synthesizers created eagerly.
            maximum_retained_capacity: synthesizers returned beyond this
                count are released instead of retained.
        """
        self._synthesizer_generator = synthesizer_generator
        self._synthesizer_stack = []
        self._maximum_retained_capacity = maximum_retained_capacity
        print(f"Create initial {initial_capacity} synthesizer and warm up")
        for _ in range(initial_capacity):
            self._put(self._synthesizer_generator())
        print("Pool created!")

    def _put(self, item):
        """Return *item* to the pool, or release it when the pool is full."""
        if len(self._synthesizer_stack) < self._maximum_retained_capacity:
            print("put synthesizer to pool.")
            self._synthesizer_stack.append(item)
        else:
            print("dispose a synthesizer.")
            # Unlike the C#/Java SDKs, the Python SpeechSynthesizer may not
            # expose dispose(); call it only when present and otherwise let
            # garbage collection release the native resources.
            dispose = getattr(item, "dispose", None)
            if callable(dispose):
                dispose()

    def get(self):
        """Pop a pooled synthesizer, creating a fresh one when empty."""
        if self._synthesizer_stack:
            return self._synthesizer_stack.pop()
        print("create a brand new synthesizer...")
        return self._synthesizer_generator()
class SynthesisServer:
    """Drives pooled speech synthesis requests and records latency stats."""

    def __init__(self, subscription, region, voice_name, output_format, concurrency):
        """Build the speech config and pre-warm a synthesizer pool.

        Args:
            subscription: Azure Speech resource key.
            region: Azure Speech resource region.
            voice_name: synthesis voice (applied to the config; the original
                stored it unused).
            output_format: optional SpeechSynthesisOutputFormat; None keeps
                the service default.
            concurrency: initial pool size, matching expected parallelism.
        """
        config = SpeechConfig(subscription=subscription, region=region)
        if voice_name:
            config.speech_synthesis_voice_name = voice_name
        if output_format is not None:
            config.set_speech_synthesis_output_format(output_format)
        self._pool = SynthesizerPool(lambda: SpeechSynthesizer(config), concurrency)
        self._voice_name = voice_name
        self._output_format = output_format
        self._latency_list = []
        self._processing_time_list = []

    def _synthesize(self, synthesizer, text):
        """Run one synthesis request to completion and record timings.

        Uses a per-request local start time and a closure-based event
        handler: the original stored `self._start` on the instance, which
        races when requests overlap.
        """
        start = time.time()
        first_byte_seen = []

        def on_synthesizing(event_args):
            # The first `synthesizing` event marks the first audio chunk.
            # NOTE(review): the original compared result.reason to the
            # string "SynthesizingAudioStarted", which never equals the
            # ResultReason enum, so latency was never recorded.
            if not first_byte_seen:
                first_byte_seen.append(True)
                print(f"First byte latency: {time.time() - start}")
                self._latency_list.append((time.time() - start) * 1000)

        synthesizer.synthesizing.connect(on_synthesizing)
        try:
            result = synthesizer.start_speaking_text_async(text).get()
            # Compare against the enum, not its string name (original bug).
            if result.reason == ResultReason.SynthesizingAudioStarted:
                total_size = 0
                audio_data_stream = AudioDataStream.from_result(result)
                while True:
                    buffer = bytearray(4096)
                    filled_size = audio_data_stream.read_data(buffer)
                    if filled_size <= 0:
                        break
                    print(f"{filled_size} bytes received. Handle the data buffer here")
                    total_size += filled_size
                if total_size > 0:
                    self._processing_time_list.append((time.time() - start) * 1000)
        finally:
            # Always detach the handler and return the synthesizer, even on
            # SDK errors, so the pool is not drained by failures.
            synthesizer.synthesizing.disconnect(on_synthesizing)
            self._pool._put(synthesizer)

    async def synthesize(self, text):
        """Synthesize *text* without blocking the event loop.

        The SDK calls are blocking; run them in the default executor so
        concurrent requests can actually overlap (the original awaited
        nothing and blocked the loop for the whole request).
        """
        synthesizer = self._pool.get()
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._synthesize, synthesizer, text)

    def dump_stats(self):
        """Print latency and processing-time statistics gathered so far."""
        if self._latency_list:
            self._latency_list.sort()
            print("Average latency {:.2f} ms".format(sum(self._latency_list) / len(self._latency_list)))
            print("Max latency {:.2f} ms".format(max(self._latency_list)))
            print("Min latency {:.2f} ms".format(min(self._latency_list)))
            # Clamp the percentile index so small samples stay in range.
            print("90% latency {:.2f} ms".format(self._latency_list[min(int(len(self._latency_list) * 0.9), len(self._latency_list) - 1)]))
            print("95% latency {:.2f} ms".format(self._latency_list[min(int(len(self._latency_list) * 0.95), len(self._latency_list) - 1)]))
            self._processing_time_list.sort()
            print("\nAverage processing time {:.2f} ms".format(sum(self._processing_time_list) / len(self._processing_time_list)))
            print("Max processing time {:.2f} ms".format(max(self._processing_time_list)))
            print("Min processing time {:.2f} ms".format(min(self._processing_time_list)))
            print("90% processing time {:.2f} ms".format(self._processing_time_list[min(int(len(self._processing_time_list) * 0.9), len(self._processing_time_list) - 1)]))
            print("95% processing time {:.2f} ms".format(self._processing_time_list[min(int(len(self._processing_time_list) * 0.95), len(self._processing_time_list) - 1)]))
        else:
            print("Something wrong! No latency data!")
async def main():
    """Fan out a few synthesis requests, then report timing statistics."""
    subscription_key = "YOUR_KEY"
    region = "YOUR_REGION"
    # Pool capacity is sized to match the number of concurrent requests.
    concurrency = 1
    server = SynthesisServer(
        subscription_key, region, "en-US-AvaNeural", None, concurrency
    )
    text = "today is a nice day."
    pending = [server.synthesize(text) for _ in range(3)]
    await asyncio.gather(*pending)
    server.dump_stats()
    print("Press the Enter key to exit.")
    input()
if __name__ == "__main__":
    # nest_asyncio is a third-party package that patches an already-running
    # event loop so asyncio.run() works inside environments like Jupyter.
    # NOTE(review): unnecessary in a plain script — confirm your environment
    # actually needs it before keeping this dependency.
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.run(main())
The proposed solution involves using an object pool to optimize resource usage.
You can refactor the Python code to meet your specific use case.
I hope this helps. Thank you.