Skip to content

Commit

Permalink
more examples and test code for real-time websocket and TWIML transc…
Browse files Browse the repository at this point in the history
…ription
  • Loading branch information
DrJJ committed Sep 11, 2023
1 parent ccedfd2 commit e0667bd
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 38 deletions.
300 changes: 300 additions & 0 deletions new-examples/real-time/rt-1chn-simulated-twiml-pcmu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
#pip install websockets

import requests, time, os, json
import threading
import asyncio
import websockets
import datetime
import base64
import configparser

# load endpoint settings from config.ini; the DEFAULT section names the
# section that holds the actual values for the chosen environment
cfg = configparser.ConfigParser()
cfg.read("config.ini")
configSection = cfg.get("DEFAULT", "CONFIG")
protocol = cfg.get(configSection, "PROTOCOL")    # http or https
hostPort = cfg.get(configSection, "HOSTPORT")    # API host[:port]
JWT = cfg.get(configSection, "JWT")              # bearer token for the Authorization header
urlPrefix = cfg.get(configSection, "URLPREFIX")
inputFolder = cfg.get("DEFAULT", "INPUTFOLDER")
inputFile = cfg.get("DEFAULT", "INPUTFILE")

# local audio file that will be streamed to the service
inbound_audio = f"{inputFolder}/{inputFile}"

headers = {"Authorization":JWT}
# new transcription session request
# it specifies audio input via an TWIML stream
# and output is via a plain websocket
body = {
    "sessions": [
        {
            "asyncMode": "REAL-TIME",
            "audioChannelSelector" : "mix",
            "websocket": {
                # NOTE(review): booleans are sent as the strings 'true'/'false'
                # rather than JSON true/false - confirm the API accepts string forms
                "adHoc": 'true',
                "useSTOMP" : 'false',
                "minimumDelay": 0          # deliver incremental results as soon as available
            },
            "content" : {
                "incremental" : ['words'], # word-level incremental results over the websocket
                "full" : []                # no full transcript requested
            }
        }
    ],
    "audio": {
        # audio arrives over a TWIML-protocol websocket stream; give up if no
        # audio is seen for 150 s
        "source": { "stream": { "protocol": "TWIML", "noAudioTimeout" : 150000 } },
        "format": "PCMU",                  # 8-bit mu-law, the telephony format
        "channel" : "mono",
        "rate": 8000,
        "capture": 'true'                  # ask the server to record the received audio
    },
    "settings": {
        "asr": {
            "noInputTimeout": 60000,
            "completeTimeout": 0           # do not auto-complete on silence
        }
    }
}

print("making request", flush=True)
# POST to the async-transcribe endpoint to open the session
url = "{}://{}/{}/asr/transcribe/async".format(protocol, hostPort, urlPrefix)
init_response_raw = requests.post(url, json=body, headers=headers)
print("done request: {}".format(init_response_raw), flush=True)
print("response headers: {}".format(init_response_raw.headers), flush=True)


init_response = init_response_raw.json()
# a response without "sessions" means the request failed - dump diagnostics and quit
if(init_response.get("sessions") is None):
    print("did not obtain session")
    print(init_response_raw.status_code)
    print(init_response_raw.text)
    exit()

# retrieve values from response
# sessionId and capturedAudio are printed for debugging purposes
session_id_left = init_response["sessions"][0]["sessionId"]
ws_url_left = init_response["sessions"][0]["websocket"]["url"]   # websocket that delivers results
audio_ws_url = init_response["audio"]["stream"]["websocketUrl"]  # websocket we push audio into
capturedAudio = init_response["audio"].get("capturedAudio")      # id of server-side capture, if enabled

print(" SessionId L: {}".format(session_id_left))
print(" Audio webscocket: {}".format(audio_ws_url))

if( not(capturedAudio is None)):
    print("captured audio id: {}".format(capturedAudio))
print("Result Websocket Url L: {}".format(ws_url_left), flush=True)

# total number of websocket messages processed (for debugging)
msgCnt = 0

# function to process JSON with incremental transcription results sent as messages over websocket
def process_ws_msg(wsMsg, stack, prefix):
    """Apply one incremental result message to the running word stack and print it.

    Messages come in three shapes:
      {"utt": word}                         -- append one word
      {"del": n, "edit": [{"utt": ...}]}    -- retract the last n words, then append replacements
      anything else                         -- unknown edit, logged verbatim

    Errors (malformed JSON, popping an empty stack) are caught and logged,
    never raised to the caller.
    """
    global msgCnt
    msgCnt += 1
    # uncomment this to see raw messages
    # print(prefix+" "+wsMsg, flush=True)

    try:
        data = json.loads(wsMsg)
        utter = data.get('utt')
        if utter is not None:
            # simple utterance - just append the word
            stack.append(utter)
        else:
            toDel = data.get('del')
            if toDel is None:
                # unknown edit
                print("EDIT->"+wsMsg, flush=True)
            else:
                # retract the last toDel words, then apply the replacement edits
                for _ in range(toDel):
                    stack.pop()
                for edit in (data.get('edit') or []):
                    stack.append(edit.get('utt'))
        print("\t"+prefix+" \t"+' '.join(stack), flush=True)
    except Exception as e:
        print("ERROR: "+str(e), flush=True)

