Skip to content

Commit

Permalink
Removed logs and increased connection access to 20 users
Browse files Browse the repository at this point in the history
  • Loading branch information
bgokden committed Aug 4, 2021
1 parent 07eb307 commit 57ad043
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 188 deletions.
188 changes: 4 additions & 184 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (dt *Data) InitData() error {
dt.Lock()
defer dt.Unlock()
if dt.Initialized == false {
dt.Sources = cache.New(5*time.Minute, 1*time.Minute)
dt.Sources = cache.New(10*time.Minute, 1*time.Minute)
dt.QueryCache = cache.New(5*time.Minute, 1*time.Minute)
dt.Alive = true
go dt.Run()
Expand Down Expand Up @@ -158,13 +158,13 @@ func (dt *Data) DataSourceDiffMap() (map[string]uint64, uint64) {
info := source.GetDataInfo()
if info != nil {
diff := minUint64(((localN-info.N)/2)+1, 1000) // diff may be negative
freq := float64(diff) / float64(localN+1)
fraction := float64(diff) / float64(localN+1)
if info.N > localN {
diff = 1
}
if VectorDistance(localInfo.Avg, info.Avg)+VectorDistance(localInfo.Hist, info.Hist) <= 0.01*localInfo.GetMaxDistance() { // This is arbitary
diff = 1 // close enough
if freq < 0.01 {
diff = 1 // close enough
if fraction < 0.01 { // small or negative
diff = 0
}
}
Expand All @@ -183,185 +183,6 @@ func CheckIfUnkownError(err error) bool {
return true
}

// // Process runs through keys and calculates statistics
// func (dt *Data) ProcessOLD(force bool) error {
// // log.Printf("Try Running Process (forced: %v) current: %v timestamp: %v diff: %v\n", force, getCurrentTime(), dt.Timestamp, getCurrentTime()-dt.Timestamp)
// if getCurrentTime()-dt.Timestamp >= 60 || force {
// localInfo := dt.GetDataInfo()
// localN := localInfo.N
// config := dt.GetConfig()
// diffMap, limit := dt.DataSourceDiffMap()
// datumStream := make(chan *pb.InsertDatumWithConfig, limit)
// defer close(datumStream)
// insertionCounter := uint64(0)
// fraction := float64(0)
// if localN > 0 {
// fraction = float64(limit) / float64(localN)
// // log.Printf("Diff larger than 0: %v\n", diff)
// countMap := make(map[string]uint64, len(diffMap))
// go func() {
// deleted := uint64(0)
// counter := 0
// for datum := range datumStream {
// for id, count := range diffMap {
// if countMap[id] < count {
// if sourceItem, ok := dt.Sources.Get(id); ok {
// if source, ok2 := sourceItem.(DataSource); ok2 {
// err := source.Insert(datum.Datum, datum.Config)
// if err != nil && CheckIfUnkownError(err) {
// log.Printf("Sending Insert error %v\n", err.Error())
// }
// if err == nil {
// counter++
// }
// if err == nil && (!dt.Alive || isEvictionOn(localInfo, config, deleted)) {
// countMap[id]++
// dt.Delete(datum.Datum)
// deleted++
// // log.Printf("Datum deleted count: %v\n", deleted)
// }
// }
// }
// }
// }
// // No need to sleep time.Sleep(200 * time.Millisecond)
// }
// }()
// // dt.InsertStreamSample(datumStream, float64(diff)/float64(localN))
// // log.Printf("Close stream\n")
// }
// // log.Printf("Running Process (forced: %v)\n", force)
// n := uint64(0)
// distance := 0.0
// maxDistance := 0.0
// avg := make([]float32, 0)
// hist := make([]float32, 64)
// nFloat := float32(dt.N)
// if nFloat == 0 {
// // log.Printf("Data size was 0\n")
// nFloat = 1
// }
// histUnit := 1 / nFloat
// newDataIndex := make([]*pb.Datum, max(1000, int(dt.N)))
// var newAnnoyIndex annoyindex.AnnoyIndexAngular
// var newTempFileName string
// err := dt.DB.View(func(txn *badger.Txn) error {
// opts := badger.DefaultIteratorOptions
// opts.PrefetchValues = true
// it := txn.NewIterator(opts)
// defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
// item := it.Item()
// k := item.Key()
// datumKey, err := ToDatumKey(k)
// if err == nil {
// n++
// avg = CalculateAverage(avg, datumKey.Feature, nFloat)
// distance = VectorDistance(dt.Avg, datumKey.Feature)

// if distance > maxDistance {
// maxDistance = distance
// }
// if dt.MaxDistance != 0 {
// index := int((distance / dt.MaxDistance) * 64)
// if index >= 64 {
// index = 63
// }
// if index <= 0 {
// index = 0 // this is probably related to a bug
// }
// hist[index] += histUnit
// }
// }
// err = item.Value(func(v []byte) error {
// datum, errV := ToDatum(k, v)
// if errV == nil {
// // features32 := make([]float32, len(datum.Key.Feature))
// // for i, f := range datum.Key.Feature {
// // features32[i] = float32(f)
// // }
// i := int(n - 1)
// if dt.Alive && i < len(newDataIndex) {
// if newAnnoyIndex == nil {
// // newAnnoyIndex = annoyindex.NewAnnoyIndexEuclidean(len(datum.Key.Feature))
// newAnnoyIndex = annoyindex.NewAnnoyIndexAngular(len(datum.Key.Feature))
// tmpfile, err := ioutil.TempFile("", "annoy")
// if err == nil {
// newTempFileName = tmpfile.Name()
// newAnnoyIndex.OnDiskBuild(newTempFileName)
// }

// }
// newAnnoyIndex.AddItem(i, datum.Key.Feature)
// newDataIndex[i] = datum
// }
// if !dt.Alive || (insertionCounter < limit && rand.Float64() < fraction) {
// config := InsertConfigFromExpireAt(item.ExpiresAt())
// if config.TTL > 10 {
// datumStream <- &pb.InsertDatumWithConfig{
// Datum: datum,
// Config: config,
// }
// insertionCounter++
// }
// }
// // newDataIndex = append(newDataIndex, datum)

// }
// return nil
// })
// if err != nil {
// log.Printf("err: %v\n", err)
// }
// }
// return nil
// })
// if err != nil {
// return err
// }
// dt.Avg = avg
// dt.Hist = hist
// dt.MaxDistance = maxDistance
// dt.N = n
// dt.Timestamp = getCurrentTime()
// if newAnnoyIndex != nil {
// start := time.Now()
// newAnnoyIndex.Build(-1) // Previosly 10, -1 creates index dynamically
// elapsed := time.Since(start)
// log.Printf("Building annoy index took %s", elapsed)
// // log.Printf("Updating index. len: %v\n", len(newDataIndex))
// dt.Annoyer.Lock()
// if dt.Annoyer.DataIndex != nil {
// dt.Annoyer.AnnoyIndex.Unload() // Not sure if this is needed
// annoyindex.DeleteAnnoyIndexAngular(dt.Annoyer.AnnoyIndex)
// }
// oldFile := dt.Annoyer.BuildFileName
// dt.Annoyer.BuildFileName = newTempFileName
// dt.Annoyer.AnnoyIndex = newAnnoyIndex
// dt.Annoyer.DataIndex = &newDataIndex
// dt.Annoyer.Unlock()
// if len(oldFile) > 0 {
// os.Remove(oldFile)
// }
// // log.Printf("Updated index\n")
// }
// // if dt.ActiveIndex == 0 {
// // dt.AnnoyIndexB = newAnnoyIndex
// // dt.IndexB = &newDataIndex
// // dt.ActiveIndex = 1
// // } else {
// // dt.AnnoyIndexA = newAnnoyIndex
// // dt.IndexA = &newDataIndex
// // dt.ActiveIndex = 0
// // }

// // dt.SyncAll()
// }
// // dt.Timestamp = getCurrentTime() // update always
// dt.Dirty = false
// return nil
// }

// GetDataInfo out of data
func (dt *Data) GetDataInfo() *pb.DataInfo {
// log.Printf("Data: %v\n", dt)
Expand Down Expand Up @@ -399,7 +220,6 @@ func (dt *Data) GetID() string {

func (dt *Data) RunOnRandomSources(sourceLimit int, sourceFunction func(dataSource DataSource) error) error {
sourceList := dt.Sources.Items()
// sourceLimit := 5 // This should be configurable
for _, sourceItem := range sourceList { // Assumption is that random map runs are random enough
if sourceLimit < 0 {
break
Expand Down
4 changes: 2 additions & 2 deletions data/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
// external
dt.RunOnRandomSources(5, func(source DataSource) error {
queryWaitGroup.Add(1)
log.Printf("Query Datasource: %v\n", source.GetID())
// log.Printf("Query Datasource: %v\n", source.GetID())
go source.StreamSearch(datum, scoredDatumStream, &queryWaitGroup, config)
return nil
})
Expand All @@ -209,7 +209,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
case scoredDatum := <-scoredDatumStream:
temp.Insert(scoredDatum)
case <-waitChannel:
log.Printf("AggregatedSearch: all data finished")
// log.Printf("AggregatedSearch: all data finished")
close(scoredDatumStream)
for scoredDatum := range scoredDatumStream {
temp.Insert(scoredDatum)
Expand Down
4 changes: 2 additions & 2 deletions util/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (cp *ConnectionPool) Get() *Connection {
c := cp.GetWithRetry(0)
if c != nil {
atomic.AddInt64(&c.Counter, 1)
if atomic.LoadInt64(&c.Counter) < 10 {
cp.Put(c) // if there are less than 10 concurrent users put it back
if atomic.LoadInt64(&c.Counter) <= 20 {
cp.Put(c) // if there are less than 20 concurrent users put it back
}
}
return c
Expand Down

0 comments on commit 57ad043

Please sign in to comment.