Skip to content

Commit

Permalink
Add support for transactions (#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
hallie authored and garrettheel committed Jul 8, 2019
1 parent 7ea92e6 commit 78db9e2
Show file tree
Hide file tree
Showing 18 changed files with 955 additions and 88 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ venv
pip-log.txt

# Unit test / coverage reports
build/
.coverage
cover/
.tox
Expand Down
255 changes: 176 additions & 79 deletions pynamodb/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import json
import logging
import math
import random
import sys
import time
Expand Down Expand Up @@ -39,15 +38,19 @@
CONSUMED_CAPACITY, CAPACITY_UNITS,
SHORT_ATTR_TYPES,
ITEMS, DEFAULT_ENCODING, BINARY_SHORT, BINARY_SET_SHORT, LAST_EVALUATED_KEY, RESPONSES, UNPROCESSED_KEYS,
UNPROCESSED_ITEMS, STREAM_SPECIFICATION, STREAM_VIEW_TYPE, STREAM_ENABLED, UPDATE_EXPRESSION,
UNPROCESSED_ITEMS, STREAM_SPECIFICATION, STREAM_VIEW_TYPE, STREAM_ENABLED,
EXPRESSION_ATTRIBUTE_NAMES, EXPRESSION_ATTRIBUTE_VALUES,
CONDITION_EXPRESSION, FILTER_EXPRESSION,
TRANSACT_WRITE_ITEMS, TRANSACT_GET_ITEMS, CLIENT_REQUEST_TOKEN, TRANSACT_ITEMS, TRANSACT_CONDITION_CHECK,
TRANSACT_GET, TRANSACT_PUT, TRANSACT_DELETE, TRANSACT_UPDATE, UPDATE_EXPRESSION,
RETURN_VALUES_ON_CONDITION_FAILURE_VALUES, RETURN_VALUES_ON_CONDITION_FAILURE,
AVAILABLE_BILLING_MODES, DEFAULT_BILLING_MODE, BILLING_MODE, PAY_PER_REQUEST_BILLING_MODE,
TIME_TO_LIVE_SPECIFICATION, ENABLED, UPDATE_TIME_TO_LIVE)
TIME_TO_LIVE_SPECIFICATION, ENABLED, UPDATE_TIME_TO_LIVE
)
from pynamodb.exceptions import (
TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist,
VerboseClientError
)
VerboseClientError,
TransactGetError, TransactWriteError)
from pynamodb.expressions.condition import Condition
from pynamodb.expressions.operand import Path
from pynamodb.expressions.projection import create_projection_expression
Expand Down Expand Up @@ -473,10 +476,15 @@ def _handle_binary_attributes(data):
for attr in six.itervalues(item):
_convert_binary(attr)
if RESPONSES in data:
for item_list in six.itervalues(data[RESPONSES]):
for item in item_list:
if isinstance(data[RESPONSES], list):
for item in data[RESPONSES]:
for attr in six.itervalues(item):
_convert_binary(attr)
else:
for item_list in six.itervalues(data[RESPONSES]):
for item in item_list:
for attr in six.itervalues(item):
_convert_binary(attr)
if LAST_EVALUATED_KEY in data:
for attr in six.itervalues(data[LAST_EVALUATED_KEY]):
_convert_binary(attr)
Expand Down Expand Up @@ -787,6 +795,19 @@ def get_return_values_map(self, return_values):
RETURN_VALUES: str(return_values).upper()
}

def get_return_values_on_condition_failure_map(self, return_values_on_condition_failure):
"""
Builds the return values map that is common to several operations
"""
if return_values_on_condition_failure.upper() not in RETURN_VALUES_VALUES:
raise ValueError("{} must be one of {}".format(
RETURN_VALUES_ON_CONDITION_FAILURE,
RETURN_VALUES_ON_CONDITION_FAILURE_VALUES
))
return {
RETURN_VALUES_ON_CONDITION_FAILURE: str(return_values_on_condition_failure).upper()
}