# function to read audio from file and stream it to the service over a websocket,
# simulating a Twilio Media Streams (TwiML <Stream>) client:
# connected -> start -> media* -> stop
# (sleeps between media messages keep the stream at about real-time speed)
async def stream_audio():
    """Read the inbound audio file and push it to audio_ws_url as TwiML-style JSON messages."""
    print("stream_audio start", flush=True)
    print("Inbound audio file: {}".format(inbound_audio), flush=True)
    with open(inbound_audio, mode='rb') as file1:
        async with websockets.connect(audio_ws_url,
                                      # we need to lower the buffer size - otherwise the sender will buffer for too long
                                      write_limit=480,
                                      # compression needs to be disabled otherwise will buffer for too long
                                      compression=None) as websocket:
            try:
                print(str(datetime.datetime.now())+" sender connected", flush=True)

                # -- optional scaffolding for testing early websocket shutdown --
                # print("sleeping 3 seconds before shutting down websocket", flush=True)
                # timeLeft = 3
                # while timeLeft > 0:
                #     print(str(timeLeft)+" ", end =" ", flush=True)
                #     time.sleep(1)
                #     # try:
                #     #     await websocket.ping()
                #     # except Exception as e:
                #     #     print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e))
                #     #     break
                #     timeLeft -= 1

                # # test websocket close before sending audio
                # await websocket.close()
                # print(str(datetime.datetime.now())+" audio websocket closed", flush=True)
                # return

                # first message of the TwiML stream protocol
                conn_msg = {
                    "event": "connected",
                    "protocol": "Call",
                    "version": "1.0.0"
                }

                try:
                    print("send connect", flush=True)
                    await websocket.send(json.dumps(conn_msg))
                except Exception as e:
                    print(str(datetime.datetime.now())+" connected message "+str(e))

                # running sequence number shared by start/media/stop messages
                seq_num = 1

                # "start" describes the stream; the Sid values are made up for this simulation
                start_msg = {
                    "event": "start",
                    "sequenceNumber": seq_num,
                    "start": {
                        "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0",
                        "accountSid": "AC123",
                        "callSid": "CA123",
                        "tracks": ["inbound"],
                        "mediaFormat": {
                            "encoding": "audio/x-mulaw",
                            "sampleRate": 8000,
                            "channels": 1
                        }
                    },
                    "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0"
                }
                seq_num = seq_num+1

                try:
                    print("send start", flush=True)
                    await websocket.send(json.dumps(start_msg))
                except Exception as e:
                    print(str(datetime.datetime.now())+" connected message "+str(e))

                inb_ts = 0    # media timestamp (milliseconds)
                inb_chk = 1   # media chunk counter
                # NOTE(review): discards the first 44 bytes - presumably a WAV
                # header in front of the raw PCMU data; confirm input file format
                inbound_bytes = file1.read(44)
                while True:
                    # 512 bytes of 8 kHz mu-law = 64 ms of audio per media message
                    inbound_bytes = file1.read(512)
                    if not inbound_bytes:
                        break
                    if(inb_chk == 1):
                        print("4 inbound bytes: {} {} {} {}".format(inbound_bytes[0], inbound_bytes[1], inbound_bytes[2], inbound_bytes[3]))
                    # TwiML media payload is base64-encoded audio
                    encoded_data = base64.b64encode(inbound_bytes).decode('ascii')

                    media_msg = {
                        "event": "media",
                        "sequenceNumber": seq_num,
                        "media": {
                            "track": "inbound",
                            "chunk": inb_chk,
                            "timestamp": inb_ts,
                            "payload": encoded_data
                        },
                        "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0"
                    }

                    try:
                        #print("send media inbound {}".format(inb_ts), flush=True)
                        # log only the first 20 media messages to keep output readable
                        if(inb_chk <= 20):
                            print("send media inbound {}".format(json.dumps(media_msg)), flush=True)
                        await websocket.send(json.dumps(media_msg))
                    except Exception as e:
                        print(str(datetime.datetime.now())+" connected message "+str(e))
                        break
                    seq_num = seq_num+1
                    inb_chk = inb_chk+1
                    inb_ts = inb_ts+64   # advance timestamp by 64 ms per chunk

                    # pace the stream at about real-time speed
                    time.sleep(0.064)
                    print(".", end =" ", flush=True)

                    # if(inb_ts > 15000):
                    #     # test websocket close before sending audio
                    #     await websocket.close()
                    #     print(str(datetime.datetime.now())+" audio websocket closed", flush=True)
                    #     return

                # "stop" ends the simulated call
                stop_msg = {
                    "event": "stop",
                    "sequenceNumber": seq_num,
                    "stop": {
                        "accountSid": "AC123",
                        "callSid": "CA123"
                    },
                    "streamSid": "MZ18ad3ab5a668481ce02b83e7395059f0"
                }

                try:
                    print("send stop", flush=True)
                    await websocket.send(json.dumps(stop_msg))
                except Exception as e:
                    print(str(datetime.datetime.now())+" connected message "+str(e))

                # give the server a moment to process the tail of the audio before closing
                time.sleep(3.0)
                await websocket.close()
            except Exception as e:
                print("Exception when sending audio via websocket: "+str(e)) # usually because the session closed due to NOMATCH or NOINPUT

    print(str(datetime.datetime.now())+" done streaming audio", flush=True)


