Skip to content

Commit

Permalink
Release 0.4.0
Browse files Browse the repository at this point in the history
* Make most code async
* Make all db communication async
* Optimize db queries
* Parallelize db queries
* Change MongoDB integration to use a single-node replica set
* Integrate MongoDB transactions
* Refactor persistence code
* Add new tests and improve some existing ones
* Refactor and improve various parts of the code
* Update dependency versions
  • Loading branch information
aangelos28 committed Oct 18, 2024
1 parent 4d6890f commit 28c032b
Show file tree
Hide file tree
Showing 80 changed files with 2,879 additions and 1,992 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ EOS provides:
* Device and sample container allocation system to prevent conflicts
* Result aggregation such as automatic output file storage

Documentation is available at [https://unc-robotics.github.io/eos/](https://unc-robotics.github.io/eos/).

## Installation

### 1. Install PDM
Expand Down
8 changes: 2 additions & 6 deletions docker/.env.example
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
# EOS #####################################
COMPOSE_PROJECT_NAME=eos

# MongoDB root username
# MongoDB admin credentials
EOS_MONGO_INITDB_ROOT_USERNAME=

# MongoDB root user password
EOS_MONGO_INITDB_ROOT_PASSWORD=

# MinIO root username
# MinIO admin credentials
EOS_MINIO_ROOT_USER=

# MinIO root user password
EOS_MINIO_ROOT_PASSWORD=

# Budibase ################################
Expand Down
16 changes: 13 additions & 3 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
services:
eos-mongodb:
image: mongo:noble
build:
context: .
dockerfile: mongodb/Dockerfile
args:
- MONGO_INITDB_ROOT_USERNAME=${EOS_MONGO_INITDB_ROOT_USERNAME}
- MONGO_INITDB_ROOT_PASSWORD=${EOS_MONGO_INITDB_ROOT_PASSWORD}
image: eos-mongodb/latest
container_name: eos-mongodb
hostname: eos-mongodb
restart: unless-stopped
environment:
MONGO_INITDB_ROOT_USERNAME: ${EOS_MONGO_INITDB_ROOT_USERNAME}
Expand All @@ -12,10 +19,12 @@ services:
- eos_network
volumes:
- mongodb_data:/data/db
command: ["-f", "/etc/mongod.conf"]

eos-minio:
image: minio/minio:RELEASE.2024-10-02T17-50-41Z
container_name: eos-minio
hostname: eos-minio
restart: unless-stopped
environment:
MINIO_ROOT_USER: ${EOS_MINIO_ROOT_USER}
Expand All @@ -32,9 +41,8 @@ services:
eos-budibase:
image: budibase/budibase:2.32.12-sqs
container_name: eos-budibase
hostname: eos-budibase
restart: unless-stopped
ports:
- "8080:80"
environment:
JWT_SECRET: ${BB_JWT_SECRET}
MINIO_ACCESS_KEY: ${BB_MINIO_ACCESS_KEY}
Expand All @@ -45,6 +53,8 @@ services:
INTERNAL_API_KEY: ${BB_INTERNAL_API_KEY}
BB_ADMIN_USER_EMAIL: ${BB_ADMIN_USER_EMAIL}
BB_ADMIN_USER_PASSWORD: ${BB_ADMIN_USER_PASSWORD}
ports:
- "8080:80"
networks:
- eos_network
extra_hosts:
Expand Down
9 changes: 9 additions & 0 deletions docker/mongodb/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# MongoDB image for EOS: the stock mongo image plus a keyfile, a mongod
# configuration file, and a database init script.
FROM mongo:noble

# Generate the keyfile at build time; the script writes it to
# /data/mongo-keyfile, the path mongod.conf points security.keyFile at.
COPY mongodb/generate_keyfile.sh /root/generate_keyfile.sh
RUN /bin/bash /root/generate_keyfile.sh

# Configuration file handed to mongod via -f in CMD below.
COPY mongodb/mongod.conf /etc/mongod.conf
# NOTE(review): presumably the base image's entrypoint runs scripts placed in
# /docker-entrypoint-initdb.d when the data directory is first initialized --
# confirm against the image documentation.
COPY mongodb/init_mongodb.js /docker-entrypoint-initdb.d/init_mongodb.js

CMD ["mongod", "-f", "/etc/mongod.conf"]
9 changes: 9 additions & 0 deletions docker/mongodb/generate_keyfile.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
# Generate the keyfile that mongod.conf references as security.keyFile.
#
# The keyfile is written to /data/mongo-keyfile, readable only by the
# mongodb user. An existing, non-empty keyfile is left untouched.
#
# Exits non-zero if any step fails, so a broken keyfile never ends up in
# the image unnoticed.
set -euo pipefail

MONGO_KEYFILE="/data/mongo-keyfile"

# -s rather than -f: an empty file left behind by a previously failed run
# must be regenerated, not treated as a valid keyfile.
if [ ! -s "$MONGO_KEYFILE" ]; then
    echo "Generating keyfile..."
    # Create the file owner-only from the start, so the key is never
    # readable by others between creation and the chmod below.
    umask 077
    openssl rand -base64 756 > "$MONGO_KEYFILE"
    chmod 400 "$MONGO_KEYFILE"
    chown mongodb:mongodb "$MONGO_KEYFILE"
fi
21 changes: 21 additions & 0 deletions docker/mongodb/init_mongodb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Initialize a single-node replica set and create the admin user.
//
// Reads the admin credentials from the MONGO_INITDB_ROOT_USERNAME and
// MONGO_INITDB_ROOT_PASSWORD environment variables.
// Throws if the credentials are missing or if the node does not become
// primary within PRIMARY_WAIT_TIMEOUT_MS.

// Replica set configuration; "rs0" must match replication.replSetName in
// mongod.conf.
var config = {
    "_id": "rs0",
    "members": [
        { "_id": 0, "host": "localhost:27017" }
    ]
};

// Fail early with a clear message instead of handing undefined credentials
// to createUser.
var rootUsername = process.env["MONGO_INITDB_ROOT_USERNAME"];
var rootPassword = process.env["MONGO_INITDB_ROOT_PASSWORD"];
if (!rootUsername || !rootPassword) {
    throw new Error(
        "MONGO_INITDB_ROOT_USERNAME and MONGO_INITDB_ROOT_PASSWORD must both be set"
    );
}

rs.initiate(config);

// Wait until this node reports itself as primary before creating the user.
// The wait is bounded so a failed initiation surfaces as an error instead of
// hanging initialization forever.
// NOTE(review): rs.isMaster() is kept from the original; newer MongoDB
// releases offer db.hello() for the same check -- confirm before switching.
var PRIMARY_WAIT_TIMEOUT_MS = 60000;
var PRIMARY_POLL_INTERVAL_MS = 1000;
var waitedMs = 0;
while (!rs.isMaster().ismaster) {
    if (waitedMs >= PRIMARY_WAIT_TIMEOUT_MS) {
        throw new Error(
            "Replica set 'rs0' did not elect a primary within " +
            PRIMARY_WAIT_TIMEOUT_MS + " ms"
        );
    }
    sleep(PRIMARY_POLL_INTERVAL_MS);
    waitedMs += PRIMARY_POLL_INTERVAL_MS;
}

// Create the admin user
var adminDb = db.getSiblingDB('admin');
adminDb.createUser({
    user: rootUsername,
    pwd: rootPassword,
    roles: [{ role: 'root', db: 'admin' }]
});
13 changes: 13 additions & 0 deletions docker/mongodb/mongod.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# mongod configuration for the EOS single-node replica set.

storage:
  dbPath: /data/db

net:
  port: 27017
  # Listen on loopback and on the container's own hostname
  # (hostname: eos-mongodb in docker-compose.yml).
  bindIp: localhost,eos-mongodb

security:
  authorization: enabled
  # Must match MONGO_KEYFILE in generate_keyfile.sh.
  keyFile: /data/mongo-keyfile

replication:
  # Must match the "_id" of the replica set config in init_mongodb.js.
  replSetName: rs0
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
project = "eos"
copyright = "2024, UNC Robotics"
author = "Angelos Angelopoulos"
release = "0.3.0"
release = "0.4.0"

extensions = [
"sphinx.ext.autosectionlabel",
Expand Down
51 changes: 26 additions & 25 deletions eos/campaigns/campaign_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(
self._campaign_id = campaign_id
self._experiment_type = experiment_type
self._execution_parameters = execution_parameters

self._campaign_manager = campaign_manager
self._campaign_optimizer_manager = campaign_optimizer_manager
self._task_manager = task_manager
Expand All @@ -46,7 +47,7 @@ def __init__(

self._campaign_status: CampaignStatus | None = None

def _setup_optimizer(self) -> None:
async def _setup_optimizer(self) -> None:
if self._optimizer:
return

Expand All @@ -56,7 +57,7 @@ def _setup_optimizer(self) -> None:
self._execution_parameters.optimizer_computer_ip,
)
self._optimizer_input_names, self._optimizer_output_names = (
self._campaign_optimizer_manager.get_input_and_output_names(self._campaign_id)
await self._campaign_optimizer_manager.get_input_and_output_names(self._campaign_id)
)

def cleanup(self) -> None:
Expand All @@ -70,13 +71,13 @@ async def start_campaign(self) -> None:
"""
Start the campaign or handle an existing campaign.
"""
campaign = self._campaign_manager.get_campaign(self._campaign_id)
campaign = await self._campaign_manager.get_campaign(self._campaign_id)
if campaign:
await self._handle_existing_campaign(campaign)
else:
self._create_new_campaign()
await self._create_new_campaign()

self._campaign_manager.start_campaign(self._campaign_id)
await self._campaign_manager.start_campaign(self._campaign_id)
self._campaign_status = CampaignStatus.RUNNING
log.info(f"Started campaign '{self._campaign_id}'.")

Expand All @@ -87,7 +88,6 @@ async def _handle_existing_campaign(self, campaign: Campaign) -> None:
self._campaign_status = campaign.status

if not self._execution_parameters.resume:

def _raise_error(status: str) -> None:
raise EosCampaignExecutionError(
f"Cannot start campaign '{self._campaign_id}' as it already exists and is '{status}'. "
Expand All @@ -104,27 +104,27 @@ def _raise_error(status: str) -> None:

await self._resume_campaign()

def _create_new_campaign(self) -> None:
async def _create_new_campaign(self) -> None:
"""
Create a new campaign.
"""
self._campaign_manager.create_campaign(
await self._campaign_manager.create_campaign(
campaign_id=self._campaign_id,
experiment_type=self._experiment_type,
execution_parameters=self._execution_parameters,
)

if self._execution_parameters.do_optimization:
self._setup_optimizer()
await self._setup_optimizer()

async def _resume_campaign(self) -> None:
"""
Resume an existing campaign.
"""
self._campaign_manager.delete_current_campaign_experiments(self._campaign_id)
await self._campaign_manager.delete_current_campaign_experiments(self._campaign_id)

if self._execution_parameters.do_optimization:
self._setup_optimizer()
await self._setup_optimizer()
await self._restore_optimizer_state()

log.info(f"Campaign '{self._campaign_id}' resumed.")
Expand All @@ -133,7 +133,7 @@ async def _restore_optimizer_state(self) -> None:
"""
Restore the optimizer state for a resumed campaign.
"""
completed_experiment_ids = self._campaign_manager.get_campaign_experiment_ids(
completed_experiment_ids = await self._campaign_manager.get_campaign_experiment_ids(
self._campaign_id, status=ExperimentStatus.COMPLETED
)

Expand All @@ -150,15 +150,15 @@ async def cancel_campaign(self) -> None:
"""
Cancel the campaign and all running experiments.
"""
campaign = self._campaign_manager.get_campaign(self._campaign_id)
campaign = await self._campaign_manager.get_campaign(self._campaign_id)
if not campaign or campaign.status != CampaignStatus.RUNNING:
raise EosCampaignExecutionError(
f"Cannot cancel campaign '{self._campaign_id}' with status "
f"'{campaign.status if campaign else 'None'}'. It must be running."
)

log.warning(f"Cancelling campaign '{self._campaign_id}'...")
self._campaign_manager.cancel_campaign(self._campaign_id)
await self._campaign_manager.cancel_campaign(self._campaign_id)
self._campaign_status = CampaignStatus.CANCELLED

await self._cancel_running_experiments()
Expand Down Expand Up @@ -194,18 +194,18 @@ async def progress_campaign(self) -> bool:

await self._progress_experiments()

campaign = self._campaign_manager.get_campaign(self._campaign_id)
campaign = await self._campaign_manager.get_campaign(self._campaign_id)
if self._is_campaign_completed(campaign):
if self._execution_parameters.do_optimization:
await self._compute_pareto_solutions()
self._campaign_manager.complete_campaign(self._campaign_id)
await self._campaign_manager.complete_campaign(self._campaign_id)
return True

await self._create_experiments(campaign)

return False
except EosExperimentExecutionError as e:
self._campaign_manager.fail_campaign(self._campaign_id)
await self._campaign_manager.fail_campaign(self._campaign_id)
self._campaign_status = CampaignStatus.FAILED
raise EosCampaignExecutionError(f"Error executing campaign '{self._campaign_id}'") from e

Expand All @@ -225,16 +225,16 @@ async def _progress_experiments(self) -> None:

for experiment_id in completed_experiments:
del self._experiment_executors[experiment_id]
self._campaign_manager.delete_campaign_experiment(self._campaign_id, experiment_id)
self._campaign_manager.increment_iteration(self._campaign_id)
await self._campaign_manager.delete_campaign_experiment(self._campaign_id, experiment_id)
await self._campaign_manager.increment_iteration(self._campaign_id)

async def _process_completed_experiments(self, completed_experiments: list[str]) -> None:
"""
Process the results of completed experiments.
"""
inputs_df, outputs_df = await self._collect_experiment_results(completed_experiments)
await self._optimizer.report.remote(inputs_df, outputs_df)
self._campaign_optimizer_manager.record_campaign_samples(
await self._campaign_optimizer_manager.record_campaign_samples(
self._campaign_id, completed_experiments, inputs_df, outputs_df
)

Expand All @@ -248,11 +248,12 @@ async def _collect_experiment_results(self, experiment_ids: list[str]) -> tuple[
for experiment_id in experiment_ids:
for input_name in self._optimizer_input_names:
reference_task_id, parameter_name = input_name.split(".")
task = self._task_manager.get_task(experiment_id, reference_task_id)
task = await self._task_manager.get_task(experiment_id, reference_task_id)
inputs[input_name].append(float(task.input.parameters[parameter_name]))
for output_name in self._optimizer_output_names:
reference_task_id, parameter_name = output_name.split(".")
output_parameters = self._task_manager.get_task_output(experiment_id, reference_task_id).parameters
task_output = await self._task_manager.get_task_output(experiment_id, reference_task_id)
output_parameters = task_output.parameters
outputs[output_name].append(float(output_parameters[parameter_name]))

return pd.DataFrame(inputs), pd.DataFrame(outputs)
Expand All @@ -271,9 +272,9 @@ async def _create_experiments(self, campaign: Campaign) -> None:
experiment_executor = self._experiment_executor_factory.create(
new_experiment_id, self._experiment_type, experiment_execution_parameters
)
self._campaign_manager.add_campaign_experiment(self._campaign_id, new_experiment_id)
await self._campaign_manager.add_campaign_experiment(self._campaign_id, new_experiment_id)
self._experiment_executors[new_experiment_id] = experiment_executor
experiment_executor.start_experiment(experiment_dynamic_parameters)
await experiment_executor.start_experiment(experiment_dynamic_parameters)

async def _get_experiment_parameters(self, iteration: int) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -324,7 +325,7 @@ async def _compute_pareto_solutions(self) -> None:
try:
pareto_solutions_df = await self._optimizer.get_optimal_solutions.remote()
pareto_solutions = pareto_solutions_df.to_dict(orient="records")
self._campaign_manager.set_pareto_solutions(self._campaign_id, pareto_solutions)
await self._campaign_manager.set_pareto_solutions(self._campaign_id, pareto_solutions)
except Exception as e:
raise EosCampaignExecutionError(f"CMP '{self._campaign_id}' - Error computing Pareto solutions.") from e

Expand Down
Loading

0 comments on commit 28c032b

Please sign in to comment.