Skip to content

Commit

Permalink
Merge pull request #3 from KyberNetwork/develop
Browse files Browse the repository at this point in the history
tomaster
  • Loading branch information
secmask authored Sep 15, 2020
2 parents 29dd61a + 705dec2 commit 877e32d
Show file tree
Hide file tree
Showing 15 changed files with 583 additions and 33 deletions.
33 changes: 33 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
language: go

go:
- "1.14.x"

services:
- docker

env:
global:
- GO111MODULE=on
- GOLANGCI_LINT_VERSION=1.23.0
- GO111MODULE=on

install:
# - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b ${TRAVIS_HOME}/bin v${GOLANGCI_LINT_VERSION}
- echo "skip install"

script:
# - golangci-lint run --config .golangci.yml
- go test -v ./...

after_success:
- cd cmd/cclog-server && go build -ldflags '-linkmode external -w -extldflags "-static"' && cd ../..
- docker --version
- docker build -f Dockerfile --label "commit=$TRAVIS_COMMIT" -t kybernetwork/cclog:$TRAVIS_COMMIT .

deploy:
- provider: script
script: bash .travis/docker_push.sh
on:
all_branches: true
condition: $TRAVIS_BRANCH =~ ^develop|staging|master$
19 changes: 19 additions & 0 deletions .travis/docker_push.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

set -euo pipefail

readonly DOCKER_PASSWORD=${DOCKER_PASSWORD:-}

if [[ -z "$DOCKER_PASSWORD" ]]; then
echo 'DOCKER_PASSWORD is not available, aborting.'
exit 1
fi

echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin

docker tag kybernetwork/cclog:"$TRAVIS_COMMIT" kybernetwork/cclog:"$TRAVIS_BRANCH"
if [[ -n "$TRAVIS_TAG" ]]; then
docker tag kybernetwork/cclog:"$TRAVIS_COMMIT" kybernetwork/cclog:"$TRAVIS_TAG"
fi

docker push kybernetwork/cclog
57 changes: 57 additions & 0 deletions cmd/ccli/ccli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"fmt"
"io"
"os"

"github.com/urfave/cli"

"github.com/KyberNetwork/cclog/lib/client"
)

const (
flagRemoteAddr = "remote-addr"
flagName = "name"
)

func main() {
app := cli.NewApp()
app.Name = "cli to send log to server"
app.Usage = "cli to send log to server"
app.Action = run

app.Flags = append(app.Flags,
cli.StringFlag{
Name: flagRemoteAddr,
Usage: "remote address",
Value: "127.0.0.1:4560",
EnvVar: "REMOTE_ADDR",
},
cli.StringFlag{
Name: flagName,
Usage: "name of log file",
Value: "test",
EnvVar: "LOG_NAME",
},
)
if err := app.Run(os.Args); err != nil {
fmt.Println("run error", err)
}
}

func run(c *cli.Context) error {
stat, _ := os.Stdin.Stat()
if (stat.Mode() & os.ModeCharDevice) != 0 {
fmt.Println("nothing to send")
return nil
}
w2 := client.NewSyncLogClient(c.String(flagName), c.String(flagRemoteAddr))
n, err := io.Copy(w2, os.Stdin)
if err != nil {
fmt.Println("write failed", err)
return nil
}
fmt.Println("done with", n, "bytes")
return nil
}
12 changes: 6 additions & 6 deletions cmd/cclog-server/cclog.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/urfave/cli"
"go.uber.org/zap"

"cclog/lib/app"
"cclog/lib/cclog"
"github.com/KyberNetwork/cclog/lib/app"
"github.com/KyberNetwork/cclog/lib/server"
)

