diff --git a/doc/getting-started.md b/doc/getting-started.md
index a551cd47..f0e0ea8d 100644
--- a/doc/getting-started.md
+++ b/doc/getting-started.md
@@ -91,7 +91,7 @@ df.write
 ### Create Stream
 
 ```scala
-import com.redislabs.provider.redis._
+import com.redislabs.provider.redis.streaming._
 val ssc = new StreamingContext(sc, Seconds(1))
 val redisStream = ssc.createRedisStream(Array("foo", "bar"),
diff --git a/doc/streaming.md b/doc/streaming.md
index d5b5ea69..09c0a0f3 100644
--- a/doc/streaming.md
+++ b/doc/streaming.md
@@ -10,7 +10,7 @@ Spark-Redis supports streaming data from Stream and List data structures:
 To stream data from [Redis Stream](https://redis.io/topics/streams-intro) use `createRedisXStream` method (added in Spark-Redis 2.3.1):
 
 ```scala
-import com.redislabs.provider.redis._
+import com.redislabs.provider.redis.streaming._
 import com.redislabs.provider.redis.streaming.{ConsumerConfig, StreamItem}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.dstream.InputDStream
@@ -115,7 +115,7 @@ Use the following to get a `(listName, value)` stream from `foo` and `bar` list
 ```scala
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
-import com.redislabs.provider.redis._
+import com.redislabs.provider.redis.streaming._
 val ssc = new StreamingContext(sc, Seconds(1))
 val redisStream = ssc.createRedisStream(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
 redisStream.print()
@@ -128,7 +128,7 @@ Use the following to get a `value` stream from `foo` and `bar` list
 ```scala
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
-import com.redislabs.provider.redis._
+import com.redislabs.provider.redis.streaming._
 val ssc = new StreamingContext(sc, Seconds(1))
 val redisStream = ssc.createRedisStreamWithoutListname(Array("foo", "bar"), storageLevel = StorageLevel.MEMORY_AND_DISK_2)
 redisStream.print()
diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index 5e263549..563e87c8 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -1,13 +1,9 @@
 package com.redislabs.provider.redis
 
-import com.redislabs.provider.redis.streaming.{ConsumerConfig, RedisInputDStream, RedisStreamReceiver, StreamItem}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
 import com.redislabs.provider.redis.rdd._
 import com.redislabs.provider.redis.util.PipelineUtils._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 
 /**
  * RedisContext extends sparkContext's functionality with redis functions
@@ -443,52 +439,9 @@ object RedisContext extends Serializable {
   }
 }
 
-/**
- * RedisStreamingContext extends StreamingContext's functionality with Redis
- *
- * @param ssc a spark StreamingContext
- */
-class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable {
-  /**
-    * @param keys an Array[String] which consists all the Lists we want to listen to
-    * @param storageLevel the receiver' storage tragedy of received data, default as MEMORY_AND_DISK_2
-    * @return a stream of (listname, value)
-    */
-  def createRedisStream(keys: Array[String],
-                        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
-                       (implicit
-                        redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
-  RedisInputDStream[(String, String)] = {
-    new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
-  }
-
-  /**
-    * @param keys an Array[String] which consists all the Lists we want to listen to
-    * @param storageLevel the receiver' storage tragedy of received data, default as MEMORY_AND_DISK_2
-    * @return a stream of (value)
-    */
-  def createRedisStreamWithoutListname(keys: Array[String],
-                                       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
-                                      (implicit
-                                       redisConf: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
-  RedisInputDStream[String] = {
-    new RedisInputDStream(ssc, keys, storageLevel, redisConf, classOf[String])
-  }
-
-  def createRedisXStream(consumersConfig: Seq[ConsumerConfig],
-                         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
-                        (implicit
-                         redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
-  InputDStream[StreamItem] = {
-    val readWriteConfig = ReadWriteConfig.fromSparkConf(ssc.sparkContext.getConf)
-    val receiver = new RedisStreamReceiver(consumersConfig, redisConfig, readWriteConfig, storageLevel)
-    ssc.receiverStream(receiver)
-  }
-}
-
 trait RedisFunctions {
+
   implicit def toRedisContext(sc: SparkContext): RedisContext = new RedisContext(sc)
-  implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc)
 }
diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/package.scala b/src/main/scala/com/redislabs/provider/redis/streaming/package.scala
new file mode 100644
index 00000000..d3fbbdf6
--- /dev/null
+++ b/src/main/scala/com/redislabs/provider/redis/streaming/package.scala
@@ -0,0 +1,5 @@
+package com.redislabs.provider.redis
+
+package object streaming extends RedisStreamingFunctions {
+
+}
diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/redisStreamingFunctions.scala b/src/main/scala/com/redislabs/provider/redis/streaming/redisStreamingFunctions.scala
new file mode 100644
index 00000000..a2bd81e4
--- /dev/null
+++ b/src/main/scala/com/redislabs/provider/redis/streaming/redisStreamingFunctions.scala
@@ -0,0 +1,56 @@
+package com.redislabs.provider.redis.streaming
+
+import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+
+/**
+ * RedisStreamingContext extends StreamingContext's functionality with Redis
+ *
+ * @param ssc a Spark StreamingContext
+ */
+class RedisStreamingContext(@transient val ssc: StreamingContext) extends Serializable {
+  /**
+    * @param keys an Array[String] which consists of all the Lists we want to listen to
+    * @param storageLevel the receiver's storage level for received data, defaults to MEMORY_AND_DISK_2
+    * @return a stream of (listName, value)
+    */
+  def createRedisStream(keys: Array[String],
+                        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
+                       (implicit
+                        redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
+  RedisInputDStream[(String, String)] = {
+    new RedisInputDStream(ssc, keys, storageLevel, redisConfig, classOf[(String, String)])
+  }
+
+  /**
+    * @param keys an Array[String] which consists of all the Lists we want to listen to
+    * @param storageLevel the receiver's storage level for received data, defaults to MEMORY_AND_DISK_2
+    * @return a stream of values
+    */
+  def createRedisStreamWithoutListname(keys: Array[String],
+                                       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
+                                      (implicit
+                                       redisConf: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
+  RedisInputDStream[String] = {
+    new RedisInputDStream(ssc, keys, storageLevel, redisConf, classOf[String])
+  }
+
+  def createRedisXStream(consumersConfig: Seq[ConsumerConfig],
+                         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
+                        (implicit
+                         redisConfig: RedisConfig = RedisConfig.fromSparkConf(ssc.sparkContext.getConf)):
+  InputDStream[StreamItem] = {
+    val readWriteConfig = ReadWriteConfig.fromSparkConf(ssc.sparkContext.getConf)
+    val receiver = new RedisStreamReceiver(consumersConfig, redisConfig, readWriteConfig, storageLevel)
+    ssc.receiverStream(receiver)
+  }
+}
+
+trait RedisStreamingFunctions {
+
+  implicit def toRedisStreamingContext(ssc: StreamingContext): RedisStreamingContext = new RedisStreamingContext(ssc)
+
+}
+
diff --git a/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala b/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala
index 2368341d..1cbef3b5 100644
--- a/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/stream/RedisXStreamSuite.scala
@@ -3,7 +3,8 @@ package com.redislabs.provider.redis.stream
 import com.redislabs.provider.redis.streaming.{ConsumerConfig, Earliest}
 import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
 import com.redislabs.provider.redis.util.TestUtils
-import com.redislabs.provider.redis.{SparkStreamingRedisSuite, _}
+import com.redislabs.provider.redis.SparkStreamingRedisSuite
+import com.redislabs.provider.redis.streaming._
 import org.apache.spark.storage.StorageLevel
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
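
For reviewers, here is a minimal end-to-end sketch of how the new import path is expected to be used after this refactoring. It mirrors the `createRedisStream` example from `doc/streaming.md` above; the local-master `SparkConf`, the `spark.redis.*` settings, and the list names `foo`/`bar` are illustrative placeholders rather than part of this patch.

```scala
// Sketch only: after this change the streaming implicits come from the
// com.redislabs.provider.redis.streaming package object instead of
// com.redislabs.provider.redis._. Host, port, and list names are assumed values.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.redislabs.provider.redis.streaming._ // brings in toRedisStreamingContext

object RedisListStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("redis-list-stream-sketch")
      .set("spark.redis.host", "localhost") // assumed Redis endpoint
      .set("spark.redis.port", "6379")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    // createRedisStream is added to StreamingContext via the implicit
    // RedisStreamingContext conversion provided by the streaming package object.
    val redisStream = ssc.createRedisStream(Array("foo", "bar"),
      storageLevel = StorageLevel.MEMORY_AND_DISK_2)
    redisStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```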