Skip to content

Commit

Permalink
Merge pull request #21 from bgokden/refactor
Browse files Browse the repository at this point in the history
Distributed query fixed, query id cache added
  • Loading branch information
bgokden authored Dec 27, 2020
2 parents 5667703 + 702d7ba commit e39b08f
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 159 deletions.
3 changes: 3 additions & 0 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func (dt *Data) GetDataInfo() *pb.DataInfo {

// AddSource adds a source
func (dt *Data) AddSource(dataSource DataSource) {
if dt.Sources == nil {
dt.InitData()
}
dt.Sources.Set(dataSource.GetID(), dataSource, cache.DefaultExpiration)
}

Expand Down
2 changes: 1 addition & 1 deletion data/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (dts *Dataset) CreateIfNotExists(config *pb.DataConfig) error {
go dts.SaveIndex()
return preData.InitData()
}
if err.Error() == fmt.Sprintf("Item %s doesn't exist", config.Name) {
if err.Error() == fmt.Sprintf("Item %s already exists", config.Name) {
return nil
}
return err
Expand Down
1 change: 1 addition & 0 deletions data/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
duration := time.Duration(config.Timeout) * time.Millisecond
timeLimit := time.After(duration)
queryKey := GetSearchKey(datum, config)
config.Timeout = uint64(float64(config.Timeout) * 0.9) // Decrase timeout for downstream
if dt.QueryCache == nil {
dt.InitData()
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/dgraph-io/badger v1.6.2
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/golang/protobuf v1.4.1
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.7.2
github.com/jinzhu/copier v0.0.0-20201025035756-632e723a6687
github.com/magneticio/go-common v0.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
Expand Down
2 changes: 2 additions & 0 deletions node/datasourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (dcs *DataSourceClient) GetVeriServiceClient() (pb.VeriServiceClient, *grpc
}

func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream chan<- *pb.ScoredDatum, queryWaitGroup *sync.WaitGroup, config *pb.SearchConfig) error {
defer queryWaitGroup.Done()
client, _, err := dcs.GetVeriServiceClient()
if err != nil {
return err
Expand All @@ -53,6 +54,7 @@ func (dcs *DataSourceClient) StreamSearch(datum *pb.Datum, scoredDatumStream cha
log.Printf("Error: (%v)", err)
break
}
log.Printf("Received Score: (%v)", protoScoredDatum.Score)
scoredDatumStream <- protoScoredDatum
}
return err
Expand Down
10 changes: 8 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Node struct {
PeerList *cache.Cache
PeriodicTicker *time.Ticker
PeriodicDone chan bool
QueryUUIDCache *cache.Cache
}

func NewNode(config *NodeConfig) *Node {
Expand All @@ -50,6 +51,7 @@ func NewNode(config *NodeConfig) *Node {
node.Dataset = data.NewDataset(node.Folder)
node.PeerList = cache.New(5*time.Minute, 10*time.Minute)
node.ServiceList = cache.New(5*time.Minute, 10*time.Minute)
node.QueryUUIDCache = cache.New(5*time.Minute, 10*time.Minute)
for _, service := range config.ServiceList {
node.AddStaticService(service)
}
Expand Down Expand Up @@ -144,8 +146,12 @@ func (n *Node) SyncWithPeers() {
n.AddPeer(peerFromPeer)
}
for _, dataConfigFromPeer := range peer.DataList {
data, _ := n.Dataset.GetOrCreateIfNotExists(dataConfigFromPeer)
data.AddSource(GetDataSourceClient(peer, dataConfigFromPeer.Name))
data, err := n.Dataset.GetOrCreateIfNotExists(dataConfigFromPeer)
if err == nil {
data.AddSource(GetDataSourceClient(peer, dataConfigFromPeer.Name))
} else {
log.Printf("Error data creation: %v\n", err)
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions node/nodeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"net"
"strings"

"github.com/bgokden/go-cache"
pb "github.com/bgokden/veri/veriservice"
"github.com/google/uuid"
"google.golang.org/grpc"
grpcPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -98,6 +100,19 @@ func (n *Node) GetDataInfo(ctx context.Context, getDataRequest *pb.GetDataReques

func (n *Node) SearchStream(searchRequest *pb.SearchRequest, stream pb.VeriService_SearchStreamServer) error {
config := searchRequest.GetConfig()
uid := config.GetUuid()
if uid == "" {
uid, err := uuid.NewRandom()
if err != nil {
return err
}
config.Uuid = uid.String()
} else {
err := n.QueryUUIDCache.Add(uid, true, cache.DefaultExpiration)
if err != nil {
return err
}
}
aData, err := n.Dataset.Get(config.GetDataName())
if err != nil {
return err
Expand Down Expand Up @@ -136,6 +151,7 @@ func (n *Node) Listen() error {
}

func (n *Node) SendJoinRequest(id string) error {
log.Printf("(Call Join 0) Send Join reques to %v", id)
peerInfo := n.GetNodeInfo()
request := &pb.JoinRequest{
Peer: peerInfo,
Expand Down
Loading

0 comments on commit e39b08f

Please sign in to comment.