Skip to content

Commit

Permalink
Merge pull request #167 from fe2s/split-implicits
Browse files Browse the repository at this point in the history
Separate core (RDD) and streaming implicits into separate packages, so that streaming implicits can be imported independently of the core ones.
  • Loading branch information
gkorland authored May 12, 2019
2 parents 7a1a658 + 1ca81b2 commit 8720b69
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 55 deletions.
2 changes: 1 addition & 1 deletion doc/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ df.write
### Create Stream

```scala
import com.redislabs.provider.redis._
import com.redislabs.provider.redis.streaming._

val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStream(Array("foo", "bar"),
Expand Down
6 changes: 3 additions & 3 deletions doc/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Spark-Redis supports streaming data from Stream and List data structures:
To stream data from [Redis Stream](https://redis.io/topics/streams-intro) use `createRedisXStream` method (added in Spark-Redis 2.3.1):

```scala
import com.redislabs.provider.redis._
import com.redislabs.provider.redis.streaming._
import com.redislabs.provider.redis.streaming.{ConsumerConfig, StreamItem}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
Expand Down Expand Up @@ -115,7 +115,7 @@ Use the following to get a `(listName, value)` stream from `foo` and `bar` list
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.redislabs.provider.redis._
import com.redislabs.provider.redis.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
redisStream.print()
Expand All @@ -128,7 +128,7 @@ Use the following to get a `value` stream from `foo` and `bar` list
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.redislabs.provider.redis._
import com.redislabs.provider.redis.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
val redisStream = ssc.createRedisStreamWithoutListname(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
redisStream.print()
Expand Down
53 changes: 3 additions & 50 deletions src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package com.redislabs.provider.redis

import com.redislabs.provider.redis.streaming.{ConsumerConfig, RedisInputDStream, RedisStreamReceiver, StreamItem}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.redislabs.provider.redis.rdd._
import com.redislabs.provider.redis.util.PipelineUtils._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
* RedisContext extends sparkContext's functionality with redis functions
Expand Down Expand Up @@ -443,52 +439,9 @@ object RedisContext extends Serializable {
}
}

/**
* RedisStreamingContext extends StreamingContext's functionality with Redis
*
* @param ssc a spark StreamingContext
*/
class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable {
/**
* @param keys an Array[String] which consists of all the Lists we want to listen to
* @param storageLevel the receiver's storage strategy for received data, default is MEMORY_AND_DISK_2
* @return a stream of (listname, value)
*/
def createRedisStream(keys: Array[String],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
RedisInputDStream[(String, String)] = {
new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
}

/**
* @param keys an Array[String] which consists of all the Lists we want to listen to
* @param storageLevel the receiver's storage strategy for received data, default is MEMORY_AND_DISK_2
* @return a stream of (value)
*/
def createRedisStreamWithoutListname(keys: Array[String],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
(implicit
redisConf: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
RedisInputDStream[String] = {
new RedisInputDStream(ssc, keys, storageLevel, redisConf, classOf[String])
}

def createRedisXStream(consumersConfig: Seq[ConsumerConfig],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
(implicit
redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
InputDStream[StreamItem] = {
val readWriteConfig = ReadWriteConfig.fromSparkConf(ssc.sparkContext.getConf)
val receiver = new RedisStreamReceiver(consumersConfig, redisConfig, readWriteConfig, storageLevel)
ssc.receiverStream(receiver)
}
}

trait RedisFunctions {

implicit def toRedisContext(sc: SparkContext): RedisContext = new RedisContext(sc)

implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.redislabs.provider.redis

/**
  * Package object for `com.redislabs.provider.redis.streaming`.
  *
  * Mixes in [[RedisStreamingFunctions]] so that a single wildcard import
  * (`import com.redislabs.provider.redis.streaming._`) brings the
  * StreamingContext -> RedisStreamingContext implicit conversion into scope,
  * keeping the streaming implicits separate from the core (RDD) ones.
  */
package object streaming extends RedisStreamingFunctions {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.redislabs.provider.redis.streaming

import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

/**
  * RedisStreamingContext extends StreamingContext's functionality with Redis.
  *
  * Instances are normally obtained via the implicit conversion declared in
  * [[RedisStreamingFunctions]] (`import com.redislabs.provider.redis.streaming._`).
  *
  * @param ssc a Spark StreamingContext; marked @transient so this wrapper can be
  *            serialized without attempting to serialize the context itself
  */
class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable {
  /**
    * Creates a [[RedisInputDStream]] emitting `(listName, value)` pairs from the given lists.
    *
    * @param keys         an Array[String] which consists of all the Lists we want to listen to
    * @param storageLevel the receiver's storage strategy for received data,
    *                     default is MEMORY_AND_DISK_2
    * @param redisConfig  Redis connection settings; defaults to values read from the SparkConf
    * @return a stream of (listName, value)
    */
  def createRedisStream(keys: Array[String],
                        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
                       (implicit
                        redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
  RedisInputDStream[(String, String)] = {
    new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
  }

  /**
    * Creates a [[RedisInputDStream]] emitting only the values (without the list name)
    * from the given lists.
    *
    * @param keys         an Array[String] which consists of all the Lists we want to listen to
    * @param storageLevel the receiver's storage strategy for received data,
    *                     default is MEMORY_AND_DISK_2
    * @param redisConf    Redis connection settings; defaults to values read from the SparkConf
    * @return a stream of (value)
    */
  def createRedisStreamWithoutListname(keys: Array[String],
                                       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
                                      (implicit
                                       redisConf: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
  RedisInputDStream[String] = {
    new RedisInputDStream(ssc, keys, storageLevel, redisConf, classOf[String])
  }

  /**
    * Creates an [[InputDStream]] of [[StreamItem]]s read from Redis Stream data
    * structures via a [[RedisStreamReceiver]].
    *
    * @param consumersConfig configuration of the stream consumers to run
    * @param storageLevel    the receiver's storage strategy for received data,
    *                        default is MEMORY_AND_DISK_2
    * @param redisConfig     Redis connection settings; defaults to values read from the SparkConf
    * @return a stream of StreamItem
    */
  def createRedisXStream(consumersConfig: Seq[ConsumerConfig],
                         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
                        (implicit
                         redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
  InputDStream[StreamItem] = {
    // Read/write tuning is taken from the SparkConf rather than from an implicit parameter.
    val readWriteConfig = ReadWriteConfig.fromSparkConf(ssc.sparkContext.getConf)
    val receiver = new RedisStreamReceiver(consumersConfig, redisConfig, readWriteConfig, storageLevel)
    ssc.receiverStream(receiver)
  }
}

/**
  * Provides the implicit conversion from Spark's [[StreamingContext]] to
  * [[RedisStreamingContext]]. Extend this trait (or use the `streaming`
  * package object, which already does) to enable the Redis stream-creation
  * methods on any StreamingContext.
  */
trait RedisStreamingFunctions {

  // Enables ssc.createRedisStream / createRedisStreamWithoutListname / createRedisXStream.
  implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc)

}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package com.redislabs.provider.redis.stream
import com.redislabs.provider.redis.streaming.{ConsumerConfig, Earliest}
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
import com.redislabs.provider.redis.util.TestUtils
import com.redislabs.provider.redis.{SparkStreamingRedisSuite, _}
import com.redislabs.provider.redis.SparkStreamingRedisSuite
import com.redislabs.provider.redis.streaming._
import org.apache.spark.storage.StorageLevel
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
Expand Down

0 comments on commit 8720b69

Please sign in to comment.