Make S3 triggers apply correctly with multiple stages using the same bucket #2126

Open · wants to merge 30 commits into base: master
@@ -0,0 +1,18 @@
{
    "status_code": 200,
    "data": {
        "ResponseMetadata": {
            "RequestId": "6GER0T0H4R7XAV8T",
            "HostId": "6EEJhlP3r8RO7cOo21OEiNjvg998YnDsnctx/hZ3ZWYMuylW2+TPTaVEW00nzTECWfb91K9jA1c=",
            "HTTPStatusCode": 200,
            "HTTPHeaders": {
                "x-amz-id-2": "6EEJhlP3r8RO7cOo21OEiNjvg998YnDsnctx/hZ3ZWYMuylW2+TPTaVEW00nzTECWfb91K9jA1c=",
                "x-amz-request-id": "6GER0T0H4R7XAV8T",
                "date": "Tue, 23 Jun 2020 14:33:30 GMT",
                "transfer-encoding": "chunked",
                "server": "AmazonS3"
            },
            "RetryAttempts": 0
        }
    }
}
@@ -0,0 +1,18 @@
{
    "status_code": 200,
    "data": {
        "ResponseMetadata": {
            "RequestId": "A1147AFA4BEF07BA",
            "HostId": "Fxv8kpOjAeISFoOayckNGqtMKqYci9oe/Z2GZ3Sqoc+cGxRpmdpNhdPr8SZWmKH39HvNqDbX1qg=",
            "HTTPStatusCode": 200,
            "HTTPHeaders": {
                "x-amz-id-2": "Fxv8kpOjAeISFoOayckNGqtMKqYci9oe/Z2GZ3Sqoc+cGxRpmdpNhdPr8SZWmKH39HvNqDbX1qg=",
                "x-amz-request-id": "A1147AFA4BEF07BA",
                "date": "Tue, 23 Jun 2020 14:33:30 GMT",
                "transfer-encoding": "chunked",
                "server": "AmazonS3"
            },
            "RetryAttempts": 0
        }
    }
}
@@ -0,0 +1,27 @@
{
    "status_code": 200,
    "data": {
        "ResponseMetadata": {
            "RequestId": "68727B2ED4A5E5DB",
            "HostId": "h2Tim4RpazYTXTo+NzdC8vH5gqlYtEGYHhHxjqTKRoUyFCgetU6gGc17mg/nNOF7lVxMJgBSB5s=",
            "HTTPStatusCode": 200,
            "HTTPHeaders": {
                "x-amz-id-2": "h2Tim4RpazYTXTo+NzdC8vH5gqlYtEGYHhHxjqTKRoUyFCgetU6gGc17mg/nNOF7lVxMJgBSB5s=",
                "x-amz-request-id": "68727B2ED4A5E5DB",
                "date": "Tue, 23 Jun 2020 14:33:30 GMT",
                "transfer-encoding": "chunked",
                "server": "AmazonS3"
            },
            "RetryAttempts": 0
        },
        "LambdaFunctionConfigurations": [
            {
                "Id": "lambda:test_settings.callback",
                "LambdaFunctionArn": "arn:aws:lambda:lambda:lambda:lambda:lambda",
                "Events": [
                    "s3:ObjectCreated:*"
                ]
            }
        ]
    }
}
@@ -0,0 +1,27 @@
{
    "status_code": 200,
    "data": {
        "ResponseMetadata": {
            "RequestId": "AB1396B752AF96FA",
            "HostId": "paLR5Q18m5aW8gKTFSWY17Cw8oZWnuANlsCGFEnzTb1Vb/Ja/+vyK5GT1l+KOK0n8+ieM6MvuaM=",
            "HTTPStatusCode": 200,
            "HTTPHeaders": {
                "x-amz-id-2": "paLR5Q18m5aW8gKTFSWY17Cw8oZWnuANlsCGFEnzTb1Vb/Ja/+vyK5GT1l+KOK0n8+ieM6MvuaM=",
                "x-amz-request-id": "AB1396B752AF96FA",
                "date": "Tue, 23 Jun 2020 14:33:58 GMT",
                "transfer-encoding": "chunked",
                "server": "AmazonS3"
            },
            "RetryAttempts": 0
        },
        "LambdaFunctionConfigurations": [
            {
                "Id": "lambda:test_settings.callback",
                "LambdaFunctionArn": "arn:aws:lambda:lambda:lambda:lambda:lambda",
                "Events": [
                    "s3:ObjectCreated:*"
                ]
            }
        ]
    }
}
@@ -0,0 +1,18 @@
{
    "status_code": 200,
    "data": {
        "ResponseMetadata": {
            "RequestId": "7R9WFV4Q3WFWFH2W",
            "HostId": "yKZnYszkf/RBcrnCHwT566JbRjTYanGk/pmujo9dXLLIMdxofz+z1srIfyFxo2mMQzPzkuH8Ybg=",
            "HTTPStatusCode": 200,
            "HTTPHeaders": {
                "x-amz-id-2": "yKZnYszkf/RBcrnCHwT566JbRjTYanGk/pmujo9dXLLIMdxofz+z1srIfyFxo2mMQzPzkuH8Ybg=",
                "x-amz-request-id": "7R9WFV4Q3WFWFH2W",
                "date": "Tue, 23 Jun 2020 14:33:30 GMT",
                "content-length": "0",
                "server": "AmazonS3"
            },
            "RetryAttempts": 0
        }
    }
}
@@ -0,0 +1,18 @@
{
    "status_code": 200,
    "data": {
        "ResponseMetadata": {
            "RequestId": "61E24EFFB461E8E9",
            "HostId": "sB3PWivILvgM5clr+VFuq0ymED90/wd76dk7DoRbC0u/7XsxlC9sWHT7Jvi8fF/bksgjEWiUT10=",
            "HTTPStatusCode": 200,
            "HTTPHeaders": {
                "x-amz-id-2": "sB3PWivILvgM5clr+VFuq0ymED90/wd76dk7DoRbC0u/7XsxlC9sWHT7Jvi8fF/bksgjEWiUT10=",
                "x-amz-request-id": "61E24EFFB461E8E9",
                "date": "Tue, 23 Jun 2020 14:33:58 GMT",
                "content-length": "0",
                "server": "AmazonS3"
            },
            "RetryAttempts": 0
        }
    }
}
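
The six JSON files above are placebo fixtures: recorded S3 bucket-notification API responses that the updated test below replays instead of calling AWS (which is why it now runs with dry=False). The entries carrying LambdaFunctionConfigurations look like get_bucket_notification_configuration results, and the empty content-length-0 ones like put_bucket_notification_configuration acknowledgements. A minimal sketch of placebo playback, not Zappa's actual test harness (the data_path and bucket name here are hypothetical; Zappa's tests wire this up through the @placebo_session decorator):

import boto3
import placebo

session = boto3.Session()
pill = placebo.attach(session, data_path='tests/placebo')
pill.playback()  # boto3 calls on this session now return the recorded JSON

s3 = session.client('s3')
# Replays a recorded response such as the fixtures above instead of hitting AWS.
config = s3.get_bucket_notification_configuration(Bucket='example-bucket')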
8 changes: 3 additions & 5 deletions tests/tests_placebo.py
@@ -481,11 +481,9 @@ def test_add_event_source(self, session):
         except ValueError:
             pass
 
-        event_source = {'arn': 's3:s3:s3:s3', 'events': [
-            "s3:ObjectCreated:*"
-        ]}
-        add_event_source(event_source, 'lambda:lambda:lambda:lambda', 'test_settings.callback', session, dry=True)
-        remove_event_source(event_source, 'lambda:lambda:lambda:lambda', 'test_settings.callback', session, dry=True)
+        event_source = {'arn': 's3:s3:s3:s3', 'events': ["s3:ObjectCreated:*"], 'key_filters': [{'type': 'prefix', 'value': 'value'}]}
+        add_event_source(event_source, 'lambda:lambda:lambda:lambda', 'test_settings.callback', session, dry=False)
+        remove_event_source(event_source, 'lambda:lambda:lambda:lambda', 'test_settings.callback', session, dry=False)
         # get_event_source_status(event_source, 'lambda:lambda:lambda:lambda', 'test_settings.callback', session, dry=True)
 
     @placebo_session
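
For context on the key_filters entry exercised above: ImprovedS3EventSource.add() in the zappa/utilities.py diff below converts each filter entry into an S3 notification FilterRule. A small illustration using the values from the test (not part of the diff):

# Input shape passed by the test above:
key_filters = [{'type': 'prefix', 'value': 'value'}]

# Resulting 'Filter' block that add() attaches to the event configuration:
event_filter = {
    'Key': {
        'FilterRules': [
            {'Name': 'prefix', 'Value': 'value'}
        ]
    }
}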
75 changes: 74 additions & 1 deletion zappa/utilities.py
@@ -351,10 +351,83 @@ def add(self, function):
        if self.filters:
            self.add_filters(function)

# This class allows S3 trigger events to be added, updated, and deleted correctly when a bucket carries multiple
# events that may be tied to different stages with different filters. Unlike the handler in Kappa, it limits
# changes to the events tied to the targeted method and stage.
# Related: https://github.com/Miserlou/Zappa/issues/2111
# This entire class is reimplemented rather than extending the Kappa version because only the constructor is
# reused, and I didn't want any future usages or extensions to accidentally fall through to Kappa's object via
# one of the 'aliases' it sets up.
class ImprovedS3EventSource(kappa.event_source.base.EventSource):
    def __init__(self, context, config):
        super(ImprovedS3EventSource, self).__init__(context, config)
        self._s3 = kappa.awsclient.create_client('s3', context.session)

    def _get_bucket_name(self):
        return self.arn.split(':')[-1]

    # This method is overridden / implemented below, but I wanted the placeholder here for clarity and to make
    # IDEs less cranky.
    def _make_notification_id(self, function_name):
        raise NotImplementedError

    def add(self, function):
        event_config = {
            'Id': self._make_notification_id(function.name),
            'Events': [e for e in self._config['events']],
            'LambdaFunctionArn': '%s:%s' % (function.arn, function._context.environment),
        }

        # Add S3 key filters
        if 'key_filters' in self._config:
            filters_spec = {'Key': {'FilterRules': []}}
            for key_filter in self._config['key_filters']:
                if 'type' in key_filter and 'value' in key_filter and key_filter['type'] in ('prefix', 'suffix'):
                    rule = {'Name': key_filter['type'], 'Value': key_filter['value']}
                    filters_spec['Key']['FilterRules'].append(rule)
            event_config['Filter'] = filters_spec

        try:
            # Append to the bucket's existing notification configuration so that events owned by
            # other stages survive the update.
            bucket_config = self._s3.call('get_bucket_notification_configuration', Bucket=self._get_bucket_name())
            del bucket_config['ResponseMetadata']
            if 'LambdaFunctionConfigurations' in bucket_config:
                bucket_config['LambdaFunctionConfigurations'].append(event_config)
            else:
                bucket_config['LambdaFunctionConfigurations'] = [event_config]
            response = self._s3.call(
                'put_bucket_notification_configuration',
                Bucket=self._get_bucket_name(),
                NotificationConfiguration=bucket_config)
            LOG.debug(response)
        except Exception as exc:
            LOG.debug(exc.response)
            LOG.exception('Unable to add S3 event source')

    def remove(self, function):
        LOG.debug('removing s3 notification')
        bucket_config = self._s3.call('get_bucket_notification_configuration', Bucket=self._get_bucket_name())
        LOG.debug(bucket_config)

        # Keep every configuration except the one whose Id matches this function and stage.
        new_config = []
        for configuration in bucket_config.get('LambdaFunctionConfigurations', []):
            if configuration['Id'] != self._make_notification_id(function.name):
                new_config.append(configuration)
        response = self._s3.call(
            'put_bucket_notification_configuration',
            Bucket=self._get_bucket_name(),
            NotificationConfiguration={'LambdaFunctionConfigurations': new_config})
        LOG.debug(response)

    def status(self, function):
        LOG.debug('status for s3 notification for %s', function.name)
        bucket_config = self._s3.call('get_bucket_notification_configuration', Bucket=self._get_bucket_name())
        LOG.debug(bucket_config)
        for configuration in bucket_config.get('LambdaFunctionConfigurations', []):
            if configuration['Id'] == self._make_notification_id(function.name):
                return {'LambdaFunctionConfiguration': configuration, 'State': 'Enabled'}

 event_source_map = {
     'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource,
     'kinesis': kappa.event_source.kinesis.KinesisEventSource,
-    's3': kappa.event_source.s3.S3EventSource,
+    's3': ImprovedS3EventSource,
     'sns': ExtendedSnsEventSource,
     'sqs': SqsEventSource,
     'events': kappa.event_source.cloudwatch.CloudWatchEventSource
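
To make the stage-isolation behaviour concrete: when two stages are deployed against the same bucket, the bucket holds one notification entry per stage, and remove() drops only the entry whose Id matches the targeted function and stage. A hypothetical illustration (Ids and ARNs are invented; the real Id format comes from the _make_notification_id override):

# Hypothetical bucket notification state after deploying 'dev' and 'prod'
# stages that both subscribe to the same bucket.
bucket_config = {
    'LambdaFunctionConfigurations': [
        {
            'Id': 'app-dev-callback',    # written by the 'dev' deploy
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:app:dev',
            'Events': ['s3:ObjectCreated:*'],
        },
        {
            'Id': 'app-prod-callback',   # written by the 'prod' deploy
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:app:prod',
            'Events': ['s3:ObjectCreated:*'],
        },
    ]
}

# remove() for the 'dev' stage keeps every entry whose Id differs from the
# dev notification Id, so the 'prod' trigger survives:
dev_id = 'app-dev-callback'
remaining = [c for c in bucket_config['LambdaFunctionConfigurations']
             if c['Id'] != dev_id]
assert [c['Id'] for c in remaining] == ['app-prod-callback']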