More clean-ups
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "lock_client.h"
4 #include <arpa/inet.h>
5
6 void lock_state::wait(lock & mutex_lock) {
7     auto self = std::this_thread::get_id();
8     c[self].wait(mutex_lock);
9     c.erase(self);
10 }
11
12 void lock_state::signal() {
13     // signal anyone
14     if (c.begin() != c.end())
15         c.begin()->second.notify_one();
16 }
17
18 void lock_state::signal(thread::id who) {
19     if (c.count(who))
20         c[who].notify_one();
21 }
22
23 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
24     lock sl(lock_table_lock);
25     return lock_table[lid]; // creates the lock if it doesn't already exist
26 }
27
28 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
29     rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
30     id = "127.0.0.1:" + std::to_string(rlock_port);
31     rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
32     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
33     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
34     rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
35     releaser_thread = thread(&lock_client::releaser, this);
36     rlsrpc->start();
37 }
38
39 lock_client::~lock_client() {
40     release_fifo.enq(nothing<lock_protocol::lockid_t>());
41     releaser_thread.join();
42 }
43
44 void lock_client::releaser() {
45     while (auto mlid = release_fifo.deq()) {
46         lock_protocol::lockid_t lid = mlid;
47         LOG << "Releaser: " << lid;
48
49         lock_state & st = get_lock_state(lid);
50         lock sl(st.m);
51         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
52         st.state = lock_state::releasing;
53         {
54             sl.unlock();
55             int r;
56             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
57             if (lu)
58                 lu->dorelease(lid);
59             sl.lock();
60         }
61         st.state = lock_state::none;
62         LOG << "Lock " << lid << ": none";
63         st.signal();
64     }
65     LOG << "Releaser stopping";
66 }
67
68 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
69     lock_state & st = get_lock_state(lid);
70     lock sl(st.m);
71     auto self = std::this_thread::get_id();
72
73     // check for reentrancy
74     VERIFY(st.state != lock_state::locked || st.held_by != self);
75     VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
76             == st.wanted_by.end());
77
78     st.wanted_by.push_back(self);
79
80     while (1) {
81         if (st.state != lock_state::free)
82             LOG << "Lock " << lid << ": not free";
83
84         if (st.state == lock_state::none || st.state == lock_state::retrying) {
85             if (st.state == lock_state::none) {
86                 lock l(xid_mutex);
87                 st.xid = next_xid++;
88             }
89             st.state = lock_state::acquiring;
90             LOG << "Lock " << lid << ": acquiring";
91             lock_protocol::status result;
92             {
93                 sl.unlock();
94                 int r;
95                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
96                 sl.lock();
97             }
98             LOG << "acquire returned " << result;
99             if (result == lock_protocol::OK) {
100                 st.state = lock_state::free;
101                 LOG << "Lock " << lid << ": free";
102             }
103         }
104
105         VERIFY(st.wanted_by.size() != 0);
106         if (st.state == lock_state::free) {
107             // is it for me?
108             auto front = st.wanted_by.front();
109             if (front == releaser_thread.get_id()) {
110                 st.wanted_by.pop_front();
111                 st.state = lock_state::locked;
112                 st.held_by = releaser_thread.get_id();
113                 LOG << "Queuing " << lid << " for release";
114                 release_fifo.enq(just(lid));
115             } else if (front == self) {
116                 st.wanted_by.pop_front();
117                 st.state = lock_state::locked;
118                 st.held_by = self;
119                 break;
120             } else {
121                 st.signal(front);
122             }
123         }
124
125         LOG << "waiting...";
126         st.wait(sl);
127         LOG << "wait ended";
128     }
129
130     LOG << "Lock " << lid << ": locked";
131     return lock_protocol::OK;
132 }
133
134 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
135     lock_state & st = get_lock_state(lid);
136     lock sl(st.m);
137     auto self = std::this_thread::get_id();
138     VERIFY(st.state == lock_state::locked && st.held_by == self);
139     st.state = lock_state::free;
140     LOG << "Lock " << lid << ": free";
141     if (st.wanted_by.size()) {
142         auto front = st.wanted_by.front();
143         if (front == releaser_thread.get_id()) {
144             st.state = lock_state::locked;
145             st.held_by = releaser_thread.get_id();
146             st.wanted_by.pop_front();
147             LOG << "Queuing " << lid << " for release";
148             release_fifo.enq(just(lid));
149         } else
150             st.signal(front);
151     }
152     LOG << "Finished signaling.";
153     return lock_protocol::OK;
154 }
155
156 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
157     LOG << "Revoke handler " << lid << " " << xid;
158     lock_state & st = get_lock_state(lid);
159     lock sl(st.m);
160
161     if (st.state == lock_state::releasing || st.state == lock_state::none)
162         return rlock_protocol::OK;
163
164     if (st.state == lock_state::free &&
165         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
166         // gimme
167         st.state = lock_state::locked;
168         st.held_by = releaser_thread.get_id();
169         if (st.wanted_by.size())
170             st.wanted_by.pop_front();
171         release_fifo.enq(just(lid));
172     } else {
173         // get in line
174         st.wanted_by.push_back(releaser_thread.get_id());
175     }
176     return rlock_protocol::OK;
177 }
178
179 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
180     lock_state & st = get_lock_state(lid);
181     lock sl(st.m);
182     VERIFY(st.state == lock_state::acquiring);
183     st.state = lock_state::retrying;
184     LOG << "Lock " << lid << ": none";
185     st.signal(); // only one thread needs to wake up
186     return rlock_protocol::OK;
187 }
188
189 t4_lock_client *t4_lock_client_new(const char *dst) {
190     return (t4_lock_client *)new lock_client(dst);
191 }
192
193 void t4_lock_client_delete(t4_lock_client *client) {
194     delete (lock_client *)client;
195 }
196
197 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
198     return ((lock_client *)client)->acquire(lid);
199 }
200
201 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
202     return ((lock_client *)client)->release(lid);
203 }