All sleep calls via std::this_thread
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "lock_client.h"
4 #include <arpa/inet.h>
5
6 void lock_state::wait(lock & mutex_lock) {
7     auto self = std::this_thread::get_id();
8     c[self].wait(mutex_lock);
9     c.erase(self);
10 }
11
12 void lock_state::signal() {
13     // signal anyone
14     if (c.begin() != c.end())
15         c.begin()->second.notify_one();
16 }
17
18 void lock_state::signal(thread::id who) {
19     if (c.count(who))
20         c[who].notify_one();
21 }
22
23 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
24     lock sl(lock_table_lock);
25     return lock_table[lid]; // creates the lock if it doesn't already exist
26 }
27
28 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
29     cl = unique_ptr<rpcc>(new rpcc(xdst));
30     if (cl->bind() < 0)
31         LOG << "lock_client: call bind";
32
33     rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
34     id = "127.0.0.1:" + std::to_string(rlock_port);
35     rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
36     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
37     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
38     rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
39     releaser_thread = thread(&lock_client::releaser, this);
40     rlsrpc->start();
41 }
42
43 lock_client::~lock_client() {
44     release_fifo.enq(nothing<lock_protocol::lockid_t>());
45     releaser_thread.join();
46 }
47
48 void lock_client::releaser() {
49     while (1) {
50         maybe<lock_protocol::lockid_t> mlid;
51         release_fifo.deq(&mlid);
52
53         if (!mlid) {
54             LOG << "Releaser stopping";
55             break;
56         }
57
58         lock_protocol::lockid_t lid = mlid;
59         LOG << "Releaser: " << lid;
60
61         lock_state & st = get_lock_state(lid);
62         lock sl(st.m);
63         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
64         st.state = lock_state::releasing;
65         {
66             sl.unlock();
67             int r;
68             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
69             if (lu)
70                 lu->dorelease(lid);
71             sl.lock();
72         }
73         st.state = lock_state::none;
74         LOG << "Lock " << lid << ": none";
75         st.signal();
76     }
77 }
78
79 int lock_client::stat(lock_protocol::lockid_t lid) {
80     VERIFY(0);
81     int r;
82     auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
83     VERIFY (ret == lock_protocol::OK);
84     return r;
85 }
86
87 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
88     lock_state & st = get_lock_state(lid);
89     lock sl(st.m);
90     auto self = std::this_thread::get_id();
91
92     // check for reentrancy
93     VERIFY(st.state != lock_state::locked || st.held_by != self);
94     VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
95             == st.wanted_by.end());
96
97     st.wanted_by.push_back(self);
98
99     while (1) {
100         if (st.state != lock_state::free)
101             LOG << "Lock " << lid << ": not free";
102
103         if (st.state == lock_state::none || st.state == lock_state::retrying) {
104             if (st.state == lock_state::none) {
105                 lock l(xid_mutex);
106                 st.xid = next_xid++;
107             }
108             st.state = lock_state::acquiring;
109             LOG << "Lock " << lid << ": acquiring";
110             lock_protocol::status result;
111             {
112                 sl.unlock();
113                 int r;
114                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
115                 sl.lock();
116             }
117             LOG << "acquire returned " << result;
118             if (result == lock_protocol::OK) {
119                 st.state = lock_state::free;
120                 LOG << "Lock " << lid << ": free";
121             }
122         }
123
124         VERIFY(st.wanted_by.size() != 0);
125         if (st.state == lock_state::free) {
126             // is it for me?
127             auto front = st.wanted_by.front();
128             if (front == releaser_thread.get_id()) {
129                 st.wanted_by.pop_front();
130                 st.state = lock_state::locked;
131                 st.held_by = releaser_thread.get_id();
132                 LOG << "Queuing " << lid << " for release";
133                 release_fifo.enq(just(lid));
134             } else if (front == self) {
135                 st.wanted_by.pop_front();
136                 st.state = lock_state::locked;
137                 st.held_by = self;
138                 break;
139             } else {
140                 st.signal(front);
141             }
142         }
143
144         LOG << "waiting...";
145         st.wait(sl);
146         LOG << "wait ended";
147     }
148
149     LOG << "Lock " << lid << ": locked";
150     return lock_protocol::OK;
151 }
152
153 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
154     lock_state & st = get_lock_state(lid);
155     lock sl(st.m);
156     auto self = std::this_thread::get_id();
157     VERIFY(st.state == lock_state::locked && st.held_by == self);
158     st.state = lock_state::free;
159     LOG << "Lock " << lid << ": free";
160     if (st.wanted_by.size()) {
161         auto front = st.wanted_by.front();
162         if (front == releaser_thread.get_id()) {
163             st.state = lock_state::locked;
164             st.held_by = releaser_thread.get_id();
165             st.wanted_by.pop_front();
166             LOG << "Queuing " << lid << " for release";
167             release_fifo.enq(just(lid));
168         } else
169             st.signal(front);
170     }
171     LOG << "Finished signaling.";
172     return lock_protocol::OK;
173 }
174
175 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
176     LOG << "Revoke handler " << lid << " " << xid;
177     lock_state & st = get_lock_state(lid);
178     lock sl(st.m);
179
180     if (st.state == lock_state::releasing || st.state == lock_state::none)
181         return rlock_protocol::OK;
182
183     if (st.state == lock_state::free &&
184         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
185         // gimme
186         st.state = lock_state::locked;
187         st.held_by = releaser_thread.get_id();
188         if (st.wanted_by.size())
189             st.wanted_by.pop_front();
190         release_fifo.enq(just(lid));
191     } else {
192         // get in line
193         st.wanted_by.push_back(releaser_thread.get_id());
194     }
195     return rlock_protocol::OK;
196 }
197
198 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
199     lock_state & st = get_lock_state(lid);
200     lock sl(st.m);
201     VERIFY(st.state == lock_state::acquiring);
202     st.state = lock_state::retrying;
203     LOG << "Lock " << lid << ": none";
204     st.signal(); // only one thread needs to wake up
205     return rlock_protocol::OK;
206 }
207
208 t4_lock_client *t4_lock_client_new(const char *dst) {
209     return (t4_lock_client *)new lock_client(dst);
210 }
211
212 void t4_lock_client_delete(t4_lock_client *client) {
213     delete (lock_client *)client;
214 }
215
216 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
217     return ((lock_client *)client)->acquire(lid);
218 }
219
220 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
221     return ((lock_client *)client)->release(lid);
222 }
223
224 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
225     return ((lock_client *)client)->stat(lid);
226 }