diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java new file mode 100644 index 000000000000..e9fbf90757fa --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountHandler.java @@ -0,0 +1,171 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.AbstractExtendedSynapseHandler; +import org.apache.synapse.MessageContext; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.consumer.TransactionRecordConsumer; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterConfigurationException; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.producer.TransactionRecordProducer; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.record.TransactionRecord; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.config.TransactionCounterConfig; + +import java.lang.reflect.Constructor; + +/** + * This class is the starting point of the transaction counter. This class is responsible for initializing all the + * components of the transaction counter and starting them. This class extends the Synapse handler interface and is + * responsible for intercepting the Synapse message flow and passing the message context to the transaction counting + * logic. This is registered in handlers.xml in case of APIM and service bus initializer in case of MI/ESB. + * @author - Isuru Wijesiri + * @version - 1.0.0 + */ +public class TransactionCountHandler extends AbstractExtendedSynapseHandler { + private static final Log LOG = LogFactory.getLog(TransactionCountHandler.class); + private TransactionRecordQueue transactionRecordQueue; + private TransactionRecordProducer transactionRecordProducer; + private TransactionRecordConsumer transactionRecordConsumer; + private TransactionRecordStore transactionCountStore; + private static boolean enabled = false; + + public TransactionCountHandler() { + + // Initialize the config mechanism + try { + TransactionCounterConfig.init(); + } catch (TransactionCounterConfigurationException e) { + LOG.error("Error while initializing Transaction Counter. Transaction counter will be disabled", e); + return; + } + + try { + Class clazz = Class.forName( + TransactionCounterConfig.getTransactionCountStoreClass() + ); + Constructor constructor = clazz.getConstructor(); + this.transactionCountStore = (TransactionRecordStore) constructor.newInstance(); + } catch (Exception e) { + LOG.error("Error while initializing Transaction Counter. Transaction counter will be disabled", e); + return; + } + + this.transactionRecordProducer = TransactionRecordProducer.getInstance(); + this.transactionRecordConsumer = TransactionRecordConsumer.getInstance(); + this.transactionRecordQueue = TransactionRecordQueue.getInstance(); + + TransactionRecord.init( + TransactionCounterConfig.getServerID(), + TransactionCounterConfig.getServerType().toString() + ); + + this.transactionRecordQueue.init( + TransactionCounterConfig.getTransactionRecordQueueSize() + ); + + this.transactionRecordProducer.init( + transactionRecordQueue, + TransactionCounterConfig.getProducerThreadPoolSize(), + TransactionCounterConfig.getMaxTransactionCount(), + TransactionCounterConfig.getMinTransactionCount(), + TransactionCounterConfig.getTransactionCountRecordInterval()); + + this.transactionCountStore.init( + TransactionCounterConfig.getTransactionCountService(), + TransactionCounterConfig.getTransactionCountServiceUsername(), + TransactionCounterConfig.getTransactionCountServicePassword() + ); + + this.transactionRecordConsumer.init( + transactionCountStore, + transactionRecordQueue, + TransactionCounterConfig.getConsumerCommitInterval(), + TransactionCounterConfig.getMaxRetryCount(), + TransactionCounterConfig.getMaxTransactionRecordsPerCommit()); + + enabled = true; + } + + @Override + public boolean handleRequestInFlow(MessageContext messageContext) { + if(!enabled) { + return true; + } + int tCount = TransactionCountingLogic.handleRequestInFlow(messageContext); + if(tCount > 0) { + this.transactionRecordProducer.addTransaction(tCount); + } + return true; + } + + @Override + public boolean handleRequestOutFlow(MessageContext messageContext) { + if(!enabled) { + return true; + } + int tCount = TransactionCountingLogic.handleRequestOutFlow(messageContext); + if(tCount > 0) { + this.transactionRecordProducer.addTransaction(tCount); + } + return true; + } + + @Override + public boolean handleResponseInFlow(MessageContext messageContext) { + if(!enabled) { + return true; + } + int tCount = TransactionCountingLogic.handleResponseInFlow(messageContext); + if(tCount > 0) { + this.transactionRecordProducer.addTransaction(tCount); + } + return true; + } + + @Override + public boolean handleResponseOutFlow(MessageContext messageContext) { + if(!enabled) { + return true; + } + int tCount = TransactionCountingLogic.handleResponseOutFlow(messageContext); + if(tCount > 0) { + this.transactionRecordProducer.addTransaction(tCount); + } + return true; + } + + @Override + public boolean handleServerInit() { + // Nothing to implement + return true; + } + + @Override + public boolean handleServerShutDown() { + // Clen up resources + transactionRecordProducer.shutdown(); + transactionRecordConsumer.shutdown(); + transactionRecordQueue.clenUp(); + transactionCountStore.clenUp(); + return true; + } + + @Override + public boolean handleArtifactDeployment(String s, String s1, String s2) { + // Nothing to implement + return true; + } + + @Override + public boolean handleArtifactUnDeployment(String s, String s1, String s2) { + // Nothing to implement + return true; + } + + @Override + public boolean handleError(MessageContext messageContext) { + // Nothing to implement + return true; + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounterConstants.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounterConstants.java new file mode 100644 index 000000000000..4b66b82b57de --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCounterConstants.java @@ -0,0 +1,54 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +public class TransactionCounterConstants { + + public static enum ServerType { + GATEWAY, MI + } + + public static final String IS_THERE_ASSOCIATED_INCOMING_REQUEST = "is_there_incoming_request"; + public static final String TRANSPORT_WS = "ws"; + public static final String TRANSPORT_WSS = "wss"; + + public static final String SERVER_ID = "serverId"; + public static final String TRANSACTION_COUNT_STORE_CLASS = "transactionCountStoreClass"; + public static final String TRANSACTION_RECORD_QUEUE_SIZE = "transactionRecordQueueSize"; + public static final String PRODUCER_THREAD_POOL_SIZE = "producerThreadPoolSize"; + public static final String TRANSACTION_COUNT_RECORD_INTERVAL = "transactionCountRecordInterval"; + public static final String MAX_TRANSACTION_COUNT = "maxTransactionCount"; + public static final String MIN_TRANSACTION_COUNT = "minTransactionCount"; + public static final String CONSUMER_COMMIT_INTERVAL = "consumerCommitInterval"; + public static final String MAX_TRANSACTION_RECORDS_PER_COMMIT = "maxTransactionRecordsPerCommit"; + public static final String MAX_RETRY_COUNT = "maxRetryCount"; + public static final String TRANSACTION_COUNT_SERVICE = "transactionCountService"; + public static final String TRANSACTION_COUNT_SERVICE_USERNAME = "transactionCountServiceUsername"; + public static final String TRANSACTION_COUNT_SERVICE_PASSWORD = "transactionCountServicePassword"; + + // APIM Gateway related constants + public static final String APIM_CONFIG_CLASS = "org.wso2.carbon.apimgt.impl.internal.ServiceReferenceHolder"; + public static final String GATEWAY_CONFIG_ROOT = "APIGateway.TransactionCounter"; + public static final String GATEWAY_PRODUCER_THREAD_POOL_SIZE = GATEWAY_CONFIG_ROOT + + ".ProducerThreadPoolSize"; + public static final String GATEWAY_QUEUE_SIZE = GATEWAY_CONFIG_ROOT + ".QueueSize"; + public static final String GATEWAY_STORE_CLASS = GATEWAY_CONFIG_ROOT + ".StoreClass"; + public static final String GATEWAY_MAX_TRANSACTION_COUNT = GATEWAY_CONFIG_ROOT + + ".MaxTransactionCount"; + public static final String GATEWAY_MIN_TRANSACTION_COUNT = GATEWAY_CONFIG_ROOT + + ".MinTransactionCount"; + public static final String GATEWAY_RECORD_INTERVAL = GATEWAY_CONFIG_ROOT + + ".ProducerScheduledInterval"; + public static final String GATEWAY_MAX_RETRY_COUNT = GATEWAY_CONFIG_ROOT + ".MaxRetryCount"; + public static final String GATEWAY_MAX_TRANSACTION_RECORDS_PER_COMMIT = GATEWAY_CONFIG_ROOT + + ".MaxBatchSize"; + public static final String GATEWAY_CONSUMER_COMMIT_INTERVAL = GATEWAY_CONFIG_ROOT + + ".PublisherScheduledInterval"; + public static final String GATEWAY_SERVER_ID = GATEWAY_CONFIG_ROOT + ".ServerID"; + public static final String GATEWAY_SERVICE = GATEWAY_CONFIG_ROOT + ".ServiceURL"; + public static final String GATEWAY_SERVICE_USERNAME = GATEWAY_CONFIG_ROOT + + ".ServiceUsername"; + public static final String GATEWAY_SERVICE_PASSWORD = GATEWAY_CONFIG_ROOT + + ".ServicePassword"; + + // MI related constants + public static final String MI_CONFIG_CLASS = "org.wso2.config.mapper.ConfigParser"; +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountingLogic.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountingLogic.java new file mode 100644 index 000000000000..caab7ecd0fd5 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/TransactionCountingLogic.java @@ -0,0 +1,56 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction; + +import org.apache.synapse.MessageContext; +import org.apache.synapse.core.axis2.Axis2MessageContext; + + +/** + * This class contains the logic for counting transactions. + * In case of changing the logic, this class should be modified, replaced or extended. + * @author - Isuru Wijesiri + * @version - 1.0.0 + */ +public class TransactionCountingLogic { + + public static int handleRequestInFlow(MessageContext messageContext) { + org.apache.axis2.context.MessageContext axis2MessageContext = + ((Axis2MessageContext) messageContext).getAxis2MessageContext(); + + // Setting this property to identify request-response pairs + messageContext.setProperty(TransactionCounterConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST, true); + + // Counting message received via an open WebSocket + String transport = axis2MessageContext.getIncomingTransportName(); + if (transport.equals(TransactionCounterConstants.TRANSPORT_WS) || + transport.equals(TransactionCounterConstants.TRANSPORT_WSS)){ + return 1; + } + return 0; + } + + public static int handleRequestOutFlow(MessageContext messageContext) { + Object isThereAnAssociatedIncomingRequest = messageContext.getProperty( + TransactionCounterConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST); + + // Counting outgoing messages that are not related to any request-response pair + if (isThereAnAssociatedIncomingRequest == null) { + return 1; + } + return 0; + } + + public static int handleResponseInFlow(MessageContext messageContext) { + return 0; + } + + public static int handleResponseOutFlow(MessageContext messageContext) { + Object isThereAnAssociatedIncomingRequest = messageContext.getProperty( + TransactionCounterConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST); + + // Counting request-response pairs + if (isThereAnAssociatedIncomingRequest instanceof Boolean) { + return 1; + } + return 0; + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/APIMConfigFetcher.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/APIMConfigFetcher.java new file mode 100644 index 000000000000..22ef1c6e0127 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/APIMConfigFetcher.java @@ -0,0 +1,130 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.config; + +import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionCounterConstants; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterConfigurationException; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Objects; + +public class APIMConfigFetcher implements ConfigFetcher { + + private static APIMConfigFetcher instance = null; + private final static HashMap configMap = new HashMap<>(); + + private APIMConfigFetcher() throws TransactionCounterConfigurationException { + try { + Class configClass = Class.forName(TransactionCounterConstants.APIM_CONFIG_CLASS); + + Object serviceReferenceHolder = configClass.getMethod("getInstance").invoke(null); + Object apiManagerConfigurationService = configClass.getMethod("getAPIManagerConfigurationService") + .invoke(serviceReferenceHolder); + Object apiManagerConfiguration = apiManagerConfigurationService.getClass() + .getMethod("getAPIManagerConfiguration").invoke(apiManagerConfigurationService); + Method getFirstProperty = apiManagerConfiguration.getClass().getMethod("getFirstProperty", + String.class); + + // Reading the config values + String temp; + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_SERVER_ID); + String SERVER_ID = Objects.requireNonNull( temp, "Server ID cannot be null"); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_STORE_CLASS); + String TRANSACTION_COUNT_STORE_CLASS = Objects.requireNonNull( + temp, "Transaction count store class cannot be null"); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_QUEUE_SIZE); + temp = Objects.requireNonNull(temp, "Transaction record queue size cannot be null"); + Integer TRANSACTION_RECORD_QUEUE_SIZE = Integer.parseInt(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_PRODUCER_THREAD_POOL_SIZE); + temp = Objects.requireNonNull(temp, "Producer thread pool size cannot be null"); + Integer PRODUCER_THREAD_POOL_SIZE = Integer.parseInt(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_RECORD_INTERVAL); + temp = Objects.requireNonNull(temp, "Transaction count record interval cannot be null"); + Integer TRANSACTION_COUNT_RECORD_INTERVAL = Integer.parseInt(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_MAX_TRANSACTION_COUNT); + temp = Objects.requireNonNull(temp, "Max transaction count cannot be null"); + Double MAX_TRANSACTION_COUNT = Double.parseDouble(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_MIN_TRANSACTION_COUNT); + temp = Objects.requireNonNull(temp, "Min transaction count cannot be null"); + Double MIN_TRANSACTION_COUNT = Double.parseDouble(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_CONSUMER_COMMIT_INTERVAL); + temp = Objects.requireNonNull(temp, "Consumer commit interval cannot be null"); + Integer CONSUMER_COMMIT_INTERVAL = Integer.parseInt(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_MAX_TRANSACTION_RECORDS_PER_COMMIT); + temp = Objects.requireNonNull(temp, "Max transaction records per commit cannot be null"); + Integer MAX_TRANSACTION_RECORDS_PER_COMMIT = Integer.parseInt(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_MAX_RETRY_COUNT); + temp = Objects.requireNonNull(temp, "Max retry count cannot be null"); + Integer MAX_RETRY_COUNT = Integer.parseInt(temp); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_SERVICE); + String TRANSACTION_COUNT_SERVICE = Objects.requireNonNull(temp, + "Transaction count service cannot be null"); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_SERVICE_USERNAME); + String TRANSACTION_COUNT_SERVICE_USERNAME = Objects.requireNonNull(temp, + "Transaction count service username cannot be null"); + + temp = (String) getFirstProperty.invoke(apiManagerConfiguration, + TransactionCounterConstants.GATEWAY_SERVICE_PASSWORD); + String TRANSACTION_COUNT_SERVICE_PASSWORD = Objects.requireNonNull(temp, + "Transaction count service password cannot be null"); + + configMap.put(TransactionCounterConstants.SERVER_ID, SERVER_ID); + configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_STORE_CLASS, TRANSACTION_COUNT_STORE_CLASS); + configMap.put(TransactionCounterConstants.TRANSACTION_RECORD_QUEUE_SIZE, TRANSACTION_RECORD_QUEUE_SIZE); + configMap.put(TransactionCounterConstants.PRODUCER_THREAD_POOL_SIZE, PRODUCER_THREAD_POOL_SIZE); + configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_RECORD_INTERVAL , TRANSACTION_COUNT_RECORD_INTERVAL); + configMap.put(TransactionCounterConstants.MAX_TRANSACTION_COUNT, MAX_TRANSACTION_COUNT); + configMap.put(TransactionCounterConstants.MIN_TRANSACTION_COUNT, MIN_TRANSACTION_COUNT); + configMap.put(TransactionCounterConstants.CONSUMER_COMMIT_INTERVAL, CONSUMER_COMMIT_INTERVAL); + configMap.put(TransactionCounterConstants.MAX_TRANSACTION_RECORDS_PER_COMMIT, MAX_TRANSACTION_RECORDS_PER_COMMIT); + configMap.put(TransactionCounterConstants.MAX_RETRY_COUNT, MAX_RETRY_COUNT); + configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE, TRANSACTION_COUNT_SERVICE); + configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE_USERNAME, TRANSACTION_COUNT_SERVICE_USERNAME); + configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE_PASSWORD, TRANSACTION_COUNT_SERVICE_PASSWORD); + + } catch (ClassNotFoundException e) { + // This error won't be thrown here because it is already checked in TransactionCountConfig + } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new TransactionCounterConfigurationException(); + } catch (NumberFormatException | NullPointerException e) { + throw new TransactionCounterConfigurationException("Error while reading the config values", e); + } + } + + public static ConfigFetcher getInstance() throws TransactionCounterConfigurationException { + if (instance == null) { + instance = new APIMConfigFetcher(); + } + return instance; + } + + @Override + public String getConfigValue(String key) { + return configMap.get(key).toString(); + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/ConfigFetcher.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/ConfigFetcher.java new file mode 100644 index 000000000000..8522b2347549 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/ConfigFetcher.java @@ -0,0 +1,6 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.config; + +public interface ConfigFetcher { + String getConfigValue(String key); + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/MIConfigFetcher.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/MIConfigFetcher.java new file mode 100644 index 000000000000..2c52dcd260b2 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/MIConfigFetcher.java @@ -0,0 +1,27 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.config; + +import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterConfigurationException; + +import java.util.HashMap; + +public class MIConfigFetcher implements ConfigFetcher { + + private static MIConfigFetcher instance = null; + private static HashMap configMap = new HashMap<>(); + + private MIConfigFetcher() throws TransactionCounterConfigurationException { + // To be implemented + } + + public static MIConfigFetcher getInstance() throws TransactionCounterConfigurationException { + if(instance == null) { + instance = new MIConfigFetcher(); + } + return instance; + } + + @Override + public String getConfigValue(String key) { + return null; + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/TransactionCounterConfig.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/TransactionCounterConfig.java new file mode 100644 index 000000000000..977e148f72bd --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/config/TransactionCounterConfig.java @@ -0,0 +1,92 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.config; + +import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionCounterConstants; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterConfigurationException; + +public class TransactionCounterConfig { + + private static ConfigFetcher configFetcher; + private static TransactionCounterConstants.ServerType serverType; + + public static void init() throws TransactionCounterConfigurationException { + try { + // Check whether the APIM Config class is available + Class.forName(TransactionCounterConstants.APIM_CONFIG_CLASS); + configFetcher = APIMConfigFetcher.getInstance(); + serverType = TransactionCounterConstants.ServerType.GATEWAY; + } catch (ClassNotFoundException e) { + try { + // Check whether the MI Config class is available + Class.forName(TransactionCounterConstants.MI_CONFIG_CLASS); + configFetcher = MIConfigFetcher.getInstance(); + serverType = TransactionCounterConstants.ServerType.MI; + } catch (ClassNotFoundException ex) { + throw new TransactionCounterConfigurationException(ex); + } + } + } + + public static TransactionCounterConstants.ServerType getServerType() { + return serverType; + } + + public static String getServerID() { + return configFetcher.getConfigValue(TransactionCounterConstants.SERVER_ID); + } + + public static String getTransactionCountStoreClass() { + return configFetcher.getConfigValue(TransactionCounterConstants.TRANSACTION_COUNT_STORE_CLASS); + } + + public static int getProducerThreadPoolSize() { + return Integer.parseInt( + configFetcher.getConfigValue(TransactionCounterConstants.PRODUCER_THREAD_POOL_SIZE)); + } + + public static double getMaxTransactionCount() { + return Double.parseDouble( + configFetcher.getConfigValue(TransactionCounterConstants.MAX_TRANSACTION_COUNT)); + } + + public static double getMinTransactionCount() { + return Double.parseDouble( + configFetcher.getConfigValue(TransactionCounterConstants.MIN_TRANSACTION_COUNT)); + } + + public static int getTransactionCountRecordInterval() { + return Integer.parseInt( + configFetcher.getConfigValue(TransactionCounterConstants.TRANSACTION_COUNT_RECORD_INTERVAL)); + } + + public static int getMaxRetryCount() { + return Integer.parseInt( + configFetcher.getConfigValue(TransactionCounterConstants.MAX_RETRY_COUNT)); + } + + public static int getMaxTransactionRecordsPerCommit() { + return Integer.parseInt( + configFetcher.getConfigValue(TransactionCounterConstants.MAX_TRANSACTION_RECORDS_PER_COMMIT)); + } + + public static int getTransactionRecordQueueSize() { + return Integer.parseInt( + configFetcher.getConfigValue(TransactionCounterConstants.TRANSACTION_RECORD_QUEUE_SIZE)); + } + + public static String getTransactionCountService() { + return configFetcher.getConfigValue(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE); + } + + public static String getTransactionCountServiceUsername() { + return configFetcher.getConfigValue(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE_USERNAME); + } + + public static String getTransactionCountServicePassword() { + return configFetcher.getConfigValue(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE_PASSWORD); + } + + public static int getConsumerCommitInterval() { + return Integer.parseInt( + configFetcher.getConfigValue(TransactionCounterConstants.CONSUMER_COMMIT_INTERVAL)); + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/consumer/TransactionRecordConsumer.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/consumer/TransactionRecordConsumer.java new file mode 100644 index 000000000000..e14594613043 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/consumer/TransactionRecordConsumer.java @@ -0,0 +1,68 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.consumer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.config.TransactionCounterConfig; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.record.TransactionRecord; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore; + +import java.util.ArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TransactionRecordConsumer { + + private int MAX_RETRY_COUNT; + private int MAX_TRANSACTION_RECORDS_PER_COMMIT; + private static TransactionRecordConsumer instance = null; + private TransactionRecordStore transactionRecordStore; + private TransactionRecordQueue transactionRecordQueue; + private ScheduledExecutorService scheduledExecutorService; + + private TransactionRecordConsumer() {} + + public static TransactionRecordConsumer getInstance() { + if(instance == null) { + instance = new TransactionRecordConsumer(); + } + return instance; + } + + public void init(TransactionRecordStore transactionRecordStore, TransactionRecordQueue transactionRecordQueue, + int commitInterval, int maxRetryCount, int maxTransactionRecordsPerCommit) { + + MAX_RETRY_COUNT = maxRetryCount; + MAX_TRANSACTION_RECORDS_PER_COMMIT = maxTransactionRecordsPerCommit; + + this.transactionRecordStore = transactionRecordStore; + this.transactionRecordQueue = transactionRecordQueue; + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + scheduledExecutorService.scheduleAtFixedRate(this::commitWithRetries, + 0, commitInterval, TimeUnit.SECONDS); + } + + private void commitWithRetries() { + // Drain the transaction count records from the queue + ArrayList transactionRecordList = new ArrayList<>(); + transactionRecordQueue.drain(transactionRecordList, MAX_TRANSACTION_RECORDS_PER_COMMIT); + + if(transactionRecordList.isEmpty()) { + return; + } + + // Committing the transaction count records to the store with retries + // If failed to commit after MAX_RETRY_COUNT, the transaction count records will be added to the queue again + boolean commited = this.transactionRecordStore.commit(transactionRecordList, MAX_RETRY_COUNT); + if (!commited) { + transactionRecordQueue.addAll(transactionRecordList); + } + } + + public void shutdown() { + this.scheduledExecutorService.shutdownNow(); + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/exception/TransactionCounterConfigurationException.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/exception/TransactionCounterConfigurationException.java new file mode 100644 index 000000000000..b78f30c3ae9c --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/exception/TransactionCounterConfigurationException.java @@ -0,0 +1,19 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.exception; + +public class TransactionCounterConfigurationException extends Exception { + + public TransactionCounterConfigurationException() { + super("Error while reading configuration"); + } + + public TransactionCounterConfigurationException(Exception e) { + super("Error while reading configuration", e); + } + public TransactionCounterConfigurationException(String msg) { + super(msg); + } + + public TransactionCounterConfigurationException(String msg, Exception e) { + super(msg, e); + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/producer/TransactionRecordProducer.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/producer/TransactionRecordProducer.java new file mode 100644 index 000000000000..4eec3accb269 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/producer/TransactionRecordProducer.java @@ -0,0 +1,91 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.producer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.record.TransactionRecord; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class TransactionRecordProducer { + + private static double MAX_TRANSACTION_COUNT; + private static double MIN_TRANSACTION_COUNT; + private static final Log LOG = LogFactory.getLog(TransactionRecordProducer.class); + private static TransactionRecordProducer instance = null; + private TransactionRecordQueue transactionRecordQueue; + private ExecutorService executorService; + private ScheduledExecutorService scheduledExecutorService; + private static final ReentrantLock lock = new ReentrantLock(); + private static final AtomicInteger transactionCount = new AtomicInteger(0); + + private TransactionRecordProducer() {} + + public static TransactionRecordProducer getInstance() { + if(instance == null) { + instance = new TransactionRecordProducer(); + } + return instance; + } + + public void init(TransactionRecordQueue transactionRecordQueue, int threadPoolSize, double maxTransactionCount, + double minTransactionCount, int transactionCountRecordInterval) { + + MAX_TRANSACTION_COUNT = maxTransactionCount; + MIN_TRANSACTION_COUNT = minTransactionCount; + + this.transactionRecordQueue = transactionRecordQueue; + this.executorService = Executors.newFixedThreadPool(threadPoolSize); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + scheduledExecutorService.scheduleAtFixedRate(this::produceRecordScheduled, 0, + transactionCountRecordInterval, TimeUnit.SECONDS); + } + + public void addTransaction(int tCount) { + executorService.execute(() -> this.produceRecord(tCount)); + } + + private void produceRecord(int tCount) { + lock.lock(); + try { + int count = transactionCount.addAndGet(tCount); + if (count >= MAX_TRANSACTION_COUNT) { + TransactionRecord transactionRecord = new TransactionRecord(transactionCount.get()); + transactionRecordQueue.add(transactionRecord); + transactionCount.set(0); + } + } catch (Exception e) { + LOG.error("Error while handling transaction count.", e); + } finally { + lock.unlock(); + } + } + + private void produceRecordScheduled() { + lock.lock(); + try { + int transactionCountValue = transactionCount.get(); + if (transactionCountValue >= MIN_TRANSACTION_COUNT) { + TransactionRecord transactionRecord = new TransactionRecord(transactionCountValue); + transactionRecordQueue.add(transactionRecord); + transactionCount.set(0); + } + } catch (Exception e) { + LOG.error("Error while handling transaction count.", e); + } finally { + lock.unlock(); + } + } + + public void shutdown() { + scheduledExecutorService.shutdownNow(); + executorService.shutdownNow(); + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java new file mode 100644 index 000000000000..95ed50308ade --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/queue/TransactionRecordQueue.java @@ -0,0 +1,46 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.record.TransactionRecord; + +import java.util.ArrayList; +import java.util.concurrent.ArrayBlockingQueue; + +public class TransactionRecordQueue { + + private static TransactionRecordQueue instance = null; + private static ArrayBlockingQueue transactionRecordQueue; + + private TransactionRecordQueue() {} + + public static TransactionRecordQueue getInstance() { + if(instance == null) { + instance = new TransactionRecordQueue(); + } + return instance; + } + + public void init(int size) { + transactionRecordQueue = new ArrayBlockingQueue<>(size); + } + + public void add(TransactionRecord transactionRecord) { + transactionRecordQueue.add(transactionRecord); + } + public void addAll(ArrayList transactionRecordList) { + transactionRecordQueue.addAll(transactionRecordList); + } + + public TransactionRecord take() throws InterruptedException { + return transactionRecordQueue.take(); + } + + public void drain(ArrayList transactionRecordList, int maxRecords) { + transactionRecordQueue.drainTo(transactionRecordList, maxRecords); + } + + public void clenUp() { + transactionRecordQueue.clear(); + } +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/record/TransactionRecord.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/record/TransactionRecord.java new file mode 100644 index 000000000000..7bb1bc55d038 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/record/TransactionRecord.java @@ -0,0 +1,69 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.record; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.sql.Timestamp; +import java.util.UUID; + +public class TransactionRecord { + private static String localhost; + private static String server; + private static String type; + + static { + try { + localhost = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + localhost = "Unknown"; + } + } + + private String id; + private String serverType; + private String host; + private String serverID; + private Integer count; + private String recordedTime; + + public TransactionRecord(Integer count) { + this.id = UUID.randomUUID().toString(); + this.host = localhost; + this.serverID = server; + this.serverType = type; + this.count = count; + this.recordedTime = new Timestamp(System.currentTimeMillis()).toString(); + } + + public static void init(String serverID, String serverType) { + try { + localhost = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + localhost = "Unknown"; + } + TransactionRecord.server = serverID; + TransactionRecord.type = serverType; + } + + public String getId() { + return id; + } + public String getHost() { + return host; + } + public String getServerID() { + return serverID; + } + public String getServerType() { + return serverType; + } + public void setCount(Integer count) { + this.count = count; + } + public Integer getCount() { + return count; + } + public String getRecordedTime() { + return recordedTime; + } + +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStore.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStore.java new file mode 100644 index 000000000000..284a87a88381 --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStore.java @@ -0,0 +1,11 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.store; + +import org.wso2.carbon.apimgt.gateway.handlers.transaction.record.TransactionRecord; + +import java.util.ArrayList; + +public interface TransactionRecordStore { + void init(String endpoint, String username, String password); + boolean commit(ArrayList transactionRecordList, int retryCount); + void clenUp(); +} diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java new file mode 100644 index 000000000000..e4c93e91878c --- /dev/null +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/handlers/transaction/store/TransactionRecordStoreImpl.java @@ -0,0 +1,94 @@ +package org.wso2.carbon.apimgt.gateway.handlers.transaction.store; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.config.TransactionCounterConfig; +import org.wso2.carbon.apimgt.gateway.handlers.transaction.record.TransactionRecord; +import org.wso2.carbon.apimgt.impl.utils.APIUtil; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; + +public class TransactionRecordStoreImpl implements TransactionRecordStore { + + private static final Log LOG = LogFactory.getLog(TransactionRecordStoreImpl.class); + private static HttpClient httpClient; + private static String ENDPOINT; + private static String encodedCredentials; + + @Override + public void init(String endpoint, String username, String password) { + + ENDPOINT = endpoint; + String credentials = username + ":" + password; + encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8)); + + URL url = null; + try { + url = new URL(endpoint); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + httpClient = APIUtil.getHttpClient(url.getPort(), url.getProtocol()); + } + + @Override + public boolean commit(ArrayList transactionRecordList, int maxRetryCount) { + + Gson gson = new Gson(); + String jsonPayload = gson.toJson(transactionRecordList); + + HttpPost httpPost = new HttpPost(ENDPOINT); + HttpEntity stringEntity = new StringEntity(jsonPayload , ContentType.APPLICATION_JSON); + httpPost.setEntity(stringEntity); + httpPost.setHeader("Accept", "application/json"); + httpPost.setHeader("Content-type", "application/json"); + httpPost.setHeader("Authorization", "Basic " + encodedCredentials); + + int retryCount = 0; + boolean retry; + do { + try { + retry = false; + HttpResponse result = httpClient.execute(httpPost); + int statusCode = result.getStatusLine().getStatusCode(); + if (statusCode < 200 || statusCode >= 300) { + throw new IOException("Status Code: " + statusCode); + } + } catch (IOException ex) { + retryCount++; + if (retryCount < maxRetryCount) { + retry = true; + LOG.warn("Failed to persist transaction count records to remote endpoint. Retrying after 1s"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.error("Could not persist following transaction count records: " + jsonPayload, e); + } + } else { + LOG.error("Could not persist transaction count records. Added back to the queue. Error: " + + ex.getMessage(), ex); + return false; + } + } + } while (retry); + + return true; + } + + @Override + public void clenUp() { + // To be implemented + } +}