Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication channel transparency #60

Open
wants to merge 25 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8c3e109
Channel transparency, base implementation.
altafhusen-mr May 3, 2020
763d5c9
Channel transparency, base implementation.
altafhusen-mr May 9, 2020
f5bc5c1
Channel transparency, base implementation.
altafhusen-mr May 10, 2020
580fc27
Channel transparency, base implementation.
altafhusen-mr May 11, 2020
c92f462
Channel transparency : DirectChannel implementation
altafhusen-mr May 17, 2020
d61f495
Channel transparency : DirectChannel implementation
sourabhpoddar404 May 18, 2020
027807e
Package refactotization
altafhusen-mr May 20, 2020
56564b4
Changes to SenderReceiverFactory
sourabhpoddar404 May 20, 2020
fe47b1a
Including ExecutorService for DirectChannel and ReadByteChannel, othe…
altafhusen-mr May 24, 2020
595aa23
Fix for Tests failing due to data over writing during DirectSenderImpl
altafhusen-mr May 24, 2020
c2adfb6
changes to AbstractEvaluationStorage component
sourabhpoddar404 May 25, 2020
62e88f9
Changes to AbstractEvaluationStorage
sourabhpoddar404 May 25, 2020
feed030
DataGereratorTest fix for multiple test runs, TaskGeneratorTest changes,
altafhusen-mr May 31, 2020
6092dc0
Implementing RabbitMQChannel, DataGenerator test working implementation
altafhusen-mr Jun 7, 2020
1b6069a
ContainerCreationNoCorrelationTest and ContainerCreationTest for
altafhusen-mr Jun 7, 2020
5ebdaf3
Remove comment from catch block
altafhusen-mr Jun 7, 2020
0416688
BenchmarkControllingTest for RabbitMQChannel
altafhusen-mr Jun 9, 2020
21c4c40
Fixing All the tests except SequencingTaskGeneratorTest for
altafhusen-mr Jun 21, 2020
ec5cba3
Fix for SequencingTaskGeneratorTest for RabbitMQChannel
altafhusen-mr Jun 21, 2020
36ace5f
Code documentation
altafhusen-mr Jun 21, 2020
6523996
Merge branch 'develop' of https://github.com/hobbit-project/core into…
altafhusen-mr Jun 24, 2020
89d32df
Revert test cases changes and resolve conflict
altafhusen-mr Jun 24, 2020
8a8c30f
Moving classes to specified packages, code format, unnecessary exception
altafhusen-mr Jun 28, 2020
3cb1680
Added Javadocs for the components.
melissadas Jul 5, 2020
585f811
Rename CommonChannel to Channel as per the feedback
altafhusen-mr Jul 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/org/hobbit/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private Constants() {
}

// =============== ENVIRONMENT CONSTANTS ===============
public static final String IS_RABBIT_MQ_ENABLED = "IS_RABBIT_MQ_ENABLED";

public static final String HOBBIT_SESSION_ID_KEY = "HOBBIT_SESSION_ID";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.util.concurrent.Future;
import java.util.stream.Stream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand All @@ -37,6 +39,7 @@
import org.apache.commons.io.IOUtils;
import org.hobbit.core.Commands;
import org.hobbit.core.Constants;
import org.hobbit.core.components.channel.DirectCallback;
import org.hobbit.core.data.StartCommandData;
import org.hobbit.core.data.StopCommandData;
import org.hobbit.core.rabbit.RabbitMQUtils;
Expand Down Expand Up @@ -79,7 +82,7 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen
* Consumer of the queue that is used to receive responses for messages that
* are sent via the command queue and for which an answer is expected.
*/
private Consumer responseConsumer = null;
private Object responseConsumer = null;
/**
* Factory for generating queues with which the commands are sent and
* received. It is separated from the data connections since otherwise the
Expand All @@ -90,7 +93,7 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen
/**
* Channel that is used for the command queue.
*/
protected Channel cmdChannel = null;
//protected Channel cmdChannel = null;
/**
* Default type of containers created by this container
*/
Expand All @@ -110,7 +113,11 @@ public abstract class AbstractCommandReceivingComponent extends AbstractComponen

private ExecutorService cmdThreadPool;

