diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 0ff2c51..8f354f7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -5,6 +5,7 @@ on: - master - staging - develop + - ci pull_request: env: SERVICE: cclog @@ -56,7 +57,7 @@ jobs: short_sha="$(git rev-parse --short HEAD)" branch_tag="$(echo "$CURRENT_BRANCH" | sed 's/[^a-zA-Z0-9]/-/g' | sed 's/--*/-/g' | sed 's/-$//g')" echo "::set-output name=image_tag::$branch_tag-$short_sha" - echo "::set-output name=branch_tag::$branch_tag-$short_sha" + echo "::set-output name=branch_tag::$branch_tag" lint: name: Lint runs-on: ubuntu-latest @@ -110,27 +111,11 @@ jobs: VERSION_TAG: ${{ needs.prepare.outputs.version_tag }} BRANCH_TAG: ${{ needs.prepare.outputs.current_branch }} run: | - DOCKER_REPO="kybernetwork/$SERVICE" - - COMBINE_TAG="$DOCKER_REPO:$BRANCH_TAG-$COMMIT_TAG" - echo "::set-output name=combine::$COMBINE_TAG" - - IMAGE_WITH_BRANCH_TAG="$DOCKER_REPO:$BRANCH_TAG" - echo "::set-output name=branch::$IMAGE_WITH_BRANCH_TAG" - - [[ -n "$VERSION_TAG" ]] || exit 0 - IMAGE_WITH_VERSION_TAG="$DOCKER_REPO:$VERSION_TAG" - echo "::set-output name=version::$IMAGE_WITH_VERSION_TAG" + echo "run docker build" - name: Set up Docker Buildx uses: docker/setup-buildx-action@master if: github.event_name != 'pull_request' - - name: Docker login - uses: docker/login-action@v1 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_TOKEN }} - if: github.event_name != 'pull_request' - name: Gcloud Auth uses: google-github-actions/auth@v0 diff --git a/lib/agent/logholder.go b/lib/agent/logholder.go index 4883155..0f7015f 100644 --- a/lib/agent/logholder.go +++ b/lib/agent/logholder.go @@ -1 +1,41 @@ package agent + +import ( + "bytes" + "sync" +) + +var ( + BufferSize = 1024 * 1024 + BufferPool = sync.Pool{New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, BufferSize)) + }} +) + +type LogHolder struct { + buffer *bytes.Buffer + lock sync.Mutex +} + +func NewLogHolder() *LogHolder { + return &LogHolder{ + buffer: BufferPool.Get().(*bytes.Buffer), + } +} + +func (b *LogHolder) Write(d []byte) { + b.lock.Lock() + defer b.lock.Unlock() + b.buffer.Write(d) +} + +func (b *LogHolder) GetAndClear() (*bytes.Buffer, bool) { + b.lock.Lock() + defer b.lock.Unlock() + if b.buffer.Len() == 0 { + return nil, false + } + resp := b.buffer + b.buffer = BufferPool.Get().(*bytes.Buffer) + return resp, true +} diff --git a/lib/app/logging.go b/lib/app/logging.go index 52170ab..971d614 100644 --- a/lib/app/logging.go +++ b/lib/app/logging.go @@ -1,6 +1,8 @@ package app import ( + "os" + "github.com/TheZeroSlave/zapsentry" "github.com/getsentry/sentry-go" "github.com/pkg/errors" @@ -51,8 +53,13 @@ func NewFlusher(s syncer) func() { // NewLogger creates a new logger instance. // The type of logger instance will be different with different application running modes. -func NewLogger(c *cli.Context) (*zap.Logger, error) { - return zap.NewProduction() +func newLogger() (*zap.Logger, zap.AtomicLevel) { + atom := zap.NewAtomicLevelAt(zap.DebugLevel) + pConf := zap.NewProductionEncoderConfig() + pConf.EncodeTime = zapcore.ISO8601TimeEncoder + encoder := zapcore.NewConsoleEncoder(pConf) + l := zap.New(zapcore.NewCore(encoder, zapcore.AddSync(os.Stdout), atom), zap.AddCaller()) + return l, atom } // NewSugaredLogger creates a new sugared logger and a flush function. The flush function should be @@ -60,10 +67,7 @@ func NewLogger(c *cli.Context) (*zap.Logger, error) { // This function should be use most of the time unless // the application requires extensive performance, in this case use NewLogger. func NewSugaredLogger(c *cli.Context) (*zap.SugaredLogger, func(), error) { - logger, err := NewLogger(c) - if err != nil { - return nil, nil, err - } + logger, _ := newLogger() // init sentry if flag dsn exists if len(c.String(sentryDSNFlag)) != 0 { sentryClient, err := sentry.NewClient( diff --git a/lib/client/async_client.go b/lib/client/async_client.go index 9a7c9ab..1efb034 100644 --- a/lib/client/async_client.go +++ b/lib/client/async_client.go @@ -2,50 +2,51 @@ package client import ( "fmt" + "io" + "log" "net" "time" + "github.com/KyberNetwork/cclog/lib/agent" + "github.com/pierrec/lz4/v3" + "github.com/KyberNetwork/cclog/lib/common" ) type SendFailedFn = func(error) type AsyncLogClient struct { - remoteAddr string - streamClient net.Conn - closeChan chan struct{} - buffer chan []byte - failedFn SendFailedFn - name string + remoteAddr string + closeChan chan struct{} + logHolder *agent.LogHolder + failedFn SendFailedFn + name string + compression bool } const ( - defaultBufferSize = 100 - backOffSeconds = 1.0 + backOffSeconds = 1.0 ) func NewAsyncLogClient(name string, remoteAddr string, fn SendFailedFn) *AsyncLogClient { - return NewAsyncLogClientWithBuffer(name, remoteAddr, fn, defaultBufferSize) + return NewAsyncLogClientWithBuffer(name, remoteAddr, fn, true) } -func NewAsyncLogClientWithBuffer(name string, remoteAddr string, fn SendFailedFn, bufferSize int) *AsyncLogClient { + +func NewAsyncLogClientWithBuffer(name string, remoteAddr string, fn SendFailedFn, compression bool) *AsyncLogClient { c := &AsyncLogClient{ - name: name, - remoteAddr: remoteAddr, - buffer: make(chan []byte, bufferSize), - closeChan: make(chan struct{}), - failedFn: fn, + name: name, + remoteAddr: remoteAddr, + logHolder: agent.NewLogHolder(), + closeChan: make(chan struct{}), + failedFn: fn, + compression: compression, } go c.loop() return c } func (l *AsyncLogClient) Write(p []byte) (n int, err error) { - select { - case l.buffer <- p: - break - default: - l.failedFn(fmt.Errorf("failed to append log, message size: %d", len(p))) - } + l.logHolder.Write(p) return len(p), nil } @@ -54,51 +55,78 @@ func (l *AsyncLogClient) Close() error { return nil } +func (l *AsyncLogClient) sendBuffer(w io.Writer, data []byte) error { + _, err := w.Write(data) + if err != nil { + return err + } + if lw, ok := w.(*lz4.Writer); ok { + return lw.Flush() + } + return nil +} + func (l *AsyncLogClient) loop() { lastConnect := time.Now().Add(-2 * time.Second) + var ( + streamClient net.Conn + writer io.Writer + ) write := func(data []byte) { var err error - if l.streamClient == nil { + if streamClient == nil { secs := time.Since(lastConnect).Seconds() if secs < backOffSeconds { // skip due recent reconnect failed, we drop data as we can't hold return } lastConnect = time.Now() - l.streamClient, err = net.Dial("tcp", l.remoteAddr) + streamClient, err = net.Dial("tcp", l.remoteAddr) if err != nil { l.failedFn(fmt.Errorf("failed to connect, %w", err)) return } - err = common.WriteConnectRequest(l.streamClient, common.ConnectRequest{Name: l.name}) + err = common.WriteConnectRequest(streamClient, common.ConnectRequest{Name: l.name, Compression: l.compression}) if err != nil { l.failedFn(fmt.Errorf("write connect request failed, %w", err)) return } - resp, err := common.ReadConnectResponse(l.streamClient) + resp, err := common.ReadConnectResponse(streamClient) if err != nil { l.failedFn(fmt.Errorf("read failed, %w", err)) return } if !resp.Success { l.failedFn(fmt.Errorf("server return error, %s", resp.Status)) - _ = l.streamClient.Close() - l.streamClient = nil + _ = streamClient.Close() + streamClient = nil return } + if l.compression { + writer = lz4.NewWriter(streamClient) + } else { + writer = streamClient + } } - _, err = l.streamClient.Write(data) + log.Println("send data", "size", len(data)) + err = l.sendBuffer(writer, data) if err != nil { l.failedFn(fmt.Errorf("write failed, %w", err)) - _ = l.streamClient.Close() - l.streamClient = nil + _ = streamClient.Close() + streamClient = nil + writer = nil } } - + tick := time.NewTicker(time.Millisecond * 500) + defer tick.Stop() for { select { - case b := <-l.buffer: - write(b) + case <-tick.C: + if buffer, ok := l.logHolder.GetAndClear(); ok { + write(buffer.Bytes()) + buffer.Reset() + agent.BufferPool.Put(buffer) + } case <-l.closeChan: break }