From e0667bdc9d9a7e3d57a8df176c35810e28bcb69f Mon Sep 17 00:00:00 2001 From: DrJJ Date: Sun, 10 Sep 2023 21:08:31 -0500 Subject: [PATCH] mores examples and test code for real-time websocket and TWIML transcription --- .../real-time/rt-1chn-simulated-twiml-pcmu.py | 300 ++++++++++++++++++ .../real-time/rt-2chn-simulated-twiml-f32.py | 32 +- .../real-time/rt-2chn-simulated-twiml-pcmu.py | 33 +- .../real-time/rt-websocket-1chn-ffmpeg.py | 18 +- .../real-time/rt-websocket-2chn-ffmpeg.py | 54 ++-- 5 files changed, 399 insertions(+), 38 deletions(-) create mode 100644 new-examples/real-time/rt-1chn-simulated-twiml-pcmu.py diff --git a/new-examples/real-time/rt-1chn-simulated-twiml-pcmu.py b/new-examples/real-time/rt-1chn-simulated-twiml-pcmu.py new file mode 100644 index 00000000..927db494 --- /dev/null +++ b/new-examples/real-time/rt-1chn-simulated-twiml-pcmu.py @@ -0,0 +1,300 @@ +#pip install websockets + +import requests, time, os, json +import threading +import asyncio +import websockets +import datetime +import base64 +import configparser + +cfg = configparser.ConfigParser() +cfg.read("config.ini") +configSection = cfg.get("DEFAULT", "CONFIG") +protocol = cfg.get(configSection, "PROTOCOL") +hostPort = cfg.get(configSection, "HOSTPORT") +JWT = cfg.get(configSection, "JWT") +urlPrefix = cfg.get(configSection, "URLPREFIX") +inputFolder = cfg.get("DEFAULT", "INPUTFOLDER") +inputFile = cfg.get("DEFAULT", "INPUTFILE") + +inbound_audio = f"{inputFolder}/{inputFile}" + +headers = {"Authorization":JWT} +# new transcription session request +# it specifies audio input via an TWIML stream +# and output is via a plain websocket +body = { + "sessions": [ + { + "asyncMode": "REAL-TIME", + "audioChannelSelector" : "mix", + "websocket": { + "adHoc": 'true', + "useSTOMP" : 'false', + "minimumDelay": 0 + }, + "content" : { + "incremental" : ['words'], + "full" : [] + } + } + ], + "audio": { + "source": { "stream": { "protocol": "TWIML", "noAudioTimeout" : 150000 } }, + "format": "PCMU", + "channel" : "mono", + "rate": 8000, + "capture": 'true' + }, + "settings": { + "asr": { + "noInputTimeout": 60000, + "completeTimeout": 0 + } + } +} + +print("making request", flush=True) +url = "{}://{}/{}/asr/transcribe/async".format(protocol, hostPort, urlPrefix) +init_response_raw = requests.post(url, json=body, headers=headers) +print("done request: {}".format(init_response_raw), flush=True) +print("response headers: {}".format(init_response_raw.headers), flush=True) + + +init_response = init_response_raw.json() +if(init_response.get("sessions") is None): + print("did not obtain session") + print(init_response_raw.status_code) + print(init_response_raw.text) + exit() + +# retrieve values from response +# sessionId and capturedAudio are printed for debugging purposes +session_id_left = init_response["sessions"][0]["sessionId"] +ws_url_left = init_response["sessions"][0]["websocket"]["url"] +audio_ws_url = init_response["audio"]["stream"]["websocketUrl"] +capturedAudio = init_response["audio"].get("capturedAudio") + +print(" SessionId L: {}".format(session_id_left)) +print(" Audio webscocket: {}".format(audio_ws_url)) + +if( not(capturedAudio is None)): + print("captured audio id: {}".format(capturedAudio)) +print("Result Websocket Url L: {}".format(ws_url_left), flush=True) + +msgCnt = 0 + +# function to process JSON with incremental transcription results sent as messages over websocket +def process_ws_msg(wsMsg, stack, prefix): + global msgCnt + msgCnt += 1 + # uncomment this to see raw messages + # print(prefix+" "+wsMsg, flush=True) + + try: + data = json.loads(wsMsg) + utter = data.get('utt') + if( utter is None ): + toDel = data.get('del') + if( toDel is None): + # unknown edit + print("EDIT->"+wsMsg, flush=True) + else: + # delete and edits + for i in range(toDel): + stack.pop() + edits = data.get('edit') + if(not (edits is None)): + for edit in edits: + utter = edit.get('utt') + stack.append(utter) + else: + # simple utterance + stack.append(utter) + print("\t"+prefix+" \t"+' '.join(stack), flush=True) + except Exception as e: + print("ERROR: "+str(e), flush=True) + +# function to read audio from file and stream to Voicegain via RTP - we are using ffmpeg to do all the work +# note that we are using the -re parameter to stream at about real-time speed +async def stream_audio(): + print("stream_audio start", flush=True) + print("Inbound audio file: {}".format(inbound_audio), flush=True) + with open(inbound_audio, mode='rb') as file1: + async with websockets.connect(audio_ws_url, + # we need to lower the buffer size - otherwise the sender will buffer for too long + write_limit=480, + # compression needs to be disabled otherwise will buffer for too long + compression=None) as websocket: + try: + print(str(datetime.datetime.now())+" sender connected", flush=True) + + + # print("sleeping 3 seconds before shutting down websocket", flush=True) + # timeLeft = 3 + # while timeLeft > 0: + # print(str(timeLeft)+" ", end =" ", flush=True) + # time.sleep(1) + # # try: + # # await websocket.ping() + # # except Exception as e: + # # print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e)) + # # break + # timeLeft -= 1 + + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return + + conn_msg = { + "event": "connected", + "protocol": "Call", + "version": "1.0.0" + } + + try: + print("send connect", flush=True) + await websocket.send(json.dumps(conn_msg)) + except Exception as e: + print(str(datetime.datetime.now())+" connected message "+str(e)) + + seq_num = 1 + + start_msg = { + "event": "start", + "sequenceNumber": seq_num, + "start": { + "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0", + "accountSid": "AC123", + "callSid": "CA123", + "tracks": ["inbound"], + "mediaFormat": { + "encoding": "audio/x-mulaw", + "sampleRate": 8000, + "channels": 1 + } + }, + "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0" + } + seq_num = seq_num+1 + + try: + print("send start", flush=True) + await websocket.send(json.dumps(start_msg)) + except Exception as e: + print(str(datetime.datetime.now())+" connected message "+str(e)) + + inb_ts = 0 + inb_chk = 1 + inbound_bytes = file1.read(44) + while True: + inbound_bytes = file1.read(512) + if not inbound_bytes: + break + if(inb_chk == 1): + print("4 inbound bytes: {} {} {} {}".format(inbound_bytes[0], inbound_bytes[1], inbound_bytes[2], inbound_bytes[3])) + encoded_data = base64.b64encode(inbound_bytes).decode('ascii') + + media_msg = { + "event": "media", + "sequenceNumber": seq_num, + "media": { + "track": "inbound", + "chunk": inb_chk, + "timestamp": inb_ts, + "payload": encoded_data + }, + "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0" + } + + try: + #print("send media inbound {}".format(inb_ts), flush=True) + if(inb_chk <= 20): + print("send media inbound {}".format(json.dumps(media_msg)), flush=True) + await websocket.send(json.dumps(media_msg)) + except Exception as e: + print(str(datetime.datetime.now())+" connected message "+str(e)) + break + seq_num = seq_num+1 + inb_chk = inb_chk+1 + inb_ts = inb_ts+64 + + time.sleep(0.064) + print(".", end =" ", flush=True) + + # if(inb_ts > 15000): + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return + + stop_msg = { + "event": "stop", + "sequenceNumber": seq_num, + "stop": { + "accountSid": "AC123", + "callSid": "CA123" + }, + "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0" + } + + try: + print("send stop", flush=True) + await websocket.send(json.dumps(stop_msg)) + except Exception as e: + print(str(datetime.datetime.now())+" connected message "+str(e)) + + time.sleep(3.0) + await websocket.close() + except Exception as e: + print("Exception when sending audio via websocket: "+str(e)) # usually because the session closed due to NOMATCH or NOINPUT + + print(str(datetime.datetime.now())+" done streaming audio", flush=True) + + + # thread that connects to the websocket and receives the results +# we do it in a separate thread because in the main thread we are streaming the audio +class wsThread (threading.Thread): + def __init__(self, ws_uri, prefix): + threading.Thread.__init__(self) + self.ws_uri = ws_uri + self.prefix = prefix + self.stack = [] + def run(self): + print (self.prefix+" Starting WS receive thread "+str(datetime.datetime.now()), flush=True) + try: + asyncio.new_event_loop().run_until_complete(websocket_receive(self.ws_uri, self.stack, self.prefix)) + except Exception as e: + print(e, flush=True) + print (self.prefix+" Exiting WS receive thread "+str(datetime.datetime.now()), flush=True) + +# function that connects to the websocket and receives the results +async def websocket_receive(uri, stack, prefix): + async with websockets.connect(uri) as websocket: + try: + print(prefix+" connected to "+uri, flush=True) + while True: + ws_msg = await websocket.recv() + process_ws_msg(ws_msg, stack, prefix) + except Exception as e: + print(e, flush=True) + + +# create and start the websocket thread +threadWsLeft = wsThread(ws_url_left, "L >>\t") +threadWsLeft.start() + +# stream audio +asyncio.get_event_loop().run_until_complete( stream_audio() ) + +# wait for websocket thread to join +print("") +print("Waiting to join Left "+str(datetime.datetime.now()), flush=True) +threadWsLeft.join() +print("Joined Left "+str(datetime.datetime.now()), flush=True) + +print("sleeping 10 seconds to ", flush=True ) +time.sleep(10.0) +print("done", flush=True) \ No newline at end of file diff --git a/new-examples/real-time/rt-2chn-simulated-twiml-f32.py b/new-examples/real-time/rt-2chn-simulated-twiml-f32.py index a8aaadb9..470c3336 100644 --- a/new-examples/real-time/rt-2chn-simulated-twiml-f32.py +++ b/new-examples/real-time/rt-2chn-simulated-twiml-f32.py @@ -22,6 +22,9 @@ inbound_audio = f"{inputFolder}/{inputFile}" outbound_audio = f"{inputFolder}/{inputFile2}" +print("inbound_audio: {}".format(inbound_audio)) +print("outbound_audio: {}".format(outbound_audio)) + headers = {"Authorization":JWT} # new transcription session request # it specifies audio input via an RTP stream @@ -60,7 +63,7 @@ # this is needed for a browser # for phone we would have PCMU "format": "F32", - "channel" : "stereo", + "channels" : "stereo", # this is needed for a browser # for phone we would have 8000 "rate": 16000, @@ -153,6 +156,25 @@ async def stream_audio(): try: print(str(datetime.datetime.now())+" sender connected", flush=True) + + # print("sleeping 3 seconds before shutting down websocket", flush=True) + # timeLeft = 3 + # while timeLeft > 0: + # print(str(timeLeft)+" ", end =" ", flush=True) + # time.sleep(1) + # # try: + # # await websocket.ping() + # # except Exception as e: + # # print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e)) + # # break + # timeLeft -= 1 + + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return + + conn_msg = { "event": "connected", "protocol": "Call", @@ -266,9 +288,15 @@ async def stream_audio(): time.sleep(0.008) print(".", end =" ", flush=True) - if(outb_ts > 8000): + if(outb_ts > 60000): break + # if(inb_ts > 15000): + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return + stop_msg = { "event": "stop", diff --git a/new-examples/real-time/rt-2chn-simulated-twiml-pcmu.py b/new-examples/real-time/rt-2chn-simulated-twiml-pcmu.py index 4a90ae0e..1b9eb83d 100644 --- a/new-examples/real-time/rt-2chn-simulated-twiml-pcmu.py +++ b/new-examples/real-time/rt-2chn-simulated-twiml-pcmu.py @@ -56,7 +56,7 @@ } ], "audio": { - "source": { "stream": { "protocol": "TWIML" } }, + "source": { "stream": { "protocol": "TWIML", "noAudioTimeout" : 150000 } }, "format": "PCMU", "channel" : "stereo", "rate": 8000, @@ -151,6 +151,24 @@ async def stream_audio(): try: print(str(datetime.datetime.now())+" sender connected", flush=True) + + # print("sleeping 3 seconds before shutting down websocket", flush=True) + # timeLeft = 3 + # while timeLeft > 0: + # print(str(timeLeft)+" ", end =" ", flush=True) + # time.sleep(1) + # # try: + # # await websocket.ping() + # # except Exception as e: + # # print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e)) + # # break + # timeLeft -= 1 + + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return + conn_msg = { "event": "connected", "protocol": "Call", @@ -260,9 +278,11 @@ async def stream_audio(): time.sleep(0.064) print(".", end =" ", flush=True) - # if(outb_ts > 8000): - # break - + # if(outb_ts > 15000): + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return stop_msg = { "event": "stop", @@ -326,10 +346,15 @@ async def websocket_receive(uri, stack, prefix): asyncio.get_event_loop().run_until_complete( stream_audio() ) # wait for websocket thread to join +print("") print("Waiting to join Left "+str(datetime.datetime.now()), flush=True) threadWsLeft.join() print("Joined Left "+str(datetime.datetime.now()), flush=True) +print("") print("Waiting to join Right "+str(datetime.datetime.now()), flush=True) threadWsRight.join() print("Joined Right "+str(datetime.datetime.now()), flush=True) +print("sleeping 10 seconds to ", flush=True ) +time.sleep(10.0) +print("done", flush=True) \ No newline at end of file diff --git a/new-examples/real-time/rt-websocket-1chn-ffmpeg.py b/new-examples/real-time/rt-websocket-1chn-ffmpeg.py index e1c84445..b43fe458 100644 --- a/new-examples/real-time/rt-websocket-1chn-ffmpeg.py +++ b/new-examples/real-time/rt-websocket-1chn-ffmpeg.py @@ -69,14 +69,14 @@ "acousticModelRealTime" : acousticModelRealTime, "noInputTimeout": 59999, "incompleteTimeout": 3599999, - "sensitivity": 0.5, - "hints": [ - "Starburst:10", - "Mars_Wrigley:10", - "contacting:8", - "Mars_Consumer_Care:10", - "mints:8" - ] + "sensitivity": 0.5 + # ,"hints": [ + # "Starburst:10", + # "Mars_Wrigley:10", + # "contacting:8", + # "Mars_Consumer_Care:10", + # "mints:8" + # ] } # ,"formatters": [ # { @@ -168,7 +168,7 @@ def process_ws_msg(wsMsg, stack, prefix): else: # simple utterance stack.append(utter) - if( len(stack) > 50 ): + if( len(stack) > 500 ): while(len(stack) > 30): stack.pop(0) diff --git a/new-examples/real-time/rt-websocket-2chn-ffmpeg.py b/new-examples/real-time/rt-websocket-2chn-ffmpeg.py index dd30dffa..00197b44 100644 --- a/new-examples/real-time/rt-websocket-2chn-ffmpeg.py +++ b/new-examples/real-time/rt-websocket-2chn-ffmpeg.py @@ -261,28 +261,28 @@ async def stream_audio(file_name, audio_ws_url): ping_timeout=None ) as websocket: try: - print(str(datetime.datetime.now())+" connected", flush=True) - - # print("sleeping 35 seconds to trigger timeout", flush=True) - # timeLeft = 61 - # while timeLeft > 0: - # print(str(timeLeft)+" ", end =" ", flush=True) - # time.sleep(1) - # try: - # await websocket.ping() - # except Exception as e: - # print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e)) - # break - # timeLeft -= 1 - - # # test websocket close before sending audio - # await websocket.close() - # print(str(datetime.datetime.now())+" closed", flush=True) - # return + print(str(datetime.datetime.now())+" audio websocket connected", flush=True) + + print("sleeping 3 seconds to trigger timeout", flush=True) + timeLeft = 3 + while timeLeft > 0: + print(str(timeLeft)+" ", end =" ", flush=True) + time.sleep(1) + try: + await websocket.ping() + except Exception as e: + print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e)) + break + timeLeft -= 1 + + # test websocket close before sending audio + await websocket.close() + print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + return global startTime startTime = time.time() - n_buf = 1 * 1024 + n_buf = 128 #1 * 1024 byte_buf = f.read(n_buf) start = time.time() epoch_start_audio_stream = start @@ -306,15 +306,20 @@ async def stream_audio(file_name, audio_ws_url): time.sleep(time_to_wait) # to simulate real time streaming byte_buf = f.read(n_buf) - # if(not slept and elapsed_time_fl > 10): - # print("sleeping 35 seconds to trigger timeout", flush=True) - # left = 35 + # if(not slept and elapsed_time_fl > 0.0001): + # print("elapsed time: "+str(elapsed_time_fl)) + # print("sleeping 3 seconds to trigger timeout", flush=True) + # left = 3 # while left > 0: # print(str(left)+" ", end =" ", flush=True) # time.sleep(1) # left -= 1 # start += 35 # slept = True + # # test websocket close before sending audio + # await websocket.close() + # print(str(datetime.datetime.now())+" audio websocket closed", flush=True) + # return # global session_id_left, session_id_right # body = {"pause": {"action" : "stop"}} @@ -404,7 +409,7 @@ def process_audio(file_name): asyncio.get_event_loop().run_until_complete( stream_audio(file_name, web_res["audio_ws_url"]) ) # wait for websocket thread to join - print("waiting for websocket threads to join", flush=True) + print("waiting for receiving websocket threads to join", flush=True) threadWsLeft.join() threadWsRight.join() print(f"END processing: {file_name}") @@ -415,3 +420,6 @@ def process_audio(file_name): process_audio(inputFilePath) +print("sleeping 60 seconds", flush=True ) +time.sleep(60) +print("done sleeping", flush=True ) \ No newline at end of file