Skip to content

Commit

Permalink
Merge pull request #18 from helyOSFramework/status_contraints
Browse files Browse the repository at this point in the history
status constraints for agent and missions
  • Loading branch information
cviolbarbosa committed May 24, 2024
2 parents f50aefb + 201cac6 commit 90e680d
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 12 deletions.
20 changes: 19 additions & 1 deletion helyos_database/db_schema/0006_work_processes_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,25 @@ CREATE TABLE IF NOT EXISTS public.work_processes (
sched_start_at timestamp(6) without time zone,
sched_end_at timestamp(6) without time zone,
wait_free_agent boolean DEFAULT true,
process_type character varying
process_type character varying,

CONSTRAINT status_check CHECK (
status IS NULL OR
status IN (
'draft',
'dispatched',
'preparing resources',
'calculating',
'executing',
'assignments_completed',
'succeeded',
'assignment_failed',
'planning_failed',
'failed',
'canceling',
'canceled'
)
)
);


Expand Down
14 changes: 13 additions & 1 deletion helyos_database/db_schema/0009_agents_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,19 @@ CREATE TABLE IF NOT EXISTS public.agents (
sensors_data_format character varying default 'helyos-native',
geometry_data_format character varying default 'trucktrix-vehicle',
data_format character varying default 'trucktrix-vehicle',
UNIQUE(uuid)

UNIQUE(uuid),
CONSTRAINT status_check CHECK (
status IS NULL OR
status IN (
'not_automatable',
'free',
'ready',
'busy'
)
)


);


Expand Down
20 changes: 19 additions & 1 deletion helyos_database/db_schema/0011_assignments_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,25 @@ CREATE TABLE IF NOT EXISTS public.assignments (
next_assignments bigint[],
error character varying,
created_at timestamp(6) without time zone NOT NULL DEFAULT NOW(),
modified_at timestamp(6) without time zone NOT NULL DEFAULT NOW()
modified_at timestamp(6) without time zone NOT NULL DEFAULT NOW(),

CONSTRAINT status_check CHECK (
status IS NULL OR
status IN (
'to_dispatch',
'executing',
'succeeded',
'completed',
'rejected',
'failed',
'aborted',
'canceling',
'canceled',
'wait_dependencies',
'not_ready_to_dispatch',
'active'
)
)
);


Expand Down
4 changes: 2 additions & 2 deletions helyos_server/src/event_handlers/rabbitmq_event_subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ function handleBrokerMessages(channelOrQueue, message) {
if (objMsg.obj.body.status) {
updateState(objMsg.obj, uuid, 0)
.catch(e => {
const msg = e.msg? e.msg : e;
logData.addLog('agent', objMsg.obj, 'error', `agent state update=${msg}`);
const msg = e.message? e.message : e;
logData.addLog('agent', objMsg.obj, 'error', `agent state update: ${msg}`);
});
} else {
logData.addLog('agent', {}, 'error', "state update message does not contain agent status.");
Expand Down
4 changes: 2 additions & 2 deletions helyos_server/src/services/database/postg_access_layer.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ class DatabaseLayer {
delete patch.time_stamp;
if (Object.keys(patch).length < 2) return Promise.resolve({});
else return this.update(index, patch[index], patch, useShortTimeClient)
.catch(r => {
return { error: r, failedIndex: patch[index] }
.catch(e => {
return { error: e, failedIndex: patch[index] }
});
});

Expand Down
13 changes: 10 additions & 3 deletions helyos_server/src/services/in_mem_database/mem_database_service.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ class InMemDB {
const promiseTrigger = () => dbService.updateMany(objArray, indexName, useShortTimeOutClient)
.then((r) => {
if (r && r.length) {
const failedIndexes = r.filter(e => e && e.failedIndex);
const failedDataTypeError = r.filter(e => e && e.error && e.error.message.includes('constraint'));
const failedIndexes = r.filter(e => e && e.error && !e.error.message.includes('constraint'));

failedDataTypeError.forEach(e => {
logData.addLog('helyos_core', {uuid: e.failedIndex}, 'error',`Update failed on ${tableName} / ${e.failedIndex} ${e.error.message}` );
console.log(e.error.message);
});

if (failedIndexes.length) {
this._catch_update_errors(failedIndexes.length);
failedIndexes.forEach(e => {
Expand Down Expand Up @@ -276,9 +283,9 @@ class InMemDB {
this.lostUpdates += n;

if (new Date() - this.timeoutCounterStartTime > 10000) {
if (this.updateTimeout === this.shortTimeout)
if (this.updateTimeout === this.shortTimeout || this.lostUpdates > 5)
logData.addLog('helyos_core', null, 'warn', `Too many updates pushed to the database. The timeout was reduced to ${this.shortTimeout} milliseconds to avoid the system blockage.`);
logData.addLog('helyos_core', null, 'error',
logData.addLog('helyos_core', null, 'error',
`${this.lostUpdates} database updates canceled. Pending promises:${this.pendingPromises}. Timeout: ${this.updateTimeout/1000} secs. Try to increase the buffer time, DB_BUFFER_TIME.`);
this.lostUpdates = 0;
this.timeoutCounterStartTime = new Date();
Expand Down
28 changes: 26 additions & 2 deletions tests/helyos_agent_assistant.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class RabbitMQClient {
this.RBMQ_HOST = host
this.EXCHANGE_NAME = 'xchange_helyos.agents.ul';
this.UUID = UUID;;
this.ROUTING_KEY = `agent.${this.UUID}.mission_req`;
this.MISSION_ROUTING_KEY = `agent.${this.UUID}.mission_req`;
this.AGENT_STATE_KEY = `agent.${this.UUID}.state`;
this.connection = null;
}

Expand Down Expand Up @@ -47,7 +48,7 @@ class RabbitMQClient {
};

const jsonMessage = JSON.stringify(message);
channel.publish(this.EXCHANGE_NAME, this.ROUTING_KEY, Buffer.from(jsonMessage), { userId: this.UUID });
channel.publish(this.EXCHANGE_NAME, this.MISSION_ROUTING_KEY, Buffer.from(jsonMessage), { userId: this.UUID });
await channel.close();
console.log('Message sent to RabbitMQ');
} catch (error) {
Expand All @@ -56,6 +57,29 @@ class RabbitMQClient {
}
}


async sendInvalidStatusValue() {
try {
const channel = await this.connection.createChannel();
const states = {
status: 'invalid_status',
};
const message = {
type: 'agent_state',
body: workProcesss,
uuid: this.UUID
};

const jsonMessage = JSON.stringify(message);
channel.publish(this.EXCHANGE_NAME, this.AGENT_STATE_KEY, Buffer.from(jsonMessage), { userId: this.UUID });
await channel.close();
console.log('Message with invalid agent status sent to RabbitMQ');
} catch (error) {
console.error('Failed to send mission request:', error);
throw error;
}
}

async close() {
try {
await this.connection.close();
Expand Down

0 comments on commit 90e680d

Please sign in to comment.