1 // RPC stubs for clients to talk to lock_server, and cache the locks.
3 #include "lock_client.h"
6 void lock_state::wait(lock & mutex_lock) {
7 auto self = std::this_thread::get_id();
8 c[self].wait(mutex_lock);
12 void lock_state::signal() {
14 if (c.begin() != c.end())
15 c.begin()->second.notify_one();
18 void lock_state::signal(thread::id who) {
23 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
24 lock sl(lock_table_lock);
25 return lock_table[lid]; // creates the lock if it doesn't already exist
28 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
29 cl = unique_ptr<rpcc>(new rpcc(xdst));
31 LOG << "lock_client: call bind";
33 rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
34 id = "127.0.0.1:" + std::to_string(rlock_port);
35 rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
36 rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
37 rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
38 rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
39 releaser_thread = thread(&lock_client::releaser, this);
43 lock_client::~lock_client() {
44 release_fifo.enq(nothing<lock_protocol::lockid_t>());
45 releaser_thread.join();
48 void lock_client::releaser() {
50 maybe<lock_protocol::lockid_t> mlid;
51 release_fifo.deq(&mlid);
54 LOG << "Releaser stopping";
58 lock_protocol::lockid_t lid = mlid;
59 LOG << "Releaser: " << lid;
61 lock_state & st = get_lock_state(lid);
63 VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
64 st.state = lock_state::releasing;
68 rsmc->call(lock_protocol::release, r, lid, id, st.xid);
73 st.state = lock_state::none;
74 LOG << "Lock " << lid << ": none";
79 int lock_client::stat(lock_protocol::lockid_t lid) {
82 auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
83 VERIFY (ret == lock_protocol::OK);
87 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
88 lock_state & st = get_lock_state(lid);
90 auto self = std::this_thread::get_id();
92 // check for reentrancy
93 VERIFY(st.state != lock_state::locked || st.held_by != self);
94 VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
95 == st.wanted_by.end());
97 st.wanted_by.push_back(self);
100 if (st.state != lock_state::free)
101 LOG << "Lock " << lid << ": not free";
103 if (st.state == lock_state::none || st.state == lock_state::retrying) {
104 if (st.state == lock_state::none) {
108 st.state = lock_state::acquiring;
109 LOG << "Lock " << lid << ": acquiring";
110 lock_protocol::status result;
114 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
117 LOG << "acquire returned " << result;
118 if (result == lock_protocol::OK) {
119 st.state = lock_state::free;
120 LOG << "Lock " << lid << ": free";
124 VERIFY(st.wanted_by.size() != 0);
125 if (st.state == lock_state::free) {
127 auto front = st.wanted_by.front();
128 if (front == releaser_thread.get_id()) {
129 st.wanted_by.pop_front();
130 st.state = lock_state::locked;
131 st.held_by = releaser_thread.get_id();
132 LOG << "Queuing " << lid << " for release";
133 release_fifo.enq(just(lid));
134 } else if (front == self) {
135 st.wanted_by.pop_front();
136 st.state = lock_state::locked;
149 LOG << "Lock " << lid << ": locked";
150 return lock_protocol::OK;
153 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
154 lock_state & st = get_lock_state(lid);
156 auto self = std::this_thread::get_id();
157 VERIFY(st.state == lock_state::locked && st.held_by == self);
158 st.state = lock_state::free;
159 LOG << "Lock " << lid << ": free";
160 if (st.wanted_by.size()) {
161 auto front = st.wanted_by.front();
162 if (front == releaser_thread.get_id()) {
163 st.state = lock_state::locked;
164 st.held_by = releaser_thread.get_id();
165 st.wanted_by.pop_front();
166 LOG << "Queuing " << lid << " for release";
167 release_fifo.enq(just(lid));
171 LOG << "Finished signaling.";
172 return lock_protocol::OK;
175 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
176 LOG << "Revoke handler " << lid << " " << xid;
177 lock_state & st = get_lock_state(lid);
180 if (st.state == lock_state::releasing || st.state == lock_state::none)
181 return rlock_protocol::OK;
183 if (st.state == lock_state::free &&
184 (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
186 st.state = lock_state::locked;
187 st.held_by = releaser_thread.get_id();
188 if (st.wanted_by.size())
189 st.wanted_by.pop_front();
190 release_fifo.enq(just(lid));
193 st.wanted_by.push_back(releaser_thread.get_id());
195 return rlock_protocol::OK;
198 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
199 lock_state & st = get_lock_state(lid);
201 VERIFY(st.state == lock_state::acquiring);
202 st.state = lock_state::retrying;
203 LOG << "Lock " << lid << ": none";
204 st.signal(); // only one thread needs to wake up
205 return rlock_protocol::OK;
208 t4_lock_client *t4_lock_client_new(const char *dst) {
209 return (t4_lock_client *)new lock_client(dst);
212 void t4_lock_client_delete(t4_lock_client *client) {
213 delete (lock_client *)client;
216 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
217 return ((lock_client *)client)->acquire(lid);
220 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
221 return ((lock_client *)client)->release(lid);
224 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
225 return ((lock_client *)client)->stat(lid);