Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify NetworkChannel interface #150

Merged
merged 22 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/posix/federated/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
LF_FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1,0)
LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
} LF_FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);

LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
Expand All @@ -75,8 +75,8 @@ typedef struct {
} MainRecv;

LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
LF_FEDERATE_CTOR_PREAMBLE();
LF_REACTOR_CTOR(MainRecv);
LF_FEDERATE_CTOR_PREAMBLE();
LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
Expand Down
8 changes: 4 additions & 4 deletions examples/posix/federated/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ LF_DEFINE_TIMER_CTOR(Sender, t, 1, 0)
LF_DEFINE_REACTION_STRUCT(Sender, r, 1)
LF_DEFINE_REACTION_CTOR(Sender, r, 0)
LF_DEFINE_OUTPUT_STRUCT(Sender, out, 1, msg_t)
LF_DEFINE_OUTPUT_CTOR(Sender, out, 1)
LF_DEFINE_OUTPUT_CTOR(Sender, out, 1)

typedef struct {
Reactor super;
Expand Down Expand Up @@ -73,7 +73,7 @@ LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) {
TcpIpChannel_ctor(&self->channel, parent->env, "127.0.0.1", PORT_NUM, AF_INET, true);

LF_FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();

LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t);
}

Expand All @@ -89,12 +89,12 @@ typedef struct {
} MainSender;

LF_REACTOR_CTOR_SIGNATURE(MainSender) {
LF_REACTOR_CTOR(MainSender);
LF_FEDERATE_CTOR_PREAMBLE();
LF_DEFINE_CHILD_OUTPUT_ARGS(sender, out,1,1);
LF_DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver);
LF_BUNDLE_REGISTER_UPSTREAM(Sender, Receiver, sender, out);
LF_REACTOR_CTOR(MainSender);
}
LF_ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1, true)

Expand Down
6 changes: 2 additions & 4 deletions examples/zephyr/basic_federated/common/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ LF_REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_ex
LF_REACTOR_CTOR(Receiver);
LF_INITIALIZE_REACTION(Receiver, r);
LF_INITIALIZE_INPUT(Receiver, in, 1, in_external);

// Register reaction as an effect of in
LF_PORT_REGISTER_EFFECT(self->in, self->r, 1);
}


LF_DEFINE_FEDERATED_INPUT_CONNECTION(Receiver, in, msg_t, 5, MSEC(100), false);

typedef struct {
Expand All @@ -74,7 +73,6 @@ typedef struct {
LF_FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
} LF_FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);


LF_FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
LF_FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, parent->env, IP_ADDR, PORT_NUM, AF_INET, false);
Expand All @@ -91,8 +89,8 @@ typedef struct {
} MainRecv;

