Commit

mod: optimize backfill flow
toannhu committed Jun 26, 2024
1 parent 523ed91 commit 89ee54b
Showing 3 changed files with 32 additions and 34 deletions.
1 change: 0 additions & 1 deletion go.mod
@@ -28,7 +28,6 @@ require (
 
 require (
 	github.com/IBM/sarama v1.42.1
-	github.com/alitto/pond v1.9.0
 	github.com/aws/aws-sdk-go v1.54.3
 	github.com/creasty/defaults v1.7.0
 	github.com/getnimbus/ultrago v1.0.0
2 changes: 0 additions & 2 deletions go.sum
@@ -12,8 +12,6 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA
 github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
 github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
 github.com/agiledragon/gomonkey/v2 v2.3.1/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
-github.com/alitto/pond v1.9.0 h1:B8BrvXyKe97NK9LHuRsQAOmpRnsp6GJ7mCg1Cgitczo=
-github.com/alitto/pond v1.9.0/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
 github.com/aws/aws-sdk-go v1.37.32/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
 github.com/aws/aws-sdk-go v1.54.3 h1:Bk+EXoq6v5I1xmHR9GQGpsMWZZFXs+FD+5uPyEmfgX0=
 github.com/aws/aws-sdk-go v1.54.3/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
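
The only dependency change is dropping github.com/alitto/pond, the bounded worker pool that storeS3 previously used to fan out its S3 uploads; the rewrite in save.go below relies on golang.org/x/sync/errgroup, which the file already imports. Below is a minimal sketch of the errgroup pattern the commit moves to; uploadPart is a hypothetical stand-in for the real upload closures, not code from this repository.

// Sketch only: errgroup-based fan-out as used after this commit.
// uploadPart is a hypothetical stand-in for the real S3 upload closures.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func uploadPart(ctx context.Context, name string) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // another task already failed; stop early
	default:
		fmt.Println("uploaded", name)
		return nil
	}
}

func storeAll(ctx context.Context) error {
	eg, childCtx := errgroup.WithContext(ctx)
	for _, part := range []string{"blocks", "txs", "messages"} {
		part := part // capture loop variable (pre-Go 1.22 semantics)
		eg.Go(func() error {
			return uploadPart(childCtx, part)
		})
	}
	// Wait returns the first error; unlike pond's fire-and-forget Submit,
	// a failed upload now fails the whole call instead of only being logged.
	return eg.Wait()
}

func main() {
	if err := storeAll(context.Background()); err != nil {
		fmt.Println("store failed:", err)
	}
}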
63 changes: 32 additions & 31 deletions internal/app/indexer/save.go
@@ -9,7 +9,6 @@ import (
 	"sort"
 	"time"
 
-	"github.com/alitto/pond"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"golang.org/x/sync/errgroup"
@@ -54,6 +53,12 @@ func (s *Service) insertData(
 
 	eg, childCtx := errgroup.WithContext(ctx)
 
+	if conf.Config.IsBackfill() {
+		eg.Go(func() error {
+			return s.storeS3(childCtx, b, tx, msg)
+		})
+	}
+
 	eg.Go(func() error {
 		//if err := func() error {
 		//	defer app.TimeTrack(time.Now(), "AddAccountStates(%d)", len(acc))
@@ -87,12 +92,6 @@
 		return nil
 	})
 
-	if conf.Config.IsBackfill() {
-		eg.Go(func() error {
-			return s.storeS3(childCtx, b, tx, msg)
-		})
-	}
-
 	if err := eg.Wait(); err != nil {
 		return errors.Wrap(err, "cannot insert data")
 	}
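
In insertData, the backfill branch that hands the batch to storeS3 is now registered on the errgroup ahead of the database writers instead of after them, presumably so the S3 upload starts as early as possible. The ordering does not change the semantics: every eg.Go task runs concurrently, the first error cancels the shared childCtx, and eg.Wait surfaces that error. A minimal, self-contained sketch of that behavior follows; the failing task and the timing are illustrative, not taken from this repository.

// Sketch only: how errgroup behaves in the reordered insertData.
// Registering the backfill upload first only starts it earlier; every
// eg.Go task still runs concurrently, and the first error cancels the
// context shared by the others before eg.Wait returns it.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, childCtx := errgroup.WithContext(context.Background())

	// Stand-in for the backfill S3 upload registered first.
	eg.Go(func() error {
		return errors.New("upload failed") // fails immediately
	})

	// Stand-in for a database writer registered afterwards.
	eg.Go(func() error {
		select {
		case <-childCtx.Done():
			return childCtx.Err() // cancelled because the upload failed
		case <-time.After(time.Second):
			return nil
		}
	})

	fmt.Println(eg.Wait()) // prints the first error: "upload failed"
}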
@@ -309,36 +308,35 @@ func (s *Service) storeS3(
 ) error {
 	log.Info().Msg("start goroutine store s3...")
 
-	pool := pond.New(10, 0, pond.MinWorkers(3))
-	defer pool.StopAndWait()
+	eg, childCtx := errgroup.WithContext(ctx)
 
 	// store blocks to S3
-	pool.Submit(func() {
+	eg.Go(func() error {
 		if len(blocks) == 0 {
-			return
+			return nil
 		}
 
 		errCh := make(chan error, 1)
 		defer close(errCh)
 
-		pw := s.S3.FileStreamWriter(ctx, conf.Config.AwsBucket, fmt.Sprintf("blocks/ton-blocks/datekey=%v/%v.json.gz", blocks[0].DateKey, blocks[0].PartitionKey()), errCh)
+		pw := s.S3.FileStreamWriter(childCtx, conf.Config.AwsBucket, fmt.Sprintf("blocks/ton-blocks/datekey=%v/%v.json.gz", blocks[0].DateKey, blocks[0].PartitionKey()), errCh)
 		zw, err := gzip.NewWriterLevel(pw, gzip.BestSpeed)
 		if err != nil {
 			log.Error().Msg(fmt.Sprintf("failed to create gzip writer: %v", err))
-			return
+			return errors.Wrap(err, "failed to create gzip writer")
 		}
 
 		// add data to gzip
 		for _, block := range blocks {
 			data, err := json.Marshal(block)
 			if err != nil {
 				log.Error().Msg(fmt.Sprintf("failed to marshal block: %v", err))
-				return
+				return errors.Wrap(err, "failed to marshal block")
 			}
 			_, err = zw.Write(data)
 			if err != nil {
 				log.Error().Msg(fmt.Sprintf("failed to write block to S3: %v", err))
-				return
+				return errors.Wrap(err, "failed to write block to S3")
 			}
 			zw.Write([]byte("\n"))
 		}
@@ -350,38 +348,39 @@
 		returnErr := <-errCh
 		if returnErr != nil {
 			log.Error().Msg(fmt.Sprintf("failed to store block to S3: %v", err))
-			return
+			return errors.Wrap(err, "failed to store block to S3")
 		}
 		log.Info().Msg(fmt.Sprintf("[%v] submit blocks %v to S3 success", blocks[0].DateKey, blocks[0].PartitionKey()))
+		return nil
 	})
 
 	// store transactions to S3
-	pool.Submit(func() {
+	eg.Go(func() error {
 		if len(txs) == 0 {
-			return
+			return nil
 		}
 
 		errCh := make(chan error, 1)
 		defer close(errCh)
 
-		pw := s.S3.FileStreamWriter(ctx, conf.Config.AwsBucket, fmt.Sprintf("txs/ton-txs/datekey=%v/%v.json.gz", txs[0].DateKey, txs[0].BlockSeqNo), errCh)
+		pw := s.S3.FileStreamWriter(childCtx, conf.Config.AwsBucket, fmt.Sprintf("txs/ton-txs/datekey=%v/%v.json.gz", txs[0].DateKey, txs[0].BlockSeqNo), errCh)
 		zw, err := gzip.NewWriterLevel(pw, gzip.BestSpeed)
 		if err != nil {
 			log.Error().Msg(fmt.Sprintf("failed to create gzip writer: %v", err))
-			return
+			return errors.Wrap(err, "failed to create gzip writer")
 		}
 
 		// add data to gzip
 		for _, tx := range txs {
 			data, err := json.Marshal(tx)
 			if err != nil {
 				log.Error().Msg(fmt.Sprintf("failed to marshal tx: %v", err))
-				return
+				return errors.Wrap(err, "failed to marshal tx")
 			}
 			_, err = zw.Write(data)
 			if err != nil {
 				log.Error().Msg(fmt.Sprintf("failed to write tx to S3: %v", err))
-				return
+				return errors.Wrap(err, "failed to write tx to S3")
 			}
 			zw.Write([]byte("\n"))
 		}
@@ -393,38 +392,39 @@
 		returnErr := <-errCh
 		if returnErr != nil {
 			log.Error().Msg(fmt.Sprintf("failed to store tx to S3: %v", err))
-			return
+			return errors.Wrap(err, "failed to store tx to S3")
 		}
 		log.Info().Msg(fmt.Sprintf("[%v] submit txs in checkpoint %v to S3 success", txs[0].DateKey, txs[0].BlockSeqNo))
+		return nil
 	})
 
 	// store messages to S3
-	pool.Submit(func() {
+	eg.Go(func() error {
 		if len(msgs) == 0 {
-			return
+			return nil
 		}
 
 		errCh := make(chan error, 1)
 		defer close(errCh)
 
-		pw := s.S3.FileStreamWriter(ctx, conf.Config.AwsBucket, fmt.Sprintf("messages/ton-messages/datekey=%v/%v.json.gz", msgs[0].DateKey, msgs[0].PartitionKey()), errCh)
+		pw := s.S3.FileStreamWriter(childCtx, conf.Config.AwsBucket, fmt.Sprintf("messages/ton-messages/datekey=%v/%v.json.gz", msgs[0].DateKey, msgs[0].PartitionKey()), errCh)
 		zw, err := gzip.NewWriterLevel(pw, gzip.BestSpeed)
 		if err != nil {
 			log.Error().Msg(fmt.Sprintf("failed to create gzip writer: %v", err))
-			return
+			return errors.Wrap(err, "failed to create gzip writer")
 		}
 
 		// add data to gzip
 		for _, msg := range msgs {
 			data, err := json.Marshal(msg)
 			if err != nil {
 				log.Error().Msg(fmt.Sprintf("failed to marshal msg: %v", err))
-				return
+				return errors.Wrap(err, "failed to marshal msg")
 			}
 			_, err = zw.Write(data)
 			if err != nil {
 				log.Error().Msg(fmt.Sprintf("failed to write msg to S3: %v", err))
-				return
+				return errors.Wrap(err, "failed to write msg to S3")
 			}
 			zw.Write([]byte("\n"))
 		}
@@ -436,10 +436,11 @@
 		returnErr := <-errCh
 		if returnErr != nil {
 			log.Error().Msg(fmt.Sprintf("failed to store msg to S3: %v", err))
-			return
+			return errors.Wrap(err, "failed to store msg to S3")
 		}
 		log.Info().Msg(fmt.Sprintf("[%v] submit msgs in checkpoint %v to S3 success", msgs[0].DateKey, msgs[0].PartitionKey()))
+		return nil
 	})
 
-	return nil
+	return eg.Wait()
 }
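
With the pool gone, storeS3 registers one errgroup task per dataset (blocks, transactions, messages); each streams gzip-compressed JSON lines through FileStreamWriter into S3 and returns a wrapped error instead of only logging it, and the function now ends with return eg.Wait(), so upload failures propagate to the caller rather than being swallowed after the pool drained. Below is a minimal sketch of that gzip NDJSON streaming step against a plain io.Writer; the record type and destination are hypothetical, and the S3 pipe writer and error channel from the real code are elided.

// Sketch only: write records as gzip-compressed, newline-delimited JSON,
// the same shape storeS3 produces for each "<name>.json.gz" object.
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"

	"github.com/pkg/errors"
)

type record struct { // hypothetical record type
	DateKey string `json:"date_key"`
	SeqNo   uint64 `json:"seq_no"`
}

func writeGzipNDJSON(w io.Writer, recs []record) error {
	zw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return errors.Wrap(err, "failed to create gzip writer")
	}
	for _, r := range recs {
		data, err := json.Marshal(r)
		if err != nil {
			return errors.Wrap(err, "failed to marshal record")
		}
		if _, err := zw.Write(append(data, '\n')); err != nil {
			return errors.Wrap(err, "failed to write record")
		}
	}
	// Closing flushes the remaining compressed data; without it the
	// output would be truncated.
	return errors.Wrap(zw.Close(), "failed to close gzip writer")
}

func main() {
	var buf bytes.Buffer
	recs := []record{{DateKey: "20240626", SeqNo: 1}, {DateKey: "20240626", SeqNo: 2}}
	if err := writeGzipNDJSON(&buf, recs); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("compressed bytes:", buf.Len())
}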
