From 38e4b97eed758bbc0d6e4f018dcf5e36543646a0 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 23 Nov 2015 09:40:54 +0100 Subject: [PATCH] fixed that tcp connection ended up in an infinite loop when write operation failed because of broken pipe --- src/tcpbuffer.h | 18 +++++++--- src/tcpconnected.h | 84 ++++++++++++++++++++++++++++------------------ 2 files changed, 66 insertions(+), 36 deletions(-) diff --git a/src/tcpbuffer.h b/src/tcpbuffer.h index 17c034c9..2062c8c2 100644 --- a/src/tcpbuffer.h +++ b/src/tcpbuffer.h @@ -328,9 +328,13 @@ class TcpBuffer : public Buffer /** * Send the buffer to a socket * @param socket + * @return ssize_t */ - void sendto(int socket) + ssize_t sendto(int socket) { + // total number of bytes written + ssize_t total = 0; + // keep looping while (_size > 0) { @@ -354,12 +358,18 @@ class TcpBuffer : public Buffer // send the data auto result = writev(socket, (const struct iovec *)&buffer, index); - // skip on error - if (result <= 0) return; + // skip on error, or when nothing was written + if (result <= 0) return total > 0 ? total : result; // shrink the buffer - if (result > 0) shrink(result); + shrink(result); + + // update total number of bytes written + total += 0; } + + // done + return total; } /** diff --git a/src/tcpconnected.h b/src/tcpconnected.h index b4ffa1f7..7516bcc4 100644 --- a/src/tcpconnected.h +++ b/src/tcpconnected.h @@ -48,6 +48,35 @@ class TcpConnected : public TcpState, private Watchable TcpBuffer _in; + /** + * Helper method to report an error + * @return bool Was an error reported? + */ + bool reportError() + { + // some errors are ok and do not (necessarily) mean that we're disconnected + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false; + + // we have an error - report this to the user + _handler->onError(_connection, strerror(errno)); + + // done + return true; + } + + /** + * Construct the next state + * @param monitor Object that monitors whether connection still exists + * @return TcpState* + */ + TcpState *nextState(const Monitor &monitor) + { + // if the object is still in a valid state, we can move to the close-state, + // otherwise there is no point in moving to a next state + return monitor.valid() ? new TcpClosed(this) : nullptr; + } + + public: /** * Constructor @@ -90,12 +119,18 @@ class TcpConnected : public TcpState, private Watchable { // must be the socket if (fd != _socket) return this; + + // because the object might soon be destructed, we create a monitor to check this + Monitor monitor(this); // can we write more data to the socket? if (flags & writable) { // send out the buffered data - _out.sendto(_socket); + auto result = _out.sendto(_socket); + + // are we in an error state? + if (result < 0 && reportError()) return nextState(monitor); // if buffer is empty by now, we no longer have to check for // writability, but only for readability @@ -108,39 +143,24 @@ class TcpConnected : public TcpState, private Watchable // read data from buffer auto result = _in.receivefrom(_socket); - // because the object might soon be destructed, we create a monitor to check this - Monitor monitor(this); + // are we in an error state? + if (result < 0 && reportError()) return nextState(monitor); + + // we need a local copy of the buffer - because it is possible that "this" + // object gets destructed halfway through the call to the parse() method + TcpBuffer buffer(std::move(_in)); - // is the connection in an error state? - if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) - { - // we have an error - report this to the user - _handler->onError(_connection, strerror(errno)); - - // "this" could be removed by now, check this - if (!monitor.valid()) return nullptr; - - // we have a new state - return new TcpClosed(this); - } - else - { - // we need a local copy of the buffer - because it is possible that "this" - // object gets destructed halfway through the call to the parse() method - TcpBuffer buffer(std::move(_in)); - - // parse the buffer - auto processed = _connection->parse(buffer); + // parse the buffer + auto processed = _connection->parse(buffer); - // "this" could be removed by now, check this - if (!monitor.valid()) return nullptr; - - // shrink buffer - buffer.shrink(processed); - - // restore the buffer as member - std::swap(_in, buffer); - } + // "this" could be removed by now, check this + if (!monitor.valid()) return nullptr; + + // shrink buffer + buffer.shrink(processed); + + // restore the buffer as member + std::swap(_in, buffer); } // keep same object