From 675ab2edaf1baf88d5b2290bbd6b21d791604ce0 Mon Sep 17 00:00:00 2001 From: Peter Emil Date: Sun, 28 Feb 2021 01:53:13 +0200 Subject: [PATCH] initial release --- .gitignore | 5 + README.md | 116 +++++++++++++++++++++ poetry.lock | 114 ++++++++++++++++++++ pyproject.toml | 16 +++ storagebox/__init__.py | 3 + storagebox/api.py | 44 ++++++++ storagebox/repository/__init__.py | 2 + storagebox/repository/deduplication.py | 40 ++++++++ storagebox/repository/dynamodb.py | 18 ++++ storagebox/repository/item_bank.py | 137 +++++++++++++++++++++++++ storagebox/settings.py | 13 +++ 11 files changed, 508 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 poetry.lock create mode 100644 pyproject.toml create mode 100644 storagebox/__init__.py create mode 100644 storagebox/api.py create mode 100644 storagebox/repository/__init__.py create mode 100644 storagebox/repository/deduplication.py create mode 100644 storagebox/repository/dynamodb.py create mode 100644 storagebox/repository/item_bank.py create mode 100644 storagebox/settings.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a6caa69 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea/ +dist/ +storagebox.egg-info/ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..8d6c117 --- /dev/null +++ b/README.md @@ -0,0 +1,116 @@ +# StorageBox + +StorageBox is a python module that you can use to de-duplicate data +among distributed components. + +For example, let's assume you run a movie store. You have +voucher codes you'd like to distribute to the first 30 users who press +a button. You are concerned that some users might try to get more +than 1 voucher code by exploiting race conditions (maybe clicking the +button from multiple machines at the same time). 
+ + + +Here is what StorageBox allows you to do +``` +# Setup Code +import storagebox + + +item_repo = storagebox.ItemBankDynamoDbRepository(table_name="voucher_codes") + +deduplication_repo = storagebox.DeduplicationDynamoDbRepository(table_name="storage_box_deduplication_table") + + +# You can add items to the item repo (for example add list of voucher codes) +item_repo.batch_add_items(voucher_codes) + + +# You can then assign voucher codes to User IDs +deduplicator = storagebox.Deduplicator(item_repo=item_repo, deduplication_repo=deduplication_repo) + +voucher_code = deduplicator.fetch_item_for_deduplication_id( + deduplication_id=user_id +) +``` +And that's it! + +As long as you use a suitable `deduplication_id`, all race conditions +and data hazards will be taken care of for you. Examples of suitable +candidates for `deduplication_id` can be User ID, IP Address, +Email Address or anything that works best with your application. + + +## Prerequisites +To use StorageBox, you need the following already set up. + +- An ItemBank DynamoDB Table, The current implementation requires the table to have 1 column +called `item`. This is where you will store items (in the case of the example: +voucher codes). +- A Deduplication DynamoDB Table, This will be used by `StorageBox` to achieve idempotency, +that is, to make sure that if you call `fetch_item_for_deduplication_id` multiple times with +the same `deduplication_id`, you will always get the same result. + +If you prefer to use something else other than DynamoDB, you can implement your own `ItemBankRepository` +and/or `DeduplicationRepository` for any other backend. This implementation will have to implement +the already established Abstract class. If you do that, contributions are welcome! + + +## Installation +``` +pip install storagebox +``` + + +## Other Example Use Cases +Hosting a big event and only have 10,300 seats that would be booked in the first few minutes? 
``` +# Before the event, add 10,300 numbers to the bank +item_repo.batch_add_items([str(i) for i in range(10300)]) + +# From your webserver +assignment_number = deduplicator.fetch_item_for_deduplication_id( + deduplication_id=email +) +``` + +Are you an influencer and only have 5000 people to give special referral links to? (First 5000 +people who click the link in the description get a free something!) +``` +# Before you post your content +item_repo.batch_add_items(referral_links_list) + +# From your webserver +referral_link = deduplicator.fetch_item_for_deduplication_id( + deduplication_id=ip_address +) +``` + +Are you organizing online classes for your 150 students, you're willing to host 3 classes (50 students) +each but you'd like to be sure that no student attends more than 1 class? +``` +# Before you host your classes +class_1_codes = storagebox.ItemBankDynamoDbRepository(table_name="class_1_codes") +class_2_codes = storagebox.ItemBankDynamoDbRepository(table_name="class_2_codes") +class_3_codes = storagebox.ItemBankDynamoDbRepository(table_name="class_3_codes") +deduplication_repo = storagebox.DeduplicationDynamoDbRepository(table_name="myonline_classes_deduplication_table") + +class_1_codes.batch_add_items([str(i) for i in range(0, 50)]) +class_2_codes.batch_add_items([str(i) for i in range(50, 100)]) +class_3_codes.batch_add_items([str(i) for i in range(100, 150)]) + +# From your webserver +deduplicators = { + 'class_1': storagebox.Deduplicator(item_repo=class_1_codes, deduplication_repo=deduplication_repo), + 'class_2': storagebox.Deduplicator(item_repo=class_2_codes, deduplication_repo=deduplication_repo), + 'class_3': storagebox.Deduplicator(item_repo=class_3_codes, deduplication_repo=deduplication_repo), +} + +deduplicators[requested_class].fetch_item_for_deduplication_id( + deduplication_id=student_id +) + +``` + +# How It Works +A blogpost explaining how `storagebox` works is available [here](https://blog.peteremil.com/2021/02/realtime-distributed-deduplication-how.html) \ No newline at end of 
file diff --git a/poetry.lock b/poetry.lock new file mode 100644 index 0000000..48e39ec --- /dev/null +++ b/poetry.lock @@ -0,0 +1,114 @@ +[[package]] +category = "main" +description = "The AWS SDK for Python" +name = "boto3" +optional = false +python-versions = "*" +version = "1.16.63" + +[package.dependencies] +botocore = ">=1.19.63,<1.20.0" +jmespath = ">=0.7.1,<1.0.0" +s3transfer = ">=0.3.0,<0.4.0" + +[[package]] +category = "main" +description = "Low-level, data-driven core of boto 3." +name = "botocore" +optional = false +python-versions = "*" +version = "1.19.63" + +[package.dependencies] +jmespath = ">=0.7.1,<1.0.0" +python-dateutil = ">=2.1,<3.0.0" + +[package.dependencies.urllib3] +python = "<3.4.0 || >=3.5.0" +version = ">=1.25.4,<1.27" + +[[package]] +category = "main" +description = "JSON Matching Expressions" +name = "jmespath" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +version = "0.10.0" + +[[package]] +category = "main" +description = "Extensions to the standard Python datetime module" +name = "python-dateutil" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +version = "2.8.1" + +[package.dependencies] +six = ">=1.5" + +[[package]] +category = "main" +description = "An Amazon S3 Transfer Manager" +name = "s3transfer" +optional = false +python-versions = "*" +version = "0.3.4" + +[package.dependencies] +botocore = ">=1.12.36,<2.0a.0" + +[[package]] +category = "main" +description = "Python 2 and 3 compatibility utilities" +name = "six" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" +version = "1.15.0" + +[[package]] +category = "main" +description = "HTTP library with thread-safe connection pooling, file post, and more." 
+marker = "python_version != \"3.4\"" +name = "urllib3" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" +version = "1.26.3" + +[package.extras] +brotli = ["brotlipy (>=0.6.0)"] +secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"] +socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"] + +[metadata] +content-hash = "ca3a71c0444d91fb8fef15431f27baf27d96b10bcbece1da24ba9b23899e6cdd" +python-versions = "^3.8" + +[metadata.files] +boto3 = [ + {file = "boto3-1.16.63-py2.py3-none-any.whl", hash = "sha256:1c0003609e63e8cff51dee7a49e904bcdb20e140b5f7a10a03006289fd8c8dc1"}, + {file = "boto3-1.16.63.tar.gz", hash = "sha256:c919dac9773115025e1e2a7e462f60ca082e322bb6f4354247523e4226133b0b"}, +] +botocore = [ + {file = "botocore-1.19.63-py2.py3-none-any.whl", hash = "sha256:ad4adfcc195b5401d84b0c65d3a89e507c1d54c201879c8761ff10ef5c361e21"}, + {file = "botocore-1.19.63.tar.gz", hash = "sha256:d3694f6ef918def8082513e5ef309cd6cd83b612e9984e3a66e8adc98c650a92"}, +] +jmespath = [ + {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"}, + {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"}, +] +python-dateutil = [ + {file = "python-dateutil-2.8.1.tar.gz", hash = "sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c"}, + {file = "python_dateutil-2.8.1-py2.py3-none-any.whl", hash = "sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"}, +] +s3transfer = [ + {file = "s3transfer-0.3.4-py2.py3-none-any.whl", hash = "sha256:1e28620e5b444652ed752cf87c7e0cb15b0e578972568c6609f0f18212f259ed"}, + {file = "s3transfer-0.3.4.tar.gz", hash = "sha256:7fdddb4f22275cf1d32129e21f056337fd2a80b6ccef1664528145b72c49e6d2"}, +] +six = [ + {file = "six-1.15.0-py2.py3-none-any.whl", hash = 
"sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"}, + {file = "six-1.15.0.tar.gz", hash = "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259"}, +] +urllib3 = [ + {file = "urllib3-1.26.3-py2.py3-none-any.whl", hash = "sha256:1b465e494e3e0d8939b50680403e3aedaa2bc434b7d5af64dfd3c958d7f5ae80"}, + {file = "urllib3-1.26.3.tar.gz", hash = "sha256:de3eedaad74a2683334e282005cd8d7f22f4d55fa690a2a1020a416cb0a47e73"}, +] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..43565d9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "storagebox" +version = "1.0.5" +description = "A reusable, idempotent, and exactly once deduplication API" +authors = ["Peter Emil Halim "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.8" +boto3 = "^1.16.63" + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry>=0.12"] +build-backend = "poetry.masonry.api" diff --git a/storagebox/__init__.py b/storagebox/__init__.py new file mode 100644 index 0000000..3754945 --- /dev/null +++ b/storagebox/__init__.py @@ -0,0 +1,3 @@ +from storagebox.repository.deduplication import DeduplicationDynamoDbRepository +from storagebox.repository.item_bank import ItemBankDynamoDbRepository +from storagebox.api import Deduplicator diff --git a/storagebox/api.py b/storagebox/api.py new file mode 100644 index 0000000..0075f1b --- /dev/null +++ b/storagebox/api.py @@ -0,0 +1,44 @@ +import logging +import typing +from botocore.exceptions import ClientError +from storagebox import settings +from storagebox import repository + + +log = logging.getLogger('storageBox') +log.setLevel(settings.DEFAULT_LOGGING_LEVEL) + + +class Deduplicator: + def __init__(self, item_repo, deduplication_repo): + self.item_repo = item_repo + self.deduplication_repo = deduplication_repo + + def fetch_item_for_deduplication_id(self, deduplication_id): + item_string = self.item_repo.get_item_from_bank() + if 
item_string is None: + return item_string + try: + self.deduplication_repo.put_deduplication_id( + deduplication_id=deduplication_id, + item_string=item_string + ) + return item_string + except ClientError: + log.debug("deduplication_id is already assigned, will check if I" + " should return item_string %s to the bank", item_string) + existing_item_string = self.deduplication_repo.get_value_for_deduplication_id( + deduplication_id=deduplication_id + ) + if existing_item_string != item_string: + self.item_repo.add_item_to_bank( + item_string=item_string + ) + log.debug("Item %s was returned", item_string) + return existing_item_string + return item_string + + def add_items_to_bank(self, items: typing.List[str]): + self.item_repo.batch_add_items( + items=items + ) diff --git a/storagebox/repository/__init__.py b/storagebox/repository/__init__.py new file mode 100644 index 0000000..013a88e --- /dev/null +++ b/storagebox/repository/__init__.py @@ -0,0 +1,2 @@ +from storagebox.repository.deduplication import DeduplicationDynamoDbRepository +from storagebox.repository.item_bank import ItemBankDynamoDbRepository diff --git a/storagebox/repository/deduplication.py b/storagebox/repository/deduplication.py new file mode 100644 index 0000000..ffff272 --- /dev/null +++ b/storagebox/repository/deduplication.py @@ -0,0 +1,40 @@ +import abc +import logging +from storagebox.repository.dynamodb import DynamoDBBasedRepository + + +log = logging.getLogger('storageBox') + + +class DeduplicationRepository(abc.ABC): + @abc.abstractmethod + def get_value_for_deduplication_id(self, deduplication_id: str): + raise NotImplementedError + + @abc.abstractmethod + def put_deduplication_id(self, deduplication_id: str, item_string: str): + raise NotImplementedError + + +class DeduplicationDynamoDbRepository(DeduplicationRepository, DynamoDBBasedRepository): + def get_value_for_deduplication_id(self, deduplication_id:str): + response = self.table.get_item( + Key={ + 'deduplication_id': 
str(deduplication_id) + } + ) + return response.get('Item', {}).get('item_string') # Returns None if not found + + def put_deduplication_id(self, deduplication_id: str, item_string: str): + obj = { + 'deduplication_id': deduplication_id, + 'item_string': item_string + } + self.table.put_item( # should only be put if there is no existing entry + Item=obj, + Expected={ + 'deduplication_id': { + 'Exists': False + } + } + ) diff --git a/storagebox/repository/dynamodb.py b/storagebox/repository/dynamodb.py new file mode 100644 index 0000000..d4048e5 --- /dev/null +++ b/storagebox/repository/dynamodb.py @@ -0,0 +1,18 @@ +import boto3 + + +class DynamoDBBasedRepository: + def __init__(self, table_name): + self.table_name = table_name + self.client = boto3.client('dynamodb') + if not self.table_alreaedy_exists(table_name=self.table_name): + raise RuntimeError(f"DynamoDB table {self.table_name} does not exist") + dynamodb = boto3.resource('dynamodb') + self.table = dynamodb.Table(self.table_name) + + def table_alreaedy_exists(self, table_name) -> bool: + try: + self.client.describe_table(TableName=table_name) + return True + except self.client.exceptions.ResourceNotFoundException: + return False diff --git a/storagebox/repository/item_bank.py b/storagebox/repository/item_bank.py new file mode 100644 index 0000000..536f910 --- /dev/null +++ b/storagebox/repository/item_bank.py @@ -0,0 +1,137 @@ +import abc +import random +import typing +import time +import logging +import collections +from botocore.exceptions import ClientError + +from storagebox import settings +from storagebox.repository.dynamodb import DynamoDBBasedRepository + + +log = logging.getLogger('storageBox') + + +class ItemBankRepository(abc.ABC): + @abc.abstractmethod + def batch_add_items(self, items: typing.List[str]): + pass + + @abc.abstractmethod + def add_item_to_bank(self, item_string: str): + pass + + @abc.abstractmethod + def get_item_from_bank(self) -> typing.Optional[str]: + pass + + +class 
ItemBankDynamoDbRepository(ItemBankRepository, DynamoDBBasedRepository): + @staticmethod + def __convert_items_to_dynamodb_json(items: typing.List[str]) -> typing.List[typing.Dict]: + return [ + { + 'item': { + 'S': item + } + } + for item in items + ] + + def __group_items(self, items, size) -> collections.deque: + if len(items) <= size: + return collections.deque([items]) + batches = collections.deque() + batches.append( + items[:size] + ) + batches.extend( + self.__group_items( + items=items[size:], + size=size + ) + ) + return batches + + def batch_add_items(self, items: typing.List[str]): + for item in items: + assert len(item) < settings.MAX_ALLOWED_ITEM_SIZE, f"Item size cannot exceed {settings.MAX_ALLOWED_ITEM_SIZE} KB" + dynamodb_json_items = self.__convert_items_to_dynamodb_json( + items=items + ) + item_batches = self.__group_items( + items=dynamodb_json_items, + size=settings.MAX_ALLOWED_BATCH_SIZE + ) + consecutive_failures = 0 + while item_batches: + batch = item_batches.popleft() + response = self.client.batch_write_item( + RequestItems={ + self.table_name: [ + { + 'PutRequest': { + 'Item': item + } + } + for item in batch + ] + } + ) + if response.get('UnprocessedItems', {}).get('string'): + unprocessed_batch = [ + put_request['PutRequest'] + for put_request in response.get('UnprocessedItems', {}).get('string') + ] + item_batches.appendleft(unprocessed_batch) + consecutive_failures += 1 + backoff_time = settings.BATCH_ADDITION_BACKOFF_TIME * (2**consecutive_failures) + log.warning("A Batch Was Partially Unprocessed, will wait %s seconds", backoff_time) + time.sleep(backoff_time) + else: + consecutive_failures = 0 + + def add_item_to_bank(self, item_string: str): + self.table.put_item( + Item={ + 'item': item_string + } + ) + + def get_item_from_bank(self) -> typing.Optional[str]: + last_evaluated_key = None + while True: + if last_evaluated_key: + response = self.table.scan( + Limit=settings.CONCURRENT_FETCH_LIMIT, + 
ExclusiveStartKey=last_evaluated_key + ) + else: + response = self.table.scan( + Limit=settings.CONCURRENT_FETCH_LIMIT, + ) + items = response['Items'] + if not len(items): + return None + random.shuffle(items) + for item in items: + item_string = item['item'] + try: + self.table.delete_item( + Key={ + 'item': item_string + }, + Expected={ + 'item': { + 'Value': item_string, + 'Exists': True + } + } + ) + return item_string + except ClientError: + log.debug("Item was already deleted, this implies that " + "it was fetched by someone else, trying next code") + else: + last_evaluated_key = response['LastEvaluatedKey'] diff --git a/storagebox/settings.py b/storagebox/settings.py new file mode 100644 index 0000000..8a36f3f --- /dev/null +++ b/storagebox/settings.py @@ -0,0 +1,13 @@ +import os +import logging + +CONCURRENT_FETCH_LIMIT = int(os.environ.get('STORAGE_BOX_CONCURRENT_FETCH_LIMIT', '30')) +__DEFAULT_LOGGING_LEVEL_STR = os.environ.get('STORAGE_BOX_LOGGING_LEVEL', 'WARNING') +assert __DEFAULT_LOGGING_LEVEL_STR in [ + 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG' +] +DEFAULT_LOGGING_LEVEL = getattr(logging, __DEFAULT_LOGGING_LEVEL_STR) +MAX_ALLOWED_ITEM_SIZE = 400 +MAX_ALLOWED_BATCH_SIZE = 25 +BATCH_ADDITION_BACKOFF_TIME = 1 +