Global destructor clean-ups and python test fixes
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "lock_client.h"
4 #include <arpa/inet.h>
5
6 void lock_state::wait(lock & mutex_lock) {
7     auto self = std::this_thread::get_id();
8     c[self].wait(mutex_lock);
9     c.erase(self);
10 }
11
12 void lock_state::signal() {
13     // signal anyone
14     if (c.begin() != c.end())
15         c.begin()->second.notify_one();
16 }
17
18 void lock_state::signal(thread::id who) {
19     if (c.count(who))
20         c[who].notify_one();
21 }
22
23 in_port_t lock_client::last_port = 0;
24
25 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
26     lock sl(lock_table_lock);
27     return lock_table[lid]; // creates the lock if it doesn't already exist
28 }
29
30 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
31     cl = unique_ptr<rpcc>(new rpcc(xdst));
32     if (cl->bind() < 0)
33         LOG << "lock_client: call bind";
34
35     srandom((uint32_t)time(NULL)^last_port);
36     rlock_port = ((random()%32000) | (0x1 << 10));
37     id = "127.0.0.1:" + std::to_string(rlock_port);
38     last_port = rlock_port;
39     rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
40     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
41     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
42     rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
43     releaser_thread = thread(&lock_client::releaser, this);
44     rlsrpc->start();
45 }
46
47 lock_client::~lock_client() {
48     release_fifo.enq(nothing<lock_protocol::lockid_t>());
49     releaser_thread.join();
50 }
51
52 void lock_client::releaser() {
53     while (1) {
54         maybe<lock_protocol::lockid_t> mlid;
55         release_fifo.deq(&mlid);
56
57         if (!mlid) {
58             LOG << "Releaser stopping";
59             break;
60         }
61
62         lock_protocol::lockid_t lid = mlid;
63         LOG << "Releaser: " << lid;
64
65         lock_state & st = get_lock_state(lid);
66         lock sl(st.m);
67         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
68         st.state = lock_state::releasing;
69         {
70             sl.unlock();
71             int r;
72             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
73             if (lu)
74                 lu->dorelease(lid);
75             sl.lock();
76         }
77         st.state = lock_state::none;
78         LOG << "Lock " << lid << ": none";
79         st.signal();
80     }
81 }
82
83 int lock_client::stat(lock_protocol::lockid_t lid) {
84     VERIFY(0);
85     int r;
86     auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
87     VERIFY (ret == lock_protocol::OK);
88     return r;
89 }
90
91 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
92     lock_state & st = get_lock_state(lid);
93     lock sl(st.m);
94     auto self = std::this_thread::get_id();
95
96     // check for reentrancy
97     VERIFY(st.state != lock_state::locked || st.held_by != self);
98     VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
99             == st.wanted_by.end());
100
101     st.wanted_by.push_back(self);
102
103     while (1) {
104         if (st.state != lock_state::free)
105             LOG << "Lock " << lid << ": not free";
106
107         if (st.state == lock_state::none || st.state == lock_state::retrying) {
108             if (st.state == lock_state::none) {
109                 lock l(xid_mutex);
110                 st.xid = next_xid++;
111             }
112             st.state = lock_state::acquiring;
113             LOG << "Lock " << lid << ": acquiring";
114             lock_protocol::status result;
115             {
116                 sl.unlock();
117                 int r;
118                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
119                 sl.lock();
120             }
121             LOG << "acquire returned " << result;
122             if (result == lock_protocol::OK) {
123                 st.state = lock_state::free;
124                 LOG << "Lock " << lid << ": free";
125             }
126         }
127
128         VERIFY(st.wanted_by.size() != 0);
129         if (st.state == lock_state::free) {
130             // is it for me?
131             auto front = st.wanted_by.front();
132             if (front == releaser_thread.get_id()) {
133                 st.wanted_by.pop_front();
134                 st.state = lock_state::locked;
135                 st.held_by = releaser_thread.get_id();
136                 LOG << "Queuing " << lid << " for release";
137                 release_fifo.enq(just(lid));
138             } else if (front == self) {
139                 st.wanted_by.pop_front();
140                 st.state = lock_state::locked;
141                 st.held_by = self;
142                 break;
143             } else {
144                 st.signal(front);
145             }
146         }
147
148         LOG << "waiting...";
149         st.wait(sl);
150         LOG << "wait ended";
151     }
152
153     LOG << "Lock " << lid << ": locked";
154     return lock_protocol::OK;
155 }
156
157 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
158     lock_state & st = get_lock_state(lid);
159     lock sl(st.m);
160     auto self = std::this_thread::get_id();
161     VERIFY(st.state == lock_state::locked && st.held_by == self);
162     st.state = lock_state::free;
163     LOG << "Lock " << lid << ": free";
164     if (st.wanted_by.size()) {
165         auto front = st.wanted_by.front();
166         if (front == releaser_thread.get_id()) {
167             st.state = lock_state::locked;
168             st.held_by = releaser_thread.get_id();
169             st.wanted_by.pop_front();
170             LOG << "Queuing " << lid << " for release";
171             release_fifo.enq(just(lid));
172         } else
173             st.signal(front);
174     }
175     LOG << "Finished signaling.";
176     return lock_protocol::OK;
177 }
178
179 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
180     LOG << "Revoke handler " << lid << " " << xid;
181     lock_state & st = get_lock_state(lid);
182     lock sl(st.m);
183
184     if (st.state == lock_state::releasing || st.state == lock_state::none)
185         return rlock_protocol::OK;
186
187     if (st.state == lock_state::free &&
188         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
189         // gimme
190         st.state = lock_state::locked;
191         st.held_by = releaser_thread.get_id();
192         if (st.wanted_by.size())
193             st.wanted_by.pop_front();
194         release_fifo.enq(just(lid));
195     } else {
196         // get in line
197         st.wanted_by.push_back(releaser_thread.get_id());
198     }
199     return rlock_protocol::OK;
200 }
201
202 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
203     lock_state & st = get_lock_state(lid);
204     lock sl(st.m);
205     VERIFY(st.state == lock_state::acquiring);
206     st.state = lock_state::retrying;
207     LOG << "Lock " << lid << ": none";
208     st.signal(); // only one thread needs to wake up
209     return rlock_protocol::OK;
210 }
211
212 t4_lock_client *t4_lock_client_new(const char *dst) {
213     return (t4_lock_client *)new lock_client(dst);
214 }
215
216 void t4_lock_client_delete(t4_lock_client *client) {
217     delete (lock_client *)client;
218 }
219
220 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
221     return ((lock_client *)client)->acquire(lid);
222 }
223
224 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
225     return ((lock_client *)client)->release(lid);
226 }
227
228 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
229     return ((lock_client *)client)->stat(lid);
230 }