Lots more clean-ups
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "lock_client.h"
4 #include <arpa/inet.h>
5
6 void lock_state::wait(lock & mutex_lock) {
7     auto self = this_thread::get_id();
8     c[self].wait(mutex_lock);
9     c.erase(self);
10 }
11
12 void lock_state::signal() {
13     // signal anyone
14     if (c.begin() != c.end())
15         c.begin()->second.notify_one();
16 }
17
18 void lock_state::signal(thread::id who) {
19     if (c.count(who))
20         c[who].notify_one();
21 }
22
23 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
24
25 in_port_t lock_client::last_port = 0;
26
27 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
28     lock sl(lock_table_lock);
29     return lock_table[lid]; // creates the lock if it doesn't already exist
30 }
31
32 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
33     cl = new rpcc(xdst);
34     if (cl->bind() < 0)
35         LOG("lock_client: call bind");
36
37     srandom((uint32_t)time(NULL)^last_port);
38     rlock_port = ((random()%32000) | (0x1 << 10));
39     id = "127.0.0.1:" + to_string(rlock_port);
40     last_port = rlock_port;
41     rpcs *rlsrpc = new rpcs(rlock_port);
42     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
43     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
44     rsmc = new rsm_client(xdst);
45     releaser_thread = thread(&lock_client::releaser, this);
46 }
47
48 void lock_client::releaser() [[noreturn]] {
49     while (1) {
50         lock_protocol::lockid_t lid;
51         release_fifo.deq(&lid);
52         LOG("Releaser: " << lid);
53
54         lock_state &st = get_lock_state(lid);
55         lock sl(st.m);
56         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
57         st.state = lock_state::releasing;
58         {
59             sl.unlock();
60             int r;
61             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
62             if (lu)
63                 lu->dorelease(lid);
64             sl.lock();
65         }
66         st.state = lock_state::none;
67         LOG("Lock " << lid << ": none");
68         st.signal();
69     }
70 }
71
72 int lock_client::stat(lock_protocol::lockid_t lid) {
73     VERIFY(0);
74     int r;
75     auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
76     VERIFY (ret == lock_protocol::OK);
77     return r;
78 }
79
80 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
81     lock_state &st = get_lock_state(lid);
82     lock sl(st.m);
83     auto self = this_thread::get_id();
84
85     // check for reentrancy
86     VERIFY(st.state != lock_state::locked || st.held_by != self);
87     VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
88
89     st.wanted_by.push_back(self);
90
91     while (1) {
92         if (st.state != lock_state::free)
93             LOG("Lock " << lid << ": not free");
94
95         if (st.state == lock_state::none || st.state == lock_state::retrying) {
96             if (st.state == lock_state::none) {
97                 lock l(xid_mutex);
98                 st.xid = next_xid++;
99             }
100             st.state = lock_state::acquiring;
101             LOG("Lock " << lid << ": acquiring");
102             lock_protocol::status result;
103             {
104                 sl.unlock();
105                 int r;
106                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
107                 sl.lock();
108             }
109             LOG("acquire returned " << result);
110             if (result == lock_protocol::OK) {
111                 st.state = lock_state::free;
112                 LOG("Lock " << lid << ": free");
113             }
114         }
115
116         VERIFY(st.wanted_by.size() != 0);
117         if (st.state == lock_state::free) {
118             // is it for me?
119             auto front = st.wanted_by.front();
120             if (front == releaser_thread.get_id()) {
121                 st.wanted_by.pop_front();
122                 st.state = lock_state::locked;
123                 st.held_by = releaser_thread.get_id();
124                 LOG("Queuing " << lid << " for release");
125                 release_fifo.enq(lid);
126             } else if (front == self) {
127                 st.wanted_by.pop_front();
128                 st.state = lock_state::locked;
129                 st.held_by = self;
130                 break;
131             } else {
132                 st.signal(front);
133             }
134         }
135
136         LOG("waiting...");
137         st.wait(sl);
138         LOG("wait ended");
139     }
140
141     LOG("Lock " << lid << ": locked");
142     return lock_protocol::OK;
143 }
144
145 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
146     lock_state &st = get_lock_state(lid);
147     lock sl(st.m);
148     auto self = this_thread::get_id();
149     VERIFY(st.state == lock_state::locked && st.held_by == self);
150     st.state = lock_state::free;
151     LOG("Lock " << lid << ": free");
152     if (st.wanted_by.size()) {
153         auto front = st.wanted_by.front();
154         if (front == releaser_thread.get_id()) {
155             st.state = lock_state::locked;
156             st.held_by = releaser_thread.get_id();
157             st.wanted_by.pop_front();
158             LOG("Queuing " << lid << " for release");
159             release_fifo.enq(lid);
160         } else
161             st.signal(front);
162     }
163     LOG("Finished signaling.");
164     return lock_protocol::OK;
165 }
166
167 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
168     LOG("Revoke handler " << lid << " " << xid);
169     lock_state &st = get_lock_state(lid);
170     lock sl(st.m);
171
172     if (st.state == lock_state::releasing || st.state == lock_state::none)
173         return rlock_protocol::OK;
174
175     if (st.state == lock_state::free &&
176         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
177         // gimme
178         st.state = lock_state::locked;
179         st.held_by = releaser_thread.get_id();
180         if (st.wanted_by.size())
181             st.wanted_by.pop_front();
182         release_fifo.enq(lid);
183     } else {
184         // get in line
185         st.wanted_by.push_back(releaser_thread.get_id());
186     }
187     return rlock_protocol::OK;
188 }
189
190 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
191     lock_state &st = get_lock_state(lid);
192     lock sl(st.m);
193     VERIFY(st.state == lock_state::acquiring);
194     st.state = lock_state::retrying;
195     LOG("Lock " << lid << ": none");
196     st.signal(); // only one thread needs to wake up
197     return rlock_protocol::OK;
198 }
199
200 t4_lock_client *t4_lock_client_new(const char *dst) {
201     return (t4_lock_client *)new lock_client(dst);
202 }
203
204 void t4_lock_client_delete(t4_lock_client *client) {
205     delete (lock_client *)client;
206 }
207
208 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
209     return ((lock_client *)client)->acquire(lid);
210 }
211
212 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
213     return ((lock_client *)client)->release(lid);
214 }
215
216 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
217     return ((lock_client *)client)->stat(lid);
218 }