
Commit 9dda67b
1) Bug fix: Binary column type should override actual size computation.
2) More tests added.
harshars committed Aug 24, 2013
1 parent c911808 commit 9dda67b
Showing 4 changed files with 182 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/main/scala/shark/memstore2/column/ColumnType.scala
@@ -336,6 +336,8 @@ object BINARY extends ColumnType[BytesWritable, BytesWritable](10, 16) {
  }

  def newWritable() = new BytesWritable

  override def actualSize(v: BytesWritable) = v.getLength() + 4
}


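Why the override adds 4: BINARY values are variable-length, so the in-memory column buffer has to record each value's byte count alongside the payload, and the per-type default size cannot know the real length. A minimal sketch of the assumed length-prefixed layout (illustrative only, not the actual Shark append path):

import java.nio.ByteBuffer
import org.apache.hadoop.io.BytesWritable

// Assumed layout: a 4-byte length prefix followed by the payload bytes,
// which is exactly what actualSize(v) = v.getLength() + 4 accounts for.
def appendBinary(v: BytesWritable, buffer: ByteBuffer): Unit = {
  buffer.putInt(v.getLength)              // 4 bytes for the length prefix
  buffer.put(v.getBytes, 0, v.getLength)  // v.getLength() payload bytes
}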
47 changes: 47 additions & 0 deletions src/test/scala/shark/SQLSuite.scala
@@ -319,6 +319,53 @@ class SQLSuite extends FunSuite with BeforeAndAfterAll {
    expectSql("select * from selstar where val='val_487'", "487 val_487")
  }

  //////////////////////////////////////////////////////////////////////////////
  // various data types
  //////////////////////////////////////////////////////////////////////////////

  test("various data types") {
    sc.sql("drop table if exists checkboolean")
    sc.sql("""create table checkboolean TBLPROPERTIES ("shark.cache" = "true") as
      select key, val, true as flag from test where key < "300" """)
    sc.sql("""insert into table checkboolean
      select key, val, false as flag from test where key > "300" """)
    expectSql("select flag, count(*) from checkboolean group by flag order by flag asc",
      Array[String]("false\t208", "true\t292"))

    sc.sql("drop table if exists checkbyte")
    sc.sql("drop table if exists checkbyte_cached")
    sc.sql("create table checkbyte (key string, val string, flag tinyint)")
    sc.sql("""insert into table checkbyte
      select key, val, 1 from test where key < "300" """)
    sc.sql("""insert into table checkbyte
      select key, val, 0 from test where key > "300" """)
    sc.sql("create table checkbyte_cached as select * from checkbyte")
    expectSql("select flag, count(*) from checkbyte_cached group by flag order by flag asc",
      Array[String]("0\t208", "1\t292"))

    sc.sql("drop table if exists checkbinary")
    sc.sql("drop table if exists checkbinary_cached")
    sc.sql("create table checkbinary (key string, flag binary)")
    sc.sql("""insert into table checkbinary
      select key, cast(val as binary) as flag from test where key < "300" """)
    sc.sql("""insert into table checkbinary
      select key, cast(val as binary) as flag from test where key > "300" """)
    sc.sql("create table checkbinary_cached as select key, flag from checkbinary")
    expectSql("select cast(flag as string) as f from checkbinary_cached order by f asc limit 2",
      Array[String]("val_0", "val_0"))

    sc.sql("drop table if exists checkshort")
    sc.sql("drop table if exists checkshort_cached")
    sc.sql("create table checkshort (key string, val string, flag smallint)")
    sc.sql("""insert into table checkshort
      select key, val, 23 as flag from test where key < "300" """)
    sc.sql("""insert into table checkshort
      select key, val, 36 as flag from test where key > "300" """)
    sc.sql("create table checkshort_cached as select key, val, flag from checkshort")
    expectSql("select flag, count(*) from checkshort_cached group by flag order by flag asc",
      Array[String]("23\t292", "36\t208"))
  }

  //////////////////////////////////////////////////////////////////////////////
  // SharkContext APIs (e.g. sql2rdd, sql)
  //////////////////////////////////////////////////////////////////////////////
31 changes: 31 additions & 0 deletions src/test/scala/shark/memstore2/ColumnIteratorSuite.scala
@@ -17,6 +17,7 @@

package shark.memstore2

import org.apache.hadoop.hive.serde2.`lazy`.ByteArrayRef
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
@@ -268,6 +269,36 @@ class ColumnIteratorSuite extends FunSuite {
    assert(builder.stats.max.equals(ts3))
  }

test("Binary Column") {
val b1 = new BytesWritable()
b1.set(Array[Byte](0,1,2), 0, 3)

val builder = new BinaryColumnBuilder
testColumn(
Array[BytesWritable](b1),
builder,
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
classOf[BinaryColumnIterator],
false,
compareBinary)
assert(builder.stats.isInstanceOf[ColumnStats.NoOpStats[_]])

def compareBinary(x: Object, y: Object): Boolean = {
val xdata = x.asInstanceOf[ByteArrayRef].getData
val ywritable = y.asInstanceOf[BytesWritable]
val ydata = ywritable.getBytes()
val length = ywritable.getLength()
if (length != xdata.length) {
false
} else {
val ydatapruned = new Array[Byte](length)
System.arraycopy(ydata, 0, ydatapruned, 0, length)
java.util.Arrays.equals(xdata, ydatapruned)
}
}
}


  def testColumn[T, U <: ColumnIterator](
      testData: Array[_ <: Object],
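The pruning step in compareBinary exists because Hadoop's BytesWritable may over-allocate its backing array: getBytes() returns the full capacity, while getLength() gives the logical size. A small illustration of the assumed behavior:

import org.apache.hadoop.io.BytesWritable

val bw = new BytesWritable()
bw.set(Array[Byte](0, 1, 2), 0, 3)
bw.getLength        // 3: the logical payload size
bw.getBytes.length  // may be > 3: the capacity of the backing array
// Hence the test copies the first getLength() bytes before Arrays.equals.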
102 changes: 102 additions & 0 deletions src/test/scala/shark/memstore2/column/ColumnTypeSuite.scala
@@ -0,0 +1,102 @@
package shark.memstore2.column

import org.scalatest.FunSuite
import java.nio.ByteBuffer
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.hive.serde2.io._

class ColumnTypeSuite extends FunSuite {

  test("Int") {
    assert(INT.defaultSize == 4)
    var buffer = ByteBuffer.allocate(32)
    var a: Seq[Int] = Array[Int](35, 67, 899, 4569001)
    a.foreach { i => buffer.putInt(i) }
    buffer.rewind()
    a.foreach { i =>
      val v = INT.extract(buffer.position(), buffer)
      assert(v == i)
    }

    buffer = ByteBuffer.allocate(32)
    a = Range(0, 4)
    a.foreach { i =>
      INT.append(i, buffer)
    }
    buffer.rewind()
    a.foreach { i => assert(buffer.getInt() == i) }

    buffer = ByteBuffer.allocate(32)
    a = Range(0, 4)
    a.foreach { i => buffer.putInt(i) }
    buffer.rewind()
    val writable = new IntWritable()
    a.foreach { i =>
      INT.extractInto(buffer.position(), buffer, writable)
      assert(writable.get == i)
    }
  }

  test("Short") {
    assert(SHORT.defaultSize == 2)
    assert(SHORT.actualSize(8) == 2)
    var buffer = ByteBuffer.allocate(32)
    var a = Array[Short](35, 67, 87, 45)
    a.foreach { i => buffer.putShort(i) }
    buffer.rewind()
    a.foreach { i =>
      val v = SHORT.extract(buffer.position(), buffer)
      assert(v == i)
    }

    buffer = ByteBuffer.allocate(32)
    a = Array[Short](0, 1, 2, 3)
    a.foreach { i =>
      SHORT.append(i, buffer)
    }
    buffer.rewind()
    a.foreach { i => assert(buffer.getShort() == i) }

    buffer = ByteBuffer.allocate(32)
    a = Array[Short](0, 1, 2, 3)
    a.foreach { i => buffer.putShort(i) }
    buffer.rewind()
    val writable = new ShortWritable()
    a.foreach { i =>
      SHORT.extractInto(buffer.position(), buffer, writable)
      assert(writable.get == i)
    }
  }

  test("Long") {
    assert(LONG.defaultSize == 8)
    assert(LONG.actualSize(45L) == 8)
    var buffer = ByteBuffer.allocate(64)
    var a = Array[Long](35L, 67L, 8799000880L, 45000999090L)
    a.foreach { i => buffer.putLong(i) }
    buffer.rewind()
    a.foreach { i =>
      val v = LONG.extract(buffer.position(), buffer)
      assert(v == i)
    }

    buffer = ByteBuffer.allocate(32)
    a = Array[Long](0, 1, 2, 3)
    a.foreach { i =>
      LONG.append(i, buffer)
    }
    buffer.rewind()
    a.foreach { i => assert(buffer.getLong() == i) }

    buffer = ByteBuffer.allocate(32)
    a = Array[Long](0, 1, 2, 3)
    a.foreach { i => buffer.putLong(i) }
    buffer.rewind()
    val writable = new LongWritable()
    a.foreach { i =>
      LONG.extractInto(buffer.position(), buffer, writable)
      assert(writable.get == i)
    }
  }
}
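Each test exercises the same three entry points on a ColumnType: append serializes a value into a ByteBuffer, extract reads it back as a boxed value, and extractInto deserializes into a caller-supplied Writable so the hot path avoids per-row allocation. A condensed round trip, assuming the signatures used in the suite above:

import java.nio.ByteBuffer
import org.apache.hadoop.io.IntWritable

val buf = ByteBuffer.allocate(8)
INT.append(42, buf)                             // serialize at the current position
buf.rewind()
assert(INT.extract(buf.position(), buf) == 42)  // boxed read
buf.rewind()
val w = new IntWritable()
INT.extractInto(buf.position(), buf, w)         // allocation-free read into a Writable
assert(w.get == 42)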
