diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 9b39d3aadf..2d4a3502c6 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -141,6 +141,8 @@ private[spark] class BlockManager( val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks) initialize() + var compressionCodec: CompressionCodec = null + /** * Construct a BlockManager with a memory limit set based on system properties. */ @@ -902,8 +904,14 @@ private[spark] class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { + if (compressionCodec == null) { + compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", + "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) + .newInstance().asInstanceOf[CompressionCodec] + } + if (shouldCompress(blockId)) { - (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + compressionCodec.compressionOutputStream(s) } else { s } @@ -913,7 +922,14 @@ private[spark] class BlockManager( * Wrap an input stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: InputStream): InputStream = { - if (shouldCompress(blockId)) new LZFInputStream(s) else s + if (compressionCodec == null) { + compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", + "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) + .newInstance().asInstanceOf[CompressionCodec] + } + + if (shouldCompress(blockId)) + compressionCodec.compressionInputStream(s) else s } def dataSerialize( diff --git 
a/core/src/main/scala/spark/storage/CompressionCodec.scala b/core/src/main/scala/spark/storage/CompressionCodec.scala new file mode 100644 index 0000000000..cd80de33f6 --- /dev/null +++ b/core/src/main/scala/spark/storage/CompressionCodec.scala @@ -0,0 +1,13 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + + +/** + * CompressionCodec defines the interface for pluggable compression codecs used by the block store + */ +trait CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream + + def compressionInputStream(s: InputStream): InputStream +} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index da859eebcb..221e285192 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -49,7 +49,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def close() { if (initialized) { objOut.close() - bs.close() channel = null bs = null objOut = null diff --git a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala new file mode 100644 index 0000000000..3328b949ef --- /dev/null +++ b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala @@ -0,0 +1,16 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} + +/** + * LZF implementation of [[spark.storage.CompressionCodec]] + */ +class LZFCompressionCodec extends CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream = + (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + + def compressionInputStream(s: InputStream): InputStream = + new LZFInputStream(s) +} diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala new file mode 100644 index 0000000000..62b00ef3f6 --- /dev/null +++ 
b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala @@ -0,0 +1,18 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} + +/** + * Snappy implementation of [[spark.storage.CompressionCodec]]. + * The block size can be configured via the spark.snappy.block.size system property (default 32768) + */ +class SnappyCompressionCodec extends CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream = + new SnappyOutputStream(s, + System.getProperty("spark.snappy.block.size", "32768").toInt) + + def compressionInputStream(s: InputStream): InputStream = + new SnappyInputStream(s) +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 07572201de..f824826af3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -162,7 +162,8 @@ object SparkBuild extends Build { "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty), "org.apache.mesos" % "mesos" % "0.9.0-incubating", "io.netty" % "netty-all" % "4.0.0.Beta2", - "org.apache.derby" % "derby" % "10.4.2.0" % "test" + "org.apache.derby" % "derby" % "10.4.2.0" % "test", + "org.xerial.snappy" % "snappy-java" % "1.0.5" ) ++ ( if (HADOOP_MAJOR_VERSION == "2") { if (HADOOP_YARN) {