// processing_thread.cpp
// (repository forked from kmsmith137/ch_vdif_assembler)
#include "ch_vdif_assembler_internals.hpp"
using namespace std;
namespace ch_vdif_assembler {
#if 0
}; // pacify emacs c-mode!
#endif
// p->name, with no possibility of segfault
inline string get_name(const shared_ptr<vdif_processor> &p)
{
xassert(p);
return p->name;
}
// Reports an attempt to run the same vdif_processor a second time.
// The message is written to stdout (and flushed) and is then thrown as a runtime_error,
// so this function never returns normally.
static void throw_rerun_exception()
{
    static const char *const text = "currently, a vdif_processor can't be run twice, you'll need to re-construct and start from scratch\n";

    // Print first, so the message is visible even if nobody reports the exception.
    cout << text;
    cout.flush();

    throw runtime_error(text);
}
// -------------------------------------------------------------------------------------------------
// Thread object which repeatedly fetches assembled chunks through a processor_handle
// and hands each one to a single vdif_processor.
struct processing_thread : public thread_base {
    shared_ptr<assembler_nerve_center> nc;
    shared_ptr<vdif_processor> processor;

    // The name passed to thread_base is "<processor name> thread".  get_name() asserts
    // that 'processor_' is non-null before its name is read in the initializer list.
    processing_thread(const shared_ptr<assembler_nerve_center> &nc_, const shared_ptr<vdif_processor> &processor_)
        : thread_base(get_name(processor_) + " thread"),
          nc(nc_),
          processor(processor_)
    {
        xassert(nc);
        xassert(processor);

        // Early rejection of a processor which has already been run.  set_running()
        // in thread_body() makes the same check again while holding the processor's mutex.
        const bool already_started = processor->is_running();
        if (already_started)
            throw_rerun_exception();
    }

    // Marks the processor as running, then processes chunks until a null chunk is
    // returned, at which point the processor is finalized.
    void thread_body()
    {
        // Throws if the processor's run flag is already set.
        processor->set_running();

        // 'timer' is not declared in this struct -- presumably a thread_base member (TODO confirm).
        processor_handle ph(processor->name, nc);

        // NOTE(review): assembler_killer is declared elsewhere.  It looks like a guard which acts
        // on its victim unless let_live() is reached -- confirm against its definition.
        assembler_killer killer;
        if (processor->is_critical)
            killer.set_victim(nc, "exception thrown in critical processor");

        // A null chunk terminates the loop.
        while (shared_ptr<assembled_chunk> chunk = ph.get_next_chunk(timer))
            processor->process_chunk(chunk);

        processor->finalize();
        killer.let_live();
    }
};
// -------------------------------------------------------------------------------------------------
// Constructs a processor which has not been run yet (runflag = false).
//
//   name_         stored in 'name'
//   is_critical_  stored in 'is_critical'
//
// Throws runtime_error if the mutex protecting 'runflag' cannot be initialized.
vdif_processor::vdif_processor(const string &name_, bool is_critical_)
    : name(name_), is_critical(is_critical_), runflag(false)
{
    // pthread_mutex_init() returns a nonzero error number on failure.  Ignoring it would leave
    // is_running() / set_running() locking an uninitialized mutex, so fail construction instead.
    if (pthread_mutex_init(&mutex, NULL) != 0)
        throw runtime_error("vdif_processor: pthread_mutex_init() failed");
}
// Returns the current value of 'runflag'.  The flag is read while holding the mutex.
bool vdif_processor::is_running()
{
    pthread_mutex_lock(&mutex);
    const bool running = runflag;
    pthread_mutex_unlock(&mutex);

    return running;
}
// Sets 'runflag'.  If the flag was already set, throw_rerun_exception() is called instead
// (after the mutex has been released), and the flag stays set.
void vdif_processor::set_running()
{
    // Test-and-set under the mutex, so only one caller can see the flag as previously unset.
    pthread_mutex_lock(&mutex);
    const bool already_started = runflag;
    runflag = true;    // no change if it was already set
    pthread_mutex_unlock(&mutex);

    if (already_started)
        throw_rerun_exception();
}
// -------------------------------------------------------------------------------------------------
// Spawns a processing_thread for processor 'p', attached to nerve center 'nc'.
// 'nc' is asserted to be non-null, and nc->check_alive() is called before the thread is spawned.
void spawn_processing_thread(const shared_ptr<assembler_nerve_center> &nc, const shared_ptr<vdif_processor> &p)
{
    xassert(nc);
    nc->check_alive();

    spawn_thread<processing_thread>(nc, p);
}
} // namespace ch_vdif_assembler