Skip to content

Commit

Permalink
Create Batch [Logic]
Browse files Browse the repository at this point in the history
  • Loading branch information
Deeptiman committed Jul 28, 2021
1 parent af8503d commit 7e6b981
Showing 1 changed file with 12 additions and 25 deletions.
37 changes: 12 additions & 25 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
log "github.com/sirupsen/logrus"
)

var (
itemCounter = 1
var (
DefaultMaxItems = uint64(100) // maximum no of items packed inside a Batch
DefaultMaxWait = time.Duration(30) * time.Second //seconds
DefaultBatchNo = int32(1)
Expand Down Expand Up @@ -74,24 +73,23 @@ func (p *BatchProducer) WatchProducer() {
case item := <-p.Watcher:

item.BatchNo = int(p.getBatchNo())
p.Log.WithFields(log.Fields{"Id": item.Id, "BatchNo": item.BatchNo, "Item": item.Item}).Info("BatchProducer")

items = append(items, *item)
itemCounter++
if p.isBatchReady() {
p.Log.WithFields(log.Fields{"Item Size": len(items), "MaxItems": p.MaxItems}).Warn("BatchReady")

itemCounter = 0
p.Log.WithFields(log.Fields{"Id": item.Id, "Batch_Break": item.Id / int(p.MaxItems), "BatchNo": item.BatchNo, "Item": item.Item}).Info("BatchProducer")

items = append(items, *item)

if (item.Id / int(p.MaxItems)) == item.BatchNo {
p.Log.WithFields(log.Fields{"Item Size": len(items), "MaxItems": p.MaxItems}).Warn("BatchReady")
items = p.releaseBatch(items)
p.createBatchNo()
}

case <-time.After(p.MaxWait):
p.Log.WithFields(log.Fields{"Items": len(items)}).Warn("MaxWait")

if len(items) == 0 {
return
}
itemCounter = 0

items = p.releaseBatch(items)
case <-p.Quit:
p.Log.Warn("Quit BatchProducer")
Expand Down Expand Up @@ -128,23 +126,12 @@ func (p *BatchProducer) CheckRemainingItems(done chan bool) {
done <- true
}

// isBatchReady verfies that whether the batch ItemCounter++ increases to the MaxItems value
// to create a Batch.
func (p *BatchProducer) isBatchReady() bool {
return uint64(itemCounter) >= p.MaxItems
}

// addBatchNo will increases the current BatchNo to 1 atomically.
func (p *BatchProducer) addBatchNo() {
func (p *BatchProducer) createBatchNo() {
atomic.AddInt32(&p.BatchNo, 1)
}

// getBatchNo will get the current BatchNo from the atomic variable.
func (p *BatchProducer) getBatchNo() int32 {

if itemCounter == 0 {
p.addBatchNo()
}

return atomic.LoadInt32(&p.BatchNo)
}
}

0 comments on commit 7e6b981

Please sign in to comment.