Imported from 6.824 labs
[invirt/third/libt4.git] / lock_server_cache_rsm.cc
1 // the caching lock server implementation
2
3 #include "lock_server_cache_rsm.h"
4 #include <sstream>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <arpa/inet.h>
8 #include "lang/verify.h"
9 #include "handle.h"
10 #include "tprintf.h"
11 #include "rpc/marshall.h"
12
13 lock_state::lock_state():
14     held(false)
15 {
16 }
17
18 template <class A, class B>
19 ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
20     o << "<" << d.first << "," << d.second << ">";
21     return o;
22 }
23
24 template <class A>
25 marshall & operator<<(marshall &m, const list<A> &d) {
26     m << vector<A>(d.begin(), d.end());
27     return m;
28 }
29
30 template <class A>
31 unmarshall & operator>>(unmarshall &u, list<A> &d) {
32     vector<A> v;
33     u >> v;
34     d.assign(v.begin(), v.end());
35     return u;
36 }
37
38
39 template <class A, class B>
40 marshall & operator<<(marshall &m, const pair<A,B> &d) {
41     m << d.first;
42     m << d.second;
43     return m;
44 }
45
46 template <class A, class B>
47 unmarshall & operator>>(unmarshall &u, pair<A,B> &d) {
48     u >> d.first;
49     u >> d.second;
50     return u;
51 }
52
53 marshall & operator<<(marshall &m, const lock_state &d) {
54     m << d.held;
55     m << d.held_by;
56     m << d.wanted_by;
57         return m;
58 }
59
60 unmarshall & operator>>(unmarshall &u, lock_state &d) {
61     u >> d.held;
62     u >> d.held_by;
63     u >> d.wanted_by;
64         return u;
65 }
66
67
68 lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
69     ScopedLock sl(lock_table_lock);
70     // by the semantics of map, this will create
71     // the lock if it doesn't already exist
72     return lock_table[lid];
73 }
74
75 static void *revokethread(void *x) {
76     lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
77     sc->revoker();
78     return 0;
79 }
80
81 static void *retrythread(void *x) {
82     lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
83     sc->retryer();
84     return 0;
85 }
86
87 lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
88     pthread_t th;
89     VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0);
90     VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0);
91     rsm->set_state_transfer(this);
92 }
93
94 void lock_server_cache_rsm::revoker() {
95     while (1) {
96         lock_protocol::lockid_t lid;
97         revoke_fifo.deq(&lid);
98         LOG("Revoking " << lid);
99         if (rsm && !rsm->amiprimary())
100             continue;
101
102         lock_state &st = get_lock_state(lid);
103         holder held_by;
104         {
105             ScopedLock sl(st.m);
106             held_by = st.held_by;
107         }
108
109         rpcc *proxy = NULL;
110         // try a few times?
111         //int t=5;
112         //while (t-- && !proxy)
113         proxy = handle(held_by.first).safebind();
114         if (proxy) {
115             int r;
116             rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r);
117             LOG("Revoke returned " << ret);
118         }
119     }
120 }
121
122 void lock_server_cache_rsm::retryer() {
123     while (1) {
124         lock_protocol::lockid_t lid;
125         retry_fifo.deq(&lid);
126         if (rsm && !rsm->amiprimary())
127             continue;
128
129         LOG("Sending retry for " << lid);
130         lock_state &st = get_lock_state(lid);
131         holder front;
132         {
133             ScopedLock sl(st.m);
134             if (st.wanted_by.empty())
135                 continue;
136             front = st.wanted_by.front();
137         }
138
139         rlock_protocol::status ret = -1;
140
141         rpcc *proxy = NULL;
142         // try a few times?
143         //int t=5;
144         //while (t-- && !proxy)
145         proxy = handle(front.first).safebind();
146         if (proxy) {
147             int r;
148             ret = proxy->call(rlock_protocol::retry, lid, front.second, r);
149             LOG("Retry returned " << ret);
150         }
151     }
152 }
153
154 int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) {
155     LOG_FUNC_ENTER_SERVER;
156     holder h = holder(id, xid);
157     lock_state &st = get_lock_state(lid);
158     ScopedLock sl(st.m);
159
160     // deal with duplicated requests
161     if (st.old_requests.count(id)) {
162         lock_protocol::xid_t old_xid = st.old_requests[id];
163         if (old_xid > xid)
164             return lock_protocol::RPCERR;
165         else if (old_xid == xid) {
166             if (st.held && st.held_by == h) {
167                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
168                 return lock_protocol::OK;
169             }
170         }
171     }
172
173     // grant the lock if it's available and I'm next in line
174     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
175         if (!st.wanted_by.empty())
176             st.wanted_by.pop_front();
177         st.old_requests[id] = xid;
178
179         st.held = true;
180         st.held_by = h;
181         LOG("Lock " << lid << " held by " << h.first);
182         if (st.wanted_by.size())
183             revoke_fifo.enq(lid);
184         return lock_protocol::OK;
185     }
186
187     // get in line
188     bool found = false;
189     for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
190         if (i->first == id) {
191             // make sure client is obeying serialization
192             if (i->second != xid) {
193                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
194                 return lock_protocol::RPCERR;
195             }
196             found = true;
197             break;
198         }
199     }
200     if (!found)
201         st.wanted_by.push_back(h);
202
203     LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
204
205     // send revoke if we're first in line
206     if (st.wanted_by.front() == h)
207         revoke_fifo.enq(lid);
208
209     return lock_protocol::RETRY;
210 }
211
212 int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
213     LOG_FUNC_ENTER_SERVER;
214     lock_state &st = get_lock_state(lid);
215     ScopedLock sl(st.m);
216     if (st.held && st.held_by == holder(id, xid)) {
217         st.held = false;
218         LOG("Lock " << lid << " not held");
219     }
220     if (st.wanted_by.size())
221         retry_fifo.enq(lid);
222     return lock_protocol::OK;
223 }
224
225 string lock_server_cache_rsm::marshal_state() {
226     ScopedLock sl(lock_table_lock);
227     marshall rep;
228     rep << nacquire;
229     rep << lock_table;
230     return rep.str();
231 }
232
233 void lock_server_cache_rsm::unmarshal_state(string state) {
234     ScopedLock sl(lock_table_lock);
235     unmarshall rep(state);
236     rep >> nacquire;
237     rep >> lock_table;
238 }
239
240 lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) {
241     printf("stat request\n");
242     r = nacquire;
243     return lock_protocol::OK;
244 }
245