-
Notifications
You must be signed in to change notification settings - Fork 12
/
queueing_node.h
144 lines (114 loc) · 4.31 KB
/
queueing_node.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#pragma once
#ifndef SYDEVS_EXAMPLES_QUEUEING_NODE_H_
#define SYDEVS_EXAMPLES_QUEUEING_NODE_H_
#include <sydevs/systems/atomic_node.h>
namespace sydevs_examples {
using namespace sydevs;
using namespace sydevs::systems;
/**
* This node processes jobs one at a time. The time required to process a job
* is determined by the service duration flow input. Any job that arrives while
* another is being processed is placed in a queue and processed later. The
* total time spent in an idle state, with no jobs to be processed, is tracked
* and reported as a flow output.
*/
// Atomic node that serves jobs one at a time in arrival order. Job IDs arrive
// as messages, are appended to the queue `Q`, and are emitted again as
// messages once their service duration has elapsed. The time spent with an
// empty queue is accumulated and published as a flow output on finalization.
// The event handlers declared here are defined later in this header.
class queueing_node : public atomic_node
{
public:
// Constructor/Destructor:
// Forwards the node name and context to atomic_node and constructs the four
// ports, each named after its member.
queueing_node(const std::string& node_name, const node_context& external_context);
virtual ~queueing_node() = default;
// Attributes:
// This node's time precision is `micro`; the service duration read during
// initialization is fixed at this precision.
virtual scale time_precision() const { return micro; }
// Ports:
// serv_dt_input is read once, in initialization_event.
port<flow, input, duration> serv_dt_input; // service duration
// job_id_input is checked and read in unplanned_event.
port<message, input, int64> job_id_input; // job ID (input)
// job_id_output is sent in planned_event, one ID per completed job.
port<message, output, int64> job_id_output; // job ID (output)
// idle_dt_output is assigned in finalization_event.
port<flow, output, duration> idle_dt_output; // idle duration
protected:
// State Variables:
// serv_dt is set in initialization_event and not modified by the other handlers.
duration serv_dt; // service duration (constant)
// Q holds pending job IDs in arrival order; the front entry is the job whose
// completion planned_event reports.
std::vector<int64> Q; // queue of IDs of jobs waiting to be processed
// idle_dt grows only when an event arrives while Q is empty.
duration idle_dt; // idle duration (accumulating)
// planned_dt is the value every duration-returning handler hands back:
// infinite while Q is empty, otherwise the time left for the front job.
duration planned_dt; // planned duration
// Event Handlers:
// Resets the state and returns an infinite planned duration.
virtual duration initialization_event();
// Enqueues a received job ID and returns the updated planned duration.
virtual duration unplanned_event(duration elapsed_dt);
// Emits the front job's ID, removes it, and returns the next planned duration.
virtual duration planned_event(duration elapsed_dt);
// Adds any trailing idle time and assigns the total to idle_dt_output.
virtual void finalization_event(duration elapsed_dt);
};
/**
 * Builds the node: the name and context are forwarded to the atomic_node
 * base, and each port is constructed with a name identical to its member
 * name together with the result of external_interface().
 */
inline queueing_node::queueing_node(const std::string& node_name, const node_context& external_context)
    : atomic_node(node_name, external_context),
      serv_dt_input("serv_dt_input", external_interface()),
      job_id_input("job_id_input", external_interface()),
      job_id_output("job_id_output", external_interface()),
      idle_dt_output("idle_dt_output", external_interface())
{
}
/**
 * Initialization handler: resets every state variable.
 *
 * @return An infinite planned duration, since no job is present yet and the
 *         node simply waits for a message input or for finalization.
 */
inline duration queueing_node::initialization_event()
{
    // Take the service duration from the flow input and pin its precision to
    // that of the node (a recommended practice).
    const duration input_dt = serv_dt_input.value();
    serv_dt = input_dt.fixed_at(time_precision());

    // No jobs are pending and no idle time has been accumulated.
    Q.clear();
    idle_dt = 0_s;

    // Nothing is scheduled until the first job arrives.
    planned_dt = duration::inf();
    return planned_dt;
}
/**
 * Unplanned-event handler: accounts for idle time and enqueues an incoming job.
 *
 * @param elapsed_dt Time elapsed since the previous event of this node.
 * @return The planned duration until the job at the front of the queue has
 *         been processed.
 */
inline duration queueing_node::unplanned_event(duration elapsed_dt)
{
    // An empty queue means the node was idle for the whole elapsed interval.
    const bool was_idle = Q.empty();
    if (was_idle) {
        idle_dt += elapsed_dt;
    }

    // Without a job message there is nothing to enqueue; keep the current plan.
    if (!job_id_input.received()) {
        return planned_dt;
    }

    // Append the received job ID to the back of the queue.
    Q.push_back(job_id_input.value());

    if (was_idle) {
        // The new job is the only one queued, so its service starts now.
        planned_dt = serv_dt;
    }
    else {
        // A job was already in service; it needs the previously planned
        // duration minus the time that has elapsed since.
        planned_dt -= elapsed_dt;
    }
    return planned_dt;
}
/**
 * Planned-event handler: the job at the front of the queue is done.
 *
 * @param elapsed_dt Time elapsed since the previous event (not used here).
 * @return The service duration if another job is waiting, otherwise an
 *         infinite duration.
 */
inline duration queueing_node::planned_event(duration elapsed_dt)
{
    // Report the finished job by sending its ID, then drop it from the queue.
    const int64 finished_id = Q.front();
    job_id_output.send(finished_id);
    Q.erase(Q.begin());

    // Start serving the next job if there is one; otherwise wait indefinitely.
    planned_dt = Q.empty() ? duration::inf() : serv_dt;
    return planned_dt;
}
/**
 * Finalization handler: closes the idle-time accounting and publishes it.
 *
 * @param elapsed_dt Time elapsed since the previous event of this node.
 */
inline void queueing_node::finalization_event(duration elapsed_dt)
{
    // With no jobs queued, the interval since the last event was idle time.
    const bool idle_at_end = Q.empty();
    if (idle_at_end) {
        idle_dt += elapsed_dt;
    }

    // Publish the accumulated idle time on the flow output port.
    idle_dt_output.assign(idle_dt);
}
} // namespace
#endif