From 9dda67b87dfd5113a66a6146bf7dacb11a3f718a Mon Sep 17 00:00:00 2001 From: harshars Date: Sat, 24 Aug 2013 15:24:54 -0700 Subject: [PATCH] 1. Bug fix: Binary column type should override actual size computation. 2) more tests added --- .../shark/memstore2/column/ColumnType.scala | 2 + src/test/scala/shark/SQLSuite.scala | 47 ++++++++ .../shark/memstore2/ColumnIteratorSuite.scala | 31 ++++++ .../memstore2/column/ColumnTypeSuite.scala | 102 ++++++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala diff --git a/src/main/scala/shark/memstore2/column/ColumnType.scala b/src/main/scala/shark/memstore2/column/ColumnType.scala index f846cf78..068efe42 100644 --- a/src/main/scala/shark/memstore2/column/ColumnType.scala +++ b/src/main/scala/shark/memstore2/column/ColumnType.scala @@ -336,6 +336,8 @@ object BINARY extends ColumnType[BytesWritable, BytesWritable](10, 16) { } def newWritable() = new BytesWritable + + override def actualSize(v: BytesWritable) = v.getLength() + 4 } diff --git a/src/test/scala/shark/SQLSuite.scala b/src/test/scala/shark/SQLSuite.scala index 78519140..99b4d74e 100644 --- a/src/test/scala/shark/SQLSuite.scala +++ b/src/test/scala/shark/SQLSuite.scala @@ -319,6 +319,53 @@ class SQLSuite extends FunSuite with BeforeAndAfterAll { expectSql("select * from selstar where val='val_487'","487 val_487") } + ////////////////////////////////////////////////////////////////////////////// + // various data types + ////////////////////////////////////////////////////////////////////////////// + + test("various data types") { + sc.sql("drop table if exists checkboolean") + sc.sql("""create table checkboolean TBLPROPERTIES ("shark.cache" = "true") as + select key, val, true as flag from test where key < "300" """) + sc.sql("""insert into table checkboolean + select key, val, false as flag from test where key > "300" """) + expectSql("select flag, count(*) from checkboolean group by flag order by flag asc", + Array[String]("false\t208", "true\t292")) + + sc.sql("drop table if exists checkbyte") + sc.sql("drop table if exists checkbyte_cached") + sc.sql("""create table checkbyte (key string, val string, flag tinyint) """) + sc.sql("""insert into table checkbyte + select key, val, 1 from test where key < "300" """) + sc.sql("""insert into table checkbyte + select key, val, 0 from test where key > "300" """) + sc.sql("""create table checkbyte_cached as select * from checkbyte""") + expectSql("select flag, count(*) from checkbyte_cached group by flag order by flag asc", + Array[String]("0\t208", "1\t292")) + + sc.sql("drop table if exists checkbinary") + sc.sql("drop table if exists checkbinary_cached") + sc.sql("""create table checkbinary (key string, flag binary) """) + sc.sql("""insert into table checkbinary + select key, cast(val as binary) as flag from test where key < "300" """) + sc.sql("""insert into table checkbinary + select key, cast(val as binary) as flag from test where key > "300" """) + sc.sql("create table checkbinary_cached as select key, flag from checkbinary") + expectSql("select cast(flag as string) as f from checkbinary_cached order by f asc limit 2", + Array[String]("val_0", "val_0")) + + sc.sql("drop table if exists checkshort") + sc.sql("drop table if exists checkshort_cached") + sc.sql("""create table checkshort (key string, val string, flag smallint) """) + sc.sql("""insert into table checkshort + select key, val, 23 as flag from test where key < "300" """) + sc.sql("""insert into table checkshort + select key, val, 36 as flag from test where key > "300" """) + sc.sql("create table checkshort_cached as select key, val, flag from checkshort") + expectSql("select flag, count(*) from checkshort_cached group by flag order by flag asc", + Array[String]("23\t292", "36\t208")) + } + ////////////////////////////////////////////////////////////////////////////// // SharkContext APIs (e.g. sql2rdd, sql) ////////////////////////////////////////////////////////////////////////////// diff --git a/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala b/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala index d966cec2..8cf6e827 100644 --- a/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala +++ b/src/test/scala/shark/memstore2/ColumnIteratorSuite.scala @@ -17,6 +17,7 @@ package shark.memstore2 +import org.apache.hadoop.hive.serde2.`lazy`.ByteArrayRef import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.io._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector @@ -268,6 +269,36 @@ class ColumnIteratorSuite extends FunSuite { assert(builder.stats.max.equals(ts3)) } + test("Binary Column") { + val b1 = new BytesWritable() + b1.set(Array[Byte](0,1,2), 0, 3) + + val builder = new BinaryColumnBuilder + testColumn( + Array[BytesWritable](b1), + builder, + PrimitiveObjectInspectorFactory.writableBinaryObjectInspector, + PrimitiveObjectInspectorFactory.writableBinaryObjectInspector, + classOf[BinaryColumnIterator], + false, + compareBinary) + assert(builder.stats.isInstanceOf[ColumnStats.NoOpStats[_]]) + + def compareBinary(x: Object, y: Object): Boolean = { + val xdata = x.asInstanceOf[ByteArrayRef].getData + val ywritable = y.asInstanceOf[BytesWritable] + val ydata = ywritable.getBytes() + val length = ywritable.getLength() + if (length != xdata.length) { + false + } else { + val ydatapruned = new Array[Byte](length) + System.arraycopy(ydata, 0, ydatapruned, 0, length) + java.util.Arrays.equals(xdata, ydatapruned) + } + } + } + def testColumn[T, U <: ColumnIterator]( testData: Array[_ <: Object], diff --git a/src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala b/src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala new file mode 100644 index 00000000..ec959bf7 --- /dev/null +++ b/src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala @@ -0,0 +1,102 @@ +package shark.memstore2.column + +import org.scalatest.FunSuite +import java.nio.ByteBuffer +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.hive.serde2.io._ + +class ColumnTypeSuite extends FunSuite { + + test("Int") { + assert(INT.defaultSize == 4) + var buffer = ByteBuffer.allocate(32) + var a: Seq[Int] = Array[Int](35, 67, 899, 4569001) + a.foreach {i => buffer.putInt(i)} + buffer.rewind() + a.foreach {i => + val v = INT.extract(buffer.position(), buffer) + assert(v == i) + } + buffer = ByteBuffer.allocate(32) + a = Range(0, 4) + a.foreach { i => + INT.append(i, buffer) + } + buffer.rewind() + a.foreach { i => assert(buffer.getInt() == i)} + + buffer = ByteBuffer.allocate(32) + a =Range(0,4) + a.foreach { i => buffer.putInt(i)} + buffer.rewind() + val writable = new IntWritable() + a.foreach { i => + INT.extractInto(buffer.position(), buffer, writable) + assert(writable.get == i) + } + + } + + test("Short") { + assert(SHORT.defaultSize == 2) + assert(SHORT.actualSize(8) == 2) + var buffer = ByteBuffer.allocate(32) + var a = Array[Short](35, 67, 87, 45) + a.foreach {i => buffer.putShort(i)} + buffer.rewind() + a.foreach {i => + val v = SHORT.extract(buffer.position(), buffer) + assert(v == i) + } + + buffer = ByteBuffer.allocate(32) + a = Array[Short](0,1,2,3) + a.foreach { i => + SHORT.append(i, buffer) + } + buffer.rewind() + a.foreach { i => assert(buffer.getShort() == i)} + + buffer = ByteBuffer.allocate(32) + a =Array[Short](0,1,2,3) + a.foreach { i => buffer.putShort(i)} + buffer.rewind() + val writable = new ShortWritable() + a.foreach { i => + SHORT.extractInto(buffer.position(), buffer, writable) + assert(writable.get == i) + } + } + + test("Long") { + assert(LONG.defaultSize == 8) + assert(LONG.actualSize(45L) == 8) + var buffer = ByteBuffer.allocate(64) + var a = Array[Long](35L, 67L, 8799000880L, 45000999090L) + a.foreach {i => buffer.putLong(i)} + buffer.rewind() + a.foreach {i => + val v = LONG.extract(buffer.position(), buffer) + assert(v == i) + } + + buffer = ByteBuffer.allocate(32) + a = Array[Long](0,1,2,3) + a.foreach { i => + LONG.append(i, buffer) + } + buffer.rewind() + a.foreach { i => assert(buffer.getLong() == i)} + + buffer = ByteBuffer.allocate(32) + a =Array[Long](0,1,2,3) + a.foreach { i => buffer.putLong(i)} + buffer.rewind() + val writable = new LongWritable() + a.foreach { i => + LONG.extractInto(buffer.position(), buffer, writable) + assert(writable.get == i) + } + } +}