a82231eab25d1ba3bc467ded211a014ed0aa0b85
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <sstream>
5 #include <unistd.h>
6 #include <arpa/inet.h>
7 #include "lang/verify.h"
8 #include "handle.h"
9 #include "tprintf.h"
10 #include "rpc/marshall.h"
11 #include "lock.h"
12
13 using std::ostringstream;
14 using std::istringstream;
15 using std::vector;
16
17 lock_state::lock_state():
18     held(false)
19 {
20 }
21
22 lock_state::lock_state(const lock_state &other) {
23     *this = other;
24 }
25
26 lock_state& lock_state::operator=(const lock_state& o) {
27     held = o.held;
28     held_by = o.held_by;
29     wanted_by = o.wanted_by;
30     old_requests = o.old_requests;
31     return *this;
32 }
33
34 marshall & operator<<(marshall &m, const lock_state &d) {
35         return m << d.held << d.held_by << d.wanted_by;
36 }
37
38 unmarshall & operator>>(unmarshall &u, lock_state &d) {
39         return u >> d.held >> d.held_by >> d.wanted_by;
40 }
41
42 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
43     lock sl(lock_table_lock);
44     // by the semantics of map, this will create
45     // the lock if it doesn't already exist
46     return lock_table[lid];
47 }
48
49 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
50     std::thread(&lock_server::revoker, this).detach();
51     std::thread(&lock_server::retryer, this).detach();
52     rsm->set_state_transfer(this);
53 }
54
55 void lock_server::revoker() [[noreturn]] {
56     while (1) {
57         lock_protocol::lockid_t lid;
58         revoke_fifo.deq(&lid);
59         LOG("Revoking " << lid);
60         if (rsm && !rsm->amiprimary())
61             continue;
62
63         lock_state &st = get_lock_state(lid);
64         holder held_by;
65         {
66             lock sl(st.m);
67             held_by = st.held_by;
68         }
69
70         rpcc *proxy = NULL;
71         // try a few times?
72         //int t=5;
73         //while (t-- && !proxy)
74         proxy = handle(held_by.first).safebind();
75         if (proxy) {
76             int r;
77             rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
78             LOG("Revoke returned " << ret);
79         }
80     }
81 }
82
83 void lock_server::retryer() [[noreturn]] {
84     while (1) {
85         lock_protocol::lockid_t lid;
86         retry_fifo.deq(&lid);
87         if (rsm && !rsm->amiprimary())
88             continue;
89
90         LOG("Sending retry for " << lid);
91         lock_state &st = get_lock_state(lid);
92         holder front;
93         {
94             lock sl(st.m);
95             if (st.wanted_by.empty())
96                 continue;
97             front = st.wanted_by.front();
98         }
99
100         rlock_protocol::status ret = -1;
101
102         rpcc *proxy = NULL;
103         // try a few times?
104         //int t=5;
105         //while (t-- && !proxy)
106         proxy = handle(front.first).safebind();
107         if (proxy) {
108             int r;
109             ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
110             LOG("Retry returned " << ret);
111         }
112     }
113 }
114
115 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
116     LOG_FUNC_ENTER_SERVER;
117     holder h = holder(id, xid);
118     lock_state &st = get_lock_state(lid);
119     lock sl(st.m);
120
121     // deal with duplicated requests
122     if (st.old_requests.count(id)) {
123         lock_protocol::xid_t old_xid = st.old_requests[id];
124         if (old_xid > xid)
125             return lock_protocol::RPCERR;
126         else if (old_xid == xid) {
127             if (st.held && st.held_by == h) {
128                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
129                 return lock_protocol::OK;
130             }
131         }
132     }
133
134     // grant the lock if it's available and I'm next in line
135     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
136         if (!st.wanted_by.empty())
137             st.wanted_by.pop_front();
138         st.old_requests[id] = xid;
139
140         st.held = true;
141         st.held_by = h;
142         LOG("Lock " << lid << " held by " << h.first);
143         if (st.wanted_by.size())
144             revoke_fifo.enq(lid);
145         return lock_protocol::OK;
146     }
147
148     // get in line
149     bool found = false;
150     for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
151         if (i->first == id) {
152             // make sure client is obeying serialization
153             if (i->second != xid) {
154                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
155                 return lock_protocol::RPCERR;
156             }
157             found = true;
158             break;
159         }
160     }
161     if (!found)
162         st.wanted_by.push_back(h);
163
164     LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
165
166     // send revoke if we're first in line
167     if (st.wanted_by.front() == h)
168         revoke_fifo.enq(lid);
169
170     return lock_protocol::RETRY;
171 }
172
173 int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
174     LOG_FUNC_ENTER_SERVER;
175     lock_state &st = get_lock_state(lid);
176     lock sl(st.m);
177     if (st.held && st.held_by == holder(id, xid)) {
178         st.held = false;
179         LOG("Lock " << lid << " not held");
180     }
181     if (st.wanted_by.size())
182         retry_fifo.enq(lid);
183     return lock_protocol::OK;
184 }
185
186 string lock_server::marshal_state() {
187     lock sl(lock_table_lock);
188     marshall rep;
189     rep << nacquire;
190     rep << lock_table;
191     return rep.str();
192 }
193
194 void lock_server::unmarshal_state(string state) {
195     lock sl(lock_table_lock);
196     unmarshall rep(state);
197     rep >> nacquire;
198     rep >> lock_table;
199 }
200
201 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
202     LOG("stat request for " << lid);
203     VERIFY(0);
204     r = nacquire;
205     return lock_protocol::OK;
206 }
207