Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove duplicated NetworkChannel return codes / state #143

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions examples/posix/federated/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ typedef struct {
REACTION_INSTANCE(Receiver, r);
PORT_INSTANCE(Receiver, in, 1);
int cnt;
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0);
} Receiver;

DEFINE_REACTION_BODY(Receiver, r) {
Expand All @@ -56,12 +56,12 @@ typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1,0)
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
} FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, false);
TcpIpChannel_ctor(&self->channel, parent->env, "127.0.0.1", PORT_NUM, AF_INET, false);
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t);
}
Expand All @@ -71,14 +71,14 @@ typedef struct {
CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
FEDERATE_BOOKKEEPING_INSTANCES(1);
CHILD_INPUT_SOURCES(receiver, in,1,1, 0);
CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
} MainRecv;

REACTOR_CTOR_SIGNATURE(MainRecv) {
FEDERATE_CTOR_PREAMBLE();
REACTOR_CTOR(MainRecv);
DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver,1, _receiver_in_args[i]);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in);
}
Expand Down
21 changes: 10 additions & 11 deletions examples/posix/federated/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ DEFINE_TIMER_CTOR(Sender, t, 1, 0)
DEFINE_REACTION_STRUCT(Sender, r, 1)
DEFINE_REACTION_CTOR(Sender, r, 0)
DEFINE_OUTPUT_STRUCT(Sender, out, 1, msg_t)
DEFINE_OUTPUT_CTOR(Sender, out, 1)
DEFINE_OUTPUT_CTOR(Sender, out, 1)

typedef struct {
Reactor super;
TIMER_INSTANCE(Sender, t);
REACTION_INSTANCE(Sender, r);
PORT_INSTANCE(Sender, out, 1);
REACTOR_BOOKKEEPING_INSTANCES(1,2,0);
REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0);
} Sender;

DEFINE_REACTION_BODY(Sender, r) {
Expand Down Expand Up @@ -65,15 +65,15 @@ typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver);

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, true);
TcpIpChannel_ctor(&self->channel, parent->env, "127.0.0.1", PORT_NUM, AF_INET, true);

FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();

INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t);
}

Expand All @@ -82,17 +82,16 @@ typedef struct {
Reactor super;
CHILD_REACTOR_INSTANCE(Sender, sender, 1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver);
TcpIpChannel channel;
FEDERATE_BOOKKEEPING_INSTANCES(1);
CHILD_OUTPUT_CONNECTIONS(sender, out, 1,1, 1);
CHILD_OUTPUT_EFFECTS(sender, out, 1,1, 0);
CHILD_OUTPUT_OBSERVERS(sender, out, 1,1,0);
CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
} MainSender;

REACTOR_CTOR_SIGNATURE(MainSender) {
FEDERATE_CTOR_PREAMBLE();
DEFINE_CHILD_OUTPUT_ARGS(sender, out,1,1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender,1, _sender_out_args[i]);
DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver);
BUNDLE_REGISTER_UPSTREAM(Sender, Receiver, sender, out);
REACTOR_CTOR(MainSender);
Expand Down
18 changes: 8 additions & 10 deletions examples/zephyr/basic_federated/common/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ typedef struct {
char msg[32];
} msg_t;

DEFINE_REACTION_STRUCT(Receiver, r, 0);
DEFINE_REACTION_STRUCT(Receiver, r, 0);
DEFINE_REACTION_CTOR(Receiver, r, 0)

DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, msg_t, 0)
Expand All @@ -42,7 +42,7 @@ typedef struct {
Reactor super;
REACTION_INSTANCE(Receiver, r);
PORT_INSTANCE(Receiver, in, 1);
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0);
int cnt;
} Receiver;

Expand All @@ -59,25 +59,23 @@ REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_exter
REACTOR_CTOR(Receiver);
INITIALIZE_REACTION(Receiver, r);
INITIALIZE_INPUT(Receiver, in, 1, in_external);

// Register reaction as an effect of in
PORT_REGISTER_EFFECT(self->in, self->r, 1);
}


DEFINE_FEDERATED_INPUT_CONNECTION(Receiver, in, msg_t, 5, MSEC(100), false);

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1,0)
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
} FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);


FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, IP_ADDR, PORT_NUM, AF_INET, false);
TcpIpChannel_ctor(&self->channel, parent->env, IP_ADDR, PORT_NUM, AF_INET, false);
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_payload_default);
}
Expand All @@ -87,14 +85,14 @@ typedef struct {
CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
FEDERATE_BOOKKEEPING_INSTANCES(1);
CHILD_INPUT_SOURCES(receiver, in, 1,1, 0);
CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
} MainRecv;

REACTOR_CTOR_SIGNATURE(MainRecv) {
FEDERATE_CTOR_PREAMBLE();
REACTOR_CTOR(MainRecv);
DEFINE_CHILD_INPUT_ARGS(receiver, in,1,1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver,1, _receiver_in_args[i]);
DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in);
}
Expand Down
21 changes: 10 additions & 11 deletions examples/zephyr/basic_federated/federated_sender/src/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,12 @@ void setup_led() {
gpio_pin_configure_dt(&led, GPIO_OUTPUT_ACTIVE);
}


typedef struct {
Reactor super;
REACTION_INSTANCE(Sender, r);
ACTION_INSTANCE(Sender, act);
PORT_INSTANCE(Sender, out, 1);
REACTOR_BOOKKEEPING_INSTANCES(1,2,0);
REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0);
} Sender;

DEFINE_REACTION_BODY(Sender, r) {
Expand Down Expand Up @@ -104,31 +103,31 @@ typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver1);

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver2);

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver1) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_CONN_1, AF_INET, true);
TcpIpChannel_ctor(&self->channel, parent->env, "192.168.1.100", PORT_CONN_1, AF_INET, true);

FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();

INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_payload_default);
}

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver2) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_CONN_2, AF_INET, true);
TcpIpChannel_ctor(&self->channel, parent->env, "192.168.1.100", PORT_CONN_2, AF_INET, true);

FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();

INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_payload_default);
}

Expand All @@ -138,9 +137,9 @@ typedef struct {
CHILD_REACTOR_INSTANCE(Sender, sender, 1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver2);
CHILD_OUTPUT_CONNECTIONS(sender, out, 1,1, 2);
CHILD_OUTPUT_EFFECTS(sender, out, 1,1,0);
CHILD_OUTPUT_OBSERVERS(sender, out,1,1, 0);
CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 2);
CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
FEDERATE_BOOKKEEPING_INSTANCES(2);
} MainSender;

Expand Down
6 changes: 0 additions & 6 deletions include/reactor-uc/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ typedef enum {
LF_INVALID_VALUE,
LF_OUT_OF_BOUNDS,
LF_NO_MEM,
LF_COULD_NOT_CONNECT,
LF_NETWORK_SETUP_FAILED,
LF_CONNECTION_CLOSED,
LF_TIMEOUT,
LF_TRY_AGAIN,
LF_IN_PROGRESS
} lf_ret_t;

/**
Expand Down
1 change: 0 additions & 1 deletion include/reactor-uc/federated.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ struct FederatedConnectionBundle {
serialize_hook *serialize_hooks;
size_t outputs_size;
bool server; // Does this federate work as server or client
void (*network_channel_state_changed)(FederatedConnectionBundle *self);
};

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
Expand Down
4 changes: 3 additions & 1 deletion include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "proto/message.pb.h"
#include "reactor-uc/error.h"
#include "reactor-uc/network_channel.h"
#include "reactor-uc/environment.h"

#define TCP_IP_CHANNEL_BUFFERSIZE 1024
#define TCP_IP_CHANNEL_NUM_RETRIES 255
Expand Down Expand Up @@ -45,6 +46,7 @@ struct TcpIpChannel {
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message);
};

void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family, bool server);
void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, unsigned short port, int protocol_family,
bool server);

#endif
45 changes: 11 additions & 34 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ void FederatedConnectionBundle_connect_to_peers(FederatedConnectionBundle **bund
NetworkChannel *chan = bundle->net_channel;
ret = chan->open_connection(chan);
validate(ret == LF_OK);
bundle->network_channel_state_changed(bundle);
}

bool all_connected = false;
interval_t wait_before_retry = FOREVER; // Intialize to maximum so we can find the lowest requested.
interval_t wait_before_retry = FOREVER; // Initialize to maximum so we can find the lowest requested.
while (!all_connected) {
all_connected = true;
for (size_t i = 0; i < bundles_size; i++) {
FederatedConnectionBundle *bundle = bundles[i];
NetworkChannel *chan = bundle->net_channel;
NetworkChannelState state = chan->get_connection_state(chan);
if (state != NETWORK_CHANNEL_STATE_CONNECTED) {
ret = chan->try_connect(chan);
switch (ret) {
case LF_OK:
bundle->network_channel_state_changed(bundle);
ret = chan->try_connect(chan);
if (ret == LF_OK) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't also the channel state be updated if try_connect failes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I am not sure because the channel itself should have already notified the bundle, I would need to look deeper into the runtime code tomorrow to see if that is covered

NetworkChannelState state = chan->get_connection_state(chan);
switch (state) {
case NETWORK_CHANNEL_STATE_CONNECTED:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
case NETWORK_CHANNEL_STATE_OPEN:
case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS:
case NETWORK_CHANNEL_STATE_CONNECTION_FAILED:
case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
if (chan->expected_try_connect_duration < wait_before_retry && chan->expected_try_connect_duration > 0) {
wait_before_retry = chan->expected_try_connect_duration;
}
Expand Down Expand Up @@ -108,14 +108,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {

LF_DEBUG(FED, "FedOutConn %p sending tagged message with tag=%" PRId64 ":%" PRIu32, trigger, tagged_msg->tag.time,
tagged_msg->tag.microstep);
ret = channel->send_blocking(channel, &msg);
switch (ret) {
case LF_OK:
break;
case LF_CONNECTION_CLOSED:
self->bundle->network_channel_state_changed(self->bundle);
// Intentional fallthrough
default:
if (channel->send_blocking(channel, &msg) != LF_OK) {
LF_ERR(FED, "FedOutConn %p failed to send message", trigger);
}
} else {
Expand Down Expand Up @@ -285,21 +278,6 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
}
}

void FederatedConnectionBundle_network_channel_state_changed(FederatedConnectionBundle *self) {
NetworkChannelState state = self->net_channel->get_connection_state(self->net_channel);
switch (state) {
case NETWORK_CHANNEL_STATE_CONNECTED:
case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
for (size_t i = 0; i < self->inputs_size; i++) {
FederatedInputConnection *input = self->inputs[i];
input->last_known_tag = FOREVER_TAG;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I still don't understand the runtime fully enough, so I am not sure where to set this tag now instead

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could set it in Scheduler_acquire_tag, there we can check the state of the input port before we go to sleep waiting for more messages on it. We can add that in the next PR

}
break;
default: // Handle other states also
break;
}
}

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
Expand All @@ -316,7 +294,6 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa
self->deserialize_hooks = deserialize_hooks;
self->serialize_hooks = serialize_hooks;
self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);
self->network_channel_state_changed = FederatedConnectionBundle_network_channel_state_changed;
}

void Federated_distribute_start_tag(Environment *env, instant_t start_time) {
Expand Down
Loading
Loading