diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 667f55d9769..0bcca8282b6 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -203,6 +203,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308]
 - Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
 - Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
+- Added Filebeat debug histograms for S3 object size and events per processed S3 object. {pull}40775[40775]
 
 ==== Deprecated
 
diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go
index bef57210ca6..3be07437a50 100644
--- a/x-pack/filebeat/input/awss3/metrics.go
+++ b/x-pack/filebeat/input/awss3/metrics.go
@@ -71,6 +71,8 @@ type inputMetrics struct {
 	s3EventsCreatedTotal   *monitoring.Uint // Number of events created from processing S3 data.
 	s3ObjectsInflight      *monitoring.Uint // Number of S3 objects inflight (gauge).
 	s3ObjectProcessingTime metrics.Sample   // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).
+	s3ObjectSizeInBytes    metrics.Sample   // Histogram of processed S3 object sizes in bytes.
+	s3EventsPerObject      metrics.Sample   // Histogram of the number of events in an individual S3 object.
 }
 
 // Close cancels the context and removes the metrics from the registry.
@@ -174,16 +176,23 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
 		s3EventsCreatedTotal:   monitoring.NewUint(reg, "s3_events_created_total"),
 		s3ObjectsInflight:      monitoring.NewUint(reg, "s3_objects_inflight_gauge"),
 		s3ObjectProcessingTime: metrics.NewUniformSample(1024),
+		s3ObjectSizeInBytes:    metrics.NewUniformSample(1024),
+		s3EventsPerObject:      metrics.NewUniformSample(1024),
 	}
 
 	// Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1).
 	out.sqsMessagesWaiting.Set(int64(-1))
+
 	adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
 		Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
 	adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept).
 		Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
 	adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
 		Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+	adapter.NewGoMetrics(reg, "s3_object_size_in_bytes", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.s3ObjectSizeInBytes)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+	adapter.NewGoMetrics(reg, "s3_events_per_object", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.s3EventsPerObject)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
 
 	if maxWorkers > 0 {
 		// Periodically update the sqs worker utilization metric.
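For context on the registration pattern the metrics.go hunks extend: each histogram is a go-metrics reservoir sample that is bridged into the Beats monitoring registry through `adapter.NewGoMetrics`. Below is a minimal, self-contained sketch of that pattern; it is not part of the diff, and the import paths (`github.com/rcrowley/go-metrics`, `github.com/elastic/elastic-agent-libs/monitoring`) are assumed from the current Beats dependency set.

```go
package main

import (
	"fmt"

	metrics "github.com/rcrowley/go-metrics"

	"github.com/elastic/elastic-agent-libs/monitoring"
	"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

func main() {
	reg := monitoring.NewRegistry()

	// A uniform sample keeps a reservoir of at most 1024 observations;
	// the histogram derives count/min/max/mean/percentiles from it.
	sample := metrics.NewUniformSample(1024)
	adapter.NewGoMetrics(reg, "s3_object_size_in_bytes", adapter.Accept).
		Register("histogram", metrics.NewHistogram(sample)) //nolint:errcheck // unique namespace

	// Each processed object contributes one observation.
	for _, size := range []int64{512, 2048, 1 << 20} {
		sample.Update(size)
	}

	fmt.Printf("count=%d mean=%.0f p99=%.0f\n",
		sample.Count(), sample.Mean(), sample.Percentile(0.99))
}
```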
diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go
index 59e73758fc3..943a36ef063 100644
--- a/x-pack/filebeat/input/awss3/s3_objects.go
+++ b/x-pack/filebeat/input/awss3/s3_objects.go
@@ -101,6 +101,14 @@ func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger,
 	}
 }
 
+// s3DownloadedObject encapsulates a downloaded S3 object for internal processing.
+type s3DownloadedObject struct {
+	body        io.ReadCloser
+	length      int64
+	contentType string
+	metadata    map[string]interface{}
+}
+
 type s3ObjectProcessor struct {
 	*s3ObjectProcessorFactory
 
@@ -112,6 +120,7 @@ type s3ObjectProcessor struct {
 	s3Obj        s3EventV2 // S3 object information.
 	s3ObjHash    string
 	s3RequestURL string
+	eventCount   int64
 
 	s3Metadata map[string]interface{} // S3 object metadata.
 }
@@ -138,23 +147,25 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
 	}()
 
 	// Request object (download).
-	contentType, meta, body, err := p.download()
+	s3Obj, err := p.download()
 	if err != nil {
 		// Wrap downloadError in the result so the caller knows it's not a
 		// permanent failure.
 		return fmt.Errorf("%w: %w", errS3DownloadFailed, err)
 	}
-	defer body.Close()
-	p.s3Metadata = meta
+	defer s3Obj.body.Close()
+
+	p.s3Metadata = s3Obj.metadata
+	p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length)
 
-	reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(body, p.metrics.s3BytesProcessedTotal))
+	reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal))
 	if err != nil {
 		return fmt.Errorf("failed checking for gzip content: %w", err)
 	}
 
 	// Overwrite with user configured Content-Type.
 	if p.readerConfig.ContentType != "" {
-		contentType = p.readerConfig.ContentType
+		s3Obj.contentType = p.readerConfig.ContentType
 	}
 
 	// try to create a decoder using the codec config
@@ -183,7 +194,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
 	// This is the legacy path. It will be removed in the future and clubbed together with the decoder.
 	// Process object content stream.
 	switch {
-	case strings.HasPrefix(contentType, contentTypeJSON) || strings.HasPrefix(contentType, contentTypeNDJSON):
+	case strings.HasPrefix(s3Obj.contentType, contentTypeJSON) || strings.HasPrefix(s3Obj.contentType, contentTypeNDJSON):
 		err = p.readJSON(reader)
 	default:
 		err = p.readFile(reader)
@@ -194,20 +205,20 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
 			time.Since(start).Nanoseconds(), err)
 	}
 
+	p.metrics.s3EventsPerObject.Update(p.eventCount)
 	return nil
 }
 
 // download requests the S3 object from AWS and returns the object's
-// Content-Type and reader to get the object's contents. The caller must
-// close the returned reader.
-func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) {
+// Content-Type and a reader to get the object's contents.
+// The caller must close the reader embedded in s3DownloadedObject.
+func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) {
 	getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
 	if err != nil {
-		return "", nil, nil, err
+		return nil, err
 	}
-
 	if getObjectOutput == nil {
-		return "", nil, nil, fmt.Errorf("empty response from s3 get object: %w", err)
+		return nil, fmt.Errorf("empty response from s3 get object: %w", err)
 	}
 	s3RequestURL := getObjectOutput.ResultMetadata.Get(s3RequestURLMetadataKey)
 	if s3RequestURLAsString, ok := s3RequestURL.(string); ok {
@@ -215,10 +226,20 @@ func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]
 	}
 
 	meta := s3Metadata(getObjectOutput, p.readerConfig.IncludeS3Metadata...)
-	if getObjectOutput.ContentType == nil {
-		return "", meta, getObjectOutput.Body, nil
+
+	ctType := ""
+	if getObjectOutput.ContentType != nil {
+		ctType = *getObjectOutput.ContentType
+	}
+
+	s := &s3DownloadedObject{
+		body:        getObjectOutput.Body,
+		length:      *getObjectOutput.ContentLength,
+		contentType: ctType,
+		metadata:    meta,
 	}
-	return *getObjectOutput.ContentType, meta, getObjectOutput.Body, nil
+
+	return s, nil
 }
 
 func (p *s3ObjectProcessor) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) {
@@ -391,9 +412,11 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error {
 	return nil
 }
 
+// publish publishes the generated event and increments the per-object event count.
 func (p *s3ObjectProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) {
 	ack.Add()
 	event.Private = ack
+	p.eventCount += 1
 	p.metrics.s3EventsCreatedTotal.Inc()
 	p.publisher.Publish(*event)
 }
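Taken together, the s3_objects.go changes implement a simple per-object accounting pattern: `publish` increments `eventCount` for every event created from the object, and `ProcessS3Object` flushes that count into the `s3_events_per_object` histogram exactly once, after processing completes. A condensed, hypothetical sketch of the same pattern follows; the names here are illustrative and not from the diff.

```go
package main

import (
	"fmt"

	metrics "github.com/rcrowley/go-metrics"
)

// objectProcessor mirrors, in miniature, how s3ObjectProcessor tracks events.
type objectProcessor struct {
	eventCount      int64          // events published for the current object
	eventsPerObject metrics.Sample // shared histogram sample, one update per object
}

// publish stands in for publisher.Publish; only the counting matters here.
func (p *objectProcessor) publish(event string) {
	p.eventCount++
	_ = event
}

// process publishes one event per line, then records the per-object total.
func (p *objectProcessor) process(lines []string) {
	for _, l := range lines {
		p.publish(l)
	}
	p.eventsPerObject.Update(p.eventCount)
}

func main() {
	sample := metrics.NewUniformSample(1024)
	for _, obj := range [][]string{{"a", "b", "c"}, {"x"}} {
		p := &objectProcessor{eventsPerObject: sample}
		p.process(obj)
	}
	fmt.Printf("objects=%d, mean events/object=%.1f\n", sample.Count(), sample.Mean())
}
```

Once registered, both histograms should surface alongside the input's other metrics (for example through Filebeat's HTTP monitoring endpoint when `http.enabled: true` is set, assuming the standard Beats monitoring setup).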