public AbstractCommandReceivingComponent() {
public ExecutorService getCmdThreadPool() {
return cmdThreadPool;
}

public AbstractCommandReceivingComponent() {
this(false);
}

Expand All @@ -129,36 +136,20 @@ public void init() throws Exception {
super.init();
addCommandHeaderId(getHobbitSessionId());

cmdQueueFactory = new RabbitQueueFactoryImpl(createConnection());
cmdChannel = cmdQueueFactory.getConnection().createChannel();
String queueName = cmdChannel.queueDeclare().getQueue();
cmdChannel.exchangeDeclare(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "fanout", false, true, null);
cmdChannel.queueBind(queueName, Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "");

Consumer consumer = new DefaultConsumer(cmdChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
cmdThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
handleCmd(body, properties);
} catch (Exception e) {
LOGGER.error("Exception while trying to handle incoming command.", e);
}
}
});
}
};
cmdChannel.basicConsume(queueName, true, consumer);
commonChannel.createChannel();
String queueName = commonChannel.declareQueue(null) == null ? Constants.HOBBIT_COMMAND_EXCHANGE_NAME : commonChannel.getQueueName(this);
commonChannel.exchangeDeclare(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "fanout", false, true, null);
commonChannel.queueBind(queueName, Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "");
Object consumerCallback = getCommonConsumer();
commonChannel.readBytes(consumerCallback, this, true, queueName);

containerName = EnvVariables.getString(Constants.CONTAINER_NAME_KEY, containerName);
if (containerName == null) {
LOGGER.info("Couldn't get the id of this Docker container. Won't be able to create containers.");
}
}


/**
* Sends the given command to the command queue.
*
Expand Down Expand Up @@ -215,7 +206,8 @@ protected void sendToCmdQueue(byte command, byte data[], BasicProperties props)
if (attachData) {
buffer.put(data);
}
cmdChannel.basicPublish(Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "", props, buffer.array());

commonChannel.writeBytes(buffer, Constants.HOBBIT_COMMAND_EXCHANGE_NAME, "", props);
}

/**
Expand All @@ -239,7 +231,8 @@ protected void addCommandHeaderId(String sessionId) {
* properties of the RabbitMQ message
*/
protected void handleCmd(byte bytes[], AMQP.BasicProperties props) {
handleCmd(bytes, props.getReplyTo());
String replyTo = props!=null?props.getReplyTo():"";
handleCmd(bytes, replyTo);
}

