Skip to content

Commit

Permalink
add dll dependency. add keepalive param:5s(default)
Browse files Browse the repository at this point in the history
add keepalive:5s. win:buf size 2M. DefaultFakeConn

add rsock dll dependency
  • Loading branch information
nmq committed May 7, 2018
1 parent 13ce521 commit d621fa8
Show file tree
Hide file tree
Showing 27 changed files with 377 additions and 220 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ set(SOURCE_FILES
cap/cap_util.cpp cap/RCap.cpp
thirdparty/md5.c thirdparty/json11.cpp
conn/INetConn.cpp conn/BtmUdpConn.cpp conn/FakeTcp.cpp conn/INetGroup.cpp conn/IGroup.cpp conn/FakeUdp.cpp
conn/IConn.cpp conn/RConn.cpp conn/RawTcp.cpp conn/IAppGroup.cpp conn/IBtmConn.cpp
conn/IConn.cpp conn/RConn.cpp conn/RawTcp.cpp conn/IAppGroup.cpp conn/IBtmConn.cpp conn/DefaultFakeConn.cpp
net/NetUtil.cpp net/ClientNetManager.cpp net/TcpAckPool.cpp net/TcpListenPool.cpp net/INetManager.cpp
net/ServerNetManager.cpp
src/sync/SyncConnFactory.cpp src/sync/LoopSreamSyncConn.cpp src/sync/ISyncConn.cpp src/sync/UdpSyncConn.cpp
src/sync/TcpStreamSyncConn.cpp src/sync/IPacketSyncConn.cpp
)
callbacks/NetConnKeepAliveHelper.cpp callbacks/NetConnKeepAliveHelper.h callbacks/ResetHelper.cpp callbacks/ResetHelper.h)

# platform dependent dir
SET(ROS_DIR src/os/unix)
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ Parameter explanation:

2. For Windows users, speed is far slow than rsock on mac/Linux. (500-800KB/s during my test)

3. For Windows users, you'd better pass --lcapIp arguments instead of -d. Because it's hard to find NIC name for normal users.
3. For Windows users, you'd better pass --lcapIp arguments instead of -d. e.g. `--lcapIp=x.x.x.x` Where `x.x.x.x` is your Internet IP. Because it's hard to find NIC name for normal users.

4. For Windows users, rsock doesn't behave very well like it does on Linux and Mac. e.g. On Mac/Linux, rsock can support to watch 1080P youtube video smoothly. For windows users, rsock can only support 720P youtube video.

### Exit

