Merge pull request amplab#130 from harsha2010/buffer-resizing
Various enhancements / bug fixes to columnar compression.
rxin committed Aug 27, 2013
2 parents fc0143b + 9dda67b commit b2885b8
Showing 8 changed files with 228 additions and 11 deletions.
10 changes: 6 additions & 4 deletions src/main/scala/shark/memstore2/column/ColumnBuilder.scala
@@ -63,15 +63,17 @@ trait ColumnBuilder[T] {
  }

  protected def growIfNeeded(orig: ByteBuffer, size: Int): ByteBuffer = {
+   val capacity = orig.capacity()
    if (orig.remaining() < size) {
      // grow in steps of 1/8th of the current capacity
-     var s = (3 * orig.capacity()) / 2 + 1
-     if (s < size) {
-       s = size
+     var additionalSize = capacity / 8 + 1
+     var newSize = capacity + additionalSize
+     if (additionalSize < size) {
+       newSize = capacity + size
      }
      val pos = orig.position()
      orig.clear()
-     val b = ByteBuffer.allocate(s)
+     val b = ByteBuffer.allocate(newSize)
      b.order(ByteOrder.nativeOrder())
      b.put(orig.array(), 0, pos)
    } else {
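The removed code grew the buffer geometrically (1.5x the current capacity); the new code grows it additively, by an eighth of the current capacity plus one byte, falling back to growing by the full requested size when that increment would be too small. Below is a minimal, self-contained sketch of the same policy; the object and method names are illustrative, not part of the patch.

import java.nio.{ByteBuffer, ByteOrder}

object BufferGrowth {
  // Illustrative re-implementation of the patch's growth policy:
  // grow by capacity/8 + 1, or by the full requested size if that is larger.
  def grow(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining() >= size) return orig
    val capacity = orig.capacity()
    val additionalSize = capacity / 8 + 1
    val newSize = capacity + (if (additionalSize < size) size else additionalSize)
    val pos = orig.position()
    val b = ByteBuffer.allocate(newSize)
    b.order(ByteOrder.nativeOrder())
    b.put(orig.array(), 0, pos)  // carry over the bytes written so far
    b
  }

  def main(args: Array[String]): Unit = {
    var buf = ByteBuffer.allocate(64)
    buf.position(60)        // only 4 bytes remaining
    buf = grow(buf, 8)      // needs 8, so grows by 64/8 + 1 = 9 bytes
    println(buf.capacity()) // 73
  }
}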
1 change: 1 addition & 0 deletions src/main/scala/shark/memstore2/column/ColumnIterator.scala
@@ -51,6 +51,7 @@ object Implicits {
  implicit def intToCompressionType(i: Int): CompressionType = i match {
    case -1 => DefaultCompressionType
    case 0 => RLECompressionType
+   case 1 => DictionaryCompressionType
    case _ => throw new UnsupportedOperationException("Compression Type " + i)
  }

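With the added case, a column whose compression header carries type ID 1 now resolves to dictionary encoding instead of throwing. A self-contained sketch of the dispatch pattern, using stand-ins for Shark's types (the definitions below mirror, but are not, the real ones in shark.memstore2.column):

import scala.language.implicitConversions

sealed trait CompressionType
case object DefaultCompressionType extends CompressionType
case object RLECompressionType extends CompressionType
case object DictionaryCompressionType extends CompressionType

object Implicits {
  implicit def intToCompressionType(i: Int): CompressionType = i match {
    case -1 => DefaultCompressionType
    case 0 => RLECompressionType
    case 1 => DictionaryCompressionType // the case this patch adds
    case _ => throw new UnsupportedOperationException("Compression Type " + i)
  }
}

object Demo {
  import Implicits._
  def main(args: Array[String]): Unit = {
    val t: CompressionType = 1 // implicit conversion from the type ID read off the buffer
    println(t)                 // DictionaryCompressionType
  }
}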
2 changes: 2 additions & 0 deletions src/main/scala/shark/memstore2/column/ColumnType.scala
@@ -336,6 +336,8 @@ object BINARY extends ColumnType[BytesWritable, BytesWritable](10, 16) {
  }

  def newWritable() = new BytesWritable
+
+ override def actualSize(v: BytesWritable) = v.getLength() + 4
}


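BINARY previously inherited the default size accounting; the override makes actualSize agree with the number of bytes a BINARY value occupies when written: a 4-byte length prefix plus the payload, i.e. getLength() + 4. A sketch of that invariant, assuming the length-prefixed layout (the object name and append helper here are illustrative):

import java.nio.ByteBuffer
import org.apache.hadoop.io.BytesWritable

object BinarySizeCheck {
  // Mirrors the patched accounting: 4-byte length prefix + payload.
  def actualSize(v: BytesWritable): Int = v.getLength() + 4

  def append(v: BytesWritable, buffer: ByteBuffer): Unit = {
    buffer.putInt(v.getLength())                // 4-byte length prefix
    buffer.put(v.getBytes(), 0, v.getLength())  // payload only; getBytes() may be padded
  }

  def main(args: Array[String]): Unit = {
    val w = new BytesWritable(Array[Byte](1, 2, 3))
    val buf = ByteBuffer.allocate(64)
    append(w, buf)
    assert(buf.position() == actualSize(w)) // 7 bytes written, 7 accounted for
  }
}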
@@ -84,12 +84,13 @@ class DictDecoder[V] (buffer:ByteBuffer, columnType: ColumnType[_, V]) extends I
  private val _dictionary: Map[Int, V] = {
    val size = buffer.getInt()
    val d = new HashMap[Int, V]()
-   val count = 0
+   var count = 0
    while (count < size) {
      // read the value, followed by its index
      val text = columnType.extract(buffer.position(), buffer)
      val index = buffer.getInt()
      d.put(index, text.asInstanceOf[V])
+     count += 1
    }
    d
  }
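Before this fix the loop counter was an immutable val with no increment, so decoding any non-empty dictionary would spin forever; making it a var and advancing it once per entry lets the decoder consume exactly size (value, index) pairs. A standalone sketch of the corrected loop, using length-prefixed UTF-8 strings in place of Shark's columnType.extract (the object name and layout details are illustrative assumptions):

import java.nio.ByteBuffer
import scala.collection.mutable.HashMap

object DictDecodeSketch {
  // Assumed dictionary layout: an entry count, then (length-prefixed UTF-8
  // value, Int index) pairs, in the order the decoder above reads them.
  def readDictionary(buffer: ByteBuffer): HashMap[Int, String] = {
    val size = buffer.getInt()
    val d = new HashMap[Int, String]()
    var count = 0 // was a `val` before the fix, so it could never advance
    while (count < size) {
      val len = buffer.getInt()
      val bytes = new Array[Byte](len)
      buffer.get(bytes)
      val index = buffer.getInt()
      d.put(index, new String(bytes, "UTF-8"))
      count += 1 // the increment this patch adds
    }
    d
  }
}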
47 changes: 47 additions & 0 deletions src/test/scala/shark/SQLSuite.scala
@@ -319,6 +319,53 @@ class SQLSuite extends FunSuite with BeforeAndAfterAll {
    expectSql("select * from selstar where val='val_487'", "487 val_487")
  }

  //////////////////////////////////////////////////////////////////////////////
  // various data types
  //////////////////////////////////////////////////////////////////////////////

  test("various data types") {
    sc.sql("drop table if exists checkboolean")
    sc.sql("""create table checkboolean TBLPROPERTIES ("shark.cache" = "true") as
      select key, val, true as flag from test where key < "300" """)
    sc.sql("""insert into table checkboolean
      select key, val, false as flag from test where key > "300" """)
    expectSql("select flag, count(*) from checkboolean group by flag order by flag asc",
      Array[String]("false\t208", "true\t292"))

    sc.sql("drop table if exists checkbyte")
    sc.sql("drop table if exists checkbyte_cached")
    sc.sql("create table checkbyte (key string, val string, flag tinyint)")
    sc.sql("""insert into table checkbyte
      select key, val, 1 from test where key < "300" """)
    sc.sql("""insert into table checkbyte
      select key, val, 0 from test where key > "300" """)
    sc.sql("create table checkbyte_cached as select * from checkbyte")
    expectSql("select flag, count(*) from checkbyte_cached group by flag order by flag asc",
      Array[String]("0\t208", "1\t292"))

    sc.sql("drop table if exists checkbinary")
    sc.sql("drop table if exists checkbinary_cached")
    sc.sql("create table checkbinary (key string, flag binary)")
    sc.sql("""insert into table checkbinary
      select key, cast(val as binary) as flag from test where key < "300" """)
    sc.sql("""insert into table checkbinary
      select key, cast(val as binary) as flag from test where key > "300" """)
    sc.sql("create table checkbinary_cached as select key, flag from checkbinary")
    expectSql("select cast(flag as string) as f from checkbinary_cached order by f asc limit 2",
      Array[String]("val_0", "val_0"))

    sc.sql("drop table if exists checkshort")
    sc.sql("drop table if exists checkshort_cached")
    sc.sql("create table checkshort (key string, val string, flag smallint)")
    sc.sql("""insert into table checkshort
      select key, val, 23 as flag from test where key < "300" """)
    sc.sql("""insert into table checkshort
      select key, val, 36 as flag from test where key > "300" """)
    sc.sql("create table checkshort_cached as select key, val, flag from checkshort")
    expectSql("select flag, count(*) from checkshort_cached group by flag order by flag asc",
      Array[String]("23\t292", "36\t208"))
  }

  //////////////////////////////////////////////////////////////////////////////
  // SharkContext APIs (e.g. sql2rdd, sql)
  //////////////////////////////////////////////////////////////////////////////
31 changes: 31 additions & 0 deletions src/test/scala/shark/memstore2/ColumnIteratorSuite.scala
@@ -17,6 +17,7 @@

package shark.memstore2

+import org.apache.hadoop.hive.serde2.`lazy`.ByteArrayRef
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
@@ -268,6 +269,36 @@ class ColumnIteratorSuite extends FunSuite {
    assert(builder.stats.max.equals(ts3))
  }

test("Binary Column") {
val b1 = new BytesWritable()
b1.set(Array[Byte](0,1,2), 0, 3)

val builder = new BinaryColumnBuilder
testColumn(
Array[BytesWritable](b1),
builder,
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
classOf[BinaryColumnIterator],
false,
compareBinary)
assert(builder.stats.isInstanceOf[ColumnStats.NoOpStats[_]])

def compareBinary(x: Object, y: Object): Boolean = {
val xdata = x.asInstanceOf[ByteArrayRef].getData
val ywritable = y.asInstanceOf[BytesWritable]
val ydata = ywritable.getBytes()
val length = ywritable.getLength()
if (length != xdata.length) {
false
} else {
val ydatapruned = new Array[Byte](length)
System.arraycopy(ydata, 0, ydatapruned, 0, length)
java.util.Arrays.equals(xdata, ydatapruned)
}
}
}


  def testColumn[T, U <: ColumnIterator](
      testData: Array[_ <: Object],
102 changes: 102 additions & 0 deletions src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala
@@ -0,0 +1,102 @@
package shark.memstore2.column

import org.scalatest.FunSuite
import java.nio.ByteBuffer
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.hive.serde2.io._

class ColumnTypeSuite extends FunSuite {

  test("Int") {
    assert(INT.defaultSize == 4)
    var buffer = ByteBuffer.allocate(32)
    var a: Seq[Int] = Array[Int](35, 67, 899, 4569001)
    a.foreach { i => buffer.putInt(i) }
    buffer.rewind()
    a.foreach { i =>
      val v = INT.extract(buffer.position(), buffer)
      assert(v == i)
    }

    buffer = ByteBuffer.allocate(32)
    a = Range(0, 4)
    a.foreach { i =>
      INT.append(i, buffer)
    }
    buffer.rewind()
    a.foreach { i => assert(buffer.getInt() == i) }

    buffer = ByteBuffer.allocate(32)
    a = Range(0, 4)
    a.foreach { i => buffer.putInt(i) }
    buffer.rewind()
    val writable = new IntWritable()
    a.foreach { i =>
      INT.extractInto(buffer.position(), buffer, writable)
      assert(writable.get == i)
    }
  }

  test("Short") {
    assert(SHORT.defaultSize == 2)
    assert(SHORT.actualSize(8) == 2)
    var buffer = ByteBuffer.allocate(32)
    var a = Array[Short](35, 67, 87, 45)
    a.foreach { i => buffer.putShort(i) }
    buffer.rewind()
    a.foreach { i =>
      val v = SHORT.extract(buffer.position(), buffer)
      assert(v == i)
    }

    buffer = ByteBuffer.allocate(32)
    a = Array[Short](0, 1, 2, 3)
    a.foreach { i =>
      SHORT.append(i, buffer)
    }
    buffer.rewind()
    a.foreach { i => assert(buffer.getShort() == i) }

    buffer = ByteBuffer.allocate(32)
    a = Array[Short](0, 1, 2, 3)
    a.foreach { i => buffer.putShort(i) }
    buffer.rewind()
    val writable = new ShortWritable()
    a.foreach { i =>
      SHORT.extractInto(buffer.position(), buffer, writable)
      assert(writable.get == i)
    }
  }

  test("Long") {
    assert(LONG.defaultSize == 8)
    assert(LONG.actualSize(45L) == 8)
    var buffer = ByteBuffer.allocate(64)
    var a = Array[Long](35L, 67L, 8799000880L, 45000999090L)
    a.foreach { i => buffer.putLong(i) }
    buffer.rewind()
    a.foreach { i =>
      val v = LONG.extract(buffer.position(), buffer)
      assert(v == i)
    }

    buffer = ByteBuffer.allocate(32)
    a = Array[Long](0, 1, 2, 3)
    a.foreach { i =>
      LONG.append(i, buffer)
    }
    buffer.rewind()
    a.foreach { i => assert(buffer.getLong() == i) }

    buffer = ByteBuffer.allocate(32)
    a = Array[Long](0, 1, 2, 3)
    a.foreach { i => buffer.putLong(i) }
    buffer.rewind()
    val writable = new LongWritable()
    a.foreach { i =>
      LONG.extractInto(buffer.position(), buffer, writable)
      assert(writable.get == i)
    }
  }
}
src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala
@@ -9,12 +9,8 @@ import org.apache.hadoop.io.Text
import shark.memstore2.column.Implicits._

class CompressedColumnIteratorSuite extends FunSuite {

  test("RLE Decompression") {
-
-   class TestIterator(val buffer: ByteBuffer, val columnType: ColumnType[_,_])
-     extends CompressedColumnIterator
-
    val b = ByteBuffer.allocate(1024)
    b.order(ByteOrder.nativeOrder())
    b.putInt(STRING.typeID)
@@ -29,6 +25,41 @@ class CompressedColumnIteratorSuite extends FunSuite {
    val compressedBuffer = rle.compress(b, STRING)
    val iter = new TestIterator(compressedBuffer, compressedBuffer.getInt())
    iter.next()
    assert(iter.current.toString().equals("abc"))
    iter.next()
    assert(iter.current.toString().equals("abc"))
    assert(iter.current.toString().equals("abc"))
    iter.next()
    assert(iter.current.toString().equals("efg"))
    iter.next()
    assert(iter.current.toString().equals("abc"))
  }

test("Dictionary Decompression") {
val b = ByteBuffer.allocate(1024)
b.order(ByteOrder.nativeOrder())
b.putInt(STRING.typeID)
val dict = new DictionaryEncoding()

Array(new Text("abc"), new Text("abc"), new Text("efg"), new Text("abc")).foreach { text =>
STRING.append(text, b)
dict.gatherStatsForCompressibility(text, STRING)
}
b.limit(b.position())
b.rewind()
val compressedBuffer = dict.compress(b, STRING)
val iter = new TestIterator(compressedBuffer, compressedBuffer.getInt())
iter.next()
assert(iter.current.toString().equals("abc"))
iter.next()
assert(iter.current.toString().equals("abc"))
assert(iter.current.toString().equals("abc"))
iter.next()
assert(iter.current.toString().equals("efg"))
iter.next()
assert(iter.current.toString().equals("abc"))
}
}

class TestIterator(val buffer: ByteBuffer, val columnType: ColumnType[_, _])
  extends CompressedColumnIterator