LF_REACTOR_CTOR_SIGNATURE(MainRecv) {
LF_FEDERATE_CTOR_PREAMBLE();
LF_REACTOR_CTOR(MainRecv);
LF_FEDERATE_CTOR_PREAMBLE();
LF_DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
LF_INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
Expand Down
46 changes: 19 additions & 27 deletions include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@
#include "reactor-uc/error.h"
#include "reactor-uc/federated.h"

/**
* @brief The current state of the connection.
* NETWORK_CHANNEL_STATE_UNINITIALIZED if the connection has not been initialized yet,
* NETWORK_CHANNEL_STATE_OPEN if the connection is open and waiting for try_connect to be called,
* NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS if try_connect has been called but it is not yet connected,
* NETWORK_CHANNEL_STATE_CONNECTION_FAILED if the connection failed,
* NETWORK_CHANNEL_STATE_CONNECTED if the channel is successfully connected to another federate,
* NETWORK_CHANNEL_STATE_LOST_CONNECTION if the connection was unexpectedly closed,
* NETWORK_CHANNEL_STATE_CLOSED if the connection was manually closed.
*/
typedef enum {
NETWORK_CHANNEL_STATE_UNINITIALIZED,
NETWORK_CHANNEL_STATE_OPEN,
Expand All @@ -23,14 +33,16 @@ typedef enum {
NETWORK_CHANNEL_TYPE_COAP_UDP_IP,
} NetworkChannelType;

char *NetworkChannel_state_to_string(NetworkChannelState state);

typedef struct FederatedConnectionBundle FederatedConnectionBundle;
typedef struct NetworkChannel NetworkChannel;

struct NetworkChannel {
/**
* @brief Expected time until a connection is established after calling @p try_connect.
* @brief Expected time until a connection is established after calling @p open_connection.
*/
interval_t expected_try_connect_duration;
interval_t expected_connect_duration;

/**
* @brief Type of the network channel to differentiate between different implementations such as TcpIp or CoapUdpIp.
Expand All @@ -39,38 +51,18 @@ struct NetworkChannel {

/**
* @brief Get the current state of the connection.
* @return NETWORK_CHANNEL_STATE_UNINITIALIZED if the connection has not been initialized yet,
* NETWORK_CHANNEL_STATE_OPEN if the connection is open and waiting for try_connect to be called,
* NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS if try_connect has been called but it is not yet connected,
* NETWORK_CHANNEL_STATE_CONNECTION_FAILED if the connection failed,
* NETWORK_CHANNEL_STATE_CONNECTED if the channel is successfully connected to another federate,
* NETWORK_CHANNEL_STATE_LOST_CONNECTION if the connection was unexpectedly closed,
* NETWORK_CHANNEL_STATE_CLOSED if the connection was manually closed.
* @return true if the channel is connected, false if the channel is not connected
*/
NetworkChannelState (*get_connection_state)(NetworkChannel *self);
bool (*is_connected)(NetworkChannel *self);

/**
* @brief Opens the connection to the corresponding NetworkChannel on another federate (non-blocking).
* For client-server channels this usually is implemented as the "bind" call on the server side.
* @return LF_OK if connection is opened, LF_INVALID_VALUE if the channel is configured incorrectly,
* LF_NETWORK_SETUP_FAILED if the connection open operation fails.
* The channel is not connected unless @p is_connected returns true.
* @return LF_OK if channel opened without error, LF_ERR if the channel is configured incorrectly or the connection
* open operation fails.
*/
lf_ret_t (*open_connection)(NetworkChannel *self);

/**
* @brief Try to connect to corresponding NetworkChannel on another federate (non-blocking).
* @return LF_OK if connection is established, LF_IN_PROGRESS if connection is in progress, LF_TRY_AGAIN if connection
* failed and should be retried, LF_ERR if connection failed and should not be retried.
*/
lf_ret_t (*try_connect)(NetworkChannel *self);

/**
* @brief Try to reconnect to corresponding NetworkChannel after the connection broke of (non-blocking).
* @return LF_OK if connection is established, LF_IN_PROGRESS if connection is in progress, LF_TRY_AGAIN if connection
* failed and should be retried, LF_ERR if connection failed and should not be retried.
*/
lf_ret_t (*try_reconnect)(NetworkChannel *self);

/**
* @brief Closes the connection to the corresponding NetworkChannel on another federate.
*/
Expand Down
13 changes: 8 additions & 5 deletions include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "reactor-uc/network_channel.h"
#include "reactor-uc/environment.h"

#define TCP_IP_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10)
#define TCP_IP_CHANNEL_WORKER_THREAD_MAIN_LOOP_SLEEP MSEC(100)
#define TCP_IP_CHANNEL_BUFFERSIZE 1024
#define TCP_IP_CHANNEL_NUM_RETRIES 255
#define TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE 2048
Expand All @@ -22,6 +24,7 @@ struct TcpIpChannel {

int fd;
int client;
int send_failed_event_fds; // These file descriptors are used to signal the recv select to stop blocking
NetworkChannelState state;

const char *host;
Expand All @@ -34,19 +37,19 @@ struct TcpIpChannel {
unsigned int read_index;

fd_set set;
bool server;
bool is_server;
bool terminate;

// required for callbacks
pthread_t receive_thread;
pthread_attr_t receive_thread_attr;
char receive_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE];
pthread_t worker_thread;
pthread_attr_t worker_thread_attr;
char worker_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE];

FederatedConnectionBundle *federated_connection;
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message);
};

void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, unsigned short port, int protocol_family,
bool server);
bool is_server);

