Initial commit
lautip committed Mar 11, 2022
0 parents commit 9531ccb
Showing 5 changed files with 269 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
/venv/
.idea
42 changes: 42 additions & 0 deletions lambda_write_SQS_deadletter_to_S3.py
@@ -0,0 +1,42 @@
# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.

# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.


import json
import boto3
import os
import time

# Retrieve & check environment variables
bucket = os.environ.get("DEADLETTER_BUCKET_NAME")
trace = os.environ.get("TRACE", "false")

# Environment variables are strings; normalize the TRACE flag to a boolean
if trace in ("true", "True", "TRUE", "1", "Yes", "YES"):
    trace = True
else:
    trace = False

if not bucket:
    raise Exception("Environment variable DEADLETTER_BUCKET_NAME missing")

s3 = boto3.client('s3')


# noinspection PyUnusedLocal
def lambda_handler(event, context):
    # Save the raw event to S3 without further processing
    key = str(time.time_ns())
    s3.put_object(
        Body=json.dumps(event),
        Bucket=bucket,
        Key=key
    )
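
For local testing, a minimal invocation sketch of this dead-letter handler follows; the bucket name, message ID, and body below are invented for illustration and are not part of the commit.

# Hypothetical local test for lambda_write_SQS_deadletter_to_S3.py.
# Assumes AWS credentials/region are configured and that the bucket named in
# DEADLETTER_BUCKET_NAME exists and is writable.
import os

os.environ["DEADLETTER_BUCKET_NAME"] = "example-deadletter-bucket"  # assumed name
os.environ["TRACE"] = "true"

import lambda_write_SQS_deadletter_to_S3 as deadletter

# A made-up SQS-style event; the handler stores the entire event as a single
# S3 object keyed by the current time in nanoseconds.
sample_event = {"Records": [{"messageId": "abc-123", "body": "{\"failed\": true}"}]}
deadletter.lambda_handler(sample_event, None)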
101 changes: 101 additions & 0 deletions lambda_write_to_S3_from_SQS.py
@@ -0,0 +1,101 @@
# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.

# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.


import json
import boto3
import os
from datetime import datetime as dt

# Retrieve & check environment variables
bucket = os.environ.get("BUCKET_NAME")
trace = os.environ.get("TRACE", "false")

# Environment variables are strings; normalize the TRACE flag to a boolean
if trace in ("true", "True", "TRUE", "1", "Yes", "YES"):
    trace = True
else:
    trace = False

if not bucket:
    raise Exception("Environment variable BUCKET_NAME missing")

s3 = boto3.client('s3')


# noinspection PyUnusedLocal
def lambda_handler(event, context):
    message_ids = []

    if 'Records' in event:
        print("Found {} records to store to S3.".format(len(event['Records'])))
        # First build a list of all the message IDs to process. IDs are removed
        # from the list as the corresponding messages are processed successfully.
        for record in event.get('Records'):
            message_ids.append(record['messageId'])
        if trace:
            print("Message IDs to process: {}".format(message_ids))
        # Process each message in the Records
        for record in event.get('Records'):
            body_str = record.get('body')
            try:
                # Make sure the record is properly structured and the payload exists
                if not body_str:
                    raise Exception("No body found in Record")
                body = json.loads(body_str)
                msg = body.get('Message')
                if not msg:
                    raise Exception("No payload found")
                else:
                    # Inspect the payload
                    payload = json.loads(msg)
                    if trace is True:
                        print("The payload is: {}".format(payload))
                    timestring = payload.get('timestamp')
                    if not timestring:
                        raise Exception('Malformed payload: timestamp key missing')
                    thing = payload.get('gateway')
                    if not thing:
                        raise Exception('Malformed payload: gateway key missing')
                    device = payload.get('deviceName')
                    if not device:
                        raise Exception('Malformed payload: deviceName key missing')
                    epoch = payload.get('epoch_ms')
                    if not epoch:
                        raise Exception('Malformed payload: epoch_ms key missing')
                    value = payload.get('values')
                    if trace is True:
                        print("values in payload: {}".format(value))
                    if not value:
                        raise Exception("Empty payload found")
                    # Check that the timestamp is in the right format and generate the S3 object key
                    tstamp = dt.strptime(timestring, "%Y-%m-%dT%H:%M:%S%z")
                    key = "{:02d}/{:02d}/{:02d}/{}/{}/{}.json".format(tstamp.year, tstamp.month, tstamp.day,
                                                                      thing, device, epoch)
                    # Save to S3
                    s3.put_object(
                        Body=json.dumps(payload),
                        Bucket=bucket,
                        Key=key
                    )
                    if trace is True:
                        print("Object stored: {}".format(key))
                    # Finally remove the item from the list of unprocessed messages
                    if trace is True:
                        print("Message ID {} processed successfully".format(record['messageId']))
                    message_ids.remove(record['messageId'])

            except Exception as e:
                print("Error when processing a Record: {}".format(e))

    r = {"batchItemFailures": [{"itemIdentifier": x} for x in message_ids]}
    if trace is True:
        print("Returning unprocessed message IDs: {}".format(r))
    return r
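
A hypothetical SQS event for exercising this handler locally is sketched below; the payload keys mirror what the code reads (timestamp, gateway, deviceName, epoch_ms, values), while the concrete names and values are assumptions.

# Made-up test event for lambda_write_to_S3_from_SQS.py.
import json

payload = {
    "timestamp": "2022-03-11T12:00:00+0000",
    "gateway": "gateway-01",          # assumed gateway name
    "deviceName": "sensor-42",        # assumed device name
    "epoch_ms": 1646999999000,
    "values": {"temperature": 21.5},
}
event = {
    "Records": [
        {
            "messageId": "msg-1",
            # The handler expects 'body' to be a JSON string whose 'Message'
            # field is itself a JSON-encoded device payload.
            "body": json.dumps({"Message": json.dumps(payload)}),
        }
    ]
}
# lambda_handler(event, None) would store the payload at
# 2022/03/11/gateway-01/sensor-42/1646999999000.json in BUCKET_NAME and
# return {"batchItemFailures": []} when every record succeeds.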
123 changes: 123 additions & 0 deletions lambda_write_to_timestream_from_s3.py
@@ -0,0 +1,123 @@
# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.

# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.


import json
import urllib.parse
import boto3
import os

print('Loading function')

s3 = boto3.client('s3')
ts = boto3.client('timestream-write')


def format_for_timestream(data):
    """
    Formats the data into common attributes and records
    :param data: JSON payload received
    :return: common_attr, records
    """

    try:
        common_attr = {
            'Dimensions': [
                {
                    'Name': 'Gateway',
                    'Value': str(data['thing']),
                    'DimensionValueType': 'VARCHAR'
                },
                {
                    'Name': 'deviceName',
                    'Value': str(data['device']),
                    'DimensionValueType': 'VARCHAR'
                },
            ],
            'Time': str(data['epoch_ms']),
            'TimeUnit': 'MILLISECONDS'
        }
        vals = data['values']
        records = []
        for k, v in vals.items():
            r = {
                'MeasureName': k,
                'MeasureValue': str(v),
                'MeasureValueType': 'BIGINT' if isinstance(v, int) else 'DOUBLE'
            }
            records.append(r.copy())
        return common_attr, records
    except Exception as e:
        print("Error when preparing data for timestream: {}".format(e))
        print("Data received: {}".format(data))
        raise e


def write_to_timestream(db, table, common_attributes, records):
    """
    Write the data to Timestream
    :param db: database name
    :param table: table name
    :param dict common_attributes: common attributes
    :param list records: records to be written
    :return: None
    """
    try:
        result = ts.write_records(DatabaseName=db,
                                  TableName=table,
                                  Records=records,
                                  CommonAttributes=common_attributes)

        print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
    except ts.exceptions.RejectedRecordsException as err:
        print("RejectedRecords: ", err)
        for rr in err.response["RejectedRecords"]:
            print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        print("Other records were written successfully.")
    except Exception as e:
        raise e


def lambda_handler(event, context):
    # Get environment variables; indexing os.environ raises KeyError if one is missing
    print("Starting work")
    try:
        db = os.environ["DB_NAME"]
        table = os.environ["TABLE_NAME"]
    except KeyError as e:
        print("Error getting the environment variables: {}".format(e))
        raise e

    try:
        # Get the object from the event and fetch its content
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        response = s3.get_object(Bucket=bucket, Key=key)
        # print("CONTENT TYPE: " + response['ContentType'])
        data = json.loads(response['Body'].read())
        print("New IoT data received: {}".format(data))
    except Exception as e:
        print(
            'Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(
                key, bucket))
        print(e)
        raise e

    try:
        # Format the data for Timestream and write it
        cattr, recs = format_for_timestream(data)
        write_to_timestream(db=db, table=table, common_attributes=cattr, records=recs)
        print("Finished without error")
    except Exception as e:
        print("Error when pushing data to Timestream")
        print(e)
        raise e
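
For reference, a minimal sketch of how the two helpers fit together, assuming the S3 object holds the keys this code reads ('thing', 'device', 'epoch_ms', 'values'); the sample values are invented.

# Hypothetical input illustrating format_for_timestream / write_to_timestream.
data = {
    "thing": "gateway-01",       # becomes the 'Gateway' dimension
    "device": "sensor-42",       # becomes the 'deviceName' dimension
    "epoch_ms": 1646999999000,   # becomes the record time in milliseconds
    "values": {"temperature": 21.5, "count": 3},
}
common_attr, records = format_for_timestream(data)
# records holds one measure per entry of 'values', e.g.
# [{'MeasureName': 'temperature', 'MeasureValue': '21.5', 'MeasureValueType': 'DOUBLE'},
#  {'MeasureName': 'count', 'MeasureValue': '3', 'MeasureValueType': 'BIGINT'}]
# write_to_timestream("exampleDB", "exampleTable", common_attr, records) would then
# send them to Timestream in a single WriteRecords call (names are placeholders).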
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
boto3==1.18.55
