Skip to content

Commit

Permalink
Align to use virtual threads (#540)
Browse files Browse the repository at this point in the history
Align to use virtual threads
-  In scope of this ticket I removed synchronized blocks as they
pin virtual thread to carrier thread.
- Expose user way to inject ThreadFactory for creating virtual
threads
  • Loading branch information
wyhasany authored Apr 3, 2024
1 parent 3afa432 commit a77eaa8
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 78 deletions.
8 changes: 7 additions & 1 deletion core-api/src/main/java/com/optimizely/ab/Optimizely.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.optimizely.ab.optimizelyconfig.OptimizelyConfigService;
import com.optimizely.ab.optimizelydecision.*;
import com.optimizely.ab.optimizelyjson.OptimizelyJSON;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,6 +102,8 @@ public class Optimizely implements AutoCloseable {
@Nullable
private final ODPManager odpManager;

private final ReentrantLock lock = new ReentrantLock();

private Optimizely(@Nonnull EventHandler eventHandler,
@Nonnull EventProcessor eventProcessor,
@Nonnull ErrorHandler errorHandler,
Expand Down Expand Up @@ -1451,8 +1454,11 @@ public List<String> fetchQualifiedSegments(String userId, @Nonnull List<ODPSegme
return null;
}
if (odpManager != null) {
synchronized (odpManager) {
lock.lock();
try {
return odpManager.getSegmentManager().getQualifiedSegments(userId, segmentOptions);
} finally {
lock.unlock();
}
}
logger.error("Audience segments fetch failed (ODP is not enabled).");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.optimizely.ab.optimizelyconfig.OptimizelyConfig;
import com.optimizely.ab.optimizelyconfig.OptimizelyConfigManager;
import com.optimizely.ab.optimizelyconfig.OptimizelyConfigService;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,6 +62,7 @@ public abstract class PollingProjectConfigManager implements ProjectConfigManage
private volatile String sdkKey;
private volatile boolean started;
private ScheduledFuture<?> scheduledFuture;
private ReentrantLock lock = new ReentrantLock();

public PollingProjectConfigManager(long period, TimeUnit timeUnit) {
this(period, timeUnit, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new NotificationCenter());
Expand All @@ -70,6 +73,15 @@ public PollingProjectConfigManager(long period, TimeUnit timeUnit, NotificationC
}

public PollingProjectConfigManager(long period, TimeUnit timeUnit, long blockingTimeoutPeriod, TimeUnit blockingTimeoutUnit, NotificationCenter notificationCenter) {
this(period, timeUnit, blockingTimeoutPeriod, blockingTimeoutUnit, notificationCenter, null);
}

public PollingProjectConfigManager(long period,
TimeUnit timeUnit,
long blockingTimeoutPeriod,
TimeUnit blockingTimeoutUnit,
NotificationCenter notificationCenter,
@Nullable ThreadFactory customThreadFactory) {
this.period = period;
this.timeUnit = timeUnit;
this.blockingTimeoutPeriod = blockingTimeoutPeriod;
Expand All @@ -78,7 +90,7 @@ public PollingProjectConfigManager(long period, TimeUnit timeUnit, long blocking
if (TimeUnit.SECONDS.convert(period, this.timeUnit) < 30) {
logger.warn("Polling intervals below 30 seconds are not recommended.");
}
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
final ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : Executors.defaultThreadFactory();
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = threadFactory.newThread(runnable);
thread.setDaemon(true);
Expand Down Expand Up @@ -176,43 +188,58 @@ public String getSDKKey() {
return this.sdkKey;
}

public synchronized void start() {
if (started) {
logger.warn("Manager already started.");
return;
}
public void start() {
lock.lock();
try {
if (started) {
logger.warn("Manager already started.");
return;
}

if (scheduledExecutorService.isShutdown()) {
logger.warn("Not starting. Already in shutdown.");
return;
}
if (scheduledExecutorService.isShutdown()) {
logger.warn("Not starting. Already in shutdown.");
return;
}

Runnable runnable = new ProjectConfigFetcher();
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, period, timeUnit);
started = true;
Runnable runnable = new ProjectConfigFetcher();
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, period, timeUnit);
started = true;
} finally {
lock.unlock();
}
}

public synchronized void stop() {
if (!started) {
logger.warn("Not pausing. Manager has not been started.");
return;
}
public void stop() {
lock.lock();
try {
if (!started) {
logger.warn("Not pausing. Manager has not been started.");
return;
}

if (scheduledExecutorService.isShutdown()) {
logger.warn("Not pausing. Already in shutdown.");
return;
}
if (scheduledExecutorService.isShutdown()) {
logger.warn("Not pausing. Already in shutdown.");
return;
}

logger.info("pausing project watcher");
scheduledFuture.cancel(true);
started = false;
logger.info("pausing project watcher");
scheduledFuture.cancel(true);
started = false;
} finally {
lock.unlock();
}
}

@Override
public synchronized void close() {
stop();
scheduledExecutorService.shutdownNow();
started = false;
public void close() {
lock.lock();
try {
stop();
scheduledExecutorService.shutdownNow();
started = false;
} finally {
lock.unlock();
}
}

protected void setSdkKey(String sdkKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.optimizely.ab.event.internal.UserEvent;
import com.optimizely.ab.internal.PropertyUtils;
import com.optimizely.ab.notification.NotificationCenter;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,6 +68,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {

private Future<?> future;
private boolean isStarted = false;
private final ReentrantLock lock = new ReentrantLock();

private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) {
this.eventHandler = eventHandler;
Expand All @@ -78,15 +80,20 @@ private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler event
this.executor = executor;
}

public synchronized void start() {
if (isStarted) {
logger.info("Executor already started.");
return;
}
public void start() {
lock.lock();
try {
if (isStarted) {
logger.info("Executor already started.");
return;
}

isStarted = true;
EventConsumer runnable = new EventConsumer();
future = executor.submit(runnable);
isStarted = true;
EventConsumer runnable = new EventConsumer();
future = executor.submit(runnable);
} finally {
lock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.optimizely.ab.annotations.VisibleForTesting;

import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

public class DefaultLRUCache<T> implements Cache<T> {

private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();

private final Integer maxSize;

Expand Down Expand Up @@ -51,8 +52,11 @@ public void save(String key, T value) {
return;
}

synchronized (lock) {
lock.lock();
try {
linkedHashMap.put(key, new CacheEntity(value));
} finally {
lock.unlock();
}
}

Expand All @@ -62,7 +66,8 @@ public T lookup(String key) {
return null;
}

synchronized (lock) {
lock.lock();
try {
if (linkedHashMap.containsKey(key)) {
CacheEntity entity = linkedHashMap.get(key);
Long nowMs = new Date().getTime();
Expand All @@ -75,12 +80,17 @@ public T lookup(String key) {
linkedHashMap.remove(key);
}
return null;
} finally {
lock.unlock();
}
}

public void reset() {
synchronized (lock) {
lock.lock();
try {
linkedHashMap.clear();
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.optimizely.ab.notification;

import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +37,7 @@ public class NotificationManager<T> {

private final Map<Integer, NotificationHandler<T>> handlers = Collections.synchronizedMap(new LinkedHashMap<>());
private final AtomicInteger counter;
private final ReentrantLock lock = new ReentrantLock();

public NotificationManager() {
this(new AtomicInteger());
Expand All @@ -48,13 +50,16 @@ public NotificationManager(AtomicInteger counter) {
public int addHandler(NotificationHandler<T> newHandler) {

// Prevent registering a duplicate listener.
synchronized (handlers) {
lock.lock();
try {
for (NotificationHandler<T> handler : handlers.values()) {
if (handler.equals(newHandler)) {
logger.warn("Notification listener was already added");
return -1;
}
}
} finally {
lock.unlock();
}

int notificationId = counter.incrementAndGet();
Expand All @@ -64,14 +69,17 @@ public int addHandler(NotificationHandler<T> newHandler) {
}

public void send(final T message) {
synchronized (handlers) {
lock.lock();
try {
for (Map.Entry<Integer, NotificationHandler<T>> handler: handlers.entrySet()) {
try {
handler.getValue().handle(message);
} catch (Exception e) {
logger.warn("Catching exception sending notification for class: {}, handler: {}", message.getClass(), handler.getKey());
}
}
} finally {
lock.unlock();
}
}

Expand Down
Loading

0 comments on commit a77eaa8

Please sign in to comment.