From 88347b56590d3617629b49725e57639c6065323a Mon Sep 17 00:00:00 2001 From: HzjNeverStop <441627022@qq.com> Date: Tue, 28 May 2024 14:32:31 +0800 Subject: [PATCH] add runnable/callable class in async invoke method (#1327) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 致节 --- .../health/HealthCheckerProcessor.java | 96 +++++++++++++------ .../health/HealthIndicatorProcessor.java | 69 ++++++++++--- .../isle/stage/SpringContextInstallStage.java | 76 +++++++++------ .../AsyncInitializeBeanMethodInvoker.java | 43 +++++---- 4 files changed, 197 insertions(+), 87 deletions(-) diff --git a/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthCheckerProcessor.java b/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthCheckerProcessor.java index dbb87b207..55c2a4cef 100644 --- a/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthCheckerProcessor.java +++ b/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthCheckerProcessor.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -111,19 +112,8 @@ public boolean readinessHealthCheck(Map healthMap) { if (isParallelCheck()) { CountDownLatch countDownLatch = new CountDownLatch(readinessHealthCheckers.size()); AtomicBoolean parallelResult = new AtomicBoolean(true); - readinessHealthCheckers.forEach((String key, HealthChecker value) -> healthCheckExecutor.execute(() -> { - try { - if (!doHealthCheck(key, value, false, healthMap, true, false)) { - parallelResult.set(false); - } - } catch (Throwable t) { - parallelResult.set(false); - logger.error(ErrorCode.convert("01-22004"), t); - healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN).build()); - } finally { - countDownLatch.countDown(); - } - })); + readinessHealthCheckers.forEach((String key, HealthChecker value) -> healthCheckExecutor.execute( + new AsyncHealthCheckRunnable(key, value, healthMap, parallelResult, countDownLatch))); boolean finished = false; try { finished = countDownLatch.await(getParallelCheckTimeout(), TimeUnit.MILLISECONDS); @@ -160,7 +150,7 @@ public boolean readinessHealthCheck(Map healthMap) { * @return health check passes or not */ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolean isRetry, - Map healthMap, boolean isReadiness, boolean wait) { + Map healthMap, boolean isReadiness, boolean wait) { Assert.notNull(healthMap, "HealthMap must not be null"); Health health; @@ -180,20 +170,23 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea do { try { if (wait) { - Future future = healthCheckExecutor.submit(healthChecker::isHealthy); + Future future = healthCheckExecutor + .submit(new AsyncHealthCheckCallable(healthChecker)); health = future.get(timeout, TimeUnit.MILLISECONDS); } else { health = healthChecker.isHealthy(); } - } catch (TimeoutException e) { - logger.error( + } catch (TimeoutException e) { + logger + .error( "Timeout occurred while doing HealthChecker[{}] {} check, the timeout value is: {}ms.", beanId, checkType, timeout); - health = new Health.Builder().withException(e).withDetail("timeout", timeout).status(Status.UNKNOWN).build(); + health = new Health.Builder().withException(e).withDetail("timeout", timeout) + .status(Status.UNKNOWN).build(); } catch (Throwable e) { logger.error(String.format( - "Exception occurred while wait the result of HealthChecker[%s] %s check.", - beanId, checkType), e); + "Exception occurred while wait the result of HealthChecker[%s] %s check.", + beanId, checkType), e); health = new Health.Builder().withException(e).status(Status.DOWN).build(); } result = health.getStatus().equals(Status.UP); @@ -208,9 +201,7 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea retryCount += 1; TimeUnit.MILLISECONDS.sleep(healthChecker.getRetryTimeInterval()); } catch (InterruptedException e) { - logger - .error(ErrorCode.convert("01-23002", retryCount, beanId, - checkType), e); + logger.error(ErrorCode.convert("01-23002", retryCount, beanId, checkType), e); } } } while (isRetry && retryCount < healthChecker.getRetryCount()); @@ -223,12 +214,12 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea if (!result) { if (healthChecker.isStrictCheck()) { logger.error(ErrorCode.convert("01-23001", beanId, checkType, retryCount, - objectMapper.writeValueAsString(health.getDetails()), - healthChecker.isStrictCheck())); + objectMapper.writeValueAsString(health.getDetails()), + healthChecker.isStrictCheck())); } else { logger.warn(ErrorCode.convert("01-23001", beanId, checkType, retryCount, - objectMapper.writeValueAsString(health.getDetails()), - healthChecker.isStrictCheck())); + objectMapper.writeValueAsString(health.getDetails()), + healthChecker.isStrictCheck())); } } } catch (JsonProcessingException ex) { @@ -364,4 +355,55 @@ public int getTimeout() { } } + + private class AsyncHealthCheckRunnable implements Runnable { + private final String key; + private final HealthChecker value; + private final Map healthMap; + + private final AtomicBoolean parallelResult; + + private final CountDownLatch countDownLatch; + + public AsyncHealthCheckRunnable(String key, HealthChecker value, + Map healthMap, + AtomicBoolean parallelResult, CountDownLatch countDownLatch) { + this.key = key; + this.value = value; + this.healthMap = healthMap; + this.parallelResult = parallelResult; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + if (!HealthCheckerProcessor.this.doHealthCheck(key, value, false, healthMap, true, + false)) { + parallelResult.set(false); + } + } catch (Throwable t) { + parallelResult.set(false); + logger.error(ErrorCode.convert("01-22004"), t); + healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN) + .build()); + } finally { + countDownLatch.countDown(); + } + } + } + + private class AsyncHealthCheckCallable implements Callable { + + private final HealthChecker healthChecker; + + public AsyncHealthCheckCallable(HealthChecker healthChecker) { + this.healthChecker = healthChecker; + } + + @Override + public Health call() throws Exception { + return healthChecker.isHealthy(); + } + } } diff --git a/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthIndicatorProcessor.java b/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthIndicatorProcessor.java index c5caae897..a37cf2636 100644 --- a/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthIndicatorProcessor.java +++ b/sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthIndicatorProcessor.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -167,19 +168,8 @@ public boolean readinessHealthCheck(Map healthMap) { if (isParallelCheck()) { CountDownLatch countDownLatch = new CountDownLatch(healthIndicators.size()); AtomicBoolean parallelResult = new AtomicBoolean(true); - healthIndicators.forEach((key, value) -> healthCheckExecutor.execute(() -> { - try { - if (!doHealthCheck(key, value, healthMap, false)) { - parallelResult.set(false); - } - } catch (Throwable t) { - parallelResult.set(false); - logger.error(ErrorCode.convert("01-21003"), t); - healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN).build()); - } finally { - countDownLatch.countDown(); - } - })); + healthIndicators.forEach((key, value) -> healthCheckExecutor.execute( + new AsyncHealthIndicatorRunnable(key, value, healthMap, parallelResult, countDownLatch))); boolean finished = false; try { finished = countDownLatch.await(getParallelCheckTimeout(), TimeUnit.MILLISECONDS); @@ -226,7 +216,7 @@ public boolean doHealthCheck(String beanId, HealthIndicator healthIndicator, try { if (wait) { Future future = healthCheckExecutor - .submit(healthIndicator::health); + .submit(new AsyncHealthIndicatorCallable(healthIndicator)); health = future.get(timeout, TimeUnit.MILLISECONDS); } else { health = healthIndicator.health(); @@ -315,4 +305,55 @@ public void setHealthIndicatorConfig(Map healthIndi public List getHealthIndicatorStartupStatList() { return healthIndicatorStartupStatList; } + + private class AsyncHealthIndicatorRunnable implements Runnable { + private final String key; + private final HealthIndicator value; + private final Map healthMap; + + private final AtomicBoolean parallelResult; + + private final CountDownLatch countDownLatch; + + public AsyncHealthIndicatorRunnable(String key, HealthIndicator value, + Map healthMap, + AtomicBoolean parallelResult, + CountDownLatch countDownLatch) { + this.key = key; + this.value = value; + this.healthMap = healthMap; + this.parallelResult = parallelResult; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + if (!HealthIndicatorProcessor.this.doHealthCheck(key, value, healthMap, false)) { + parallelResult.set(false); + } + } catch (Throwable t) { + parallelResult.set(false); + logger.error(ErrorCode.convert("01-21003"), t); + healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN) + .build()); + } finally { + countDownLatch.countDown(); + } + } + } + + private class AsyncHealthIndicatorCallable implements Callable { + + private final HealthIndicator healthIndicator; + + public AsyncHealthIndicatorCallable(HealthIndicator healthIndicator) { + this.healthIndicator = healthIndicator; + } + + @Override + public Health call() throws Exception { + return healthIndicator.health(); + } + } } diff --git a/sofa-boot-project/sofa-boot-core/isle-sofa-boot/src/main/java/com/alipay/sofa/boot/isle/stage/SpringContextInstallStage.java b/sofa-boot-project/sofa-boot-core/isle-sofa-boot/src/main/java/com/alipay/sofa/boot/isle/stage/SpringContextInstallStage.java index 97942557f..bdf0bfcf8 100644 --- a/sofa-boot-project/sofa-boot-core/isle-sofa-boot/src/main/java/com/alipay/sofa/boot/isle/stage/SpringContextInstallStage.java +++ b/sofa-boot-project/sofa-boot-core/isle-sofa-boot/src/main/java/com/alipay/sofa/boot/isle/stage/SpringContextInstallStage.java @@ -184,38 +184,12 @@ private void doRefreshSpringContextParallel() { /** * Refresh all {@link ApplicationContext} recursively */ - private void refreshRecursively(DeploymentDescriptor deployment, - CountDownLatch latch, List> futures) { + private void refreshRecursively(DeploymentDescriptor deployment, CountDownLatch latch, + List> futures) { // if interrupted, moduleRefreshExecutorService will be null; if (moduleRefreshExecutorService != null) { - futures.add(moduleRefreshExecutorService.submit(() -> { - String oldName = Thread.currentThread().getName(); - try { - Thread.currentThread().setName( - "sofa-module-refresh-" + deployment.getModuleName()); - if (deployment.isSpringPowered() && !application.getFailed().contains(deployment)) { - refreshAndCollectCost(deployment); - } - DependencyTree.Entry entry = application - .getDeployRegistry().getEntry(deployment.getModuleName()); - if (entry != null && entry.getDependsOnMe() != null) { - for (DependencyTree.Entry child : entry - .getDependsOnMe()) { - child.getDependencies().remove(entry); - if (child.getDependencies().size() == 0) { - refreshRecursively(child.get(), latch, futures); - } - } - } - } catch (Throwable t) { - LOGGER.error(ErrorCode.convert("01-11002", deployment.getName()), t); - throw new RuntimeException(ErrorCode.convert("01-11002", deployment.getName()), - t); - } finally { - latch.countDown(); - Thread.currentThread().setName(oldName); - } - })); + futures.add(moduleRefreshExecutorService.submit(new AsyncSpringContextRunnable( + deployment, latch, futures))); } } @@ -291,4 +265,46 @@ public String getName() { public int getOrder() { return 20000; } + + private class AsyncSpringContextRunnable implements Runnable { + private final DeploymentDescriptor deployment; + private final CountDownLatch latch; + private final List> futures; + + public AsyncSpringContextRunnable(DeploymentDescriptor deployment, CountDownLatch latch, + List> futures) { + this.deployment = deployment; + this.latch = latch; + this.futures = futures; + } + + @Override + public void run() { + String oldName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("sofa-module-refresh-" + deployment.getModuleName()); + if (deployment.isSpringPowered() && !application.getFailed().contains(deployment)) { + SpringContextInstallStage.this.refreshAndCollectCost(deployment); + } + DependencyTree.Entry entry = application + .getDeployRegistry().getEntry(deployment.getModuleName()); + if (entry != null && entry.getDependsOnMe() != null) { + for (DependencyTree.Entry child : entry + .getDependsOnMe()) { + child.getDependencies().remove(entry); + if (child.getDependencies().size() == 0) { + SpringContextInstallStage.this.refreshRecursively(child.get(), latch, + futures); + } + } + } + } catch (Throwable t) { + LOGGER.error(ErrorCode.convert("01-11002", deployment.getName()), t); + throw new RuntimeException(ErrorCode.convert("01-11002", deployment.getName()), t); + } finally { + latch.countDown(); + Thread.currentThread().setName(oldName); + } + } + } } diff --git a/sofa-boot-project/sofa-boot-core/runtime-sofa-boot/src/main/java/com/alipay/sofa/runtime/async/AsyncInitializeBeanMethodInvoker.java b/sofa-boot-project/sofa-boot-core/runtime-sofa-boot/src/main/java/com/alipay/sofa/runtime/async/AsyncInitializeBeanMethodInvoker.java index 850a1c55f..632361d99 100644 --- a/sofa-boot-project/sofa-boot-core/runtime-sofa-boot/src/main/java/com/alipay/sofa/runtime/async/AsyncInitializeBeanMethodInvoker.java +++ b/sofa-boot-project/sofa-boot-core/runtime-sofa-boot/src/main/java/com/alipay/sofa/runtime/async/AsyncInitializeBeanMethodInvoker.java @@ -75,28 +75,15 @@ public Object invoke(final MethodInvocation invocation) throws Throwable { if (!isAsyncCalled && methodName.equals(asyncMethodName)) { isAsyncCalled = true; isAsyncCalling = true; - asyncInitMethodManager.submitTask(() -> { - try { - long startTime = System.currentTimeMillis(); - invocation.getMethod().invoke(targetObject, invocation.getArguments()); - LOGGER.info("{}({}) {} method execute {}dms.", targetObject - .getClass().getName(), beanName, methodName, (System - .currentTimeMillis() - startTime)); - } catch (Throwable e) { - throw new RuntimeException(e); - } finally { - asyncMethodFinish(); - } - }); + asyncInitMethodManager.submitTask(new AsyncBeanInitRunnable(invocation)); return null; } if (isAsyncCalling) { long startTime = System.currentTimeMillis(); initCountDownLatch.await(); - LOGGER.info("{}({}) {} method wait {}ms.", - targetObject.getClass().getName(), beanName, methodName, - (System.currentTimeMillis() - startTime)); + LOGGER.info("{}({}) {} method wait {}ms.", targetObject.getClass().getName(), beanName, + methodName, (System.currentTimeMillis() - startTime)); } return invocation.getMethod().invoke(targetObject, invocation.getArguments()); } @@ -105,4 +92,28 @@ void asyncMethodFinish() { this.initCountDownLatch.countDown(); this.isAsyncCalling = false; } + + private class AsyncBeanInitRunnable implements Runnable { + + private final MethodInvocation invocation; + + public AsyncBeanInitRunnable(MethodInvocation invocation) { + this.invocation = invocation; + } + + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + invocation.getMethod().invoke(targetObject, invocation.getArguments()); + LOGGER.info("{}({}) {} method execute {}dms.", targetObject.getClass().getName(), + beanName, invocation.getMethod().getName(), + (System.currentTimeMillis() - startTime)); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + AsyncInitializeBeanMethodInvoker.this.asyncMethodFinish(); + } + } + } }