Skip to content

Commit

Permalink
feat: consumer support io worker config to reduce mem usage (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
crimson-gao authored Dec 20, 2024
1 parent bc0f39d commit ac186b4
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
4 changes: 4 additions & 0 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type LogHubConfig struct {
//:param AuthVersion: signature algorithm version, default is sls.AuthV1
//:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4
//:param DisableRuntimeMetrics: disable runtime metrics, runtime metrics prints to local log.
//::param MaxIoWorkers: max io workers, default is 50. Smaller io workers will reduce memory usage, but may reduce throughput.
Endpoint string
AccessKeyID string
AccessKeySecret string
Expand Down Expand Up @@ -83,6 +84,7 @@ type LogHubConfig struct {
AuthVersion sls.AuthVersionType
Region string
DisableRuntimeMetrics bool
MaxIoWorkers int
}

const (
Expand All @@ -98,3 +100,5 @@ const (
SHUTTING_DOWN = "SHUTTING_DOWN"
SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
)

const defaultMaxIoWorkers = 50
6 changes: 5 additions & 1 deletion consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type ShardConsumerWorker struct {
shutDownFlag *atomic.Bool
stopped *atomic.Bool
startOnceFlag sync.Once
ioThrottler ioThrottler
}

func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker {
func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger, ioThrottler ioThrottler) *ShardConsumerWorker {
shardConsumeWorker := &ShardConsumerWorker{
processor: processor,
consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger),
Expand All @@ -47,6 +48,7 @@ func newShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consume
stopped: atomic.NewBool(false),
lastCheckpointSaveTime: time.Now(),
monitor: newShardMonitor(shardId, time.Minute),
ioThrottler: ioThrottler,
}
return shardConsumeWorker
}
Expand Down Expand Up @@ -95,6 +97,8 @@ func (consumer *ShardConsumerWorker) getInitCursor() string {
}

func (c *ShardConsumerWorker) fetchLogs(cursor string) (shouldCallProcess bool, logGroupList *sls.LogGroupList, plm *sls.PullLogMeta) {
c.ioThrottler.Acquire()
defer c.ioThrottler.Release()

start := time.Now()
logGroupList, plm, err := c.client.pullLogs(c.shardId, cursor)
Expand Down
38 changes: 35 additions & 3 deletions consumer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ConsumerWorker struct {
processor Processor
waitGroup sync.WaitGroup
Logger log.Logger
ioThrottler ioThrottler
}

// depreciated: this old logic is to automatically save to memory, and then commit at a fixed time
Expand Down Expand Up @@ -57,6 +58,10 @@ func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *
if logger == nil {
logger = logConfig(option)
}
maxIoWorker := defaultMaxIoWorkers
if option.MaxIoWorkers > 0 {
maxIoWorker = option.MaxIoWorkers
}

consumerClient := initConsumerClient(option, logger)
consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger)
Expand All @@ -65,8 +70,9 @@ func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *
client: consumerClient,
workerShutDownFlag: atomic.NewBool(false),
//shardConsumer: make(map[int]*ShardConsumerWorker),
processor: processor,
Logger: logger,
processor: processor,
Logger: logger,
ioThrottler: newSimpleIoThrottler(maxIoWorker),
}
if err := consumerClient.createConsumerGroup(); err != nil {
level.Error(consumerWorker.Logger).Log(
Expand Down Expand Up @@ -144,7 +150,12 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum
if ok {
return consumer.(*ShardConsumerWorker)
}
consumerIns := newShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger)
consumerIns := newShardConsumerWorker(shardId,
consumerWorker.client,
consumerWorker.consumerHeatBeat,
consumerWorker.processor,
consumerWorker.Logger,
consumerWorker.ioThrottler)
consumerWorker.shardConsumer.Store(shardId, consumerIns)
return consumerIns

Expand Down Expand Up @@ -217,3 +228,24 @@ func logConfig(option LogHubConfig) log.Logger {
logger = log.With(logger, "time", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
return logger
}

type ioThrottler interface {
Acquire()
Release()
}

type simpleIoThrottler struct {
chance chan struct{}
}

func newSimpleIoThrottler(maxIoWorkers int) *simpleIoThrottler {
return &simpleIoThrottler{
chance: make(chan struct{}, maxIoWorkers),
}
}
func (t *simpleIoThrottler) Acquire() {
t.chance <- struct{}{}
}
func (t *simpleIoThrottler) Release() {
<-t.chance
}

0 comments on commit ac186b4

Please sign in to comment.