cac6a90c280c625338321dd66cfbcca48122fea5
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <sstream>
5 #include <unistd.h>
6 #include <arpa/inet.h>
7 #include "lang/verify.h"
8 #include "handle.h"
9 #include "threaded_log.h"
10 #include "rpc/marshall.h"
11 #include "lock.h"
12
13 using std::ostringstream;
14 using std::istringstream;
15 using std::vector;
16
17 lock_state::lock_state():
18     held(false)
19 {
20 }
21
22 lock_state::lock_state(const lock_state &other) {
23     *this = other;
24 }
25
26 lock_state& lock_state::operator=(const lock_state& o) {
27     held = o.held;
28     held_by = o.held_by;
29     wanted_by = o.wanted_by;
30     old_requests = o.old_requests;
31     return *this;
32 }
33
34 marshall & operator<<(marshall &m, const lock_state &d) {
35         return m << d.held << d.held_by << d.wanted_by;
36 }
37
38 unmarshall & operator>>(unmarshall &u, lock_state &d) {
39         return u >> d.held >> d.held_by >> d.wanted_by;
40 }
41
42 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
43     lock sl(lock_table_lock);
44     // by the semantics of map, this will create
45     // the lock if it doesn't already exist
46     return lock_table[lid];
47 }
48
49 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
50     std::thread(&lock_server::revoker, this).detach();
51     std::thread(&lock_server::retryer, this).detach();
52     rsm->set_state_transfer(this);
53 }
54
55 void lock_server::revoker() [[noreturn]] {
56     while (1) {
57         lock_protocol::lockid_t lid;
58         revoke_fifo.deq(&lid);
59         LOG("Revoking " << lid);
60         if (rsm && !rsm->amiprimary())
61             continue;
62
63         lock_state &st = get_lock_state(lid);
64         holder held_by;
65         {
66             lock sl(st.m);
67             held_by = st.held_by;
68         }
69
70         rpcc *proxy = NULL;
71         // try a few times?
72         //int t=5;
73         //while (t-- && !proxy)
74         proxy = handle(held_by.first).safebind();
75         if (proxy) {
76             int r;
77             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
78             LOG("Revoke returned " << ret);
79         }
80     }
81 }
82
83 void lock_server::retryer() [[noreturn]] {
84     while (1) {
85         lock_protocol::lockid_t lid;
86         retry_fifo.deq(&lid);
87         if (rsm && !rsm->amiprimary())
88             continue;
89
90         LOG("Sending retry for " << lid);
91         lock_state &st = get_lock_state(lid);
92         holder front;
93         {
94             lock sl(st.m);
95             if (st.wanted_by.empty())
96                 continue;
97             front = st.wanted_by.front();
98         }
99
100         rpcc *proxy = NULL;
101         // try a few times?
102         //int t=5;
103         //while (t-- && !proxy)
104         proxy = handle(front.first).safebind();
105         if (proxy) {
106             int r;
107             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
108             LOG("Retry returned " << ret);
109         }
110     }
111 }
112
113 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
114     LOG_FUNC_ENTER_SERVER;
115     holder h = holder(id, xid);
116     lock_state &st = get_lock_state(lid);
117     lock sl(st.m);
118
119     // deal with duplicated requests
120     if (st.old_requests.count(id)) {
121         lock_protocol::xid_t old_xid = st.old_requests[id];
122         if (old_xid > xid)
123             return lock_protocol::RPCERR;
124         else if (old_xid == xid) {
125             if (st.held && st.held_by == h) {
126                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
127                 return lock_protocol::OK;
128             }
129         }
130     }
131
132     // grant the lock if it's available and I'm next in line
133     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
134         if (!st.wanted_by.empty())
135             st.wanted_by.pop_front();
136         st.old_requests[id] = xid;
137
138         st.held = true;
139         st.held_by = h;
140         LOG("Lock " << lid << " held by " << h.first);
141         if (st.wanted_by.size())
142             revoke_fifo.enq(lid);
143         return lock_protocol::OK;
144     }
145
146     // get in line
147     bool found = false;
148     for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
149         if (i->first == id) {
150             // make sure client is obeying serialization
151             if (i->second != xid) {
152                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
153                 return lock_protocol::RPCERR;
154             }
155             found = true;
156             break;
157         }
158     }
159     if (!found)
160         st.wanted_by.push_back(h);
161
162     LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
163
164     // send revoke if we're first in line
165     if (st.wanted_by.front() == h)
166         revoke_fifo.enq(lid);
167
168     return lock_protocol::RETRY;
169 }
170
171 int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
172     LOG_FUNC_ENTER_SERVER;
173     lock_state &st = get_lock_state(lid);
174     lock sl(st.m);
175     if (st.held && st.held_by == holder(id, xid)) {
176         st.held = false;
177         LOG("Lock " << lid << " not held");
178     }
179     if (st.wanted_by.size())
180         retry_fifo.enq(lid);
181     return lock_protocol::OK;
182 }
183
184 string lock_server::marshal_state() {
185     lock sl(lock_table_lock);
186     marshall rep;
187     rep << nacquire;
188     rep << lock_table;
189     return rep.str();
190 }
191
192 void lock_server::unmarshal_state(string state) {
193     lock sl(lock_table_lock);
194     unmarshall rep(state);
195     rep >> nacquire;
196     rep >> lock_table;
197 }
198
199 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
200     LOG("stat request for " << lid);
201     VERIFY(0);
202     r = nacquire;
203     return lock_protocol::OK;
204 }
205