#endif
2 changes: 2 additions & 0 deletions include/reactor-uc/platform/riot/coap_udp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "net/sock/udp.h"
#include "mutex.h"

#define COAP_UDP_IP_CHANNEL_EXPECTED_CONNECT_DURATION MSEC(10);
#define COAP_UDP_IP_CHANNEL_BUFFERSIZE 1024
// #define COAP_UDP_IP_CHANNEL_NUM_RETRIES 255;
// #define COAP_UDP_IP_CHANNEL_RECV_THREAD_STACK_SIZE 2048
Expand All @@ -21,6 +22,7 @@ struct CoapUdpIpChannel {

sock_udp_ep_t remote;

bool send_ack_received;
FederateMessage output;
uint8_t write_buffer[COAP_UDP_IP_CHANNEL_BUFFERSIZE];

Expand Down
2 changes: 0 additions & 2 deletions include/reactor-uc/platform/riot/riot.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
#include "mutex.h"
#include "reactor-uc/platform.h"

#include "mutex.h"

typedef struct {
Platform super;
mutex_t lock;
Expand Down
24 changes: 5 additions & 19 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,11 @@ void FederatedConnectionBundle_connect_to_peers(FederatedConnectionBundle **bund
for (size_t i = 0; i < bundles_size; i++) {
FederatedConnectionBundle *bundle = bundles[i];
NetworkChannel *chan = bundle->net_channel;
ret = chan->try_connect(chan);
if (ret == LF_OK) {
NetworkChannelState state = chan->get_connection_state(chan);
switch (state) {
case NETWORK_CHANNEL_STATE_CONNECTED:
break;
case NETWORK_CHANNEL_STATE_OPEN:
case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS:
case NETWORK_CHANNEL_STATE_CONNECTION_FAILED:
case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
if (chan->expected_try_connect_duration < wait_before_retry && chan->expected_try_connect_duration > 0) {
wait_before_retry = chan->expected_try_connect_duration;
}
all_connected = false;
break;
default:
throw("Could not connect to federate during assemble");
break;
if (!chan->is_connected(chan)) {
if (chan->expected_connect_duration < wait_before_retry && chan->expected_connect_duration > 0) {
wait_before_retry = chan->expected_connect_duration;
}
all_connected = false;
}
}
if (!all_connected && wait_before_retry < FOREVER) {
Expand Down Expand Up @@ -87,7 +73,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {

EventPayloadPool *pool = trigger->payload_pool;

if (channel->get_connection_state(channel) == NETWORK_CHANNEL_STATE_CONNECTED) {
if (channel->is_connected(channel)) {
assert(self->staged_payload_ptr);
assert(trigger->is_registered_for_cleanup);
assert(trigger->is_present == false);
Expand Down
23 changes: 23 additions & 0 deletions src/network_channel.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "reactor-uc/network_channel.h"

#if defined(PLATFORM_POSIX)
#ifdef NETWORK_CHANNEL_TCP_POSIX
#include "platform/posix/tcp_ip_channel.c"
Expand Down Expand Up @@ -29,3 +31,24 @@
#else
#error "Platform not supported"
#endif

char *NetworkChannel_state_to_string(NetworkChannelState state) {
switch (state) {
case NETWORK_CHANNEL_STATE_UNINITIALIZED:
return "UNINITIALIZED";
case NETWORK_CHANNEL_STATE_OPEN:
return "OPEN";
case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS:
return "CONNECTION_IN_PROGRESS";
case NETWORK_CHANNEL_STATE_CONNECTION_FAILED:
return "CONNECTION_FAILED";
case NETWORK_CHANNEL_STATE_CONNECTED:
return "CONNECTED";
case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
return "LOST_CONNECTION";
case NETWORK_CHANNEL_STATE_CLOSED:
return "CLOSED";
}

return "UNKNOWN";
}
Loading
Loading