Skip to content

Commit

Permalink
feat: producer support multiple logstore pack id generate (#307)
Browse files Browse the repository at this point in the history
* feat: producer support multiple logstore pack id generate

* chore: refine add benchmark
  • Loading branch information
crimson-gao authored Dec 6, 2024
1 parent d719e9d commit bc0f39d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 17 deletions.
7 changes: 5 additions & 2 deletions producer/log_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type LogAccumulator struct {
logger log.Logger
threadPool *IoThreadPool
producer *Producer
packIdGenrator *PackIdGenerator
}

func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool, producer *Producer) *LogAccumulator {
Expand All @@ -32,6 +33,7 @@ func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.L
logger: logger,
threadPool: threadPool,
producer: producer,
packIdGenrator: newPackIdGenerator(),
}
}

Expand Down Expand Up @@ -96,10 +98,11 @@ func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}
level.Debug(logAccumulator.logger).Log("msg", "Create a new ProducerBatch")

if mlog, ok := logType.(*sls.Log); ok {
newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)

newProducerBatch := initProducerBatch(logAccumulator.packIdGenrator, mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = newProducerBatch
} else if logList, ok := logType.([]*sls.Log); ok {
newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
newProducerBatch := initProducerBatch(logAccumulator.packIdGenrator, logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = newProducerBatch
}
}
Expand Down
57 changes: 57 additions & 0 deletions producer/pack_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package producer

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
)

// PackIdGenerator hands out pack ids and keeps one LogStorePackIdGenerator
// per project/logstore pair it has been asked about.
type PackIdGenerator struct {
	// mutex guards every read and write of logstorePackIdGenerator.
	mutex sync.RWMutex

	// logstorePackIdGenerator is keyed by project + "|" + logstore and holds
	// the generator that belongs to that pair.
	logstorePackIdGenerator map[string]*LogStorePackIdGenerator

	// count is incremented each time a per-logstore generator is created; the
	// incremented value is handed to newLogStorePackIdGenerator.
	count atomic.Int32
}

// newPackIdGenerator returns a PackIdGenerator whose per-logstore map is
// already allocated (and empty), so GeneratePackId can insert into it
// without a nil-map check.
func newPackIdGenerator() *PackIdGenerator {
	gen := new(PackIdGenerator)
	gen.logstorePackIdGenerator = make(map[string]*LogStorePackIdGenerator)
	return gen
}

// GeneratePackId returns the next pack id for the given project/logstore
// pair. The id is the pair's prefix followed by the upper-case hexadecimal
// value of a per-pair counter; the first id issued for a pair ends in "0".
//
// The map is consulted under the read lock first. Only when the pair has no
// generator yet is the write lock taken, and the map is checked again there
// because another goroutine may have inserted the entry in between.
func (g *PackIdGenerator) GeneratePackId(project, logstore string) string {
	key := project + "|" + logstore

	// Common case: a generator for this pair already exists.
	g.mutex.RLock()
	gen, found := g.logstorePackIdGenerator[key]
	if found {
		next := gen.packNumber.Add(1)
		g.mutex.RUnlock()
		// Add returns the incremented value; the id uses the value before it.
		return fmt.Sprintf("%s%X", gen.prefix, next-1)
	}
	g.mutex.RUnlock()

	// First request for this pair: create the generator unless someone else
	// did so while no lock was held.
	g.mutex.Lock()
	gen, found = g.logstorePackIdGenerator[key]
	if !found {
		gen = newLogStorePackIdGenerator(g.count.Add(1))
		g.logstorePackIdGenerator[key] = gen
	}
	next := gen.packNumber.Add(1)
	g.mutex.Unlock()
	return fmt.Sprintf("%s%X", gen.prefix, next-1)
}

// LogStorePackIdGenerator holds the pack id state of a single logstore: a
// fixed prefix plus the counter that supplies the number appended to it.
type LogStorePackIdGenerator struct {
	// packNumber is incremented atomically for every pack id handed out.
	packNumber atomic.Int64

	// prefix is placed in front of every pack id; it already ends with "-".
	prefix string
}

