GearPump is a lightweight real-time big data streaming engine. It is inspired by recent advances in the Akka framework and a desire to improve on existing streaming frameworks.
We model streaming within the Akka actor hierarchy.
In initial benchmarks, GearPump processed 11 million messages per second (100 bytes per message) with 17 ms latency on a 4-node cluster.
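To put those figures in perspective, the aggregate data rate follows from simple arithmetic. The sketch below is illustrative only; the per-node number assumes the load is spread evenly across the four nodes, which the benchmark summary does not state.

```scala
object BenchmarkArithmetic {
  // Figures taken from the benchmark summary above.
  val messagesPerSecond: Long = 11000000L
  val bytesPerMessage: Long = 100L
  val nodes: Int = 4

  // Aggregate payload throughput across the cluster, in bytes per second.
  val bytesPerSecond: Long = messagesPerSecond * bytesPerMessage

  // Assumption: messages are distributed evenly across all nodes.
  val bytesPerSecondPerNode: Long = bytesPerSecond / nodes

  def main(args: Array[String]): Unit = {
    println(s"aggregate: ${bytesPerSecond / 1000000} MB/s")
    println(s"per node:  ${bytesPerSecondPerNode / 1000000} MB/s")
  }
}
```

Under that assumption, the cluster moves about 1100 MB/s of payload in total, or roughly 275 MB/s per node.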
- Clone the GearPump repository
git clone https://github.com/intel-hadoop/gearpump.git
cd gearpump
- Start master
GearPump supports master high availability (HA), so a master can be started on multiple nodes. Edit core/src/main/resources/reference.conf and set gearpump.cluster.masters to the list of nodes you plan to start a master on (e.g. node1).
gearpump {
  ...
  cluster {
    masters = ["node1:3000"]
  }
}
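For a master HA setup, the same setting would presumably list every master node. The fragment below is a sketch only: node2 and node3 are hypothetical host names, and reusing port 3000 on each node is an assumption.

```
gearpump {
  ...
  cluster {
    # node2 and node3 are placeholder host names for illustration
    masters = ["node1:3000", "node2:3000", "node3:3000"]
  }
}
```

Each node listed here would then need its own master process started with a matching -ip and -port.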
Build a package, distribute it to all nodes, and extract it on each one.
sbt clean pack-archive
Start a master on each node you listed in the configuration.
## on node1
cd gearpump-$VERSION
bin/master -ip node1 -port 3000
- Start worker
Start multiple workers on one or more nodes. Each worker reads the master locations from gearpump.cluster.masters in reference.conf.
bin/worker
- Distribute application jar and run
Distribute the WordCount jar, examples/wordcount/target/gearpump-examples-wordcount-$VERSION.jar, to one of the cluster nodes and run it with:
## Run WordCount example
bin/gear app -jar gearpump-examples-wordcount-$VERSION.jar org.apache.gearpump.streaming.examples.wordcount.WordCount -master node1:3000
See the wiki pages for more on building and running the examples in local mode.
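The -master argument identifies masters as host:port pairs; the WordCount option help describes the expected form as host1:port1,host2:port2,host3:port3. As a minimal sketch of how such a string could be split into pairs, consider the following. The parseMasters helper is hypothetical and is not part of GearPump's API.

```scala
object MasterListSketch {
  // Hypothetical helper: splits "host1:port1,host2:port2" into (host, port) pairs.
  def parseMasters(arg: String): List[(String, Int)] =
    arg.split(",").toList.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":") match {
        case Array(host, port) => (host, port.toInt)
        case _ =>
          throw new IllegalArgumentException(s"expected host:port, got '$entry'")
      }
    }

  def main(args: Array[String]): Unit =
    println(parseMasters("node1:3000,node2:3000"))
}
```

Rejecting malformed entries early keeps a typo in the master list from surfacing later as a connection failure.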
This is what a GearPump WordCount looks like.
class WordCount extends Starter with ArgumentsParser {
  private val LOG: Logger = LoggerFactory.getLogger(classOf[WordCount])

  // Command-line options accepted by this example.
  override val options: Array[(String, CLIOption[Any])] = Array(
    "master" -> CLIOption[String]("<host1:port1,host2:port2,host3:port3>", required = true),
    "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(4)),
    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(4)),
    "runseconds" -> CLIOption[Int]("<how long to run this example>", required = false, defaultValue = Some(60))
  )

  override def application(config: ParseResult): AppDescription = {
    // Parallelism of each stage, taken from the command line.
    val splitNum = config.getInt("split")
    val sumNum = config.getInt("sum")
    val appConfig = Configs(Configs.SYSTEM_DEFAULT_CONFIG)
    val partitioner = new HashPartitioner()
    val split = TaskDescription(classOf[Split].getCanonicalName, splitNum)
    val sum = TaskDescription(classOf[Sum].getCanonicalName, sumNum)
    // Connect split to sum, with the partitioner on the edge between them.
    val app = AppDescription("wordCount", classOf[AppMaster].getCanonicalName, appConfig, Graph(split ~ partitioner ~> sum))
    app
  }
}
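In the graph split ~ partitioner ~> sum, the partitioner decides which sum task receives each message emitted by a split task. The implementation of HashPartitioner is not shown here, so the sketch below only assumes the common approach: take the message's hash code modulo the number of downstream tasks, so that the same word always reaches the same sum task. All names in it are hypothetical.

```scala
object HashPartitionSketch {
  // Maps a message to a downstream task index in [0, taskCount).
  def partition(msg: Any, taskCount: Int): Int = {
    require(taskCount > 0, "taskCount must be positive")
    // hashCode may be negative, so normalize the remainder into a valid index.
    val r = msg.hashCode % taskCount
    if (r < 0) r + taskCount else r
  }

  def main(args: Array[String]): Unit = {
    val sumNum = 4 // mirrors the default of the "sum" option
    val words = "the quick brown fox jumps over the lazy dog".split(" ")
    // Group words by target task; both occurrences of "the" share one task.
    val byTask = words.groupBy(partition(_, sumNum))
    byTask.toSeq.sortBy(_._1).foreach { case (task, ws) =>
      println(s"sum task $task <- ${ws.mkString(", ")}")
    }
  }
}
```

Routing by hash is what lets each sum task keep a complete count for the words it owns without coordinating with the other sum tasks.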
For a detailed description of how to write a GearPump application, see Write GearPump Applications on the wiki.
For more documentation and implementation details, visit the GearPump Wiki.
Q&A and discussions take place on the GearPump User List.
Report issues on the GearPump GitHub issue tracker. Contributions are welcome via pull requests.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
The Netty transport code is based on work from Apache Storm. Thanks to the Apache Storm contributors.