Skip to content

Commit

Permalink
the connection.close function was only waiting for synchronous calls …
Browse files Browse the repository at this point in the history
…to complete, async calls that were waiting (after a synchronous) were still discarded, this has been fixed
  • Loading branch information
EmielBruijntjes committed Aug 20, 2014
1 parent d23e818 commit c7b3f71
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 14 deletions.
6 changes: 2 additions & 4 deletions include/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class Channel
*/
void onReady(const SuccessCallback &callback)
{
// store callback in implementation
_implementation->_readyCallback = callback;
_implementation->onReady(callback);
}

/**
Expand All @@ -66,8 +65,7 @@ class Channel
*/
void onError(const ErrorCallback &callback)
{
// store callback in implementation
_implementation->_errorCallback = callback;
_implementation->onError(callback);
}

/**
Expand Down
33 changes: 32 additions & 1 deletion include/channelimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,40 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
*/
void detach()
{
// connection is gone
_connection = nullptr;
}

/**
* Callback that is called when the channel was succesfully created.
* @param callback the callback to execute
*/
void onReady(const SuccessCallback &callback)
{
// store callback
_readyCallback = callback;

// direct call if channel is already ready
if (_state == state_connected) callback();
}

/**
* Callback that is called when an error occurs.
*
* Only one error callback can be registered. Calling this function
* multiple times will remove the old callback.
*
* @param callback the callback to execute
*/
void onError(const ErrorCallback &callback)
{
// store callback
_errorCallback = callback;

// direct call if channel is already in error state
if (_state != state_connected) callback("Channel is in error state");
}

/**
* Pause deliveries on a channel
*
Expand Down Expand Up @@ -480,7 +511,7 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
*/
bool waiting() const
{
return _synchronous;
return _synchronous || !_queue.empty();
}

/**
Expand Down
24 changes: 15 additions & 9 deletions src/channelimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ void ChannelImpl::attach(Connection *connection)
// this is invalid
_state = state_closed;
}
else
else
{
// busy connecting
// assume channel is connected
_state = state_connected;

// valid id, send a channel open frame
send(ChannelOpenFrame(_id));

// send the open frame
if (send(ChannelOpenFrame(_id))) return;

// report an error
reportError("Channel could not be initialized", true);
}
}

Expand Down Expand Up @@ -663,11 +666,14 @@ bool ChannelImpl::send(const Frame &frame)
return true;
}

// enter synchronous mode if necessary
_synchronous = frame.synchronous();

// send to tcp connection
return _connection->send(frame);
if (!_connection->send(frame)) return false;

// frame was sent, if this was a synchronous frame, we now have to wait
_synchronous = frame.synchronous();

// done
return true;
}

/**
Expand Down

0 comments on commit c7b3f71

Please sign in to comment.