From 247c5827d00481174e0883b03600ee0586886301 Mon Sep 17 00:00:00 2001 From: noclear Date: Fri, 8 Sep 2017 14:25:31 +0800 Subject: [PATCH] rm --- source/.DS_Store | Bin 8196 -> 0 bytes source/KissRpc/Endian.d | 53 ---- source/KissRpc/Logs.d | 363 ----------------------- source/KissRpc/RpcBinaryPackage.d | 364 ------------------------ source/KissRpc/RpcClient.d | 221 -------------- source/KissRpc/RpcClientImpl.d | 63 ---- source/KissRpc/RpcClientListener.d | 49 ---- source/KissRpc/RpcClientSocket.d | 82 ------ source/KissRpc/RpcEventInterface.d | 21 -- source/KissRpc/RpcManager.d | 76 ----- source/KissRpc/RpcPackageBase.d | 11 - source/KissRpc/RpcRecvPackageManage.d | 144 ---------- source/KissRpc/RpcRequest.d | 169 ----------- source/KissRpc/RpcResponse.d | 12 - source/KissRpc/RpcSendPackageManage.d | 103 ------- source/KissRpc/RpcServer.d | 163 ----------- source/KissRpc/RpcServerImpl.d | 37 --- source/KissRpc/RpcServerListener.d | 36 --- source/KissRpc/RpcServerSocket.d | 63 ---- source/KissRpc/RpcSocketBaseInterface.d | 33 --- source/KissRpc/Unit.d | 43 --- 21 files changed, 2106 deletions(-) delete mode 100644 source/.DS_Store delete mode 100755 source/KissRpc/Endian.d delete mode 100644 source/KissRpc/Logs.d delete mode 100755 source/KissRpc/RpcBinaryPackage.d delete mode 100755 source/KissRpc/RpcClient.d delete mode 100755 source/KissRpc/RpcClientImpl.d delete mode 100644 source/KissRpc/RpcClientListener.d delete mode 100755 source/KissRpc/RpcClientSocket.d delete mode 100755 source/KissRpc/RpcEventInterface.d delete mode 100644 source/KissRpc/RpcManager.d delete mode 100755 source/KissRpc/RpcPackageBase.d delete mode 100755 source/KissRpc/RpcRecvPackageManage.d delete mode 100755 source/KissRpc/RpcRequest.d delete mode 100755 source/KissRpc/RpcResponse.d delete mode 100755 source/KissRpc/RpcSendPackageManage.d delete mode 100755 source/KissRpc/RpcServer.d delete mode 100755 source/KissRpc/RpcServerImpl.d delete mode 100644 source/KissRpc/RpcServerListener.d delete mode 100755 source/KissRpc/RpcServerSocket.d delete mode 100755 source/KissRpc/RpcSocketBaseInterface.d delete mode 100755 source/KissRpc/Unit.d diff --git a/source/.DS_Store b/source/.DS_Store deleted file mode 100644 index 464fd0c2616d11ae91837bf2d65e806dff446eba..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8196 zcmeHMPj4GV6n~SrWS6wfCXJg!a99yls?>mD6`@q9s&RwB1|!$OrK2A@|&nyCb%`K`WpY&U`W8jbiA7+q+0%Hm&JVs6j z86nVu^*nB8Gj|sSYqys~**Axu-6#x_banMtGBH`Y@Y9do64JE@!C|FdFB~?|&9BpZ99K%lt%+({R#5h&W8qrmcKt!;_{qr^r(b?6)v4{?zH_K5oW+7*79ADgh2_E8u zI|8Th(Eh@{yz7+U5xPzS$*`Stv2St@_wv9F-bUw%(3Q|FGe>$@;0njfb7#hIJLj>b z6Xw|$$FLK0$AY_)uL`ryjp221rBvR3jCfF7H4*CM=bnNMr4DR~H5b;2+mIMJjSANP z-wiM%O5h!3`<(nR*yq1n)?lZbuTNjRba}>_ox3tSKU*%BuP&4qmzLgWSyx=gd(@7_ zp1sRM9@oMy-3dI-n!yvglhVhjsI;uPg54JNKY!M=eH@|9{Ido*)<-d=Y_A;${^r3! zLergGeYa)JBwozJu)&ic4LIAqpWs1MPid#l4mi$ugh$P0B)XD&G)@plI6J#e96PK{ zYsdSc11U2>b;As>@E*&q$_ zDcMr&iNT5X=3okiWDl|6w}q2r5?>WAjM?KuzCS#yzIp)#CKY#4#Qz_j|NZ|3X3$iv zfL7pTSAdk(o9i{a%fZ=QykQY*+n66<7D?Fk6-o*gG8{*g;W*;iABI@B(Nu7%8TJ)o U3#R}4Lx6t&>-WF(yjQ#U7q}8i@c;k- diff --git a/source/KissRpc/Endian.d b/source/KissRpc/Endian.d deleted file mode 100755 index 4202436..0000000 --- a/source/KissRpc/Endian.d +++ /dev/null @@ -1,53 +0,0 @@ -module kissrpc.Endian; - -import core.bitop : bswap; -import std.traits : isIntegral; - -union IntBuf(T) { - ubyte[T.sizeof] bytes; - T value; -} - -T byteSwap(T)(T t) pure nothrow @trusted if (isIntegral!T) { - static if (T.sizeof == 2) { - return cast(T)((t & 0xff) << 8) | cast(T)((t & 0xff00) >> 8); - } else static if (T.sizeof == 4) { - return cast(T)bswap(cast(uint)t); - } else static if (T.sizeof == 8) { - return cast(T)byteSwap(cast(uint)(t & 0xffffffff)) << 32 | - cast(T)bswap(cast(uint)(t >> 32)); - } else static assert(false, "Type of size " ~ to!string(T.sizeof) ~ " not supported."); -} - -T doNothing(T)(T val) { return val; } - -version (BigEndian) { - alias doNothing hostToNet; - alias doNothing netToHost; - alias byteSwap hostToLe; - alias byteSwap leToHost; -} else { - alias byteSwap hostToNet; - alias byteSwap netToHost; - alias doNothing hostToLe; - alias doNothing leToHost; -} - -unittest { - import std.exception; - - IntBuf!short s; - s.bytes = [1, 2]; - s.value = byteSwap(s.value); - enforce(s.bytes == [2, 1]); - - IntBuf!int i; - i.bytes = [1, 2, 3, 4]; - i.value = byteSwap(i.value); - enforce(i.bytes == [4, 3, 2, 1]); - - IntBuf!long l; - l.bytes = [1, 2, 3, 4, 5, 6, 7, 8]; - l.value = byteSwap(l.value); - enforce(l.bytes == [8, 7, 6, 5, 4, 3, 2, 1]); -} diff --git a/source/KissRpc/Logs.d b/source/KissRpc/Logs.d deleted file mode 100644 index 00a122e..0000000 --- a/source/KissRpc/Logs.d +++ /dev/null @@ -1,363 +0,0 @@ -module kissrpc.Logs; - -import kissrpc.Unit; - -import std.string; -import std.format; -import std.stdio; -import std.array : appender; -import std.file; -import std.experimental.logger; - - -immutable string PRINT_COLOR_NONE = "\033[m"; -immutable string PRINT_COLOR_RED = "\033[0;32;31m"; -immutable string PRINT_COLOR_GREEN = "\033[0;32;32m"; -immutable string PRINT_COLOR_YELLOW = "\033[0;33m"; - -FileLogger logFile = null; - -enum LOG_LEVEL -{ - LL_INFO, - LL_WARNING, - LL_DEBUG, - LL_ERROR, - LL_CRITICAL, - LL_FATAL, -} - - -//#define PRINT_COLOR_YELLOW "\033[1;33m" -//#define PRINT_COLOR_BLUE "\033[0;32;34m" -//#define PRINT_COLOR_WHITE "\033[1;37m" -//#define PRINT_COLOR_CYAN "\033[0;36m" -//#define PRINT_COLOR_PURPLE "\033[0;35m" -//#define PRINT_COLOR_BROWN "\033[0;33m" -//#define PRINT_COLOR_DARY_GRAY "\033[1;30m" -//#define PRINT_COLOR_LIGHT_RED "\033[1;31m" -//#define PRINT_COLOR_LIGHT_GREEN "\033[1;32m" -//#define PRINT_COLOR_LIGHT_BLUE "\033[1;34m" -//#define PRINT_COLOR_LIGHT_CYAN "\033[1;36m" -//#define PRINT_COLOR_LIGHT_PURPLE "\033[1;35m" -//#define PRINT_COLOR_LIGHT_GRAY "\033[0;37m" - - -version(Windows) -{ - import core.sys.windows.wincon; - import core.sys.windows.winbase; - import core.sys.windows.windef; - - __gshared HANDLE g_hout = null; - - void win_writeln(string msg , ushort color) - { - if(g_hout is null) - g_hout = GetStdHandle(STD_OUTPUT_HANDLE); - SetConsoleTextAttribute(g_hout , color); - writeln(msg); - SetConsoleTextAttribute(g_hout , FOREGROUND_BLUE|FOREGROUND_GREEN|FOREGROUND_RED); - } - -} - -private string convTostr(string msg , string file , size_t line) -{ - import std.conv; - return msg ~ " - " ~ file ~ ":" ~ to!string(line); -} - -bool setOutputLogPath(string filePath) -{ - logFile = new FileLogger(filePath); - - if(!logFile.file.isOpen) - { - logFile = null; - logError("log file is not open!!, file path:%s", logFile); - - return false; - } - - return true; -} - - -void outputLog(LOG_LEVEL type, string logText) -{ - switch(type) - { - case LOG_LEVEL.LL_ERROR, LOG_LEVEL.LL_FATAL, LOG_LEVEL.LL_CRITICAL: - logFile.fatal(logText); - break; - - case LOG_LEVEL.LL_INFO: logFile.info(logText); break; - - case LOG_LEVEL.LL_WARNING: logFile.warning(logText); break; - - case LOG_LEVEL.LL_DEBUG: logFile.trace(logText); break; - - default:break; - } -} - -void logKiss(string msg , LOG_LEVEL type , string file = __FILE__ , size_t line = __LINE__) -{ - string timePrior = RPC_SYSTEM_TIMESTAMP_STR; - - version(Posix) - { - string prior; - string suffix = PRINT_COLOR_NONE; - - if(type == LOG_LEVEL.LL_ERROR || type == LOG_LEVEL.LL_FATAL || type == LOG_LEVEL.LL_CRITICAL) - { - prior = PRINT_COLOR_RED; - } - else if(type == LOG_LEVEL.LL_INFO) - { - prior = PRINT_COLOR_GREEN; - - }else if(type == LOG_LEVEL.LL_WARNING) - { - prior = PRINT_COLOR_YELLOW; - } - - if(logFile !is null) - { - outputLog(type, msg); - }else - { - string outInfo = format("%s [%s] %s:%s %s", prior, type, timePrior, msg, suffix); - writeln(outInfo); - } - - } - else - { - - - if(type == LOG_LEVEL.LL_ERROR || type == LOG_LEVEL.LL_FATAL || type == LOG_LEVEL.LL_CRITICAL) - { - if(logFile !is null) - { - outputLog(msg, type); - }else - { - string outInfo = format("[%s] %s:%s ", type, timePrior, msg); - - win_writeln(outInfo , FOREGROUND_RED); - } - - } - else if(type == LOG_LEVEL.LL_WARNING || type == LOG_LEVEL.LL_INFO) - { - if(logFile !is null) - { - outputLog(msg, type); - }else - { - string outInfo = format("[%s] %s:%s ", type, timePrior, msg); - win_writeln(outInfo , FOREGROUND_GREEN); - } - - } - else - { - if(logFile !is null) - { - outputLog(msg, type); - }else - { - string outInfo = format("[%s] %s:%s ", type, timePrior, msg); - win_writeln(outInfo , FOREGROUND_GREEN); - } - } - } - -} - - -version(onyxLog) -{ - import onyx.log; - import onyx.bundle; - import core.sys.windows.winbase; - - __gshared Log g_log; - - void log_debug(string msg , string file = __FILE__ , size_t line = __LINE__) - { - if(g_log !is null ) - g_log.debug_(convTostr(msg , file , line)); - logKiss(msg , LOG_LEVEL.LL_DEBUG , file , line); - } - - void logInfo(string msg , string file = __FILE__ , size_t line = __LINE__) - { - if(g_log !is null) - g_log.info(convTostr(msg , file , line)); - logKiss(msg , LOG_LEVEL.LL_INFO, file , line); - } - - void logWarning(string msg , string file = __FILE__ , size_t line = __LINE__) - { - if(g_log !is null) - g_log.warning(convTostr(msg , file , line)); - logKiss(msg , LOG_LEVEL.LL_WARNING , file , line); - } - - void logError(string msg , string file = __FILE__ , size_t line = __LINE__) - { - if(g_log !is null) - g_log.error(convTostr(msg , file , line)); - logKiss(msg , LOG_LEVEL.LL_ERROR , file , line); - } - - void logCritical(string msg ,string file = __FILE__ , size_t line = __LINE__) - { - if(g_log !is null) - g_log.critical(convTostr(msg , file , line)); - logKiss(msg , LOG_LEVEL.LL_CRITICAL , file , line); - } - - void logFatal(string msg ,string file = __FILE__ , size_t line = __LINE__) - { - if(g_log !is null) - g_log.fatal(convTostr(msg , file , line)); - logKiss(msg , LOG_LEVEL.LL_FATAL , file , line); - } - -} -else -{ - void kissLogDebug(string msg , string file = __FILE__ , size_t line = __LINE__) - { - logKiss(msg , LOG_LEVEL.LL_DEBUG , file , line); - } - void kissLogInfo(string msg , string file = __FILE__ , size_t line = __LINE__) - { - logKiss(msg , LOG_LEVEL.LL_INFO , file , line); - } - void kissLogWarning(string msg , string file = __FILE__ , size_t line = __LINE__) - { - logKiss(msg , LOG_LEVEL.LL_WARNING , file , line); - } - void kissLogError(string msg , string file = __FILE__ , size_t line = __LINE__) - { - logKiss(msg , LOG_LEVEL.LL_ERROR , file , line); - } - void kissLogCritical(string msg ,string file = __FILE__ , size_t line = __LINE__) - { - logKiss(msg , LOG_LEVEL.LL_CRITICAL , file , line); - } - void kissLogFatal(string msg ,string file = __FILE__ , size_t line = __LINE__) - { - logKiss(msg , LOG_LEVEL.LL_FATAL , file , line); - } - -} - -void logFormat(T ...)(T args, const LOG_LEVEL level) -{ - auto strings = appender!string(); - formattedWrite(strings, args); - auto info = strings.data; - - switch(level) - { - case LOG_LEVEL.LL_INFO: kissLogInfo(info); break; - case LOG_LEVEL.LL_WARNING: kissLogWarning(info); break; - case LOG_LEVEL.LL_DEBUG: kissLogDebug(info); break; - case LOG_LEVEL.LL_ERROR: kissLogError(info); break; - case LOG_LEVEL.LL_CRITICAL: kissLogCritical(info); break; - case LOG_LEVEL.LL_FATAL: kissLogFatal(info); break; - - default: - kissLogInfo(info); - } -} - -void logFormatDebug(T ...)(T args) -{ - logFormat(args, LOG_LEVEL.LL_DEBUG); -} - -void logFormatInfo(T ...)(T args) -{ - logFormat(args, LOG_LEVEL.LL_INFO); -} - -void logFormatWarning(T ...)(T args) -{ - logFormat(args, LOG_LEVEL.LL_WARNING); -} - -void logFormatError(T ...)(T args) -{ - logFormat(args, LOG_LEVEL.LL_ERROR); -} - -void logFormatCritical(T ...)(T args) -{ - logFormat(args, LOG_LEVEL.LL_CRITICAL); -} - -void logFormatFatal(T ...)(T args) -{ - logFormat(args, LOG_LEVEL.LL_FATAL); -} - -version(RpcDebug) -{ - alias deWritefln = logFormatDebug; - alias deWriteln = logFormatDebug; -}else -{ - void nullLog(T ...)(T args) - { - - } -// alias deWritefln = logFormatDebug; -// alias deWriteln = logFormatDebug; - alias deWritefln = nullLog; - alias deWriteln = nullLog; -} - -version(UltraHigh) -{ - void nullLog(T ...)(T args) - { - - } - - alias logInfo = nullLog; - alias logWarning = nullLog; - alias logError = nullLog; - alias logCritical = nullLog; - alias logFatal = nullLog; -}else -{ - alias logInfo = logFormatInfo; - alias logWarning = logFormatWarning; - alias logError = logFormatError; - alias logCritical = logFormatCritical; - alias logFatal = logFormatFatal; -} - - - -unittest -{ - import kissrpc.Logs; -// -// setOutputLogPath("./info.log"); -// -// - writeln("-------------------------------------------------------"); - logFormatDebug("debug"); - logInfo("info"); - logError("errro"); - -} diff --git a/source/KissRpc/RpcBinaryPackage.d b/source/KissRpc/RpcBinaryPackage.d deleted file mode 100755 index f21677a..0000000 --- a/source/KissRpc/RpcBinaryPackage.d +++ /dev/null @@ -1,364 +0,0 @@ -module kissrpc.RpcBinaryPackage; - -import kissrpc.Endian; -import kissrpc.Unit; -import kissrpc.Logs; - -import snappy.snappy; -import std.stdio; - -enum RPC_PACKAGE_STATUS_CODE -{ - RPSC_OK, - RPSC_FAILED, -} - - -class RpcBinaryPackage -{ - this(RPC_PACKAGE_PROTOCOL tpp, ulong msgId = 0, RPC_PACKAGE_COMPRESS_TYPE compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO, bool isNonblock = true, size_t funcId = 0) - { - magic = RPC_HANDER_MAGIC; - ver = RPC_HANDER_VERSION; - sequenceId = cast(uint)msgId; - this.funcId = funcId; - - statusInfo |= (isNonblock ? RPC_HANDER_NONBLOCK_FLAG : 0); - - st = cast(short)tpp; - - st |= (compressType << 8); - - statusInfo |= (RPC_PACKAGE_STATUS_CODE.RPSC_OK & RPC_HANDER_STATUS_CODE_FLAG); - - handerSize = ver.sizeof + st.sizeof + statusInfo.sizeof + reserved.sizeof + funcId.sizeof + sequenceId.sizeof + bodySize.sizeof; - } - - int getStartHanderLength()const - { - return magic.sizeof + handerSize.sizeof; - } - - int getHanderSize()const - { - return handerSize + this.getStartHanderLength(); - } - - int getPackgeSize()const - { - return handerSize + bodySize + this.getStartHanderLength(); - } - - size_t getFuncId()const - { - return funcId; - } - - void setFuncId(const size_t id) - { - funcId = id; - } - - ubyte[] getPayload() - { - return bodyPayload; - } - - short getVersion()const - { - return ver; - } - - ulong getBodySize()const - { - return bodySize; - } - - short getSerializedType()const - { - return st & RPC_HANDER_SERI_FLAG; - } - - ulong getSequenceId()const - { - return sequenceId; - } - - bool getNonblock()const - { - return cast(bool)statusInfo & RPC_HANDER_NONBLOCK_FLAG; - } - - short getStatusCode()const - { - return statusInfo & RPC_HANDER_STATUS_CODE_FLAG; - } - - void setStatusCode(const RPC_PACKAGE_STATUS_CODE code) - { - statusInfo |= (code & RPC_HANDER_STATUS_CODE_FLAG); - } - - bool getHB()const - { - return statusInfo & RPC_HANDER_HB_FLAG; - } - - void setHBPackage() - { - statusInfo |= RPC_HANDER_HB_FLAG; - } - - bool getOW()const - { - return cast(bool)statusInfo & RPC_HANDER_OW_FLAG; - } - - void setOWPackage() - { - statusInfo |= RPC_HANDER_OW_FLAG; - } - - bool getRP()const - { - return cast(bool)statusInfo & RPC_HANDER_RP_FLAG; - } - - void setRP() - { - statusInfo |= RPC_HANDER_RP_FLAG; - } - - - ubyte[] toStream(ubyte[] payload) - { - switch(this.getCompressType()) - { - case RPC_PACKAGE_COMPRESS_TYPE.RPCT_COMPRESS: - payload =cast(ubyte[]) Snappy.compress(cast(byte[])payload); - st |= RPC_HANDER_COMPRESS_FLAG; - break; - - case RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC: - if(payload.length >= RPC_PACKAGE_COMPRESS_DYNAMIC_VALUE) - { - payload = cast(ubyte[]) Snappy.compress(cast(byte[])payload); - st |= RPC_HANDER_COMPRESS_FLAG; - }else - { - st &= ~RPC_HANDER_COMPRESS_FLAG; - } - break; - - default:break; - } - - bodySize = cast(ushort)payload.length; - - auto stream = new ubyte[this.getPackgeSize()]; - - ulong pos = 0; - - - pos = writeBytesPos(stream, magic, pos); - pos = writeBytePos(stream, handerSize, pos); - pos = writeBytePos(stream, ver, pos); - - pos = writeBinaryPos(stream, st, pos); - - pos = writeBytePos(stream, statusInfo, pos); - pos = writeBytesPos(stream, reserved, pos); - - pos = writeBinaryPos(stream, funcId, pos); - pos = writeBinaryPos(stream, sequenceId, pos); - pos = writeBinaryPos(stream, bodySize, pos); - - pos = writeBytesPos(stream, payload, pos); - - return stream; - } - - bool fromStream(ubyte[] data) - { - ulong pos = 0; - - try{ - pos = readBytesPos(data, magic, pos); - pos = readBytePos(data, handerSize, pos); - pos = readBytePos(data, ver, pos); - - pos = readBinaryPos(data, st, pos); - - pos = readBytePos(data, statusInfo, pos); - pos = readBytesPos(data, reserved, pos); - - pos = readBinaryPos(data, funcId, pos); - pos = readBinaryPos(data, sequenceId, pos); - pos = readBinaryPos(data, bodySize, pos); - - bodyPayload = data[pos .. $]; - - if(this.isCompress) - { - bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload); - bodySize = cast(ushort)bodyPayload.length; - } - - }catch(Exception e) - { - logWarning("decode binary stream is error:%s", e.msg); - return false; - } - - return true; - } - - bool fromStreamForHander(ubyte[] data) - { - ulong pos = 0; - - try{ - - pos = readBytesPos(data, magic, pos); - pos = readBytePos(data, handerSize, pos); - pos = readBytePos(data, ver, pos); - - pos = readBinaryPos(data, st, pos); - - pos = readBytePos(data, statusInfo, pos); - pos = readBytesPos(data, reserved, pos); - - pos = readBinaryPos(data, funcId, pos); - pos = readBinaryPos(data, sequenceId, pos); - pos = readBinaryPos(data, bodySize, pos); - - - }catch(Exception e) - { - logWarning("decode binary stream for hander is error:%s", e.msg); - return false; - } - - return this.checkHanderValid(); - } - - bool fromStreamForPayload(ubyte[] data) - { - try{ - - bodyPayload = data[0 .. $]; - - if(this.isCompress) - { - bodyPayload =cast(ubyte[]) Snappy.uncompress(cast(byte[])bodyPayload); - bodySize = cast(ushort)bodyPayload.length; - } - - }catch(Exception e) - { - logWarning("decode body stream is error:%s", e.msg); - return false; - } - - return true; - } - - - bool checkHanderValid() - { - return magic == RPC_HANDER_MAGIC && ver == RPC_HANDER_VERSION && this.getPackgeSize <= RPC_PACKAGE_MAX; - } - - RPC_PACKAGE_COMPRESS_TYPE getCompressType() - { - return cast(RPC_PACKAGE_COMPRESS_TYPE)((st & RPC_HANDER_CPNPRESS_TYPE_FLAG)>>8); - } - - bool isCompress() - { - return cast(bool)(st & RPC_HANDER_COMPRESS_FLAG); - } - -protected: - - ulong writeBinaryPos(T)(ubyte[] data, T t, ulong pos) - { - T bits= hostToNet(t); - data[pos .. pos + t.sizeof ] = (cast(ubyte*)&bits)[0 .. t.sizeof]; - return pos + t.sizeof; - } - - ulong writeBytesPos(ubyte[] data, ubyte[] bytes, ulong pos) - { - data[pos .. pos + bytes.length] = bytes[0 .. bytes.length]; - return pos + bytes.length; - } - - ulong writeBytePos(ubyte[] data, ubyte abyte, ulong pos) - { - data[pos .. pos + abyte.sizeof] = abyte; - return pos + abyte.sizeof; - } - - ulong readBinaryPos(T)(ubyte[] data, ref T t, ulong pos) - { - IntBuf!(T) bits; - bits.bytes = data[pos .. pos + t.sizeof]; - t = netToHost(bits.value); - - return pos + t.sizeof; - } - - ulong readBytesPos(ubyte[] data, ubyte[] bytes, ulong pos) - { - bytes[0 .. $] = data[pos .. pos + bytes.length]; - - return pos + bytes.length; - } - - ulong readBytePos(ubyte[] data, ref ubyte abyte, ulong pos) - { - abyte = (data[pos .. pos + abyte.sizeof])[0]; - return pos + abyte.sizeof; - } - - -private: - ubyte[2] magic; - - ubyte handerSize; - ubyte ver; - short st; - - ubyte statusInfo; // [nb:1bit, ow:1bit, rp:1bit, nonblock:1bit, statusCode:4bit] - - ubyte[2] reserved; - uint sequenceId; - size_t funcId; - ushort bodySize; - - ubyte[] bodyPayload; -} - - -unittest{ - import std.stdio; - - auto send_pkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, 0, RPC_PACKAGE_COMPRESS_TYPE.RPCT_DYNAMIC); - auto send_data = "aaaaaaaabbbbbbbbbbbbbbcccccccccccccccdddddddddddddddddddd"; - - writeln("-----------------------------------------------------"); - - auto snd_stream = send_pkg.toStream(cast(ubyte[])send_data); - - writefln("send stream, length:%s, compress:%s, data:%s", snd_stream.length, send_pkg.getCompressType, snd_stream); - - auto recvPkg = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF); - - recvPkg.fromStream(snd_stream); - - writeln("----------------------------------------------------"); - - writefln("recv stream, length:%s, compress:%s, data:%s", recvPkg.getPackgeSize(), recvPkg.getCompressType, recvPkg.getPayload()); - -} diff --git a/source/KissRpc/RpcClient.d b/source/KissRpc/RpcClient.d deleted file mode 100755 index 16242c4..0000000 --- a/source/KissRpc/RpcClient.d +++ /dev/null @@ -1,221 +0,0 @@ -module kissrpc.RpcClient; - -import kissrpc.RpcRequest; -import kissrpc.Unit; -import kissrpc.RpcResponse; -import kissrpc.RpcBinaryPackage; -import kissrpc.RpcClientSocket; -import kissrpc.RpcEventInterface; -import kissrpc.RpcPackageBase; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.RpcEventInterface; -import kissrpc.RpcSendPackageManage; -import kissrpc.Logs; - -import kiss.aio.AsynchronousChannelSelector; -import kiss.aio.ByteBuffer; - -import std.parallelism; -import std.stdio; -import std.experimental.logger.core; - - -alias ReponsCallback = void delegate(RpcResponse); - - -class RpcClient:RpcEventInterface{ - - this(ClientSocketEventInterface socketEvent) - { - clientSocketEvent = socketEvent; - sendPackManage = new RpcSendPackageManage(this); - defaultPoolThreads = RPC_CLIENT_DEFAULT_THREAD_POOL; - compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO; - } - - void bind(string className, string funcName) - { - string key = className ~ "." ~ funcName; - deWritefln("rpc client bind:%s", key); - } - - void bindCallback(size_t funcId, ReponsCallback callback) - { - rpcCallbackMap[funcId] = callback; - } - - - bool requestRemoteCall(RpcRequest req, RPC_PACKAGE_PROTOCOL protocol) - { - packMessageCount++; - req.setSequence(packMessageCount); - req.setSocket(clientSocket); - - if(req.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO) - { - req.setCompressType(this.compressType); - } - - deWritefln("rpc client request remote call:%s, id:%s", req.getCallFuncName(), req.getCallFuncId); - return sendPackManage.add(req); - } - - void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack) - { - - if(sendPackManage.remove(pack.getSequenceId)) - { - deWritefln("client recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s", - pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType); - - RpcPackageBase packageBase; - - switch(pack.getSerializedType) - { - case RPC_PACKAGE_PROTOCOL.TPP_JSON:break; - case RPC_PACKAGE_PROTOCOL.TPP_XML: break; - case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break; - case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF: break; - case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break; - - default: - logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType()); - } - - auto rpcResp = new RpcRequest(socket); - - rpcResp.setSequence(pack.getSequenceId()); - rpcResp.setNonblock(pack.getNonblock()); - rpcResp.setCompressType(pack.getCompressType); - rpcResp.bindFunc(pack.getFuncId()); - rpcResp.push(pack.getPayload()); - - if(pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK) - { - logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, status code:%s", - pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getStatusCode); - - rpcResp.setStatus(RESPONSE_STATUS.RS_FAILD); - - }else - { - rpcResp.setStatus(RESPONSE_STATUS.RS_OK); - deWritefln("rpc server response call, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()); - } - - if(pack.getNonblock) - { - deWritefln("async call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()); - }else - { - deWritefln("sync call from rpc server response, func:%s, arg num:%s", rpcResp.getCallFuncName(), rpcResp.getArgsNum()); - } - - auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null); - - if(callback !is null) - { - callback(rpcResp); - }else - { - logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName); - } - - }else - { - logWarning("Accept error, client recv response failed, package is timeout!!!, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s", - pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize); - } - } - - void rpcSendPackageEvent(RpcResponse rpcResp) - { - switch(rpcResp.getStatus) - { - case RESPONSE_STATUS.RS_TIMEOUT: - logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence); - break; - - case RESPONSE_STATUS.RS_FAILD: - logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence); - break; - - default: - logWarning("rpc send package event is fatal!!, event type error!"); - } - - auto callback = rpcCallbackMap.get(rpcResp.getCallFuncId, null); - - if(callback !is null) - { - callback(rpcResp); - }else - { - logError("server rpc call function is not bind, function name:%s", rpcResp.getCallFuncName); - } - } - - - void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status,const string statusStr) - { - // logInfo("client socket status info:%s", statusStr); - switch(status) - { - case SOCKET_STATUS.SE_CONNECTD: - auto t = task(&clientSocketEvent.connectd, socket); - taskPool.put(t); - break; - - case SOCKET_STATUS.SE_DISCONNECTD: - auto t = task(&clientSocketEvent.disconnectd, socket); - taskPool.put(t); - break; - - case SOCKET_STATUS.SE_READ_FAILED : - auto t = task(&clientSocketEvent.readFailed, socket); - taskPool.put(t); - break; - - case SOCKET_STATUS.SE_WRITE_FAILED: - auto t = task(&clientSocketEvent.writeFailed, socket); - taskPool.put(t); - break; - - default: - logError("client socket status is fatal!!", statusStr); - return; - } - - } - - void connect(string ip, ushort port, AsynchronousChannelSelector sel) - { - clientSocket = new RpcClientSocket(ip, port, sel, this); - } - - void reConnect() - { - clientSocket.reConnect(); - } - - ulong getWaitResponseNum() - { - return sendPackManage.getWaitResponseNum; - } - - void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type) - { - compressType = type; - } - -private: - RpcSendPackageManage sendPackManage; - RPC_PACKAGE_COMPRESS_TYPE compressType; - - ReponsCallback[size_t] rpcCallbackMap; - - ulong packMessageCount; - - RpcClientSocket clientSocket; - ClientSocketEventInterface clientSocketEvent; -} diff --git a/source/KissRpc/RpcClientImpl.d b/source/KissRpc/RpcClientImpl.d deleted file mode 100755 index 8be5ef2..0000000 --- a/source/KissRpc/RpcClientImpl.d +++ /dev/null @@ -1,63 +0,0 @@ -module kissrpc.RpcClientImpl; - -import kissrpc.RpcClient; -import kissrpc.RpcRequest; -import kissrpc.RpcResponse; -import kissrpc.Unit; -import kissrpc.Logs; - -import std.stdio; - -class RpcClientImpl(T) -{ - this(RpcClient refClient) - { - client = refClient; - foreach(i, func; __traits(derivedMembers, T)) - { - foreach(t ;__traits(getVirtualMethods, T, func)) - { - deWritefln("rpc client impl class:%s, member func:%s",typeid(T).toString(), typeid(typeof(t))); - client.bind(typeid(T).toString(), func); - } - } - } - - void asyncCall(RpcRequest req, ReponsCallback callback, RPC_PACKAGE_PROTOCOL protocol = RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF, const size_t funcId = 0) - { - deWritefln("rpc client imlp async call:%s, %s", funcId, RpcBindFunctionMap[funcId]); - - req.bindFunc(funcId); - client.requestRemoteCall(req, protocol); - client.bindCallback(funcId, callback); - } - - - RpcResponse syncCall(RpcRequest req, RPC_PACKAGE_PROTOCOL protocol = RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF, const size_t funcId = 0) - { - deWritefln("rpc client imlp sync call:%s, %s", funcId, RpcBindFunctionMap[funcId]); - - req.bindFunc(funcId); - req.setNonblock(false); - - RpcResponse retResp; - - void callback(RpcResponse resp) - { - retResp = resp; - req.semaphoreRelease(); - } - - client.bindCallback(funcId, &callback); - client.requestRemoteCall(req, protocol); - - req.semaphoreWait(); - - return retResp; - } - - -private: - RpcClient client; -} - diff --git a/source/KissRpc/RpcClientListener.d b/source/KissRpc/RpcClientListener.d deleted file mode 100644 index c21952e..0000000 --- a/source/KissRpc/RpcClientListener.d +++ /dev/null @@ -1,49 +0,0 @@ - - -module kissrpc.RpcClientListener; - -import kissrpc.RpcSocketBaseInterface; -import kissrpc.Logs; -import kiss.aio.AsynchronousChannelSelector; -import kissrpc.RpcClient; - -import std.stdio; - - -class RpcClientListener : ClientSocketEventInterface -{ - - this(string ip, ushort port, AsynchronousChannelSelector sel) - { - _rpClient = new RpcClient(this); - _rpClient.connect(ip, port, sel); - } - - void connectd(RpcSocketBaseInterface socket) - { - - } - - void reConnect() - { - _rpClient.reConnect(); - } - - void disconnectd(RpcSocketBaseInterface socket) - { - writefln("client disconnect ...."); - } - - void writeFailed(RpcSocketBaseInterface socket) - { - deWritefln("client write failed , %s:%s", socket.getIp, socket.getPort); - } - - void readFailed(RpcSocketBaseInterface socket) - { - deWritefln("client read failed , %s:%s", socket.getIp, socket.getPort); - } - -public: - RpcClient _rpClient; -} diff --git a/source/KissRpc/RpcClientSocket.d b/source/KissRpc/RpcClientSocket.d deleted file mode 100755 index bd1f228..0000000 --- a/source/KissRpc/RpcClientSocket.d +++ /dev/null @@ -1,82 +0,0 @@ -module kissrpc.RpcClientSocket; - -import kissrpc.RpcRecvPackageManage; -import kissrpc.RpcEventInterface; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.Unit; -import kissrpc.Logs; - -// import kiss.aio.AsyncTcpBase; -// import kiss.event.Poll; -// import kiss.time.Timer; -// import kiss.aio.AsyncTcpClient; - -import std.stdio; -import std.socket; -import std.conv; - -import core.thread; - - -import kiss.aio.AsynchronousChannelSelector; -import kiss.aio.ByteBuffer; -import kiss.net.TcpClient; - -class RpcClientSocket: TcpClient, RpcSocketBaseInterface -{ -public: - this(string ip, ushort port, AsynchronousChannelSelector sel, RpcEventInterface rpcEventDelegate) { - super(ip, port, sel, RPC_PACKAGE_MAX); - _packageManage = new RpcRecvPackageManage(this, rpcEventDelegate); - _socketEventDelegate = rpcEventDelegate; - } - bool write(byte[] buf) { - // writeln("write index ",index++); - super.doWrite(buf); - return true; - } - override void onConnectCompleted(void* attachment) { - doRead(); - _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_CONNECTD, "connect to server is ok!"); - } - override void onConnectFailed(void* attachment) { - //writeln("onConnectFailed"); - } - override void onWriteCompleted(void* attachment, size_t count , ByteBuffer buffer) { - // writeln("write success index ",index); - } - override void onWriteFailed(void* attachment) { - //writeln("onWriteFailed"); - _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_WRITE_FAILED, "write data to server is failed"); - } - override void onReadCompleted(void* attachment, size_t count , ByteBuffer buffer) { - - _packageManage.add(cast(ubyte[])(buffer.getCurBuffer())); - _readBuffer.clear(); - } - override void onReadFailed(void* attachment) { - //writeln("onReadFailed"); - } - override void onClose() { - _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_DISCONNECTD, "disconnect from server!"); - } - override void reConnect() { - //writeln("RpcClientSocket reConnect"); - super.reConnect(); - } - - void disconnect() - { - close(); - } - int getFd() { return cast(int)(fd()); } - string getIp() { return ip(); } - string getPort() { return port(); } - - -private: - RpcEventInterface _socketEventDelegate; - RpcRecvPackageManage _packageManage; - long index = 1; -} - diff --git a/source/KissRpc/RpcEventInterface.d b/source/KissRpc/RpcEventInterface.d deleted file mode 100755 index 8094b4d..0000000 --- a/source/KissRpc/RpcEventInterface.d +++ /dev/null @@ -1,21 +0,0 @@ -module kissrpc.RpcEventInterface; - -import kissrpc.RpcBinaryPackage; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.RpcResponse; - -enum SOCKET_STATUS{ - SE_CONNECTD, - SE_DISCONNECTD, - SE_WRITE_FAILED, - SE_READ_FAILED, - SE_LISTEN_FAILED, -} - -interface RpcEventInterface -{ - void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage recvPackage); - void rpcSendPackageEvent(RpcResponse resp); - - void socketEvent(RpcSocketBaseInterface socket,const SOCKET_STATUS status, const string statusStr); -} diff --git a/source/KissRpc/RpcManager.d b/source/KissRpc/RpcManager.d deleted file mode 100644 index 45cfd54..0000000 --- a/source/KissRpc/RpcManager.d +++ /dev/null @@ -1,76 +0,0 @@ - - - -module kissrpc.RpcManager; - -import kissrpc.RpcServer; -import kiss.aio.AsynchronousChannelThreadGroup; -import kissrpc.Logs; -import kissrpc.RpcServerListener; - -import std.experimental.logger.core; - - -class RpcManager { - -public: - this() { - - } - - static @property getInstance() { - if (_instance is null) { - _instance = new RpcManager(); - } - return _instance; - } - - //T是RpcServerListener类或者子类 A是server端RPC类 - void startService(T, A...)(string ip, ushort port, int threadNum) { - if(isServerStart) - { - warningf("rpc service has already start !!!"); - return; - } - isServerStart = true; - _serverGroup = AsynchronousChannelThreadGroup.open(5,threadNum); - for(int i = 0; i < threadNum; i++) { - RpcServer service = new RpcServer(ip, port, _serverGroup,new T); - foreach(t;A) { - auto rpcClass = new t(service); - } - } - _serverGroup.start(); - } - - void stopService() { - isServerStart = false; - _serverGroup.stop(); - } - //T是RpcClientListener类 或者子类 - void connectService(T)(string ip, ushort port, int threadNum) { - if(isClientStart) - { - warningf("rpc service has already start !!!"); - return; - } - isClientStart = true; - _ClientGroup = AsynchronousChannelThreadGroup.open(5,threadNum); - for(int i = 0; i < threadNum; i++) { - T client = new T(ip, port, _ClientGroup.getWorkSelector()); - } - _ClientGroup.start(); - } - void stopClient() { - isClientStart = false; - _ClientGroup.stop(); - } - -private : - __gshared static RpcManager _instance; - AsynchronousChannelThreadGroup _serverGroup; - AsynchronousChannelThreadGroup _ClientGroup; - - bool isServerStart; - bool isClientStart; -} \ No newline at end of file diff --git a/source/KissRpc/RpcPackageBase.d b/source/KissRpc/RpcPackageBase.d deleted file mode 100755 index bd5d362..0000000 --- a/source/KissRpc/RpcPackageBase.d +++ /dev/null @@ -1,11 +0,0 @@ -module kissrpc.RpcPackageBase; - -import kissrpc.RpcRequest; -import kissrpc.RpcResponse; - -interface RpcPackageBase{ - RpcRequest getRequestData(); - RpcResponse getResponseData(); - - ubyte[] toBinaryStream(); -} diff --git a/source/KissRpc/RpcRecvPackageManage.d b/source/KissRpc/RpcRecvPackageManage.d deleted file mode 100755 index 408950e..0000000 --- a/source/KissRpc/RpcRecvPackageManage.d +++ /dev/null @@ -1,144 +0,0 @@ -module kissrpc.RpcRecvPackageManage; - -import kissrpc.RpcBinaryPackage; -import kissrpc.RpcServerSocket; -import kissrpc.RpcEventInterface; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.Unit; -import kissrpc.Logs; - -import std.parallelism; -import std.stdio; -import core.thread; - -class CapnprotoRecvPackage -{ - this() - { - binaryPackage = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF); - hander = new ubyte[binaryPackage.getHanderSize]; - recvRemainBytes = hander.length; - } - - - ubyte[] parse(ubyte[] bytes, ref bool isOk) - { - ulong cpySize = bytes.length > recvRemainBytes? recvRemainBytes : bytes.length; - ulong bytesPos = 0; - - if(parseState == 0) - { - hander[handerPos .. handerPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize]; - - handerPos += cpySize; - bytesPos += cpySize; - - recvRemainBytes -= cpySize; - - if(recvRemainBytes == 0) - { - if(binaryPackage.fromStreamForHander(hander)) - { - payload = new ubyte[binaryPackage.getBodySize()]; - recvRemainBytes = payload.length; - parseState = 1; - - return this.parse(bytes[bytesPos .. bytesPos + (bytes.length - cpySize)], isOk); - } - } - } - - if(parseState == 1 && recvRemainBytes > 0) - { - payload[payloadPos .. payloadPos + cpySize] = bytes[bytesPos .. bytesPos + cpySize]; - - payloadPos += cpySize; - bytesPos += cpySize; - recvRemainBytes -= cpySize; - - if(recvRemainBytes == 0) - { - isOk = binaryPackage.fromStreamForPayload(payload); - } - } - - return bytes[bytesPos .. bytesPos + (bytes.length-cpySize)]; - } - - RpcBinaryPackage getPackage() - { - return binaryPackage; - } - - bool checkHanderValid() - { - return binaryPackage.checkHanderValid; - } - - bool checkPackageValid() - { - return binaryPackage.checkHanderValid && payloadPos == payload.length; - } - -private: - ubyte[] hander; - ubyte[] payload; - int parseState; - - ulong handerPos, payloadPos; - - ulong recvRemainBytes; - - RpcBinaryPackage binaryPackage; -} - -class RpcRecvPackageManage -{ - this(RpcSocketBaseInterface baseSocket, RpcEventInterface rpcDelegate) - { - rpcEventDelegate = rpcDelegate; - socket = baseSocket; - } - - - void add(ubyte[] bytes) - { - do{ - auto pack = recvPackage.get(id, new CapnprotoRecvPackage); - - bool parseOk = false; - - recvPackage[id] = pack; - - bytes = pack.parse(bytes, parseOk); - - if(parseOk) - { - auto capnprotoPack = pack.getPackage(); - - if(pack.checkHanderValid()) - { - if(pack.checkPackageValid) - { - rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack); - recvPackage.remove(id); - id++; - }else{ - logError("parse package check hander is error, package data:%s", bytes); - } - }else - { - capnprotoPack.setStatusCode(RPC_PACKAGE_STATUS_CODE.RPSC_FAILED); - recvPackage.remove(id); - rpcEventDelegate.rpcRecvPackageEvent(socket, capnprotoPack); - } - } - }while(bytes.length > 0); - } - -private: - ulong id; - CapnprotoRecvPackage[ulong] recvPackage; - RpcEventInterface rpcEventDelegate; - RpcSocketBaseInterface socket; -} diff --git a/source/KissRpc/RpcRequest.d b/source/KissRpc/RpcRequest.d deleted file mode 100755 index e5d7cc7..0000000 --- a/source/KissRpc/RpcRequest.d +++ /dev/null @@ -1,169 +0,0 @@ -module kissrpc.RpcRequest; - -import kissrpc.Logs; -import kissrpc.Unit; -import kissrpc.Endian; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.RpcResponse; - - -import std.stdio; -import std.traits; -import core.sync.semaphore; - - -alias REQUEST_STATUS = RESPONSE_STATUS; - -class RpcRequest -{ - this(const RPC_PACKAGE_COMPRESS_TYPE type = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO, const int secondsTimeOut = RPC_REQUEST_TIMEOUT_SECONDS) - { - timeOut = secondsTimeOut; - timestamp = RPC_SYSTEM_TIMESTAMP; - semaphore = new Semaphore; - nonblock = true; - compressType = type; - } - - this(RpcRequest req) - { - this.baseSocket = req.baseSocket; - this.funcId = req.funcId; - this.sequeNum = req.sequeNum; - this.nonblock = req.nonblock; - this.compressType = req.compressType; - } - - this(RpcSocketBaseInterface socket) - { - baseSocket = socket; - } - - - void push(ubyte[] arg) - { - funcArg = arg; - } - - - - bool pop(ref ubyte[] arg) - { - arg = funcArg; - - return true; - } - - void bindFunc(const size_t id) - { - funcId = id; - } - - ulong getArgsNum()const - { - return 1; - } - - string getCallFuncName()const - { - return RpcBindFunctionMap[funcId]; - } - - size_t getCallFuncId()const - { - return funcId; - } - - ubyte[] getFunArgList() - { - return funcArg; - } - - void setSocket(RpcSocketBaseInterface socket) - { - baseSocket = socket; - } - - auto getSocket() - { - return baseSocket; - } - - auto getTimestamp()const - { - return timestamp; - } - - void setSequence(ulong seque) - { - sequeNum = seque; - } - - auto getSequence()const - { - return sequeNum; - } - - auto getTimeout()const - { - return timeOut; - } - - void setStatus(RESPONSE_STATUS status) - { - response_status = status; - } - - auto getStatus()const - { - return response_status; - } - - auto getNonblock()const - { - return nonblock; - } - - void setNonblock(bool isNonblock) - { - nonblock = isNonblock; - } - - void semaphoreWait() - { - nonblock = false; - semaphore.wait(); - } - - void semaphoreRelease() - { - semaphore.notify(); - } - - RPC_PACKAGE_COMPRESS_TYPE getCompressType() - { - return compressType; - } - - void setCompressType(RPC_PACKAGE_COMPRESS_TYPE type) - { - compressType = type; - } - -private: - - RPC_PACKAGE_COMPRESS_TYPE compressType; - - RESPONSE_STATUS response_status; - - ubyte[] funcArg; - size_t funcId; - RpcSocketBaseInterface baseSocket; - Semaphore semaphore; - - bool nonblock; - - ulong timestamp; - ulong timeOut; - ulong sequeNum; -} diff --git a/source/KissRpc/RpcResponse.d b/source/KissRpc/RpcResponse.d deleted file mode 100755 index 2634fec..0000000 --- a/source/KissRpc/RpcResponse.d +++ /dev/null @@ -1,12 +0,0 @@ -module kissrpc.RpcResponse; - -import kissrpc.RpcRequest; - -enum RESPONSE_STATUS -{ - RS_OK, - RS_TIMEOUT, - RS_FAILD, -} - -alias RpcRequest RpcResponse; diff --git a/source/KissRpc/RpcSendPackageManage.d b/source/KissRpc/RpcSendPackageManage.d deleted file mode 100755 index b81360d..0000000 --- a/source/KissRpc/RpcSendPackageManage.d +++ /dev/null @@ -1,103 +0,0 @@ -module kissrpc.RpcSendPackageManage; - -import kissrpc.RpcBinaryPackage; -import kissrpc.RpcPackageBase; -import kissrpc.RpcResponse; -import kissrpc.RpcRequest; -import kissrpc.RpcEventInterface; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.Unit; -import kissrpc.Logs; - -import std.datetime; -import core.thread; -import core.memory:GC; - -import std.stdio; - - class RpcSendPackageManage:Thread -{ - this(RpcEventInterface rpc_event) - { - RPC_SYSTEM_TIMESTAMP = Clock.currStdTime().stdTimeToUnixTime!(long)(); - - clientEventInterface = rpc_event; - - super(&this.threadRun); - super.start(); - - } - - - bool add(RpcRequest req, bool checkble = true) - { - synchronized(this) - { - auto streamBinaryPackge = new RpcBinaryPackage(RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF, req.getSequence, req.getCompressType, req.getNonblock, req.getCallFuncId); - - auto sendStream = streamBinaryPackge.toStream(req.getFunArgList()); - - bool isOk = req.getSocket.write(cast(byte[]) sendStream); - - if(isOk) - { - if(checkble) - { - sendPack[req.getSequence()] = req; - } - - deWritefln("send binary stream, sequece:%s, funcId:%s, funcName:%s, length:%s",req.getSequence, req.getCallFuncId, req.getCallFuncName, sendStream.length); - - }else - { - req.setStatus(RESPONSE_STATUS.RS_FAILD); - clientEventInterface.rpcSendPackageEvent(req); - } - - return isOk; - } - } - - bool remove(const ulong index) - { - synchronized(this) - { - return sendPack.remove(index); - } - } - - - ulong getWaitResponseNum() - { - return sendPack.length; - } - -protected: - - void threadRun() - { - while(this.isRunning()) - { - synchronized(this) - { - RPC_SYSTEM_TIMESTAMP = Clock.currStdTime().stdTimeToUnixTime!(long)(); - RPC_SYSTEM_TIMESTAMP_STR = SysTime.fromUnixTime(RPC_SYSTEM_TIMESTAMP).toISOExtString(); - - foreach(k, ref req; sendPack) - { - if(req.getTimestamp() + req.getTimeout() < RPC_SYSTEM_TIMESTAMP) - { - req.setStatus(RESPONSE_STATUS.RS_TIMEOUT); - clientEventInterface.rpcSendPackageEvent(req); - this.remove(k); - } - } - } - this.sleep(dur!("msecs")(100)); - } - } - -private: - RpcRequest[ulong] sendPack; - RpcEventInterface clientEventInterface; -} diff --git a/source/KissRpc/RpcServer.d b/source/KissRpc/RpcServer.d deleted file mode 100755 index 043397b..0000000 --- a/source/KissRpc/RpcServer.d +++ /dev/null @@ -1,163 +0,0 @@ -module kissrpc.RpcServer; - -import kissrpc.RpcRequest; -import kissrpc.Unit; -import kissrpc.RpcResponse; -import kissrpc.RpcBinaryPackage; -import kissrpc.RpcServerSocket; -import kissrpc.RpcEventInterface; -import kissrpc.RpcPackageBase; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.RpcSendPackageManage; -import kissrpc.Logs; - -// import kiss.event.GroupPoll; -// import kiss.aio.AsyncGroupTcpServer; - -import kiss.aio.AsynchronousChannelThreadGroup; -import kiss.aio.AsynchronousSocketChannel; -import kiss.net.TcpAcceptor; - - -import std.stdio; - -alias RequestCallback = void delegate(RpcRequest); - -class RpcServer:TcpAcceptor, RpcEventInterface{ - - this(string ip, ushort port, AsynchronousChannelThreadGroup group, ServerSocketEventInterface socketEvent) - { - serverSocketEvent = socketEvent; - sendPackManage = new RpcSendPackageManage(this); - compressType = RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO; - - super(ip, port, group.getWorkSelector()); - } - - override void onAcceptCompleted(void* attachment, AsynchronousSocketChannel result) { - RpcServerSocket server = new RpcServerSocket(result, this); - } - override void onAcceptFailed(void* attachment) { - deWritefln("rpc acceptFailed"); - } - - void bind(string className, string funcName) - { - string key = className ~ "." ~ funcName; - deWritefln("rpc server bind:%s", key); - } - - void bindCallback(const size_t funcId, RequestCallback callback) - { - rpcCallbackMap[funcId] = callback; - deWritefln("rpc server bind callback:%s, %s, addr:%s",funcId, RpcBindFunctionMap[funcId], callback); - } - - bool RpcResponseRemoteCall(RpcResponse resp, RPC_PACKAGE_PROTOCOL protocol) - { - if(resp.getCompressType == RPC_PACKAGE_COMPRESS_TYPE.RPCT_NO) - { - resp.setCompressType(this.compressType); - } - - deWritefln("rpc response remote call:%s, id:%s", resp.getCallFuncName, resp.getCallFuncId); - return sendPackManage.add(resp, false); - } - - - void rpcRecvPackageEvent(RpcSocketBaseInterface socket, RpcBinaryPackage pack) - { - deWritefln("server recv package event, hander len:%s, package size:%s, ver:%s, func id:%s, sequence id:%s, body size:%s, compress:%s", - pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getFuncId, pack.getSequenceId, pack.getBodySize, pack.getCompressType); - - if(pack.getStatusCode != RPC_PACKAGE_STATUS_CODE.RPSC_OK) - { - logWarning("server recv binary package is failed, hander len:%s, package size:%s, ver:%s, sequence id:%s, body size:%s, compress:%s, status code:%s", - pack.getHanderSize, pack.getPackgeSize, pack.getVersion, pack.getSequenceId, pack.getBodySize, pack.getCompressType, pack.getStatusCode); - - }else - { - RpcPackageBase packageBase; - - switch(pack.getSerializedType) - { - case RPC_PACKAGE_PROTOCOL.TPP_JSON:break; - case RPC_PACKAGE_PROTOCOL.TPP_XML: break; - case RPC_PACKAGE_PROTOCOL.TPP_PROTO_BUF: break; - case RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF: break; - case RPC_PACKAGE_PROTOCOL.TPP_CAPNP_BUF: break; - - default: - logWarning("unpack serialized type is failed!, type:%d", pack.getSerializedType()); - } - - auto rpcReq = new RpcRequest(socket); - - rpcReq.setSequence(pack.getSequenceId()); - rpcReq.setNonblock(pack.getNonblock()); - rpcReq.setCompressType(pack.getCompressType()); - rpcReq.bindFunc(pack.getFuncId()); - rpcReq.push(pack.getPayload()); - - deWritefln("rpc client request call, func:%s, arg num:%s", rpcReq.getCallFuncName(), rpcReq.getArgsNum()); - - auto callback = rpcCallbackMap.get(rpcReq.getCallFuncId, null); - - if(callback !is null) - { - callback(rpcReq); - }else - { - logError("client rpc call function is not bind, function name:%s", rpcReq.getCallFuncName); - } - - } - } - - void rpcSendPackageEvent(RpcResponse rpcResp) - { - switch(rpcResp.getStatus) - { - case RESPONSE_STATUS.RS_TIMEOUT: - logWarning("response timeout, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence); - break; - - case RESPONSE_STATUS.RS_FAILD: - logWarning("request failed, func:%s, start time:%s, time:%s, sequence:%s", rpcResp.getCallFuncName, rpcResp.getTimestamp, rpcResp.getTimeout, rpcResp.getSequence); - break; - - default: - logWarning("rpc send package event is fatal!!, event type error!"); - } - } - - void socketEvent(RpcSocketBaseInterface socket, const SOCKET_STATUS status,const string statusStr) - { - logInfo("server socket status:%s", statusStr); - - switch(status) - { - case SOCKET_STATUS.SE_LISTEN_FAILED: serverSocketEvent.listenFailed(statusStr); break; - case SOCKET_STATUS.SE_CONNECTD: serverSocketEvent.inconming(socket); break; - case SOCKET_STATUS.SE_DISCONNECTD:serverSocketEvent.disconnectd(socket); break; - case SOCKET_STATUS.SE_READ_FAILED: serverSocketEvent.readFailed(socket); break; - case SOCKET_STATUS.SE_WRITE_FAILED: serverSocketEvent.writeFailed(socket); break; - - default: - logError("server socket status is fatal!!", statusStr); - } - } - - - void setSocketCompress(RPC_PACKAGE_COMPRESS_TYPE type) - { - compressType = type; - } - -private: - RpcSendPackageManage sendPackManage; - RPC_PACKAGE_COMPRESS_TYPE compressType; - - ServerSocketEventInterface serverSocketEvent; - RequestCallback[size_t] rpcCallbackMap; -} diff --git a/source/KissRpc/RpcServerImpl.d b/source/KissRpc/RpcServerImpl.d deleted file mode 100755 index e26dd08..0000000 --- a/source/KissRpc/RpcServerImpl.d +++ /dev/null @@ -1,37 +0,0 @@ -module kissrpc.RpcServerImpl; - -import kissrpc.RpcServer; -import kissrpc.RpcResponse; -import kissrpc.Unit; -import kissrpc.Logs; - -class RpcServerImpl(T) -{ - this(RpcServer ref_server) - { - server = ref_server; - - foreach(i, funcName; __traits(derivedMembers, T)) - { - foreach(callback ;__traits(getVirtualMethods, T, funcName)) - { - deWritefln("rpc server impl class:%s, member func:%s",typeid(T).toString(), typeid(typeof(callback))); - server.bind(typeid(T).toString(), funcName); - } - } - } - - void bindRequestCallback(const size_t funcId, RequestCallback callback) - { - server.bindCallback(funcId, callback); - } - - void response(RpcResponse resp, RPC_PACKAGE_PROTOCOL protocol = RPC_PACKAGE_PROTOCOL.TPP_FLAT_BUF) - { - server.RpcResponseRemoteCall(resp, protocol); - } - -private: - RpcServer server; -} - diff --git a/source/KissRpc/RpcServerListener.d b/source/KissRpc/RpcServerListener.d deleted file mode 100644 index 8e3c72d..0000000 --- a/source/KissRpc/RpcServerListener.d +++ /dev/null @@ -1,36 +0,0 @@ - - -module kissrpc.RpcServerListener; - -import kissrpc.RpcSocketBaseInterface; -import kissrpc.Logs; - -class RpcServerListener : ServerSocketEventInterface -{ - this() {} - void listenFailed(const string str) - { - deWriteln("server listen failed", str); - } - - void disconnectd(RpcSocketBaseInterface socket) - { - deWriteln("client is disconnect"); - } - - shared static int connect_num; - void inconming(RpcSocketBaseInterface socket) - { - logInfo("client inconming:%s:%s, connect num:%s", socket.getIp, socket.getPort, connect_num++); - } - - void writeFailed(RpcSocketBaseInterface socket) - { - deWritefln("write buffer to client is failed, %s:%s", socket.getIp, socket.getPort); - } - - void readFailed(RpcSocketBaseInterface socket) - { - deWritefln("read buffer from client is failed, %s:%s", socket.getIp, socket.getPort); - } -} \ No newline at end of file diff --git a/source/KissRpc/RpcServerSocket.d b/source/KissRpc/RpcServerSocket.d deleted file mode 100755 index af90d78..0000000 --- a/source/KissRpc/RpcServerSocket.d +++ /dev/null @@ -1,63 +0,0 @@ -module kissrpc.RpcServerSocket; - -import kissrpc.RpcRecvPackageManage; -import kissrpc.RpcEventInterface; -import kissrpc.RpcSocketBaseInterface; -import kissrpc.Unit; - -import std.socket; -import std.stdio; -import std.conv; -import core.thread; - - -import kiss.net.TcpServer; -import kiss.aio.AsynchronousSocketChannel; -import kiss.aio.ByteBuffer; - -class RpcServerSocket:TcpServer, RpcSocketBaseInterface{ - -public: - this(AsynchronousSocketChannel client, RpcEventInterface rpcEventDalegate) - { - _socketEventDelegate = rpcEventDalegate; - _packageManage = new RpcRecvPackageManage(this, rpcEventDalegate); - super(client, RPC_PACKAGE_MAX); - _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_CONNECTD, "client inconming...."); - } - override void onWriteCompleted(void* attachment, size_t count , ByteBuffer buffer) { - - } - override void onWriteFailed(void* attachment) { - - _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_WRITE_FAILED, "write data to client is failed"); - } - override void onReadCompleted(void* attachment, size_t count , ByteBuffer buffer) { - _packageManage.add(cast(ubyte[])(buffer.getCurBuffer())); - _readBuffer.clear(); - } - override void onReadFailed(void* attachment) { - - } - override void onClose() { - _socketEventDelegate.socketEvent(this, SOCKET_STATUS.SE_DISCONNECTD, "disconnect from client!"); - } - - void disconnect() - { - close(); - } - bool write(byte[] buf) { - super.doWrite(buf); - return true; - } - - int getFd() { return cast(int)(fd()); } - string getIp() { return ip(); } - string getPort() { return port(); } - -private: - RpcEventInterface _socketEventDelegate; - RpcRecvPackageManage _packageManage; -} - diff --git a/source/KissRpc/RpcSocketBaseInterface.d b/source/KissRpc/RpcSocketBaseInterface.d deleted file mode 100755 index 446b98a..0000000 --- a/source/KissRpc/RpcSocketBaseInterface.d +++ /dev/null @@ -1,33 +0,0 @@ -module kissrpc.RpcSocketBaseInterface; - -import std.socket; - -interface RpcSocketBaseInterface -{ - bool write(byte[] data); - - int getFd(); - - string getIp(); - - string getPort(); - - void disconnect(); -} - -interface ClientSocketEventInterface -{ - void connectd(RpcSocketBaseInterface socket); - void disconnectd(RpcSocketBaseInterface socket); - void writeFailed(RpcSocketBaseInterface socket); - void readFailed(RpcSocketBaseInterface socket); -} - -interface ServerSocketEventInterface -{ - void listenFailed(const string str); - void inconming(RpcSocketBaseInterface socket); - void disconnectd(RpcSocketBaseInterface socket); - void writeFailed(RpcSocketBaseInterface socket); - void readFailed(RpcSocketBaseInterface socket); -} diff --git a/source/KissRpc/Unit.d b/source/KissRpc/Unit.d deleted file mode 100755 index 9587c6a..0000000 --- a/source/KissRpc/Unit.d +++ /dev/null @@ -1,43 +0,0 @@ -module kissrpc.Unit; - -import std.traits; - -shared string[size_t] RpcBindFunctionMap; -const ulong RPC_PACKAGE_MAX = 64*1024; -const uint RPC_REQUEST_TIMEOUT_SECONDS = 20; -const uint RPC_CLIENT_DEFAULT_THREAD_POOL = 1; - -shared ulong RPC_PACKAGE_COMPRESS_DYNAMIC_VALUE = 200; -shared ulong RPC_SYSTEM_TIMESTAMP = 0; -shared string RPC_SYSTEM_TIMESTAMP_STR; - - -// rpc package hander flags -const ubyte[2] RPC_HANDER_MAGIC = [0xff, 0xff]; -const ubyte RPC_HANDER_VERSION = 0x01; - -const short RPC_HANDER_COMPRESS_FLAG = cast(short)0xf000; -const short RPC_HANDER_CPNPRESS_TYPE_FLAG = cast(short)0x0f00; -const short RPC_HANDER_SERI_FLAG = cast(short)0x000f; - -const ubyte RPC_HANDER_HB_FLAG = cast(ubyte)(1 << 8); -const ubyte RPC_HANDER_OW_FLAG = cast(ubyte)(1 << 7); -const ubyte RPC_HANDER_RP_FLAG = cast(ubyte)(1 << 6); -const ubyte RPC_HANDER_NONBLOCK_FLAG = cast(ubyte)(1 << 5); -const ubyte RPC_HANDER_STATUS_CODE_FLAG = cast(ubyte) 0x0f; - -enum RPC_PACKAGE_COMPRESS_TYPE -{ - RPCT_NO, - RPCT_DYNAMIC, - RPCT_COMPRESS, -} - -enum RPC_PACKAGE_PROTOCOL -{ - TPP_JSON, - TPP_XML, - TPP_PROTO_BUF, - TPP_FLAT_BUF, - TPP_CAPNP_BUF, -}