const (
Expand All @@ -20,8 +20,8 @@ var sugar = zap.NewExample().Sugar()

func main() {
app := app.NewApp()
app.Name = "Binance User Data Stream"
app.Usage = "App to handle binance user data stream"
app.Name = "Log Server"
app.Usage = "Log Server to receive log from services"
app.Action = run

app.Flags = append(app.Flags,
Expand Down Expand Up @@ -65,8 +65,8 @@ func run(c *cli.Context) error {
if maxSize <= 0 {
sugar.Fatalw("max size should > 0")
}
wm := cclog.NewWriterMan(c.String(flagBaseDir), maxSize*1024*1024)
server := cclog.NewServer(c.String(flagBindAddr), wm)
wm := server.NewWriterMan(c.String(flagBaseDir), maxSize*1024*1024)
server := server.NewServer(c.String(flagBindAddr), wm)
sugar.Infow("server now start", "bind_addr", c.String(flagBindAddr))
return server.Start()
}
4 changes: 2 additions & 2 deletions cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"cclog/lib/cclog/client"
"github.com/KyberNetwork/cclog/lib/client"
)

func main() {
w2 := client.NewLogClient("test", "localhost:4560", func(err error) {
w2 := client.NewAsyncLogClient("test", "localhost:4560", func(err error) {
fmt.Println("err", err)
})
w := io.MultiWriter(os.Stdout, w2)
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
module cclog
module github.com/KyberNetwork/cclog

go 1.15

require (
github.com/TheZeroSlave/zapsentry v1.5.0
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef // indirect
github.com/ethereum/go-ethereum v1.9.21
github.com/getsentry/sentry-go v0.7.0
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/robfig/cron/v3 v3.0.0
github.com/stretchr/testify v1.6.1
Expand Down
357 changes: 357 additions & 0 deletions go.sum

Large diffs are not rendered by default.

21 changes: 12 additions & 9 deletions lib/cclog/client/client.go → lib/client/async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"net"
"time"

"cclog/lib/common"
"github.com/KyberNetwork/cclog/lib/common"
)

type SendFailedFn = func(error)

type LogClient struct {
type AsyncLogClient struct {
remoteAddr string
streamClient net.Conn
closeChan chan struct{}
Expand All @@ -20,11 +20,15 @@ type LogClient struct {
}

const (
bufferSize = 100
defaultBufferSize = 100
backOffSeconds = 1.0
)

func NewLogClient(name string, remoteAddr string, fn SendFailedFn) *LogClient {
c := &LogClient{
func NewAsyncLogClient(name string, remoteAddr string, fn SendFailedFn) *AsyncLogClient {
return NewAsyncLogClientWithBuffer(name, remoteAddr, fn, defaultBufferSize)
}
func NewAsyncLogClientWithBuffer(name string, remoteAddr string, fn SendFailedFn, bufferSize int) *AsyncLogClient {
c := &AsyncLogClient{
name: name,
remoteAddr: remoteAddr,
buffer: make(chan []byte, bufferSize),
Expand All @@ -35,7 +39,7 @@ func NewLogClient(name string, remoteAddr string, fn SendFailedFn) *LogClient {
return c
}

func (l *LogClient) Write(p []byte) (n int, err error) {
func (l *AsyncLogClient) Write(p []byte) (n int, err error) {
select {
case l.buffer <- p:
break
Expand All @@ -45,14 +49,13 @@ func (l *LogClient) Write(p []byte) (n int, err error) {
return len(p), nil
}

func (l *LogClient) Close() error {
func (l *AsyncLogClient) Close() error {
close(l.closeChan)
return nil
}

func (l *LogClient) loop() {
func (l *AsyncLogClient) loop() {
lastConnect := time.Now().Add(-2 * time.Second)
backOffSeconds := 1.0
write := func(data []byte) {
var err error
if l.streamClient == nil {
Expand Down
76 changes: 76 additions & 0 deletions lib/client/sync_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package client

import (
"fmt"
"net"
"sync"
"time"

"github.com/KyberNetwork/cclog/lib/common"
)

type SyncLogClient struct {
remoteAddr string
streamClient net.Conn
name string
lock sync.Mutex
lastConnect time.Time
}

func NewSyncLogClient(name string, remoteAddr string) *SyncLogClient {
return NewSyncLogClientWithBuffer(name, remoteAddr)
}
func NewSyncLogClientWithBuffer(name string, remoteAddr string) *SyncLogClient {
c := &SyncLogClient{
name: name,
remoteAddr: remoteAddr,
lastConnect: time.Now().Add(-time.Minute),
}
return c
}

func (l *SyncLogClient) Write(p []byte) (n int, err error) {
l.lock.Lock()
defer l.lock.Unlock()
if l.streamClient == nil {
secs := time.Since(l.lastConnect).Seconds()
if secs < backOffSeconds {
// skip due recent reconnect failed, we drop data as we can't hold
return
}
l.lastConnect = time.Now()
l.streamClient, err = net.Dial("tcp", l.remoteAddr)
if err != nil {
return 0, err
}
err = common.WriteConnectRequest(l.streamClient, common.ConnectRequest{Name: l.name})
if err != nil {
return 0, err
}
var resp common.ConnectResponse
resp, err = common.ReadConnectResponse(l.streamClient)
if err != nil {
return 0, err
}
if !resp.Success {
fmt.Println("server error", resp.Status)
_ = l.streamClient.Close()
l.streamClient = nil
return
}
}
n, err = l.streamClient.Write(p)
if err != nil {
fmt.Printf("write failed, %+v", err)
_ = l.streamClient.Close()
l.streamClient = nil
}
return
}

func (l *SyncLogClient) Close() error {
if l.streamClient != nil {
return l.streamClient.Close()
}
return nil
}
18 changes: 11 additions & 7 deletions lib/cclog/client_handler.go → lib/server/client_handler.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package cclog
package server

import (
"net"
"regexp"

"go.uber.org/zap"

"cclog/lib/common"
"github.com/KyberNetwork/cclog/lib/common"
)

const (
readBufferSize = 1 << 14
)

var (
nameGrep = regexp.MustCompile(`^[0-9a-zA-Z-_]+$`)
)

type ClientHandler struct {
Expand All @@ -23,10 +31,6 @@ func NewClientHandler(c net.Conn, wm *WriterMan) *ClientHandler {
}
}

var (
nameGrep = regexp.MustCompile(`^[0-9a-zA-Z-_]+$`)
)

func (c *ClientHandler) Stop() {
_ = c.conn.Close()
}
Expand Down Expand Up @@ -59,7 +63,7 @@ func (c *ClientHandler) Run() {
remote := c.conn.RemoteAddr()
l := c.l.With("from", remote.String(), "name", req.Name)
wLog := c.wMan.GetOrCreate(req.Name)
buff := make([]byte, 4096)
buff := make([]byte, readBufferSize)
for {
n, err := c.conn.Read(buff)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/cclog/client_test.go → lib/server/client_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cclog
package server

import (
"testing"
Expand Down
7 changes: 5 additions & 2 deletions lib/cclog/server.go → lib/server/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cclog
package server

import (
"net"
Expand Down Expand Up @@ -40,5 +40,8 @@ func (s *Server) Start() error {
}

func (s *Server) Shutdown() error {
return s.listener.Close()
if s.listener != nil {
return s.listener.Close()
}
return nil
}
2 changes: 1 addition & 1 deletion lib/cclog/writer.go → lib/server/writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cclog
package server

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion lib/cclog/writer_test.go → lib/server/writer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cclog
package server

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion lib/cclog/writterman.go → lib/server/writterman.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cclog
package server

import (
"path/filepath"
Expand Down

0 comments on commit 877e32d

Please sign in to comment.