Simplifications and clean-ups
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "lock_client.h"
4 #include <arpa/inet.h>
5
6 void lock_state::wait(lock & mutex_lock) {
7     auto self = this_thread::get_id();
8     c[self].wait(mutex_lock);
9     c.erase(self);
10 }
11
12 void lock_state::signal() {
13     // signal anyone
14     if (c.begin() != c.end())
15         c.begin()->second.notify_one();
16 }
17
18 void lock_state::signal(thread::id who) {
19     if (c.count(who))
20         c[who].notify_one();
21 }
22
23 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
24
25 in_port_t lock_client::last_port = 0;
26
27 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
28     lock sl(lock_table_lock);
29     return lock_table[lid]; // creates the lock if it doesn't already exist
30 }
31
32 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
33     cl = unique_ptr<rpcc>(new rpcc(xdst));
34     if (cl->bind() < 0)
35         LOG << "lock_client: call bind";
36
37     srandom((uint32_t)time(NULL)^last_port);
38     rlock_port = ((random()%32000) | (0x1 << 10));
39     id = "127.0.0.1:" + to_string(rlock_port);
40     last_port = rlock_port;
41     rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
42     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
43     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
44     rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
45     releaser_thread = thread(&lock_client::releaser, this);
46     rlsrpc->start();
47 }
48
49 void lock_client::releaser() {
50     while (1) {
51         lock_protocol::lockid_t lid;
52         release_fifo.deq(&lid);
53         LOG << "Releaser: " << lid;
54
55         lock_state & st = get_lock_state(lid);
56         lock sl(st.m);
57         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
58         st.state = lock_state::releasing;
59         {
60             sl.unlock();
61             int r;
62             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
63             if (lu)
64                 lu->dorelease(lid);
65             sl.lock();
66         }
67         st.state = lock_state::none;
68         LOG << "Lock " << lid << ": none";
69         st.signal();
70     }
71 }
72
73 int lock_client::stat(lock_protocol::lockid_t lid) {
74     VERIFY(0);
75     int r;
76     auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
77     VERIFY (ret == lock_protocol::OK);
78     return r;
79 }
80
81 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
82     lock_state & st = get_lock_state(lid);
83     lock sl(st.m);
84     auto self = this_thread::get_id();
85
86     // check for reentrancy
87     VERIFY(st.state != lock_state::locked || st.held_by != self);
88     VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
89             == st.wanted_by.end());
90
91     st.wanted_by.push_back(self);
92
93     while (1) {
94         if (st.state != lock_state::free)
95             LOG << "Lock " << lid << ": not free";
96
97         if (st.state == lock_state::none || st.state == lock_state::retrying) {
98             if (st.state == lock_state::none) {
99                 lock l(xid_mutex);
100                 st.xid = next_xid++;
101             }
102             st.state = lock_state::acquiring;
103             LOG << "Lock " << lid << ": acquiring";
104             lock_protocol::status result;
105             {
106                 sl.unlock();
107                 int r;
108                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
109                 sl.lock();
110             }
111             LOG << "acquire returned " << result;
112             if (result == lock_protocol::OK) {
113                 st.state = lock_state::free;
114                 LOG << "Lock " << lid << ": free";
115             }
116         }
117
118         VERIFY(st.wanted_by.size() != 0);
119         if (st.state == lock_state::free) {
120             // is it for me?
121             auto front = st.wanted_by.front();
122             if (front == releaser_thread.get_id()) {
123                 st.wanted_by.pop_front();
124                 st.state = lock_state::locked;
125                 st.held_by = releaser_thread.get_id();
126                 LOG << "Queuing " << lid << " for release";
127                 release_fifo.enq(lid);
128             } else if (front == self) {
129                 st.wanted_by.pop_front();
130                 st.state = lock_state::locked;
131                 st.held_by = self;
132                 break;
133             } else {
134                 st.signal(front);
135             }
136         }
137
138         LOG << "waiting...";
139         st.wait(sl);
140         LOG << "wait ended";
141     }
142
143     LOG << "Lock " << lid << ": locked";
144     return lock_protocol::OK;
145 }
146
147 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
148     lock_state & st = get_lock_state(lid);
149     lock sl(st.m);
150     auto self = this_thread::get_id();
151     VERIFY(st.state == lock_state::locked && st.held_by == self);
152     st.state = lock_state::free;
153     LOG << "Lock " << lid << ": free";
154     if (st.wanted_by.size()) {
155         auto front = st.wanted_by.front();
156         if (front == releaser_thread.get_id()) {
157             st.state = lock_state::locked;
158             st.held_by = releaser_thread.get_id();
159             st.wanted_by.pop_front();
160             LOG << "Queuing " << lid << " for release";
161             release_fifo.enq(lid);
162         } else
163             st.signal(front);
164     }
165     LOG << "Finished signaling.";
166     return lock_protocol::OK;
167 }
168
169 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
170     LOG << "Revoke handler " << lid << " " << xid;
171     lock_state & st = get_lock_state(lid);
172     lock sl(st.m);
173
174     if (st.state == lock_state::releasing || st.state == lock_state::none)
175         return rlock_protocol::OK;
176
177     if (st.state == lock_state::free &&
178         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
179         // gimme
180         st.state = lock_state::locked;
181         st.held_by = releaser_thread.get_id();
182         if (st.wanted_by.size())
183             st.wanted_by.pop_front();
184         release_fifo.enq(lid);
185     } else {
186         // get in line
187         st.wanted_by.push_back(releaser_thread.get_id());
188     }
189     return rlock_protocol::OK;
190 }
191
192 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
193     lock_state & st = get_lock_state(lid);
194     lock sl(st.m);
195     VERIFY(st.state == lock_state::acquiring);
196     st.state = lock_state::retrying;
197     LOG << "Lock " << lid << ": none";
198     st.signal(); // only one thread needs to wake up
199     return rlock_protocol::OK;
200 }
201
202 t4_lock_client *t4_lock_client_new(const char *dst) {
203     return (t4_lock_client *)new lock_client(dst);
204 }
205
206 void t4_lock_client_delete(t4_lock_client *client) {
207     delete (lock_client *)client;
208 }
209
210 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
211     return ((lock_client *)client)->acquire(lid);
212 }
213
214 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
215     return ((lock_client *)client)->release(lid);
216 }
217
218 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
219     return ((lock_client *)client)->stat(lid);
220 }