From a77eaa857197967774d3f89239f30ab2675b267e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rowicki?= Date: Wed, 3 Apr 2024 23:07:42 +0200 Subject: [PATCH] Align to use virtual threads (#540) Align to use virtual threads - In scope of this ticket I removed synchronized blocks as they pin virtual thread to carrier thread. - Expose user way to inject ThreadFactory for creating virtual threads --- .../java/com/optimizely/ab/Optimizely.java | 8 +- .../config/PollingProjectConfigManager.java | 85 ++++++++++++------ .../ab/event/BatchEventProcessor.java | 23 +++-- .../ab/internal/DefaultLRUCache.java | 18 +++- .../ab/notification/NotificationManager.java | 12 ++- .../java/com/optimizely/ab/odp/ODPConfig.java | 90 ++++++++++++++----- .../optimizely/ab/odp/ODPEventManager.java | 10 ++- .../com/optimizely/ab/NamedThreadFactory.java | 12 ++- .../ab/config/HttpProjectConfigManager.java | 33 +++++-- .../ab/event/AsyncEventHandler.java | 21 ++++- 10 files changed, 234 insertions(+), 78 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/Optimizely.java b/core-api/src/main/java/com/optimizely/ab/Optimizely.java index 3524cb24a..e64882ed6 100644 --- a/core-api/src/main/java/com/optimizely/ab/Optimizely.java +++ b/core-api/src/main/java/com/optimizely/ab/Optimizely.java @@ -35,6 +35,7 @@ import com.optimizely.ab.optimizelyconfig.OptimizelyConfigService; import com.optimizely.ab.optimizelydecision.*; import com.optimizely.ab.optimizelyjson.OptimizelyJSON; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +102,8 @@ public class Optimizely implements AutoCloseable { @Nullable private final ODPManager odpManager; + private final ReentrantLock lock = new ReentrantLock(); + private Optimizely(@Nonnull EventHandler eventHandler, @Nonnull EventProcessor eventProcessor, @Nonnull ErrorHandler errorHandler, @@ -1451,8 +1454,11 @@ public List fetchQualifiedSegments(String userId, @Nonnull List scheduledFuture; + private ReentrantLock lock = new ReentrantLock(); public PollingProjectConfigManager(long period, TimeUnit timeUnit) { this(period, timeUnit, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new NotificationCenter()); @@ -70,6 +73,15 @@ public PollingProjectConfigManager(long period, TimeUnit timeUnit, NotificationC } public PollingProjectConfigManager(long period, TimeUnit timeUnit, long blockingTimeoutPeriod, TimeUnit blockingTimeoutUnit, NotificationCenter notificationCenter) { + this(period, timeUnit, blockingTimeoutPeriod, blockingTimeoutUnit, notificationCenter, null); + } + + public PollingProjectConfigManager(long period, + TimeUnit timeUnit, + long blockingTimeoutPeriod, + TimeUnit blockingTimeoutUnit, + NotificationCenter notificationCenter, + @Nullable ThreadFactory customThreadFactory) { this.period = period; this.timeUnit = timeUnit; this.blockingTimeoutPeriod = blockingTimeoutPeriod; @@ -78,7 +90,7 @@ public PollingProjectConfigManager(long period, TimeUnit timeUnit, long blocking if (TimeUnit.SECONDS.convert(period, this.timeUnit) < 30) { logger.warn("Polling intervals below 30 seconds are not recommended."); } - final ThreadFactory threadFactory = Executors.defaultThreadFactory(); + final ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : Executors.defaultThreadFactory(); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = threadFactory.newThread(runnable); thread.setDaemon(true); @@ -176,43 +188,58 @@ public String getSDKKey() { return this.sdkKey; } - public synchronized void start() { - if (started) { - logger.warn("Manager already started."); - return; - } + public void start() { + lock.lock(); + try { + if (started) { + logger.warn("Manager already started."); + return; + } - if (scheduledExecutorService.isShutdown()) { - logger.warn("Not starting. Already in shutdown."); - return; - } + if (scheduledExecutorService.isShutdown()) { + logger.warn("Not starting. Already in shutdown."); + return; + } - Runnable runnable = new ProjectConfigFetcher(); - scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, period, timeUnit); - started = true; + Runnable runnable = new ProjectConfigFetcher(); + scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, period, timeUnit); + started = true; + } finally { + lock.unlock(); + } } - public synchronized void stop() { - if (!started) { - logger.warn("Not pausing. Manager has not been started."); - return; - } + public void stop() { + lock.lock(); + try { + if (!started) { + logger.warn("Not pausing. Manager has not been started."); + return; + } - if (scheduledExecutorService.isShutdown()) { - logger.warn("Not pausing. Already in shutdown."); - return; - } + if (scheduledExecutorService.isShutdown()) { + logger.warn("Not pausing. Already in shutdown."); + return; + } - logger.info("pausing project watcher"); - scheduledFuture.cancel(true); - started = false; + logger.info("pausing project watcher"); + scheduledFuture.cancel(true); + started = false; + } finally { + lock.unlock(); + } } @Override - public synchronized void close() { - stop(); - scheduledExecutorService.shutdownNow(); - started = false; + public void close() { + lock.lock(); + try { + stop(); + scheduledExecutorService.shutdownNow(); + started = false; + } finally { + lock.unlock(); + } } protected void setSdkKey(String sdkKey) { diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index daf81d71a..740cfb8c3 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -21,6 +21,7 @@ import com.optimizely.ab.event.internal.UserEvent; import com.optimizely.ab.internal.PropertyUtils; import com.optimizely.ab.notification.NotificationCenter; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { private Future future; private boolean isStarted = false; + private final ReentrantLock lock = new ReentrantLock(); private BatchEventProcessor(BlockingQueue eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) { this.eventHandler = eventHandler; @@ -78,15 +80,20 @@ private BatchEventProcessor(BlockingQueue eventQueue, EventHandler event this.executor = executor; } - public synchronized void start() { - if (isStarted) { - logger.info("Executor already started."); - return; - } + public void start() { + lock.lock(); + try { + if (isStarted) { + logger.info("Executor already started."); + return; + } - isStarted = true; - EventConsumer runnable = new EventConsumer(); - future = executor.submit(runnable); + isStarted = true; + EventConsumer runnable = new EventConsumer(); + future = executor.submit(runnable); + } finally { + lock.unlock(); + } } @Override diff --git a/core-api/src/main/java/com/optimizely/ab/internal/DefaultLRUCache.java b/core-api/src/main/java/com/optimizely/ab/internal/DefaultLRUCache.java index a531c5c83..b946a65ea 100644 --- a/core-api/src/main/java/com/optimizely/ab/internal/DefaultLRUCache.java +++ b/core-api/src/main/java/com/optimizely/ab/internal/DefaultLRUCache.java @@ -19,10 +19,11 @@ import com.optimizely.ab.annotations.VisibleForTesting; import java.util.*; +import java.util.concurrent.locks.ReentrantLock; public class DefaultLRUCache implements Cache { - private final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); private final Integer maxSize; @@ -51,8 +52,11 @@ public void save(String key, T value) { return; } - synchronized (lock) { + lock.lock(); + try { linkedHashMap.put(key, new CacheEntity(value)); + } finally { + lock.unlock(); } } @@ -62,7 +66,8 @@ public T lookup(String key) { return null; } - synchronized (lock) { + lock.lock(); + try { if (linkedHashMap.containsKey(key)) { CacheEntity entity = linkedHashMap.get(key); Long nowMs = new Date().getTime(); @@ -75,12 +80,17 @@ public T lookup(String key) { linkedHashMap.remove(key); } return null; + } finally { + lock.unlock(); } } public void reset() { - synchronized (lock) { + lock.lock(); + try { linkedHashMap.clear(); + } finally { + lock.unlock(); } } diff --git a/core-api/src/main/java/com/optimizely/ab/notification/NotificationManager.java b/core-api/src/main/java/com/optimizely/ab/notification/NotificationManager.java index 7415e6b23..986a142a8 100644 --- a/core-api/src/main/java/com/optimizely/ab/notification/NotificationManager.java +++ b/core-api/src/main/java/com/optimizely/ab/notification/NotificationManager.java @@ -16,6 +16,7 @@ */ package com.optimizely.ab.notification; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ public class NotificationManager { private final Map> handlers = Collections.synchronizedMap(new LinkedHashMap<>()); private final AtomicInteger counter; + private final ReentrantLock lock = new ReentrantLock(); public NotificationManager() { this(new AtomicInteger()); @@ -48,13 +50,16 @@ public NotificationManager(AtomicInteger counter) { public int addHandler(NotificationHandler newHandler) { // Prevent registering a duplicate listener. - synchronized (handlers) { + lock.lock(); + try { for (NotificationHandler handler : handlers.values()) { if (handler.equals(newHandler)) { logger.warn("Notification listener was already added"); return -1; } } + } finally { + lock.unlock(); } int notificationId = counter.incrementAndGet(); @@ -64,7 +69,8 @@ public int addHandler(NotificationHandler newHandler) { } public void send(final T message) { - synchronized (handlers) { + lock.lock(); + try { for (Map.Entry> handler: handlers.entrySet()) { try { handler.getValue().handle(message); @@ -72,6 +78,8 @@ public void send(final T message) { logger.warn("Catching exception sending notification for class: {}, handler: {}", message.getClass(), handler.getKey()); } } + } finally { + lock.unlock(); } } diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java index eb055e63f..8ffaaeada 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; public class ODPConfig { @@ -27,6 +28,8 @@ public class ODPConfig { private Set allSegments; + private final ReentrantLock lock = new ReentrantLock(); + public ODPConfig(String apiKey, String apiHost, Set allSegments) { this.apiKey = apiKey; this.apiHost = apiHost; @@ -37,46 +40,91 @@ public ODPConfig(String apiKey, String apiHost) { this(apiKey, apiHost, Collections.emptySet()); } - public synchronized Boolean isReady() { - return !( - this.apiKey == null || this.apiKey.isEmpty() - || this.apiHost == null || this.apiHost.isEmpty() - ); + public Boolean isReady() { + lock.lock(); + try { + return !( + this.apiKey == null || this.apiKey.isEmpty() + || this.apiHost == null || this.apiHost.isEmpty() + ); + } finally { + lock.unlock(); + } } - public synchronized Boolean hasSegments() { - return allSegments != null && !allSegments.isEmpty(); + public Boolean hasSegments() { + lock.lock(); + try { + return allSegments != null && !allSegments.isEmpty(); + } finally { + lock.unlock(); + } } - public synchronized void setApiKey(String apiKey) { - this.apiKey = apiKey; + public void setApiKey(String apiKey) { + lock.lock(); + try { + this.apiKey = apiKey; + } finally { + lock.unlock(); + } } - public synchronized void setApiHost(String apiHost) { - this.apiHost = apiHost; + public void setApiHost(String apiHost) { + lock.lock(); + try { + this.apiHost = apiHost; + } finally { + lock.unlock(); + } } - public synchronized String getApiKey() { - return apiKey; + public String getApiKey() { + lock.lock(); + try { + return apiKey; + } finally { + lock.unlock(); + } } - public synchronized String getApiHost() { - return apiHost; + public String getApiHost() { + lock.lock(); + try { + return apiHost; + } finally { + lock.unlock(); + } } - public synchronized Set getAllSegments() { - return allSegments; + public Set getAllSegments() { + lock.lock(); + try { + return allSegments; + } finally { + lock.unlock(); + } } - public synchronized void setAllSegments(Set allSegments) { - this.allSegments = allSegments; + public void setAllSegments(Set allSegments) { + lock.lock(); + try { + this.allSegments = allSegments; + } finally { + lock.unlock(); + } } public Boolean equals(ODPConfig toCompare) { return getApiHost().equals(toCompare.getApiHost()) && getApiKey().equals(toCompare.getApiKey()) && getAllSegments().equals(toCompare.allSegments); } - public synchronized ODPConfig getClone() { - return new ODPConfig(apiKey, apiHost, allSegments); + public ODPConfig getClone() { + lock.lock(); + try { + return new ODPConfig(apiKey, apiHost, allSegments); + } finally { + lock.unlock(); + } } } diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java index efcdd6cda..b50c16045 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -59,16 +59,25 @@ public class ODPEventManager { // The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here // because `LinkedBlockingQueue` itself is thread safe. private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); + private ThreadFactory threadFactory; public ODPEventManager(@Nonnull ODPApiManager apiManager) { this(apiManager, null, null); } public ODPEventManager(@Nonnull ODPApiManager apiManager, @Nullable Integer queueSize, @Nullable Integer flushInterval) { + this(apiManager, queueSize, flushInterval, null); + } + + public ODPEventManager(@Nonnull ODPApiManager apiManager, + @Nullable Integer queueSize, + @Nullable Integer flushInterval, + @Nullable ThreadFactory threadFactory) { this.apiManager = apiManager; this.queueSize = queueSize != null ? queueSize : DEFAULT_QUEUE_SIZE; this.flushInterval = (flushInterval != null && flushInterval > 0) ? flushInterval : DEFAULT_FLUSH_INTERVAL; this.batchSize = (flushInterval != null && flushInterval == 0) ? 1 : DEFAULT_BATCH_SIZE; + this.threadFactory = threadFactory != null ? threadFactory : Executors.defaultThreadFactory(); } // these user-provided common data are included in all ODP events in addition to the SDK-generated common data. @@ -86,7 +95,6 @@ public void start() { eventDispatcherThread = new EventDispatcherThread(); } if (!isRunning) { - final ThreadFactory threadFactory = Executors.defaultThreadFactory(); ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { Thread thread = threadFactory.newThread(runnable); thread.setDaemon(true); diff --git a/core-httpclient-impl/src/main/java/com/optimizely/ab/NamedThreadFactory.java b/core-httpclient-impl/src/main/java/com/optimizely/ab/NamedThreadFactory.java index 5b3cb2fbb..594ce0e20 100644 --- a/core-httpclient-impl/src/main/java/com/optimizely/ab/NamedThreadFactory.java +++ b/core-httpclient-impl/src/main/java/com/optimizely/ab/NamedThreadFactory.java @@ -28,7 +28,7 @@ public class NamedThreadFactory implements ThreadFactory { private final String nameFormat; private final boolean daemon; - private final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory(); + private final ThreadFactory backingThreadFactory; private final AtomicLong threadCount = new AtomicLong(0); /** @@ -36,8 +36,18 @@ public class NamedThreadFactory implements ThreadFactory { * @param daemon whether the threads created should be {@link Thread#daemon}s or not */ public NamedThreadFactory(String nameFormat, boolean daemon) { + this(nameFormat, daemon, null); + } + + /** + * @param nameFormat the thread name format which should include a string placeholder for the thread number + * @param daemon whether the threads created should be {@link Thread#daemon}s or not + * @param backingThreadFactory the backing {@link ThreadFactory} to use for creating threads + */ + public NamedThreadFactory(String nameFormat, boolean daemon, ThreadFactory backingThreadFactory) { this.nameFormat = nameFormat; this.daemon = daemon; + this.backingThreadFactory = backingThreadFactory != null ? backingThreadFactory : Executors.defaultThreadFactory(); } @Override diff --git a/core-httpclient-impl/src/main/java/com/optimizely/ab/config/HttpProjectConfigManager.java b/core-httpclient-impl/src/main/java/com/optimizely/ab/config/HttpProjectConfigManager.java index 15325350f..a583eae98 100644 --- a/core-httpclient-impl/src/main/java/com/optimizely/ab/config/HttpProjectConfigManager.java +++ b/core-httpclient-impl/src/main/java/com/optimizely/ab/config/HttpProjectConfigManager.java @@ -21,6 +21,9 @@ import com.optimizely.ab.config.parser.ConfigParseException; import com.optimizely.ab.internal.PropertyUtils; import com.optimizely.ab.notification.NotificationCenter; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; import org.apache.http.*; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.CloseableHttpResponse; @@ -62,6 +65,7 @@ public class HttpProjectConfigManager extends PollingProjectConfigManager { private final URI uri; private final String datafileAccessToken; private String datafileLastModified; + private final ReentrantLock lock = new ReentrantLock(); private HttpProjectConfigManager(long period, TimeUnit timeUnit, @@ -70,8 +74,9 @@ private HttpProjectConfigManager(long period, String datafileAccessToken, long blockingTimeoutPeriod, TimeUnit blockingTimeoutUnit, - NotificationCenter notificationCenter) { - super(period, timeUnit, blockingTimeoutPeriod, blockingTimeoutUnit, notificationCenter); + NotificationCenter notificationCenter, + @Nullable ThreadFactory threadFactory) { + super(period, timeUnit, blockingTimeoutPeriod, blockingTimeoutUnit, notificationCenter, threadFactory); this.httpClient = httpClient; this.uri = URI.create(url); this.datafileAccessToken = datafileAccessToken; @@ -146,12 +151,17 @@ protected ProjectConfig poll() { } @Override - public synchronized void close() { - super.close(); + public void close() { + lock.lock(); try { - httpClient.close(); - } catch (IOException e) { - e.printStackTrace(); + super.close(); + try { + httpClient.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } finally { + lock.unlock(); } } @@ -193,6 +203,7 @@ public static class Builder { // force-close the persistent connection after this idle time long evictConnectionIdleTimePeriod = PropertyUtils.getLong(CONFIG_EVICT_DURATION, DEFAULT_EVICT_DURATION); TimeUnit evictConnectionIdleTimeUnit = PropertyUtils.getEnum(CONFIG_EVICT_UNIT, TimeUnit.class, DEFAULT_EVICT_UNIT); + ThreadFactory threadFactory = null; public Builder withDatafile(String datafile) { this.datafile = datafile; @@ -302,6 +313,11 @@ public Builder withNotificationCenter(NotificationCenter notificationCenter) { return this; } + public Builder withThreadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + /** * HttpProjectConfigManager.Builder that builds and starts a HttpProjectConfigManager. * This is the default builder which will block until a config is available. @@ -363,7 +379,8 @@ public HttpProjectConfigManager build(boolean defer) { datafileAccessToken, blockingTimeoutPeriod, blockingTimeoutUnit, - notificationCenter); + notificationCenter, + threadFactory); httpProjectManager.setSdkKey(sdkKey); if (datafile != null) { try { diff --git a/core-httpclient-impl/src/main/java/com/optimizely/ab/event/AsyncEventHandler.java b/core-httpclient-impl/src/main/java/com/optimizely/ab/event/AsyncEventHandler.java index 391f89b57..539b8b642 100644 --- a/core-httpclient-impl/src/main/java/com/optimizely/ab/event/AsyncEventHandler.java +++ b/core-httpclient-impl/src/main/java/com/optimizely/ab/event/AsyncEventHandler.java @@ -21,6 +21,8 @@ import com.optimizely.ab.annotations.VisibleForTesting; import com.optimizely.ab.internal.PropertyUtils; +import java.util.concurrent.ThreadFactory; +import javax.annotation.Nullable; import org.apache.http.HttpResponse; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.ResponseHandler; @@ -108,6 +110,17 @@ public AsyncEventHandler(int queueCapacity, int validateAfter, long closeTimeout, TimeUnit closeTimeoutUnit) { + this(queueCapacity, numWorkers, maxConnections, connectionsPerRoute, validateAfter, closeTimeout, closeTimeoutUnit, null); + } + + public AsyncEventHandler(int queueCapacity, + int numWorkers, + int maxConnections, + int connectionsPerRoute, + int validateAfter, + long closeTimeout, + TimeUnit closeTimeoutUnit, + @Nullable ThreadFactory threadFactory) { queueCapacity = validateInput("queueCapacity", queueCapacity, DEFAULT_QUEUE_CAPACITY); numWorkers = validateInput("numWorkers", numWorkers, DEFAULT_NUM_WORKERS); @@ -123,10 +136,12 @@ public AsyncEventHandler(int queueCapacity, .withEvictIdleConnections(1L, TimeUnit.MINUTES) .build(); + NamedThreadFactory namedThreadFactory = new NamedThreadFactory("optimizely-event-dispatcher-thread-%s", true, threadFactory); + this.workerExecutor = new ThreadPoolExecutor(numWorkers, numWorkers, - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueCapacity), - new NamedThreadFactory("optimizely-event-dispatcher-thread-%s", true)); + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), + namedThreadFactory); this.closeTimeout = closeTimeout; this.closeTimeoutUnit = closeTimeoutUnit;