Skip to content

Commit

Permalink
update async log (#7)
Browse files Browse the repository at this point in the history
*  switch use of channel to logHolder
*  support lz4
  • Loading branch information
secmask authored Oct 18, 2023
1 parent eed3e87 commit 2c6b57c
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 57 deletions.
21 changes: 3 additions & 18 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
- master
- staging
- develop
- ci
pull_request:
env:
SERVICE: cclog
Expand Down Expand Up @@ -56,7 +57,7 @@ jobs:
short_sha="$(git rev-parse --short HEAD)"
branch_tag="$(echo "$CURRENT_BRANCH" | sed 's/[^a-zA-Z0-9]/-/g' | sed 's/--*/-/g' | sed 's/-$//g')"
echo "::set-output name=image_tag::$branch_tag-$short_sha"
echo "::set-output name=branch_tag::$branch_tag-$short_sha"
echo "::set-output name=branch_tag::$branch_tag"
lint:
name: Lint
runs-on: ubuntu-latest
Expand Down Expand Up @@ -110,27 +111,11 @@ jobs:
VERSION_TAG: ${{ needs.prepare.outputs.version_tag }}
BRANCH_TAG: ${{ needs.prepare.outputs.current_branch }}
run: |
DOCKER_REPO="kybernetwork/$SERVICE"
COMBINE_TAG="$DOCKER_REPO:$BRANCH_TAG-$COMMIT_TAG"
echo "::set-output name=combine::$COMBINE_TAG"
IMAGE_WITH_BRANCH_TAG="$DOCKER_REPO:$BRANCH_TAG"
echo "::set-output name=branch::$IMAGE_WITH_BRANCH_TAG"
[[ -n "$VERSION_TAG" ]] || exit 0
IMAGE_WITH_VERSION_TAG="$DOCKER_REPO:$VERSION_TAG"
echo "::set-output name=version::$IMAGE_WITH_VERSION_TAG"
echo "run docker build"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@master
if: github.event_name != 'pull_request'
- name: Docker login
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_TOKEN }}
if: github.event_name != 'pull_request'

- name: Gcloud Auth
uses: google-github-actions/auth@v0
Expand Down
40 changes: 40 additions & 0 deletions lib/agent/logholder.go
Original file line number Diff line number Diff line change
@@ -1 +1,41 @@
package agent

import (
"bytes"
"sync"
)

var (
BufferSize = 1024 * 1024
BufferPool = sync.Pool{New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, BufferSize))
}}
)

type LogHolder struct {
buffer *bytes.Buffer
lock sync.Mutex
}

func NewLogHolder() *LogHolder {
return &LogHolder{
buffer: BufferPool.Get().(*bytes.Buffer),
}
}

func (b *LogHolder) Write(d []byte) {
b.lock.Lock()
defer b.lock.Unlock()
b.buffer.Write(d)
}

func (b *LogHolder) GetAndClear() (*bytes.Buffer, bool) {
b.lock.Lock()
defer b.lock.Unlock()
if b.buffer.Len() == 0 {
return nil, false
}
resp := b.buffer
b.buffer = BufferPool.Get().(*bytes.Buffer)
return resp, true
}
16 changes: 10 additions & 6 deletions lib/app/logging.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package app

