Skip to content

Commit

Permalink
chore: add debug histograms for s3 object size and events per process…
Browse files Browse the repository at this point in the history
…ed s3 object (#40775)

* add debug histograms for s3 object size and events per object

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* changelog entry

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* move changelog to developer next

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

---------

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
(cherry picked from commit 8f2c3f9)
  • Loading branch information
Kavindu-Dodan authored and mergify[bot] committed Sep 13, 2024
1 parent 9136549 commit e299dfa
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308]
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
- Added Filebeat debug histograms for S3 object size and events per processed S3 object. {pull}40775[40775]

==== Deprecated

Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type inputMetrics struct {
s3EventsCreatedTotal *monitoring.Uint // Number of events created from processing S3 data.
s3ObjectsInflight *monitoring.Uint // Number of S3 objects inflight (gauge).
s3ObjectProcessingTime metrics.Sample // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).
s3ObjectSizeInBytes metrics.Sample // Histogram of processed S3 object size in bytes
s3EventsPerObject metrics.Sample // Histogram of events in an individual S3 object
}

// Close cancels the context and removes the metrics from the registry.
Expand Down Expand Up @@ -174,16 +176,23 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
s3EventsCreatedTotal: monitoring.NewUint(reg, "s3_events_created_total"),
s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"),
s3ObjectProcessingTime: metrics.NewUniformSample(1024),
s3ObjectSizeInBytes: metrics.NewUniformSample(1024),
s3EventsPerObject: metrics.NewUniformSample(1024),
}

// Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1).
out.sqsMessagesWaiting.Set(int64(-1))

adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_size_in_bytes", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectSizeInBytes)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_events_per_object", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3EventsPerObject)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.

if maxWorkers > 0 {
// Periodically update the sqs worker utilization metric.
Expand Down
53 changes: 38 additions & 15 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger,
}
}

// s3DownloadedObject encapsulates a downloaded S3 object for internal processing.
// It is produced by (*s3ObjectProcessor).download; the caller owns body and must
// close it when processing is finished.
type s3DownloadedObject struct {
	body        io.ReadCloser          // object content stream; caller must Close it
	length      int64                  // object size in bytes, taken from the GetObject Content-Length
	contentType string                 // Content-Type reported by S3; empty when S3 returned none
	metadata    map[string]interface{} // selected S3 object metadata (filtered by readerConfig.IncludeS3Metadata)
}

type s3ObjectProcessor struct {
*s3ObjectProcessorFactory

Expand All @@ -112,6 +120,7 @@ type s3ObjectProcessor struct {
s3Obj s3EventV2 // S3 object information.
s3ObjHash string
s3RequestURL string
eventCount int64

s3Metadata map[string]interface{} // S3 object metadata.
}
Expand All @@ -138,23 +147,25 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
}()

// Request object (download).
contentType, meta, body, err := p.download()
s3Obj, err := p.download()
if err != nil {
// Wrap downloadError in the result so the caller knows it's not a
// permanent failure.
return fmt.Errorf("%w: %w", errS3DownloadFailed, err)
}
defer body.Close()
p.s3Metadata = meta
defer s3Obj.body.Close()

p.s3Metadata = s3Obj.metadata
p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length)

reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(body, p.metrics.s3BytesProcessedTotal))
reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal))
if err != nil {
return fmt.Errorf("failed checking for gzip content: %w", err)
}

// Overwrite with user configured Content-Type.
if p.readerConfig.ContentType != "" {
contentType = p.readerConfig.ContentType
s3Obj.contentType = p.readerConfig.ContentType
}

// try to create a decoder from the using the codec config
Expand Down Expand Up @@ -183,7 +194,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
// This is the legacy path. It will be removed in future and clubbed together with the decoder.
// Process object content stream.
switch {
case strings.HasPrefix(contentType, contentTypeJSON) || strings.HasPrefix(contentType, contentTypeNDJSON):
case strings.HasPrefix(s3Obj.contentType, contentTypeJSON) || strings.HasPrefix(s3Obj.contentType, contentTypeNDJSON):
err = p.readJSON(reader)
default:
err = p.readFile(reader)
Expand All @@ -194,31 +205,41 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
time.Since(start).Nanoseconds(), err)
}

p.metrics.s3EventsPerObject.Update(p.eventCount)
return nil
}

// download requests the S3 object from AWS and returns the object's
// Content-Type and reader to get the object's contents. The caller must
// close the returned reader.
func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) {
// Content-Type and reader to get the object's contents.
// The caller must close the reader embedded in s3DownloadedObject.
func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) {
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return "", nil, nil, err
return nil, err
}

if getObjectOutput == nil {
return "", nil, nil, fmt.Errorf("empty response from s3 get object: %w", err)
return nil, fmt.Errorf("empty response from s3 get object: %w", err)
}
s3RequestURL := getObjectOutput.ResultMetadata.Get(s3RequestURLMetadataKey)
if s3RequestURLAsString, ok := s3RequestURL.(string); ok {
p.s3RequestURL = s3RequestURLAsString
}

meta := s3Metadata(getObjectOutput, p.readerConfig.IncludeS3Metadata...)
if getObjectOutput.ContentType == nil {
return "", meta, getObjectOutput.Body, nil

ctType := ""
if getObjectOutput.ContentType != nil {
ctType = *getObjectOutput.ContentType
}

s := &s3DownloadedObject{
body: getObjectOutput.Body,
length: *getObjectOutput.ContentLength,
contentType: ctType,
metadata: meta,
}
return *getObjectOutput.ContentType, meta, getObjectOutput.Body, nil

return s, nil
}

func (p *s3ObjectProcessor) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) {
Expand Down Expand Up @@ -391,9 +412,11 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error {
return nil
}

// publish registers the event with the ACK tracker, updates per-object and
// global event counters, and forwards the event to the configured publisher.
func (p *s3ObjectProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) {
	ack.Add()
	event.Private = ack
	// Idiomatic increment; eventCount feeds the s3EventsPerObject histogram
	// reported at the end of ProcessS3Object.
	p.eventCount++
	p.metrics.s3EventsCreatedTotal.Inc()
	p.publisher.Publish(*event)
}
Expand Down

0 comments on commit e299dfa

Please sign in to comment.