Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Tachyon WriteType configurable. #327

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/scala/shark/SharkConfVars.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ object SharkConfVars {

// Number of mappers to force for table scan jobs
val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)

val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here to both explain what this option is and to warn about the dangers of setting a WriteType that does not include THROUGH (data would only be cached in Tachyon memory and could be lost, since it is never persisted to the underlying filesystem)?


// Add Shark configuration variables and their default values to the given conf,
// so default values show up in 'set'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
outputRDD = outputRDD.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.setLocalHconf(op.getLocalHconf)
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/shark/execution/SparkLoadTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/shark/memstore2/OffHeapStorageClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package shark.memstore2
import java.util
import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.rdd.RDD

import shark.LogHelper
Expand Down Expand Up @@ -67,6 +70,7 @@ abstract class OffHeapStorageClient {
}

abstract class OffHeapTableWriter extends Serializable {
@transient @BeanProperty var localHconf: HiveConf = _

/** Creates this table. Called only on the driver node. */
def createTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package shark.tachyon

import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import tachyon.client.WriteType

import shark.LogHelper
import shark.{LogHelper, SharkConfVars}
import shark.execution.serialization.JavaSerializer
import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}

Expand Down Expand Up @@ -51,7 +53,9 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
val rawColumn = rawTable.getRawColumn(column)
rawColumn.createPartition(part)
val file = rawColumn.getPartition(part)
val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
val writeType: WriteType = WriteType.valueOf(
SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
val outStream = file.getOutStream(writeType)
outStream.write(data.array(), 0, data.limit())
outStream.close()
}
Expand Down