// newLogStorePackIdGenerator builds the generator for one logstore.
//
// The prefix is derived from the current time in nanoseconds joined with id
// ("<nanos>-<id>"): that string is passed through generatePackId, upper-cased
// and terminated with "-". The counter is left at its zero value.
func newLogStorePackIdGenerator(id int32) *LogStorePackIdGenerator {
	seed := fmt.Sprintf("%d-%d", time.Now().UnixNano(), id)

	gen := &LogStorePackIdGenerator{}
	gen.prefix = strings.ToUpper(generatePackId(seed)) + "-"
	return gen
}
45 changes: 45 additions & 0 deletions producer/pack_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package producer

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

// TestPackIdGenerator drives one shared generator from several goroutines,
// each using its own project/logstore pair, and checks for every pair that
// all ids share the same 16-character prefix and that the part after the
// separator counts up from 0 in upper-case hexadecimal.
func TestPackIdGenerator(t *testing.T) {
	const (
		workers      = 10   // goroutines, one distinct project/logstore each
		idsPerWorker = 1000 // pack ids requested by every goroutine
	)

	gen := newPackIdGenerator()

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(idx int) {
			defer wg.Done()

			// Project and logstore use the same per-goroutine name.
			name := fmt.Sprintf("test%d", idx)

			ids := make([]string, idsPerWorker)
			for seq := range ids {
				ids[seq] = gen.GeneratePackId(name, name)
			}

			wantPrefix := ids[0][:16]
			for seq, id := range ids {
				assert.Equal(t, wantPrefix, id[:16])
				// Index 16 is the "-" separator; the counter starts at 17.
				assert.Equal(t, fmt.Sprintf("%X", seq), id[17:])
			}
		}(i)
	}
	wg.Wait()
}

// BenchmarkPackIdGenerator measures repeated GeneratePackId calls for a
// single, fixed project/logstore pair on one goroutine.
//
// Result recorded with the original change:
//
//	BenchmarkPackIdGenerator-12    8366719    120.7 ns/op    64 B/op    4 allocs/op
func BenchmarkPackIdGenerator(b *testing.B) {
	const project, logstore = "test", "test"

	gen := newPackIdGenerator()
	for remaining := b.N; remaining > 0; remaining-- {
		gen.GeneratePackId(project, logstore)
	}
}
12 changes: 2 additions & 10 deletions producer/producer_batch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package producer

import (
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -36,7 +34,7 @@ func generatePackId(source string) string {
return ToMd5(srcData)[0:16]
}

func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch {
func initProducerBatch(packIdGenerator *PackIdGenerator, logData interface{}, callBackFunc CallBack, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch {
logs := []*sls.Log{}

if log, ok := logData.(*sls.Log); ok {
Expand All @@ -52,17 +50,11 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs
Source: proto.String(logSource),
}
if config.GeneratePackId {
config.packLock.Lock()
if config.packPrefix == "" {
config.packPrefix = strings.ToUpper(generatePackId(logSource)) + "-"
}
packStr := config.packPrefix + fmt.Sprintf("%X", config.packNumber)
packStr := packIdGenerator.GeneratePackId(project, logstore)
logGroup.LogTags = append(logGroup.LogTags, &sls.LogTag{
Key: proto.String("__pack_id__"),
Value: proto.String(packStr),
})
config.packNumber++
config.packLock.Unlock()
}
currentTimeMs := GetTimeMs(time.Now().UnixNano())
producerBatch := &ProducerBatch{
Expand Down
5 changes: 0 additions & 5 deletions producer/producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package producer

import (
"net/http"
"sync"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
Expand Down Expand Up @@ -40,10 +39,6 @@ type ProducerConfig struct {
CredentialsProvider sls.CredentialsProvider
UseMetricStoreURL bool

packLock sync.Mutex
packPrefix string
packNumber int64

// Deprecated: use CredentialsProvider and UpdateFuncProviderAdapter instead.
//
// Example:
Expand Down

0 comments on commit bc0f39d

Please sign in to comment.