-
Notifications
You must be signed in to change notification settings - Fork 1
/
cmque.py
44 lines (35 loc) · 1.03 KB
/
cmque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import collections
import threading
class Queue:
    """Blocking FIFO wrapper around a caller-supplied deque-like container.

    Every operation runs while holding one ``threading.Condition``, so the
    wrapped container is only touched under that lock as long as callers go
    through this class.

    The container only needs to support truth-testing, ``append(item)`` and
    ``popleft()``.  Storage is delegated to the container's own ``append``,
    so a container that merges items on append will do so here as well.
    """

    def __init__(self, deque):
        """Wrap *deque* (used as-is, not copied) and create the condition."""
        self.deque = deque
        self.cond = threading.Condition()

    def __bool__(self) -> bool:
        """Return True if the wrapped container currently holds any entry."""
        with self.cond:
            return bool(self.deque)

    def put(self, item) -> None:
        """Append *item* to the container and wake one waiting ``get``."""
        with self.cond:
            self.deque.append(item)
            self.cond.notify()

    def get(self):
        """Remove and return the leftmost entry, blocking while empty."""
        with self.cond:
            # Re-check after every wake-up: another consumer may have taken
            # the entry first, and wait() may return spuriously.
            while not self.deque:
                self.cond.wait()
            return self.deque.popleft()
class DataDeque(collections.deque):
    """Deque whose ``append`` coalesces consecutive byte chunks.

    Only ``append`` is overridden; other ways of adding entries (the
    constructor's iterable, ``extend``, ``appendleft``...) keep the plain
    ``collections.deque`` behaviour.
    """

    def append(self, item) -> None:
        """Add *item* at the right end, merging it into the last entry if possible.

        * ``None`` is always stored as its own entry.
        * If the last entry exists and is not ``None``, *item* is added to
          it in place with ``extend`` and no new entry is created.
        * Otherwise (empty deque, or last entry is ``None``) a new
          ``bytearray`` copy of *item* becomes the last entry.
        """
        if item is None:
            super().append(None)
            return
        if self and self[-1] is not None:
            # Grow the existing tail buffer instead of queuing another chunk.
            self[-1].extend(item)
            return
        # Copy into a fresh bytearray so later merges never mutate the
        # caller's object.
        super().append(bytearray(item))
class PairDeque(collections.deque):
    """Deque whose ``append`` folds consecutive pairs into one entry.

    Only ``append`` is overridden; other ways of adding entries keep the
    plain ``collections.deque`` behaviour.
    """

    def append(self, item) -> None:
        """Add *item* at the right end, merging it into the last entry if possible.

        *item* is either ``None`` or an indexable with at least two slots.

        * ``None`` is always stored as its own entry.
        * If the last entry exists and is not ``None``, its slot 0 is
          replaced by ``item[0]`` and ``item[1]`` is added to its slot 1
          with ``+=``; no new entry is created.
        * Otherwise (empty deque, or last entry is ``None``) a new list
          built from *item* (a shallow copy) becomes the last entry.
        """
        if item is None:
            super().append(None)
            return
        if self and self[-1] is not None:
            tail = self[-1]
            tail[0] = item[0]   # the most recent first value wins
            tail[1] += item[1]  # the second value accumulates
            return
        # Stored as a list so the entry can be updated in place later.
        super().append(list(item))