-
Notifications
You must be signed in to change notification settings - Fork 0
/
dynamodb_backend.py
179 lines (135 loc) · 5.82 KB
/
dynamodb_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from time import sleep
import benchmark_utils
from boto.dynamodb.layer1 import Layer1
from boto.dynamodb.exceptions import DynamoDBKeyNotFoundError
import aws_credentials
# Name of the single DynamoDB table that every request in this module targets.
TABLE_NAME = "data"

# We use single-character attribute names to save space in requests.
# KEY is the hash-key attribute name given to create_table() in nuke().
KEY = "k"
# VALUE is the string ('S') attribute written by put() and read back by get().
VALUE = "v"
# REF_COUNT is the numeric ('N') attribute adjusted with the ADD action.
REF_COUNT = "r"

# Thresholds mandated by Amazon.
# NOTE(review): MAX_BATCH_SIZE is not referenced in the visible part of this
# file; presumably it is used by callers elsewhere -- confirm before removing.
MAX_BATCH_SIZE = 25 # Maximum number of items in a BatchWriteItemsRequest.

# Get a feel for server-side latencies.
# Shared module-level timer: put(), get(), incRefCount() and decRefCount()
# bracket their requests with sampler.begin() / sampler.end().
sampler = benchmark_utils.BenchmarkTimer()
# Backend driven by DynamoDB. We use one table called "data".
class DynamoDBBackend:
    """Reference-counted key/value store backed by a single DynamoDB table.

    Every item lives in TABLE_NAME and carries a string value (VALUE) and a
    numeric reference count (REF_COUNT). The capacity units reported by
    DynamoDB for each successful request are accumulated in
    ``self.capacityUsed``.
    """

    # Seconds to sleep between describe_table polls while nuke() waits.
    _POLL_INTERVAL = 0.01

    # Error message that marks a failed conditional (expected-value) check.
    _CONDITIONAL_FAILED = 'The conditional request failed'

    def __init__(self):
        """Create the low-level client from the configured AWS credentials."""
        self.client = Layer1(aws_credentials.accessKey,
                             aws_credentials.secretKey)
        self.capacityUsed = 0.0

    def _apiKey(self, key):
        """Make an API key object from a key string.

        Raises:
            KeyError: if ``key`` is None or empty (empty keys are not
                permitted by Amazon).
        """
        if key is None or len(key) == 0:
            raise KeyError(key)
        return {'HashKeyElement': {'S': key}}

    def useCapacity(self, result):
        """Add the capacity consumed by ``result`` to the running total."""
        self.capacityUsed += result['ConsumedCapacityUnits']

    def put(self, key, value):
        """Store ``value`` under ``key`` and increment its reference count.

        A single UpdateItem request both ADDs 1 to the refCount and PUTs the
        value, so the value is written whether or not the item existed.

        Raises:
            KeyError: if ``key`` is None or empty.
        """
        sampler.begin()
        try:
            apiKey = self._apiKey(key)
            updates = {
                REF_COUNT: {'Value': {'N': '1'}, 'Action': 'ADD'},
                VALUE: {'Value': {'S': value}, 'Action': 'PUT'},
            }
            result = self.client.update_item(TABLE_NAME, apiKey, updates)
            self.useCapacity(result)
        finally:
            sampler.end()

    def _readValue(self, apiKey, consistent):
        """Issue one GetItem request; return the value, or None on a miss.

        A miss is either a DynamoDBKeyNotFoundError from the client or a
        response whose 'Item' entry is absent or None.
        """
        try:
            result = self.client.get_item(TABLE_NAME, apiKey, [VALUE],
                                          consistent_read=consistent)
        except DynamoDBKeyNotFoundError:
            return None
        self.useCapacity(result)
        item = result.get('Item')
        if item is None:
            return None
        return item[VALUE]['S']

    def get(self, key):
        """Return the value stored under ``key``.

        An eventually-consistent read is tried first. If it misses, a
        consistent read is tried before concluding that the key is absent.
        The fallback also runs when the client signals the miss by raising
        DynamoDBKeyNotFoundError, which previously skipped the fallback.

        Raises:
            KeyError: if ``key`` is None/empty or is not in the table.
        """
        sampler.begin()
        try:
            apiKey = self._apiKey(key)
            value = self._readValue(apiKey, False)
            if value is None:
                # The eventually consistent request missed; retry consistently.
                value = self._readValue(apiKey, True)
            if value is None:
                # The key must not exist in the database.
                raise KeyError(key)
            return value
        finally:
            sampler.end()

    def _addToRefCount(self, key, delta):
        """Add the integer ``delta`` to the refCount of API key ``key``.

        NOTE(review): DynamoDB's ADD action presumably creates the item when
        it does not exist, so this may never signal an unknown key -- confirm.
        """
        updates = {
            REF_COUNT: {'Value': {'N': str(delta)}, 'Action': 'ADD'},
        }
        result = self.client.update_item(TABLE_NAME, key, updates)
        self.useCapacity(result)

    def incRefCount(self, key):
        """Increment the reference count of ``key`` by one.

        Raises:
            KeyError: if ``key`` is None/empty, or the client reports the key
                as not found.
        """
        sampler.begin()
        try:
            self._addToRefCount(self._apiKey(key), 1)
        except DynamoDBKeyNotFoundError:
            raise KeyError(key)
        finally:
            sampler.end()

    def decRefCount(self, key):
        """Decrement the reference count of ``key``; delete the item at zero.

        The delete is conditional on the refCount being exactly 0, so it is
        atomic with respect to concurrent increments. A failed condition is
        expected and ignored.

        Raises:
            KeyError: if ``key`` is None/empty, or the client reports the key
                as not found.
        """
        sampler.begin()
        try:
            apiKey = self._apiKey(key)
            self._addToRefCount(apiKey, -1)
            # Atomically delete the item, if its reference count is zero.
            expectation = {REF_COUNT: {'Value': {'N': '0'}}}
            try:
                result = self.client.delete_item(TABLE_NAME, apiKey,
                                                 expectation)
            except Exception as e:
                # Not every exception carries error_message, so read it
                # defensively instead of masking the real error with an
                # AttributeError.
                if getattr(e, 'error_message', None) != self._CONDITIONAL_FAILED:
                    raise  # Some other error: propagate with its traceback.
                # Otherwise the zero-refCount check failed; this is OK.
            else:
                self.useCapacity(result)
        except DynamoDBKeyNotFoundError:
            raise KeyError(key)
        finally:
            sampler.end()

    def _waitUntilTableGone(self):
        """Poll describe_table until it fails, i.e. the table no longer exists."""
        while True:
            sleep(self._POLL_INTERVAL)
            try:
                self.client.describe_table(TABLE_NAME)
            except Exception:
                return  # The table must not exist anymore.

    def _waitUntilTableActive(self):
        """Poll describe_table until the table reports status 'ACTIVE'."""
        while True:
            sleep(self._POLL_INTERVAL)
            try:
                result = self.client.describe_table(TABLE_NAME)
                if result['Table']['TableStatus'] == 'ACTIVE':
                    return
            except Exception:
                pass  # The table must not exist quite yet.

    def nuke(self):
        """Delete and re-create the table, blocking until it is ACTIVE.

        Failures while deleting are treated as "already deleted". Only
        ``Exception`` is swallowed, so KeyboardInterrupt and SystemExit can
        still break out of the polling loops.
        """
        try:
            self.client.delete_table(TABLE_NAME)
        except Exception:
            pass  # The table must have been deleted already.
        else:
            self._waitUntilTableGone()
        # Re-create the table.
        self.client.create_table(TABLE_NAME,
                                 {'HashKeyElement': {'AttributeName': KEY,
                                                     'AttributeType': 'S'}},
                                 {'ReadCapacityUnits': 10,
                                  'WriteCapacityUnits': 5})
        self._waitUntilTableActive()

    def flush(self):
        """No-op: every operation is sent to DynamoDB immediately."""
        pass