# thread that connects to the websocket and receives the results
# we do it in a separate thread because in the main thread we are streaming the audio
class wsThread (threading.Thread):
    """Receiver thread: runs its own asyncio loop and collects result words into self.stack."""

    def __init__(self, ws_uri, prefix):
        super().__init__()
        self.ws_uri = ws_uri   # result-websocket URL to connect to
        self.prefix = prefix   # label used in log output
        self.stack = []        # accumulated utterance words

    def run(self):
        print (self.prefix+" Starting WS receive thread "+str(datetime.datetime.now()), flush=True)
        try:
            loop = asyncio.new_event_loop()
            loop.run_until_complete(websocket_receive(self.ws_uri, self.stack, self.prefix))
        except Exception as e:
            print(e, flush=True)
        print (self.prefix+" Exiting WS receive thread "+str(datetime.datetime.now()), flush=True)

# function that connects to the websocket and receives the results
async def websocket_receive(uri, stack, prefix):
    """Consume result messages from *uri* until the socket closes, feeding each to process_ws_msg."""
    async with websockets.connect(uri) as ws:
        try:
            print(prefix+" connected to "+uri, flush=True)
            # loop ends when recv() raises on connection close
            while True:
                process_ws_msg(await ws.recv(), stack, prefix)
        except Exception as e:
            print(e, flush=True)


# create and start the websocket thread
threadWsLeft = wsThread(ws_url_left, "L >>\t")
threadWsLeft.start()

# stream audio (blocks in this main thread until the whole file is sent)
asyncio.get_event_loop().run_until_complete( stream_audio() )

# wait for websocket thread to join
print("")
print("Waiting to join Left "+str(datetime.datetime.now()), flush=True)
threadWsLeft.join()
print("Joined Left "+str(datetime.datetime.now()), flush=True)

# NOTE(review): the message below is truncated ("to " what?) - presumably the
# pause lets the server finish processing/capture; confirm and complete the text
print("sleeping 10 seconds to ", flush=True )
time.sleep(10.0)
print("done", flush=True)
32 changes: 30 additions & 2 deletions new-examples/real-time/rt-2chn-simulated-twiml-f32.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
inbound_audio = f"{inputFolder}/{inputFile}"
outbound_audio = f"{inputFolder}/{inputFile2}"

print("inbound_audio: {}".format(inbound_audio))
print("outbound_audio: {}".format(outbound_audio))

headers = {"Authorization":JWT}
# new transcription session request
# it specifies audio input via an RTP stream
Expand Down Expand Up @@ -60,7 +63,7 @@
# this is needed for a browser
# for phone we would have PCMU
"format": "F32",
"channel" : "stereo",
"channels" : "stereo",
# this is needed for a browser
# for phone we would have 8000
"rate": 16000,
Expand Down Expand Up @@ -153,6 +156,25 @@ async def stream_audio():
try:
print(str(datetime.datetime.now())+" sender connected", flush=True)


# print("sleeping 3 seconds before shutting down websocket", flush=True)
# timeLeft = 3
# while timeLeft > 0:
# print(str(timeLeft)+" ", end =" ", flush=True)
# time.sleep(1)
# # try:
# # await websocket.ping()
# # except Exception as e:
# # print(str(datetime.datetime.now())+" Exception 0 when sending ping via websocket: "+str(e))
# # break
# timeLeft -= 1

# # test websocket close before sending audio
# await websocket.close()
# print(str(datetime.datetime.now())+" audio websocket closed", flush=True)
# return


conn_msg = {
"event": "connected",
"protocol": "Call",
Expand Down Expand Up @@ -266,9 +288,15 @@ async def stream_audio():
time.sleep(0.008)
print(".", end =" ", flush=True)

if(outb_ts > 8000):
if(outb_ts > 60000):
break

# if(inb_ts > 15000):
# # test websocket close before sending audio
# await websocket.close()
# print(str(datetime.datetime.now())+" audio websocket closed", flush=True)
# return


stop_msg = {
"event": "stop",
Expand Down
Loading

0 comments on commit e0667bd

Please sign in to comment.