Use UDP channel as input for ConversationTranscriber
I have an audio stream accessible through UDP. I want to transcribe this audio using the ConversationTranscriber service from Azure Speech in real time (or as close to real time as possible).
I have implemented a custom PullAudioInputStreamCallback to read from one of the channels (it's a 2-channel stream) and return the data to be processed. I provide the code below.
When testing the code using the mic as the input source, the system works well. However, with the PullAudioStream as input, the system starts running, but after a few seconds the script stops without throwing any error or raising any exception. Is there a more appropriate way to connect the UDP source to the ConversationTranscriber service?
import socket
import time
import numpy as np
import azure.cognitiveservices.speech as speechsdk
key_ = <MY_KEY>
region_ = <MY_REGION>
class MyAudioStream(speechsdk.audio.PullAudioInputStreamCallback):
    """Pull-stream callback feeding one channel of a 2-channel, 16-bit PCM
    UDP stream to the Speech SDK.

    BUG FIX: the SDK contract is ``read(self, buffer: memoryview) -> int``
    -- the callback must copy audio INTO ``buffer`` and return the number
    of bytes written (0 signals end of stream).  The original code treated
    the argument as an int size and *returned* a ``bytes`` object, so the
    recognizer received no audio and stopped silently after a few seconds,
    which matches the reported symptom.
    """

    # Max bytes fetched per recvfrom(); a UDP datagram larger than the
    # receive size is silently truncated, so always ask for the maximum.
    _RECV_SIZE = 65536

    def __init__(self, socket, channel):
        super().__init__()
        print("Init")
        self.socket = socket        # bound UDP socket supplying stereo PCM
        self.channel = channel      # 0 = left channel, 1 = right channel
        self.buffer = bytearray()   # leftover interleaved stereo bytes

    def read(self, buffer):
        """Fill ``buffer`` with mono 16-bit PCM; return bytes written."""
        requested = len(buffer)
        # We need twice as many interleaved stereo bytes as the mono
        # bytes requested (2 channels x 16-bit samples).
        needed = requested * 2
        while len(self.buffer) < needed:
            data, _ = self.socket.recvfrom(self._RECV_SIZE)
            if not data:
                # Defensive: treat an empty datagram as end of stream.
                break
        
            self.buffer.extend(data)
        chunk = bytes(self.buffer[:needed])
        del self.buffer[:needed]
        # De-interleave: keep every other 16-bit sample (one channel).
        samples = np.frombuffer(chunk, dtype=np.int16)
        mono = samples[self.channel::2].tobytes()
        buffer[:len(mono)] = mono
        return len(mono)

    def close(self):
        """Release the socket when the SDK closes the stream."""
        print("Closed")
        self.socket.close()
def conversation_transcriber_recognition_canceled_cb(evt: speechsdk.SessionEventArgs):
    """Log that the transcription session was canceled."""
    print('Canceled event')
def conversation_transcriber_session_stopped_cb(evt: speechsdk.SessionEventArgs):
    """Log that the transcription session stopped."""
    print('SessionStopped event')
def conversation_transcriber_transcribed_cb(evt: speechsdk.SpeechRecognitionEventArgs, file_handle):
    """Append each transcription result (or a NoMatch notice) to file_handle."""
    result = evt.result
    if result.reason == speechsdk.ResultReason.RecognizedSpeech:
        # Build the line once; it is both persisted and echoed to stdout.
        line = 'Speaker ID={}: {}\n'.format(result.speaker_id, result.text)
        file_handle.write(line)
        print(line)
    elif result.reason == speechsdk.ResultReason.NoMatch:
        file_handle.write('NOMATCH: Speech could not be TRANSCRIBED: {}\n'.format(result.no_match_details))
def conversation_transcriber_session_started_cb(evt: speechsdk.SessionEventArgs):
    """Log that the transcription session started."""
    print('SessionStarted event')
def create_transcriber(sock, sample_rate, channel):
    """Build a ConversationTranscriber that pulls one channel of 16-bit
    mono PCM from the given UDP socket via MyAudioStream."""
    cfg = speechsdk.SpeechConfig(subscription=key_, region=region_)
    cfg.speech_recognition_language = "en-US"
    # Declare the post-extraction format: mono, 16-bit, at sample_rate.
    fmt = speechsdk.audio.AudioStreamFormat(
        samples_per_second=sample_rate, bits_per_sample=16, channels=1)
    # Wire the custom pull callback into the SDK's audio pipeline.
    pull_stream = speechsdk.audio.PullAudioInputStream(
        MyAudioStream(sock, channel), fmt)
    return speechsdk.transcription.ConversationTranscriber(
        speech_config=cfg,
        audio_config=speechsdk.audio.AudioConfig(stream=pull_stream))
def main():
# Setup the UDP socket common for both channels
UDP_IP = "localhost"
UDP_PORT = <MY_PORT>
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
transcribing_stop = False
conversation_transcriber = create_transcriber(sock, 16000, 0)
def stop_cb(evt: speechsdk.SessionEventArgs):
#"""callback that signals to stop continuous recognition upon receiving an event `evt`"""
print('CLOSING on {}'.format(evt))
nonlocal transcribing_stop
transcribing_stop = True
output_file_path = "test.txt"
with open(output_file_path, 'w') as file_handle:
conversation_transcriber.transcribed.connect(lambda evt: conversation_transcriber_transcribed_cb(evt, file_handle))
conversation_transcriber.session_started.connect(conversation_transcriber_session_started_cb)
conversation_transcriber.session_stopped.connect(conversation_transcriber_session_stopped_cb)
conversation_transcriber.canceled.connect(conversation_transcriber_recognition_canceled_cb)
conversation_transcriber.session_stopped.connect(stop_cb)
conversation_transcriber.canceled.connect(stop_cb)
conversation_transcriber.start_transcribing_async()
try:
while not transcribing_stop:
time.sleep(.5)
except Exception as e:
print(e)
conversation_transcriber.stop_transcribing_async()
conversation_transcriber.close()
sock.close()
# Run the transcription pipeline only when executed as a script.
if __name__ == "__main__":
    main()