/**
Expand All @@ -251,7 +244,7 @@ protected void handleCmd(byte bytes[], AMQP.BasicProperties props) {
* @param replyTo
* name of the queue in which response is expected
*/
protected void handleCmd(byte bytes[], String replyTo) {
public void handleCmd(byte bytes[], String replyTo) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
String sessionId = RabbitMQUtils.readString(buffer);
if (acceptedCmdHeaderIds.contains(sessionId)) {
Expand Down Expand Up @@ -486,43 +479,21 @@ protected void stopContainer(String containerName) {
*/
private void initResponseQueue() throws IOException {
if (responseQueueName == null) {
responseQueueName = cmdChannel.queueDeclare().getQueue();
try {
commonChannel.createChannel();
responseQueueName = commonChannel.declareQueue(null);//cmdChannel.queueDeclare().getQueue();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (responseConsumer == null) {
responseConsumer = new DefaultConsumer(cmdChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String key = properties.getCorrelationId();

synchronized (responseFutures) {
SettableFuture<String> future = null;
if (key != null) {
future = responseFutures.remove(key);
if (future == null) {
LOGGER.error("Received a message with correlationId ({}) not in map ({})", key, responseFutures.keySet());
}
} else {
LOGGER.warn("Received a message with null correlationId. This is an error unless the other component uses an older version of HOBBIT core library.");
Iterator<SettableFuture<String>> iter = responseFutures.values().iterator();
if (iter.hasNext()) {
LOGGER.info("Correlating with the eldest request as a workaround.");
future = iter.next();
iter.remove();
} else {
LOGGER.error("There are no pending requests.");
}
}
responseConsumer = getResponseConsumer();
//byte[] bytes = commonChannel.readBytes(this);

if (future != null) {
String value = RabbitMQUtils.readString(body);
future.set(value);
}
}
}
};
//cmdChannel.basicConsume(responseQueueName, responseConsumer);
commonChannel.readBytes(responseConsumer, this, null, responseQueueName);

cmdChannel.basicConsume(responseQueueName, responseConsumer);
}
}

Expand All @@ -542,17 +513,167 @@ public void setCmdResponseTimeout(long cmdResponseTimeout) {

@Override
public void close() throws IOException {
if (cmdChannel != null) {
/*if (cmdChannel != null) {
try {
cmdChannel.close();
} catch (Exception e) {
}
}
}*/
IOUtils.closeQuietly(cmdQueueFactory);
if (cmdThreadPool != null) {
/*if (cmdThreadPool != null) {
cmdThreadPool.shutdown();
}
}*/
super.close();
}
/**
* Provides the instance for command queue based on property {@link org.hobbit.core.Constants#IS_RABBIT_MQ_ENABLED}
* @return
*/
private Object getCommonConsumer() {
Object consumer = null;
if(isRabbitMQEnabled()) {
consumer = getCommonDefaultConsumer();
} else {
consumer = getCommonDirectConsumer();
}
return consumer;
}
/**
* RabbitMQ consumer for command queue
*/
private Object getCommonDefaultConsumer() {

return new DefaultConsumer((Channel) commonChannel.getChannel()) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
cmdThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
handleCmd(body, properties);
} catch (Exception e) {
LOGGER.error("Exception while trying to handle incoming command.", e);
}
}
});
}
};
}
/**
* Direct consumer for command queue
*/
private Object getCommonDirectConsumer() {
return new DirectCallback() {
@Override
public void callback(byte[] data, List<Object> cmdCallbackObjectList, BasicProperties props) {
for(Object cmdCallbackObject:cmdCallbackObjectList) {
if(cmdCallbackObject != null &&
cmdCallbackObject instanceof AbstractCommandReceivingComponent) {
cmdThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
((AbstractCommandReceivingComponent) cmdCallbackObject).
handleCmd(data, "");
} catch (Exception e) {
LOGGER.error("Exception while trying to handle incoming command.", e);
}
}
});
}
}
}
};
}
/**
* Provides the consumer for container creation
*/
private Object getResponseConsumer() {
Object consumer = null;
if(isRabbitMQEnabled()) {
consumer = getResponseDefaultConsumer();
} else {
consumer = getResponseDirectConsumer();
}
return consumer;
}
/**
* Provides RabbirMQ consumer for container creation
*/
private Object getResponseDefaultConsumer() {

return new DefaultConsumer((Channel) commonChannel.getChannel()) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String key = properties.getCorrelationId();

synchronized (responseFutures) {
SettableFuture<String> future = null;
if (key != null) {
future = responseFutures.remove(key);
if (future == null) {
LOGGER.error("Received a message with correlationId ({}) not in map ({})", key, responseFutures.keySet());
}
} else {
LOGGER.warn("Received a message with null correlationId. This is an error unless the other component uses an older version of HOBBIT core library.");
Iterator<SettableFuture<String>> iter = responseFutures.values().iterator();
if (iter.hasNext()) {
LOGGER.info("Correlating with the eldest request as a workaround.");
future = iter.next();
iter.remove();
} else {
LOGGER.error("There are no pending requests.");
}
}

if (future != null) {
String value = RabbitMQUtils.readString(body);
future.set(value);
}
}
}
};
}
/**
* Provides Direct consumer for container creation
*/
private Object getResponseDirectConsumer() {

return new DirectCallback() {

@Override
public void callback(byte[] data, List<Object> classs, BasicProperties properties) {
String key = properties.getCorrelationId();

synchronized (responseFutures) {
SettableFuture<String> future = null;
if (key != null) {
future = responseFutures.remove(key);
if (future == null) {
LOGGER.error("Received a message with correlationId ({}) not in map ({})", key, responseFutures.keySet());
}
} else {
LOGGER.warn("Received a message with null correlationId. This is an error unless the other component uses an older version of HOBBIT core library.");
Iterator<SettableFuture<String>> iter = responseFutures.values().iterator();
if (iter.hasNext()) {
LOGGER.info("Correlating with the eldest request as a workaround.");
future = iter.next();
iter.remove();
} else {
LOGGER.error("There are no pending requests.");
}
}

if (future != null) {
String value = RabbitMQUtils.readString(data);
future.set(value);
}
}
}

};
}

}
Loading