More clean-ups and cool template stuff
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "lock_client.h"
4 #include "rpc/rpc.h"
5 #include <algorithm>
6 #include "threaded_log.h"
7 #include <arpa/inet.h>
8
9 #include "rsm_client.h"
10 #include "lock.h"
11
12 void lock_state::wait(lock & mutex_lock) {
13     auto self = std::this_thread::get_id();
14     c[self].wait(mutex_lock);
15     c.erase(self);
16 }
17
18 void lock_state::signal() {
19     // signal anyone
20     if (c.begin() != c.end())
21         c.begin()->second.notify_one();
22 }
23
24 void lock_state::signal(thread::id who) {
25     if (c.count(who))
26         c[who].notify_one();
27 }
28
29 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
30
31 unsigned int lock_client::last_port = 0;
32
33 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
34     lock sl(lock_table_lock);
35     return lock_table[lid]; // creates the lock if it doesn't already exist
36 }
37
38 lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) {
39     cl = new rpcc(xdst);
40     if (cl->bind() < 0)
41         LOG("lock_client: call bind");
42
43     srandom((uint32_t)time(NULL)^last_port);
44     rlock_port = ((random()%32000) | (0x1 << 10));
45     id = "127.0.0.1:" + std::to_string(rlock_port);
46     last_port = rlock_port;
47     rpcs *rlsrpc = new rpcs(rlock_port);
48     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
49     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
50     rsmc = new rsm_client(xdst);
51     releaser_thread = thread(&lock_client::releaser, this);
52 }
53
54 void lock_client::releaser() [[noreturn]] {
55     while (1) {
56         lock_protocol::lockid_t lid;
57         release_fifo.deq(&lid);
58         LOG("Releaser: " << lid);
59
60         lock_state &st = get_lock_state(lid);
61         lock sl(st.m);
62         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
63         st.state = lock_state::releasing;
64         {
65             sl.unlock();
66             int r;
67             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
68             if (lu)
69                 lu->dorelease(lid);
70             sl.lock();
71         }
72         st.state = lock_state::none;
73         LOG("Lock " << lid << ": none");
74         st.signal();
75     }
76 }
77
78 int lock_client::stat(lock_protocol::lockid_t lid) {
79     VERIFY(0);
80     int r;
81     auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
82     VERIFY (ret == lock_protocol::OK);
83     return r;
84 }
85
86 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
87     lock_state &st = get_lock_state(lid);
88     lock sl(st.m);
89     auto self = std::this_thread::get_id();
90
91     // check for reentrancy
92     VERIFY(st.state != lock_state::locked || st.held_by != self);
93     VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
94
95     st.wanted_by.push_back(self);
96
97     while (1) {
98         if (st.state != lock_state::free)
99             LOG("Lock " << lid << ": not free");
100
101         if (st.state == lock_state::none || st.state == lock_state::retrying) {
102             if (st.state == lock_state::none) {
103                 lock l(xid_mutex);
104                 st.xid = next_xid++;
105             }
106             st.state = lock_state::acquiring;
107             LOG("Lock " << lid << ": acquiring");
108             lock_protocol::status result;
109             {
110                 sl.unlock();
111                 int r;
112                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
113                 sl.lock();
114             }
115             LOG("acquire returned " << result);
116             if (result == lock_protocol::OK) {
117                 st.state = lock_state::free;
118                 LOG("Lock " << lid << ": free");
119             }
120         }
121
122         VERIFY(st.wanted_by.size() != 0);
123         if (st.state == lock_state::free) {
124             // is it for me?
125             auto front = st.wanted_by.front();
126             if (front == releaser_thread.get_id()) {
127                 st.wanted_by.pop_front();
128                 st.state = lock_state::locked;
129                 st.held_by = releaser_thread.get_id();
130                 LOG("Queuing " << lid << " for release");
131                 release_fifo.enq(lid);
132             } else if (front == self) {
133                 st.wanted_by.pop_front();
134                 st.state = lock_state::locked;
135                 st.held_by = self;
136                 break;
137             } else {
138                 st.signal(front);
139             }
140         }
141
142         LOG("waiting...");
143         st.wait(sl);
144         LOG("wait ended");
145     }
146
147     LOG("Lock " << lid << ": locked");
148     return lock_protocol::OK;
149 }
150
151 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
152     lock_state &st = get_lock_state(lid);
153     lock sl(st.m);
154     auto self = std::this_thread::get_id();
155     VERIFY(st.state == lock_state::locked && st.held_by == self);
156     st.state = lock_state::free;
157     LOG("Lock " << lid << ": free");
158     if (st.wanted_by.size()) {
159         auto front = st.wanted_by.front();
160         if (front == releaser_thread.get_id()) {
161             st.state = lock_state::locked;
162             st.held_by = releaser_thread.get_id();
163             st.wanted_by.pop_front();
164             LOG("Queuing " << lid << " for release");
165             release_fifo.enq(lid);
166         } else
167             st.signal(front);
168     }
169     LOG("Finished signaling.");
170     return lock_protocol::OK;
171 }
172
173 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
174     LOG("Revoke handler " << lid << " " << xid);
175     lock_state &st = get_lock_state(lid);
176     lock sl(st.m);
177
178     if (st.state == lock_state::releasing || st.state == lock_state::none)
179         return rlock_protocol::OK;
180
181     if (st.state == lock_state::free &&
182         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
183         // gimme
184         st.state = lock_state::locked;
185         st.held_by = releaser_thread.get_id();
186         if (st.wanted_by.size())
187             st.wanted_by.pop_front();
188         release_fifo.enq(lid);
189     } else {
190         // get in line
191         st.wanted_by.push_back(releaser_thread.get_id());
192     }
193     return rlock_protocol::OK;
194 }
195
196 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
197     lock_state &st = get_lock_state(lid);
198     lock sl(st.m);
199     VERIFY(st.state == lock_state::acquiring);
200     st.state = lock_state::retrying;
201     LOG("Lock " << lid << ": none");
202     st.signal(); // only one thread needs to wake up
203     return rlock_protocol::OK;
204 }
205
206 t4_lock_client *t4_lock_client_new(const char *dst) {
207     return (t4_lock_client *)new lock_client(dst);
208 }
209
210 void t4_lock_client_delete(t4_lock_client *client) {
211     delete (lock_client *)client;
212 }
213
214 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
215     return ((lock_client *)client)->acquire(lid);
216 }
217
218 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
219     return ((lock_client *)client)->release(lid);
220 }
221
222 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
223     return ((lock_client *)client)->stat(lid);
224 }