Skip to content

Commit

Permalink
remove usage of synchronized (#825)
Browse files Browse the repository at this point in the history
  • Loading branch information
zt9788 authored Oct 10, 2023
1 parent 45b5d8f commit 071fb40
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@ public AbstractValueDecoder getDecoder(int identityNumber) {
return decoderMap.get(identityNumber);
}

public synchronized void register(int identityNumber, AbstractValueDecoder decoder) {
decoderMap.put(identityNumber, decoder);
inited = true;
public void register(int identityNumber, AbstractValueDecoder decoder) {
reentrantLock.lock();
try {
decoderMap.put(identityNumber, decoder);
inited = true;
}finally {
reentrantLock.unlock();
}
}

public synchronized void clear() {
decoderMap.clear();
inited = true;
public void clear() {
reentrantLock.lock();
try {
decoderMap.clear();
inited = true;
}finally {
reentrantLock.unlock();
}
}

public void initDefaultDecoder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created on 2016/10/27.
Expand All @@ -28,6 +29,7 @@ public class DefaultCacheMonitor implements CacheMonitor {

private static final Logger logger = LoggerFactory.getLogger(DefaultCacheMonitor.class);

private final ReentrantLock reentrantLock = new ReentrantLock();
protected CacheStat cacheStat;
private String cacheName;

Expand All @@ -43,44 +45,59 @@ public String getCacheName() {
return cacheName;
}

public synchronized void resetStat() {
cacheStat = new CacheStat();
cacheStat.setStatStartTime(System.currentTimeMillis());
cacheStat.setCacheName(cacheName);
public void resetStat() {
reentrantLock.lock();
try {
cacheStat = new CacheStat();
cacheStat.setStatStartTime(System.currentTimeMillis());
cacheStat.setCacheName(cacheName);
}finally {
reentrantLock.unlock();
}
}

public synchronized CacheStat getCacheStat() {
CacheStat stat = cacheStat.clone();
stat.setStatEndTime(System.currentTimeMillis());
return stat;
public CacheStat getCacheStat() {
reentrantLock.lock();
try {
CacheStat stat = cacheStat.clone();
stat.setStatEndTime(System.currentTimeMillis());
return stat;
}finally {
reentrantLock.unlock();
}
}

@Override
public synchronized void afterOperation(CacheEvent event) {
if (event instanceof CacheGetEvent) {
CacheGetEvent e = (CacheGetEvent) event;
afterGet(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CachePutEvent) {
CachePutEvent e = (CachePutEvent) event;
afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult());
} else if (event instanceof CacheRemoveEvent) {
CacheRemoveEvent e = (CacheRemoveEvent) event;
afterRemove(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CacheLoadEvent) {
CacheLoadEvent e = (CacheLoadEvent) event;
afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CacheGetAllEvent) {
CacheGetAllEvent e = (CacheGetAllEvent) event;
afterGetAll(e.getMillis(), e.getKeys(), e.getResult());
} else if (event instanceof CacheLoadAllEvent) {
CacheLoadAllEvent e = (CacheLoadAllEvent) event;
afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CachePutAllEvent) {
CachePutAllEvent e = (CachePutAllEvent) event;
afterPutAll(e.getMillis(), e.getMap(), e.getResult());
} else if (event instanceof CacheRemoveAllEvent) {
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult());
public void afterOperation(CacheEvent event) {
reentrantLock.lock();
try {
if (event instanceof CacheGetEvent) {
CacheGetEvent e = (CacheGetEvent) event;
afterGet(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CachePutEvent) {
CachePutEvent e = (CachePutEvent) event;
afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult());
} else if (event instanceof CacheRemoveEvent) {
CacheRemoveEvent e = (CacheRemoveEvent) event;
afterRemove(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CacheLoadEvent) {
CacheLoadEvent e = (CacheLoadEvent) event;
afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CacheGetAllEvent) {
CacheGetAllEvent e = (CacheGetAllEvent) event;
afterGetAll(e.getMillis(), e.getKeys(), e.getResult());
} else if (event instanceof CacheLoadAllEvent) {
CacheLoadAllEvent e = (CacheLoadAllEvent) event;
afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CachePutAllEvent) {
CachePutAllEvent e = (CachePutAllEvent) event;
afterPutAll(e.getMillis(), e.getMap(), e.getResult());
} else if (event instanceof CacheRemoveAllEvent) {
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult());
}
}finally {
reentrantLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author <a href="mailto:areyouok@gmail.com">huangli</a>
*/
Expand All @@ -35,6 +37,8 @@ public class LettuceBroadcastManager extends BroadcastManager {
private final LettuceConnectionManager lettuceConnectionManager;
private final BaseRedisAsyncCommands<byte[], byte[]> stringAsyncCommands;

private final ReentrantLock reentrantLock = new ReentrantLock();


public LettuceBroadcastManager(CacheManager cacheManager, RedisLettuceCacheConfig<Object, Object> config) {
super(cacheManager);
Expand Down Expand Up @@ -72,20 +76,25 @@ public CacheResult publish(CacheMessage cacheMessage) {
}

@Override
public synchronized void startSubscribe() {
if (subscribeThreadStart) {
throw new IllegalStateException("startSubscribe has invoked");
}
this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
@Override
public void message(byte[] channel, byte[] message) {
processNotification(message, config.getValueDecoder());
public void startSubscribe() {
reentrantLock.lock();
try {
if (subscribeThreadStart) {
throw new IllegalStateException("startSubscribe has invoked");
}
};
config.getPubSubConnection().addListener(this.pubSubAdapter);
RedisPubSubAsyncCommands<byte[], byte[]> asyncCommands = config.getPubSubConnection().async();
asyncCommands.subscribe(channel);
this.subscribeThreadStart = true;
this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
@Override
public void message(byte[] channel, byte[] message) {
processNotification(message, config.getValueDecoder());
}
};
config.getPubSubConnection().addListener(this.pubSubAdapter);
RedisPubSubAsyncCommands<byte[], byte[]> asyncCommands = config.getPubSubConnection().async();
asyncCommands.subscribe(channel);
this.subscribeThreadStart = true;
}finally {
reentrantLock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.data.redis.listener.Topic;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author <a href="mailto:areyouok@gmail.com">huangli</a>
Expand All @@ -32,6 +33,8 @@ public class SpringDataBroadcastManager extends BroadcastManager {
private final byte[] channel;
private volatile RedisMessageListenerContainer listenerContainer;

private final ReentrantLock reentrantLock = new ReentrantLock();

public SpringDataBroadcastManager(CacheManager cacheManager, RedisSpringDataCacheConfig config) {
super(cacheManager);
this.config = config;
Expand Down Expand Up @@ -65,37 +68,47 @@ public CacheResult publish(CacheMessage cacheMessage) {
}

@Override
public synchronized void startSubscribe() {
if (this.listenerContainer != null) {
throw new IllegalStateException("subscribe thread is started");
}
Topic topic = new ChannelTopic(config.getBroadcastChannel());
if (config.getListenerContainer() == null) {
RedisMessageListenerContainer c = new RedisMessageListenerContainer();
c.setConnectionFactory(config.getConnectionFactory());
c.afterPropertiesSet();
c.start();
this.listenerContainer = c;
logger.info("create RedisMessageListenerContainer instance");
} else {
this.listenerContainer = config.getListenerContainer();
public void startSubscribe() {
reentrantLock.lock();
try {
if (this.listenerContainer != null) {
throw new IllegalStateException("subscribe thread is started");
}
Topic topic = new ChannelTopic(config.getBroadcastChannel());
if (config.getListenerContainer() == null) {
RedisMessageListenerContainer c = new RedisMessageListenerContainer();
c.setConnectionFactory(config.getConnectionFactory());
c.afterPropertiesSet();
c.start();
this.listenerContainer = c;
logger.info("create RedisMessageListenerContainer instance");
} else {
this.listenerContainer = config.getListenerContainer();
}
this.listenerContainer.addMessageListener(listener, topic);
logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel());
}finally {
reentrantLock.unlock();
}
this.listenerContainer.addMessageListener(listener, topic);
logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel());
}

private void onMessage(Message message, byte[] pattern) {
processNotification(message.getBody(), config.getValueDecoder());
}

@Override
public synchronized void close() throws Exception {
if (this.listenerContainer != null) {
this.listenerContainer.removeMessageListener(listener);
if (this.config.getListenerContainer() == null) {
this.listenerContainer.destroy();
public void close() throws Exception {
reentrantLock.lock();
try {
if (this.listenerContainer != null) {
this.listenerContainer.removeMessageListener(listener);
if (this.config.getListenerContainer() == null) {
this.listenerContainer.destroy();
}
}
this.listenerContainer = null;
}finally {
reentrantLock.unlock();
}
this.listenerContainer = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import redis.clients.jedis.UnifiedJedis;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created on 2022-05-03
Expand All @@ -32,6 +33,8 @@ public class RedisBroadcastManager extends BroadcastManager {
private volatile boolean subscribe;
private boolean subscribeThreadStart;

private final ReentrantLock reentrantLock = new ReentrantLock();

public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
super(cacheManager);
this.channelStr = config.getBroadcastChannel();
Expand All @@ -48,16 +51,21 @@ public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object,
}

@Override
public synchronized void startSubscribe() {
if (subscribeThreadStart) {
throw new IllegalStateException("subscribe thread is started");
public void startSubscribe() {
reentrantLock.lock();
try {
if (subscribeThreadStart) {
throw new IllegalStateException("subscribe thread is started");
}
this.cacheMessagePubSub = new CacheMessagePubSub();
Thread subThread;
subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
subThread.setDaemon(true);
subThread.start();
this.subscribeThreadStart = true;
}finally {
reentrantLock.unlock();
}
this.cacheMessagePubSub = new CacheMessagePubSub();
Thread subThread;
subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
subThread.setDaemon(true);
subThread.start();
this.subscribeThreadStart = true;
}

private void runSubThread() {
Expand Down Expand Up @@ -116,17 +124,22 @@ public CacheResult publish(CacheMessage message) {


@Override
public synchronized void close() {
if (this.closed) {
return;
}
this.closed = true;
if (subscribe) {
try {
this.cacheMessagePubSub.unsubscribe(channel);
} catch (Exception e) {
logger.warn("unsubscribe {} fail", channelStr, e);
public void close() {
reentrantLock.lock();
try {
if (this.closed) {
return;
}
this.closed = true;
if (subscribe) {
try {
this.cacheMessagePubSub.unsubscribe(channel);
} catch (Exception e) {
logger.warn("unsubscribe {} fail", channelStr, e);
}
}
}finally {
reentrantLock.unlock();
}
}

Expand Down
Loading

0 comments on commit 071fb40

Please sign in to comment.