More renaming
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <sstream>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <arpa/inet.h>
8 #include "lang/verify.h"
9 #include "handle.h"
10 #include "tprintf.h"
11 #include "rpc/marshall.h"
12 #include "lock.h"
13
14 using std::ostringstream;
15 using std::istringstream;
16 using std::vector;
17
18 lock_state::lock_state():
19     held(false)
20 {
21 }
22
23 lock_state::lock_state(const lock_state &other) {
24     *this = other;
25 }
26
27 lock_state& lock_state::operator=(const lock_state& o) {
28     held = o.held;
29     held_by = o.held_by;
30     wanted_by = o.wanted_by;
31     old_requests = o.old_requests;
32     return *this;
33 }
34
35 template <class A, class B>
36 ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
37     o << "<" << d.first << "," << d.second << ">";
38     return o;
39 }
40
41 marshall & operator<<(marshall &m, const lock_state &d) {
42         return m << d.held << d.held_by << d.wanted_by;
43 }
44
45 unmarshall & operator>>(unmarshall &u, lock_state &d) {
46         return u >> d.held >> d.held_by >> d.wanted_by;
47 }
48
49 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
50     lock sl(lock_table_lock);
51     // by the semantics of map, this will create
52     // the lock if it doesn't already exist
53     return lock_table[lid];
54 }
55
56 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
57     std::thread(&lock_server::revoker, this).detach();
58     std::thread(&lock_server::retryer, this).detach();
59     rsm->set_state_transfer(this);
60 }
61
62 void lock_server::revoker() {
63     while (1) {
64         lock_protocol::lockid_t lid;
65         revoke_fifo.deq(&lid);
66         LOG("Revoking " << lid);
67         if (rsm && !rsm->amiprimary())
68             continue;
69
70         lock_state &st = get_lock_state(lid);
71         holder held_by;
72         {
73             lock sl(st.m);
74             held_by = st.held_by;
75         }
76
77         rpcc *proxy = NULL;
78         // try a few times?
79         //int t=5;
80         //while (t-- && !proxy)
81         proxy = handle(held_by.first).safebind();
82         if (proxy) {
83             int r;
84             rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
85             LOG("Revoke returned " << ret);
86         }
87     }
88 }
89
90 void lock_server::retryer() {
91     while (1) {
92         lock_protocol::lockid_t lid;
93         retry_fifo.deq(&lid);
94         if (rsm && !rsm->amiprimary())
95             continue;
96
97         LOG("Sending retry for " << lid);
98         lock_state &st = get_lock_state(lid);
99         holder front;
100         {
101             lock sl(st.m);
102             if (st.wanted_by.empty())
103                 continue;
104             front = st.wanted_by.front();
105         }
106
107         rlock_protocol::status ret = -1;
108
109         rpcc *proxy = NULL;
110         // try a few times?
111         //int t=5;
112         //while (t-- && !proxy)
113         proxy = handle(front.first).safebind();
114         if (proxy) {
115             int r;
116             ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
117             LOG("Retry returned " << ret);
118         }
119     }
120 }
121
122 int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
123     LOG_FUNC_ENTER_SERVER;
124     holder h = holder(id, xid);
125     lock_state &st = get_lock_state(lid);
126     lock sl(st.m);
127
128     // deal with duplicated requests
129     if (st.old_requests.count(id)) {
130         lock_protocol::xid_t old_xid = st.old_requests[id];
131         if (old_xid > xid)
132             return lock_protocol::RPCERR;
133         else if (old_xid == xid) {
134             if (st.held && st.held_by == h) {
135                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
136                 return lock_protocol::OK;
137             }
138         }
139     }
140
141     // grant the lock if it's available and I'm next in line
142     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
143         if (!st.wanted_by.empty())
144             st.wanted_by.pop_front();
145         st.old_requests[id] = xid;
146
147         st.held = true;
148         st.held_by = h;
149         LOG("Lock " << lid << " held by " << h.first);
150         if (st.wanted_by.size())
151             revoke_fifo.enq(lid);
152         return lock_protocol::OK;
153     }
154
155     // get in line
156     bool found = false;
157     for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
158         if (i->first == id) {
159             // make sure client is obeying serialization
160             if (i->second != xid) {
161                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
162                 return lock_protocol::RPCERR;
163             }
164             found = true;
165             break;
166         }
167     }
168     if (!found)
169         st.wanted_by.push_back(h);
170
171     LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
172
173     // send revoke if we're first in line
174     if (st.wanted_by.front() == h)
175         revoke_fifo.enq(lid);
176
177     return lock_protocol::RETRY;
178 }
179
180 int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
181     LOG_FUNC_ENTER_SERVER;
182     lock_state &st = get_lock_state(lid);
183     lock sl(st.m);
184     if (st.held && st.held_by == holder(id, xid)) {
185         st.held = false;
186         LOG("Lock " << lid << " not held");
187     }
188     if (st.wanted_by.size())
189         retry_fifo.enq(lid);
190     return lock_protocol::OK;
191 }
192
193 string lock_server::marshal_state() {
194     lock sl(lock_table_lock);
195     marshall rep;
196     rep << nacquire;
197     rep << lock_table;
198     return rep.str();
199 }
200
201 void lock_server::unmarshal_state(string state) {
202     lock sl(lock_table_lock);
203     unmarshall rep(state);
204     rep >> nacquire;
205     rep >> lock_table;
206 }
207
208 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
209     printf("stat request\n");
210     VERIFY(0);
211     r = nacquire;
212     return lock_protocol::OK;
213 }
214