import (
"os"

"github.com/TheZeroSlave/zapsentry"
"github.com/getsentry/sentry-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -51,19 +53,21 @@ func NewFlusher(s syncer) func() {

// NewLogger creates a new logger instance.
// The type of logger instance will be different with different application running modes.
func NewLogger(c *cli.Context) (*zap.Logger, error) {
return zap.NewProduction()
func newLogger() (*zap.Logger, zap.AtomicLevel) {
atom := zap.NewAtomicLevelAt(zap.DebugLevel)
pConf := zap.NewProductionEncoderConfig()
pConf.EncodeTime = zapcore.ISO8601TimeEncoder
encoder := zapcore.NewConsoleEncoder(pConf)
l := zap.New(zapcore.NewCore(encoder, zapcore.AddSync(os.Stdout), atom), zap.AddCaller())
return l, atom
}

// NewSugaredLogger creates a new sugared logger and a flush function. The flush function should be
// called by consumer before quitting application.
// This function should be use most of the time unless
// the application requires extensive performance, in this case use NewLogger.
func NewSugaredLogger(c *cli.Context) (*zap.SugaredLogger, func(), error) {
logger, err := NewLogger(c)
if err != nil {
return nil, nil, err
}
logger, _ := newLogger()
// init sentry if flag dsn exists
if len(c.String(sentryDSNFlag)) != 0 {
sentryClient, err := sentry.NewClient(
Expand Down
94 changes: 61 additions & 33 deletions lib/client/async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,51 @@ package client

import (
"fmt"
"io"
"log"
"net"
"time"

"github.com/KyberNetwork/cclog/lib/agent"
"github.com/pierrec/lz4/v3"

"github.com/KyberNetwork/cclog/lib/common"
)

type SendFailedFn = func(error)

type AsyncLogClient struct {
remoteAddr string
streamClient net.Conn
closeChan chan struct{}
buffer chan []byte
failedFn SendFailedFn
name string
remoteAddr string
closeChan chan struct{}
logHolder *agent.LogHolder
failedFn SendFailedFn
name string
compression bool
}

const (
defaultBufferSize = 100
backOffSeconds = 1.0
backOffSeconds = 1.0
)

func NewAsyncLogClient(name string, remoteAddr string, fn SendFailedFn) *AsyncLogClient {
return NewAsyncLogClientWithBuffer(name, remoteAddr, fn, defaultBufferSize)
return NewAsyncLogClientWithBuffer(name, remoteAddr, fn, true)
}
func NewAsyncLogClientWithBuffer(name string, remoteAddr string, fn SendFailedFn, bufferSize int) *AsyncLogClient {

func NewAsyncLogClientWithBuffer(name string, remoteAddr string, fn SendFailedFn, compression bool) *AsyncLogClient {
c := &AsyncLogClient{
name: name,
remoteAddr: remoteAddr,
buffer: make(chan []byte, bufferSize),
closeChan: make(chan struct{}),
failedFn: fn,
name: name,
remoteAddr: remoteAddr,
logHolder: agent.NewLogHolder(),
closeChan: make(chan struct{}),
failedFn: fn,
compression: compression,
}
go c.loop()
return c
}

func (l *AsyncLogClient) Write(p []byte) (n int, err error) {
select {
case l.buffer <- p:
break
default:
l.failedFn(fmt.Errorf("failed to append log, message size: %d", len(p)))
}
l.logHolder.Write(p)
return len(p), nil
}

Expand All @@ -54,51 +55,78 @@ func (l *AsyncLogClient) Close() error {
return nil
}

func (l *AsyncLogClient) sendBuffer(w io.Writer, data []byte) error {
_, err := w.Write(data)
if err != nil {
return err
}
if lw, ok := w.(*lz4.Writer); ok {
return lw.Flush()
}
return nil
}

func (l *AsyncLogClient) loop() {
lastConnect := time.Now().Add(-2 * time.Second)
var (
streamClient net.Conn
writer io.Writer
)
write := func(data []byte) {
var err error
if l.streamClient == nil {
if streamClient == nil {
secs := time.Since(lastConnect).Seconds()
if secs < backOffSeconds {
// skip due recent reconnect failed, we drop data as we can't hold
return
}
lastConnect = time.Now()
l.streamClient, err = net.Dial("tcp", l.remoteAddr)
streamClient, err = net.Dial("tcp", l.remoteAddr)
if err != nil {
l.failedFn(fmt.Errorf("failed to connect, %w", err))
return
}
err = common.WriteConnectRequest(l.streamClient, common.ConnectRequest{Name: l.name})
err = common.WriteConnectRequest(streamClient, common.ConnectRequest{Name: l.name, Compression: l.compression})
if err != nil {
l.failedFn(fmt.Errorf("write connect request failed, %w", err))
return
}
resp, err := common.ReadConnectResponse(l.streamClient)
resp, err := common.ReadConnectResponse(streamClient)
if err != nil {
l.failedFn(fmt.Errorf("read failed, %w", err))
return
}
if !resp.Success {
l.failedFn(fmt.Errorf("server return error, %s", resp.Status))
_ = l.streamClient.Close()
l.streamClient = nil
_ = streamClient.Close()
streamClient = nil
return
}
if l.compression {
writer = lz4.NewWriter(streamClient)
} else {
writer = streamClient
}
}
_, err = l.streamClient.Write(data)
log.Println("send data", "size", len(data))
err = l.sendBuffer(writer, data)
if err != nil {
l.failedFn(fmt.Errorf("write failed, %w", err))
_ = l.streamClient.Close()
l.streamClient = nil
_ = streamClient.Close()
streamClient = nil
writer = nil
}
}

tick := time.NewTicker(time.Millisecond * 500)
defer tick.Stop()
for {
select {
case b := <-l.buffer:
write(b)
case <-tick.C:
if buffer, ok := l.logHolder.GetAndClear(); ok {
write(buffer.Bytes())
buffer.Reset()
agent.BufferPool.Put(buffer)
}
case <-l.closeChan:
break
}
Expand Down

0 comments on commit 2c6b57c

Please sign in to comment.