forked from cisco/hash-sigs
-
Notifications
You must be signed in to change notification settings - Fork 2
/
hss_thread_pthread.c
300 lines (240 loc) · 9.78 KB
/
hss_thread_pthread.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
/*
#include "hss_thread.h"
#include <pthread.h>
#include <string.h>
* This is an implementation of our threaded abstraction using the
* POSIX pthread API
*
* C11 has a similar (but not precisely identical) API to the one that POSIX
* defines (at least for what we do; all we need is thread create/join and
* mutexes, which *any* thread library should provide). I'd code up the
* support for that API as well (using the same base logic, with typedefs and
* helper inlines to isolate the differences); however, I don't have a C11
* implementation handy to test it.
#define MAX_THREAD 16      /* Never try to create more than 16 threads, no */
                           /* matter what the application tries to tell us */
#define DEFAULT_THREAD 16  /* The number of threads to run if the */
                           /* application doesn't tell us otherwise (e.g. */
                           /* passes in 0) */
#define MIN_DETAIL 16      /* Size of the detail array built into every */
                           /* work item, so the alignment kludge we do */
                           /* doesn't waste space */
/*
 * The information we track about a thread we may have launched
 */
struct thread_state {
    pthread_t thread_id;    /* Written by pthread_create; read back when */
                            /* the thread is joined */
    enum {
        never_was,          /* No unjoined thread occupies this slot */
        alive,              /* A worker was started here and has not yet */
                            /* posted its obituary */
        dead                /* The worker has finished, but nobody has */
                            /* joined it yet */
    } state;
};
struct work_item {
struct work_item *link; They're in a linked list
void (*function)(const void *detail, Function to call
struct thread_collection *col);
These two items are used to pass the thread state to the thread
if this is the first work item for the thread to process
struct thread_collection *col; The parent thread_collection
struct thread_state *state; The pointer into the thread collection
state for the state of this thread
The detail structure that we pass to the function
We'll malloc enough space to hold the entire structure
union { union here so that the detail array is
void *align1; correctly aligned for various datatypes
long long align2;
void (*align3)(void);
unsigned char detail[MIN_DETAIL];
} x;
};
struct thread_collection {
pthread_mutex_t lock; Must be locked before this structure is
accessed if there might be a thread
pthread_mutex_t write_lock; Must be locked before common user data is
written
unsigned num_thread;
unsigned current_ptr; There two are here to avoid O(N) table
unsigned num_alive; scanning in the most common scenarios
Information about the worker threads we may have created
struct thread_state threads[MAX_THREAD];
* Queue (FIFO) of work items submitted, and which can't be processed
* immedately. We do a FIFO, rather than a stack, so that we perform
* the requests in the order they were issued (which isn't something
* the interface guarantees; however it doesn't interfere with the
* request ordering we ask applications to make)
struct work_item *top_work_queue;
struct work_item *end_work_queue;
};
* Allocate a thread control structure
struct thread_collection *hss_thread_init(int num_thread) {
if (num_thread == 0) num_thread = DEFAULT_THREAD;
if (num_thread <= 1) return 0; Not an error: an indication to run
single threaded
if (num_thread > MAX_THREAD) num_thread = MAX_THREAD;
struct thread_collection *col = malloc( sizeof *col );
if (!col) return 0; On malloc failure, run single threaded
col->num_thread = num_thread;
if (0 != pthread_mutex_init( &col->lock, 0 )) {
free(col);
return 0;
}
if (0 != pthread_mutex_init( &col->write_lock, 0 )) {
pthread_mutex_destroy( &col->lock );
free(col);
return 0;
}
col->current_ptr = 0;
col->num_alive = 0;
unsigned i;
for (i=0; i<num_thread; i++) {
col->threads[i].state = never_was;
}
col->top_work_queue = 0;
col->end_work_queue = 0;
return col;
}
* This is the base routine that a worker thread runs
static void *worker_thread( void *arg ) {
struct work_item *w = arg; The initial work item
struct thread_collection *col = w->col;
struct thread_state *state = w->state;
for (;;) {
Perform the work item in front of us
(w->function)(w->x.detail, col);
Ok, we did that
free(w);
Check if there's anything else to do
pthread_mutex_lock( &col->lock );
w = col->top_work_queue;
if (w) {
More work; pull it off the queue
col->top_work_queue = w->link;
if (w == col->end_work_queue) col->end_work_queue = 0;
And go handle it
pthread_mutex_unlock( &col->lock );
continue;
}
No more work for us to do; post our obituary
state->state = dead;
col->num_alive -= 1;
pthread_mutex_unlock( &col->lock );
And that's all folks
return 0;
}
}
* This adds function/details to the list of things that need to be done
* It either creates a thread to do it, or (if we're maxed out) add it to
* our honey-do list (or, as last resort, just does it itself)
void hss_thread_issue_work(struct thread_collection *col,
void (*function)(const void *detail,
struct thread_collection *col),
const void *detail, size_t size_detail_structure) {
If we're running in single-threaded mode
if (!col) {
function( detail, col );
return;
}
Allocate a work structure to hold this request
size_t extra_space;
if (size_detail_structure < MIN_DETAIL) extra_space = 0;
else extra_space = size_detail_structure - MIN_DETAIL;
struct work_item *w = malloc(sizeof *w + extra_space);
if (!w) {
Can't allocate the work structure; fall back to single-threaded
function( detail, col );
return;
}
w->col = col;
w->function = function;
memcpy( w->x.detail, detail, size_detail_structure );
unsigned num_thread = col->num_thread;
pthread_mutex_lock( &col->lock );
Check if we can spawn a new thread
if (col->num_alive < num_thread) {
There's supposed to be room for another
Look for the empty slot
unsigned i, j;
j = col->current_ptr; Do round-robin (so we don't bang on
slot 0 whenever we try to start a thread)
for (i=0; i<num_thread; i++, j = (j+1) % num_thread) {
struct thread_state *p = &col->threads[j];
switch (p->state) {
case alive: continue; This one's busy
case dead:
{
This one just died; grab its status (not that we care,
however that'll tell the thread library it can clean up)
pthread_t thread_id = p->thread_id;
void *status; Ignored, but we need to place thread
status somewhere
pthread_mutex_unlock( &col->lock );
pthread_join( thread_id, &status );
pthread_mutex_lock( &col->lock );
p->state = never_was;
}
FALL THROUGH
case never_was:
Now, we can spawn a new thread
w->state = p;
if (0 != pthread_create( &p->thread_id,
NULL, worker_thread, w )) {
Hmmm, couldn't spawn it; fall back
default: On error condition
pthread_mutex_unlock( &col->lock );
free(w);
function( detail, col );
return;
}
We've kicked off the thread
p->state = alive;
col->num_alive += 1;
For the next request, start scanning at the next
thread object
col->current_ptr = (j+1) % num_thread;
pthread_mutex_unlock( &col->lock );
return;
}
}
col->num_alive = num_thread; Hmmmm, everything was alive???
}
We can't create any more threads; enqueue this (and someone will get
to it)
w->link = 0;
if (col->end_work_queue) {
col->end_work_queue->link = w;
}
col->end_work_queue = w;
if (!col->top_work_queue) col->top_work_queue = w;
pthread_mutex_unlock( &col->lock );
}
* This will wait for all the work items we'e issued to complete
void hss_thread_done(struct thread_collection *col) {
if (!col) return;
unsigned i;
pthread_mutex_lock( &col->lock );
for (i=0; i<col->num_thread; i++) {
* Wait for each thread that we have spawned.
* We're the only one that will spawn them, and so we don't have to
* worry about any new ones appearing while we scan through the list
if (col->threads[i].state != never_was) {
void *status;
pthread_t thread_id = col->threads[i].thread_id;
pthread_mutex_unlock( &col->lock );
pthread_join( thread_id, &status );
pthread_mutex_lock( &col->lock );
}
}
pthread_mutex_unlock( &col->lock );
Ok, all the threads have finished; tear things down
pthread_mutex_destroy( &col->lock );
pthread_mutex_destroy( &col->write_lock );
free(col);
}
/*
 * Take the collection's write lock; does nothing when col is null (the
 * single-threaded case)
 */
void hss_thread_before_write(struct thread_collection *col) {
    if (col) {
        pthread_mutex_lock( &col->write_lock );
    }
}
/*
 * Release the collection's write lock; does nothing when col is null (the
 * single-threaded case)
 */
void hss_thread_after_write(struct thread_collection *col) {
    if (col) {
        pthread_mutex_unlock( &col->write_lock );
    }
}
/*
 * Map a requested thread count onto the count we would actually use:
 * 0 stands for DEFAULT_THREAD, anything that comes out as 1 or less
 * yields 1, and the result never exceeds MAX_THREAD
 */
unsigned hss_thread_num_tracks(int num_thread) {
    int requested = num_thread;

    if (requested == 0) {
        requested = DEFAULT_THREAD;
    }

    if (requested <= 1) {
        return 1;
    }
    if (requested < MAX_THREAD) {
        return requested;
    }
    return MAX_THREAD;
}
*/