Skip to content

Commit

Permalink
aws: topology cache for aws s3 and iam resources (#3019)
Browse files Browse the repository at this point in the history
  • Loading branch information
tolujimoh authored May 24, 2024
1 parent ee9d920 commit df77166
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 0 deletions.
59 changes: 59 additions & 0 deletions backend/resolver/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (

dynamodbv1api "github.com/lyft/clutch/backend/api/aws/dynamodb/v1"
ec2v1api "github.com/lyft/clutch/backend/api/aws/ec2/v1"
iamv1api "github.com/lyft/clutch/backend/api/aws/iam/v1"
kinesisv1api "github.com/lyft/clutch/backend/api/aws/kinesis/v1"
s3v1api "github.com/lyft/clutch/backend/api/aws/s3/v1"
awsv1resolver "github.com/lyft/clutch/backend/api/resolver/aws/v1"
resolverv1 "github.com/lyft/clutch/backend/api/resolver/v1"
"github.com/lyft/clutch/backend/gateway/meta"
Expand All @@ -34,6 +36,9 @@ var typeURLInstance = meta.TypeURL((*ec2v1api.Instance)(nil))
var typeURLAutoscalingGroup = meta.TypeURL((*ec2v1api.AutoscalingGroup)(nil))
var typeURLKinesisStream = meta.TypeURL((*kinesisv1api.Stream)(nil))
var typeURLDynamodbTable = meta.TypeURL((*dynamodbv1api.Table)(nil))
var typeURLS3Bucket = meta.TypeURL((*s3v1api.Bucket)(nil))
var typeURLS3AccessPoint = meta.TypeURL((*s3v1api.AccessPoint)(nil))
var typeURLIAMRole = meta.TypeURL((*iamv1api.Role)(nil))

var typeSchemas = resolver.TypeURLToSchemaMessagesMap{
typeURLInstance: {
Expand All @@ -48,6 +53,15 @@ var typeSchemas = resolver.TypeURLToSchemaMessagesMap{
typeURLDynamodbTable: {
(*awsv1resolver.DynamodbTableName)(nil),
},
typeURLS3Bucket: {
(*awsv1resolver.S3BucketName)(nil),
},
typeURLS3AccessPoint: {
(*awsv1resolver.S3AccessPointName)(nil),
},
typeURLIAMRole: {
(*awsv1resolver.IAMRoleName)(nil),
},
}

func makeRegionOptions(regions []string) []*resolverv1.Option {
Expand Down Expand Up @@ -156,6 +170,15 @@ func (r *res) Resolve(ctx context.Context, wantTypeURL string, input proto.Messa
case typeURLDynamodbTable:
return r.resolveDynamodbTableForInput(ctx, input)

case typeURLS3Bucket:
return r.resolveS3BucketForInput(ctx, input)

case typeURLS3AccessPoint:
return r.resolveS3AccessPointForInput(ctx, input)

case typeURLIAMRole:
return r.resolveIAMRoleForInput(ctx, input)

default:
return nil, status.Errorf(codes.Internal, "resolver for '%s' not implemented", wantTypeURL)
}
Expand Down Expand Up @@ -210,6 +233,42 @@ func (r *res) Search(ctx context.Context, typeURL, query string, limit uint32) (

return r.dynamodbResults(ctx, resolver.OptionAll, resolver.OptionAll, query, limit)

case typeURLS3Bucket:
patternValues, ok, err := meta.ExtractPatternValuesFromString((*s3v1api.Bucket)(nil), query)
if err != nil {
return nil, err
}

if ok {
return r.s3BucketResults(ctx, patternValues["account"], patternValues["region"], patternValues["name"], limit)
}

return r.s3BucketResults(ctx, resolver.OptionAll, resolver.OptionAll, query, limit)

case typeURLS3AccessPoint:
patternValues, ok, err := meta.ExtractPatternValuesFromString((*s3v1api.AccessPoint)(nil), query)
if err != nil {
return nil, err
}

if ok {
return r.s3AccessPointResults(ctx, patternValues["account"], patternValues["region"], patternValues["name"], limit)
}

return r.s3AccessPointResults(ctx, resolver.OptionAll, resolver.OptionAll, query, limit)

case typeURLIAMRole:
patternValues, ok, err := meta.ExtractPatternValuesFromString((*iamv1api.Role)(nil), query)
if err != nil {
return nil, err
}

if ok {
return r.iamRoleResults(ctx, patternValues["account"], patternValues["region"], patternValues["name"], limit)
}

return r.iamRoleResults(ctx, resolver.OptionAll, resolver.OptionAll, query, limit)

default:
return nil, status.Errorf(codes.Internal, "resolver search for '%s' not implemented", typeURL)
}
Expand Down
45 changes: 45 additions & 0 deletions backend/resolver/aws/iam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package aws

import (
"context"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

awsv1resolver "github.com/lyft/clutch/backend/api/resolver/aws/v1"
"github.com/lyft/clutch/backend/resolver"
)

func (r *res) resolveIAMRoleForInput(ctx context.Context, input proto.Message) (*resolver.Results, error) {
switch i := input.(type) {
case *awsv1resolver.IAMRoleName:
return r.iamRoleResults(ctx, i.Account, i.Region, i.Name, 1)
default:
return nil, status.Errorf(codes.Internal, "resolution for type '%T' not implemented", i)
}
}

// Fanout across multiple regions if needed to fetch stream.
func (r *res) iamRoleResults(ctx context.Context, account, region, name string, limit uint32) (*resolver.Results, error) {
ctx, handler := resolver.NewFanoutHandler(ctx)

allAccountRegions := r.determineAccountAndRegionsForOption(account, region)
for account := range allAccountRegions {
for _, region := range allAccountRegions[account] {
handler.Add(1)
go func(account, region string) {
defer handler.Done()
role, err := r.client.GetIAMRole(ctx, account, region, name)
select {
case handler.Channel() <- resolver.NewSingleFanoutResult(role, err):
return
case <-handler.Cancelled():
return
}
}(account, region)
}
}

return handler.Results(limit)
}
88 changes: 88 additions & 0 deletions backend/resolver/aws/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package aws

import (
"context"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

awsv1resolver "github.com/lyft/clutch/backend/api/resolver/aws/v1"
"github.com/lyft/clutch/backend/resolver"
)

func (r *res) resolveS3BucketForInput(ctx context.Context, input proto.Message) (*resolver.Results, error) {
switch i := input.(type) {
case *awsv1resolver.S3BucketName:
return r.s3BucketResults(ctx, i.Account, i.Region, i.Name, 1)
default:
return nil, status.Errorf(codes.Internal, "resolution for type '%T' not implemented", i)
}
}

func (r *res) resolveS3AccessPointForInput(ctx context.Context, input proto.Message) (*resolver.Results, error) {
switch i := input.(type) {
case *awsv1resolver.S3AccessPointName:
return r.s3AccessPointResults(ctx, i.Account, i.Region, i.Name, 1)
default:
return nil, status.Errorf(codes.Internal, "resolution for type '%T' not implemented", i)
}
}

// Fanout across multiple regions if needed to fetch stream.
func (r *res) s3BucketResults(ctx context.Context, account, region, name string, limit uint32) (*resolver.Results, error) {
ctx, handler := resolver.NewFanoutHandler(ctx)

allAccountRegions := r.determineAccountAndRegionsForOption(account, region)

for account := range allAccountRegions {
for _, region := range allAccountRegions[account] {
handler.Add(1)
go func(account, region string) {
defer handler.Done()
bucket, err := r.client.S3DescribeBucket(ctx, account, region, name)
select {
case handler.Channel() <- resolver.NewSingleFanoutResult(bucket, err):
return
case <-handler.Cancelled():
return
}
}(account, region)
}
}

return handler.Results(limit)
}

// Fanout across multiple regions if needed to fetch stream.
func (r *res) s3AccessPointResults(ctx context.Context, account, region, name string, limit uint32) (*resolver.Results, error) {
ctx, handler := resolver.NewFanoutHandler(ctx)

allAccountRegions := r.determineAccountAndRegionsForOption(account, region)
for account := range allAccountRegions {
for _, region := range allAccountRegions[account] {
handler.Add(1)
go func(account, region string) {
defer handler.Done()
callerId, err := r.client.GetCallerIdentity(ctx, account, region)
if err != nil {
select {
case handler.Channel() <- resolver.NewSingleFanoutResult(nil, err):
return
case <-handler.Cancelled():
return
}
}
accessPoint, err := r.client.S3GetAccessPoint(ctx, account, region, name, *callerId.Account)
select {
case handler.Channel() <- resolver.NewSingleFanoutResult(accessPoint, err):
return
case <-handler.Cancelled():
return
}
}(account, region)
}
}

return handler.Results(limit)
}
137 changes: 137 additions & 0 deletions backend/service/aws/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import (
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/iam"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3control"
"github.com/aws/aws-sdk-go-v2/service/sts"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"

Expand Down Expand Up @@ -50,6 +54,9 @@ func (c *client) processRegionTopologyObjects(ctx context.Context) {
go c.startTickerForCacheResource(ctx, time.Duration(time.Minute*10), account.alias, client, c.processAllAutoScalingGroups)
go c.startTickerForCacheResource(ctx, time.Duration(time.Minute*30), account.alias, client, c.processAllKinesisStreams)
go c.startTickerForCacheResource(ctx, time.Duration(time.Minute*30), account.alias, client, c.processAllDynamoDatabases)
go c.startTickerForCacheResource(ctx, time.Duration(time.Minute*30), account.alias, client, c.processAllS3Buckets)
go c.startTickerForCacheResource(ctx, time.Duration(time.Minute*30), account.alias, client, c.processAllS3AccessPoints)
go c.startTickerForCacheResource(ctx, time.Duration(time.Minute*30), account.alias, client, c.processAllIamRoles)
}
}
}
Expand Down Expand Up @@ -251,3 +258,133 @@ func (c *client) processAllDynamoDatabases(ctx context.Context, account string,
}
}
}

func (c *client) processAllS3Buckets(ctx context.Context, account string, client *regionalClient) {
c.log.Info("starting to process s3 for region", zap.String("region", client.region))

input := s3.ListBucketsInput{}

output, err := client.s3.ListBuckets(ctx, &input)
if err != nil {
c.log.Error("unable to list s3 buckets", zap.Error(err))
return
}
for _, bucket := range output.Buckets {
v1Bucket, err := c.S3DescribeBucket(ctx, account, client.region, *bucket.Name)
if err != nil {
c.log.Error("unable to describe s3 bucket", zap.Error(err), zap.String("bucket", *bucket.Name), zap.String("region", client.region))
continue
}

protoBucket, err := anypb.New(v1Bucket)
if err != nil {
c.log.Error("unable to marshal s3 bucket", zap.Error(err), zap.String("bucket", *bucket.Name), zap.String("region", client.region))
continue
}

patternId, err := meta.HydratedPatternForProto(v1Bucket)
if err != nil {
c.log.Error("unable to get proto id from pattern", zap.Error(err), zap.String("bucket", *bucket.Name), zap.String("region", client.region))
continue
}

c.topologyObjectChan <- &topologyv1.UpdateCacheRequest{
Resource: &topologyv1.Resource{
Id: patternId,
Pb: protoBucket,
},
Action: topologyv1.UpdateCacheRequest_CREATE_OR_UPDATE,
}
}
}

func (c *client) processAllS3AccessPoints(ctx context.Context, account string, client *regionalClient) {
c.log.Info("starting to process s3 access points for region", zap.String("region", client.region))

callerIdentity, err := client.sts.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{})
if err != nil {
c.log.Error("unable to get caller identity", zap.Error(err))
return
}

accountId := callerIdentity.Account

input := s3control.ListAccessPointsInput{
AccountId: accountId,
}

output, err := client.s3control.ListAccessPoints(ctx, &input)
if err != nil {
c.log.Error("unable to list s3 access points", zap.Error(err))
return
}
for _, accessPoint := range output.AccessPointList {
v1AccessPoint, err := c.S3GetAccessPoint(ctx, account, client.region, *accessPoint.Name, *accountId)
if err != nil {
c.log.Error("unable to describe s3 access point", zap.Error(err))
continue
}

protoAccessPoint, err := anypb.New(v1AccessPoint)
if err != nil {
c.log.Error("unable to marshal s3 access point", zap.Error(err))
continue
}

patternId, err := meta.HydratedPatternForProto(v1AccessPoint)
if err != nil {
c.log.Error("unable to get proto id from pattern", zap.Error(err))
continue
}

c.topologyObjectChan <- &topologyv1.UpdateCacheRequest{
Resource: &topologyv1.Resource{
Id: patternId,
Pb: protoAccessPoint,
},
Action: topologyv1.UpdateCacheRequest_CREATE_OR_UPDATE,
}
}
}

func (c *client) processAllIamRoles(ctx context.Context, account string, client *regionalClient) {
c.log.Info("starting to process iam roles for region", zap.String("region", client.region))

input := &iam.ListRolesInput{}
paginator := iam.NewListRolesPaginator(client.iam, input)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
c.log.Error("unable to get next iam role page", zap.Error(err))
break
}

for _, role := range output.Roles {
protoRole, err := c.GetIAMRole(ctx, account, client.region, *role.RoleName)
if err != nil {
c.log.Error("unable to get iam role", zap.Error(err))
continue
}

roleAny, err := anypb.New(protoRole)
if err != nil {
c.log.Error("unable to marshal iam role proto", zap.Error(err))
continue
}

patternId, err := meta.HydratedPatternForProto(protoRole)
if err != nil {
c.log.Error("unable to get proto id from pattern", zap.Error(err))
continue
}

c.topologyObjectChan <- &topologyv1.UpdateCacheRequest{
Resource: &topologyv1.Resource{
Id: patternId,
Pb: roleAny,
},
Action: topologyv1.UpdateCacheRequest_CREATE_OR_UPDATE,
}
}
}
}

0 comments on commit df77166

Please sign in to comment.