Skip to content

Commit

Permalink
topk algorithm (#28)
Browse files Browse the repository at this point in the history
topk algorithm

Co-authored-by: chenzhihui <zhihui_chen@foxmail.com>
  • Loading branch information
lintanghui and tonybase committed Oct 10, 2022
1 parent 37a53fd commit 1f87372
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ go 1.15

require (
github.com/shirou/gopsutil/v3 v3.21.8
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shirou/gopsutil/v3 v3.21.8 h1:nKct+uP0TV8DjjNiHanKf8SAuub+GNsbrOtM9Nl9biA=
github.com/shirou/gopsutil/v3 v3.21.8/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
90 changes: 90 additions & 0 deletions pkg/minheap/minheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package minheap

import (
"container/heap"
"sort"
)

type Heap struct {
Nodes Nodes
K uint32
}

func NewHeap(k uint32) *Heap {
h := Nodes{}
heap.Init(&h)
return &Heap{Nodes: h, K: k}
}

func (h *Heap) Add(val *Node) *Node {
if h.K > uint32(len(h.Nodes)) {
heap.Push(&h.Nodes, val)
} else if val.Count > h.Nodes[0].Count {
expelled := heap.Pop(&h.Nodes)
heap.Push(&h.Nodes, val)
node := expelled.(*Node)
return node
}
return nil
}

func (h *Heap) Pop() *Node {
expelled := heap.Pop(&h.Nodes)
return expelled.(*Node)
}

func (h *Heap) Fix(idx int, count uint32) {
h.Nodes[idx].Count = count
heap.Fix(&h.Nodes, idx)
}

func (h *Heap) Min() uint32 {
if len(h.Nodes) == 0 {
return 0
}
return h.Nodes[0].Count
}

func (h *Heap) Find(key string) (int, bool) {
for i := range h.Nodes {
if h.Nodes[i].Key == key {
return i, true
}
}
return 0, false
}

func (h *Heap) Sorted() Nodes {
nodes := append([]*Node(nil), h.Nodes...)
sort.Sort(sort.Reverse(Nodes(nodes)))
return nodes
}

type Nodes []*Node

type Node struct {
Key string
Count uint32
}

func (n Nodes) Len() int {
return len(n)
}

func (n Nodes) Less(i, j int) bool {
return (n[i].Count < n[j].Count) || (n[i].Count == n[j].Count && n[i].Key > n[j].Key)
}

func (n Nodes) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}

func (n *Nodes) Push(val interface{}) {
*n = append(*n, val.(*Node))
}

func (n *Nodes) Pop() interface{} {
var val *Node
val, *n = (*n)[len((*n))-1], (*n)[:len((*n))-1]
return val
}
160 changes: 160 additions & 0 deletions topk/heavykeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package topk

// Native Go implement of topk heavykeeper algorithm, Based on paper
// HeavyKeeper: An Accurate Algorithm for Finding Top-k Elephant Flow (https://www.usenix.org/system/files/conference/atc18/atc18-gong.pdf)

import (
"math"
"math/rand"

"github.com/go-kratos/aegis/pkg/minheap"

"github.com/spaolacci/murmur3"
)

const LOOKUP_TABLE = 256

// Topk implement by heavykeeper algorithm.
type HeavyKeeper struct {
k uint32
width uint32
depth uint32
decay float64
lookupTable []float64

r *rand.Rand
buckets [][]bucket
minHeap *minheap.Heap
expelled chan Item
}

func NewHeavyKeeper(k, width, depth uint32, decay float64) Topk {
arrays := make([][]bucket, depth)
for i := range arrays {
arrays[i] = make([]bucket, width)
}

topk := &HeavyKeeper{
k: k,
width: width,
depth: depth,
decay: decay,
lookupTable: make([]float64, LOOKUP_TABLE),
buckets: arrays,
r: rand.New(rand.NewSource(0)),
minHeap: minheap.NewHeap(k),
expelled: make(chan Item, 32),
}
for i := 0; i < LOOKUP_TABLE; i++ {
topk.lookupTable[i] = math.Pow(decay, float64(i))
}
return topk
}

func (topk *HeavyKeeper) Expelled() <-chan Item {
return topk.expelled
}

func (topk *HeavyKeeper) List() []Item {
items := topk.minHeap.Sorted()
res := make([]Item, 0, len(items))
for _, item := range items {
res = append(res, Item{Key: item.Key, Count: item.Count})
}
return res
}

// Add add item into heavykeeper and return if item had beend add into minheap.
// if item had been add into minheap and some item was expelled, return the expelled item.
func (topk *HeavyKeeper) Add(key string, incr uint32) bool {
keyBytes := []byte(key)
itemFingerprint := murmur3.Sum32(keyBytes)
var maxCount uint32

// compute d hashes
for i, row := range topk.buckets {

bucketNumber := murmur3.Sum32WithSeed(keyBytes, uint32(i)) % uint32(topk.width)
fingerprint := row[bucketNumber].fingerprint
count := row[bucketNumber].count

if count == 0 {
row[bucketNumber].fingerprint = itemFingerprint
row[bucketNumber].count = incr
maxCount = max(maxCount, incr)

} else if fingerprint == itemFingerprint {
row[bucketNumber].count += incr
maxCount = max(maxCount, row[bucketNumber].count)

} else {
for localIncr := incr; localIncr > 0; localIncr-- {
var decay float64
curCount := row[bucketNumber].count
if row[bucketNumber].count < LOOKUP_TABLE {
decay = topk.lookupTable[curCount]
} else {
// decr pow caculate cost
decay = topk.lookupTable[LOOKUP_TABLE-1]
}
if topk.r.Float64() < decay {
row[bucketNumber].count--
if row[bucketNumber].count == 0 {
row[bucketNumber].fingerprint = itemFingerprint
row[bucketNumber].count = localIncr
maxCount = max(maxCount, localIncr)
break
}
}
}
}
}
minHeap := topk.minHeap.Min()
if len(topk.minHeap.Nodes) == int(topk.k) && maxCount < minHeap {
return false
}
// update minheap
itemHeapIdx, itemHeapExist := topk.minHeap.Find(key)
if itemHeapExist {
topk.minHeap.Fix(itemHeapIdx, maxCount)
return true
}
expelled := topk.minHeap.Add(&minheap.Node{Key: key, Count: maxCount})
if expelled != nil {
topk.expell(Item{Key: expelled.Key, Count: expelled.Count})
}
return true
}

func (topk *HeavyKeeper) expell(item Item) {
select {
case topk.expelled <- item:
default:
}
}

type bucket struct {
fingerprint uint32
count uint32
}

func (b *bucket) Get() (uint32, uint32) {
return b.fingerprint, b.count
}

func (b *bucket) Set(fingerprint, count uint32) {
b.fingerprint = fingerprint
b.count = count
}

func (b *bucket) Inc(val uint32) uint32 {
b.count += val
return b.count
}

func max(x, y uint32) uint32 {
if x > y {
return x
}
return y
}
46 changes: 46 additions & 0 deletions topk/heavykeeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package topk

import (
"math"
"math/rand"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTopkList(t *testing.T) {
// zipfan distribution
zipf := rand.NewZipf(rand.New(rand.NewSource(time.Now().Unix())), 3, 2, 1000)
topk := NewHeavyKeeper(10, 10000, 5, 0.925)
dataMap := make(map[string]int)
for i := 0; i < 10000; i++ {
key := strconv.FormatUint(zipf.Uint64(), 10)
dataMap[key] = dataMap[key] + 1
topk.Add(key, 1)
}
var rate float64
for _, node := range topk.List() {
rate += math.Abs(float64(node.Count)-float64(dataMap[node.Key])) / float64(dataMap[node.Key])
t.Logf("item %s, count %d, expect %d", node.Key, node.Count, dataMap[node.Key])
}
t.Logf("err rate avg:%f", rate)
for i, node := range topk.List() {
assert.Equal(t, strconv.FormatInt(int64(i), 10), node.Key)
t.Logf("%s: %d", node.Key, node.Count)
}
}

func BenchmarkAdd(b *testing.B) {
zipf := rand.NewZipf(rand.New(rand.NewSource(time.Now().Unix())), 2, 2, 1000)
var data []string = make([]string, 1000)
for i := 0; i < 1000; i++ {
data[i] = strconv.FormatUint(zipf.Uint64(), 10)
}
topk := NewHeavyKeeper(10, 1000, 5, 0.9)
b.ResetTimer()
for i := 0; i < b.N; i++ {
topk.Add(data[i%1000], 1)
}
}
17 changes: 17 additions & 0 deletions topk/topk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package topk

// Item is topk item.
type Item struct {
Key string
Count uint32
}

// Topk algorithm interface.
type Topk interface {
// Add item and return if item is in the topk.
Add(item string, incr uint32) bool
// List all topk items.
List() []Item
// Expelled watch at the expelled items.
Expelled() <-chan Item
}

0 comments on commit 1f87372

Please sign in to comment.