def get_item_collection_map(self, return_item_collection_metrics):
"""
Builds the item collection map
Expand All @@ -806,38 +827,79 @@ def get_exclusive_start_key_map(self, table_name, exclusive_start_key):
raise TableError("No such table {}".format(table_name))
return tbl.get_exclusive_start_key_map(exclusive_start_key)

def delete_item(self,
table_name,
hash_key,
range_key=None,
condition=None,
return_values=None,
return_consumed_capacity=None,
return_item_collection_metrics=None):
"""
Performs the DeleteItem operation and returns the result
"""
def get_operation_kwargs(self,
table_name,
hash_key,
range_key=None,
key=KEY,
attributes=None,
attributes_to_get=None,
actions=None,
condition=None,
consistent_read=None,
return_values=None,
return_consumed_capacity=None,
return_item_collection_metrics=None,
return_values_on_condition_failure=None):
self._check_condition('condition', condition)

operation_kwargs = {TABLE_NAME: table_name}
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key))
operation_kwargs = {}
name_placeholders = {}
expression_attribute_values = {}

operation_kwargs[TABLE_NAME] = table_name
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key, key=key))
if attributes:
attrs = self.get_item_attribute_map(table_name, attributes)
operation_kwargs[ITEM].update(attrs[ITEM])
if attributes_to_get is not None:
projection_expression = create_projection_expression(attributes_to_get, name_placeholders)
operation_kwargs[PROJECTION_EXPRESSION] = projection_expression
if condition is not None:
condition_expression = condition.serialize(name_placeholders, expression_attribute_values)
operation_kwargs[CONDITION_EXPRESSION] = condition_expression
if return_values:
if consistent_read is not None:
operation_kwargs[CONSISTENT_READ] = consistent_read
if return_values is not None:
operation_kwargs.update(self.get_return_values_map(return_values))
if return_consumed_capacity:
if return_values_on_condition_failure is not None:
operation_kwargs.update(self.get_return_values_on_condition_failure_map(return_values_on_condition_failure))
if return_consumed_capacity is not None:
operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity))
if return_item_collection_metrics:
if return_item_collection_metrics is not None:
operation_kwargs.update(self.get_item_collection_map(return_item_collection_metrics))
if actions is not None:
update_expression = Update(*actions)
operation_kwargs[UPDATE_EXPRESSION] = update_expression.serialize(
name_placeholders,
expression_attribute_values
)
if name_placeholders:
operation_kwargs[EXPRESSION_ATTRIBUTE_NAMES] = self._reverse_dict(name_placeholders)
if expression_attribute_values:
operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values
return operation_kwargs

def delete_item(self,
table_name,
hash_key,
range_key=None,
condition=None,
return_values=None,
return_consumed_capacity=None,
return_item_collection_metrics=None):
"""
Performs the DeleteItem operation and returns the result
"""
operation_kwargs = self.get_operation_kwargs(
table_name,
hash_key,
range_key=range_key,
condition=condition,
return_values=return_values,
return_consumed_capacity=return_consumed_capacity,
return_item_collection_metrics=return_item_collection_metrics
)
try:
return self.dispatch(DELETE_ITEM, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
Expand All @@ -855,33 +917,19 @@ def update_item(self,
"""
Performs the UpdateItem operation
"""
self._check_condition('condition', condition)

operation_kwargs = {TABLE_NAME: table_name}
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key))
name_placeholders = {}
expression_attribute_values = {}

if condition is not None:
condition_expression = condition.serialize(name_placeholders, expression_attribute_values)
operation_kwargs[CONDITION_EXPRESSION] = condition_expression
if return_consumed_capacity:
operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity))
if return_item_collection_metrics:
operation_kwargs.update(self.get_item_collection_map(return_item_collection_metrics))
if return_values:
operation_kwargs.update(self.get_return_values_map(return_values))
if not actions:
raise ValueError("'actions' cannot be empty")

update_expression = Update(*actions)
operation_kwargs[UPDATE_EXPRESSION] = update_expression.serialize(name_placeholders, expression_attribute_values)

if name_placeholders:
operation_kwargs[EXPRESSION_ATTRIBUTE_NAMES] = self._reverse_dict(name_placeholders)
if expression_attribute_values:
operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values

operation_kwargs = self.get_operation_kwargs(
table_name=table_name,
hash_key=hash_key,
range_key=range_key,
actions=actions,
condition=condition,
return_values=return_values,
return_consumed_capacity=return_consumed_capacity,
return_item_collection_metrics=return_item_collection_metrics
)
try:
return self.dispatch(UPDATE_ITEM, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
Expand All @@ -899,34 +947,86 @@ def put_item(self,
"""
Performs the PutItem operation and returns the result
"""
self._check_condition('condition', condition)

operation_kwargs = {TABLE_NAME: table_name}
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key, key=ITEM))
name_placeholders = {}
expression_attribute_values = {}
operation_kwargs = self.get_operation_kwargs(
table_name=table_name,
hash_key=hash_key,
range_key=range_key,
key=ITEM,
attributes=attributes,
condition=condition,
return_values=return_values,
return_consumed_capacity=return_consumed_capacity,
return_item_collection_metrics=return_item_collection_metrics
)
try:
return self.dispatch(PUT_ITEM, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
raise PutError("Failed to put item: {}".format(e), e)

if attributes:
attrs = self.get_item_attribute_map(table_name, attributes)
operation_kwargs[ITEM].update(attrs[ITEM])
if condition is not None:
condition_expression = condition.serialize(name_placeholders, expression_attribute_values)
operation_kwargs[CONDITION_EXPRESSION] = condition_expression
if return_consumed_capacity:
def _get_transact_operation_kwargs(self,
client_request_token=None,
return_consumed_capacity=None,
return_item_collection_metrics=None):
operation_kwargs = {}
if client_request_token is not None:
operation_kwargs[CLIENT_REQUEST_TOKEN] = client_request_token
if return_consumed_capacity is not None:
operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity))
if return_item_collection_metrics:
if return_item_collection_metrics is not None:
operation_kwargs.update(self.get_item_collection_map(return_item_collection_metrics))
if return_values:
operation_kwargs.update(self.get_return_values_map(return_values))
if name_placeholders:
operation_kwargs[EXPRESSION_ATTRIBUTE_NAMES] = self._reverse_dict(name_placeholders)
if expression_attribute_values:
operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values

return operation_kwargs

def transact_write_items(self,
condition_check_items,
delete_items,
put_items,
update_items,
client_request_token=None,
return_consumed_capacity=None,
return_item_collection_metrics=None):
"""
Performs the TransactWrite operation and returns the result
"""
transact_items = []
transact_items.extend(
{TRANSACT_CONDITION_CHECK: item} for item in condition_check_items
)
transact_items.extend(
{TRANSACT_DELETE: item} for item in delete_items
)
transact_items.extend(
{TRANSACT_PUT: item} for item in put_items
)
transact_items.extend(
{TRANSACT_UPDATE: item} for item in update_items
)

operation_kwargs = self._get_transact_operation_kwargs(
client_request_token=client_request_token,
return_consumed_capacity=return_consumed_capacity,
return_item_collection_metrics=return_item_collection_metrics
)
operation_kwargs[TRANSACT_ITEMS] = transact_items

try:
return self.dispatch(PUT_ITEM, operation_kwargs)
return self.dispatch(TRANSACT_WRITE_ITEMS, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
raise PutError("Failed to put item: {}".format(e), e)
raise TransactWriteError("Failed to write transaction items", e)

def transact_get_items(self, get_items, return_consumed_capacity=None):
"""
Performs the TransactGet operation and returns the result
"""
operation_kwargs = self._get_transact_operation_kwargs(return_consumed_capacity=return_consumed_capacity)
operation_kwargs[TRANSACT_ITEMS] = [
{TRANSACT_GET: item} for item in get_items
]

try:
return self.dispatch(TRANSACT_GET_ITEMS, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
raise TransactGetError("Failed to get transaction items", e)

def batch_write_item(self,
table_name,
Expand Down Expand Up @@ -1014,16 +1114,13 @@ def get_item(self,
"""
Performs the GetItem operation and returns the result
"""
operation_kwargs = {}
name_placeholders = {}
if attributes_to_get is not None:
projection_expression = create_projection_expression(attributes_to_get, name_placeholders)
operation_kwargs[PROJECTION_EXPRESSION] = projection_expression
if name_placeholders:
operation_kwargs[EXPRESSION_ATTRIBUTE_NAMES] = self._reverse_dict(name_placeholders)
operation_kwargs[CONSISTENT_READ] = consistent_read
operation_kwargs[TABLE_NAME] = table_name
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key))
operation_kwargs = self.get_operation_kwargs(
table_name=table_name,
hash_key=hash_key,
range_key=range_key,
consistent_read=consistent_read,
attributes_to_get=attributes_to_get
)
try:
return self.dispatch(GET_ITEM, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
Expand Down
24 changes: 23 additions & 1 deletion pynamodb/connection/base.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Iterator, MutableMapping, Optional, Sequence, Text
from typing import Any, Dict, Iterator, MutableMapping, Optional, Sequence, Text, List

import botocore.session
from botocore.awsrequest import AWSPreparedRequest
Expand Down Expand Up @@ -56,9 +56,27 @@ class Connection:
def get_identifier_map(self, table_name: Text, hash_key, range_key: Optional[Any] = ..., key: Any = ...): ...
def get_consumed_capacity_map(self, return_consumed_capacity): ...
def get_return_values_map(self, return_values): ...
def get_return_values_on_condition_failure_map(self, return_values_on_condition_failure: str) -> Dict[str, str]: ...
def get_item_collection_map(self, return_item_collection_metrics): ...
def get_exclusive_start_key_map(self, table_name: Text, exclusive_start_key): ...

def get_operation_kwargs(
self,
table_name: Text,
hash_key: Any,
range_key: Optional[Any] = ...,
key: Text = ...,
attributes: Optional[Any] = ...,
attributes_to_get: Optional[Any] = ...,
actions: Optional[Sequence[Action]] = ...,
client_request_token: Optional[Text] = ...,
condition: Optional[Condition] = ...,
consistent_read: Optional[bool] = ...,
return_values: Optional[Any] = ...,
return_consumed_capacity: Optional[Any] = ...,
return_item_collection_metrics: Optional[Any] = ...
) -> Dict: ...

def delete_item(
self,
table_name: Text,
Expand Down Expand Up @@ -94,6 +112,10 @@ class Connection:
return_item_collection_metrics: Optional[Any] = ...
) -> Dict: ...

def _get_transact_operation_kwargs(self, client_request_token: Optional[str] = ..., return_consumed_capacity: Optional[Any] = ..., return_item_collection_metrics: Optional[Any] = ...) -> Dict: ...
def transact_get_items(self, get_items: List[Dict], return_consumed_capacity: Optional[Any] = ...) -> Dict: ...
def transact_write_items(self, condition_check_items: List[Dict], delete_items: List[Dict], put_items: List[Dict], update_items: List[Dict], client_request_token: Optional[Text] = ..., return_consumed_capacity: Optional[Any] = ..., return_item_collection_metrics: Optional[Any] = ...) -> Dict: ...

def batch_write_item(self, table_name: Text, put_items: Optional[Any] = ..., delete_items: Optional[Any] = ..., return_consumed_capacity: Optional[Any] = ..., return_item_collection_metrics: Optional[Any] = ...): ...
def batch_get_item(self, table_name: Text, keys, consistent_read: Optional[Any] = ..., return_consumed_capacity: Optional[Any] = ..., attributes_to_get: Optional[Any] = ...): ...
def get_item(self, table_name: Text, hash_key, range_key: Optional[Any] = ..., consistent_read: bool = ..., attributes_to_get: Optional[Any] = ...): ...
Expand Down
Loading

0 comments on commit 78db9e2

Please sign in to comment.