-
Notifications
You must be signed in to change notification settings - Fork 1
/
LFQueue.cpp
139 lines (103 loc) · 2.77 KB
/
LFQueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <algorithm>
#include <atomic>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
// Compatibility shim for old GCC in c++0x mode, which lacks the `thread_local`
// keyword: map it onto the `__thread` extension. The macro must not end in a
// semicolon (otherwise `thread_local int x;` expands to `__thread; int x;`)
// and must not override the real keyword on C++11-and-later compilers.
#if __cplusplus < 201103L
#define thread_local __thread
#endif

// Short aliases for the standard threading primitives.
typedef std::thread Thread;                    // a single thread of execution
typedef std::vector<std::thread> ThreadGroup;  // a set of threads joined together
typedef std::mutex Mutex;
typedef std::unique_lock<std::mutex> Guard;    // scoped lock over a Mutex
typedef std::condition_variable Condition;
/// Bounded circular-buffer queue for ONE producer thread and any number of
/// consumer threads. Synchronisation uses atomics and busy-waiting only.
///
/// Indices run downwards and wrap from 0 back to `last`. `head` is the slot
/// the producer fills next and `tail` is the slot consumers take next. The
/// queue is empty when both are equal; one slot is always left unused so that
/// "full" can be told apart from "empty" (usable capacity is maxsize - 1).
///
/// The indices and the drain flag are std::atomic: `volatile` neither prevents
/// data races nor orders the slot write against the index publication, and a
/// plain `bool` flag may be cached by the compiler inside the spin loops.
template<typename T>
class Queue {
public:
  /// @param maxsize number of slots in the ring; must be at least 2.
  /// @throws std::invalid_argument if maxsize < 2 (such a ring could never
  ///         accept an element).
  explicit Queue(std::size_t maxsize)
      : head(checkedSize(maxsize) - 1), tail(maxsize - 1), last(maxsize - 1),
        container(maxsize), drained(false) {}

  /// Busy-waits until the queue has room for at least one more element.
  void waitFull() const {
    // spinlock
    do {} while (full());
  }

  /// Busy-waits until an element is available or the queue has been drained.
  /// @return true when the queue is empty and drained (nothing more to pop),
  ///         false when an element may be available.
  bool waitEmpty() const {
    // spinlock
    do {} while (empty() && !drained.load());
    return empty() && drained.load();
  }

  /// Appends a copy of `t`, spinning while the queue is full.
  /// Only one thread may push.
  void push(T const & t) {
    while (true) {
      waitFull();
      std::size_t cur = head.load();
      // The slot must be written before `head` moves: consumers treat every
      // slot between `tail` and `head` as readable.
      container[cur] = t;
      if (head.compare_exchange_strong(cur, stepBack(cur))) {
        break;
      }
    }
  }

  /// Removes the oldest element into `t`. Any number of threads may pop.
  /// @return true if an element was stored in `t`; false once the queue is
  ///         empty and drain() has been called.
  bool pop(T& t) {
    while (true) {
      if (waitEmpty()) return false;  // drained: tell the consumer to stop
      std::size_t cur = tail.load();
      if (cur == head.load()) continue;  // another consumer emptied the queue
      // Speculative read: if another consumer claims this slot first, the
      // compare-exchange below fails and the value is read again.
      // NOTE(review): the copy can still overlap a producer write to the same
      // slot after a wrap-around (and the index compare is open to ABA); that
      // window exists in the original algorithm and is not closed here.
      t = container[cur];
      if (tail.compare_exchange_strong(cur, stepBack(cur))) break;
    }
    return true;
  }

  /// True when the next push would make `head` catch up with `tail`.
  bool full() const {
    const std::size_t h = head.load();
    const std::size_t t = tail.load();
    return stepBack(h) == t;
  }

  /// True when no element is waiting to be popped.
  bool empty() const { return head.load() == tail.load(); }

  /// Signals that no more elements will be pushed; consumers return from
  /// pop() with false once the remaining elements are gone.
  void drain() { drained.store(true); }

  // circular buffer
  std::atomic<std::size_t> head;  // next slot to write (producer side)
  std::atomic<std::size_t> tail;  // next slot to read (consumer side)
  std::size_t last;               // highest valid index (maxsize - 1)
  std::vector<T> container;
  std::atomic<bool> drained;

private:
  /// Rejects ring sizes that cannot hold any element.
  static std::size_t checkedSize(std::size_t maxsize) {
    if (maxsize < 2) {
      throw std::invalid_argument("Queue: maxsize must be at least 2");
    }
    return maxsize;
  }

  /// Index that follows `index` in the downward, wrapping traversal order.
  std::size_t stepBack(std::size_t index) const {
    return index == 0 ? last : index - 1;
  }
};
// Consumer functor: pops integers from a shared queue until pop() reports
// that nothing more will arrive, counting how often each value was seen.
struct Worker {
  // Binds the worker to `iq` and starts with 100 zeroed counters.
  Worker(Queue<int>& iq) : q(iq), hist(100) {}

  // Thread body: one counter increment per popped value.
  void operator()() {
    for (int value = 0; q.pop(value); ) {
      hist[value] += 1;
    }
  }

  Queue<int>& q;          // queue shared with the producer and other workers
  std::vector<int> hist;  // hist[v] = number of times this worker popped v
};
int main() {
const int NUMTHREADS=10;
Queue<int> q(30);
ThreadGroup threads;
threads.reserve(NUMTHREADS);
std::vector<Worker> workers(NUMTHREADS, Worker(q));
for (int i=0; i<NUMTHREADS; ++i) {
threads.push_back(Thread(std::ref(workers[i])));
}
for (int i=0; i<10000; i++)
q.push(i%100);
q.drain();
std::for_each(threads.begin(),threads.end(),
std::bind(&Thread::join,std::placeholders::_1));
std::vector<int> hist(100);
for (int i=0; i!=NUMTHREADS;++i) {
std::cout << "thread "<< i << " : ";
for (int j=0; j!=100;++j) {
hist[j]+= workers[i].hist[j];
std::cout << workers[i].hist[j] << " ,";
}
std::cout << std::endl;
}
std::cout << "\nTotal " << std::endl;
for (int j=0; j!=100;++j)
std::cout << hist[j] << " ,";
std::cout << std::endl;
return 0;
}