-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSession.cpp
123 lines (110 loc) · 3.34 KB
/
Session.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//
// Created by yangning on 18-1-14.
//
// Description :
//
// Copyright (c) yangning All rights reserved.
//
#include "msg/MsgFactory.h"
#include "Session.h"
#include "proto/Protocol.h"
#include "consumer/ConsumerList.h"
using namespace rapidjson;
// Frame delimiters used by Session::parse: every request on the wire is
// expected to look like  BEGIN<payload>END .
// Both the characters and the pointers are const so the markers cannot be
// re-pointed by accident anywhere in this translation unit.
static const char* const begin = "BEGIN";
static const char* const end = "END";
/**
 * Handles newly readable data on a connection.
 *
 * Tries to extract one request payload from buf via parse(). When parse()
 * yields nothing (it returns an empty string), this function returns without
 * replying. Otherwise the payload is handed to process() and the resulting
 * reply is sent back on the same connection.
 *
 * @param connection  connection the data arrived on; also used for the reply
 * @param buf         receive buffer holding the unread bytes
 */
void Session::handleRequest(net::TcpConnectionPtr& connection, net::SocketBuf* buf)
{
    const std::string payload = parse(buf);
    if ( !payload.empty() ) {
        // Protocol framing succeeded: run the business logic ...
        std::string reply = process(payload, connection);
        // ... and report the outcome to the client.
        connection->sendInLoop(reply);
    }
}
/**
 * Dispatches one decoded request and produces the reply text.
 *
 * The payload is flattened into a string -> string map by jsonstr2map(); the
 * "cmd" entry (parsed with atoi) selects the handler:
 *   - Protocol::kCmdRegister: reads "msg_type" and calls handleRegister().
 *   - Protocol::kCmdPush: reads "msg_proto", "msg_type" and "content", calls
 *     handlePushMsg(), then forwards the whole map through push2Consumer()
 *     regardless of the handlePushMsg() result.
 *
 * @param json        request payload as returned by parse()
 * @param connection  connection the request arrived on
 * @return Protocol::kSuccess or Protocol::kFail for a handled command; an
 *         empty string when "cmd" is missing or not one of the cases above.
 */
std::string Session::process(const std::string& json,net::TcpConnectionPtr& connection)
{
    std::map<std::string,std::string> map;
    jsonstr2map(map,json.c_str());

    std::string reply_info;
    auto itr = map.find("cmd");
    if ( itr == map.end() ) {
        // This used to be `return NULL;`, which builds a std::string from a
        // null pointer (undefined behaviour, in practice an exception or a
        // crash). A request without "cmd" now yields the same empty reply as
        // an unrecognized command.
        return reply_info;
    }

    const int cmd_type = atoi(itr->second.c_str());
    switch (cmd_type) {
        case Protocol::kCmdRegister: {
            // operator[] yields "" for a missing key, which atoi turns into 0.
            const int type = atoi(map["msg_type"].c_str());
            reply_info = handleRegister(connection,type) ? Protocol::kSuccess : Protocol::kFail;
            break;
        }
        case Protocol::kCmdPush: {
            // operator[] is kept on purpose: it inserts empty entries for
            // missing keys, so the map forwarded to the consumers below has
            // exactly the same contents as before this change.
            const int proto = atoi(map["msg_proto"].c_str());
            const int type = atoi(map["msg_type"].c_str());
            const std::string& content = map["content"];
            reply_info = handlePushMsg(type, proto, content) ? Protocol::kSuccess : Protocol::kFail;
            push2Consumer(map);
            break;
        }
        default:
            break;
    }
    return reply_info;
}
/**
 * Extracts one framed request of the form  BEGIN<payload>END  from buf.
 *
 * On success the payload (the bytes between the two markers) is returned and
 * exactly that frame is consumed from the buffer. Bytes that follow the END
 * marker stay buffered for the next call; previously the whole buffer was
 * skipped, so a second request that arrived in the same read was lost.
 *
 * Bytes in front of the first BEGIN marker cannot belong to any frame and are
 * discarded. Previously they were left in place, so every later call failed
 * on the same bad prefix and the buffer could only grow.
 *
 * @param buf  receive buffer holding the unread bytes
 * @return the payload, or an empty string when no complete frame is
 *         available yet
 */