Expand Down
18 changes: 18 additions & 0 deletions bean/RConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ int RConfig::Parse(bool is_server, int argc, const char *const *argv) {

args::ValueFlag<uint16_t> cap_timeout(opt, "", "pcap timeout(ms). > 0 and <= 50", {"cap_timeout"});

args::ValueFlag<int> keepAlive(opt, "keepalive interval", "interval used to send keepalive request. default 5s."
, {"keepalive"});
try {
parser.ParseCLI(argc, argv);
do {
Expand Down Expand Up @@ -145,6 +147,13 @@ int RConfig::Parse(bool is_server, int argc, const char *const *argv) {
this->isDaemon = (daemon.Get() != 0);
}

if (keepAlive) {
this->param.keepAliveInterval = keepAlive.Get();
if (this->param.keepAliveInterval <= 0) {
throw args::Error("keepalive must > 0: " + std::to_string(this->param.keepAliveInterval));
}
}

} while (false);

if (param.selfCapIp.empty()) {
Expand Down Expand Up @@ -198,6 +207,8 @@ void RConfig::CheckValidation(const RConfig &c) {
assert((p.type == OM_PIPE_TCP) || (p.type == OM_PIPE_UDP) || (p.type == OM_PIPE_ALL));
assert(p.cap_timeout > 0 && p.cap_timeout < 50);

assert(p.keepAliveInterval > 0);

if (!DevIpMatch(p.dev, p.selfCapIp)) {
char buf[BUFSIZ] = {0};
snprintf(buf, BUFSIZ, "dev %s and self capture ip %s not match", p.dev.c_str(),
Expand Down Expand Up @@ -300,6 +311,10 @@ void RConfig::parseJsonString(RConfig &c, const std::string &content, std::strin
if (o["cap_timeout"].is_string()) {
p.cap_timeout = o["cap_timeout"].int_value();
}

if (o["keepalive"].is_number()) {
p.keepAliveInterval = o["keepalive"].int_value();
}
}
}

Expand All @@ -322,6 +337,7 @@ json11::Json RConfig::to_json() const {
{"duration", (int) param.conn_duration_sec},
{"type", strOfType(param.type)},
{"hash", param.hashKey},
{"keepalive", param.keepAliveInterval},
{"cap_timeout", param.cap_timeout},
}},
};
Expand Down Expand Up @@ -406,6 +422,8 @@ std::string RConfig::BuildExampleString() {
out << "client:\n";
#ifdef __MACH__
out << "sudo ./client_rsock_Darwin -d en0";
#elif _WIN32
out << "./client_rsock_Windows.exe --lcapIp=127.0.0.1:30000 ";
#else
out << "sudo ./client_rsock_Linux -d wlan0";
#endif
Expand Down
2 changes: 2 additions & 0 deletions bean/RConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ struct RConfig {

int type = OM_PIPE_TCP; // default tcp ports
uint16_t cap_timeout = OM_PCAP_TIMEOUT;

int keepAliveInterval = 5; // default 5s, 3 times
};

// if turned to debug, speed of rsock will be very slow on macOS.
Expand Down
3 changes: 1 addition & 2 deletions callbacks/INetConnKeepAlive.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
#define RSOCK_IKEEPALIVE_H

#include "rcommon.h"
#include "rscomm.h"

class INetConn;

struct ConnInfo;

class INetConnKeepAlive {
public:
using IntKeyType = uint32_t ;

class INetConnAliveHelper {
public:
virtual ~INetConnAliveHelper() = default;
Expand Down
3 changes: 1 addition & 2 deletions callbacks/IReset.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
#define RSOCK_IRESET_H

#include "rcommon.h"
#include "rscomm.h"

struct ConnInfo;

class IReset {
public:
using IntKeyType = uint32_t;

class IRestHelper {
public:
virtual ~IRestHelper() = default;
Expand Down
100 changes: 100 additions & 0 deletions callbacks/NetConnKeepAliveHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//
// Created by System Administrator on 5/7/18.
//

#include <cstdlib>
#include <plog/Log.h>
#include "NetConnKeepAliveHelper.h"
#include "../conn/IAppGroup.h"
#include "../conn/INetGroup.h"
#include "NetConnKeepAlive.h"

NetConnKeepAliveHelper::NetConnKeepAliveHelper(IAppGroup *group, uv_loop_t *loop, bool active) {
mAppGroup = group;
mKeepAlive = new NetConnKeepAlive(this);
if (active) {
setupTimer(loop);
}
}

void NetConnKeepAliveHelper::Close() {
if (mFlushTimer) {
uv_timer_stop(mFlushTimer);
uv_close(reinterpret_cast<uv_handle_t *>(mFlushTimer), close_cb);
mFlushTimer = nullptr;
}
if (mKeepAlive) {
mKeepAlive->Close();
delete mKeepAlive;
mKeepAlive = nullptr;
}
}

int NetConnKeepAliveHelper::OnSendResponse(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) {
return mAppGroup->doSendCmd(cmd, nread, rbuf);
}

int NetConnKeepAliveHelper::OnSendRequest(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) {
return mAppGroup->doSendCmd(cmd, nread, rbuf);
}

int NetConnKeepAliveHelper::OnRecvResponse(IntKeyType connKey) {
return RemoveRequest(connKey);
}

INetConn *NetConnKeepAliveHelper::ConnOfIntKey(IntKeyType connKey) {
return mAppGroup->NetGroup()->ConnOfIntKey(connKey);
}

int NetConnKeepAliveHelper::SendNetConnRst(const ConnInfo &src, IntKeyType connKey) {
return mAppGroup->sendNetConnRst(src, connKey);
}

void NetConnKeepAliveHelper::setupTimer(uv_loop_t *loop) {
if (!mFlushTimer) {
mFlushTimer = static_cast<uv_timer_t *>(malloc(sizeof(uv_timer_t)));
uv_timer_init(loop, mFlushTimer);
mFlushTimer->data = this;
uv_timer_start(mFlushTimer, timer_cb, FIRST_FLUSH_DELAY, FLUSH_INTERVAL);
}
}

void NetConnKeepAliveHelper::timer_cb(uv_timer_t *timer) {
NetConnKeepAliveHelper *helper = static_cast<NetConnKeepAliveHelper *>(timer->data);
helper->onFlush();
}

void NetConnKeepAliveHelper::onFlush() {
auto conns = mAppGroup->NetGroup()->GetAllConns();
for (auto &e: conns) {
auto *conn = dynamic_cast<INetConn *>(e.second);
auto it = mReqMap.find(conn->IntKey());
if (it != mReqMap.end()) {
it->second++;
} else {
mReqMap.emplace(conn->IntKey(), 0);
}

if (it == mReqMap.end() || it->second < MAX_RETRY) { // new or still valid
mKeepAlive->SendRequest(conn->IntKey());
}
}

// todo: make an interface. and move these into inetgroup
auto aCopy = mReqMap;
for (auto &e: aCopy) {
if (e.second >= MAX_RETRY) { // keep alive timeout
LOGE << "keepalive timeout";
mAppGroup->onNetconnDead(e.first);
RemoveRequest(e.first);
}
}
}

INetConnKeepAlive *NetConnKeepAliveHelper::GetIKeepAlive() const {
return mKeepAlive;
}

int NetConnKeepAliveHelper::RemoveRequest(IntKeyType connKey) {
return mReqMap.erase(connKey);
}
54 changes: 54 additions & 0 deletions callbacks/NetConnKeepAliveHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Created by System Administrator on 5/7/18.
//

#ifndef RSOCK_NETCONNKEEPALIVEHELPER_H
#define RSOCK_NETCONNKEEPALIVEHELPER_H

#include <map>

#include "INetConnKeepAlive.h"

class IAppGroup;

class NetConnKeepAliveHelper : public INetConnKeepAlive::INetConnAliveHelper {
public:
using IntKeyType = uint32_t;

explicit NetConnKeepAliveHelper(IAppGroup *group, uv_loop_t *loop, bool active);

int OnSendResponse(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override;

int OnRecvResponse(IntKeyType connKey) override;

int OnSendRequest(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override;

INetConn *ConnOfIntKey(IntKeyType connKey) override;

int SendNetConnRst(const ConnInfo &src, IntKeyType connKey) override;

void Close() override;

INetConnKeepAlive *GetIKeepAlive() const override;

int RemoveRequest(IntKeyType connKey) override;

private:
void onFlush();

private:
void setupTimer(uv_loop_t *loop);

static void timer_cb(uv_timer_t *timer);

private:
const int MAX_RETRY = 3;
const uint32_t FLUSH_INTERVAL = 5000; // every 2sec
const uint32_t FIRST_FLUSH_DELAY = 5000; // on app start
IAppGroup *mAppGroup = nullptr;
uv_timer_t *mFlushTimer = nullptr;
std::map<IntKeyType, int> mReqMap;
INetConnKeepAlive *mKeepAlive = nullptr;
};

#endif //RSOCK_NETCONNKEEPALIVEHELPER_H
46 changes: 46 additions & 0 deletions callbacks/ResetHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Created by System Administrator on 5/7/18.
//

#include "../conn/IAppGroup.h"
#include "ResetHelper.h"
#include "ConnReset.h"
#include "../util/rsutil.h"

ResetHelper::ResetHelper(IAppGroup *appGroup) {
mAppGroup = appGroup;
mReset = new ConnReset(this);
}

void ResetHelper::Close() {
if (mReset) {
mReset->Close();
delete mReset;
mReset = nullptr;
}
mAppGroup = nullptr;
}

int ResetHelper::OnSendNetConnReset(uint8_t cmd, const ConnInfo &src, ssize_t nread, const rbuf_t &rbuf) {
if (cmd == EncHead::TYPE_NETCONN_RST) {
auto rbuf2 = new_buf(0, "", (void *) &src);
mAppGroup->Output(rbuf2.len, rbuf2); // directly send
}
return mAppGroup->doSendCmd(cmd, nread, rbuf);
}

int ResetHelper::OnSendConvRst(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) {
return mAppGroup->doSendCmd(cmd, nread, rbuf);
}

int ResetHelper::OnRecvNetconnRst(const ConnInfo &src, IntKeyType key) {
return mAppGroup->onPeerNetConnRst(src, key);
}

int ResetHelper::OnRecvConvRst(const ConnInfo &src, uint32_t conv) {
return mAppGroup->onPeerConvRst(src, conv);
}

IReset *ResetHelper::GetReset() {
return mReset;
}
34 changes: 34 additions & 0 deletions callbacks/ResetHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Created by System Administrator on 5/7/18.
//

#ifndef RSOCK_RESETHELPER_H
#define RSOCK_RESETHELPER_H

#include "IReset.h"

class IAppGroup;

class ResetHelper : public IReset::IRestHelper {
public:
explicit ResetHelper(IAppGroup *appGroup);

void Close() override;

int OnSendNetConnReset(uint8_t cmd, const ConnInfo &src, ssize_t nread, const rbuf_t &rbuf) override;

int OnSendConvRst(uint8_t cmd, ssize_t nread, const rbuf_t &rbuf) override;

int OnRecvNetconnRst(const ConnInfo &src, IntKeyType key) override;

int OnRecvConvRst(const ConnInfo &src, uint32_t conv) override;

IReset *GetReset() override;

private:
IAppGroup *mAppGroup = nullptr;
IReset *mReset = nullptr;
};


#endif //RSOCK_RESETHELPER_H
27 changes: 27 additions & 0 deletions conn/DefaultFakeConn.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// Created by System Administrator on 5/7/18.
//

#include "DefaultFakeConn.h"

DefaultFakeConn::DefaultFakeConn() : INetConn("DefaultFakeConn") {}

bool DefaultFakeConn::Alive() {
return true;
}

bool DefaultFakeConn::IsUdp() {
return false;
}

ConnInfo *DefaultFakeConn::GetInfo() {
return nullptr;
}

IntKeyType DefaultFakeConn::IntKey() {
return 0;
}

int DefaultFakeConn::OnRecv(ssize_t nread, const rbuf_t &rbuf) {
return IConn::OnRecv(nread, rbuf);
}
Loading

0 comments on commit d621fa8

Please sign in to comment.