Skip to content

Commit

Permalink
Add operation settings to most operations (#887)
Browse files Browse the repository at this point in the history
  • Loading branch information
ikonst authored Jan 5, 2021
1 parent b10da8e commit a175f43
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 70 deletions.
2 changes: 2 additions & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Other changes in this release:
* Replace the internal attribute type constants with their "short" DynamoDB version (#827)
* Typed list attributes can now support any Attribute subclass (#833)
* Add support for empty values in Binary and String attributes (#830)
* Most API operation methods now accept a ``settings`` argument to customize settings of individual operations.
This currently allow adding or overriding HTTP headers. (#887)
* Remove ``ListAttribute.remove_indexes`` (added in v4.3.2) and document usage of remove for list elements (#838)
* Add the attribute name to error messages when deserialization fails (#815)
* Add the table name to error messages for transactional operations (#835)
Expand Down
68 changes: 38 additions & 30 deletions pynamodb/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from pynamodb.expressions.operand import Path
from pynamodb.expressions.projection import create_projection_expression
from pynamodb.expressions.update import Action, Update
from pynamodb.settings import get_settings_value
from pynamodb.settings import get_settings_value, OperationSettings
from pynamodb.signals import pre_dynamodb_send, post_dynamodb_send
from pynamodb.types import HASH, RANGE

Expand All @@ -67,8 +67,8 @@ class MetaTable(object):
A pythonic wrapper around table metadata
"""

def __init__(self, data: Dict) -> None:
self.data: Dict = data or {}
def __init__(self, data: Optional[Dict]) -> None:
self.data = data or {}
self._range_keyname = None
self._hash_keyname = None

Expand Down Expand Up @@ -290,12 +290,6 @@ def __init__(self,
def __repr__(self) -> str:
return "Connection<{}>".format(self.client.meta.endpoint_url)

def _log_debug(self, operation: str, kwargs: str):
"""
Sends a debug message to the logger
"""
log.debug("Calling %s with arguments %s", operation, kwargs)

def _sign_request(self, request):
auth = self.client._request_signer.get_auth_instance(
self.client._request_signer.signing_name,
Expand All @@ -306,16 +300,18 @@ def _sign_request(self, request):
def _create_prepared_request(
self,
params: Dict,
operation_model: Optional[Any],
settings: OperationSettings,
) -> AWSPreparedRequest:
request = create_request_object(params)
self._sign_request(request)
prepared_request = self.client._endpoint.prepare_request(request)
if self._extra_headers is not None:
prepared_request.headers.update(self._extra_headers)
if settings.extra_headers is not None:
prepared_request.headers.update(settings.extra_headers)
return prepared_request

def dispatch(self, operation_name, operation_kwargs):
def dispatch(self, operation_name: str, operation_kwargs: Dict, settings: OperationSettings = OperationSettings.default) -> Dict:
"""
Dispatches `operation_name` with arguments `operation_kwargs`
Expand All @@ -324,13 +320,13 @@ def dispatch(self, operation_name, operation_kwargs):
if operation_name not in [DESCRIBE_TABLE, LIST_TABLES, UPDATE_TABLE, UPDATE_TIME_TO_LIVE, DELETE_TABLE, CREATE_TABLE]:
if RETURN_CONSUMED_CAPACITY not in operation_kwargs:
operation_kwargs.update(self.get_consumed_capacity_map(TOTAL))
self._log_debug(operation_name, operation_kwargs)
log.debug("Calling %s with arguments %s", operation_name, operation_kwargs)

table_name = operation_kwargs.get(TABLE_NAME)
req_uuid = uuid.uuid4()

self.send_pre_boto_callback(operation_name, req_uuid, table_name)
data = self._make_api_call(operation_name, operation_kwargs)
data = self._make_api_call(operation_name, operation_kwargs, settings)
self.send_post_boto_callback(operation_name, req_uuid, table_name)

if data and CONSUMED_CAPACITY in data:
Expand All @@ -352,7 +348,7 @@ def send_pre_boto_callback(self, operation_name, req_uuid, table_name):
except Exception as e:
log.exception("pre_boto callback threw an exception.")

def _make_api_call(self, operation_name, operation_kwargs):
def _make_api_call(self, operation_name: str, operation_kwargs: Dict, settings: OperationSettings = OperationSettings.default) -> Dict:
"""
This private method is here for two reasons:
1. It's faster to avoid using botocore's response parsing
Expand All @@ -379,7 +375,7 @@ def _make_api_call(self, operation_name, operation_kwargs):
prepared_request.reset_stream()

# Create a new request for each retry (including a new signature).
prepared_request = self._create_prepared_request(request_dict, operation_model)
prepared_request = self._create_prepared_request(request_dict, settings)

# Implement the before-send event from botocore
event_name = 'before-send.dynamodb.{}'.format(operation_model.name)
Expand Down Expand Up @@ -470,6 +466,8 @@ def _make_api_call(self, operation_name, operation_kwargs):

return self._handle_binary_attributes(data)

assert False # unreachable code

@staticmethod
def _handle_binary_attributes(data):
""" Simulate botocore's binary attribute handling """
Expand Down Expand Up @@ -943,6 +941,7 @@ def delete_item(
return_values: Optional[str] = None,
return_consumed_capacity: Optional[str] = None,
return_item_collection_metrics: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the DeleteItem operation and returns the result
Expand All @@ -957,7 +956,7 @@ def delete_item(
return_item_collection_metrics=return_item_collection_metrics
)
try:
return self.dispatch(DELETE_ITEM, operation_kwargs)
return self.dispatch(DELETE_ITEM, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise DeleteError("Failed to delete item: {}".format(e), e)

Expand All @@ -971,6 +970,7 @@ def update_item(
return_consumed_capacity: Optional[str] = None,
return_item_collection_metrics: Optional[str] = None,
return_values: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the UpdateItem operation
Expand All @@ -986,10 +986,10 @@ def update_item(
condition=condition,
return_values=return_values,
return_consumed_capacity=return_consumed_capacity,
return_item_collection_metrics=return_item_collection_metrics
return_item_collection_metrics=return_item_collection_metrics,
)
try:
return self.dispatch(UPDATE_ITEM, operation_kwargs)
return self.dispatch(UPDATE_ITEM, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise UpdateError("Failed to update item: {}".format(e), e)

Expand All @@ -1003,6 +1003,7 @@ def put_item(
return_values: Optional[str] = None,
return_consumed_capacity: Optional[str] = None,
return_item_collection_metrics: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the PutItem operation and returns the result
Expand All @@ -1019,7 +1020,7 @@ def put_item(
return_item_collection_metrics=return_item_collection_metrics
)
try:
return self.dispatch(PUT_ITEM, operation_kwargs)
return self.dispatch(PUT_ITEM, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise PutError("Failed to put item: {}".format(e), e)

Expand Down Expand Up @@ -1047,7 +1048,8 @@ def transact_write_items(
update_items: Sequence[Dict],
client_request_token: Optional[str] = None,
return_consumed_capacity: Optional[str] = None,
return_item_collection_metrics: Optional[str] = None
return_item_collection_metrics: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the TransactWrite operation and returns the result
Expand All @@ -1074,14 +1076,15 @@ def transact_write_items(
operation_kwargs[TRANSACT_ITEMS] = transact_items

try:
return self.dispatch(TRANSACT_WRITE_ITEMS, operation_kwargs)
return self.dispatch(TRANSACT_WRITE_ITEMS, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise TransactWriteError("Failed to write transaction items", e)

def transact_get_items(
self,
get_items: Sequence[Dict],
return_consumed_capacity: Optional[str] = None
return_consumed_capacity: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the TransactGet operation and returns the result
Expand All @@ -1092,7 +1095,7 @@ def transact_get_items(
]

try:
return self.dispatch(TRANSACT_GET_ITEMS, operation_kwargs)
return self.dispatch(TRANSACT_GET_ITEMS, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise TransactGetError("Failed to get transaction items", e)

Expand All @@ -1102,7 +1105,8 @@ def batch_write_item(
put_items: Optional[Any] = None,
delete_items: Optional[Any] = None,
return_consumed_capacity: Optional[str] = None,
return_item_collection_metrics: Optional[str] = None
return_item_collection_metrics: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the batch_write_item operation
Expand Down Expand Up @@ -1132,7 +1136,7 @@ def batch_write_item(
})
operation_kwargs[REQUEST_ITEMS][table_name] = delete_items_list + put_items_list
try:
return self.dispatch(BATCH_WRITE_ITEM, operation_kwargs)
return self.dispatch(BATCH_WRITE_ITEM, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise PutError("Failed to batch write items: {}".format(e), e)

Expand All @@ -1142,7 +1146,8 @@ def batch_get_item(
keys: Sequence[str],
consistent_read: Optional[bool] = None,
return_consumed_capacity: Optional[str] = None,
attributes_to_get: Optional[Any] = None
attributes_to_get: Optional[Any] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the batch get item operation
Expand Down Expand Up @@ -1173,7 +1178,7 @@ def batch_get_item(
)
operation_kwargs[REQUEST_ITEMS][table_name].update(keys_map)
try:
return self.dispatch(BATCH_GET_ITEM, operation_kwargs)
return self.dispatch(BATCH_GET_ITEM, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise GetError("Failed to batch get items: {}".format(e), e)

Expand All @@ -1184,6 +1189,7 @@ def get_item(
range_key: Optional[str] = None,
consistent_read: bool = False,
attributes_to_get: Optional[Any] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the GetItem operation and returns the result
Expand All @@ -1196,7 +1202,7 @@ def get_item(
attributes_to_get=attributes_to_get
)
try:
return self.dispatch(GET_ITEM, operation_kwargs)
return self.dispatch(GET_ITEM, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise GetError("Failed to get item: {}".format(e), e)

Expand All @@ -1212,6 +1218,7 @@ def scan(
total_segments: Optional[int] = None,
consistent_read: Optional[bool] = None,
index_name: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the scan operation
Expand Down Expand Up @@ -1248,7 +1255,7 @@ def scan(
operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values

try:
return self.dispatch(SCAN, operation_kwargs)
return self.dispatch(SCAN, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise ScanError("Failed to scan table: {}".format(e), e)

Expand All @@ -1266,6 +1273,7 @@ def query(
return_consumed_capacity: Optional[str] = None,
scan_index_forward: Optional[bool] = None,
select: Optional[str] = None,
settings: OperationSettings = OperationSettings.default,
) -> Dict:
"""
Performs the Query operation and returns the result
Expand Down Expand Up @@ -1322,7 +1330,7 @@ def query(
operation_kwargs[EXPRESSION_ATTRIBUTE_VALUES] = expression_attribute_values

try:
return self.dispatch(QUERY, operation_kwargs)
return self.dispatch(QUERY, operation_kwargs, settings)
except BOTOCORE_EXCEPTIONS as e:
raise QueryError("Failed to query items: {}".format(e), e)

Expand Down
Loading

0 comments on commit a175f43

Please sign in to comment.