Skip to content

Commit

Permalink
feat(processor): Add multiple worker support (#246)
Browse files Browse the repository at this point in the history
* feat(processor): Add multiple worker support

* chore: Update alpha-releases.yaml

* refactor: Simplify batch timeout reset in worker function

* refactor: Reset timer in batchBuilder

* refactor(batch): Reset timer when there are no items in the batch

* refactor: Remove redundant code
  • Loading branch information
samcm authored Oct 24, 2023
1 parent 1878369 commit 35fa120
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 238 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/alpha-releases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ on:
- 'release/*'

jobs:
permissions:
contents: write
tag-release:
permissions:
contents: write
runs-on: ubuntu-20.04
steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions pkg/output/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Compression CompressionStrategy `yaml:"compression" default:"none"`
KeepAlive *bool `yaml:"keepAlive" default:"true"`
Workers int `yaml:"workers" default:"1"`
}

func (c *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
processor.WithExportTimeout(config.ExportTimeout),
processor.WithMaxExportBatchSize(config.MaxExportBatchSize),
processor.WithShippingMethod(shippingMethod),
processor.WithWorkers(config.Workers),
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/output/xatu/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"`
ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"`
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Workers int `yaml:"workers" default:"1"`
}

func (c *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/output/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
processor.WithExportTimeout(config.ExportTimeout),
processor.WithMaxExportBatchSize(config.MaxExportBatchSize),
processor.WithShippingMethod(shippingMethod),
processor.WithWorkers(config.Workers),
)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 35fa120

Please sign in to comment.