diff --git a/src/main/scala/shark/SharkConfVars.scala b/src/main/scala/shark/SharkConfVars.scala
index b5592024..d08d4b57 100755
--- a/src/main/scala/shark/SharkConfVars.scala
+++ b/src/main/scala/shark/SharkConfVars.scala
@@ -56,6 +56,16 @@ object SharkConfVars {

   // Number of mappers to force for table scan jobs
   val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)
+
+  // WriteType for the Tachyon off-heap table writer, e.g. "TRY_CACHE", "MUST_CACHE",
+  // "CACHE_THROUGH", "THROUGH".
+  // For reliability, we strongly recommend the default "CACHE_THROUGH", which writes
+  // the table synchronously to the under filesystem and also caches the columns.
+  // "TRY_CACHE" and "MUST_CACHE" only cache the table, which gives better write
+  // performance. Use these two options with care: if the entire table cannot be
+  // fully cached, the evicted parts are lost forever.
+  // "THROUGH" only writes the table to the under filesystem, with no caching at all.
+  val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")

   // Add Shark configuration variables and their default values to the given conf,
   // so default values show up in 'set'.
diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 2d44fbf9..a48c5cb8 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -132,6 +132,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
       outputRDD = outputRDD.mapPartitionsWithIndex { case(part, iter) =>
         val partition = iter.next()
         partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
+          offHeapWriter.setLocalHconf(op.getLocalHconf)
           offHeapWriter.writeColumnPartition(column, part, buf)
         }
         Iterator(partition)
diff --git a/src/main/scala/shark/execution/SparkLoadTask.scala b/src/main/scala/shark/execution/SparkLoadTask.scala
index 64ae4567..022d453a 100644
--- a/src/main/scala/shark/execution/SparkLoadTask.scala
+++ b/src/main/scala/shark/execution/SparkLoadTask.scala
@@ -234,6 +234,7 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
       transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
         val partition = iter.next()
         partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
+          offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
           offHeapWriter.writeColumnPartition(column, part, buf)
         }
         Iterator(partition)
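Note on the two hunks above: setLocalHconf is not written by hand anywhere in this patch; it is the JavaBean-style setter that @BeanProperty generates for the localHconf field added to OffHeapTableWriter below. A minimal, self-contained sketch of that pattern follows; the Conf class and BeanPropertyDemo object are stand-ins for illustration only, not part of Shark.

import scala.reflect.BeanProperty

// Stand-in for org.apache.hadoop.hive.conf.HiveConf, just to keep the sketch self-contained.
class Conf(val settings: Map[String, String])

class Writer extends Serializable {
  // @BeanProperty adds getLocalHconf()/setLocalHconf(...) alongside the usual Scala accessors;
  // @transient keeps the conf out of the serialized closure shipped to executors.
  @transient @BeanProperty var localHconf: Conf = _
}

object BeanPropertyDemo extends App {
  val writer = new Writer
  writer.setLocalHconf(new Conf(Map("shark.tachyon.writetype" -> "MUST_CACHE")))
  println(writer.getLocalHconf.settings("shark.tachyon.writetype"))  // prints MUST_CACHE
}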
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 9193b2c2..88827828 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -20,6 +20,9 @@ package shark.memstore2
 import java.util
 import java.nio.ByteBuffer

+import scala.reflect.BeanProperty
+
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.rdd.RDD

 import shark.LogHelper
@@ -67,6 +70,7 @@ abstract class OffHeapStorageClient {
 }

 abstract class OffHeapTableWriter extends Serializable {
+  @transient @BeanProperty var localHconf: HiveConf = _

   /** Creates this table. Called only on the driver node. */
   def createTable()
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 5a1288b0..49febd04 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -19,9 +19,11 @@ package shark.tachyon

 import java.nio.ByteBuffer

+import scala.reflect.BeanProperty
+
 import tachyon.client.WriteType

-import shark.LogHelper
+import shark.{LogHelper, SharkConfVars}
 import shark.execution.serialization.JavaSerializer
 import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
@@ -51,7 +53,9 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     val rawColumn = rawTable.getRawColumn(column)
     rawColumn.createPartition(part)
     val file = rawColumn.getPartition(part)
-    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
+    val writeType: WriteType = WriteType.valueOf(
+      SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
+    val outStream = file.getOutStream(writeType)
     outStream.write(data.array(), 0, data.limit())
     outStream.close()
   }
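For completeness: WriteType.valueOf in the last hunk is the standard Java enum lookup, so once shark.tachyon.writetype is overridden in the session conf (SharkConfVars registers its defaults so the variable shows up in 'set'), a misspelled value fails with an IllegalArgumentException rather than silently falling back to CACHE_THROUGH. Below is a rough, self-contained sketch of that lookup; the Enumeration is a local stand-in for Tachyon's WriteType enum so the snippet runs without the Tachyon jar.

// Local stand-in for tachyon.client.WriteType (the real one is a Java enum).
object WriteType extends Enumeration {
  val TRY_CACHE, MUST_CACHE, CACHE_THROUGH, THROUGH = Value
}

object WriteTypeLookupDemo extends App {
  // Mirrors TachyonOffHeapTableWriter: resolve the configured name against the enum.
  def resolve(configured: String): WriteType.Value = WriteType.withName(configured)

  println(resolve("CACHE_THROUGH"))  // default: synchronous write to the under fs, plus caching
  println(resolve("MUST_CACHE"))     // cache only: faster writes, but evicted data is lost

  // An unknown name is rejected (withName throws NoSuchElementException here;
  // the real WriteType.valueOf throws IllegalArgumentException).
  try resolve("CACHE_THRU")
  catch { case e: NoSuchElementException => println("rejected: " + e.getMessage) }
}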