-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexternal_tasks.py
98 lines (77 loc) · 3.46 KB
/
external_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
from twilio.rest import Client
from bot_whatapp import dao
from bot_whatapp.app import get_connection
from custom_external_task import CustomExternalTasks
from custom_external_task_worker import CustomExternalTaskWorker
from custom_task_result import CustomTaskResult
from server import worker
logger = logging.getLogger("bot_whatsapp")
logger.setLevel(logging.DEBUG)
default_config = {
"maxTasks": 1,
"lockDuration": 60000,
"asyncResponseTimeout": 60000,
"retries": 3,
"retryTimeout": 5000,
"sleepSeconds": 10,
"isDebug": True,
}
account_sid = "ACb759d51fbda66b9c0265a32f6ceff4dc"
auth_token = "a8e49aae941aea30263b92ca4ace522a"
twilio_client = Client(account_sid, auth_token)
@worker
def update_customer(task: CustomExternalTasks) -> CustomTaskResult:
logger.info("<get_customer> Atualizando o cliente ...")
variables = task.get_variables()
customer = variables["customer"]
gender_opt = int(variables["body"])
logger.info(variables)
with get_connection() as conn:
dao.CustomerDAO(conn).update_customer(customer["customer_id"], gender_opt)
conn.commit()
customer["gender_opt"] = gender_opt
return variables
@worker
def process_audio(task: CustomExternalTasks) -> CustomTaskResult:
logger.info("<get_customer> Processando o audio ...")
variables = task.get_variables()
logger.info(variables)
return variables
@worker
def finish_flow(task: CustomExternalTasks) -> CustomTaskResult:
logger.info("<get_customer> Finalizando o flow ...")
variables = task.get_variables()
logger.info(variables)
with get_connection() as conn:
dao.FlowDAO(conn).finish_flow(instance_id=task.get_process_instance_id())
conn.commit()
return variables
@worker
def send_message(task: CustomExternalTasks) -> CustomTaskResult:
variables = task.get_variables()
properties = task.get_extension_properties()
logger.info(f"<send_message> Enviando mensagem para {variables['profile_name']}({variables['phone_number']}) ...")
message = properties["message"].replace("\\n", "\n")
# logger.info(f" ====> {message}")
results = twilio_client.messages.create(body=message, from_="whatsapp:+14155238886", to=variables["phone_number"])
logger.info(results.sid)
variables.setdefault("send_results", [])
variables["send_results"].append({"task_id": task.get_activity_id(), "sid": results.sid})
return variables
def run_worker(worker_name):
worker = CustomExternalTaskWorker(worker_id="1", config=default_config)
worker_func = getattr(sys.modules[__name__], worker_name)
worker.subscribe(worker_name, worker_func)
worker_list = ["send_message", "update_customer", "process_audio", "finish_flow"]
if __name__ == "__main__":
with ThreadPoolExecutor(max_workers=30) as executor:
executor.map(run_worker, worker_list)
# results = twilio_client.messages.create(
# body="Olá! Sou a Loro AI! 😎😊\n\nTraduzo seus áudios em inglês e português no WhatsApp! 🎉🚀\n\nCom a Loro, você pode quebrar barreiras, se comunicando com pessoas em outros países sem ser fluente em inglês. Que maravilha, né?🤓💬\n\nEnvie seu áudio em inglês ou português e receba traduzidinho! 🎉\n\nSolte a voz - ou encaminhe um áudio - e veja a mágica acontecer! 🧙♀️💫",
# from_="whatsapp:+14155238886",
# to="whatsapp:+554899983088",
# )
# print(results)