forked from bohaoist/yfs2012
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock_server_cache.cc
133 lines (120 loc) · 3.34 KB
/
lock_server_cache.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// the caching lock server implementation
#include "lock_server_cache.h"
#include <sstream>
#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <algorithm>
#include "lang/verify.h"
#include "handle.h"
#include "tprintf.h"
// Constructs the server with the acquire counter at zero and an empty
// per-lock state table.
lock_server_cache::lock_server_cache() : nacquire(0), states_() {}
// Handles an acquire request from client `id` for lock `lid`.
//
// Decided while holding mutex_:
//   1. The lock has no entry or is not held: grant it to `id`, return OK.
//   2. The lock is held and other clients are already queued: append `id`
//      to the queue and return RETRY.
//   3. The lock is held and nobody is queued: queue `id`, drop the mutex,
//      send a revoke RPC to the current owner and, once that call returns,
//      remove the head of the queue, record `id` as owner and return OK.
//
// retryR is reset to 0 on entry. It is set to 1 only in case 3, when other
// clients are still queued after the head has been removed.
//
// Returns lock_protocol::OK, lock_protocol::RETRY, or lock_protocol::RPCERR
// when a handle to the owner or the requester cannot be bound.
int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id,
                               int& retryR)
{
    retryR = 0;

    std::string ownerID;
    {
        // RAII guard: every return inside this scope releases mutex_.
        std::unique_lock<std::mutex> lk(mutex_);

        auto it = states_.find(lid);
        if (it == states_.end() || !it->second.isHold_)
        {
            // Creates the entry if it is missing, then grants the lock.
            auto& fresh = states_[lid];
            fresh.isHold_ = true;
            fresh.ownerID_ = id;
            tprintf("%s insert and acquire %lld \n",id.c_str(),lid);
            return lock_protocol::OK;
        }

        // The lock exists and is held by some client.
        auto& state = it->second;
        assert(state.isHold_);

        if (!state.waitingClients_.empty())
        {
            // A client must not be queued twice for the same lock.
            assert(std::find(state.waitingClients_.begin(),
                             state.waitingClients_.end(),
                             id) == state.waitingClients_.end());
            state.waitingClients_.push_back(id);
            tprintf("%s push and send RETRY %lld \n",id.c_str(),lid);
            return lock_protocol::RETRY;
        }

        // Queue is empty: this request is the one that issues the revoke.
        ownerID = state.ownerID_;
        tprintf("%s first push %lld \n",id.c_str(),lid);
        state.waitingClients_.push_back(id);
    }
    // mutex_ is released here, so it is not held across the RPC below.

    handle ownerHandle(ownerID);
    handle retryHandle(id);
    rpcc* owner = ownerHandle.safebind();
    rpcc* retry = retryHandle.safebind();
    if (owner == nullptr || retry == nullptr)
    {
        // Debug builds still abort here. With NDEBUG the original fell off
        // the end of a non-void function; undo the enqueue and report the
        // failure instead.
        assert(false);
        std::unique_lock<std::mutex> lk(mutex_);
        auto it = states_.find(lid);
        if (it != states_.end())
        {
            auto& waiters = it->second.waitingClients_;
            auto pos = std::find(waiters.begin(), waiters.end(), id);
            if (pos != waiters.end())
                waiters.erase(pos);
        }
        return lock_protocol::RPCERR;
    }

    int r = 0;
    tprintf("%s begin send revoke %lld \n",ownerID.c_str(),lid);
    const int revokeStatus = owner->call(rlock_protocol::revoke,lid,r);
    tprintf("%s revoke finish %lld \n",ownerID.c_str(),lid);
    if (revokeStatus != rlock_protocol::OK)
    {
        // NOTE(review): ownership is still transferred below, as before;
        // the failure is now at least visible in the log.
        tprintf("%s revoke returned status %d for %lld \n",
                ownerID.c_str(),revokeStatus,lid);
    }

    std::unique_lock<std::mutex> lk(mutex_);
    auto it = states_.find(lid);
    assert(it != states_.end());
    auto& state = it->second;
    // Removes the head of the queue (enqueued above while the queue was empty).
    state.waitingClients_.pop_front();
    if (!state.waitingClients_.empty())
        retryR = 1;  // others queued up while the revoke was in flight
    state.ownerID_ = id;
    return lock_protocol::OK;
}
// Handles a release of lock `lid` by client `id`.
//
// Under mutex_, the head of the waiting queue becomes the new owner and is
// removed from the queue. After the mutex is dropped, a retry RPC is sent to
// that client; its flag argument is 1 when further clients remain queued and
// 0 otherwise.
//
// Preconditions (asserted): the lock has an entry, is held, and has at least
// one waiting client.
//
// r is set to 0. Returns lock_protocol::OK, or lock_protocol::RPCERR when a
// handle to the next owner cannot be bound.
int
lock_server_cache::release(lock_protocol::lockid_t lid, std::string id,
                           int &r)
{
    r = 0;  // give the out-parameter a defined value, as acquire() does
    tprintf("%s begin release %lld \n",id.c_str(),lid);

    int retryR = 0;
    std::string nextOwner;
    {
        std::unique_lock<std::mutex> lk(mutex_);
        auto it = states_.find(lid);
        assert(it != states_.end());
        assert(it->second.isHold_);
        assert(!it->second.waitingClients_.empty());
        tprintf("%s release %lld \n",id.c_str(),lid);

        auto& state = it->second;
        nextOwner = state.waitingClients_.front();
        state.ownerID_ = nextOwner;
        // The head is popped before the RPC, so an acquire arriving during
        // the call may observe an empty queue.
        state.waitingClients_.pop_front();
        if (!state.waitingClients_.empty())
            retryR = 1;
    }
    // mutex_ is released here, so it is not held across the RPC below.

    handle retryHandle(nextOwner);
    rpcc* retry = retryHandle.safebind();
    if (retry == nullptr)
    {
        // The original dereferenced the null pointer here.
        tprintf("%s cannot bind for retry %lld \n",nextOwner.c_str(),lid);
        return lock_protocol::RPCERR;
    }

    int reply = 0;
    const int retryStatus = retry->call(rlock_protocol::retry,lid,retryR,reply);
    tprintf("%s send retry down %lld \n",nextOwner.c_str(),lid);
    if (retryStatus != rlock_protocol::OK)
    {
        // NOTE(review): the state change above is kept and OK is still
        // returned, as before; the failure is now visible in the log.
        tprintf("%s retry returned status %d for %lld \n",
                nextOwner.c_str(),retryStatus,lid);
    }
    return lock_protocol::OK;
}
// Handles a stat request: logs it and writes the current value of nacquire
// into r. The lid argument is not consulted. Always returns OK.
lock_protocol::status
lock_server_cache::stat(lock_protocol::lockid_t lid, int &r)
{
    tprintf("stat request\n");

    const int acquireCount = nacquire;
    r = acquireCount;

    return lock_protocol::OK;
}