Skip to content

Commit

Permalink
adding new rest improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tginsbu1 committed May 30, 2024
1 parent f86e3af commit 0045eb6
Showing 1 changed file with 60 additions and 198 deletions.
258 changes: 60 additions & 198 deletions src/ot2_rest_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,27 @@
import glob
import json
import os
import time
import traceback
from argparse import ArgumentParser
from contextlib import asynccontextmanager
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import List
from typing_extensions import Annotated
from urllib.error import HTTPError, URLError

from wei.modules.rest_module import RESTModule
import requests
import yaml
from fastapi import FastAPI, UploadFile
from fastapi.responses import JSONResponse
from urllib3.exceptions import ConnectTimeoutError
from wei.core.data_classes import (
ModuleAbout,
ModuleAction,
ModuleActionArg,
ModuleActionFile,
ModuleStatus,
from wei.types.step_types import (
StepFileResponse,
StepResponse,
StepStatus,
ActionRequest
)
from wei.types.module_types import (
ModuleStatus
)
from fastapi.datastructures import State
from wei.helpers import extract_version

from ot2_driver.ot2_driver_http import OT2_Config, OT2_Driver
Expand All @@ -48,21 +44,21 @@
ip = ""


def check_protocols_folder():
def check_protocols_folder(protocols_folder_path):
"""
Description: Checks if the protocols folder path exists. Creates the resource folder path if it doesn't already exist
"""
global protocols_folder_path

isPathExist = os.path.exists(protocols_folder_path)
if not isPathExist:
os.makedirs(protocols_folder_path)


def check_resources_folder():
def check_resources_folder(resources_folder_path):
"""
Description: Checks if the resources folder path exists. Creates the resource folder path if it doesn't already exist
"""
global resources_folder_path

isPathExist = os.path.exists(resources_folder_path)
if not isPathExist:
os.makedirs(resources_folder_path)
Expand All @@ -71,15 +67,14 @@ def check_resources_folder():
print("Creating: " + protocols_folder_path)


def connect_robot():
global ot2, state, node_name, ip
def connect_robot(state: State):
try:
print(ip)
ot2 = OT2_Driver(OT2_Config(ip=ip))
state = ModuleStatus.IDLE
print(state.ip)
state.ot2 = OT2_Driver(OT2_Config(ip=state.ip))
state.status = ModuleStatus.IDLE

except ConnectTimeoutError as connection_err:
state = ModuleStatus.ERROR
state.status = ModuleStatus.ERROR
print("Connection error code: " + connection_err)

except HTTPError as http_error:
Expand All @@ -92,11 +87,11 @@ def connect_robot():
print("Connection error code: " + str(conn_err))

except Exception as error_msg:
state = ModuleStatus.ERROR
state.status = ModuleStatus.ERROR
print("-------" + str(error_msg) + " -------")

else:
print(str(node_name) + " online")
print(str(state.node_name) + " online")


def save_config_files(protocol: str, resource_config=None):
Expand Down Expand Up @@ -232,17 +227,18 @@ def poll_OT2_until_run_completion():
elif run_status["data"]["status"] and run_status["data"]["status"] == "running":
state = ModuleStatus.BUSY

rest_module = RESTModule(
name="ot2_node",
version=extract_version(Path(__file__).parent.parent / "pyproject.toml"),
description="A node to control the OT2 liquid handling robot",
model="ot2",
)

rest_module.arg_parser.add_argument( type=str, help="ot2 ip value")
rest_module.arg_parser.add_argument( "--ot2_port", type=int, help="ot2 port value")

@asynccontextmanager
async def lifespan(app: FastAPI):
global \
ot2, \
state, \
node_name, \
resources_folder_path, \
protocols_folder_path, \
logs_folder_path, \
ip
@rest_module.startup()
def ot2_startup(state: State):
"""Initial run function for the app, parses the workcell argument
Parameters
----------
Expand All @@ -252,146 +248,37 @@ async def lifespan(app: FastAPI):
Returns
-------
None"""
parser = ArgumentParser()
parser.add_argument("--alias", type=str, help="Name of the Node")
parser.add_argument("--host", type=str, help="Host for rest")
parser.add_argument("--ot2_ip", type=str, help="ip value")
parser.add_argument("--port", type=int, help="port value")
args = parser.parse_args()
node_name = args.alias
ip = args.ot2_ip
state = "UNKNOWN"

args = rest_module.arg_parser.parse_args()
state.node_name = args.name
state.ip = args.ot2_ip
state.status = "UNKNOWN"
temp_dir = Path.home() / ".wei" / ".ot2_temp"
temp_dir.mkdir(exist_ok=True)
resources_folder_path = str(temp_dir / node_name / "resources/")
protocols_folder_path = str(temp_dir / node_name / "protocols/")
logs_folder_path = str(temp_dir / node_name / "logs/")
check_resources_folder()
check_protocols_folder()
connect_robot()
state.resources_folder_path = str(temp_dir / state.node_name / "resources/")
state.protocols_folder_path = str(temp_dir / state.node_name / "protocols/")
state.logs_folder_path = str(temp_dir / state.node_name / "logs/")
check_resources_folder(state.resources_folder_path)
check_protocols_folder(state.protocols_folder_path)
connect_robot(state)
yield
pass


app = FastAPI(
lifespan=lifespan,
)


@app.get("/state")
def get_state():
global state
return JSONResponse(content={"State": state})


@app.get("/about")
async def about() -> ModuleAbout:
global node_name
return ModuleAbout(
name=node_name,
model="Opentrons OT2",
description="Opentrons OT2 Liquidhandling robot",
interface="wei_rest_node",
version=extract_version(Path(__file__).parent.parent / "pyproject.toml"),
actions=[
ModuleAction(
name="run_protocol",
description="Runs an Opentrons protocol (either python or YAML) on the connected OT2.",
args=[
ModuleActionArg(
name="resource_path",
description="Not currently implemented.",
type="[str, Path]",
required=False,
default=None,
),
ModuleActionArg(
name="use_existing_resources",
description="Whether or not to use the existing resources file (essentially, whether we've restocked or not).",
type="bool",
required=False,
default=False,
),
],
files=[
ModuleActionFile(
name="protocol",
required="True",
description="A protocol file to be run (either python or YAML) on the connected OT2.",
),
],
),
],
resource_pools=[],
)


@app.get("/resources")
async def resources():
global resource_file_path
resource_info = ""
if not (resource_file_path == ""):
with open(resource_file_path) as f:
resource_info = f.read()
return JSONResponse(content={"State": resource_info})

@rest_module.action(name="run_protocol", description="Run a provided protocol file")
def run_protocol(state: State, action: ActionRequest,
use_existing_resources: Annotated[bool, "Whether to use the existing resource file or restart"] = False
):
"""
Run a given protocol
"""



@app.post("/action")
def do_action(action_handle: str, action_vars: str, files: List[UploadFile] = []):
"""
Runs an action on the module
Parameters
----------
action_handle : str
The name of the action to be performed
action_vars : str
Any arguments necessary to run that action.
This should be a JSON object encoded as a string.
files: List[UploadFile] = []
Any files necessary to run the action defined by action_handle.
Returns
-------
response: StepResponse
A response object containing the result of the action
"""
global ot2, state
response = StepResponse()
if state == ModuleStatus.ERROR:
# Try to reconnect
check_resources_folder()
check_protocols_folder()
connect_robot()
if state == ModuleStatus.ERROR:
msg = "Can not accept the job! OT2 CONNECTION ERROR"
response.action_response = StepStatus.FAILED
response.action_msg = msg
return response

while state != ModuleStatus.IDLE:
# get_logger().warn("Waiting for OT2 to switch IDLE state...")
time.sleep(0.5)

state = ModuleStatus.BUSY
action_command = action_handle
action_vars = json.loads(action_vars)
print(f"{action_vars=}")

print(f"In action callback, command: {action_command}")

if "run_protocol" == action_command:
resource_config = action_vars.get(
"resource_path", None
) # TODO: This will be enabled in the future
resource_file_flag = action_vars.get(
"use_existing_resources", "False"
) # Returns True to use a resource file or False to not use a resource file.

if resource_file_flag:
if use_existing_resources:
try:
list_of_files = glob.glob(
resources_folder_path + "*.json"
state.resources_folder_path + "*.json"
) # Get list of files
if len(list_of_files) > 0:
resource_config = max(
Expand All @@ -404,7 +291,7 @@ def do_action(action_handle: str, action_vars: str, files: List[UploadFile] = []

# * Get the protocol file
try:
protocol = next(file for file in files if file.filename == "protocol")
protocol = next(file for file in action.files if file.filename == "protocol")
protocol = protocol.file.read().decode("utf-8")
except StopIteration:
protocol = None
Expand All @@ -415,7 +302,7 @@ def do_action(action_handle: str, action_vars: str, files: List[UploadFile] = []
config_file_path, resource_config_path = save_config_files(
protocol, resource_config
)
payload = deepcopy(action_vars)
payload = deepcopy(action.args)

print(f"ot2 {payload=}")
print(f"config_file_path: {config_file_path}")
Expand All @@ -429,7 +316,6 @@ def do_action(action_handle: str, action_vars: str, files: List[UploadFile] = []
Path(logs_folder_path).mkdir(parents=True, exist_ok=True)
with open(Path(logs_folder_path) / f"{run_id}.json", "w") as f:
json.dump(ot2.get_run_log(run_id), f, indent=2)
print("Finished Action: " + action_handle)
return StepFileResponse(
action_response=StepStatus.SUCCEEDED,
action_log=response_msg,
Expand All @@ -439,13 +325,14 @@ def do_action(action_handle: str, action_vars: str, files: List[UploadFile] = []
# response.resources = str(resource_config_path)

elif not response_flag:
state = ModuleStatus.ERROR
state.status = ModuleStatus.ERROR
response = StepResponse
response.action_response = StepStatus.FAILED
response.action_msg = response_msg
# if resource_config_path:
# response.resources = str(resource_config_path)

print("Finished Action: " + action_handle)
print("Finished Action: " + action.handle)
return response

else:
Expand All @@ -455,31 +342,6 @@ def do_action(action_handle: str, action_vars: str, files: List[UploadFile] = []
state = ModuleStatus.ERROR

return response
else:
msg = "UNKNOWN ACTION REQUEST! Available actions: run_protocol"
response.action_response = StepStatus.FAILED
response.action_msg = msg
print("Error: " + msg)
state = ModuleStatus.IDLE

return response


if __name__ == "__main__":
import uvicorn

parser = ArgumentParser()
parser.add_argument("--alias", type=str, help="Name of the Node", default="ot2")
parser.add_argument("--host", type=str, help="Host for rest", default="0.0.0.0")
parser.add_argument("--ot2_ip", type=str, help="ip value")
parser.add_argument("--port", type=int, help="port value", default=2005)
args = parser.parse_args()
node_name = args.alias
ip = args.ot2_ip
uvicorn.run(
"ot2_rest_node:app",
host=args.host,
port=args.port,
reload=False,
ws_max_size=100000000000000000000000000000000000000,
)



0 comments on commit 0045eb6

Please sign in to comment.