Skip to content

Commit

Permalink
add manager class
Browse files Browse the repository at this point in the history
  • Loading branch information
noclear committed Sep 1, 2017
1 parent 7d35712 commit f216635
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 0 deletions.
44 changes: 44 additions & 0 deletions source/KissRpc/RpcClientListener.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@


module KissRpc.RpcClientListener;

import KissRpc.RpcSocketBaseInterface;
import KissRpc.Logs;
import kiss.aio.AsynchronousChannelSelector;
import KissRpc.RpcClient;

import std.stdio;


class RpcClientListener : ClientSocketEventInterface
{

this(string ip, ushort port, AsynchronousChannelSelector sel)
{
_rpClient = new RpcClient(this);
_rpClient.connect(ip, port, sel);
}

void connectd(RpcSocketBaseInterface socket)
{

}

void disconnectd(RpcSocketBaseInterface socket)
{
writefln("client disconnect ....");
}

void writeFailed(RpcSocketBaseInterface socket)
{
deWritefln("client write failed , %s:%s", socket.getIp, socket.getPort);
}

void readFailed(RpcSocketBaseInterface socket)
{
deWritefln("client read failed , %s:%s", socket.getIp, socket.getPort);
}

public:
RpcClient _rpClient;
}
76 changes: 76 additions & 0 deletions source/KissRpc/RpcManager.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@



module KissRpc.RpcManager;

import KissRpc.RpcServer;
import kiss.aio.AsynchronousChannelThreadGroup;
import KissRpc.Logs;
import KissRpc.RpcServerListener;

import std.experimental.logger.core;


class RpcManager {

public:
this() {

}

static @property getInstance() {
if (_instance is null) {
_instance = new RpcManager();
}
return _instance;
}

//T是RpcServerListener类或者子类 A是server端RPC类
void startService(T, A...)(string ip, ushort port, int threadNum) {
if(isServerStart)
{
warningf("rpc service has already start !!!");
return;
}
isServerStart = true;
_serverGroup = AsynchronousChannelThreadGroup.open(5,threadNum);
for(int i = 0; i < threadNum; i++) {
RpcServer service = new RpcServer(ip, port, _serverGroup,new T);
foreach(t;A) {
auto rpcClass = new t(service);
}
}
_serverGroup.start();
}

void stopService() {
isServerStart = false;
_serverGroup.stop();
}
//T是RpcClientListener类 或者子类
void connectService(T)(string ip, ushort port, int threadNum) {
if(isClientStart)
{
warningf("rpc service has already start !!!");
return;
}
isClientStart = true;
_ClientGroup = AsynchronousChannelThreadGroup.open(5,threadNum);
for(int i = 0; i < threadNum; i++) {
T client = new T(ip, port, _ClientGroup.getWorkSelector());
}
_ClientGroup.start();
}
void stopClient() {
isClientStart = false;
_ClientGroup.stop();
}

private :
__gshared static RpcManager _instance;
AsynchronousChannelThreadGroup _serverGroup;
AsynchronousChannelThreadGroup _ClientGroup;

bool isServerStart;
bool isClientStart;
}
36 changes: 36 additions & 0 deletions source/KissRpc/RpcServerListener.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@


module KissRpc.RpcServerListener;

import KissRpc.RpcSocketBaseInterface;
import KissRpc.Logs;

class RpcServerListener : ServerSocketEventInterface
{
this() {}
void listenFailed(const string str)
{
deWriteln("server listen failed", str);
}

void disconnectd(RpcSocketBaseInterface socket)
{
deWriteln("client is disconnect");
}

shared static int connect_num;
void inconming(RpcSocketBaseInterface socket)
{
logInfo("client inconming:%s:%s, connect num:%s", socket.getIp, socket.getPort, connect_num++);
}

void writeFailed(RpcSocketBaseInterface socket)
{
deWritefln("write buffer to client is failed, %s:%s", socket.getIp, socket.getPort);
}

void readFailed(RpcSocketBaseInterface socket)
{
deWritefln("read buffer from client is failed, %s:%s", socket.getIp, socket.getPort);
}
}

0 comments on commit f216635

Please sign in to comment.