
Compression codec interface and Snappy support to reduce buffer size and improve scalability #685

Closed · wants to merge 2 commits
20 changes: 18 additions & 2 deletions core/src/main/scala/spark/storage/BlockManager.scala
@@ -141,6 +141,8 @@ private[spark] class BlockManager(
   val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
   initialize()
 
+  var compressionCodec: CompressionCodec = null
+
   /**
    * Construct a BlockManager with a memory limit set based on system properties.
    */
@@ -902,8 +904,15 @@ private[spark] class BlockManager(
    * Wrap an output stream for compression if block compression is enabled for its block type
    */
   def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
+    if (compressionCodec == null) {
Member:
Hi Gavin,

As we discussed in person a while ago, you can move the initialization of compressionCodec to the place where it is declared by using a "lazy val" in Scala.
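
A minimal sketch of that suggestion, reusing the reflection call from this PR (not the committed code):

    // Initialize the codec lazily at its declaration site; the reflective
    // lookup then runs once, on first use, with no null check needed.
    lazy val compressionCodec: CompressionCodec =
      Class.forName(System.getProperty("spark.storage.compression.codec",
        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
        .newInstance().asInstanceOf[CompressionCodec]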

+      compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec",
+        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
+        .newInstance().asInstanceOf[CompressionCodec]
+    }
+
     if (shouldCompress(blockId)) {
-      (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
+      //(new LZFOutputStream(s)).setFinishBlockOnFlush(true)
+      compressionCodec.compressionOutputStream(s)
Member:
Remove the comment and make this a single line

 if (shouldCompress(blockId)) compressionCodec.compressionOutputStream(s) else s

     } else {
       s
     }
@@ -913,7 +922,14 @@
    * Wrap an input stream for compression if block compression is enabled for its block type
    */
   def wrapForCompression(blockId: String, s: InputStream): InputStream = {
-    if (shouldCompress(blockId)) new LZFInputStream(s) else s
+    if (compressionCodec == null) {
+      compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec",
+        "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader)
+        .newInstance().asInstanceOf[CompressionCodec]
+    }
+
+    if (shouldCompress(blockId)) /*new LZFInputStream(s) */
Member:
Remove the comment, and make this a single line

if (shouldCompress(blockId)) compressionCodec.compressionInputStream(s) else s

+      compressionCodec.compressionInputStream(s) else s
   }
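
Applying both review suggestions, each wrapper collapses to a single expression (a sketch of the suggested end state, assuming the lazy val initialization shown above):

    def wrapForCompression(blockId: String, s: OutputStream): OutputStream =
      if (shouldCompress(blockId)) compressionCodec.compressionOutputStream(s) else s

    def wrapForCompression(blockId: String, s: InputStream): InputStream =
      if (shouldCompress(blockId)) compressionCodec.compressionInputStream(s) else s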

def dataSerialize(
13 changes: 13 additions & 0 deletions core/src/main/scala/spark/storage/CompressionCodec.scala
@@ -0,0 +1,13 @@
package spark.storage

import java.io.{InputStream, OutputStream}


/**
* CompressionCodec allows the customization of the compression codec
*/
trait CompressionCodec {
def compressionOutputStream(s: OutputStream): OutputStream

def compressionInputStream(s: InputStream): InputStream
}
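
Any class implementing this trait can be selected through the spark.storage.compression.codec system property. As a hypothetical illustration (not part of this PR), a GZIP codec built on java.util.zip would only need the two methods:

    package spark.storage

    import java.io.{InputStream, OutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    // Hypothetical example codec; selected with
    //   -Dspark.storage.compression.codec=spark.storage.GZIPCompressionCodec
    class GZIPCompressionCodec extends CompressionCodec {
      override def compressionOutputStream(s: OutputStream): OutputStream =
        new GZIPOutputStream(s)

      override def compressionInputStream(s: InputStream): InputStream =
        new GZIPInputStream(s)
    }
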
1 change: 0 additions & 1 deletion core/src/main/scala/spark/storage/DiskStore.scala
@@ -49,7 +49,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   override def close() {
     if (initialized) {
       objOut.close()
-      bs.close()
       channel = null
       bs = null
       objOut = null
16 changes: 16 additions & 0 deletions core/src/main/scala/spark/storage/LZFCompressionCodec.scala
@@ -0,0 +1,16 @@
package spark.storage

import java.io.{InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

/**
* LZF implementation of [[spark.storage.CompressionCodec]]
*/
class LZFCompressionCodec extends CompressionCodec {
def compressionOutputStream(s: OutputStream): OutputStream =
Member:
Add "override" qualifier for compressionOutputStream and compressionInputStream.

(new LZFOutputStream(s)).setFinishBlockOnFlush(true)

def compressionInputStream(s: InputStream): InputStream =
new LZFInputStream(s)
}
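
With that suggestion applied, the codec would read (a sketch, not the committed code):

    class LZFCompressionCodec extends CompressionCodec {
      override def compressionOutputStream(s: OutputStream): OutputStream =
        (new LZFOutputStream(s)).setFinishBlockOnFlush(true)

      override def compressionInputStream(s: InputStream): InputStream =
        new LZFInputStream(s)
    }
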
18 changes: 18 additions & 0 deletions core/src/main/scala/spark/storage/SnappyCompressionCodec.scala
@@ -0,0 +1,18 @@
package spark.storage

import java.io.{InputStream, OutputStream}

import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

/**
* Snappy implementation of [[spark.storage.CompressionCodec]]
 * The block size can be configured via spark.snappy.block.size.
*/
class SnappyCompressionCodec extends CompressionCodec {
def compressionOutputStream(s: OutputStream): OutputStream =
new SnappyOutputStream(s,
System.getProperty("spark.snappy.block.size", "32768").toInt)

def compressionInputStream(s: InputStream): InputStream =
new SnappyInputStream(s)
}
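
To use this codec, both system properties would be set before the BlockManager is created (a usage sketch based on the property names in this PR):

    // Select the Snappy codec and raise the block size from the 32 KB default to 64 KB.
    System.setProperty("spark.storage.compression.codec",
      "spark.storage.SnappyCompressionCodec")
    System.setProperty("spark.snappy.block.size", "65536")
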
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -162,7 +162,8 @@ object SparkBuild extends Build {
"cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
"org.apache.mesos" % "mesos" % "0.9.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test"
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.xerial.snappy" % "snappy-java" % "1.0.5"
Contributor:
Please also update the Maven build.

Contributor (author):
Sure, I will add it.

) ++ (
if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) {