From 0a8bf4770858c99748404861a15db671a80f5059 Mon Sep 17 00:00:00 2001
From: harshars
Date: Sun, 18 Aug 2013 23:56:00 -0700
Subject: [PATCH 1/2] 1. Slight rearrangement of buffer sizing calculation: In
 the original calculation the buffer could overflow earlier. 2. Bug fix in
 Dictionary Decoder which had not been tested. 3. Enable Dictionary
 Compression mapping via implicits and add tests for dictionary decoding

---
 .../memstore2/column/ColumnBuilder.scala        | 10 +++--
 .../memstore2/column/ColumnIterator.scala       |  1 +
 .../column/CompressedColumnIterator.scala       |  3 +-
 .../CompressedColumnIteratorSuite.scala         | 43 ++++++++++++++++---
 4 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/src/main/scala/shark/memstore2/column/ColumnBuilder.scala b/src/main/scala/shark/memstore2/column/ColumnBuilder.scala
index 87684c68..375ec244 100644
--- a/src/main/scala/shark/memstore2/column/ColumnBuilder.scala
+++ b/src/main/scala/shark/memstore2/column/ColumnBuilder.scala
@@ -63,15 +63,17 @@ trait ColumnBuilder[T] {
   }
 
   protected def growIfNeeded(orig: ByteBuffer, size: Int): ByteBuffer = {
+    val capacity = orig.capacity()
     if (orig.remaining() < size) {
       //grow in steps of initial size
-      var s = (3 * orig.capacity()) / 2 + 1
-      if (s < size) {
-        s = size
+      var additionalSize = capacity/8 + 1
+      var newSize = capacity + additionalSize
+      if (additionalSize < size) {
+        newSize = capacity + size
       }
       val pos = orig.position()
       orig.clear()
-      val b = ByteBuffer.allocate(s)
+      val b = ByteBuffer.allocate(newSize)
       b.order(ByteOrder.nativeOrder())
       b.put(orig.array(), 0, pos)
     } else {
diff --git a/src/main/scala/shark/memstore2/column/ColumnIterator.scala b/src/main/scala/shark/memstore2/column/ColumnIterator.scala
index 937e880f..5c9b267c 100644
--- a/src/main/scala/shark/memstore2/column/ColumnIterator.scala
+++ b/src/main/scala/shark/memstore2/column/ColumnIterator.scala
@@ -51,6 +51,7 @@ object Implicits {
   implicit def intToCompressionType(i: Int): CompressionType = i match {
     case -1 => DefaultCompressionType
     case 0 => RLECompressionType
+    case 1 => DictionaryCompressionType
     case _ => throw new UnsupportedOperationException("Compression Type " + i)
   }
 
diff --git a/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala b/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala
index 3bfaa98b..7b4e5ab8 100644
--- a/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala
+++ b/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala
@@ -84,12 +84,13 @@ class DictDecoder[V] (buffer:ByteBuffer, columnType: ColumnType[_, V]) extends I
   private val _dictionary: Map[Int, V] = {
     val size = buffer.getInt()
     val d = new HashMap[Int, V]()
-    val count = 0
+    var count = 0
     while (count < size) {
       //read text, followed by index
       val text = columnType.extract(buffer.position(), buffer)
       val index = buffer.getInt()
       d.put(index, text.asInstanceOf[V])
+      count += 1
     }
     d
   }
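
Note: the new sizing rule above can be read on its own as the sketch below (a simplified standalone function, not the patched method itself, which also copies the existing bytes into the new buffer). The capacity grows by at least an eighth of its current value, or by the full requested size when that is larger, so the enlarged buffer always has room for the bytes already written plus the incoming write.

// Standalone sketch of the sizing rule introduced in growIfNeeded above.
def grownCapacity(capacity: Int, size: Int): Int = {
  val additionalSize = capacity / 8 + 1
  if (additionalSize < size) capacity + size else capacity + additionalSize
}
// e.g. grownCapacity(1024, 8) == 1153; grownCapacity(1024, 4096) == 5120
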
diff --git a/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala b/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala
index e374c871..322de1e1 100644
--- a/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala
+++ b/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala
@@ -9,12 +9,8 @@ import org.apache.hadoop.io.Text
 import shark.memstore2.column.Implicits._
 
 class CompressedColumnIteratorSuite extends FunSuite {
-  
+
   test("RLE Decompression") {
-    
-    class TestIterator(val buffer: ByteBuffer, val columnType: ColumnType[_,_])
-      extends CompressedColumnIterator
-    
     val b = ByteBuffer.allocate(1024)
     b.order(ByteOrder.nativeOrder())
     b.putInt(STRING.typeID)
@@ -29,6 +25,41 @@ class CompressedColumnIteratorSuite extends FunSuite {
     val compressedBuffer = rle.compress(b, STRING)
     val iter = new TestIterator(compressedBuffer, compressedBuffer.getInt())
     iter.next()
-    println(iter.current)
+    assert(iter.current.toString().equals("abc"))
+    iter.next()
+    assert(iter.current.toString().equals("abc"))
+    assert(iter.current.toString().equals("abc"))
+    iter.next()
+    assert(iter.current.toString().equals("efg"))
+    iter.next()
+    assert(iter.current.toString().equals("abc"))
+  }
+
+  test("Dictionary Decompression") {
+    val b = ByteBuffer.allocate(1024)
+    b.order(ByteOrder.nativeOrder())
+    b.putInt(STRING.typeID)
+    val dict = new DictionaryEncoding()
+
+    Array(new Text("abc"), new Text("abc"), new Text("efg"), new Text("abc")).foreach { text =>
+      STRING.append(text, b)
+      dict.gatherStatsForCompressibility(text, STRING)
+    }
+    b.limit(b.position())
+    b.rewind()
+    val compressedBuffer = dict.compress(b, STRING)
+    val iter = new TestIterator(compressedBuffer, compressedBuffer.getInt())
+    iter.next()
+    assert(iter.current.toString().equals("abc"))
+    iter.next()
+    assert(iter.current.toString().equals("abc"))
+    assert(iter.current.toString().equals("abc"))
+    iter.next()
+    assert(iter.current.toString().equals("efg"))
+    iter.next()
+    assert(iter.current.toString().equals("abc"))
   }
 }
+
+class TestIterator(val buffer: ByteBuffer, val columnType: ColumnType[_,_])
+  extends CompressedColumnIterator

From 9dda67b87dfd5113a66a6146bf7dacb11a3f718a Mon Sep 17 00:00:00 2001
From: harshars
Date: Sat, 24 Aug 2013 15:24:54 -0700
Subject: [PATCH 2/2] 1. Bug fix: Binary column type should override actual
 size computation. 2. More tests added.

---
 .../shark/memstore2/column/ColumnType.scala        |   2 +
 src/test/scala/shark/SQLSuite.scala                |  47 ++++++++
 .../shark/memstore2/ColumnIteratorSuite.scala      |  31 ++++++
 .../memstore2/column/ColumnTypeSuite.scala         | 102 ++++++++++++++++++
 4 files changed, 182 insertions(+)
 create mode 100644 src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala

diff --git a/src/main/scala/shark/memstore2/column/ColumnType.scala b/src/main/scala/shark/memstore2/column/ColumnType.scala
index f846cf78..068efe42 100644
--- a/src/main/scala/shark/memstore2/column/ColumnType.scala
+++ b/src/main/scala/shark/memstore2/column/ColumnType.scala
@@ -336,6 +336,8 @@ object BINARY extends ColumnType[BytesWritable, BytesWritable](10, 16) {
   }
 
   def newWritable() = new BytesWritable
+
+  override def actualSize(v: BytesWritable) = v.getLength() + 4
 }
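
Note: the actualSize override above accounts for each binary value as its payload bytes plus, presumably, a 4-byte length field stored alongside them. A rough, hypothetical check of that accounting (illustrative only, not part of the patch):

import org.apache.hadoop.io.BytesWritable

// Hypothetical check: a 3-byte value is counted as 3 payload bytes + 4 bytes of length.
val bw = new BytesWritable()
bw.set(Array[Byte](1, 2, 3), 0, 3)
assert(BINARY.actualSize(bw) == 3 + 4)
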
diff --git a/src/test/scala/shark/SQLSuite.scala b/src/test/scala/shark/SQLSuite.scala
index 78519140..99b4d74e 100644
--- a/src/test/scala/shark/SQLSuite.scala
+++ b/src/test/scala/shark/SQLSuite.scala
@@ -319,6 +319,53 @@ class SQLSuite extends FunSuite with BeforeAndAfterAll {
     expectSql("select * from selstar where val='val_487'","487 val_487")
   }
 
+  //////////////////////////////////////////////////////////////////////////////
+  // various data types
+  //////////////////////////////////////////////////////////////////////////////
+
+  test("various data types") {
+    sc.sql("drop table if exists checkboolean")
+    sc.sql("""create table checkboolean TBLPROPERTIES ("shark.cache" = "true") as
+      select key, val, true as flag from test where key < "300" """)
+    sc.sql("""insert into table checkboolean
+      select key, val, false as flag from test where key > "300" """)
+    expectSql("select flag, count(*) from checkboolean group by flag order by flag asc",
+      Array[String]("false\t208", "true\t292"))
+
+    sc.sql("drop table if exists checkbyte")
+    sc.sql("drop table if exists checkbyte_cached")
+    sc.sql("""create table checkbyte (key string, val string, flag tinyint) """)
+    sc.sql("""insert into table checkbyte
+      select key, val, 1 from test where key < "300" """)
+    sc.sql("""insert into table checkbyte
+      select key, val, 0 from test where key > "300" """)
+    sc.sql("""create table checkbyte_cached as select * from checkbyte""")
+    expectSql("select flag, count(*) from checkbyte_cached group by flag order by flag asc",
+      Array[String]("0\t208", "1\t292"))
+
+    sc.sql("drop table if exists checkbinary")
+    sc.sql("drop table if exists checkbinary_cached")
+    sc.sql("""create table checkbinary (key string, flag binary) """)
+    sc.sql("""insert into table checkbinary
+      select key, cast(val as binary) as flag from test where key < "300" """)
+    sc.sql("""insert into table checkbinary
+      select key, cast(val as binary) as flag from test where key > "300" """)
+    sc.sql("create table checkbinary_cached as select key, flag from checkbinary")
+    expectSql("select cast(flag as string) as f from checkbinary_cached order by f asc limit 2",
+      Array[String]("val_0", "val_0"))
+
+    sc.sql("drop table if exists checkshort")
+    sc.sql("drop table if exists checkshort_cached")
+    sc.sql("""create table checkshort (key string, val string, flag smallint) """)
+    sc.sql("""insert into table checkshort
+      select key, val, 23 as flag from test where key < "300" """)
+    sc.sql("""insert into table checkshort
+      select key, val, 36 as flag from test where key > "300" """)
+    sc.sql("create table checkshort_cached as select key, val, flag from checkshort")
+    expectSql("select flag, count(*) from checkshort_cached group by flag order by flag asc",
+      Array[String]("23\t292", "36\t208"))
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // SharkContext APIs (e.g. sql2rdd, sql)
   //////////////////////////////////////////////////////////////////////////////
diff --git a/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala b/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala
index d966cec2..8cf6e827 100644
--- a/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala
+++ b/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala
@@ -17,6 +17,7 @@
 
 package shark.memstore2
 
+import org.apache.hadoop.hive.serde2.`lazy`.ByteArrayRef
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
@@ -268,6 +269,36 @@ class ColumnIteratorSuite extends FunSuite {
     assert(builder.stats.max.equals(ts3))
   }
 
+  test("Binary Column") {
+    val b1 = new BytesWritable()
+    b1.set(Array[Byte](0,1,2), 0, 3)
+
+    val builder = new BinaryColumnBuilder
+    testColumn(
+      Array[BytesWritable](b1),
+      builder,
+      PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+      PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+      classOf[BinaryColumnIterator],
+      false,
+      compareBinary)
+    assert(builder.stats.isInstanceOf[ColumnStats.NoOpStats[_]])
+
+    def compareBinary(x: Object, y: Object): Boolean = {
+      val xdata = x.asInstanceOf[ByteArrayRef].getData
+      val ywritable = y.asInstanceOf[BytesWritable]
+      val ydata = ywritable.getBytes()
+      val length = ywritable.getLength()
+      if (length != xdata.length) {
+        false
+      } else {
+        val ydatapruned = new Array[Byte](length)
+        System.arraycopy(ydata, 0, ydatapruned, 0, length)
+        java.util.Arrays.equals(xdata, ydatapruned)
+      }
+    }
+  }
+
   def testColumn[T, U <: ColumnIterator](
       testData: Array[_ <: Object],
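
Note: compareBinary above compares only the first getLength() bytes of the BytesWritable because getBytes() may return a backing array longer than the logical value. A minimal sketch of that trimming step on its own (the helper name is illustrative, not from the patch):

import org.apache.hadoop.io.BytesWritable

// Keep only the logical getLength() bytes, dropping any padding in the backing array.
def logicalBytes(w: BytesWritable): Array[Byte] =
  java.util.Arrays.copyOfRange(w.getBytes(), 0, w.getLength())
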
diff --git a/src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala b/src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala
new file mode 100644
index 00000000..ec959bf7
--- /dev/null
+++ b/src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala
@@ -0,0 +1,102 @@
+package shark.memstore2.column
+
+import org.scalatest.FunSuite
+import java.nio.ByteBuffer
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.hive.serde2.io._
+
+class ColumnTypeSuite extends FunSuite {
+
+  test("Int") {
+    assert(INT.defaultSize == 4)
+    var buffer = ByteBuffer.allocate(32)
+    var a: Seq[Int] = Array[Int](35, 67, 899, 4569001)
+    a.foreach {i => buffer.putInt(i)}
+    buffer.rewind()
+    a.foreach {i =>
+      val v = INT.extract(buffer.position(), buffer)
+      assert(v == i)
+    }
+    buffer = ByteBuffer.allocate(32)
+    a = Range(0, 4)
+    a.foreach { i =>
+      INT.append(i, buffer)
+    }
+    buffer.rewind()
+    a.foreach { i => assert(buffer.getInt() == i)}
+
+    buffer = ByteBuffer.allocate(32)
+    a = Range(0, 4)
+    a.foreach { i => buffer.putInt(i)}
+    buffer.rewind()
+    val writable = new IntWritable()
+    a.foreach { i =>
+      INT.extractInto(buffer.position(), buffer, writable)
+      assert(writable.get == i)
+    }
+
+  }
+
+  test("Short") {
+    assert(SHORT.defaultSize == 2)
+    assert(SHORT.actualSize(8) == 2)
+    var buffer = ByteBuffer.allocate(32)
+    var a = Array[Short](35, 67, 87, 45)
+    a.foreach {i => buffer.putShort(i)}
+    buffer.rewind()
+    a.foreach {i =>
+      val v = SHORT.extract(buffer.position(), buffer)
+      assert(v == i)
+    }
+
+    buffer = ByteBuffer.allocate(32)
+    a = Array[Short](0, 1, 2, 3)
+    a.foreach { i =>
+      SHORT.append(i, buffer)
+    }
+    buffer.rewind()
+    a.foreach { i => assert(buffer.getShort() == i)}
+
+    buffer = ByteBuffer.allocate(32)
+    a = Array[Short](0, 1, 2, 3)
+    a.foreach { i => buffer.putShort(i)}
+    buffer.rewind()
+    val writable = new ShortWritable()
+    a.foreach { i =>
+      SHORT.extractInto(buffer.position(), buffer, writable)
+      assert(writable.get == i)
+    }
+  }
+
+  test("Long") {
+    assert(LONG.defaultSize == 8)
+    assert(LONG.actualSize(45L) == 8)
+    var buffer = ByteBuffer.allocate(64)
+    var a = Array[Long](35L, 67L, 8799000880L, 45000999090L)
+    a.foreach {i => buffer.putLong(i)}
+    buffer.rewind()
+    a.foreach {i =>
+      val v = LONG.extract(buffer.position(), buffer)
+      assert(v == i)
+    }
+
+    buffer = ByteBuffer.allocate(32)
+    a = Array[Long](0, 1, 2, 3)
+    a.foreach { i =>
+      LONG.append(i, buffer)
+    }
+    buffer.rewind()
+    a.foreach { i => assert(buffer.getLong() == i)}
+
+    buffer = ByteBuffer.allocate(32)
+    a = Array[Long](0, 1, 2, 3)
+    a.foreach { i => buffer.putLong(i)}
+    buffer.rewind()
+    val writable = new LongWritable()
+    a.foreach { i =>
+      LONG.extractInto(buffer.position(), buffer, writable)
+      assert(writable.get == i)
+    }
+  }
+}