Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HyperLogLog column cardinality estimation for in-memory tables #154

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added lib/stream-2.5.0-SNAPSHOT.jar
Binary file not shown.
2 changes: 2 additions & 0 deletions src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class MemoryStoreSinkOperator extends TerminalOperator {
}

if (builder != null) {
// Generate cardinality estimate now that all rows for current part are serialized
builder.asInstanceOf[TablePartitionBuilder].stats.estimateCardinality()
statsAcc += Tuple2(partitionIndex, builder.asInstanceOf[TablePartitionBuilder].stats)
Iterator(builder.asInstanceOf[TablePartitionBuilder].build)
} else {
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/shark/memstore2/TablePartitionStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ class TablePartitionStats(val stats: Array[ColumnStats[_]], val numRows: Long)
" column " + index + " " +
{ if (column != null) column.toString else "no column statistics" }
}.mkString("\n")

def estimateCardinality() {
stats.foreach(_.estimateCardinality())
}
}
34 changes: 30 additions & 4 deletions src/main/scala/shark/memstore2/column/ColumnStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.ObjectInput
import java.io.ObjectOutput
import java.io.Externalizable
import java.sql.Timestamp
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.hadoop.io.Text


Expand All @@ -35,18 +36,29 @@ sealed trait ColumnStats[@specialized(Boolean, Byte, Short, Int, Long, Float, Do
protected def _min: T
protected def _max: T


def min: T = _min
def max: T = _max

override def toString = "[" + min + ", " + max + "]"

def :><(l: Any, r: Any): Boolean = (this :>= l) && (this :<= r)
def :<=(v: Any): Boolean = (this := v) || (this :< v)
def :>=(v: Any): Boolean = (this := v) || (this :> v)
def :=(v: Any): Boolean
def :>(v: Any): Boolean
def :<(v: Any): Boolean


// Use Streamlib's HyperLogLog and 32-bit Murmurhash implemenatations for
// cardinality estimation, with 16 bits as the basis for this HLL instance.
@transient protected var _hyperLogLog: HyperLogLog = new HyperLogLog(16)
protected var _numDistinct: Long = 0

def numDistinct: Long = _numDistinct

def estimateCardinality() {
_numDistinct = _hyperLogLog.cardinality()
}
}


Expand All @@ -58,7 +70,9 @@ object ColumnStats {
class NoOpStats[T] extends ColumnStats[T] {
protected var _max = null.asInstanceOf[T]
protected var _min = null.asInstanceOf[T]
override def append(v: T) {}
override def append(v: T) {
_hyperLogLog.offer(v)
}
override def :=(v: Any): Boolean = true
override def :>(v: Any): Boolean = true
override def :<(v: Any): Boolean = true
Expand All @@ -68,9 +82,11 @@ object ColumnStats {
protected var _max = false
protected var _min = true
override def append(v: Boolean) {
_hyperLogLog.offer(v)
if (v) _max = v
else _min = v
}

def :=(v: Any): Boolean = {
v match {
case u:Boolean => _min <= u && _max >= u
Expand Down Expand Up @@ -98,6 +114,7 @@ object ColumnStats {
protected var _max = Byte.MinValue
protected var _min = Byte.MaxValue
override def append(v: Byte) {
_hyperLogLog.offer(v)
if (v > _max) _max = v
if (v < _min) _min = v
}
Expand Down Expand Up @@ -128,9 +145,11 @@ object ColumnStats {
protected var _max = Short.MinValue
protected var _min = Short.MaxValue
override def append(v: Short) {
_hyperLogLog.offer(v)
if (v > _max) _max = v
if (v < _min) _min = v
}

def :=(v: Any): Boolean = {
v match {
case u:Short => _min <= u && _max >= u
Expand Down Expand Up @@ -197,6 +216,7 @@ object ColumnStats {
}

override def append(v: Int) {
_hyperLogLog.offer(v)
if (v > _max) _max = v
if (v < _min) _min = v

Expand Down Expand Up @@ -229,9 +249,11 @@ object ColumnStats {
protected var _max = Long.MinValue
protected var _min = Long.MaxValue
override def append(v: Long) {
_hyperLogLog.offer(v)
if (v > _max) _max = v
if (v < _min) _min = v
}

def :=(v: Any): Boolean = {
v match {
case u:Long => _min <= u && _max >= u
Expand All @@ -258,6 +280,7 @@ object ColumnStats {
protected var _max = Float.MinValue
protected var _min = Float.MaxValue
override def append(v: Float) {
_hyperLogLog.offer(v)
if (v > _max) _max = v
if (v < _min) _min = v
}
Expand Down Expand Up @@ -287,6 +310,7 @@ object ColumnStats {
protected var _max = Double.MinValue
protected var _min = Double.MaxValue
override def append(v: Double) {
_hyperLogLog.offer(v)
if (v > _max) _max = v
if (v < _min) _min = v
}
Expand Down Expand Up @@ -316,6 +340,7 @@ object ColumnStats {
protected var _max = new Timestamp(0)
protected var _min = new Timestamp(Long.MaxValue)
override def append(v: Timestamp) {
_hyperLogLog.offer(v)
if (v.compareTo(_max) > 0) _max = v
if (v.compareTo(_min) < 0) _min = v
}
Expand Down Expand Up @@ -345,7 +370,7 @@ object ColumnStats {
// Note: this is not Java serializable because Text is not Java serializable.
protected var _max: Text = null
protected var _min: Text = null

def :=(v: Any): Boolean = {
v match {
case u: Text => _min.compareTo(u) <= 0 && _max.compareTo(u) >= 0
Expand All @@ -371,6 +396,7 @@ object ColumnStats {
}

override def append(v: Text) {
_hyperLogLog.offer(v)
// Need to make a copy of Text since Text is not immutable and we reuse
// the same Text object in serializer to mitigate frequent GC.
if (_max == null) {
Expand Down
46 changes: 45 additions & 1 deletion src/test/scala/shark/memstore2/ColumnStatsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c :>= false)
assert(!(c :<= false))
assert(c :>= true)

c.append(true)
c.estimateCardinality()
assert(c.numDistinct == 1)
}

test("ByteColumnStats") {
Expand All @@ -69,6 +73,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c :> 0.toByte)
assert(c :<= -1.toByte)
assert(!(c :<= -3.toByte))

c.append(0)
c.estimateCardinality()
assert(c.numDistinct == 5)
}

test("ShortColumnStats") {
Expand All @@ -83,6 +91,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c.min == -1 && c.max == 1024)
c.append(-1024)
assert(c.min == -1024 && c.max == 1024)

c.append(0)
c.estimateCardinality()
assert(c.numDistinct == 5)
}

test("IntColumnStats") {
Expand Down Expand Up @@ -120,6 +132,10 @@ class ColumnStatsSuite extends FunSuite {
c = new ColumnStats.IntColumnStats
Array(22, 1, 24).foreach(c.append)
assert(!c.isOrdered && !c.isAscending && !c.isDescending)

c.append(22)
c.estimateCardinality()
assert(c.numDistinct == 3)
}

test("LongColumnStats") {
Expand All @@ -137,6 +153,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c := 0.toLong)
assert(c :> -2.toLong)
assert(c :< 0.toLong)

c.append(0)
c.estimateCardinality()
assert(c.numDistinct == 5)
}

test("FloatColumnStats") {
Expand All @@ -154,6 +174,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c := 20.5F)
assert(c :< 20.6F)
assert(c :> -20.6F)

c.append(0)
c.estimateCardinality()
assert(c.numDistinct == 5)
}

test("DoubleColumnStats") {
Expand All @@ -171,6 +195,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c := 20.5)
assert(!(c :> 20.6))
assert(c :< 20.6)

c.append(0)
c.estimateCardinality()
assert(c.numDistinct == 5)
}

test("TimestampColumnStats") {
Expand All @@ -192,7 +220,10 @@ class ColumnStatsSuite extends FunSuite {
assert(c.min.equals(ts1) && c.max.equals(ts2))
c.append(ts4)
assert(c.min.equals(ts1) && c.max.equals(ts4))


c.append(ts4)
c.estimateCardinality()
assert(c.numDistinct == 4)
}

test("StringColumnStats") {
Expand All @@ -213,5 +244,18 @@ class ColumnStatsSuite extends FunSuite {
c.append("0987")
assert(c.min.equals(T("0987")) && c.max.equals(T("cccc")))

c.estimateCardinality()
assert(c.numDistinct == 4)
}

test("HighCardinality") {
val size = 10000000
var c = new ColumnStats.IntColumnStats
for (i <- 1 to size) {
c.append(i)
}
c.estimateCardinality()
val err = scala.math.abs(c.numDistinct - size) / size.asInstanceOf[Double]
assert(err < 0.1)
}
}