diff --git a/CMakeLists.txt b/CMakeLists.txt index 5903caa..bc0dc1f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,12 +18,12 @@ set(SOURCE_FILES cap/cap_util.cpp cap/RCap.cpp thirdparty/md5.c thirdparty/json11.cpp conn/INetConn.cpp conn/BtmUdpConn.cpp conn/FakeTcp.cpp conn/INetGroup.cpp conn/IGroup.cpp conn/FakeUdp.cpp - conn/IConn.cpp conn/RConn.cpp conn/RawTcp.cpp conn/IAppGroup.cpp conn/IBtmConn.cpp + conn/IConn.cpp conn/RConn.cpp conn/RawTcp.cpp conn/IAppGroup.cpp conn/IBtmConn.cpp conn/DefaultFakeConn.cpp net/NetUtil.cpp net/ClientNetManager.cpp net/TcpAckPool.cpp net/TcpListenPool.cpp net/INetManager.cpp net/ServerNetManager.cpp src/sync/SyncConnFactory.cpp src/sync/LoopSreamSyncConn.cpp src/sync/ISyncConn.cpp src/sync/UdpSyncConn.cpp src/sync/TcpStreamSyncConn.cpp src/sync/IPacketSyncConn.cpp - ) + callbacks/NetConnKeepAliveHelper.cpp callbacks/NetConnKeepAliveHelper.h callbacks/ResetHelper.cpp callbacks/ResetHelper.h) # platform dependent dir SET(ROS_DIR src/os/unix) diff --git a/README.md b/README.md index 1345815..2eaf0f3 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,9 @@ Parameter explanation: 2. For Windows users, speed is far slow than rsock on mac/Linux. (500-800KB/s during my test) -3. For Windows users, you'd better pass --lcapIp arguments instead of -d. Because it's hard to find NIC name for normal users. +3. For Windows users, you'd better pass --lcapIp arguments instead of -d. e.g. `--lcapIp=x.x.x.x` Where `x.x.x.x` is your Internet IP. Because it's hard to find NIC name for normal users. + +4. For Windows users, rsock doesn't behave very well like it does on Linux and Mac. e.g. On Mac/Linux, rsock can support to watch 1080P youtube video smoothly. For windows users, rsock can only support 720P youtube video. ### Exit diff --git a/bean/RConfig.cpp b/bean/RConfig.cpp index 0423f78..e36dacc 100644 --- a/bean/RConfig.cpp +++ b/bean/RConfig.cpp @@ -61,6 +61,8 @@ int RConfig::Parse(bool is_server, int argc, const char *const *argv) { args::ValueFlag cap_timeout(opt, "", "pcap timeout(ms). > 0 and <= 50", {"cap_timeout"}); + args::ValueFlag keepAlive(opt, "keepalive interval", "interval used to send keepalive request. default 5s." + , {"keepalive"}); try { parser.ParseCLI(argc, argv); do { @@ -145,6 +147,13 @@ int RConfig::Parse(bool is_server, int argc, const char *const *argv) { this->isDaemon = (daemon.Get() != 0); } + if (keepAlive) { + this->param.keepAliveInterval = keepAlive.Get(); + if (this->param.keepAliveInterval <= 0) { + throw args::Error("keepalive must > 0: " + std::to_string(this->param.keepAliveInterval)); + } + } + } while (false); if (param.selfCapIp.empty()) { @@ -198,6 +207,8 @@ void RConfig::CheckValidation(const RConfig &c) { assert((p.type == OM_PIPE_TCP) || (p.type == OM_PIPE_UDP) || (p.type == OM_PIPE_ALL)); assert(p.cap_timeout > 0 && p.cap_timeout < 50); + assert(p.keepAliveInterval > 0); + if (!DevIpMatch(p.dev, p.selfCapIp)) { char buf[BUFSIZ] = {0}; snprintf(buf, BUFSIZ, "dev %s and self capture ip %s not match", p.dev.c_str(), @@ -300,6 +311,10 @@ void RConfig::parseJsonString(RConfig &c, const std::string &content, std::strin if (o["cap_timeout"].is_string()) { p.cap_timeout = o["cap_timeout"].int_value(); } + + if (o["keepalive"].is_number()) { + p.keepAliveInterval = o["keepalive"].int_value(); + } } } @@ -322,6 +337,7 @@ json11::Json RConfig::to_json() const { {"duration", (int) param.conn_duration_sec}, {"type", strOfType(param.type)}, {"hash", param.hashKey}, + {"keepalive", param.keepAliveInterval}, {"cap_timeout", param.cap_timeout}, }}, }; @@ -406,6 +422,8 @@ std::string RConfig::BuildExampleString() { out << "client:\n"; #ifdef __MACH__ out << "sudo ./client_rsock_Darwin -d en0"; +#elif _WIN32 + out << "./client_rsock_Windows.exe --lcapIp=127.0.0.1:30000 "; #else out << "sudo ./client_rsock_Linux -d wlan0"; #endif diff --git a/bean/RConfig.h b/bean/RConfig.h index fda2266..0ef42a0 100644 --- a/bean/RConfig.h +++ b/bean/RConfig.h @@ -43,6 +43,8 @@ struct RConfig { int type = OM_PIPE_TCP; // default tcp ports uint16_t cap_timeout = OM_PCAP_TIMEOUT; + + int keepAliveInterval = 5; // default 5s, 3 times }; // if turned to debug, speed of rsock will be very slow on macOS. diff --git a/callbacks/INetConnKeepAlive.h b/callbacks/INetConnKeepAlive.h index 5620235..78f7886 100644 --- a/callbacks/INetConnKeepAlive.h +++ b/callbacks/INetConnKeepAlive.h @@ -6,6 +6,7 @@ #define RSOCK_IKEEPALIVE_H #include "rcommon.h" +#include "rscomm.h" class INetConn; @@ -13,8 +14,6 @@ struct ConnInfo; class INetConnKeepAlive { public: - using IntKeyType = uint32_t ; - class INetConnAliveHelper { public: virtual ~INetConnAliveHelper() = default; diff --git a/callbacks/IReset.h b/callbacks/IReset.h index 69971cb..c6afdae 100644 --- a/callbacks/IReset.h +++ b/callbacks/IReset.h @@ -6,13 +6,12 @@ #define RSOCK_IRESET_H #include "rcommon.h" +#include "rscomm.h" struct ConnInfo; class IReset { public: - using IntKeyType = uint32_t; - class IRestHelper { public: virtual ~IRestHelper() = default; diff --git a/callbacks/NetConnKeepAliveHelper.cpp b/callbacks/NetConnKeepAliveHelper.cpp new file mode 100644 index 0000000..d623e77 --- /dev/null +++ b/callbacks/NetConnKeepAliveHelper.cpp @@ -0,0 +1,100 @@ +// +// Created by System Administrator on 5/7/18. +// + +#include +#include +#include "NetConnKeepAliveHelper.h" +#include "../conn/IAppGroup.h" +#include "../conn/INetGroup.h" +#include "NetConnKeepAlive.h" + +NetConnKeepAliveHelper::NetConnKeepAliveHelper(IAppGroup *group, uv_loop_t *loop, bool active) { + mAppGroup = group; + mKeepAlive = new NetConnKeepAlive(this); + if (active) { + setupTimer(loop); + } +} + +void NetConnKeepAliveHelper::Close() { + if (mFlushTimer) { + uv_timer_stop(mFlushTimer); + uv_close(reinterpret_cast(mFlushTimer), close_cb); + mFlushTimer = nullptr; + } + if (mKeepAlive) { + mKeepAlive->Close(); + delete mKeepAlive; + mKeepAlive = nullptr; + } +} + +int NetConnKeepAliveHelper::OnSendResponse(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) { + return mAppGroup->doSendCmd(cmd, nread, rbuf); +} + +int NetConnKeepAliveHelper::OnSendRequest(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) { + return mAppGroup->doSendCmd(cmd, nread, rbuf); +} + +int NetConnKeepAliveHelper::OnRecvResponse(IntKeyType connKey) { + return RemoveRequest(connKey); +} + +INetConn *NetConnKeepAliveHelper::ConnOfIntKey(IntKeyType connKey) { + return mAppGroup->NetGroup()->ConnOfIntKey(connKey); +} + +int NetConnKeepAliveHelper::SendNetConnRst(const ConnInfo &src, IntKeyType connKey) { + return mAppGroup->sendNetConnRst(src, connKey); +} + +void NetConnKeepAliveHelper::setupTimer(uv_loop_t *loop) { + if (!mFlushTimer) { + mFlushTimer = static_cast(malloc(sizeof(uv_timer_t))); + uv_timer_init(loop, mFlushTimer); + mFlushTimer->data = this; + uv_timer_start(mFlushTimer, timer_cb, FIRST_FLUSH_DELAY, FLUSH_INTERVAL); + } +} + +void NetConnKeepAliveHelper::timer_cb(uv_timer_t *timer) { + NetConnKeepAliveHelper *helper = static_cast(timer->data); + helper->onFlush(); +} + +void NetConnKeepAliveHelper::onFlush() { + auto conns = mAppGroup->NetGroup()->GetAllConns(); + for (auto &e: conns) { + auto *conn = dynamic_cast(e.second); + auto it = mReqMap.find(conn->IntKey()); + if (it != mReqMap.end()) { + it->second++; + } else { + mReqMap.emplace(conn->IntKey(), 0); + } + + if (it == mReqMap.end() || it->second < MAX_RETRY) { // new or still valid + mKeepAlive->SendRequest(conn->IntKey()); + } + } + + // todo: make an interface. and move these into inetgroup + auto aCopy = mReqMap; + for (auto &e: aCopy) { + if (e.second >= MAX_RETRY) { // keep alive timeout + LOGE << "keepalive timeout"; + mAppGroup->onNetconnDead(e.first); + RemoveRequest(e.first); + } + } +} + +INetConnKeepAlive *NetConnKeepAliveHelper::GetIKeepAlive() const { + return mKeepAlive; +} + +int NetConnKeepAliveHelper::RemoveRequest(IntKeyType connKey) { + return mReqMap.erase(connKey); +} diff --git a/callbacks/NetConnKeepAliveHelper.h b/callbacks/NetConnKeepAliveHelper.h new file mode 100644 index 0000000..7b51267 --- /dev/null +++ b/callbacks/NetConnKeepAliveHelper.h @@ -0,0 +1,54 @@ +// +// Created by System Administrator on 5/7/18. +// + +#ifndef RSOCK_NETCONNKEEPALIVEHELPER_H +#define RSOCK_NETCONNKEEPALIVEHELPER_H + +#include + +#include "INetConnKeepAlive.h" + +class IAppGroup; + +class NetConnKeepAliveHelper : public INetConnKeepAlive::INetConnAliveHelper { +public: + using IntKeyType = uint32_t; + + explicit NetConnKeepAliveHelper(IAppGroup *group, uv_loop_t *loop, bool active); + + int OnSendResponse(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override; + + int OnRecvResponse(IntKeyType connKey) override; + + int OnSendRequest(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override; + + INetConn *ConnOfIntKey(IntKeyType connKey) override; + + int SendNetConnRst(const ConnInfo &src, IntKeyType connKey) override; + + void Close() override; + + INetConnKeepAlive *GetIKeepAlive() const override; + + int RemoveRequest(IntKeyType connKey) override; + +private: + void onFlush(); + +private: + void setupTimer(uv_loop_t *loop); + + static void timer_cb(uv_timer_t *timer); + +private: + const int MAX_RETRY = 3; + const uint32_t FLUSH_INTERVAL = 5000; // every 2sec + const uint32_t FIRST_FLUSH_DELAY = 5000; // on app start + IAppGroup *mAppGroup = nullptr; + uv_timer_t *mFlushTimer = nullptr; + std::map mReqMap; + INetConnKeepAlive *mKeepAlive = nullptr; +}; + +#endif //RSOCK_NETCONNKEEPALIVEHELPER_H diff --git a/callbacks/ResetHelper.cpp b/callbacks/ResetHelper.cpp new file mode 100644 index 0000000..58248db --- /dev/null +++ b/callbacks/ResetHelper.cpp @@ -0,0 +1,46 @@ +// +// Created by System Administrator on 5/7/18. +// + +#include "../conn/IAppGroup.h" +#include "ResetHelper.h" +#include "ConnReset.h" +#include "../util/rsutil.h" + +ResetHelper::ResetHelper(IAppGroup *appGroup) { + mAppGroup = appGroup; + mReset = new ConnReset(this); +} + +void ResetHelper::Close() { + if (mReset) { + mReset->Close(); + delete mReset; + mReset = nullptr; + } + mAppGroup = nullptr; +} + +int ResetHelper::OnSendNetConnReset(uint8_t cmd, const ConnInfo &src, ssize_t nread, const rbuf_t &rbuf) { + if (cmd == EncHead::TYPE_NETCONN_RST) { + auto rbuf2 = new_buf(0, "", (void *) &src); + mAppGroup->Output(rbuf2.len, rbuf2); // directly send + } + return mAppGroup->doSendCmd(cmd, nread, rbuf); +} + +int ResetHelper::OnSendConvRst(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) { + return mAppGroup->doSendCmd(cmd, nread, rbuf); +} + +int ResetHelper::OnRecvNetconnRst(const ConnInfo &src, IntKeyType key) { + return mAppGroup->onPeerNetConnRst(src, key); +} + +int ResetHelper::OnRecvConvRst(const ConnInfo &src, uint32_t conv) { + return mAppGroup->onPeerConvRst(src, conv); +} + +IReset *ResetHelper::GetReset() { + return mReset; +} \ No newline at end of file diff --git a/callbacks/ResetHelper.h b/callbacks/ResetHelper.h new file mode 100644 index 0000000..6b0e2d0 --- /dev/null +++ b/callbacks/ResetHelper.h @@ -0,0 +1,34 @@ +// +// Created by System Administrator on 5/7/18. +// + +#ifndef RSOCK_RESETHELPER_H +#define RSOCK_RESETHELPER_H + +#include "IReset.h" + +class IAppGroup; + +class ResetHelper : public IReset::IRestHelper { +public: + explicit ResetHelper(IAppGroup *appGroup); + + void Close() override; + + int OnSendNetConnReset(uint8_t cmd, const ConnInfo &src, ssize_t nread, const rbuf_t &rbuf) override; + + int OnSendConvRst(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override; + + int OnRecvNetconnRst(const ConnInfo &src, IntKeyType key) override; + + int OnRecvConvRst(const ConnInfo &src, uint32_t conv) override; + + IReset *GetReset() override; + +private: + IAppGroup *mAppGroup = nullptr; + IReset *mReset = nullptr; +}; + + +#endif //RSOCK_RESETHELPER_H diff --git a/conn/DefaultFakeConn.cpp b/conn/DefaultFakeConn.cpp new file mode 100644 index 0000000..51fa79a --- /dev/null +++ b/conn/DefaultFakeConn.cpp @@ -0,0 +1,27 @@ +// +// Created by System Administrator on 5/7/18. +// + +#include "DefaultFakeConn.h" + +DefaultFakeConn::DefaultFakeConn() : INetConn("DefaultFakeConn") {} + +bool DefaultFakeConn::Alive() { + return true; +} + +bool DefaultFakeConn::IsUdp() { + return false; +} + +ConnInfo *DefaultFakeConn::GetInfo() { + return nullptr; +} + +IntKeyType DefaultFakeConn::IntKey() { + return 0; +} + +int DefaultFakeConn::OnRecv(ssize_t nread, const rbuf_t &rbuf) { + return IConn::OnRecv(nread, rbuf); +} diff --git a/conn/DefaultFakeConn.h b/conn/DefaultFakeConn.h new file mode 100644 index 0000000..9ab88a6 --- /dev/null +++ b/conn/DefaultFakeConn.h @@ -0,0 +1,28 @@ +// +// Created by System Administrator on 5/7/18. +// + +#ifndef RSOCK_DEFAULTFAKECONN_H +#define RSOCK_DEFAULTFAKECONN_H + + +#include "INetConn.h" + +class DefaultFakeConn : public INetConn { +public: + DefaultFakeConn(); + + // always return true + bool Alive() override; + + bool IsUdp() override; + + ConnInfo *GetInfo() override; + + IntKeyType IntKey() override; + + int OnRecv(ssize_t nread, const rbuf_t &rbuf) override; +}; + + +#endif //RSOCK_DEFAULTFAKECONN_H diff --git a/conn/IAppGroup.cpp b/conn/IAppGroup.cpp index 90bb19b..b7ca3b8 100644 --- a/conn/IAppGroup.cpp +++ b/conn/IAppGroup.cpp @@ -11,6 +11,8 @@ #include "../callbacks/ConnReset.h" #include "../util/rhash.h" #include "../util/rsutil.h" +#include "../callbacks/NetConnKeepAliveHelper.h" +#include "../callbacks/ResetHelper.h" using namespace std::placeholders; @@ -176,131 +178,4 @@ int IAppGroup::onNetconnDead(uint32_t key) { return 0; } -IAppGroup::NetConnKeepAliveHelper::NetConnKeepAliveHelper(IAppGroup *group, uv_loop_t *loop, bool active) { - mAppGroup = group; - mKeepAlive = new NetConnKeepAlive(this); - if (active) { - setupTimer(loop); - } -} - -void IAppGroup::NetConnKeepAliveHelper::Close() { - if (mFlushTimer) { - uv_timer_stop(mFlushTimer); - uv_close(reinterpret_cast(mFlushTimer), close_cb); - mFlushTimer = nullptr; - } - if (mKeepAlive) { - mKeepAlive->Close(); - delete mKeepAlive; - mKeepAlive = nullptr; - } -} - -int IAppGroup::NetConnKeepAliveHelper::OnSendResponse(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) { - return mAppGroup->doSendCmd(cmd, nread, rbuf); -} - -int IAppGroup::NetConnKeepAliveHelper::OnSendRequest(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) { - return mAppGroup->doSendCmd(cmd, nread, rbuf); -} - -int IAppGroup::NetConnKeepAliveHelper::OnRecvResponse(INetConn::IntKeyType connKey) { - return RemoveRequest(connKey); -} - -INetConn *IAppGroup::NetConnKeepAliveHelper::ConnOfIntKey(INetConn::IntKeyType connKey) { - return mAppGroup->NetGroup()->ConnOfIntKey(connKey); -} - -int IAppGroup::NetConnKeepAliveHelper::SendNetConnRst(const ConnInfo &src, INetConn::IntKeyType connKey) { - return mAppGroup->sendNetConnRst(src, connKey); -} - -void IAppGroup::NetConnKeepAliveHelper::setupTimer(uv_loop_t *loop) { - if (!mFlushTimer) { - mFlushTimer = static_cast(malloc(sizeof(uv_timer_t))); - uv_timer_init(loop, mFlushTimer); - mFlushTimer->data = this; - uv_timer_start(mFlushTimer, timer_cb, FIRST_FLUSH_DELAY, FLUSH_INTERVAL); - } -} - -void IAppGroup::NetConnKeepAliveHelper::timer_cb(uv_timer_t *timer) { - NetConnKeepAliveHelper *helper = static_cast(timer->data); - helper->onFlush(); -} - -void IAppGroup::NetConnKeepAliveHelper::onFlush() { - auto conns = mAppGroup->NetGroup()->GetAllConns(); - for (auto &e: conns) { - auto *conn = dynamic_cast(e.second); - auto it = mReqMap.find(conn->IntKey()); - if (it != mReqMap.end()) { - it->second++; - } else { - mReqMap.emplace(conn->IntKey(), 0); - } - - if (it == mReqMap.end() || it->second < MAX_RETRY) { // new or still valid - mKeepAlive->SendRequest(conn->IntKey()); - } - } - - // todo: make an interface. and move these into inetgroup - auto aCopy = mReqMap; - for (auto &e: aCopy) { - if (e.second >= MAX_RETRY) { // keep alive timeout - LOGE << "keepalive timeout"; - mAppGroup->onNetconnDead(e.first); - RemoveRequest(e.first); - } - } -} - -INetConnKeepAlive *IAppGroup::NetConnKeepAliveHelper::GetIKeepAlive() const { - return mKeepAlive; -} - -int IAppGroup::NetConnKeepAliveHelper::RemoveRequest(INetConnKeepAlive::IntKeyType connKey) { - return mReqMap.erase(connKey); -} - -IAppGroup::ResetHelper::ResetHelper(IAppGroup *appGroup) { - mAppGroup = appGroup; - mReset = new ConnReset(this); -} - -void IAppGroup::ResetHelper::Close() { - if (mReset) { - mReset->Close(); - delete mReset; - mReset = nullptr; - } - mAppGroup = nullptr; -} - -int IAppGroup::ResetHelper::OnSendNetConnReset(uint8_t cmd, const ConnInfo &src, ssize_t nread, const rbuf_t &rbuf) { - if (cmd == EncHead::TYPE_NETCONN_RST) { - auto rbuf2 = new_buf(0, "", (void *) &src); - mAppGroup->Output(rbuf2.len, rbuf2); // directly send - } - return mAppGroup->doSendCmd(cmd, nread, rbuf); -} - -int IAppGroup::ResetHelper::OnSendConvRst(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) { - return mAppGroup->doSendCmd(cmd, nread, rbuf); -} - -int IAppGroup::ResetHelper::OnRecvNetconnRst(const ConnInfo &src, IReset::IntKeyType key) { - return mAppGroup->onPeerNetConnRst(src, key); -} - -int IAppGroup::ResetHelper::OnRecvConvRst(const ConnInfo &src, uint32_t conv) { - return mAppGroup->onPeerConvRst(src, conv); -} - -IReset *IAppGroup::ResetHelper::GetReset() { - return mReset; -} diff --git a/conn/IAppGroup.h b/conn/IAppGroup.h index 3c607bd..4af3ac7 100644 --- a/conn/IAppGroup.h +++ b/conn/IAppGroup.h @@ -8,7 +8,7 @@ #include "IGroup.h" #include "../callbacks/ITcpObserver.h" #include "../bean/EncHead.h" -#include "../callbacks/NetConnKeepAlive.h" +#include "../callbacks/INetConnKeepAlive.h" #include "../callbacks/IReset.h" class INetGroup; @@ -17,11 +17,13 @@ class INetConn; struct ConnInfo; + class IAppGroup : public IGroup, public ITcpObserver { public: using IntKeyType = uint32_t; - IAppGroup(const std::string &groupId, INetGroup *fakeNetGroup, IConn *btm, bool activeKeepAlive, const std::string &printableStr = ""); + IAppGroup(const std::string &groupId, INetGroup *fakeNetGroup, IConn *btm, bool activeKeepAlive, + const std::string &printableStr = ""); int Init() override; @@ -47,7 +49,7 @@ class IAppGroup : public IGroup, public ITcpObserver { const std::string ToStr() override; -protected: +//protected: virtual int sendNetConnRst(const ConnInfo &src, IntKeyType key); virtual int onPeerNetConnRst(const ConnInfo &src, uint32_t key); @@ -60,65 +62,6 @@ class IAppGroup : public IGroup, public ITcpObserver { virtual int onNetconnDead(uint32_t key); -protected: - class NetConnKeepAliveHelper : public NetConnKeepAlive::INetConnAliveHelper { - public: - explicit NetConnKeepAliveHelper(IAppGroup *group, uv_loop_t *loop, bool active = true); - - int OnSendResponse(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override; - - int OnRecvResponse(IntKeyType connKey) override; - - int OnSendRequest(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override; - - INetConn *ConnOfIntKey(IntKeyType connKey) override; - - int SendNetConnRst(const ConnInfo &src, IntKeyType connKey) override; - - void Close() override; - - INetConnKeepAlive *GetIKeepAlive() const override; - - int RemoveRequest(IntKeyType connKey) override; - - private: - void onFlush(); - - private: - void setupTimer(uv_loop_t *loop); - - static void timer_cb(uv_timer_t *timer); - - private: - const int MAX_RETRY = 3; - const uint32_t FLUSH_INTERVAL = 2000; // every 2sec - const uint32_t FIRST_FLUSH_DELAY = 5000; // on app start - IAppGroup *mAppGroup = nullptr; - uv_timer_t *mFlushTimer = nullptr; - std::map mReqMap; - INetConnKeepAlive *mKeepAlive = nullptr; - }; - - class ResetHelper : public IReset::IRestHelper { - public: - explicit ResetHelper(IAppGroup *appGroup); - - void Close() override; - - int OnSendNetConnReset(uint8_t cmd, const ConnInfo &src, ssize_t nread, const rbuf_t &rbuf) override; - - int OnSendConvRst(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override; - - int OnRecvNetconnRst(const ConnInfo &src, IntKeyType key) override; - - int OnRecvConvRst(const ConnInfo &src, uint32_t conv) override; - - IReset *GetReset() override; - - private: - IAppGroup *mAppGroup = nullptr; - IReset *mReset = nullptr; - }; protected: EncHead mHead; @@ -126,8 +69,8 @@ class IAppGroup : public IGroup, public ITcpObserver { private: bool mActive = true; std::string mPrintableStr; - ResetHelper *mResetHelper = nullptr; - NetConnKeepAliveHelper::INetConnAliveHelper *mKeepAliveHelper = nullptr; + IReset::IRestHelper *mResetHelper = nullptr; + INetConnKeepAlive::INetConnAliveHelper *mKeepAliveHelper = nullptr; INetGroup *mFakeNetGroup = nullptr; }; diff --git a/conn/INetConn.cpp b/conn/INetConn.cpp index 3106696..d406b86 100644 --- a/conn/INetConn.cpp +++ b/conn/INetConn.cpp @@ -46,7 +46,7 @@ void INetConn::Close() { mErrCb = nullptr; } -INetConn::IntKeyType INetConn::HashKey(const ConnInfo &info) { +IntKeyType INetConn::HashKey(const ConnInfo &info) { IntKeyType key = info.sp << 1; if (info.IsUdp()) { key |= 0x1; @@ -54,10 +54,10 @@ INetConn::IntKeyType INetConn::HashKey(const ConnInfo &info) { return static_cast(key); } -INetConn::IntKeyType INetConn::IntKey() { +IntKeyType INetConn::IntKey() { return mIntKey; } -void INetConn::SetIntKey(INetConn::IntKeyType intKey) { +void INetConn::SetIntKey(IntKeyType intKey) { mIntKey = intKey; } diff --git a/conn/INetConn.h b/conn/INetConn.h index d7568d2..971ff8d 100644 --- a/conn/INetConn.h +++ b/conn/INetConn.h @@ -8,12 +8,12 @@ #include "IConn.h" #include "../bean/EncHead.h" +#include "rscomm.h" struct ConnInfo; class INetConn : public IConn { public: - using IntKeyType = EncHead::IntConnKeyType; explicit INetConn(const std::string &key); diff --git a/conn/INetGroup.cpp b/conn/INetGroup.cpp index 94c82ad..fe74f80 100644 --- a/conn/INetGroup.cpp +++ b/conn/INetGroup.cpp @@ -8,6 +8,7 @@ #include "INetGroup.h" #include "../bean/ConnInfo.h" #include "../bean/TcpInfo.h" +#include "DefaultFakeConn.h" using namespace std::placeholders; @@ -24,6 +25,16 @@ int INetGroup::Init() { auto cb = std::bind(&INetGroup::handleMessage, this, std::placeholders::_1); mHandler = Handler::NewHandler(mLoop, cb); + + mDefaultFakeConn = new DefaultFakeConn(); + nret = mDefaultFakeConn->Init(); + if (nret) { + return nret; + } + + auto fn = std::bind(&IConn::OnRecv, this, _1, _2); + mDefaultFakeConn->SetOnRecvCb(fn); + return 0; } @@ -31,6 +42,11 @@ void INetGroup::Close() { IGroup::Close(); mErrCb = nullptr; mHandler = nullptr; // handler will automatically remove all pending messages and tasks + if (mDefaultFakeConn) { + mDefaultFakeConn->Close(); + delete mDefaultFakeConn; + mDefaultFakeConn = nullptr; + } } int INetGroup::Input(ssize_t nread, const rbuf_t &rbuf) { @@ -52,8 +68,9 @@ int INetGroup::Input(ssize_t nread, const rbuf_t &rbuf) { afterInput(n); return n; } - LOGD << "Cannot input, no such conn: " << key; // todo: send conn rst - return ERR_NO_CONN; + + LOGD << "Cannot input, no such conn: " << key << ", use default conn to process data"; + return mDefaultFakeConn->Input(nread, rbuf); } return nread; } @@ -69,8 +86,7 @@ void INetGroup::AddNetConn(INetConn *conn) { int INetGroup::Send(ssize_t nread, const rbuf_t &rbuf) { if (nread > 0) { - decltype(mConns) fails; - while (fails.size() < mConns.size()) { + while (!mConns.empty()) { int n = rand() % mConns.size(); auto it = mConns.begin(); std::advance(it, n); @@ -79,12 +95,10 @@ int INetGroup::Send(ssize_t nread, const rbuf_t &rbuf) { afterSend(n); return n; } - LOGW << "send, conn " << it->second->Key() << " is dead but not removed"; - fails.emplace(it->first, it->second); - } - for (auto &e: fails) { // remove dead conn - childConnErrCb(dynamic_cast(e.second), -1); + LOGW << "send, conn " << it->second->Key() << " is dead. Remove it now"; + childConnErrCb(dynamic_cast(it->second), -1); } + LOGE << "All conns are dead!!! Wait to reconnect"; return -1; } return nread; @@ -145,7 +159,7 @@ void INetGroup::netConnErr(const ConnInfo &info) { } } -INetConn *INetGroup::ConnOfIntKey(INetConn::IntKeyType key) { +INetConn *INetGroup::ConnOfIntKey(IntKeyType key) { auto &conns = GetAllConns(); for (auto &e: conns) { INetConn *conn = dynamic_cast(e.second); diff --git a/conn/INetGroup.h b/conn/INetGroup.h index 22cc13d..cb955e7 100644 --- a/conn/INetGroup.h +++ b/conn/INetGroup.h @@ -40,7 +40,7 @@ class INetGroup : public IGroup { // flush detect error bool OnConnDead(IConn *conn) override; - INetConn *ConnOfIntKey(INetConn::IntKeyType key); + INetConn *ConnOfIntKey(IntKeyType key); uv_loop_t *GetLoop() const { return mLoop; } @@ -59,6 +59,7 @@ class INetGroup : public IGroup { uv_loop_t *mLoop = nullptr; Handler::SPHandler mHandler = nullptr; NetConnErrCb mErrCb = nullptr; + IConn *mDefaultFakeConn = nullptr; }; diff --git a/doc/README.zh-cn.md b/doc/README.zh-cn.md index 8b7c683..3e95883 100644 --- a/doc/README.zh-cn.md +++ b/doc/README.zh-cn.md @@ -67,7 +67,9 @@ done 2. 在Windows系统上,rsock的速度要远慢于mac/linux上。我测试的情况是,速度在500-800KB/s之间。 -3. 在windows系统上,建议传入 --lcapIp参数,而不是-d。因为要找到网卡名字,并不容易。 +3. 在windows系统上,建议传入 --lcapIp参数,而不是-d。因为要找到网卡名字,并不容易。比如: `--lcapIp=x.x.x.x` 其中 `x.x.x.x`是你的ip地址。 + +4. 在Windows系统上,rsock的表现没有它在Linux和Mac上好。举个例子:Linux和Mac用户,可以流畅的观看1080P youtube。Windows用户只能看720P,1080P会有点卡。 ### 退出运行 diff --git a/include/rscomm.h b/include/rscomm.h index 51b5766..29f5fe7 100644 --- a/include/rscomm.h +++ b/include/rscomm.h @@ -9,6 +9,8 @@ extern "C" { #endif +#include + #ifndef OM_TTL_OUT #define OM_TTL_OUT 64 #endif @@ -36,9 +38,7 @@ extern "C" { #endif #endif -#ifndef RSOCK_SOCK_BUF_TIMES -#define RSOCK_SOCK_BUF_TIMES 64 // 4 * original buf size -#endif + #ifndef RSOCK_UV_MAX_BUF #define RSOCK_UV_MAX_BUF 65536 // 64K @@ -51,6 +51,8 @@ typedef struct sockaddr_in SA4; //#define OM_ACKPOOL_FLUSH_SEC 5 +using IntKeyType = uint32_t; + #ifdef __cplusplus } #endif diff --git a/net/ClientNetManager.cpp b/net/ClientNetManager.cpp index 6b459e7..21f8c20 100644 --- a/net/ClientNetManager.cpp +++ b/net/ClientNetManager.cpp @@ -73,7 +73,7 @@ void ClientNetManager::flushPending(uint64_t now) { // not running and timeout if (!helper.req && now >= helper.nextRetryMs) { - LOGD << "retry to connect " << helper.info.ToStr(); + LOGD << "retry to connect " << helper.info.ToStr() << ", nRetry left: " << helper.nRetry; auto req = NetUtil::ConnectTcp(mLoop, helper.info, connectCb, this); if (req) { helper.req = req; @@ -102,7 +102,6 @@ void ClientNetManager::onTcpConnect(uv_connect_t *req, int status) { if (0 == add2PoolAutoClose(c)) { c = TransferConn(c->Key()); it->cb(c, tcpInfo); - }; mPending.erase(it); } @@ -131,4 +130,7 @@ void ClientNetManager::DialHelper::dialFailed(uint64_t now) { durationMs = 1000; } nextRetryMs = now + durationMs; + if (nRetry > 0) { + LOGD << "will connect " << (durationMs / 1000) << " seconds later: " << info.ToStr(); + } } diff --git a/src/os/include/os_unix.h b/src/os/include/os_unix.h index 2b2290b..70f0ec0 100644 --- a/src/os/include/os_unix.h +++ b/src/os/include/os_unix.h @@ -17,6 +17,11 @@ #include #define SOCKOPT_VAL_TYPE void* + +#ifndef RSOCK_SOCK_BUF_TIMES +#define RSOCK_SOCK_BUF_TIMES 64 // 64 * original buf size +#endif + #endif // !_WIN32 #endif // !OS_UNIX_H diff --git a/src/os/include/os_win.h b/src/os/include/os_win.h index d93c1b7..4534e89 100644 --- a/src/os/include/os_win.h +++ b/src/os/include/os_win.h @@ -87,6 +87,10 @@ struct sockaddr_un { #define F_OK 00 #endif // !F_OK +#ifndef RSOCK_SOCK_BUF_TIMES +#define RSOCK_SOCK_BUF_TIMES 256 // 256 * 8K(original buf size) = 2MB +#endif + #endif // _cplusplus #endif // _WIN32 #endif // !OS_WIN_H diff --git a/src/os/unix/conn/UnixDgramSyncConn.cpp b/src/os/unix/conn/UnixDgramSyncConn.cpp index 1a859ee..fc1cd55 100644 --- a/src/os/unix/conn/UnixDgramSyncConn.cpp +++ b/src/os/unix/conn/UnixDgramSyncConn.cpp @@ -1,6 +1,7 @@ #include #include "uv.h" #include +#include #include "UnixDgramSyncConn.h" #include "os_util.h" #include "../../../../util/rsutil.h" diff --git a/src/sync/TcpStreamSyncConn.cpp b/src/sync/TcpStreamSyncConn.cpp index 3d9072a..3e7515d 100644 --- a/src/sync/TcpStreamSyncConn.cpp +++ b/src/sync/TcpStreamSyncConn.cpp @@ -175,7 +175,6 @@ int TcpStreamSyncConn::rawInput(int nread, const char *buf) { if (totLen > mNextLen) { // totLen > mNextLen. copy two buf into one char *pbuf = mLargeBuf; // mLargeBuf is large enough to store both mTempBuf and buf - const int BUF_LEN = totLen; int left = totLen; if (mNextLen > 0) { // if nextLen valid. prepend it to buf. *(COUNT_TYPE*)pbuf = mNextLen; diff --git a/xbuild/lib/Windows_x86/ucrtbased.dll b/xbuild/lib/Windows_x86/ucrtbased.dll new file mode 100755 index 0000000..a812fa4 Binary files /dev/null and b/xbuild/lib/Windows_x86/ucrtbased.dll differ diff --git a/xbuild/lib/Windows_x86/vcruntime140d.dll b/xbuild/lib/Windows_x86/vcruntime140d.dll new file mode 100755 index 0000000..5f994b6 Binary files /dev/null and b/xbuild/lib/Windows_x86/vcruntime140d.dll differ