1 // RPC stubs for clients to talk to lock_server, and cache the locks.
3 #include "lock_client.h"
6 #include "threaded_log.h"
9 #include "rsm_client.h"
// Blocks the calling thread on its own entry of `c`, looked up by the
// caller's thread id. `c[self]` is indexed with operator[], so an entry is
// created on first use (assuming `c` is a std::map-like container; its
// declaration is not in this file -- TODO confirm).
//
// mutex_lock: lock object handed straight to the entry's wait(); presumably
//             a held lock on this lock_state's mutex -- verify against callers.
//
// NOTE(review): the embedded line numbers skip after 14, so any cleanup that
// follows the wait is not visible in this excerpt.
12 void lock_state::wait(lock & mutex_lock) {
13 auto self = std::this_thread::get_id();
14 c[self].wait(mutex_lock);
// Wakes a single waiter, if there is one: when `c` is non-empty, notify_one()
// is called on the value of its first entry (first in `c`'s iteration order).
// When `c` is empty this is a no-op.
18 void lock_state::signal() {
// Guard against an empty container before dereferencing begin().
20 if (c.begin() != c.end())
21 c.begin()->second.notify_one();
// Overload of signal() that takes a specific thread id.
// NOTE(review): only the signature is visible in this excerpt; presumably it
// wakes just the waiter registered under `who` -- TODO confirm against the body.
24 void lock_state::signal(thread::id who) {
// Table type mapping a lock id to that lock's client-side state.
29 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
// Definition of the static member last_port, starting at 0. The constructor
// mixes it into the PRNG seed and then overwrites it with the port it picked.
31 unsigned int lock_client::last_port = 0;
// Returns a reference to the state record for lock `lid`, creating it if it
// does not exist yet. The lookup runs with `lock_table_lock` held via the
// local `sl`.
//
// NOTE(review): the returned reference is used by callers after `sl` has gone
// out of scope; that is only safe if lock_table never invalidates references
// to existing entries (true for std::map) and entries are never erased --
// confirm against lock_table's declaration.
33 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
34 lock sl(lock_table_lock);
35 return lock_table[lid]; // creates the lock if it doesn't already exist
// Constructs a lock client.
//
// xdst: destination passed unchanged to the rsm_client constructor.
// _lu:  stored in `lu`; not otherwise used in the visible lines.
//
// Visible steps: pick a pseudo-random local port, start an RPC server on it
// with the revoke/retry handlers registered, create the rsm_client used for
// outgoing calls, and start the releaser thread.
38 lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) {
41 LOG("lock_client: call bind");
// Seed from the clock XOR the previously chosen port, so two clients created
// within the same second still get different seeds.
43 srandom((uint32_t)time(NULL)^last_port);
// OR-ing in bit 10 guarantees the port is at least 1024.
// NOTE(review): nothing visible here checks that the port is actually free.
44 rlock_port = ((random()%32000) | (0x1 << 10));
// `id` is the address string sent with acquire/release RPCs; presumably the
// server uses it to call back the revoke/retry handlers -- TODO confirm.
45 id = "127.0.0.1:" + std::to_string(rlock_port);
46 last_port = rlock_port;
// NOTE(review): rlsrpc is a local raw pointer; no owner or delete is visible
// in this excerpt, so the server object appears to live for the process.
47 rpcs *rlsrpc = new rpcs(rlock_port);
48 rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
49 rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
50 rsmc = new rsm_client(xdst);
// Started last, after the members it reads (rsmc, id) have been set.
51 releaser_thread = thread(&lock_client::releaser, this);
// Body of the releaser thread: takes lock ids from `release_fifo` and gives
// each lock back to the server with a release RPC.
//
// NOTE(review): the enclosing loop, the declaration of `r`, and any locking
// of the lock_state are on lines not visible in this excerpt.
// NOTE(review): an attribute written after the parameter list appertains to
// the function type, not the function; [[noreturn]] is normally written at
// the start of the declaration -- confirm the compiler accepts this form.
54 void lock_client::releaser() [[noreturn]] {
56 lock_protocol::lockid_t lid;
57 release_fifo.deq(&lid);
58 LOG("Releaser: " << lid);
60 lock_state &st = get_lock_state(lid);
// Whoever enqueued the id must already have marked the lock as locked and
// held by the releaser thread (see the enq call sites in this file).
62 VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
63 st.state = lock_state::releasing;
67 rsmc->call(lock_protocol::release, r, lid, id, st.xid);
// After the RPC the client no longer caches the lock.
72 st.state = lock_state::none;
73 LOG("Lock " << lid << ": none");
// Issues a lock_protocol::stat RPC for `lid` and asserts that it returned OK.
//
// NOTE(review): `cl` and `r` are not declared in the visible lines, and every
// other RPC in this file is sent through `rsmc` -- confirm `cl` is a valid
// member. The return statement is also not visible in this excerpt.
78 int lock_client::stat(lock_protocol::lockid_t lid) {
81 auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
82 VERIFY (ret == lock_protocol::OK);
// Acquires lock `lid` for the calling thread and returns lock_protocol::OK.
//
// The caller is appended to the lock's `wanted_by` queue. If the lock is not
// cached locally (state none or retrying), an acquire RPC is sent to the
// server. Once the lock is free locally it goes to whoever is at the front
// of the queue: the releaser thread (the lock is queued for release) or the
// caller.
//
// NOTE(review): the embedded line numbers skip in several places; the
// enclosing loop, the locking of `st`, the declaration of `r`, and the
// branches that wait or signal other threads are not visible in this excerpt.
86 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
87 lock_state &st = get_lock_state(lid);
89 auto self = std::this_thread::get_id();
91 // check for reentrancy
92 VERIFY(st.state != lock_state::locked || st.held_by != self);
// A thread may appear in the wait queue at most once.
93 VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
95 st.wanted_by.push_back(self);
98 if (st.state != lock_state::free)
99 LOG("Lock " << lid << ": not free");
// Not cached locally: ask the server for the lock.
101 if (st.state == lock_state::none || st.state == lock_state::retrying) {
// The body of this inner branch is not visible; presumably a fresh
// (non-retry) request is given a new xid here -- TODO confirm.
102 if (st.state == lock_state::none) {
106 st.state = lock_state::acquiring;
107 LOG("Lock " << lid << ": acquiring");
108 lock_protocol::status result;
112 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
115 LOG("acquire returned " << result);
// On OK the lock is now cached here and free to hand to a local thread.
116 if (result == lock_protocol::OK) {
117 st.state = lock_state::free;
118 LOG("Lock " << lid << ": free");
// The caller was enqueued above, so the queue cannot be empty here.
122 VERIFY(st.wanted_by.size() != 0);
123 if (st.state == lock_state::free) {
125 auto front = st.wanted_by.front();
// The releaser thread's id at the front of the queue marks a pending revoke
// (revoke_handler pushes that id): hand the lock to the releaser and queue
// it for release.
126 if (front == releaser_thread.get_id()) {
127 st.wanted_by.pop_front();
128 st.state = lock_state::locked;
129 st.held_by = releaser_thread.get_id();
130 LOG("Queuing " << lid << " for release");
131 release_fifo.enq(lid);
// Caller is at the front: take the lock.
132 } else if (front == self) {
133 st.wanted_by.pop_front();
134 st.state = lock_state::locked;
147 LOG("Lock " << lid << ": locked");
148 return lock_protocol::OK;
// Releases lock `lid`, which must currently be locked by the calling thread
// (enforced by VERIFY). The lock becomes free locally; if the releaser thread
// is at the front of the wait queue, the lock is handed to it and queued for
// release to the server. Always returns lock_protocol::OK.
//
// NOTE(review): the locking of `st` and the branch that handles an ordinary
// waiting thread at the front of the queue are not visible in this excerpt.
151 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
152 lock_state &st = get_lock_state(lid);
154 auto self = std::this_thread::get_id();
155 VERIFY(st.state == lock_state::locked && st.held_by == self);
156 st.state = lock_state::free;
157 LOG("Lock " << lid << ": free");
158 if (st.wanted_by.size()) {
159 auto front = st.wanted_by.front();
// The releaser thread's id at the front marks a pending revoke: mark the
// lock as held by the releaser and enqueue it so it is returned to the
// server.
160 if (front == releaser_thread.get_id()) {
161 st.state = lock_state::locked;
162 st.held_by = releaser_thread.get_id();
163 st.wanted_by.pop_front();
164 LOG("Queuing " << lid << " for release");
165 release_fifo.enq(lid);
169 LOG("Finished signaling.");
170 return lock_protocol::OK;
// RPC handler registered for rlock_protocol::revoke. Always returns
// rlock_protocol::OK.
//
// lid: lock being revoked.
// xid: only logged in the visible lines.
//
// Behavior by current state, as far as visible:
//  - releasing or none: nothing to do, return immediately.
//  - free with no waiters, or with the releaser already at the front of the
//    queue: hand the lock to the releaser thread and queue it for release.
//  - the push_back on line 191 enqueues the releaser thread's id; presumably
//    this is the remaining "otherwise" branch, so the lock is released once
//    the earlier waiters are done -- TODO confirm, the branch structure
//    around it is not visible in this excerpt.
173 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
174 LOG("Revoke handler " << lid << " " << xid);
175 lock_state &st = get_lock_state(lid);
178 if (st.state == lock_state::releasing || st.state == lock_state::none)
179 return rlock_protocol::OK;
181 if (st.state == lock_state::free &&
182 (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
184 st.state = lock_state::locked;
185 st.held_by = releaser_thread.get_id();
// Only pop when the releaser's own id was at the front; the queue may be empty.
186 if (st.wanted_by.size())
187 st.wanted_by.pop_front();
188 release_fifo.enq(lid);
191 st.wanted_by.push_back(releaser_thread.get_id());
193 return rlock_protocol::OK;
// RPC handler registered for rlock_protocol::retry. Always returns
// rlock_protocol::OK.
//
// lid: lock whose acquire should be retried. The unnamed int& and xid
// parameters are not used in the visible lines.
//
// The lock must currently be in the acquiring state (enforced by VERIFY). It
// is moved to retrying and one waiting thread is woken; acquire() treats
// retrying like none and sends the acquire RPC again.
//
// NOTE(review): any locking of `st` is on a line not visible in this excerpt.
196 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
197 lock_state &st = get_lock_state(lid);
199 VERIFY(st.state == lock_state::acquiring);
200 st.state = lock_state::retrying;
// Log the state that was just assigned, as every other state transition in
// this file does (this message previously said "none").
201 LOG("Lock " << lid << ": retrying");
202 st.signal(); // only one thread needs to wake up
203 return rlock_protocol::OK;
// C-API wrapper: heap-allocates a lock_client for destination `dst` and
// returns it as an opaque t4_lock_client pointer. The matching
// t4_lock_client_delete() frees it.
// NOTE(review): the constructor defined in this file takes two parameters, so
// this call relies on a default for the second one in the header -- confirm.
206 t4_lock_client *t4_lock_client_new(const char *dst) {
207 return (t4_lock_client *)new lock_client(dst);
// C-API wrapper: casts the opaque handle back to lock_client and deletes it.
// `client` should be a pointer obtained from t4_lock_client_new().
210 void t4_lock_client_delete(t4_lock_client *client) {
211 delete (lock_client *)client;
// C-API wrapper: forwards to lock_client::acquire(lid) on the object behind
// `client` and returns its result.
214 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
215 return ((lock_client *)client)->acquire(lid);
// C-API wrapper: forwards to lock_client::release(lid) on the object behind
// `client` and returns its result.
218 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
219 return ((lock_client *)client)->release(lid);
// C-API wrapper: forwards to lock_client::stat(lid) on the object behind
// `client` and returns its result.
222 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
223 return ((lock_client *)client)->stat(lid);