diff --git a/data/data.go b/data/data.go
index 23ddd4e..4a885c0 100644
--- a/data/data.go
+++ b/data/data.go
@@ -85,7 +85,7 @@ func (dt *Data) InitData() error {
 	dt.Lock()
 	defer dt.Unlock()
 	if dt.Initialized == false {
-		dt.Sources = cache.New(5*time.Minute, 1*time.Minute)
+		dt.Sources = cache.New(10*time.Minute, 1*time.Minute)
 		dt.QueryCache = cache.New(5*time.Minute, 1*time.Minute)
 		dt.Alive = true
 		go dt.Run()
@@ -158,13 +158,13 @@ func (dt *Data) DataSourceDiffMap() (map[string]uint64, uint64) {
 		info := source.GetDataInfo()
 		if info != nil {
 			diff := minUint64(((localN-info.N)/2)+1, 1000) // diff may be negative
-			freq := float64(diff) / float64(localN+1)
+			fraction := float64(diff) / float64(localN+1)
 			if info.N > localN {
 				diff = 1
 			}
 			if VectorDistance(localInfo.Avg, info.Avg)+VectorDistance(localInfo.Hist, info.Hist) <= 0.01*localInfo.GetMaxDistance() { // This is arbitary
-				diff = 1 // close enough
-				if freq < 0.01 {
+				diff = 1 // close enough
+				if fraction < 0.01 { // small or negative
 					diff = 0
 				}
 			}
@@ -183,185 +183,6 @@ func CheckIfUnkownError(err error) bool {
 	return true
 }
 
-// // Process runs through keys and calculates statistics
-// func (dt *Data) ProcessOLD(force bool) error {
-// 	// log.Printf("Try Running Process (forced: %v) current: %v timestamp: %v diff: %v\n", force, getCurrentTime(), dt.Timestamp, getCurrentTime()-dt.Timestamp)
-// 	if getCurrentTime()-dt.Timestamp >= 60 || force {
-// 		localInfo := dt.GetDataInfo()
-// 		localN := localInfo.N
-// 		config := dt.GetConfig()
-// 		diffMap, limit := dt.DataSourceDiffMap()
-// 		datumStream := make(chan *pb.InsertDatumWithConfig, limit)
-// 		defer close(datumStream)
-// 		insertionCounter := uint64(0)
-// 		fraction := float64(0)
-// 		if localN > 0 {
-// 			fraction = float64(limit) / float64(localN)
-// 			// log.Printf("Diff larger than 0: %v\n", diff)
-// 			countMap := make(map[string]uint64, len(diffMap))
-// 			go func() {
-// 				deleted := uint64(0)
-// 				counter := 0
-// 				for datum := range datumStream {
-// 					for id, count := range diffMap {
-// 						if countMap[id] < count {
-// 							if sourceItem, ok := dt.Sources.Get(id); ok {
-// 								if source, ok2 := sourceItem.(DataSource); ok2 {
-// 									err := source.Insert(datum.Datum, datum.Config)
-// 									if err != nil && CheckIfUnkownError(err) {
-// 										log.Printf("Sending Insert error %v\n", err.Error())
-// 									}
-// 									if err == nil {
-// 										counter++
-// 									}
-// 									if err == nil && (!dt.Alive || isEvictionOn(localInfo, config, deleted)) {
-// 										countMap[id]++
-// 										dt.Delete(datum.Datum)
-// 										deleted++
-// 										// log.Printf("Datum deleted count: %v\n", deleted)
-// 									}
-// 								}
-// 							}
-// 						}
-// 					}
-// 					// No need to sleep time.Sleep(200 * time.Millisecond)
-// 				}
-// 			}()
-// 			// dt.InsertStreamSample(datumStream, float64(diff)/float64(localN))
-// 			// log.Printf("Close stream\n")
-// 		}
-// 		// log.Printf("Running Process (forced: %v)\n", force)
-// 		n := uint64(0)
-// 		distance := 0.0
-// 		maxDistance := 0.0
-// 		avg := make([]float32, 0)
-// 		hist := make([]float32, 64)
-// 		nFloat := float32(dt.N)
-// 		if nFloat == 0 {
-// 			// log.Printf("Data size was 0\n")
-// 			nFloat = 1
-// 		}
-// 		histUnit := 1 / nFloat
-// 		newDataIndex := make([]*pb.Datum, max(1000, int(dt.N)))
-// 		var newAnnoyIndex annoyindex.AnnoyIndexAngular
-// 		var newTempFileName string
-// 		err := dt.DB.View(func(txn *badger.Txn) error {
-// 			opts := badger.DefaultIteratorOptions
-// 			opts.PrefetchValues = true
-// 			it := txn.NewIterator(opts)
-// 			defer it.Close()
-// 			for it.Rewind(); it.Valid(); it.Next() {
-// 				item := it.Item()
-// 				k := item.Key()
-// 				datumKey, err := ToDatumKey(k)
-// 				if err == nil {
-// 					n++
-// 					avg = CalculateAverage(avg, datumKey.Feature, nFloat)
-// 					distance = VectorDistance(dt.Avg, datumKey.Feature)
-
-// 					if distance > maxDistance {
-// 						maxDistance = distance
-// 					}
-// 					if dt.MaxDistance != 0 {
-// 						index := int((distance / dt.MaxDistance) * 64)
-// 						if index >= 64 {
-// 							index = 63
-// 						}
-// 						if index <= 0 {
-// 							index = 0 // this is probably related to a bug
-// 						}
-// 						hist[index] += histUnit
-// 					}
-// 				}
-// 				err = item.Value(func(v []byte) error {
-// 					datum, errV := ToDatum(k, v)
-// 					if errV == nil {
-// 						// features32 := make([]float32, len(datum.Key.Feature))
-// 						// for i, f := range datum.Key.Feature {
-// 						// 	features32[i] = float32(f)
-// 						// }
-// 						i := int(n - 1)
-// 						if dt.Alive && i < len(newDataIndex) {
-// 							if newAnnoyIndex == nil {
-// 								// newAnnoyIndex = annoyindex.NewAnnoyIndexEuclidean(len(datum.Key.Feature))
-// 								newAnnoyIndex = annoyindex.NewAnnoyIndexAngular(len(datum.Key.Feature))
-// 								tmpfile, err := ioutil.TempFile("", "annoy")
-// 								if err == nil {
-// 									newTempFileName = tmpfile.Name()
-// 									newAnnoyIndex.OnDiskBuild(newTempFileName)
-// 								}
-
-// 							}
-// 							newAnnoyIndex.AddItem(i, datum.Key.Feature)
-// 							newDataIndex[i] = datum
-// 						}
-// 						if !dt.Alive || (insertionCounter < limit && rand.Float64() < fraction) {
-// 							config := InsertConfigFromExpireAt(item.ExpiresAt())
-// 							if config.TTL > 10 {
-// 								datumStream <- &pb.InsertDatumWithConfig{
-// 									Datum:  datum,
-// 									Config: config,
-// 								}
-// 								insertionCounter++
-// 							}
-// 						}
-// 						// newDataIndex = append(newDataIndex, datum)
-
-// 					}
-// 					return nil
-// 				})
-// 				if err != nil {
-// 					log.Printf("err: %v\n", err)
-// 				}
-// 			}
-// 			return nil
-// 		})
-// 		if err != nil {
-// 			return err
-// 		}
-// 		dt.Avg = avg
-// 		dt.Hist = hist
-// 		dt.MaxDistance = maxDistance
-// 		dt.N = n
-// 		dt.Timestamp = getCurrentTime()
-// 		if newAnnoyIndex != nil {
-// 			start := time.Now()
-// 			newAnnoyIndex.Build(-1) // Previosly 10, -1 creates index dynamically
-// 			elapsed := time.Since(start)
-// 			log.Printf("Building annoy index took %s", elapsed)
-// 			// log.Printf("Updating index. len: %v\n", len(newDataIndex))
-// 			dt.Annoyer.Lock()
-// 			if dt.Annoyer.DataIndex != nil {
-// 				dt.Annoyer.AnnoyIndex.Unload() // Not sure if this is needed
-// 				annoyindex.DeleteAnnoyIndexAngular(dt.Annoyer.AnnoyIndex)
-// 			}
-// 			oldFile := dt.Annoyer.BuildFileName
-// 			dt.Annoyer.BuildFileName = newTempFileName
-// 			dt.Annoyer.AnnoyIndex = newAnnoyIndex
-// 			dt.Annoyer.DataIndex = &newDataIndex
-// 			dt.Annoyer.Unlock()
-// 			if len(oldFile) > 0 {
-// 				os.Remove(oldFile)
-// 			}
-// 			// log.Printf("Updated index\n")
-// 		}
-// 		// if dt.ActiveIndex == 0 {
-// 		// 	dt.AnnoyIndexB = newAnnoyIndex
-// 		// 	dt.IndexB = &newDataIndex
-// 		// 	dt.ActiveIndex = 1
-// 		// } else {
-// 		// 	dt.AnnoyIndexA = newAnnoyIndex
-// 		// 	dt.IndexA = &newDataIndex
-// 		// 	dt.ActiveIndex = 0
-// 		// }
-
-// 		// dt.SyncAll()
-// 	}
-// 	// dt.Timestamp = getCurrentTime() // update always
-// 	dt.Dirty = false
-// 	return nil
-// }
-
 // GetDataInfo out of data
 func (dt *Data) GetDataInfo() *pb.DataInfo {
 	// log.Printf("Data: %v\n", dt)
@@ -399,7 +220,6 @@ func (dt *Data) GetID() string {
 
 func (dt *Data) RunOnRandomSources(sourceLimit int, sourceFunction func(dataSource DataSource) error) error {
 	sourceList := dt.Sources.Items()
-	// sourceLimit := 5 // This should be configurable
 	for _, sourceItem := range sourceList { // Assumption is that random map runs are random enough
 		if sourceLimit < 0 {
 			break
diff --git a/data/search.go b/data/search.go
index 7582e80..0fe5bc3 100644
--- a/data/search.go
+++ b/data/search.go
@@ -193,7 +193,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
 	// external
 	dt.RunOnRandomSources(5, func(source DataSource) error {
 		queryWaitGroup.Add(1)
-		log.Printf("Query Datasource: %v\n", source.GetID())
+		// log.Printf("Query Datasource: %v\n", source.GetID())
 		go source.StreamSearch(datum, scoredDatumStream, &queryWaitGroup, config)
 		return nil
 	})
@@ -209,7 +209,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
 		case scoredDatum := <-scoredDatumStream:
 			temp.Insert(scoredDatum)
 		case <-waitChannel:
-			log.Printf("AggregatedSearch: all data finished")
+			// log.Printf("AggregatedSearch: all data finished")
 			close(scoredDatumStream)
 			for scoredDatum := range scoredDatumStream {
 				temp.Insert(scoredDatum)
diff --git a/util/connpool.go b/util/connpool.go
index d7c8bd5..33c3bc2 100644
--- a/util/connpool.go
+++ b/util/connpool.go
@@ -124,8 +124,8 @@ func (cp *ConnectionPool) Get() *Connection {
 	c := cp.GetWithRetry(0)
 	if c != nil {
 		atomic.AddInt64(&c.Counter, 1)
-		if atomic.LoadInt64(&c.Counter) < 10 {
-			cp.Put(c) // if there are less than 10 concurrent users put it back
+		if atomic.LoadInt64(&c.Counter) <= 20 {
+			cp.Put(c) // if there are 20 or fewer concurrent users put it back
 		}
 	}
 	return c
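
Note on util/connpool.go: the change raises the reuse threshold, so a connection is put back into the pool while at most 20 callers share it, and a busier connection stays checked out until its counter drops. Below is a minimal, self-contained sketch of this reference-counted reuse pattern; Conn, Pool, and the channel-backed free list are simplified stand-ins, not the file's actual types (the real Get goes through GetWithRetry, which this diff does not show).

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // Conn is a simplified stand-in for the pool's connection type.
    type Conn struct {
    	Counter int64 // number of callers currently sharing this connection
    }

    // Pool is a simplified stand-in backed by a buffered channel.
    type Pool struct {
    	conns chan *Conn
    }

    // Get hands out a connection; while 20 or fewer callers share it,
    // the connection is immediately put back so others can reuse it.
    func (p *Pool) Get() *Conn {
    	var c *Conn
    	select {
    	case c = <-p.conns:
    	default:
    		c = &Conn{} // pool empty: a real pool would dial here
    	}
    	atomic.AddInt64(&c.Counter, 1)
    	if atomic.LoadInt64(&c.Counter) <= 20 {
    		p.Put(c)
    	}
    	return c
    }

    // Put returns a connection to the free list, dropping it when full.
    func (p *Pool) Put(c *Conn) {
    	select {
    	case p.conns <- c:
    	default: // a real pool would close the surplus connection
    	}
    }

    func main() {
    	p := &Pool{conns: make(chan *Conn, 4)}
    	c := p.Get()
    	fmt.Printf("connection shared by %d caller(s)\n", atomic.LoadInt64(&c.Counter))
    }

Callers are expected to decrement the counter when they finish; that release path is outside this hunk.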
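Note on the data/data.go cache change: in cache.New(defaultExpiration, cleanupInterval) the first argument is the default TTL for entries and the second is how often the background janitor sweeps expired ones, so the hunk doubles how long a known data source stays cached (5 to 10 minutes) without touching the 1-minute sweep. A small usage sketch, assuming the cache import is github.com/patrickmn/go-cache, whose New has this signature; the import path is not visible in this diff, and the key and value below are hypothetical example data.

    package main

    import (
    	"fmt"
    	"time"

    	cache "github.com/patrickmn/go-cache"
    )

    func main() {
    	// Entries default to a 10-minute TTL; a background janitor
    	// removes expired entries every minute.
    	sources := cache.New(10*time.Minute, 1*time.Minute)

    	// Stored with the default (10-minute) expiration.
    	sources.Set("source-1", "10.0.0.1:5000", cache.DefaultExpiration)

    	if v, found := sources.Get("source-1"); found {
    		fmt.Println("still cached:", v)
    	}
    }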