diff --git a/go.mod b/go.mod index 9c1034d..57f4ec5 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( require ( github.com/IBM/sarama v1.42.1 - github.com/alitto/pond v1.9.0 github.com/aws/aws-sdk-go v1.54.3 github.com/creasty/defaults v1.7.0 github.com/getnimbus/ultrago v1.0.0 diff --git a/go.sum b/go.sum index 63f8ba3..a3ccf03 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,6 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/agiledragon/gomonkey/v2 v2.3.1/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= -github.com/alitto/pond v1.9.0 h1:B8BrvXyKe97NK9LHuRsQAOmpRnsp6GJ7mCg1Cgitczo= -github.com/alitto/pond v1.9.0/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= github.com/aws/aws-sdk-go v1.37.32/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.54.3 h1:Bk+EXoq6v5I1xmHR9GQGpsMWZZFXs+FD+5uPyEmfgX0= github.com/aws/aws-sdk-go v1.54.3/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= diff --git a/internal/app/indexer/save.go b/internal/app/indexer/save.go index ec2a150..1dd0bb5 100644 --- a/internal/app/indexer/save.go +++ b/internal/app/indexer/save.go @@ -9,7 +9,6 @@ import ( "sort" "time" - "github.com/alitto/pond" "github.com/pkg/errors" "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" @@ -54,6 +53,12 @@ func (s *Service) insertData( eg, childCtx := errgroup.WithContext(ctx) + if conf.Config.IsBackfill() { + eg.Go(func() error { + return s.storeS3(childCtx, b, tx, msg) + }) + } + eg.Go(func() error { //if err := func() error { // defer app.TimeTrack(time.Now(), "AddAccountStates(%d)", len(acc)) @@ -87,12 +92,6 @@ func (s *Service) insertData( return nil }) - if conf.Config.IsBackfill() { - eg.Go(func() error { - return s.storeS3(childCtx, b, tx, msg) - }) - } - if err := eg.Wait(); err != nil { return errors.Wrap(err, "cannot insert data") } @@ -309,23 +308,22 @@ func (s *Service) storeS3( ) error { log.Info().Msg("start goroutine store s3...") - pool := pond.New(10, 0, pond.MinWorkers(3)) - defer pool.StopAndWait() + eg, childCtx := errgroup.WithContext(ctx) // store blocks to S3 - pool.Submit(func() { + eg.Go(func() error { if len(blocks) == 0 { - return + return nil } errCh := make(chan error, 1) defer close(errCh) - pw := s.S3.FileStreamWriter(ctx, conf.Config.AwsBucket, fmt.Sprintf("blocks/ton-blocks/datekey=%v/%v.json.gz", blocks[0].DateKey, blocks[0].PartitionKey()), errCh) + pw := s.S3.FileStreamWriter(childCtx, conf.Config.AwsBucket, fmt.Sprintf("blocks/ton-blocks/datekey=%v/%v.json.gz", blocks[0].DateKey, blocks[0].PartitionKey()), errCh) zw, err := gzip.NewWriterLevel(pw, gzip.BestSpeed) if err != nil { log.Error().Msg(fmt.Sprintf("failed to create gzip writer: %v", err)) - return + return errors.Wrap(err, "failed to create gzip writer") } // add data to gzip @@ -333,12 +331,12 @@ func (s *Service) storeS3( data, err := json.Marshal(block) if err != nil { log.Error().Msg(fmt.Sprintf("failed to marshal block: %v", err)) - return + return errors.Wrap(err, "failed to marshal block") } _, err = zw.Write(data) if err != nil { log.Error().Msg(fmt.Sprintf("failed to write block to S3: %v", err)) - return + return errors.Wrap(err, "failed to write block to S3") } zw.Write([]byte("\n")) } @@ -350,25 +348,26 @@ func (s *Service) storeS3( returnErr := <-errCh if returnErr != nil { log.Error().Msg(fmt.Sprintf("failed to store block to S3: %v", err)) - return + return errors.Wrap(err, "failed to store block to S3") } log.Info().Msg(fmt.Sprintf("[%v] submit blocks %v to S3 success", blocks[0].DateKey, blocks[0].PartitionKey())) + return nil }) // store transactions to S3 - pool.Submit(func() { + eg.Go(func() error { if len(txs) == 0 { - return + return nil } errCh := make(chan error, 1) defer close(errCh) - pw := s.S3.FileStreamWriter(ctx, conf.Config.AwsBucket, fmt.Sprintf("txs/ton-txs/datekey=%v/%v.json.gz", txs[0].DateKey, txs[0].BlockSeqNo), errCh) + pw := s.S3.FileStreamWriter(childCtx, conf.Config.AwsBucket, fmt.Sprintf("txs/ton-txs/datekey=%v/%v.json.gz", txs[0].DateKey, txs[0].BlockSeqNo), errCh) zw, err := gzip.NewWriterLevel(pw, gzip.BestSpeed) if err != nil { log.Error().Msg(fmt.Sprintf("failed to create gzip writer: %v", err)) - return + return errors.Wrap(err, "failed to create gzip writer") } // add data to gzip @@ -376,12 +375,12 @@ func (s *Service) storeS3( data, err := json.Marshal(tx) if err != nil { log.Error().Msg(fmt.Sprintf("failed to marshal tx: %v", err)) - return + return errors.Wrap(err, "failed to marshal tx") } _, err = zw.Write(data) if err != nil { log.Error().Msg(fmt.Sprintf("failed to write tx to S3: %v", err)) - return + return errors.Wrap(err, "failed to write tx to S3") } zw.Write([]byte("\n")) } @@ -393,25 +392,26 @@ func (s *Service) storeS3( returnErr := <-errCh if returnErr != nil { log.Error().Msg(fmt.Sprintf("failed to store tx to S3: %v", err)) - return + return errors.Wrap(err, "failed to store tx to S3") } log.Info().Msg(fmt.Sprintf("[%v] submit txs in checkpoint %v to S3 success", txs[0].DateKey, txs[0].BlockSeqNo)) + return nil }) // store messages to S3 - pool.Submit(func() { + eg.Go(func() error { if len(msgs) == 0 { - return + return nil } errCh := make(chan error, 1) defer close(errCh) - pw := s.S3.FileStreamWriter(ctx, conf.Config.AwsBucket, fmt.Sprintf("messages/ton-messages/datekey=%v/%v.json.gz", msgs[0].DateKey, msgs[0].PartitionKey()), errCh) + pw := s.S3.FileStreamWriter(childCtx, conf.Config.AwsBucket, fmt.Sprintf("messages/ton-messages/datekey=%v/%v.json.gz", msgs[0].DateKey, msgs[0].PartitionKey()), errCh) zw, err := gzip.NewWriterLevel(pw, gzip.BestSpeed) if err != nil { log.Error().Msg(fmt.Sprintf("failed to create gzip writer: %v", err)) - return + return errors.Wrap(err, "failed to create gzip writer") } // add data to gzip @@ -419,12 +419,12 @@ func (s *Service) storeS3( data, err := json.Marshal(msg) if err != nil { log.Error().Msg(fmt.Sprintf("failed to marshal msg: %v", err)) - return + return errors.Wrap(err, "failed to marshal msg") } _, err = zw.Write(data) if err != nil { log.Error().Msg(fmt.Sprintf("failed to write msg to S3: %v", err)) - return + return errors.Wrap(err, "failed to write msg to S3") } zw.Write([]byte("\n")) } @@ -436,10 +436,11 @@ func (s *Service) storeS3( returnErr := <-errCh if returnErr != nil { log.Error().Msg(fmt.Sprintf("failed to store msg to S3: %v", err)) - return + return errors.Wrap(err, "failed to store msg to S3") } log.Info().Msg(fmt.Sprintf("[%v] submit msgs in checkpoint %v to S3 success", msgs[0].DateKey, msgs[0].PartitionKey())) + return nil }) - return nil + return eg.Wait() }