Skip to content

Commit

Permalink
fixed that tcp connection ended up in an infinite loop when write ope…
Browse files Browse the repository at this point in the history
…ration failed because of broken pipe
  • Loading branch information
EmielBruijntjes committed Nov 23, 2015
1 parent d5d5d41 commit 38e4b97
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
18 changes: 14 additions & 4 deletions src/tcpbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,13 @@ class TcpBuffer : public Buffer
/**
* Send the buffer to a socket
* @param socket
* @return ssize_t
*/
void sendto(int socket)
ssize_t sendto(int socket)
{
// total number of bytes written
ssize_t total = 0;

// keep looping
while (_size > 0)
{
Expand All @@ -354,12 +358,18 @@ class TcpBuffer : public Buffer
// send the data
auto result = writev(socket, (const struct iovec *)&buffer, index);

// skip on error
if (result <= 0) return;
// skip on error, or when nothing was written
if (result <= 0) return total > 0 ? total : result;

// shrink the buffer
if (result > 0) shrink(result);
shrink(result);

// update total number of bytes written
total += 0;
}

// done
return total;
}

/**
Expand Down
84 changes: 52 additions & 32 deletions src/tcpconnected.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,35 @@ class TcpConnected : public TcpState, private Watchable
TcpBuffer _in;


/**
* Helper method to report an error
* @return bool Was an error reported?
*/
bool reportError()
{
// some errors are ok and do not (necessarily) mean that we're disconnected
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;

// we have an error - report this to the user
_handler->onError(_connection, strerror(errno));

// done
return true;
}

/**
* Construct the next state
* @param monitor Object that monitors whether connection still exists
* @return TcpState*
*/
TcpState *nextState(const Monitor &monitor)
{
// if the object is still in a valid state, we can move to the close-state,
// otherwise there is no point in moving to a next state
return monitor.valid() ? new TcpClosed(this) : nullptr;
}


public:
/**
* Constructor
Expand Down Expand Up @@ -90,12 +119,18 @@ class TcpConnected : public TcpState, private Watchable
{
// must be the socket
if (fd != _socket) return this;

// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);

// can we write more data to the socket?
if (flags & writable)
{
// send out the buffered data
_out.sendto(_socket);
auto result = _out.sendto(_socket);

// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);

// if buffer is empty by now, we no longer have to check for
// writability, but only for readability
Expand All @@ -108,39 +143,24 @@ class TcpConnected : public TcpState, private Watchable
// read data from buffer
auto result = _in.receivefrom(_socket);

// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);

// we need a local copy of the buffer - because it is possible that "this"
// object gets destructed halfway through the call to the parse() method
TcpBuffer buffer(std::move(_in));

// is the connection in an error state?
if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
{
// we have an error - report this to the user
_handler->onError(_connection, strerror(errno));

// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;

// we have a new state
return new TcpClosed(this);
}
else
{
// we need a local copy of the buffer - because it is possible that "this"
// object gets destructed halfway through the call to the parse() method
TcpBuffer buffer(std::move(_in));

// parse the buffer
auto processed = _connection->parse(buffer);
// parse the buffer
auto processed = _connection->parse(buffer);

// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;

// shrink buffer
buffer.shrink(processed);

// restore the buffer as member
std::swap(_in, buffer);
}
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;

// shrink buffer
buffer.shrink(processed);

// restore the buffer as member
std::swap(_in, buffer);
}

// keep same object
Expand Down

0 comments on commit 38e4b97

Please sign in to comment.