1 // RPC stubs for clients to talk to lock_server, and cache the locks.
3 #include "lock_client.h"
6 void lock_state::wait(lock & mutex_lock) {
7 auto self = this_thread::get_id();
8 c[self].wait(mutex_lock);
12 void lock_state::signal() {
14 if (c.begin() != c.end())
15 c.begin()->second.notify_one();
18 void lock_state::signal(thread::id who) {
23 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
25 in_port_t lock_client::last_port = 0;
27 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
28 lock sl(lock_table_lock);
29 return lock_table[lid]; // creates the lock if it doesn't already exist
32 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
33 cl = unique_ptr<rpcc>(new rpcc(xdst));
35 LOG << "lock_client: call bind";
37 srandom((uint32_t)time(NULL)^last_port);
38 rlock_port = ((random()%32000) | (0x1 << 10));
39 id = "127.0.0.1:" + to_string(rlock_port);
40 last_port = rlock_port;
41 rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
42 rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
43 rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
44 rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
45 releaser_thread = thread(&lock_client::releaser, this);
49 void lock_client::releaser() {
51 lock_protocol::lockid_t lid;
52 release_fifo.deq(&lid);
53 LOG << "Releaser: " << lid;
55 lock_state & st = get_lock_state(lid);
57 VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
58 st.state = lock_state::releasing;
62 rsmc->call(lock_protocol::release, r, lid, id, st.xid);
67 st.state = lock_state::none;
68 LOG << "Lock " << lid << ": none";
73 int lock_client::stat(lock_protocol::lockid_t lid) {
76 auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
77 VERIFY (ret == lock_protocol::OK);
81 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
82 lock_state & st = get_lock_state(lid);
84 auto self = this_thread::get_id();
86 // check for reentrancy
87 VERIFY(st.state != lock_state::locked || st.held_by != self);
88 VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
89 == st.wanted_by.end());
91 st.wanted_by.push_back(self);
94 if (st.state != lock_state::free)
95 LOG << "Lock " << lid << ": not free";
97 if (st.state == lock_state::none || st.state == lock_state::retrying) {
98 if (st.state == lock_state::none) {
102 st.state = lock_state::acquiring;
103 LOG << "Lock " << lid << ": acquiring";
104 lock_protocol::status result;
108 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
111 LOG << "acquire returned " << result;
112 if (result == lock_protocol::OK) {
113 st.state = lock_state::free;
114 LOG << "Lock " << lid << ": free";
118 VERIFY(st.wanted_by.size() != 0);
119 if (st.state == lock_state::free) {
121 auto front = st.wanted_by.front();
122 if (front == releaser_thread.get_id()) {
123 st.wanted_by.pop_front();
124 st.state = lock_state::locked;
125 st.held_by = releaser_thread.get_id();
126 LOG << "Queuing " << lid << " for release";
127 release_fifo.enq(lid);
128 } else if (front == self) {
129 st.wanted_by.pop_front();
130 st.state = lock_state::locked;
143 LOG << "Lock " << lid << ": locked";
144 return lock_protocol::OK;
147 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
148 lock_state & st = get_lock_state(lid);
150 auto self = this_thread::get_id();
151 VERIFY(st.state == lock_state::locked && st.held_by == self);
152 st.state = lock_state::free;
153 LOG << "Lock " << lid << ": free";
154 if (st.wanted_by.size()) {
155 auto front = st.wanted_by.front();
156 if (front == releaser_thread.get_id()) {
157 st.state = lock_state::locked;
158 st.held_by = releaser_thread.get_id();
159 st.wanted_by.pop_front();
160 LOG << "Queuing " << lid << " for release";
161 release_fifo.enq(lid);
165 LOG << "Finished signaling.";
166 return lock_protocol::OK;
169 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
170 LOG << "Revoke handler " << lid << " " << xid;
171 lock_state & st = get_lock_state(lid);
174 if (st.state == lock_state::releasing || st.state == lock_state::none)
175 return rlock_protocol::OK;
177 if (st.state == lock_state::free &&
178 (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
180 st.state = lock_state::locked;
181 st.held_by = releaser_thread.get_id();
182 if (st.wanted_by.size())
183 st.wanted_by.pop_front();
184 release_fifo.enq(lid);
187 st.wanted_by.push_back(releaser_thread.get_id());
189 return rlock_protocol::OK;
192 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
193 lock_state & st = get_lock_state(lid);
195 VERIFY(st.state == lock_state::acquiring);
196 st.state = lock_state::retrying;
197 LOG << "Lock " << lid << ": none";
198 st.signal(); // only one thread needs to wake up
199 return rlock_protocol::OK;
202 t4_lock_client *t4_lock_client_new(const char *dst) {
203 return (t4_lock_client *)new lock_client(dst);
206 void t4_lock_client_delete(t4_lock_client *client) {
207 delete (lock_client *)client;
210 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
211 return ((lock_client *)client)->acquire(lid);
214 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
215 return ((lock_client *)client)->release(lid);
218 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
219 return ((lock_client *)client)->stat(lid);