Use UDP channel as input for ConversationTranscriber

NB 0 Reputation points
2024-05-14T12:24:56.74+00:00

I have an audio stream accessible through UDP. I want to transcribe this audio using the ConversationTranscriber service from Azure Speech in real time (or as close to real time as possible).

I have implemented a custom PullAudioInputStreamCallback to read from one of the channels (it's a 2-channel stream) and return the data to be processed. I provide the code below.

When testing the code with the microphone as the input source, the system works well. However, with the PullAudioStream as input, the system starts running but stops after a few seconds without throwing any error or raising any exception. Is there a more appropriate way to connect the UDP source to the ConversationTranscriber service?

import socket
import time
import numpy as np
import azure.cognitiveservices.speech as speechsdk

key_ = <MY_KEY>
region_ = <MY_REGION>

class MyAudioStream(speechsdk.audio.PullAudioInputStreamCallback):
	"""Pull-stream callback that feeds one channel of a 16-bit stereo UDP feed
	to the Speech SDK as mono PCM.

	BUG FIX: the SDK calls ``read`` with a writable ``memoryview`` that the
	callback must FILL, returning the number of bytes written (0 = end of
	stream). The previous version treated the argument as a size and returned
	a ``bytes`` object, so the SDK received no usable audio and the session
	stopped silently after a few seconds.
	"""

	def __init__(self, socket, channel):
		super().__init__()
		print("Init")
		self.socket = socket
		self.channel = channel  # Channel 0 for left, 1 for right
		self.buffer = bytearray()  # holds raw interleaved stereo bytes not yet consumed

	def read(self, buffer: memoryview) -> int:
		"""Fill *buffer* with mono samples from the selected channel.

		Returns the number of bytes written into *buffer*; returning 0 tells
		the SDK the stream has ended (used here when the socket is closed).
		"""
		requested = buffer.nbytes
		# Stereo int16 frames: we need twice `requested` bytes of interleaved
		# data to yield `requested` bytes of one channel.
		while len(self.buffer) < requested * 2:
			try:
				data, _ = self.socket.recvfrom(65536)
			except OSError:
				return 0  # socket closed/errored -> signal end of stream
			if not data:
				return 0
			self.buffer.extend(data)

		chunk = self.buffer[:requested * 2]
		del self.buffer[:requested * 2]

		# Interpret as 16-bit samples and keep every other one (one channel).
		samples = np.frombuffer(bytes(chunk), dtype=np.int16)
		mono = samples[self.channel::2].tobytes()
		buffer[:len(mono)] = mono
		return len(mono)

	def close(self):
		"""Release the UDP socket when the SDK closes the stream."""
		print("Closed")
		self.socket.close()
        
def conversation_transcriber_recognition_canceled_cb(evt: speechsdk.SessionEventArgs):
    """Log that the transcription session was canceled."""
    print('Canceled event')

def conversation_transcriber_session_stopped_cb(evt: speechsdk.SessionEventArgs):
    """Log that the transcription session has stopped."""
    print('SessionStopped event')

def conversation_transcriber_transcribed_cb(evt: speechsdk.SpeechRecognitionEventArgs, file_handle):
	"""Append each recognized utterance (with its speaker id) to *file_handle*.

	Recognized speech is also echoed to stdout; no-match results are logged
	to the file only.
	"""
	reason = evt.result.reason
	if reason == speechsdk.ResultReason.RecognizedSpeech:
		line = 'Speaker ID={}: {}\n'.format(evt.result.speaker_id, evt.result.text)
		file_handle.write(line)
		print(line)
	elif reason == speechsdk.ResultReason.NoMatch:
		file_handle.write('NOMATCH: Speech could not be TRANSCRIBED: {}\n'.format(evt.result.no_match_details))


def conversation_transcriber_session_started_cb(evt: speechsdk.SessionEventArgs):
    """Log that the transcription session has started."""
    print('SessionStarted event')


def create_transcriber(sock, sample_rate, channel):
	"""Build a ConversationTranscriber that pulls mono 16-bit audio from *sock*.

	*sock* is a bound UDP socket carrying interleaved stereo PCM; *channel*
	selects which channel (0 or 1) is forwarded to the service. Recognition
	language is fixed to en-US.
	"""
	config = speechsdk.SpeechConfig(subscription=key_, region=region_)
	config.speech_recognition_language = "en-US"

	# The service is told to expect monaural 16-bit PCM at the given rate;
	# the callback is responsible for down-mixing the stereo UDP feed.
	fmt = speechsdk.audio.AudioStreamFormat(
		samples_per_second=sample_rate, bits_per_sample=16, channels=1)
	pull_stream = speechsdk.audio.PullAudioInputStream(MyAudioStream(sock, channel), fmt)

	return speechsdk.transcription.ConversationTranscriber(
		speech_config=config,
		audio_config=speechsdk.audio.AudioConfig(stream=pull_stream))

def main():
	# Setup the UDP socket common for both channels
	UDP_IP = "localhost"
	UDP_PORT = <MY_PORT>
	sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	sock.bind((UDP_IP, UDP_PORT))
      
	transcribing_stop = False
	conversation_transcriber = create_transcriber(sock, 16000, 0)

	def stop_cb(evt: speechsdk.SessionEventArgs):
		#"""callback that signals to stop continuous recognition upon receiving an event `evt`"""
		print('CLOSING on {}'.format(evt))
		nonlocal transcribing_stop
		transcribing_stop = True

	output_file_path = "test.txt"
	with open(output_file_path, 'w') as file_handle:
		conversation_transcriber.transcribed.connect(lambda evt: conversation_transcriber_transcribed_cb(evt, file_handle))
		conversation_transcriber.session_started.connect(conversation_transcriber_session_started_cb)
		conversation_transcriber.session_stopped.connect(conversation_transcriber_session_stopped_cb)
		conversation_transcriber.canceled.connect(conversation_transcriber_recognition_canceled_cb)
		conversation_transcriber.session_stopped.connect(stop_cb)
		conversation_transcriber.canceled.connect(stop_cb)

		conversation_transcriber.start_transcribing_async()

		try:
			while not transcribing_stop:
				time.sleep(.5)
		except Exception as e:
			print(e)
			conversation_transcriber.stop_transcribing_async()
			conversation_transcriber.close()
			sock.close()

if __name__=="__main__":
      main()
Azure AI Speech
Azure AI Speech
An Azure service that integrates speech processing into apps and services.
1,448 questions
{count} votes