Refactoring
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "lock_server.h"
4 #include <unistd.h>
5 #include <arpa/inet.h>
6 #include "handle.h"
7
8 lock_state::lock_state():
9     held(false)
10 {
11 }
12
13 lock_state::lock_state(const lock_state &other) {
14     *this = other;
15 }
16
17 lock_state& lock_state::operator=(const lock_state& o) {
18     held = o.held;
19     held_by = o.held_by;
20     wanted_by = o.wanted_by;
21     old_requests = o.old_requests;
22     return *this;
23 }
24
25 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
26     lock sl(lock_table_lock);
27     // this will create the lock if it doesn't already exist
28     return lock_table[lid];
29 }
30
31 lock_server::lock_server(rsm *r) : rsm_ (r) {
32     thread(&lock_server::revoker, this).detach();
33     thread(&lock_server::retryer, this).detach();
34     rsm_->set_state_transfer(this);
35 }
36
37 void lock_server::revoker() [[noreturn]] {
38     while (1) {
39         lock_protocol::lockid_t lid;
40         revoke_fifo.deq(&lid);
41         LOG("Revoking " << lid);
42         if (rsm_ && !rsm_->amiprimary())
43             continue;
44
45         lock_state &st = get_lock_state(lid);
46         holder_t held_by;
47         {
48             lock sl(st.m);
49             held_by = st.held_by;
50         }
51
52         rpcc *proxy = NULL;
53         // try a few times?
54         //int t=5;
55         //while (t-- && !proxy)
56         proxy = handle(held_by.first).safebind();
57         if (proxy) {
58             int r;
59             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
60             LOG("Revoke returned " << ret);
61         }
62     }
63 }
64
65 void lock_server::retryer() [[noreturn]] {
66     while (1) {
67         lock_protocol::lockid_t lid;
68         retry_fifo.deq(&lid);
69         if (rsm_ && !rsm_->amiprimary())
70             continue;
71
72         LOG("Sending retry for " << lid);
73         lock_state &st = get_lock_state(lid);
74         holder_t front;
75         {
76             lock sl(st.m);
77             if (st.wanted_by.empty())
78                 continue;
79             front = st.wanted_by.front();
80         }
81
82         rpcc *proxy = NULL;
83         // try a few times?
84         //int t=5;
85         //while (t-- && !proxy)
86         proxy = handle(front.first).safebind();
87         if (proxy) {
88             int r;
89             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
90             LOG("Retry returned " << ret);
91         }
92     }
93 }
94
95 int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
96     LOG("lid=" << lid << " client=" << id << "," << xid);
97     holder_t h = holder_t(id, xid);
98     lock_state &st = get_lock_state(lid);
99     lock sl(st.m);
100
101     // deal with duplicated requests
102     if (st.old_requests.count(id)) {
103         lock_protocol::xid_t old_xid = st.old_requests[id];
104         if (old_xid > xid)
105             return lock_protocol::RPCERR;
106         else if (old_xid == xid) {
107             if (st.held && st.held_by == h) {
108                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
109                 return lock_protocol::OK;
110             }
111         }
112     }
113
114     // grant the lock if it's available and I'm next in line
115     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
116         if (!st.wanted_by.empty())
117             st.wanted_by.pop_front();
118         st.old_requests[id] = xid;
119
120         st.held = true;
121         st.held_by = h;
122         LOG("Lock " << lid << " held by " << h.first);
123         if (st.wanted_by.size())
124             revoke_fifo.enq(lid);
125         return lock_protocol::OK;
126     }
127
128     // get in line
129     bool found = false;
130     for (auto p : st.wanted_by) {
131         if (p.first == id) {
132             // make sure client is obeying serialization
133             if (p.second != xid) {
134                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
135                 return lock_protocol::RPCERR;
136             }
137             found = true;
138             break;
139         }
140     }
141     if (!found)
142         st.wanted_by.push_back(h);
143
144     LOG("wanted_by=" << st.wanted_by);
145
146     // send revoke if we're first in line
147     if (st.wanted_by.front() == h)
148         revoke_fifo.enq(lid);
149
150     return lock_protocol::RETRY;
151 }
152
153 int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
154     LOG("lid=" << lid << " client=" << id << "," << xid);
155     lock_state &st = get_lock_state(lid);
156     lock sl(st.m);
157     if (st.held && st.held_by == holder_t(id, xid)) {
158         st.held = false;
159         LOG("Lock " << lid << " not held");
160     }
161     if (st.wanted_by.size())
162         retry_fifo.enq(lid);
163     return lock_protocol::OK;
164 }
165
166 string lock_server::marshal_state() {
167     lock sl(lock_table_lock);
168     marshall rep;
169     rep << nacquire << lock_table;
170     return rep.content();
171 }
172
173 void lock_server::unmarshal_state(const string & state) {
174     lock sl(lock_table_lock);
175     unmarshall rep(state, false);
176     rep >> nacquire >> lock_table;
177 }
178
179 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
180     LOG("stat request for " << lid);
181     VERIFY(0);
182     r = nacquire;
183     return lock_protocol::OK;
184 }
185