diff --git a/go.mod b/go.mod index bef4f5f..d49b33e 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/kr/pretty v0.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 + github.com/processout/grpc-go-pool v1.2.1 github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.8.1 github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index 2b0786e..5baaa46 100644 --- a/go.sum +++ b/go.sum @@ -220,6 +220,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/processout/grpc-go-pool v1.2.1 h1:hbp1BOA02CIxEAoRLHGpUhhPFv77nwfBLBeO3Ya9P7I= +github.com/processout/grpc-go-pool v1.2.1/go.mod h1:F4hiNj96O6VQ87jv4rdz8R9tkHdelQQJ/J2B1a5VSt4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/node/datasourceclient.go b/node/datasourceclient.go index 1372a1d..a3cfc9e 100644 --- a/node/datasourceclient.go +++ b/node/datasourceclient.go @@ -5,6 +5,7 @@ import ( "errors" "log" "sync" + "time" data "github.com/bgokden/veri/data" "github.com/bgokden/veri/util" @@ -27,20 +28,10 @@ type DataSourceClient struct { ConnectionCache *util.ConnectionCache } -// func (dcs *DataSourceClient) GetVeriServiceClient() (pb.VeriServiceClient, *grpc.ClientConn, error) { -// // This can be a client pool -// address := dcs.Ids[0] -// conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(200)*time.Millisecond)) -// if err != nil { -// log.Printf("fail to dial: %v\n", err) -// return nil, nil, err -// } -// client := pb.NewVeriServiceClient(conn) -// return client, conn, nil -// } - func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream chan<- *pb.ScoredDatum, queryWaitGroup *sync.WaitGroup, config *pb.SearchConfig) error { defer queryWaitGroup.Done() + clientCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() conn := dcs.ConnectionCache.Get(dcs.IdOfPeer) if conn == nil { return errors.New("Connection failure") @@ -51,7 +42,7 @@ func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream cha Datum: []*pb.Datum{datum}, Config: config, } - stream, err := client.SearchStream(context.Background(), searchRequest) + stream, err := client.SearchStream(clientCtx, searchRequest) if err != nil { return err } @@ -68,6 +59,8 @@ func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream cha } func (dcs *DataSourceClient) Insert(datum *pb.Datum, config *pb.InsertConfig) error { + clientCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() conn := dcs.ConnectionCache.Get(dcs.IdOfPeer) if conn == nil { return errors.New("Connection failure") @@ -79,11 +72,13 @@ func (dcs *DataSourceClient) Insert(datum *pb.Datum, config *pb.InsertConfig) er Datum: datum, DataName: dcs.Name, } - _, err := client.Insert(context.Background(), request) + _, err := client.Insert(clientCtx, request) return err } func (dcs *DataSourceClient) GetDataInfo() *pb.DataInfo { + clientCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() conn := dcs.ConnectionCache.Get(dcs.IdOfPeer) if conn == nil { log.Printf("Connection failure\n") @@ -94,7 +89,7 @@ func (dcs *DataSourceClient) GetDataInfo() *pb.DataInfo { request := &pb.GetDataRequest{ Name: dcs.Name, } - dataInfo, err := client.GetDataInfo(context.Background(), request) + dataInfo, err := client.GetDataInfo(clientCtx, request) if err != nil { log.Printf("GetDataInfo Error: %v\n", err.Error()) return nil diff --git a/node/nodeservice.go b/node/nodeservice.go index e483f63..47f8571 100644 --- a/node/nodeservice.go +++ b/node/nodeservice.go @@ -181,7 +181,7 @@ func (n *Node) SendJoinRequest(id string) error { if conn == nil { return errors.New("Connection failure") } - defer n.ConnectionCache.Close(conn) + defer n.ConnectionCache.Put(conn) // this connection should be closed time to time // It is observed that it can cause a split brain due to two nodes // sync to each other and never break connection @@ -220,15 +220,15 @@ func (n *Node) CreateDataIfNotExists(ctx context.Context, in *pb.DataConfig) (*p return aData.GetDataInfo(), nil } -func (n *Node) getClient(address string) (pb.VeriServiceClient, *grpc.ClientConn, error) { - conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(200)*time.Millisecond)) - if err != nil { - // log.Printf("fail to dial: %v\n", err) - return nil, nil, err - } - client := pb.NewVeriServiceClient(conn) - return client, conn, nil -} +// func (n *Node) getClient(address string) (pb.VeriServiceClient, *grpc.ClientConn, error) { +// conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(200)*time.Millisecond)) +// if err != nil { +// // log.Printf("fail to dial: %v\n", err) +// return nil, nil, err +// } +// client := pb.NewVeriServiceClient(conn) +// return client, conn, nil +// } func (n *Node) AddPeer(ctx context.Context, in *pb.AddPeerRequest) (*pb.AddPeerResponse, error) { return &pb.AddPeerResponse{}, n.AddPeerElement(in.GetPeer())