Full-Featured Redis Solution with Redisson: Caching, Message Queue, Pub/Sub, and Distributed Structures
This project demonstrates a complete implementation of Redissonβs features, including advanced caching mechanisms, distributed collections, message Queue, and pub/sub messaging. Ideal for applications requiring robust Redis functionalities and scalable solutions.
- WebFlux
- Redisson "Redis"
- Configuration
- Features
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
βοΈ Initializes RMapReactive with the name "map" and a StringCodec for serialization. then performs two asynchronous put operations to add entries "map1" and "map2" with respective values "First Map" and "Second Map". Finally, it uses StepVerifier to validate that both put operations complete successfully.
@Test
public void testMap() {
RMapReactive<String, String> map = this.client.getMap("map", StringCodec.INSTANCE);
Mono<String> map1 = map.put("map1", "First Map");
Mono<String> map2 = map.put("map2", "Secound Map");
StepVerifier.create(map1.concatWith(map2)).verifyComplete();
}
βοΈ Initializes RLocalCachedMap named "cacheMap" is created with a JSON codec TypedJsonJacksonCodec for serialization of Integer keys and String values. cachedMapStrategies defines the caching strategies used. testLocalCacheMap1 will adds two entries key 1 with "cachemap1" and key 2 with "cacheMap2" to the local cached map. testLocalCacheMap2 will update a single entry key1 with "cacheMap1" to the local cached map.
LocalCachedMapOptions<Integer, String> cachedMapStrategies = LocalCachedMapOptions.<Integer, String>defaults()
.syncStrategy(LocalCachedMapOptions.SyncStrategy.UPDATE) // you can choose None or Invalidate
.reconnectionStrategy(LocalCachedMapOptions.ReconnectionStrategy.NONE); // you can choose Clean
private RLocalCachedMap<Integer, String> cachedMap = redissonClient.getLocalCachedMap("cacheMap", new TypedJsonJacksonCodec(Integer.class, String.class), cachedMapStrategies);
@Test
public void testLocalCacheMap1() {
this.cachedMap.put(1, "cachemap1");
this.cachedMap.put(2, "cacheMap2");
Flux.interval(Duration.ofSeconds(1))
.doOnNext(item -> System.out.println(item + "-->" + cachedMap.get(item))).subscribe();
}
@Test
public void testLocalCacheMap2() {
this.cachedMap.put(1, "cacheMap1");
Flux.interval(Duration.ofSeconds(1))
.doOnNext(item -> System.out.println(item + "-->" + cachedMap.get(item))).subscribe();
}
βοΈ Initializes RListReactive named "number" is created with a LongCodec for serialization of long values. list of long numbers from 1 to 10 is created and boxed them as list collect so to send it the same time to be same order. The list.addAll(numbersList).then() operation adds all these numbers to the Redis list. The step verifier is used to ensure this operation completes successfully and verifying size.
@Test
public void testList() {
RListReactive<Long> list = this.client.getList("number", LongCodec.INSTANCE);
List<Long> numbersList = LongStream.rangeClosed(1, 10)
.boxed().collect(Collectors.toList());
/* Another Way
Mono<Void> numbersList = Flux.range(1, 10)
.flatMap(number -> list.add(number)).then();
*/
StepVerifier.create(list.addAll(numbersList).then()).verifyComplete();
StepVerifier.create(list.size())
.expectNext(10)
.verifyComplete();
}
βοΈ Initializes RQueueReactive named "number" is created with a LongCodec for serialization of long values. The method polls items from the queue using queue.poll(), repeating the operation 3 times. Step verifier ensures that the polling operation completes successfully. Additionally, it verifies that the size of the queue is 6 after the polling operations, confirming that the expected number of elements is present.
@Test
public void testQueue() {
RQueueReactive<Long> queue = this.client.getQueue("number", LongCodec.INSTANCE);
Mono<Void> poll = queue.poll().repeat(3).doOnNext(System.out::println).then();
StepVerifier.create(poll).verifyComplete();
StepVerifier.create(queue.size())
.expectNext(6)
.verifyComplete();
}
βοΈ Initializes RDequeReactive named "number" is created with a LongCodec for serialization of long values. The method performs a pollLast() operation to remove items from the end of the stack, repeating this operation 3 times. Step verifier ensures that the polling operation completes successfully. It also verifies that the size of the stack is 2 after the polling operations, confirming the remaining number of elements in the stack.
@Test
public void testStack() {
RDequeReactive<Long> stack = this.client.getDeque("number", LongCodec.INSTANCE);
Mono<Void> poll = stack.pollLast().repeat(3).doOnNext(System.out::println).then();
StepVerifier.create(poll).verifyComplete();
StepVerifier.create(stack.size())
.expectNext(2)
.verifyComplete();
}
βοΈ Initializes RScoredSortedSetReactive named "name:score" is created with a StringCodec for serialization of string values and scores. The method adds three entries to the sorted set with associated scores: "Sam" with a score of 1.0, "Mike" with a score of 2.5, and "Jake" with a score of 0.5. Step verifier is used to ensure the completion of the score addition. Afterward, the sorted set entries are retrieved in a specified range from rank 0 to 1.
@Test
public void testSortedSet() {
private RScoredSortedSetReactive<String> sortedSet = this.client.getScoredSortedSet("name:score", StringCodec.INSTANCE);
Mono<Void> sort = this.sortedSet.addScore("Sam", 1.0).then
(this.sortedSet.addScore("Mike", 2.5))
.then(this.sortedSet.addScore("jake", 0.5)).then();
StepVerifier.create(sort).verifyComplete();
this.sortedSet.entryRange(0, 1)
.flatMapIterable(items -> items)
.map(item -> item.getScore() + ":" + item.getValue())
.doOnNext(System.out::println)
.subscribe();
}
Result π€© As you can see Jake is priority than Sam, if you restart will be the score of Jake is priority than Sam.
βοΈ Producer adds messages to the message queue. It generates a sequence of numbers from 1 to 20, with a delay of 500 milliseconds between each numbe. Consumer1 and consumer2 will listents to a message queue to consume messages then print it.
@Test
public void testMessageQueueConsumer1() throws InterruptedException {
this.messageQueue.takeElements()
.doOnNext(elem -> System.out.println("Consumer 1: " + elem))
.doOnError(System.out::println)
.subscribe();
Thread.sleep(600_000);
}
@Test
public void testMessageQueueConsumer2() throws InterruptedException {
this.messageQueue.takeElements()
.doOnNext(elem -> System.out.println("Consumer 2: " + elem))
.doOnError(System.out::println)
.subscribe();
Thread.sleep(600_000);
}
@Test
public void testMessageQueueProducer() {
Mono<Void> numbers = Flux.range(1, 20)
.delayElements(Duration.ofMillis(500))
.doOnNext(number -> System.out.println("going to add" + number))
.flatMap(number -> this.messageQueue.add(Long.valueOf(number)))
.then();
StepVerifier.create(numbers).verifyComplete();
}
βοΈ PubSub1 and PubSub2 subscribes to a redis topic to receive messages of type String. It prints each received message to the console.
@Test
public void testPubSub1() throws InterruptedException {
this.topic.getMessages(String.class)
.doOnNext(System.out::println)
.doOnError(System.out::println)
.subscribe();
Thread.sleep(600_000);
}
@Test
public void testPubSub2() throws InterruptedException {
this.topic.getMessages(String.class)
.doOnNext(System.out::println)
.doOnError(System.out::println)
.subscribe();
Thread.sleep(600_000);
}