379838abffd9c05593e15d74852b74dbeb027bae
[invirt/third/libt4.git] / lock_server.cc
1 // the caching lock server implementation
2
3 #include "types.h"
4 #include "lock_server.h"
5 #include <unistd.h>
6 #include <arpa/inet.h>
7 #include "handle.h"
8
9 lock_state::lock_state():
10     held(false)
11 {
12 }
13
14 lock_state::lock_state(const lock_state &other) {
15     *this = other;
16 }
17
18 lock_state& lock_state::operator=(const lock_state& o) {
19     held = o.held;
20     held_by = o.held_by;
21     wanted_by = o.wanted_by;
22     old_requests = o.old_requests;
23     return *this;
24 }
25
26 marshall & operator<<(marshall &m, const lock_state &d) {
27         return m << d.held << d.held_by << d.wanted_by;
28 }
29
30 unmarshall & operator>>(unmarshall &u, lock_state &d) {
31         return u >> d.held >> d.held_by >> d.wanted_by;
32 }
33
34 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
35     lock sl(lock_table_lock);
36     // by the semantics of map, this will create
37     // the lock if it doesn't already exist
38     return lock_table[lid];
39 }
40
41 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
42     std::thread(&lock_server::revoker, this).detach();
43     std::thread(&lock_server::retryer, this).detach();
44     rsm->set_state_transfer(this);
45 }
46
47 void lock_server::revoker() [[noreturn]] {
48     while (1) {
49         lock_protocol::lockid_t lid;
50         revoke_fifo.deq(&lid);
51         LOG("Revoking " << lid);
52         if (rsm && !rsm->amiprimary())
53             continue;
54
55         lock_state &st = get_lock_state(lid);
56         holder_t held_by;
57         {
58             lock sl(st.m);
59             held_by = st.held_by;
60         }
61
62         rpcc *proxy = NULL;
63         // try a few times?
64         //int t=5;
65         //while (t-- && !proxy)
66         proxy = handle(held_by.first).safebind();
67         if (proxy) {
68             int r;
69             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
70             LOG("Revoke returned " << ret);
71         }
72     }
73 }
74
75 void lock_server::retryer() [[noreturn]] {
76     while (1) {
77         lock_protocol::lockid_t lid;
78         retry_fifo.deq(&lid);
79         if (rsm && !rsm->amiprimary())
80             continue;
81
82         LOG("Sending retry for " << lid);
83         lock_state &st = get_lock_state(lid);
84         holder_t front;
85         {
86             lock sl(st.m);
87             if (st.wanted_by.empty())
88                 continue;
89             front = st.wanted_by.front();
90         }
91
92         rpcc *proxy = NULL;
93         // try a few times?
94         //int t=5;
95         //while (t-- && !proxy)
96         proxy = handle(front.first).safebind();
97         if (proxy) {
98             int r;
99             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
100             LOG("Retry returned " << ret);
101         }
102     }
103 }
104
105 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
106     LOG("lid=" << lid << " client=" << id << "," << xid);
107     holder_t h = holder_t(id, xid);
108     lock_state &st = get_lock_state(lid);
109     lock sl(st.m);
110
111     // deal with duplicated requests
112     if (st.old_requests.count(id)) {
113         lock_protocol::xid_t old_xid = st.old_requests[id];
114         if (old_xid > xid)
115             return lock_protocol::RPCERR;
116         else if (old_xid == xid) {
117             if (st.held && st.held_by == h) {
118                 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
119                 return lock_protocol::OK;
120             }
121         }
122     }
123
124     // grant the lock if it's available and I'm next in line
125     if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
126         if (!st.wanted_by.empty())
127             st.wanted_by.pop_front();
128         st.old_requests[id] = xid;
129
130         st.held = true;
131         st.held_by = h;
132         LOG("Lock " << lid << " held by " << h.first);
133         if (st.wanted_by.size())
134             revoke_fifo.enq(lid);
135         return lock_protocol::OK;
136     }
137
138     // get in line
139     bool found = false;
140     for (auto p : st.wanted_by) {
141         if (p.first == id) {
142             // make sure client is obeying serialization
143             if (p.second != xid) {
144                 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
145                 return lock_protocol::RPCERR;
146             }
147             found = true;
148             break;
149         }
150     }
151     if (!found)
152         st.wanted_by.push_back(h);
153
154     LOG("wanted_by=" << st.wanted_by);
155
156     // send revoke if we're first in line
157     if (st.wanted_by.front() == h)
158         revoke_fifo.enq(lid);
159
160     return lock_protocol::RETRY;
161 }
162
163 int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
164     LOG("lid=" << lid << " client=" << id << "," << xid);
165     lock_state &st = get_lock_state(lid);
166     lock sl(st.m);
167     if (st.held && st.held_by == holder_t(id, xid)) {
168         st.held = false;
169         LOG("Lock " << lid << " not held");
170     }
171     if (st.wanted_by.size())
172         retry_fifo.enq(lid);
173     return lock_protocol::OK;
174 }
175
176 string lock_server::marshal_state() {
177     lock sl(lock_table_lock);
178     marshall rep;
179     rep << nacquire;
180     rep << lock_table;
181     return rep.str();
182 }
183
184 void lock_server::unmarshal_state(string state) {
185     lock sl(lock_table_lock);
186     unmarshall rep(state);
187     rep >> nacquire;
188     rep >> lock_table;
189 }
190
191 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
192     LOG("stat request for " << lid);
193     VERIFY(0);
194     r = nacquire;
195     return lock_protocol::OK;
196 }
197