-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove duplicated NetworkChannel return codes / state #143
Changes from all commits
e401ced
65e239d
04fdbc1
6e163d5
c45a551
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,25 +15,25 @@ void FederatedConnectionBundle_connect_to_peers(FederatedConnectionBundle **bund | |
NetworkChannel *chan = bundle->net_channel; | ||
ret = chan->open_connection(chan); | ||
validate(ret == LF_OK); | ||
bundle->network_channel_state_changed(bundle); | ||
} | ||
|
||
bool all_connected = false; | ||
interval_t wait_before_retry = FOREVER; // Intialize to maximum so we can find the lowest requested. | ||
interval_t wait_before_retry = FOREVER; // Initialize to maximum so we can find the lowest requested. | ||
while (!all_connected) { | ||
all_connected = true; | ||
for (size_t i = 0; i < bundles_size; i++) { | ||
FederatedConnectionBundle *bundle = bundles[i]; | ||
NetworkChannel *chan = bundle->net_channel; | ||
NetworkChannelState state = chan->get_connection_state(chan); | ||
if (state != NETWORK_CHANNEL_STATE_CONNECTED) { | ||
ret = chan->try_connect(chan); | ||
switch (ret) { | ||
case LF_OK: | ||
bundle->network_channel_state_changed(bundle); | ||
ret = chan->try_connect(chan); | ||
if (ret == LF_OK) { | ||
NetworkChannelState state = chan->get_connection_state(chan); | ||
switch (state) { | ||
case NETWORK_CHANNEL_STATE_CONNECTED: | ||
break; | ||
case LF_IN_PROGRESS: | ||
case LF_TRY_AGAIN: | ||
case NETWORK_CHANNEL_STATE_OPEN: | ||
case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS: | ||
case NETWORK_CHANNEL_STATE_CONNECTION_FAILED: | ||
case NETWORK_CHANNEL_STATE_LOST_CONNECTION: | ||
if (chan->expected_try_connect_duration < wait_before_retry && chan->expected_try_connect_duration > 0) { | ||
wait_before_retry = chan->expected_try_connect_duration; | ||
} | ||
|
@@ -108,14 +108,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { | |
|
||
LF_DEBUG(FED, "FedOutConn %p sending tagged message with tag=%" PRId64 ":%" PRIu32, trigger, tagged_msg->tag.time, | ||
tagged_msg->tag.microstep); | ||
ret = channel->send_blocking(channel, &msg); | ||
switch (ret) { | ||
case LF_OK: | ||
break; | ||
case LF_CONNECTION_CLOSED: | ||
self->bundle->network_channel_state_changed(self->bundle); | ||
// Intentional fallthrough | ||
default: | ||
if (channel->send_blocking(channel, &msg) != LF_OK) { | ||
LF_ERR(FED, "FedOutConn %p failed to send message", trigger); | ||
} | ||
} else { | ||
|
@@ -285,21 +278,6 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, | |
} | ||
} | ||
|
||
void FederatedConnectionBundle_network_channel_state_changed(FederatedConnectionBundle *self) { | ||
NetworkChannelState state = self->net_channel->get_connection_state(self->net_channel); | ||
switch (state) { | ||
case NETWORK_CHANNEL_STATE_CONNECTED: | ||
case NETWORK_CHANNEL_STATE_LOST_CONNECTION: | ||
for (size_t i = 0; i < self->inputs_size; i++) { | ||
FederatedInputConnection *input = self->inputs[i]; | ||
input->last_known_tag = FOREVER_TAG; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I still don't understand the runtime fully enough, so I am not sure where to set this tag now instead There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could set it in Scheduler_acquire_tag, there we can check the state of the input port before we go to sleep waiting for more messages on it. We can add that in the next PR |
||
} | ||
break; | ||
default: // Handle other states also | ||
break; | ||
} | ||
} | ||
|
||
void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel, | ||
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks, | ||
size_t inputs_size, FederatedOutputConnection **outputs, | ||
|
@@ -316,7 +294,6 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa | |
self->deserialize_hooks = deserialize_hooks; | ||
self->serialize_hooks = serialize_hooks; | ||
self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); | ||
self->network_channel_state_changed = FederatedConnectionBundle_network_channel_state_changed; | ||
} | ||
|
||
void Federated_distribute_start_tag(Environment *env, instant_t start_time) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't also the channel state be updated if try_connect failes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I am not sure because the channel itself should have already notified the bundle, I would need to look deeper into the runtime code tomorrow to see if that is covered