Cleanup: organizing code in awss3/input.go (#38958)
Cleanups in `x-pack/filebeat/input/awss3/input.go`.

- Split up the two main configuration cases, SQS queues versus bare S3 buckets, into two explicit helper functions (`s3Input.runQueueReader` and `s3Input.runS3Poller`) instead of handling them inline in `s3Input.Run`.
- Simplify the region-detection logic in `getRegionFromQueueURL` (`regionMismatchError` is no longer needed).
- Rename `createS3Lister` to `createS3Poller` (since it creates an `s3Poller`).

This is only a cleanup / reorganization; it does not change any behavior. A simplified sketch of the resulting dispatch in `s3Input.Run` is shown below.
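
For orientation, here is a condensed sketch of the new dispatch in `s3Input.Run`, taken from the diff below with setup and metrics initialization elided; the comments are added here for illustration only:

```go
func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
	// ... persistent store and metrics setup elided ...
	ctx := v2.GoContextFromCanceler(inputContext.Cancelation)

	if in.config.QueueURL != "" {
		// SQS mode: read S3 notification events from the configured queue.
		return in.runQueueReader(ctx, inputContext, pipeline)
	}

	if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
		// Bucket-polling mode: list the S3 bucket directly and poll for new objects.
		return in.runS3Poller(ctx, inputContext, pipeline)
	}

	return nil
}
```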
faec committed Apr 30, 2024
1 parent f6bad74 commit 2fa1123
Showing 2 changed files with 76 additions and 83 deletions.
155 changes: 75 additions & 80 deletions x-pack/filebeat/input/awss3/input.go
@@ -102,72 +102,85 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
 	ctx := v2.GoContextFromCanceler(inputContext.Cancelation)
 
 	if in.config.QueueURL != "" {
-		regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
-		if err != nil && in.config.RegionName == "" {
-			return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
-		}
-		var warn regionMismatchError
-		if errors.As(err, &warn) {
-			// Warn of mismatch, but go ahead with configured region name.
-			inputContext.Logger.Warnf("%v: using %q", err, regionName)
-		}
-		in.awsConfig.Region = regionName
-
-		// Create SQS receiver and S3 notification processor.
-		receiver, err := in.createSQSReceiver(inputContext, pipeline)
-		if err != nil {
-			return fmt.Errorf("failed to initialize sqs receiver: %w", err)
-		}
-		defer receiver.metrics.Close()
-
-		// Poll metrics periodically in the background
-		go pollSqsWaitingMetric(ctx, receiver)
-
-		if err := receiver.Receive(ctx); err != nil {
-			return err
-		}
+		return in.runQueueReader(ctx, inputContext, pipeline)
 	}
 
 	if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
-		// Create client for publishing events and receive notification of their ACKs.
-		client, err := pipeline.ConnectWith(beat.ClientConfig{
-			EventListener: awscommon.NewEventACKHandler(),
-			Processing: beat.ProcessingConfig{
-				// This input only produces events with basic types so normalization
-				// is not required.
-				EventNormalization: boolPtr(false),
-			},
-		})
-		if err != nil {
-			return fmt.Errorf("failed to create pipeline client: %w", err)
-		}
-		defer client.Close()
-
-		// Connect to the registry and create our states lookup
-		persistentStore, err := in.store.Access()
-		if err != nil {
-			return fmt.Errorf("can not access persistent store: %w", err)
-		}
-		defer persistentStore.Close()
-
-		states, err := newStates(inputContext, persistentStore)
-		if err != nil {
-			return fmt.Errorf("can not start persistent store: %w", err)
-		}
-
-		// Create S3 receiver and S3 notification processor.
-		poller, err := in.createS3Lister(inputContext, ctx, client, states)
-		if err != nil {
-			return fmt.Errorf("failed to initialize s3 poller: %w", err)
-		}
-		defer poller.metrics.Close()
-
-		if err := poller.Poll(ctx); err != nil {
-			return err
-		}
+		return in.runS3Poller(ctx, inputContext, pipeline)
 	}
 
 	return nil
 }
+
+func (in *s3Input) runQueueReader(
+	ctx context.Context,
+	inputContext v2.Context,
+	pipeline beat.Pipeline,
+) error {
+	configRegion := in.config.RegionName
+	urlRegion, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
+	if err != nil && configRegion == "" {
+		// Only report an error if we don't have a configured region
+		// to fall back on.
+		return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
+	} else if configRegion != "" && configRegion != urlRegion {
+		inputContext.Logger.Warnf("configured region disagrees with queue_url region (%q != %q): using %q", configRegion, urlRegion, urlRegion)
+	}
+
+	in.awsConfig.Region = urlRegion
+
+	// Create SQS receiver and S3 notification processor.
+	receiver, err := in.createSQSReceiver(inputContext, pipeline)
+	if err != nil {
+		return fmt.Errorf("failed to initialize sqs receiver: %w", err)
+	}
+	defer receiver.metrics.Close()
+
+	// Poll metrics periodically in the background
+	go pollSqsWaitingMetric(ctx, receiver)
+
+	return receiver.Receive(ctx)
+}
+
+func (in *s3Input) runS3Poller(
+	ctx context.Context,
+	inputContext v2.Context,
+	pipeline beat.Pipeline,
+) error {
+	// Create client for publishing events and receive notification of their ACKs.
+	client, err := pipeline.ConnectWith(beat.ClientConfig{
+		EventListener: awscommon.NewEventACKHandler(),
+		Processing: beat.ProcessingConfig{
+			// This input only produces events with basic types so normalization
+			// is not required.
+			EventNormalization: boolPtr(false),
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create pipeline client: %w", err)
+	}
+	defer client.Close()
+
+	// Connect to the registry and create our states lookup
+	persistentStore, err := in.store.Access()
+	if err != nil {
+		return fmt.Errorf("can not access persistent store: %w", err)
+	}
+	defer persistentStore.Close()
+
+	states, err := newStates(inputContext, persistentStore)
+	if err != nil {
+		return fmt.Errorf("can not start persistent store: %w", err)
+	}
+
+	// Create S3 receiver and S3 notification processor.
+	poller, err := in.createS3Poller(inputContext, ctx, client, states)
+	if err != nil {
+		return fmt.Errorf("failed to initialize s3 poller: %w", err)
+	}
+	defer poller.metrics.Close()
+
+	return poller.Poll(ctx)
+}
 
 func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) {
@@ -212,8 +225,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
 		return nil, err
 	}
 	in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages)
+
 	s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages)
+
 	sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages)
+
 	sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)
 
 	return sqsReader, nil
@@ -227,7 +243,7 @@ func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.Endpoint
 	return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
 }
 
-func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, states *states) (*s3Poller, error) {
+func (in *s3Input) createS3Poller(ctx v2.Context, cancelCtx context.Context, client beat.Client, states *states) (*s3Poller, error) {
 	var bucketName string
 	var bucketID string
 	if in.config.NonAWSBucketName != "" {
@@ -310,7 +326,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
 
 var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
 
-func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) {
+func getRegionFromQueueURL(queueURL, endpoint string) (string, error) {
 	// get region from queueURL
 	// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
 	// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
@@ -323,42 +339,21 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg
 		// check for sqs queue url
 		if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
 			if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
-				region = queueHostSplit[1]
-				if defaultRegion != "" && region != defaultRegion {
-					return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
-				}
-				return region, nil
+				return queueHostSplit[1], nil
 			}
 		}
 
 		// check for vpce url
 		queueHostSplitVPC := strings.SplitN(u.Host, ".", 5)
 		if len(queueHostSplitVPC) == 5 && queueHostSplitVPC[1] == "sqs" {
 			if queueHostSplitVPC[4] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplitVPC[4], "amazonaws.")) {
-				region = queueHostSplitVPC[2]
-				if defaultRegion != "" && region != defaultRegion {
-					return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
-				}
-				return region, nil
+				return queueHostSplitVPC[2], nil
 			}
 		}
-
-		if defaultRegion != "" {
-			return defaultRegion, nil
-		}
 	}
 	return "", errBadQueueURL
 }
 
-type regionMismatchError struct {
-	queueURLRegion string
-	defaultRegion  string
-}
-
-func (e regionMismatchError) Error() string {
-	return fmt.Sprintf("configured region disagrees with queue_url region: %q != %q", e.queueURLRegion, e.defaultRegion)
-}
-
 func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) {
 	getBucketLocationOutput, err := s3Client.GetBucketLocation(ctx, &s3.GetBucketLocationInput{
 		Bucket: awssdk.String(bucketName),
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awss3/input_test.go
@@ -54,7 +54,6 @@ func TestGetRegionFromQueueURL(t *testing.T) {
 		name     string
 		queueURL string
 		endpoint string
-		deflt    string
 		want     string
 		wantErr  error
 	}{
@@ -77,7 +76,6 @@
 		{
 			name:     "vpce_endpoint",
 			queueURL: "https://vpce-test.sqs.us-east-2.vpce.amazonaws.com/12345678912/sqs-queue",
-			deflt:    "",
 			want:     "us-east-2",
 		},
 		{
@@ -90,7 +88,7 @@ func TestGetRegionFromQueueURL(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			got, err := getRegionFromQueueURL(test.queueURL, test.endpoint, test.deflt)
+			got, err := getRegionFromQueueURL(test.queueURL, test.endpoint)
 			if !sameError(err, test.wantErr) {
 				t.Errorf("unexpected error: got:%v want:%v", err, test.wantErr)
 			}
