From c54b63d48d0a6014fbe49f37a7b21d3a97f191ac Mon Sep 17 00:00:00 2001 From: Ryan Lewis Date: Mon, 11 Sep 2023 16:14:00 -0500 Subject: [PATCH 1/3] Refactor: port ot2_rest_client.py from ot2_module --- scripts/ot2_rest_client.py | 390 +++++++++++++++++++++++++++++++++++++ 1 file changed, 390 insertions(+) create mode 100644 scripts/ot2_rest_client.py diff --git a/scripts/ot2_rest_client.py b/scripts/ot2_rest_client.py new file mode 100644 index 0000000..d24d1b2 --- /dev/null +++ b/scripts/ot2_rest_client.py @@ -0,0 +1,390 @@ +#! /usr/bin/env python3 +"""OT2 Node""" +import os +import glob +import json +import traceback +import yaml +from typing import List, Tuple +from pathlib import Path +from datetime import datetime +from copy import deepcopy +import time + + +from wei_services.srv import WeiActions, WeiDescription +from std_msgs.msg import String + +from ot2_driver.ot2_driver_http import OT2_Config, OT2_Driver +import opentrons.simulate +from opentrons.simulate import format_runlog +from urllib.error import HTTPError, URLError +from urllib3.exceptions import ConnectionError, ConnectTimeoutError +from urllib3.connection import HTTPException, HTTPConnection +import requests + +#! /usr/bin/env python3 + + +from time import sleep +import threading +import asyncio + +from time import sleep +import json + +from threading import Thread + + +"""The server that takes incoming WEI flow requests from the experiment application""" +import json +from argparse import ArgumentParser +from contextlib import asynccontextmanager +import time +from fastapi import FastAPI, File, Form, UploadFile +from fastapi.responses import JSONResponse + +workcell = None +global sealer, state +serial_port = "/dev/ttyUSB0" +local_ip = "parker.alcf.anl.gov" +local_port = "8000" + +global ot2 +resources_folder_path = "" +protocols_folder_path = "" +node_name = "" +resource_file_path = "" +ip = "" + + +def check_protocols_folder(): + """ + Description: Checks if the protocols folder path exists. Creates the resource folder path if it doesn't already exists + """ + global protocols_folder_path + isPathExist = os.path.exists(protocols_folder_path) + if not isPathExist: + os.makedirs(protocols_folder_path) + + +def check_resources_folder(): + """ + Description: Checks if the resources folder path exists. Creates the resource folder path if it doesn't alIDLE exists + """ + global resources_folder_path + isPathExist = os.path.exists(resources_folder_path) + if not isPathExist: + os.makedirs(resources_folder_path) + # get_lresource_file_pathth.exists(protocols_folder_path) + if not isPathExist: + os.makedirs(protocols_folder_path) + # get_logger().warn("Protocols path doesn't exists") + print("Creating: " + protocols_folder_path) + + +def connect_robot(): + global ot2, state, node_name, ip + try: + ot2 = OT2_Driver(OT2_Config(ip=ip)) + + except ConnectTimeoutError as connection_err: + state = "ERROR" + print("Connection error code: " + connection_err) + + except HTTPError as http_error: + print("HTTP error code: " + http_error) + + except URLError as url_err: + print("Url error code: " + url_err) + + except requests.exceptions.ConnectionError as conn_err: + print("Connection error code: " + str(conn_err)) + + except Exception as error_msg: + state = "ERROR" + print("-------" + str(error_msg) + " -------") + + else: + print(str(node_name) + " online") + + +def download_config_files(protocol_config: str, resource_config=None): + """ + Saves protocol_config string to a local yaml file locaton + + Parameters: + ----------- + protocol_config: str + String contents of yaml protocol file + + Returns + ----------- + config_file_path: str + Absolute path to generated yaml file + """ + global node_name, resource_file_path + config_dir_path = Path.home().resolve() / protocols_folder_path + config_dir_path.mkdir(exist_ok=True, parents=True) + + resource_dir_path = Path.home().resolve() / resources_folder_path + resource_dir_path.mkdir(exist_ok=True, parents=True) + + time_str = datetime.now().strftime("%Y%m%d-%H%m%s") + config_file_path = config_dir_path / f"protocol-{time_str}.yaml" + + print("Writing protocol config to {} ...".format(str(config_file_path))) + + with open(config_file_path, "w", encoding="utf-8") as pc_file: + yaml.dump(protocol_config, pc_file, indent=4, sort_keys=False) + if resource_config: + resource_file_path = resource_dir_path / f"resource-{node_name}-{time_str}.json" + with open(resource_config) as resource_content: + content = json.load(resource_content) + json.dump(content, resource_file_path.open("w")) + return config_file_path, resource_file_path + else: + return config_file_path, None + + +def execute(protocol_path, payload=None, resource_config=None): + """ + Compiles the yaml at protocol_path into .py file; + Transfers and Exececutes the .py file + + Parameters: + ----------- + protocol_path: str + absolute path to the yaml protocol + + Returns + ----------- + response: bool + If the ot2 execution was successful + """ + + global run_id, node_name, protocols_folder_path, resources_folder_path + try: + (protocol_file_path, resource_file_path,) = ot2.compile_protocol( + protocol_path, + payload=payload, + resource_file=resource_config, + resource_path=resources_folder_path, + protocol_out_path=protocols_folder_path, + ) + protocol_file_path = Path(protocol_file_path) + print(f"{protocol_file_path.resolve()=}") + protocol_id, run_id = ot2.transfer(protocol_file_path) + print("OT2 " + node_name + " protocol transfer successful") + resp = ot2.execute(run_id) + print("OT2 " + node_name + " executed a protocol") + # get_logger().warn(str(resp)) + + if resp["data"]["status"] == "succeeded": + # poll_OT2_until_run_completion() + response_msg = "OT2 " + node_name + " successfully IDLE running a protocol" + return True, response_msg + + else: + response_msg = "OT2 " + node_name + " failed running a protocol" + return False, response_msg + + except Exception as err: + + if "no route to host" in str(err.args).lower(): + response_msg = "No route to host error. Ensure that this container \ + has network access to the robot and that the environment \ + variable, robot_ip, matches the ip of the connected robot \ + on the shared LAN." + print(response_msg) + + response_msg = f"Error: {traceback.format_exc()}" + print(response_msg) + return False, response_msg + + +def poll_OT2_until_run_completion(): + """Queries the OT2 run state until reported as 'succeeded'""" + global run_id + print("Polling OT2 run until completion") + while state != "IDLE": + + run_status = ot2.get_run(run_id) + + if run_status["data"]["status"] and run_status["data"]["status"] == "succeeded": + state = "IDLE" + print("Stopping Poll") + + elif run_status["data"]["status"] and run_status["data"]["status"] == "running": + state = "BUSY" + + +@asynccontextmanager +async def lifespan(app: FastAPI): + global ot2, state, node_name, resources_folder_path, protocols_folder_path, ip + """Initial run function for the app, parses the worcell argument + Parameters + ---------- + app : FastApi + The REST API app being initialized + + Returns + ------- + None""" + parser = ArgumentParser() + parser.add_argument("--node_name", type=str, help="Name of the Node") + parser.add_argument("--ip", type=str, help="ip value") + args = parser.parse_args() + node_name = args.node_name + ip = args.ip + state = "UNKNOWN" + resources_folder_path = "/home/rpl/.ot2_temp/" + node_name + "/" + "resources/" + protocols_folder_path = "/home/rpl/.ot2_temp/" + node_name + "/" + "protocols/" + check_resources_folder() + check_protocols_folder() + connect_robot() + state = "IDLE" + description = { + "name": node_name, + "type": "", + "actions": { + "execute": "config : %s", ## takes in the yaml content as second string arg + "run_protocol": "config_path: %s", ## Temp inclusion + }, + } + yield + pass + + +app = FastAPI( + lifespan=lifespan, +) + + +@app.get("/state") +def get_state(): + global sealer + return JSONResponse(content={"State": state}) + + +@app.get("/description") +async def description(): + global state + return JSONResponse(content={"State": state}) + + +@app.get("/resources") +async def resources(): + global resource_file_path + resource_info = "" + if not (resource_file_path == ""): + with open(resource_file_path) as f: + resource_info = f.read() + return JSONResponse(content={"State": resource_info}) + + +@app.post("/action") +def do_action(action_handle: str, action_vars): + global ot2, state + response = {"action_response": "", "action_msg": "", "action_log": ""} + if state == "ERROR": + msg = "Can not accept the job! OT2 CONNECTION ERROR" + # get_logger.error(msg) + response["action_response"] = -1 + response["action_msg"] = msg + return response + + while state != "IDLE": + # get_logger().warn("Waiting for OT2 to switch IDLE state...") + time.sleep(0.5) + + state = "BUSY" + action_command = action_handle + action_vars = json.loads(action_vars) + print(f"{action_vars=}") + + print(f"In action callback, command: {action_command}") + + if "run_protocol" == action_command: + + protocol_config = action_vars.get("config_path", None) + resource_config = action_vars.get( + "resource_path", None + ) + resource_file_flag = action_vars.get( + "use_existing_resources", "False" + ) # Returns True to use a resource file or False to not use a resource file. + + if resource_file_flag: + try: + list_of_files = glob.glob( + resources_folder_path + "*.json" + ) # Get list of files + if len(list_of_files) > 0: + resource_config = max( + list_of_files, key=os.path.getctime + ) # Finding the latest added file + print("Using the resource file: " + resource_config) + + except Exception as er: + print(er) + if protocol_config: + config_file_path, resource_config_path = download_config_files( + protocol_config, resource_config + ) + payload = deepcopy(action_vars) + payload.pop("config_path") + + print(f"ot2 {payload=}") + print(f"config_file_path: {config_file_path}") + + response_flag, response_msg = execute( + config_file_path, payload, resource_config_path + ) + + if response_flag == True: + state = "IDLE" + response["action_response"] = 0 + response["action_msg"] = response_msg + # if resource_config_path: + # response.resources = str(resource_config_path) + + elif response_flag == False: + state = "ERROR" + response["action_response"] = -1 + response["action_msg"] = response_msg + # if resource_config_path: + # response.resources = str(resource_config_path) + + print("Finished Action: " + action_handle) + return response + + else: + response[ + "action_msg" + ] = "Required 'config' was not specified in action_vars" + response["action_response"] = -1 + print(response["action_msg"]) + state = "ERROR" + + return response + else: + msg = "UNKOWN ACTION REQUEST! Available actions: run_protocol" + response["action_response"] = -1 + response["action_msg"] = msg + print("Error: " + msg) + state = "IDLE" + + return response + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run( + "ot2_rest_client:app", + host="parker.cels.anl.gov", + port=2001, + reload=False, + ws_max_size=100000000000000000000000000000000000000, + ) From 721db66c3215e5916b6c908cae8a59d2caf336d1 Mon Sep 17 00:00:00 2001 From: Ryan Lewis Date: Mon, 11 Sep 2023 16:22:12 -0500 Subject: [PATCH 2/3] Address linter errors --- scripts/ot2_rest_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/ot2_rest_client.py b/scripts/ot2_rest_client.py index d24d1b2..90bf5dd 100644 --- a/scripts/ot2_rest_client.py +++ b/scripts/ot2_rest_client.py @@ -205,7 +205,7 @@ def execute(protocol_path, payload=None, resource_config=None): def poll_OT2_until_run_completion(): """Queries the OT2 run state until reported as 'succeeded'""" - global run_id + global run_id, state print("Polling OT2 run until completion") while state != "IDLE": From bd40610110d0b0796dd84c6b251975792917c28c Mon Sep 17 00:00:00 2001 From: Ryan Lewis Date: Mon, 11 Sep 2023 16:29:13 -0500 Subject: [PATCH 3/3] Fix erroneous global import --- scripts/ot2_rest_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/ot2_rest_client.py b/scripts/ot2_rest_client.py index 90bf5dd..1470c53 100644 --- a/scripts/ot2_rest_client.py +++ b/scripts/ot2_rest_client.py @@ -263,7 +263,7 @@ async def lifespan(app: FastAPI): @app.get("/state") def get_state(): - global sealer + global state return JSONResponse(content={"State": state})