diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index bb4a5c15bda..51e8c9808ed 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -102,72 +102,85 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { ctx := v2.GoContextFromCanceler(inputContext.Cancelation) if in.config.QueueURL != "" { - regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) - if err != nil && in.config.RegionName == "" { - return fmt.Errorf("failed to get AWS region from queue_url: %w", err) - } - var warn regionMismatchError - if errors.As(err, &warn) { - // Warn of mismatch, but go ahead with configured region name. - inputContext.Logger.Warnf("%v: using %q", err, regionName) - } - in.awsConfig.Region = regionName + return in.runQueueReader(ctx, inputContext, pipeline) + } - // Create SQS receiver and S3 notification processor. - receiver, err := in.createSQSReceiver(inputContext, pipeline) - if err != nil { - return fmt.Errorf("failed to initialize sqs receiver: %w", err) - } - defer receiver.metrics.Close() + if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" { + return in.runS3Poller(ctx, inputContext, pipeline) + } - // Poll metrics periodically in the background - go pollSqsWaitingMetric(ctx, receiver) + return nil +} - if err := receiver.Receive(ctx); err != nil { - return err - } +func (in *s3Input) runQueueReader( + ctx context.Context, + inputContext v2.Context, + pipeline beat.Pipeline, +) error { + configRegion := in.config.RegionName + urlRegion, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) + if err != nil && configRegion == "" { + // Only report an error if we don't have a configured region + // to fall back on. + return fmt.Errorf("failed to get AWS region from queue_url: %w", err) + } else if configRegion != "" && configRegion != urlRegion { + inputContext.Logger.Warnf("configured region disagrees with queue_url region (%q != %q): using %q", configRegion, urlRegion, urlRegion) } - if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" { - // Create client for publishing events and receive notification of their ACKs. - client, err := pipeline.ConnectWith(beat.ClientConfig{ - EventListener: awscommon.NewEventACKHandler(), - Processing: beat.ProcessingConfig{ - // This input only produces events with basic types so normalization - // is not required. - EventNormalization: boolPtr(false), - }, - }) - if err != nil { - return fmt.Errorf("failed to create pipeline client: %w", err) - } - defer client.Close() + in.awsConfig.Region = urlRegion - // Connect to the registry and create our states lookup - persistentStore, err := in.store.Access() - if err != nil { - return fmt.Errorf("can not access persistent store: %w", err) - } - defer persistentStore.Close() + // Create SQS receiver and S3 notification processor. + receiver, err := in.createSQSReceiver(inputContext, pipeline) + if err != nil { + return fmt.Errorf("failed to initialize sqs receiver: %w", err) + } + defer receiver.metrics.Close() - states, err := newStates(inputContext, persistentStore) - if err != nil { - return fmt.Errorf("can not start persistent store: %w", err) - } + // Poll metrics periodically in the background + go pollSqsWaitingMetric(ctx, receiver) - // Create S3 receiver and S3 notification processor. - poller, err := in.createS3Lister(inputContext, ctx, client, states) - if err != nil { - return fmt.Errorf("failed to initialize s3 poller: %w", err) - } - defer poller.metrics.Close() + return receiver.Receive(ctx) +} - if err := poller.Poll(ctx); err != nil { - return err - } +func (in *s3Input) runS3Poller( + ctx context.Context, + inputContext v2.Context, + pipeline beat.Pipeline, +) error { + // Create client for publishing events and receive notification of their ACKs. + client, err := pipeline.ConnectWith(beat.ClientConfig{ + EventListener: awscommon.NewEventACKHandler(), + Processing: beat.ProcessingConfig{ + // This input only produces events with basic types so normalization + // is not required. + EventNormalization: boolPtr(false), + }, + }) + if err != nil { + return fmt.Errorf("failed to create pipeline client: %w", err) } + defer client.Close() - return nil + // Connect to the registry and create our states lookup + persistentStore, err := in.store.Access() + if err != nil { + return fmt.Errorf("can not access persistent store: %w", err) + } + defer persistentStore.Close() + + states, err := newStates(inputContext, persistentStore) + if err != nil { + return fmt.Errorf("can not start persistent store: %w", err) + } + + // Create S3 receiver and S3 notification processor. + poller, err := in.createS3Poller(inputContext, ctx, client, states) + if err != nil { + return fmt.Errorf("failed to initialize s3 poller: %w", err) + } + defer poller.metrics.Close() + + return poller.Poll(ctx) } func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) { @@ -212,8 +225,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s return nil, err } in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages) + sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) return sqsReader, nil @@ -227,7 +243,7 @@ func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.Endpoint return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil } -func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, states *states) (*s3Poller, error) { +func (in *s3Input) createS3Poller(ctx v2.Context, cancelCtx context.Context, client beat.Client, states *states) (*s3Poller, error) { var bucketName string var bucketID string if in.config.NonAWSBucketName != "" { @@ -310,7 +326,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") -func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) { +func getRegionFromQueueURL(queueURL, endpoint string) (string, error) { // get region from queueURL // Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs // Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue @@ -323,11 +339,7 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg // check for sqs queue url if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { - region = queueHostSplit[1] - if defaultRegion != "" && region != defaultRegion { - return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} - } - return region, nil + return queueHostSplit[1], nil } } @@ -335,30 +347,13 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg queueHostSplitVPC := strings.SplitN(u.Host, ".", 5) if len(queueHostSplitVPC) == 5 && queueHostSplitVPC[1] == "sqs" { if queueHostSplitVPC[4] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplitVPC[4], "amazonaws.")) { - region = queueHostSplitVPC[2] - if defaultRegion != "" && region != defaultRegion { - return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} - } - return region, nil + return queueHostSplitVPC[2], nil } } - - if defaultRegion != "" { - return defaultRegion, nil - } } return "", errBadQueueURL } -type regionMismatchError struct { - queueURLRegion string - defaultRegion string -} - -func (e regionMismatchError) Error() string { - return fmt.Sprintf("configured region disagrees with queue_url region: %q != %q", e.queueURLRegion, e.defaultRegion) -} - func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) { getBucketLocationOutput, err := s3Client.GetBucketLocation(ctx, &s3.GetBucketLocationInput{ Bucket: awssdk.String(bucketName), diff --git a/x-pack/filebeat/input/awss3/input_test.go b/x-pack/filebeat/input/awss3/input_test.go index abc9f5c9a6a..0a3053f7f1b 100644 --- a/x-pack/filebeat/input/awss3/input_test.go +++ b/x-pack/filebeat/input/awss3/input_test.go @@ -54,7 +54,6 @@ func TestGetRegionFromQueueURL(t *testing.T) { name string queueURL string endpoint string - deflt string want string wantErr error }{ @@ -77,7 +76,6 @@ func TestGetRegionFromQueueURL(t *testing.T) { { name: "vpce_endpoint", queueURL: "https://vpce-test.sqs.us-east-2.vpce.amazonaws.com/12345678912/sqs-queue", - deflt: "", want: "us-east-2", }, { @@ -90,7 +88,7 @@ func TestGetRegionFromQueueURL(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := getRegionFromQueueURL(test.queueURL, test.endpoint, test.deflt) + got, err := getRegionFromQueueURL(test.queueURL, test.endpoint) if !sameError(err, test.wantErr) { t.Errorf("unexpected error: got:%v want:%v", err, test.wantErr) }