
Commit

68 celery (#71)
* small modifications for celery tasks #68

* storage models modified to match django signatures #68

* version bump #68
hcwinsemius authored May 30, 2024
1 parent 0930b0f commit 57d0ede
Showing 6 changed files with 113 additions and 83 deletions.
4 changes: 2 additions & 2 deletions nodeorc/__init__.py
@@ -1,6 +1,6 @@
"""NodeORC: Automated edge and cloud image-based discharge estimation with OpenRiverCam"""
import os
__version__ = "0.1.6"
__version__ = "0.1.7"

__home__ = os.path.join(os.path.expanduser("~"), ".nodeorc")
if not(os.path.isdir(__home__)):
@@ -15,4 +15,4 @@
from . import disk_mng
from . import db
from . import config

from . import cli_utils
142 changes: 76 additions & 66 deletions nodeorc/main.py
@@ -2,14 +2,13 @@
import json
import os

import nodeorc
# import nodeorc
from pydantic import ValidationError
from dotenv import load_dotenv
# import tasks
# from nodeorc import settings_path, db, config

# import nodeorc specifics
from . import db, log, models, config, tasks, settings_path, __version__
from . import db, log, models, config, tasks, cli_utils, settings_path, __version__


session = db.session
@@ -169,71 +168,73 @@ def cli(ctx, info, license, debug): # , quiet, verbose):
ctx.obj = {}

@cli.command(short_help="Start main daemon")
@storage_opt
@listen_opt
def start(storage, listen):
# @storage_opt
# @listen_opt
def start():
# get the device id
logger.info(f"Device {str(device)} is online to run video analyses")
# remote storage parameters with local processing is not possible
if listen == "local" and storage == "remote":
raise click.UsageError('Locally defined tasks ("--listen local") cannot have remotely defined storage ('
'"--storage remote").')
if listen == "local":
# get the stored configuration
active_config = config.get_active_config(parse=True)
if not(active_config):
raise click.UsageError('You do not yet have an active configuration. Upload an activate configuration '
'through the CLI. Type "nodeorc upload-config --help" for more information')

# initialize the database for storing data
session_data = db.init_basedata.get_data_session()
# read the task form from the configuration
task_form_template = config.get_active_task_form(session, parse=True)
callback_url = active_config.callback_url
if task_form_template is None:
# go into the task form get daemon and try to acquire a task form from server every 5 minutes
logger.info("Awaiting task by requesting a new task every 5 seconds")
tasks.wait_for_task_form(
session=session,
callback_url=callback_url,
device=device,
logger=logger,
reboot_after=active_config.settings.reboot_after
)
else:
# check for a new form with one single request
logger.info("Checking if a new task form is available for me...")
new_task_form_row = tasks.request_task_form(
session=session,
callback_url=callback_url,
device=device,
logger=logger
)
if new_task_form_row:
task_form_template = new_task_form_row.task_body
# verify that task_template can be converted to a valid Task
try:
task_test = models.Task(**task_form_template)
except Exception as e:
logger.error(
f"Task body set as active configuration cannot be formed into a valid Task instance. Reason: {str(e)}"
)
# This only happens with version upgrades. Update the status to BROKEN and report back to platform
task_form_template = config.get_active_task_form(session, parse=False)
task_form_template.status = db.models.TaskFormStatus.BROKEN
device.form_status = db.models.DeviceFormStatus.BROKEN_FORM
session.commit()
try:
processor = tasks.LocalTaskProcessor(
task_form_template=task_form_template,
logger=logger,
**active_config.model_dump()
)
processor.await_task()
except Exception as e:
logger.error("Reboot service: %s" % str(e))
# if listen == "local" and storage == "remote":
# raise click.UsageError('Locally defined tasks ("--listen local") cannot have remotely defined storage ('
# '"--storage remote").')
# if listen == "local":
# get the stored configuration
active_config = config.get_active_config(parse=True)
if not(active_config):
raise click.UsageError(
'You do not yet have an active configuration. Upload an activate configuration '
'through the CLI. Type "nodeorc upload-config --help" for more information'
)

# initialize the database for storing data
session_data = db.init_basedata.get_data_session()
# read the task form from the configuration
task_form_template = config.get_active_task_form(session, parse=True)
callback_url = active_config.callback_url
if task_form_template is None:
# go into the task form get daemon and try to acquire a task form from server every 5 minutes
logger.info("Awaiting task by requesting a new task every 5 seconds")
tasks.wait_for_task_form(
session=session,
callback_url=callback_url,
device=device,
logger=logger,
reboot_after=active_config.settings.reboot_after
)
else:
raise NotImplementedError
# check for a new form with one single request
logger.info("Checking if a new task form is available for me...")
new_task_form_row = tasks.request_task_form(
session=session,
callback_url=callback_url,
device=device,
logger=logger
)
if new_task_form_row:
task_form_template = new_task_form_row.task_body
# verify that task_template can be converted to a valid Task
try:
task_test = models.Task(**task_form_template)
except Exception as e:
logger.error(
f"Task body set as active configuration cannot be formed into a valid Task instance. Reason: {str(e)}"
)
# This only happens with version upgrades. Update the status to BROKEN and report back to platform
task_form_template = config.get_active_task_form(session, parse=False)
task_form_template.status = db.models.TaskFormStatus.BROKEN
device.form_status = db.models.DeviceFormStatus.BROKEN_FORM
session.commit()
try:
processor = tasks.LocalTaskProcessor(
task_form_template=task_form_template,
logger=logger,
**active_config.model_dump()
)
processor.await_task()
except Exception as e:
logger.error("Reboot service: %s" % str(e))
# else:
# raise NotImplementedError

@cli.command(
short_help="Upload a new configuration for this device from a JSON formatted file",
@@ -255,10 +256,9 @@ def upload_config(json_file, set_as_active):
"""Upload a new configuration for this device from a JSON formatted file"""
logger.info(f"Device {str(device)} receiving new configuration from {json_file}")
config = load_config(json_file)
rec = nodeorc.config.add_config(session, config=config, set_as_active=set_as_active)
rec = config.add_config(session, config=config, set_as_active=set_as_active)
logger.info(f"Settings updated successfully to {rec}")

upload_config.__doc__ = get_docs_settings()
# def main():
# connection = pika.BlockingConnection(
# pika.URLParameters(
@@ -282,5 +282,15 @@ def upload_config(json_file, set_as_active):
# connection.close()
# traceback.print_tb(e.__traceback__)

@cli.command(short_help="Start cloud processor")
# @storage_opt
# @listen_opt
# def start_cloud(storage, listen):
def start_cloud():
cli_utils.cloud_processor()


upload_config.__doc__ = get_docs_settings()

if __name__ == "__main__":
cli()
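
With the options removed from start and the new start-cloud command added, the reworked CLI surface can be checked with click's test runner. A minimal sketch, assuming the console script is still exposed as nodeorc (as the upload-config help text above suggests) and relying on click's default mapping of start_cloud to the start-cloud command name:

# Lists the registered commands only; it does not start the daemon or the
# cloud processor.
from click.testing import CliRunner
from nodeorc.main import cli

runner = CliRunner()
result = runner.invoke(cli, ["--help"])
print(result.output)  # should list start, start-cloud and upload-config
# On a device the equivalent shell calls would be `nodeorc start` (local daemon)
# or `nodeorc start-cloud`, which delegates to cli_utils.cloud_processor().
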
4 changes: 3 additions & 1 deletion nodeorc/models/__init__.py
@@ -17,10 +17,12 @@ def check_datetime_fmt(fn_fmt):
raise ValueError(f'Date format "{fmt}" is not a valid date format pattern')
return True

from .storage import Storage, S3Storage, File

from .storage import Storage, S3Storage, File, get_storage
from .callback_url import CallbackUrl
from .callback import Callback
from .subtask import Subtask
from .task import Task
from .config import LocalConfig, RemoteConfig, DiskManagement, Settings


26 changes: 22 additions & 4 deletions nodeorc/models/storage.py
@@ -6,10 +6,11 @@

from .. import callbacks, utils


class Storage(BaseModel):
url: str = "./tmp"
bucket_name: str = "video"

options: Dict = {}
@property
def bucket(self):
return os.path.join(self.url, self.bucket_name)
@@ -60,6 +61,9 @@ def download_file(self, src, trg, keep_src=False):
-------
"""
dirname = os.path.dirname(trg)
if not os.path.isdir(dirname):
os.makedirs(dirname)
if keep_src:
shutil.copyfile(
os.path.join(self.bucket, src),
@@ -71,9 +75,10 @@
trg
)

class S3Storage(Storage):
class S3Storage(BaseModel):
url: AnyHttpUrl = "http://127.0.0.1:9000"
options: Dict[str, Any]
bucket_name: str = "video"
options: Dict[str, Any] = {}


@property
@@ -87,7 +92,7 @@ def bucket(self):
def upload_io(self, obj, dest, **kwargs):
utils.upload_io(obj, self.bucket, dest=dest, **kwargs)

def download_file(self, src, trg):
def download_file(self, src, trg, keep_src=True):
"""
Download file from bucket to specified target file (entire path inc. filename)
@@ -102,6 +107,10 @@ def download_file(self, src, trg, keep_src=True):
-------
"""
dirname = os.path.dirname(trg)
if not os.path.isdir(dirname):
os.makedirs(dirname)

self.bucket.download_file(src, trg)


@@ -132,3 +141,12 @@ def move(self, src, trg):
src_fn = os.path.join(src, self.tmp_name)
trg_fn = os.path.join(trg, self.tmp_name)
os.rename(src_fn, trg_fn)


def get_storage(**data):
if data.get("url"):
if "://" in data["url"]:
return S3Storage(**data)
else:
return Storage(**data)

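The new get_storage helper dispatches on the URL: anything containing "://" becomes an S3Storage, everything else a local Storage. A minimal usage sketch, assuming only the fields shown above; the options keys passed to S3Storage are placeholders, since their expected names are not part of this diff:

# Usage sketch for get_storage; the options keys are assumed placeholders.
from nodeorc.models import get_storage

local = get_storage(url="./tmp", bucket_name="video")
remote = get_storage(
    url="http://127.0.0.1:9000",  # "://" in the URL selects S3Storage
    bucket_name="video",
    options={"access_key": "minio-user", "secret_key": "minio-secret"},
)
print(type(local).__name__, type(remote).__name__)  # Storage S3Storage
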
11 changes: 5 additions & 6 deletions nodeorc/models/task.py
@@ -11,6 +11,7 @@
# nodeodm specific imports
from . import CallbackUrl, Storage, S3Storage, File, Subtask, Callback, REMOVE_FOR_TEMPLATE


class Task(BaseModel):
"""
Definition of an entire task
@@ -43,9 +44,7 @@ def replace_subtask_files(self) -> 'Task':
subtask.replace_files(self.input_files, self.output_files)
return self



def execute(self, tmp):
def execute(self, tmp, keep_src=False):
"""
Execute the entire task logic
@@ -64,7 +63,7 @@ def execute(self, tmp):
try:
self.logger.info(f"Performing task defined at {self.timestamp} with id {self.id}")
self.logger.info(f"Downloading all inputs to {tmp}")
self.download_input(tmp)
self.download_input(tmp, keep_src=keep_src)
# then perform all subtasks in order, upload occur within the subtasks
self.logger.info(f"Executing subtasks")
self.execute_subtasks(tmp, timestamp=self.timestamp)
Expand All @@ -85,7 +84,7 @@ def execute(self, tmp):
# self.logger.error(f"Task id {str(self.id)} failed with code {r.status_code} and message {r.json()}")
# raise Exception("Error detected, restarting node")

def download_input(self, tmp):
def download_input(self, tmp, keep_src=False):
"""
Downloads all required inputs to a required temp path
@@ -98,7 +97,7 @@ def download_input(self, tmp):
for key, file in self.input_files.items():
trg = os.path.join(tmp, file.tmp_name)
# put the input file on tmp location
self.storage.download_file(file.remote_name, trg)
self.storage.download_file(file.remote_name, trg, keep_src=keep_src)
# self.storage.bucket.download_file(file.remote_name, trg)


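Task.execute and download_input now simply forward a keep_src flag to the storage backend. A small sketch of the resulting behaviour against the local Storage model; paths and file contents are illustrative, and only the copy case is shown because the keep_src=False branch is not fully visible in this diff:

# Sketch of the keep_src flag that Task.execute now forwards per input file.
import os
from nodeorc.models.storage import Storage

storage = Storage(url="/tmp/nodeorc-demo", bucket_name="video")
os.makedirs(storage.bucket, exist_ok=True)
with open(os.path.join(storage.bucket, "movie.mp4"), "wb") as f:
    f.write(b"fake video bytes")

# keep_src=True copies the source into the (auto-created) target directory,
# leaving the original in the bucket; this mirrors what
# Task.execute(tmp, keep_src=True) now triggers for every input file.
storage.download_file("movie.mp4", "/tmp/nodeorc-demo/incoming/movie.mp4", keep_src=True)
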
9 changes: 5 additions & 4 deletions nodeorc/utils.py
@@ -3,6 +3,7 @@
import logging
import time


def check_bucket(s3, bucket_name):
try:
bucket = s3.Bucket(bucket_name)
@@ -28,14 +29,14 @@

def get_s3(
endpoint_url,
aws_access_key_id,
aws_secret_access_key,
access_key,
secret_key,
):
return boto3.resource(
"s3",
endpoint_url=endpoint_url,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
config=boto3.session.Config(signature_version="s3v4"),
verify=False
)
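
get_s3 now takes access_key/secret_key and maps them onto boto3's aws_access_key_id/aws_secret_access_key internally. A hedged usage sketch; the endpoint and credentials are placeholders for a local MinIO instance, not values from this repository:

# Usage sketch for the renamed get_s3 arguments.
from nodeorc.utils import get_s3

s3 = get_s3(
    endpoint_url="http://127.0.0.1:9000",
    access_key="minio-user",
    secret_key="minio-secret",
)
print(s3.Bucket("video"))  # a plain boto3 resource, so the usual Bucket API applies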
