Skip to content

Commit

Permalink
Revamp configuration handling mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
IsuruMaduranga committed Aug 30, 2023
1 parent f94011f commit c1ce2d1
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,32 +188,5 @@ public class APIMgtGatewayConstants {
//This will be a reserved name for the synapse message context properties.
public static final String ADDITIONAL_ANALYTICS_PROPS = "ADDITIONAL_ANALYTICS_PROPS_TO_PUBLISH";

/**
* Constants for transaction counting
*/
public static final String IS_THERE_ASSOCIATED_INCOMING_REQUEST = "is_there_incoming_request";
public static final String TRANSPORT_WS = "ws";
public static final String TRANSPORT_WSS = "wss";
public static final String TRANSACTION_COUNTER_CONFIG_ROOT = "APIGateway.TransactionCounter";
public static final String TRANSACTION_COUNTER_PRODUCER_THREAD_POOL_SIZE = TRANSACTION_COUNTER_CONFIG_ROOT +
".ProducerThreadPoolSize";
public static final String TRANSACTION_COUNTER_QUEUE_SIZE = TRANSACTION_COUNTER_CONFIG_ROOT + ".QueueSize";
public static final String TRANSACTION_COUNTER_STORE_CLASS = TRANSACTION_COUNTER_CONFIG_ROOT + ".StoreClass";
public static final String TRANSACTION_COUNTER_MAX_TRANSACTION_COUNT = TRANSACTION_COUNTER_CONFIG_ROOT +
".MaxTransactionCount";
public static final String TRANSACTION_COUNTER_RECORD_INTERVAL = TRANSACTION_COUNTER_CONFIG_ROOT
+ ".ProducerScheduledInterval";
public static final String TRANSACTION_COUNTER_MAX_RETRY_COUNT = TRANSACTION_COUNTER_CONFIG_ROOT + ".MaxRetryCount";
public static final String TRANSACTION_COUNTER_MIN_RETRY_COUNT = TRANSACTION_COUNTER_CONFIG_ROOT + ".MinRetryCount";
public static final String TRANSACTION_COUNTER_MAX_TRANSACTION_RECORDS_PER_COMMIT = TRANSACTION_COUNTER_CONFIG_ROOT
+ ".MaxBatchSize";
public static final String TRANSACTION_COUNTER_CONSUMER_COMMIT_INTERVAL = TRANSACTION_COUNTER_CONFIG_ROOT
+ ".PublisherScheduledInterval";
public static final String TRANSACTION_COUNTER_SERVER_ID = TRANSACTION_COUNTER_CONFIG_ROOT + ".ServerID";
public static final String TRANSACTION_COUNTER_SERVICE = TRANSACTION_COUNTER_CONFIG_ROOT + ".ServiceURL";
public static final String TRANSACTION_COUNTER_SERVICE_USERNAME = TRANSACTION_COUNTER_CONFIG_ROOT
+ ".ServiceUsername";
public static final String TRANSACTION_COUNTER_SERVICE_PASSWORD = TRANSACTION_COUNTER_CONFIG_ROOT
+ ".ServicePassword";
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import org.apache.synapse.AbstractExtendedSynapseHandler;
import org.apache.synapse.MessageContext;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.consumer.TransactionRecordConsumer;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterInitializationException;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.producer.TransactionRecordProducer;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.queue.TransactionRecordQueue;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.store.TransactionRecordStore;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.util.TransactionCountConfig;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.config.TransactionCounterConfig;

import java.lang.reflect.Constructor;
import java.util.concurrent.*;
Expand All @@ -30,11 +30,17 @@ public class TransactionCountHandler extends AbstractExtendedSynapseHandler {

public TransactionCountHandler() {

try {
TransactionCounterConfig.init();
} catch (TransactionCounterInitializationException e) {
throw new RuntimeException(e);
}

// Obtain config values
PRODUCER_THREAD_POOL_SIZE = TransactionCountConfig.getProducerThreadPoolSize();
CONSUMER_COMMIT_INTERVAL = TransactionCountConfig.getConsumerCommitInterval();
TRANSACTION_RECORD_QUEUE_SIZE = TransactionCountConfig.getTransactionRecordQueueSize();
TRANSACTION_COUNT_STORE_CLASS = TransactionCountConfig.getTransactionCountStoreClass();
PRODUCER_THREAD_POOL_SIZE = TransactionCounterConfig.getProducerThreadPoolSize();
CONSUMER_COMMIT_INTERVAL = TransactionCounterConfig.getConsumerCommitInterval();
TRANSACTION_RECORD_QUEUE_SIZE = TransactionCounterConfig.getTransactionRecordQueueSize();
TRANSACTION_COUNT_STORE_CLASS = TransactionCounterConfig.getTransactionCountStoreClass();

this.transactionRecordQueue = TransactionRecordQueue.getInstance(TRANSACTION_RECORD_QUEUE_SIZE);
// Load the transaction count store
Expand Down Expand Up @@ -62,13 +68,14 @@ public boolean handleRequestInFlow(MessageContext messageContext) {
((Axis2MessageContext) messageContext).getAxis2MessageContext();

// Setting this property to identify request-response pairs
messageContext.setProperty(APIMgtGatewayConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST, true);
messageContext.setProperty(TransactionCounterConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST, true);

LOG.info("Recieved an incoming request");

// Counting message received via an open WebSocket
String transport = axis2MessageContext.getIncomingTransportName();
if (transport.equals(APIMgtGatewayConstants.TRANSPORT_WS) || transport.equals(APIMgtGatewayConstants.TRANSPORT_WSS)){
if (transport.equals(TransactionCounterConstants.TRANSPORT_WS) ||
transport.equals(TransactionCounterConstants.TRANSPORT_WSS)){
LOG.info("Counting WebSocket message");
this.transactionRecordProducer.addTransaction();
}
Expand All @@ -82,7 +89,7 @@ public boolean handleRequestInFlow(MessageContext messageContext) {
public boolean handleRequestOutFlow(MessageContext messageContext) {
try {
Object isThereAnAssociatedIncomingRequest = messageContext.getProperty(
APIMgtGatewayConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST);
TransactionCounterConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST);

// Counting outgoing messages that are not related to any request-response pair
if (isThereAnAssociatedIncomingRequest == null) {
Expand All @@ -103,7 +110,7 @@ public boolean handleResponseInFlow(MessageContext messageContext) {
@Override
public boolean handleResponseOutFlow(MessageContext messageContext) {
Object isThereAnAssociatedIncomingRequest = messageContext.getProperty(
APIMgtGatewayConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST);
TransactionCounterConstants.IS_THERE_ASSOCIATED_INCOMING_REQUEST);

// Counting request-response pairs
if (isThereAnAssociatedIncomingRequest instanceof Boolean) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction;

public class TransactionCounterConstants {

public static final String IS_THERE_ASSOCIATED_INCOMING_REQUEST = "is_there_incoming_request";
public static final String TRANSPORT_WS = "ws";
public static final String TRANSPORT_WSS = "wss";

public static final String SERVER_ID = "serverId";
public static final String TRANSACTION_COUNT_STORE_CLASS = "transactionCountStoreClass";
public static final String TRANSACTION_RECORD_QUEUE_SIZE = "transactionRecordQueueSize";
public static final String PRODUCER_THREAD_POOL_SIZE = "producerThreadPoolSize";
public static final String TRANSACTION_COUNT_RECORD_INTERVAL = "transactionCountRecordInterval";
public static final String MAX_TRANSACTION_COUNT = "maxTransactionCount";
public static final String CONSUMER_COMMIT_INTERVAL = "consumerCommitInterval";
public static final String MAX_TRANSACTION_RECORDS_PER_COMMIT = "maxTransactionRecordsPerCommit";
public static final String MAX_RETRY_COUNT = "maxRetryCount";
public static final String TRANSACTION_COUNT_SERVICE = "transactionCountService";
public static final String TRANSACTION_COUNT_SERVICE_USERNAME = "transactionCountServiceUsername";
public static final String TRANSACTION_COUNT_SERVICE_PASSWORD = "transactionCountServicePassword";

// APIM Gateway related constants
public static final String APIM_CONFIG_CLASS = "org.wso2.carbon.apimgt.impl.internal.ServiceReferenceHolder";
public static final String GATEWAY_CONFIG_ROOT = "APIGateway.TransactionCounter";
public static final String GATEWAY_PRODUCER_THREAD_POOL_SIZE = GATEWAY_CONFIG_ROOT +
".ProducerThreadPoolSize";
public static final String GATEWAY_QUEUE_SIZE = GATEWAY_CONFIG_ROOT + ".QueueSize";
public static final String GATEWAY_STORE_CLASS = GATEWAY_CONFIG_ROOT + ".StoreClass";
public static final String GATEWAY_MAX_TRANSACTION_COUNT = GATEWAY_CONFIG_ROOT +
".MaxTransactionCount";
public static final String GATEWAY_RECORD_INTERVAL = GATEWAY_CONFIG_ROOT
+ ".ProducerScheduledInterval";
public static final String GATEWAY_MAX_RETRY_COUNT = GATEWAY_CONFIG_ROOT + ".MaxRetryCount";
public static final String GATEWAY_MAX_TRANSACTION_RECORDS_PER_COMMIT = GATEWAY_CONFIG_ROOT
+ ".MaxBatchSize";
public static final String GATEWAY_CONSUMER_COMMIT_INTERVAL = GATEWAY_CONFIG_ROOT
+ ".PublisherScheduledInterval";
public static final String GATEWAY_SERVER_ID = GATEWAY_CONFIG_ROOT + ".ServerID";
public static final String GATEWAY_SERVICE = GATEWAY_CONFIG_ROOT + ".ServiceURL";
public static final String GATEWAY_SERVICE_USERNAME = GATEWAY_CONFIG_ROOT
+ ".ServiceUsername";
public static final String GATEWAY_SERVICE_PASSWORD = GATEWAY_CONFIG_ROOT
+ ".ServicePassword";

// MI related constants
public static final String MI_CONFIG_CLASS = "org.wso2.config.mapper.ConfigParser";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction.config;

import org.wso2.carbon.apimgt.gateway.handlers.transaction.TransactionCounterConstants;
import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterInitializationException;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Objects;

public class APIMConfigFetcher implements ConfigFetcher {

private static APIMConfigFetcher instance = null;
private static HashMap<String, Object> configMap = new HashMap<>();

private APIMConfigFetcher() throws TransactionCounterInitializationException {
try {
Class<?> configClass = Class.forName(TransactionCounterConstants.APIM_CONFIG_CLASS);

Object serviceReferenceHolder = configClass.getMethod("getInstance").invoke(null);
Object apiManagerConfigurationService = configClass.getMethod("getAPIManagerConfigurationService")
.invoke(serviceReferenceHolder);
Object apiManagerConfiguration = apiManagerConfigurationService.getClass()
.getMethod("getAPIManagerConfiguration").invoke(apiManagerConfigurationService);
Method getFirstProperty = apiManagerConfiguration.getClass().getMethod("getFirstProperty",
String.class);

// Reading the config values
String temp;

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_SERVER_ID);
String SERVER_ID = Objects.requireNonNull( temp, "Server ID cannot be null");

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_STORE_CLASS);
String TRANSACTION_COUNT_STORE_CLASS = Objects.requireNonNull(
temp, "Transaction count store class cannot be null");

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_QUEUE_SIZE);
temp = Objects.requireNonNull(temp, "Transaction record queue size cannot be null");
Integer TRANSACTION_RECORD_QUEUE_SIZE = Integer.parseInt(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_PRODUCER_THREAD_POOL_SIZE);
temp = Objects.requireNonNull(temp, "Producer thread pool size cannot be null");
Integer PRODUCER_THREAD_POOL_SIZE = Integer.parseInt(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_RECORD_INTERVAL);
temp = Objects.requireNonNull(temp, "Transaction count record interval cannot be null");
Integer TRANSACTION_COUNT_RECORD_INTERVAL = Integer.parseInt(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_MAX_TRANSACTION_COUNT);
temp = Objects.requireNonNull(temp, "Max transaction count cannot be null");
Double MAX_TRANSACTION_COUNT = Double.parseDouble(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_CONSUMER_COMMIT_INTERVAL);
temp = Objects.requireNonNull(temp, "Consumer commit interval cannot be null");
Integer CONSUMER_COMMIT_INTERVAL = Integer.parseInt(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_MAX_TRANSACTION_RECORDS_PER_COMMIT);
temp = Objects.requireNonNull(temp, "Max transaction records per commit cannot be null");
Integer MAX_TRANSACTION_RECORDS_PER_COMMIT = Integer.parseInt(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_MAX_RETRY_COUNT);
temp = Objects.requireNonNull(temp, "Max retry count cannot be null");
Integer MAX_RETRY_COUNT = Integer.parseInt(temp);

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_SERVICE);
String TRANSACTION_COUNT_SERVICE = Objects.requireNonNull(temp,
"Transaction count service cannot be null");

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_SERVICE_USERNAME);
String TRANSACTION_COUNT_SERVICE_USERNAME = Objects.requireNonNull(temp,
"Transaction count service username cannot be null");

temp = (String) getFirstProperty.invoke(apiManagerConfiguration,
TransactionCounterConstants.GATEWAY_SERVICE_PASSWORD);
String TRANSACTION_COUNT_SERVICE_PASSWORD = Objects.requireNonNull(temp,
"Transaction count service password cannot be null");

configMap.put(TransactionCounterConstants.SERVER_ID, SERVER_ID);
configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_STORE_CLASS, TRANSACTION_COUNT_STORE_CLASS);
configMap.put(TransactionCounterConstants.TRANSACTION_RECORD_QUEUE_SIZE, TRANSACTION_RECORD_QUEUE_SIZE);
configMap.put(TransactionCounterConstants.PRODUCER_THREAD_POOL_SIZE, PRODUCER_THREAD_POOL_SIZE);
configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_RECORD_INTERVAL , TRANSACTION_COUNT_RECORD_INTERVAL);
configMap.put(TransactionCounterConstants.MAX_TRANSACTION_COUNT, MAX_TRANSACTION_COUNT);
configMap.put(TransactionCounterConstants.CONSUMER_COMMIT_INTERVAL, CONSUMER_COMMIT_INTERVAL);
configMap.put(TransactionCounterConstants.MAX_TRANSACTION_RECORDS_PER_COMMIT, MAX_TRANSACTION_RECORDS_PER_COMMIT);
configMap.put(TransactionCounterConstants.MAX_RETRY_COUNT, MAX_RETRY_COUNT);
configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE, TRANSACTION_COUNT_SERVICE);
configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE_USERNAME, TRANSACTION_COUNT_SERVICE_USERNAME);
configMap.put(TransactionCounterConstants.TRANSACTION_COUNT_SERVICE_PASSWORD, TRANSACTION_COUNT_SERVICE_PASSWORD);

} catch (ClassNotFoundException e) {
// This error won't be thrown here because it is already checked in TransactionCountConfig
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new TransactionCounterInitializationException();
} catch (NullPointerException | NumberFormatException e) {
throw new TransactionCounterInitializationException("Error while reading the config values", e);
}
}

public static ConfigFetcher getInstance() throws TransactionCounterInitializationException {
if (instance == null) {
instance = new APIMConfigFetcher();
}
return instance;
}

@Override
public String getConfigValue(String key) {
return configMap.get(key).toString();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction.config;

public interface ConfigFetcher {
String getConfigValue(String key);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.wso2.carbon.apimgt.gateway.handlers.transaction.config;

import org.wso2.carbon.apimgt.gateway.handlers.transaction.exception.TransactionCounterInitializationException;

import java.util.HashMap;

public class MIConfigFetcher implements ConfigFetcher {

private static MIConfigFetcher instance = null;
private static HashMap<String, Object> configMap = new HashMap<>();

private MIConfigFetcher() throws TransactionCounterInitializationException {
// To be implemented
}

public static MIConfigFetcher getInstance() throws TransactionCounterInitializationException{
if(instance == null) {
instance = new MIConfigFetcher();
}
return instance;
}

@Override
public String getConfigValue(String key) {
return null;
}
}
Loading

0 comments on commit c1ce2d1

Please sign in to comment.