forked from trakt/script.trakt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.py
116 lines (101 loc) · 2.74 KB
/
queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# -*- coding: utf-8 -*-
import os
import sqlite3
import utilities as utils
try:
from simplejson import loads, dumps
except ImportError:
from json import loads, dumps
from time import sleep
try:
from thread import get_ident
except ImportError:
from dummy_thread import get_ident
import xbmc
import xbmcvfs
import xbmcaddon
# Handle to the 'script.trakt' add-on; SqliteQueue.__init__ uses it to look up
# the add-on's profile directory, where the queue database file is stored.
__addon__ = xbmcaddon.Addon('script.trakt')
# code from http://flask.pocoo.org/snippets/88/ with some modifications
class SqliteQueue(object):
    """Persistent queue of JSON-serializable objects backed by SQLite.

    Items are serialized with ``dumps`` and stored in a ``queue`` table of
    the file ``queue.db`` inside the add-on profile directory.  ``get`` and
    ``peek`` select the row with the smallest ``id``, i.e. the oldest item
    still in the table.  One SQLite connection is created per calling
    thread (keyed by ``get_ident()``) and cached for reuse.
    """

    _create = (
        'CREATE TABLE IF NOT EXISTS queue '
        '('
        '  id INTEGER PRIMARY KEY AUTOINCREMENT,'
        '  item BLOB'
        ')'
    )
    _count = 'SELECT COUNT(*) FROM queue'
    _iterate = 'SELECT id, item FROM queue'
    _append = 'INSERT INTO queue (item) VALUES (?)'
    _write_lock = 'BEGIN IMMEDIATE'
    _get = (
        'SELECT id, item FROM queue '
        'ORDER BY id LIMIT 1'
    )
    _del = 'DELETE FROM queue WHERE id = ?'
    _peek = (
        'SELECT item FROM queue '
        'ORDER BY id LIMIT 1'
    )
    _purge = 'DELETE FROM queue'

    def __init__(self):
        """Resolve the database path and make sure the queue table exists.

        The add-on profile directory is created first when it is missing.
        """
        self.path = xbmc.translatePath(__addon__.getAddonInfo("profile")).decode("utf-8")
        if not xbmcvfs.exists(self.path):
            utils.Debug("Making path structure: " + repr(self.path))
            xbmcvfs.mkdir(self.path)
        self.path = os.path.join(self.path, 'queue.db')
        # Maps thread ident -> sqlite3 connection opened by that thread.
        self._connection_cache = {}
        with self._get_conn() as conn:
            conn.execute(self._create)

    def __len__(self):
        """Return the number of items currently stored in the queue."""
        with self._get_conn() as conn:
            # COUNT(*) always produces exactly one row.
            return next(conn.execute(self._count))[0]

    def __iter__(self):
        """Yield every stored item, decoded, without removing any of them.

        The query has no explicit ORDER BY, so the order is whatever SQLite
        returns for a plain table scan.
        """
        with self._get_conn() as conn:
            for _row_id, obj_buffer in conn.execute(self._iterate):
                yield loads(str(obj_buffer))

    def _get_conn(self):
        """Return the calling thread's connection, opening it on first use."""
        thread_id = get_ident()
        if thread_id not in self._connection_cache:
            self._connection_cache[thread_id] = sqlite3.Connection(self.path, timeout=60)
        return self._connection_cache[thread_id]

    def purge(self):
        """Delete every item from the queue."""
        with self._get_conn() as conn:
            conn.execute(self._purge)

    def append(self, obj):
        """Serialize ``obj`` with ``dumps`` and add it to the queue."""
        obj_buffer = dumps(obj)
        with self._get_conn() as conn:
            conn.execute(self._append, (obj_buffer,))

    def get(self, sleep_wait=True):
        """Remove and return the oldest item in the queue.

        sleep_wait -- when true (the default) and the queue is empty, keep
                      polling, sleeping between attempts, until an item
                      shows up.  When false, return ``None`` right away if
                      the queue is empty.

        Returns the decoded item, or ``None`` (only when ``sleep_wait`` is
        false and nothing is queued).
        """
        wait = 0.1
        max_wait = 2
        tries = 0
        with self._get_conn() as conn:
            while True:
                # Take the write lock before reading so that the select and
                # the delete below happen in one transaction.
                conn.execute(self._write_lock)
                row = next(conn.execute(self._get), None)
                if row is not None:
                    row_id, obj_buffer = row
                    conn.execute(self._del, (row_id,))
                    # Leaving the ``with`` block commits the delete.
                    return loads(str(obj_buffer))
                conn.commit()  # unlock the database
                if not sleep_wait:
                    return None
                tries += 1
                sleep(wait)
                # Explicit floor division: the increment is 0 for the first
                # nine empty polls, then grows, capped at max_wait seconds.
                wait = min(max_wait, tries // 10 + wait)

    def peek(self):
        """Return the oldest item without removing it, or ``None`` if empty."""
        with self._get_conn() as conn:
            row = next(conn.execute(self._peek), None)
            if row is None:
                return None
            return loads(str(row[0]))