So many changes. Broken.
[invirt/third/libt4.git] / lock_client.cc
1 // RPC stubs for clients to talk to lock_server, and cache the locks.
2
3 #include "include/lock_client.h"
4 #include <arpa/inet.h>
5
6 void lock_client::lock_state::wait(lock & mutex_lock) {
7     auto self = std::this_thread::get_id();
8     c[self].wait(mutex_lock);
9     c.erase(self);
10 }
11
12 void lock_client::lock_state::signal() {
13     // signal anyone
14     if (c.begin() != c.end())
15         c.begin()->second.notify_one();
16 }
17
18 void lock_client::lock_state::signal(thread::id who) {
19     if (c.count(who))
20         c[who].notify_one();
21 }
22
23 lock_client::lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
24     lock sl(lock_table_lock);
25     return lock_table[lid]; // creates the lock if it doesn't already exist
26 }
27
28 lock_client::lock_client(string xdst) {
29     rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
30     id = "127.0.0.1:" + std::to_string(rlock_port);
31     rlsrpc = std::make_unique<rpcs>(rlock_port);
32     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
33     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
34     rsmc = std::make_unique<rsm_client>(xdst);
35     releaser_thread = thread(&lock_client::releaser, this);
36     rlsrpc->start();
37 }
38
39 lock_client::~lock_client() {
40     release_fifo.enq(nothing<lock_protocol::lockid_t>());
41     releaser_thread.join();
42 }
43
44 void lock_client::releaser() {
45     while (auto mlid = release_fifo.deq()) {
46         lock_protocol::lockid_t lid = mlid;
47         LOG << "Releaser: " << lid;
48
49         lock_state & st = get_lock_state(lid);
50         lock sl(st.m);
51         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
52         st.state = lock_state::releasing;
53         {
54             sl.unlock();
55             int r;
56             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
57             sl.lock();
58         }
59         st.state = lock_state::none;
60         LOG << "Lock " << lid << ": none";
61         st.signal();
62     }
63     LOG << "Releaser stopping";
64 }
65
66 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
67     lock_state & st = get_lock_state(lid);
68     lock sl(st.m);
69     auto self = std::this_thread::get_id();
70
71     // check for reentrancy
72     VERIFY(st.state != lock_state::locked || st.held_by != self);
73     VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
74             == st.wanted_by.end());
75
76     st.wanted_by.push_back(self);
77
78     while (1) {
79         if (st.state != lock_state::free)
80             LOG << "Lock " << lid << ": not free";
81
82         if (st.state == lock_state::none || st.state == lock_state::retrying) {
83             if (st.state == lock_state::none) {
84                 lock l(xid_mutex);
85                 st.xid = next_xid++;
86             }
87             st.state = lock_state::acquiring;
88             LOG << "Lock " << lid << ": acquiring";
89             lock_protocol::status result;
90             {
91                 sl.unlock();
92                 int r;
93                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
94                 sl.lock();
95             }
96             LOG << "acquire returned " << result;
97             if (result == lock_protocol::OK) {
98                 st.state = lock_state::free;
99                 LOG << "Lock " << lid << ": free";
100             }
101         }
102
103         VERIFY(st.wanted_by.size() != 0);
104         if (st.state == lock_state::free) {
105             // is it for me?
106             auto front = st.wanted_by.front();
107             if (front == releaser_thread.get_id()) {
108                 st.wanted_by.pop_front();
109                 st.state = lock_state::locked;
110                 st.held_by = releaser_thread.get_id();
111                 LOG << "Queuing " << lid << " for release";
112                 release_fifo.enq(just(lid));
113             } else if (front == self) {
114                 st.wanted_by.pop_front();
115                 st.state = lock_state::locked;
116                 st.held_by = self;
117                 break;
118             } else {
119                 st.signal(front);
120             }
121         }
122
123         LOG << "waiting...";
124         st.wait(sl);
125         LOG << "wait ended";
126     }
127
128     LOG << "Lock " << lid << ": locked";
129     return lock_protocol::OK;
130 }
131
132 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
133     lock_state & st = get_lock_state(lid);
134     lock sl(st.m);
135     auto self = std::this_thread::get_id();
136     VERIFY(st.state == lock_state::locked && st.held_by == self);
137     st.state = lock_state::free;
138     LOG << "Lock " << lid << ": free";
139     if (st.wanted_by.size()) {
140         auto front = st.wanted_by.front();
141         if (front == releaser_thread.get_id()) {
142             st.state = lock_state::locked;
143             st.held_by = releaser_thread.get_id();
144             st.wanted_by.pop_front();
145             LOG << "Queuing " << lid << " for release";
146             release_fifo.enq(just(lid));
147         } else
148             st.signal(front);
149     }
150     LOG << "Finished signaling.";
151     return lock_protocol::OK;
152 }
153
154 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
155     LOG << "Revoke handler " << lid << " " << xid;
156     lock_state & st = get_lock_state(lid);
157     lock sl(st.m);
158
159     if (st.state == lock_state::releasing || st.state == lock_state::none)
160         return rlock_protocol::OK;
161
162     if (st.state == lock_state::free &&
163         (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
164         // gimme
165         st.state = lock_state::locked;
166         st.held_by = releaser_thread.get_id();
167         if (st.wanted_by.size())
168             st.wanted_by.pop_front();
169         release_fifo.enq(just(lid));
170     } else {
171         // get in line
172         st.wanted_by.push_back(releaser_thread.get_id());
173     }
174     return rlock_protocol::OK;
175 }
176
177 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
178     lock_state & st = get_lock_state(lid);
179     lock sl(st.m);
180     VERIFY(st.state == lock_state::acquiring);
181     st.state = lock_state::retrying;
182     LOG << "Lock " << lid << ": none";
183     st.signal(); // only one thread needs to wake up
184     return rlock_protocol::OK;
185 }
186
187 t4_lock_client *t4_lock_client_new(const char *dst) {
188     return (t4_lock_client *)new lock_client(dst);
189 }
190
191 void t4_lock_client_delete(t4_lock_client *client) {
192     delete (lock_client *)client;
193 }
194
195 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
196     return ((lock_client *)client)->acquire(lid);
197 }
198
199 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
200     return ((lock_client *)client)->release(lid);
201 }