Skip to content

Commit

Permalink
Revert back
Browse files Browse the repository at this point in the history
  • Loading branch information
bgokden committed Aug 2, 2021
2 parents c7fb9a6 + 3a98d63 commit 684ea7f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 25 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/kr/pretty v0.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/processout/grpc-go-pool v1.2.1
github.com/spf13/cobra v1.2.1
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/processout/grpc-go-pool v1.2.1 h1:hbp1BOA02CIxEAoRLHGpUhhPFv77nwfBLBeO3Ya9P7I=
github.com/processout/grpc-go-pool v1.2.1/go.mod h1:F4hiNj96O6VQ87jv4rdz8R9tkHdelQQJ/J2B1a5VSt4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
25 changes: 10 additions & 15 deletions node/datasourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"log"
"sync"
"time"

data "github.com/bgokden/veri/data"
"github.com/bgokden/veri/util"
Expand All @@ -27,20 +28,10 @@ type DataSourceClient struct {
ConnectionCache *util.ConnectionCache
}

// func (dcs *DataSourceClient) GetVeriServiceClient() (pb.VeriServiceClient, *grpc.ClientConn, error) {
// // This can be a client pool
// address := dcs.Ids[0]
// conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(200)*time.Millisecond))
// if err != nil {
// log.Printf("fail to dial: %v\n", err)
// return nil, nil, err
// }
// client := pb.NewVeriServiceClient(conn)
// return client, conn, nil
// }

func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream chan<- *pb.ScoredDatum, queryWaitGroup *sync.WaitGroup, config *pb.SearchConfig) error {
defer queryWaitGroup.Done()
clientCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dcs.ConnectionCache.Get(dcs.IdOfPeer)
if conn == nil {
return errors.New("Connection failure")
Expand All @@ -51,7 +42,7 @@ func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream cha
Datum: []*pb.Datum{datum},
Config: config,
}
stream, err := client.SearchStream(context.Background(), searchRequest)
stream, err := client.SearchStream(clientCtx, searchRequest)
if err != nil {
return err
}
Expand All @@ -68,6 +59,8 @@ func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream cha
}

func (dcs *DataSourceClient) Insert(datum *pb.Datum, config *pb.InsertConfig) error {
clientCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dcs.ConnectionCache.Get(dcs.IdOfPeer)
if conn == nil {
return errors.New("Connection failure")
Expand All @@ -79,11 +72,13 @@ func (dcs *DataSourceClient) Insert(datum *pb.Datum, config *pb.InsertConfig) er
Datum: datum,
DataName: dcs.Name,
}
_, err := client.Insert(context.Background(), request)
_, err := client.Insert(clientCtx, request)
return err
}

func (dcs *DataSourceClient) GetDataInfo() *pb.DataInfo {
clientCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dcs.ConnectionCache.Get(dcs.IdOfPeer)
if conn == nil {
log.Printf("Connection failure\n")
Expand All @@ -94,7 +89,7 @@ func (dcs *DataSourceClient) GetDataInfo() *pb.DataInfo {
request := &pb.GetDataRequest{
Name: dcs.Name,
}
dataInfo, err := client.GetDataInfo(context.Background(), request)
dataInfo, err := client.GetDataInfo(clientCtx, request)
if err != nil {
log.Printf("GetDataInfo Error: %v\n", err.Error())
return nil
Expand Down
20 changes: 10 additions & 10 deletions node/nodeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (n *Node) SendJoinRequest(id string) error {
if conn == nil {
return errors.New("Connection failure")
}
defer n.ConnectionCache.Close(conn)
defer n.ConnectionCache.Put(conn)
// this connection should be closed time to time
// It is observed that it can cause a split brain due to two nodes
// sync to each other and never break connection
Expand Down Expand Up @@ -220,15 +220,15 @@ func (n *Node) CreateDataIfNotExists(ctx context.Context, in *pb.DataConfig) (*p
return aData.GetDataInfo(), nil
}

func (n *Node) getClient(address string) (pb.VeriServiceClient, *grpc.ClientConn, error) {
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(200)*time.Millisecond))
if err != nil {
// log.Printf("fail to dial: %v\n", err)
return nil, nil, err
}
client := pb.NewVeriServiceClient(conn)
return client, conn, nil
}
// func (n *Node) getClient(address string) (pb.VeriServiceClient, *grpc.ClientConn, error) {
// conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(200)*time.Millisecond))
// if err != nil {
// // log.Printf("fail to dial: %v\n", err)
// return nil, nil, err
// }
// client := pb.NewVeriServiceClient(conn)
// return client, conn, nil
// }

func (n *Node) AddPeer(ctx context.Context, in *pb.AddPeerRequest) (*pb.AddPeerResponse, error) {
return &pb.AddPeerResponse{}, n.AddPeerElement(in.GetPeer())
Expand Down

0 comments on commit 684ea7f

Please sign in to comment.