From 9531ccbef24f167058fcf755655deea9ec41af2b Mon Sep 17 00:00:00 2001
From: lautip
Date: Fri, 11 Mar 2022 17:48:41 +0100
Subject: [PATCH] Initial commit

---
 .gitignore                            |   2 +
 lambda_write_SQS_deadletter_to_S3.py  |  42 +++++++++
 lambda_write_to_S3_from_SQS.py        | 101 +++++++++++++++
 lambda_write_to_timestream_from_s3.py | 123 ++++++++++++++++++++++++++
 requirements.txt                      |   1 +
 5 files changed, 269 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 lambda_write_SQS_deadletter_to_S3.py
 create mode 100644 lambda_write_to_S3_from_SQS.py
 create mode 100644 lambda_write_to_timestream_from_s3.py
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d2c55d6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/venv/
+.idea
diff --git a/lambda_write_SQS_deadletter_to_S3.py b/lambda_write_SQS_deadletter_to_S3.py
new file mode 100644
index 0000000..1364e10
--- /dev/null
+++ b/lambda_write_SQS_deadletter_to_S3.py
@@ -0,0 +1,42 @@
+# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+# This file is licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+
+import json
+import boto3
+import os
+import time
+
+# Read & check environment variables
+bucket = os.environ.get("DEADLETTER_BUCKET_NAME")
+trace = os.environ.get("TRACE", False)
+
+# Environment variables are always strings, so normalize the TRACE flag
+if str(trace).lower() in ("true", "1", "yes"):
+    trace = True
+else:
+    trace = False
+
+if not bucket:
+    raise Exception("Environment variable DEADLETTER_BUCKET_NAME missing")
+
+s3 = boto3.client('s3')
+
+
+# noinspection PyUnusedLocal
+def lambda_handler(event, context):
+    # Save the event to S3 without further processing, using the current
+    # nanosecond timestamp as the object key
+    key = str(time.time_ns())
+    s3.put_object(
+        Body=json.dumps(event),
+        Bucket=bucket,
+        Key=key
+    )
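+
+
+# Minimal local smoke test (a sketch, not part of the deployed function).
+# It assumes AWS credentials and the DEADLETTER_BUCKET_NAME variable are
+# configured, and uses an assumed, simplified dead-letter event shape.
+if __name__ == "__main__":
+    sample_event = {
+        "Records": [
+            {"messageId": "00000000-0000-0000-0000-000000000000",
+             "body": "{\"Message\": \"{}\"}"}
+        ]
+    }
+    lambda_handler(sample_event, None)
+    print("Stored dead-letter event in s3://{}".format(bucket))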
diff --git a/lambda_write_to_S3_from_SQS.py b/lambda_write_to_S3_from_SQS.py
new file mode 100644
index 0000000..2a66514
--- /dev/null
+++ b/lambda_write_to_S3_from_SQS.py
@@ -0,0 +1,101 @@
+# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+# This file is licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+
+import json
+import boto3
+import os
+from datetime import datetime as dt
+
+# Read & check environment variables
+bucket = os.environ.get("BUCKET_NAME")
+trace = os.environ.get("TRACE", False)
+
+# Environment variables are always strings, so normalize the TRACE flag
+if str(trace).lower() in ("true", "1", "yes"):
+    trace = True
+else:
+    trace = False
+
+if not bucket:
+    raise Exception("Environment variable BUCKET_NAME missing")
+
+s3 = boto3.client('s3')
+
+
+# noinspection PyUnusedLocal
+def lambda_handler(event, context):
+    message_ids = []
+
+    if 'Records' in event:
+        print("Found {} records to store to S3.".format(len(event['Records'])))
+        # First build a list of all the message IDs to process. Each ID is
+        # removed from the list once its message has been stored successfully.
+        for record in event.get('Records'):
+            message_ids.append(record['messageId'])
+        if trace:
+            print("Message IDs to process: {}".format(message_ids))
+        # Process each message in the Records
+        for record in event.get('Records'):
+            body_str = record.get('body')
+            try:
+                # Make sure the record is properly structured and the payload exists
+                if not body_str:
+                    raise Exception("No body found in Record")
+                body = json.loads(body_str)
+                msg = body.get('Message')
+                if not msg:
+                    raise Exception("No Message found in body")
+                else:
+                    # Inspect the payload
+                    payload = json.loads(msg)
+                    if trace is True:
+                        print("The payload is: {}".format(payload))
+                    timestring = payload.get('timestamp')
+                    if not timestring:
+                        raise Exception('Malformed payload: timestamp key missing')
+                    thing = payload.get('gateway')
+                    if not thing:
+                        raise Exception('Malformed payload: gateway key missing')
+                    device = payload.get('deviceName')
+                    if not device:
+                        raise Exception('Malformed payload: deviceName key missing')
+                    epoch = payload.get('epoch_ms')
+                    if not epoch:
+                        raise Exception('Malformed payload: epoch_ms key missing')
+                    value = payload.get('values')
+                    if trace is True:
+                        print("values in payload: {}".format(value))
+                    if not value:
+                        raise Exception("Empty payload found")
+                    # Check that the timestamp is in the right format and generate the S3 object key
+                    tstamp = dt.strptime(timestring, "%Y-%m-%dT%H:%M:%S%z")
+                    key = "{:02d}/{:02d}/{:02d}/{}/{}/{}.json".format(tstamp.year, tstamp.month, tstamp.day,
+                                                                      thing, device, epoch)
+                    # Save to S3
+                    s3.put_object(
+                        Body=json.dumps(payload),
+                        Bucket=bucket,
+                        Key=key
+                    )
+                    if trace is True:
+                        print("Object stored: {}".format(key))
+                    # Finally remove the item from the list of unprocessed messages
+                    if trace is True:
+                        print("Message ID {} processed successfully".format(record['messageId']))
+                    message_ids.remove(record['messageId'])
+
+            except Exception as e:
+                print("Error when processing a Record: {}".format(e))
+
+    # Report the IDs that were not processed so SQS retries only those messages
+    r = {"batchItemFailures": [{"itemIdentifier": x} for x in message_ids]}
+    if trace is True:
+        print("Returning unprocessed message IDs: {}".format(r))
+    return r
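+
+# Note: the partial-batch response above only takes effect if the SQS event
+# source mapping is configured with the "ReportBatchItemFailures" function
+# response type; without it, a single failed record causes the whole batch
+# to be retried.
+#
+# Expected message payload, inferred from the checks above (all values are
+# illustrative only):
+# {"timestamp": "2022-03-11T17:48:41+0100", "gateway": "gw-01",
+#  "deviceName": "sensor-42", "epoch_ms": 1647016121000,
+#  "values": {"temperature": 21.5, "humidity": 40}}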
diff --git a/lambda_write_to_timestream_from_s3.py b/lambda_write_to_timestream_from_s3.py
new file mode 100644
index 0000000..006445e
--- /dev/null
+++ b/lambda_write_to_timestream_from_s3.py
@@ -0,0 +1,123 @@
+# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+# This file is licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+
+import json
+import urllib.parse
+import boto3
+import os
+
+print('Loading function')
+
+s3 = boto3.client('s3')
+ts = boto3.client('timestream-write')
+
+
+def format_for_timestream(data):
+    """
+    Formats the data into common attributes and records
+    :param data: JSON payload received
+    :return: common_attr, records
+    """
+
+    try:
+        common_attr = {
+            'Dimensions': [
+                {
+                    'Name': 'Gateway',
+                    'Value': str(data['gateway']),
+                    'DimensionValueType': 'VARCHAR'
+                },
+                {
+                    'Name': 'deviceName',
+                    'Value': str(data['deviceName']),
+                    'DimensionValueType': 'VARCHAR'
+                },
+            ],
+            'Time': str(data['epoch_ms']),
+            'TimeUnit': 'MILLISECONDS'
+        }
+        vals = data['values']
+        records = []
+        for k, v in vals.items():
+            # Values are assumed to be numeric: integers map to BIGINT,
+            # everything else to DOUBLE
+            r = {
+                'MeasureName': k,
+                'MeasureValue': str(v),
+                'MeasureValueType': 'BIGINT' if isinstance(v, int) else 'DOUBLE'
+            }
+            records.append(r)
+        return common_attr, records
+    except Exception as e:
+        print("Error when preparing data for timestream: {}".format(e))
+        print("Data received: {}".format(data))
+        raise e
+
+
+def write_to_timestream(db, table, common_attributes, records):
+    """
+    Write the data to Timestream
+    :param db: database name
+    :param table: table name
+    :param dict common_attributes: common attributes
+    :param list records: records to be written
+    :return: None
+    """
+    try:
+        result = ts.write_records(DatabaseName=db,
+                                  TableName=table,
+                                  Records=records,
+                                  CommonAttributes=common_attributes)
+
+        print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
+    except ts.exceptions.RejectedRecordsException as err:
+        print("RejectedRecords: ", err)
+        for rr in err.response["RejectedRecords"]:
+            print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
+        print("Other records were written successfully.")
+    except Exception as e:
+        raise e
+
+
+def lambda_handler(event, context):
+    # Get Environment variables
+    print("Starting work")
+    try:
+        db = os.environ["DB_NAME"]
+        table = os.environ["TABLE_NAME"]
+    except KeyError as e:
+        print("Error getting the environment variables: {}".format(e))
+        raise e
+
+    try:
+        # Get the object from the event and fetch its content
+        bucket = event['Records'][0]['s3']['bucket']['name']
+        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
+        response = s3.get_object(Bucket=bucket, Key=key)
+        # print("CONTENT TYPE: " + response['ContentType'])
+        data = json.loads(response['Body'].read())
+        print("New IoT data received: {}".format(data))
+    except Exception as e:
+        print('Error getting object {} from bucket {}. Make sure they exist and your '
+              'bucket is in the same region as this function.'.format(key, bucket))
+        print(e)
+        raise e
+
+    try:
+        # Format the data for Timestream and write it
+        cattr, recs = format_for_timestream(data)
+        write_to_timestream(db=db, table=table, common_attributes=cattr, records=recs)
+        print("Finished without error")
+    except Exception as e:
+        print("Error when pushing data to Timestream")
+        print(e)
+        raise e
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..fedda84
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+boto3 == 1.18.55
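+# Note: the AWS Lambda Python runtime bundles boto3, so this pin mainly
+# matters for local development and packaging.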