std::string Session::parse(net::SocketBuf* buf)
{
    const size_t begin_len = strlen(begin);
    const size_t end_len = strlen(end);
    // A usable frame carries both markers plus at least one payload byte.
    const size_t min_frame = begin_len + end_len;

    if ( buf->readableBytes() <= min_frame ) {
        // Too little data so far; wait for more to arrive.
        return "";
    }

    const char* data = buf->readBegin();
    size_t len = buf->readableBytes();

    const char* frame = std::search(data, data + len, begin, begin + begin_len);
    if ( frame != data ) {
        // The buffer does not start with BEGIN: resynchronize.
        const bool found = (frame != data + len);
        // Without any BEGIN in sight, keep the last begin_len - 1 bytes: they
        // may be the first part of a marker that is still being received.
        buf->skip(found ? static_cast<size_t>(frame - data) : len - (begin_len - 1));
        if ( !found || buf->readableBytes() <= min_frame ) {
            return "";
        }
        // Re-read the window; skipping may have moved the readable region.
        data = buf->readBegin();
        len = buf->readableBytes();
    }

    const char* payload = data + begin_len;
    const char* data_end = data + len;
    const char* marker = std::search(payload, data_end, end, end + end_len);
    if ( marker == data_end ) {
        // END has not been received yet; keep the partial frame buffered.
        return "";
    }

    std::string json_str(payload, marker);
    // Consume this frame only (BEGIN + payload + END).
    buf->skip(static_cast<size_t>(marker - data) + end_len);
    return json_str;
}
/**
 * Registers the given connection as a consumer.
 *
 * Builds a Consumer from the requested type and adds it, together with the
 * connection, to the ConsumerList instance.
 *
 * @param connection  connection to register
 * @param type        value passed to the Consumer constructor
 * @return always true
 */
bool Session::handleRegister(net::TcpConnectionPtr& connection,int type)
{
    Consumer registrant(type);
    auto&& registry = ConsumerList::getInstence();
    registry.addConsumer(connection, registrant);
    return true;
}
/**
 * Placeholder for storing a pushed message; currently does nothing.
 *
 * The disabled draft that used to sit in this body built a Msg object from
 * `content` depending on `type` (Protocol::kAnnounceMsg -> AnnounceMsg,
 * Protocol::kInfoMsg -> InfoMsg, Protocol::kRemindMsg -> InfoMsg), called
 * setMsgType(proto) on it and inserted it into msgSet_.
 * NOTE(review): the kRemindMsg branch of that draft created an InfoMsg;
 * confirm whether that was intended before reviving it.
 *
 * @param type     message type from the request (unused for now)
 * @param proto    message protocol from the request (unused for now)
 * @param content  message body from the request (unused for now)
 * @return always true
 */
bool Session::handlePushMsg(int type, int proto, const std::string& content)
{
    // Nothing is stored yet; mark the arguments as intentionally unused.
    static_cast<void>(type);
    static_cast<void>(proto);
    static_cast<void>(content);
    return true;
}
/**
 * Forwards a decoded request to the ConsumerList instance.
 *
 * @param json_map  key/value view of the request, passed on unchanged
 * @return always false
 * NOTE(review): the constant false looks unintentional, but the only caller
 * in this file ignores the result; confirm before changing it.
 */
bool Session::push2Consumer(std::map<std::string, std::string>& json_map)
{
    auto&& consumers = ConsumerList::getInstence();
    consumers.pushmsg2Consumter(json_map);
    return false;
}