From ac341403e1f91b44bca18a1e0fa700a10f06bf3c Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Wed, 11 Jan 2017 19:46:18 +0800 Subject: [PATCH] Enhance server cache and inject unique request id for each run --- .../org/pentaho/di/cluster/ServerCache.java | 81 ++++++++++++++++++- .../src/main/java/org/pentaho/di/job/Job.java | 8 +- .../main/java/org/pentaho/di/trans/Trans.java | 8 +- .../org/pentaho/di/www/CarteSingleton.java | 5 +- .../di/www/StartExecutionTransServlet.java | 2 + .../org/pentaho/di/www/StartJobServlet.java | 2 + .../scheduler2/quartz/JobIdInjectionRule.java | 2 + .../quartz/QuartzSchedulerHelper.java | 1 + 8 files changed, 100 insertions(+), 9 deletions(-) diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java index 7ccb152..c362ab3 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java @@ -21,15 +21,24 @@ import com.google.common.cache.CacheBuilder; import org.pentaho.di.base.AbstractMeta; import org.pentaho.di.core.Const; +import org.pentaho.di.core.logging.LogChannel; import org.pentaho.di.core.logging.LogChannelInterface; +import org.pentaho.di.core.parameters.NamedParams; +import org.pentaho.di.core.parameters.UnknownParamException; import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.job.JobMeta; import org.pentaho.di.repository.ObjectRevision; import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.www.GetCacheStatusServlet; import org.pentaho.di.www.SlaveServerJobStatus; import org.pentaho.di.www.SlaveServerTransStatus; +import org.pentaho.di.www.WebResult; +import javax.servlet.http.HttpServletRequest; +import java.net.URLEncoder; +import java.util.AbstractMap; import java.util.Date; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -47,6 +56,9 @@ public final class ServerCache { = Integer.parseInt(System.getProperty("KETTLE_RESOURCE_EXPIRATION_MINUTE", "1800")); public static final String PARAM_ETL_JOB_ID = System.getProperty("KETTLE_JOB_ID_KEY", "ETL_CALLER"); + static final String KEY_ETL_CACHE_ID = System.getProperty("KETTLE_CACHE_ID_KEY", "CACHE_ID"); + static final String KEY_ETL_REQUEST_ID = System.getProperty("KETTLE_REQUEST_ID_KEY", "REQUEST_ID"); + // On master node, it's for name -> revision + md5; on slave server, it's name -> md5 private static final Cache resourceCache = CacheBuilder.newBuilder() .maximumSize(RESOURCE_CACHE_SIZE) @@ -97,6 +109,55 @@ private static String buildResourceName(AbstractMeta meta, Map p return sb.append('@').append(host).append(':').append(port).toString(); } + public static Map buildRequestParameters(String resourceName, + Map params, + Map vars) { + Map map = new HashMap(); + + if (!Strings.isNullOrEmpty(resourceName)) { + map.put(KEY_ETL_CACHE_ID, resourceName); + } + + if (params != null) { + String requestId = params.get(KEY_ETL_REQUEST_ID); + if (!Strings.isNullOrEmpty(requestId)) { + map.put(KEY_ETL_REQUEST_ID, requestId); + } + } + + if (vars != null) { + String requestId = vars.get(KEY_ETL_REQUEST_ID); + if (!Strings.isNullOrEmpty(requestId)) { + map.put(KEY_ETL_REQUEST_ID, requestId); + } + } + + LogChannel.GENERAL.logError("=====> Request Parameters: " + map.toString()); + + return map; + } + + public static void updateParametersAndCache(HttpServletRequest request, NamedParams params, String carteObjectId) { + String cacheId = request == null ? null : request.getHeader(KEY_ETL_CACHE_ID); + String requestId = request == null ? null : request.getHeader(KEY_ETL_REQUEST_ID); + + LogChannel.GENERAL.logError( + "=====> cacheId=" + cacheId + ", requetId=" + requestId + ", carteId=" + carteObjectId); + + if (!Strings.isNullOrEmpty(requestId)) { + try { + params.setParameterValue(KEY_ETL_REQUEST_ID, requestId); + } catch (UnknownParamException e) { + // this should not happen + } + } + + // update cache + if (!Strings.isNullOrEmpty(cacheId) && !Strings.isNullOrEmpty(carteObjectId)) { + cacheIdentity(cacheId, carteObjectId); + } + } + /** * Retrieve a unique id generated for the given resource if it's been cached. * @@ -107,10 +168,26 @@ public static String getCachedIdentity(String resourceName) { return RESOURCE_CACHE_DISABLED ? null : resourceCache.getIfPresent(resourceName); } - public static String getCachedIdentity(AbstractMeta meta, Map params, SlaveServer server) { + public static Map.Entry getCachedEntry( + AbstractMeta meta, Map params, SlaveServer server) { String resourceName = buildResourceName(meta, params, server); String identity = getCachedIdentity(resourceName); + if (Strings.isNullOrEmpty(identity)) { + // don't give up so quick as this might be cached on slave server + try { + String reply = + server.execService(GetCacheStatusServlet.CONTEXT_PATH + "/?name=" + + URLEncoder.encode(resourceName, "UTF-8")); + WebResult webResult = WebResult.fromXMLString(reply); + if (webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) { + identity = webResult.getId(); + } + } catch (Exception e) { + // ignore as this is usually a network issue + } + } + // let's see if the slave server still got this if (!Strings.isNullOrEmpty(identity)) { try { @@ -142,7 +219,7 @@ public static String getCachedIdentity(AbstractMeta meta, Map pa } } - return identity; + return new AbstractMap.SimpleImmutableEntry(resourceName, identity); } /** diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java index 0fbdf4d..16a6fd4 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java @@ -1718,7 +1718,9 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio // Align logging levels between execution configuration and remote server slaveServer.getLogChannel().setLogLevel(executionConfiguration.getLogLevel()); - String carteObjectId = ServerCache.getCachedIdentity(jobMeta, executionConfiguration.getParams(), slaveServer); + Map.Entry entry + = ServerCache.getCachedEntry(jobMeta, executionConfiguration.getParams(), slaveServer); + String carteObjectId = entry.getValue(); FileObject tempFile = null; try { @@ -1782,7 +1784,9 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio // String reply = slaveServer.execService(StartJobServlet.CONTEXT_PATH + "/?name=" + URLEncoder.encode(jobMeta.getName(), - "UTF-8") + "&xml=Y&id=" + carteObjectId); + "UTF-8") + "&xml=Y&id=" + carteObjectId, + ServerCache.buildRequestParameters(entry.getKey(), + executionConfiguration.getParams(), executionConfiguration.getVariables())); WebResult webResult = WebResult.fromXMLString(reply); if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) { ServerCache.invalidate(jobMeta, executionConfiguration.getParams(), slaveServer); diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java index f6d0e59..85873a3 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java @@ -4109,7 +4109,9 @@ public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfig throw new KettleException("The transformation needs a name to uniquely identify it by on the remote server."); } - String carteObjectId = ServerCache.getCachedIdentity(transMeta, executionConfiguration.getParams(), slaveServer); + Map.Entry entry + = ServerCache.getCachedEntry(transMeta, executionConfiguration.getParams(), slaveServer); + String carteObjectId = entry.getValue(); FileObject tempFile = null; try { @@ -4196,7 +4198,9 @@ public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfig // reply = slaveServer.execService(StartExecutionTransServlet.CONTEXT_PATH + "/?name=" + URLEncoder.encode(transMeta - .getName(), "UTF-8") + "&xml=Y&id=" + carteObjectId); + .getName(), "UTF-8") + "&xml=Y&id=" + carteObjectId, + ServerCache.buildRequestParameters(entry.getKey(), + executionConfiguration.getParams(), executionConfiguration.getVariables())); webResult = WebResult.fromXMLString(reply); if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) { diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java index a39ebe6..466139c 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean; public class CarteSingleton { - private static Class PKG = Carte.class; // for i18n purposes, needed by Translator2!! private static SlaveServerConfig slaveServerConfig; @@ -173,8 +172,8 @@ public void run() { // Remove the logging information from the log registry & central log store // - LoggingRegistry.getInstance().removeIncludingChildren(logChannelId); KettleLogStore.discardLines(logChannelId, false); + LoggingRegistry.getInstance().removeIncludingChildren(logChannelId); // transformationMap.deallocateServerSocketPorts(entry); @@ -206,8 +205,8 @@ public void run() { // Remove the logging information from the log registry & central log store // - LoggingRegistry.getInstance().removeIncludingChildren(logChannelId); KettleLogStore.discardLines(logChannelId, false); + LoggingRegistry.getInstance().removeIncludingChildren(logChannelId); log.logMinimal("Cleaned up job " + entry.getName() + " with id " + entry.getId() + " from " + logDate); diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java index 030185e..880d72c 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java @@ -24,6 +24,7 @@ import org.owasp.esapi.ESAPI; import org.owasp.esapi.Encoder; +import org.pentaho.di.cluster.ServerCache; import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.xml.XMLHandler; @@ -194,6 +195,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro if (trans != null) { if (trans.isReadyToStart()) { + ServerCache.updateParametersAndCache(request, trans, id); startThreads(trans); if (useXML) { diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java index 45c8a53..c408a90 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java @@ -24,6 +24,7 @@ import org.owasp.esapi.ESAPI; import org.owasp.esapi.Encoder; +import org.pentaho.di.cluster.ServerCache; import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.KettleLogStore; @@ -242,6 +243,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro } } + ServerCache.updateParametersAndCache(request, job, id); runJob(job); String message = BaseMessages.getString(PKG, "StartJobServlet.Log.JobStarted", jobName); diff --git a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/JobIdInjectionRule.java b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/JobIdInjectionRule.java index 3af9c77..391afd0 100644 --- a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/JobIdInjectionRule.java +++ b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/JobIdInjectionRule.java @@ -23,6 +23,7 @@ import org.quartz.Scheduler; import java.util.Map; +import java.util.UUID; import static org.pentaho.platform.scheduler2.quartz.QuartzSchedulerHelper.*; @@ -55,6 +56,7 @@ void applyRule(Phase phase, Scheduler scheduler, JobDetail jobDetail) throws Job && etlScript != null) { jobParams.put(KEY_ETL_JOB_ID, jobKey.toString()); jobParams.put(KEY_ETL_TRACE_ID, lineAgeId); + jobParams.put(KEY_ETL_REQUEST_ID, UUID.randomUUID().toString()); } } } diff --git a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzSchedulerHelper.java b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzSchedulerHelper.java index 2839cdd..ecd0bdc 100644 --- a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzSchedulerHelper.java +++ b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzSchedulerHelper.java @@ -51,6 +51,7 @@ public enum Phase { static final String KEY_ETL_SCRIPT = System.getProperty("KETTLE_JOB_NAME_KEY", "ETL_SCRIPT"); static final String KEY_ETL_JOB_ID = System.getProperty("KETTLE_JOB_ID_KEY", "ETL_CALLER"); static final String KEY_ETL_TRACE_ID = System.getProperty("KETTLE_TRACE_ID_KEY", "UNIQUE_ID"); + static final String KEY_ETL_REQUEST_ID = System.getProperty("KETTLE_REQUEST_ID_KEY", "REQUEST_ID"); static final int KETTLE_JOB_KILLER_MAX_WAIT = Integer.parseInt(System.getProperty("KETTLE_JOB_KILLER_WAIT_SEC", "8000"));