1 // RPC stubs for clients to talk to lock_server, and cache the locks.
3 #include "lock_client.h"
// Waits on the entry of c keyed by the calling thread's id, passing the
// caller's lock through to that entry's wait().
//   mutex_lock: lock handed straight to the per-thread wait() call.
6 void lock_state::wait(lock & mutex_lock) {
7 auto self = std::this_thread::get_id();
// c is indexed by thread id; presumably a map of condition variables whose
// operator[] creates the entry on first use — confirm against the header.
8 c[self].wait(mutex_lock);
// Wakes at most one waiter: when c is non-empty, calls notify_one() on the
// first entry's value. Does nothing when c is empty.
12 void lock_state::signal() {
14 if (c.begin() != c.end())
15 c.begin()->second.notify_one();
// Overload of signal() that takes the id of one particular thread.
//   who: thread id of the intended target (presumably only that thread's
//        entry in c is notified — confirm against the body).
18 void lock_state::signal(thread::id who) {
// Port chosen by the most recent lock_client constructor call; the
// constructor XORs it into the random seed and then overwrites it with the
// newly chosen port. Starts at 0.
23 in_port_t lock_client::last_port = 0;
// Returns a reference to the lock_table entry for lid.
//   lid: id of the lock whose client-side state is wanted.
// lock_table_lock is taken for the lookup (sl is presumably a scoped guard —
// confirm the `lock` type); the returned reference is used by callers after
// this function has returned.
25 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
26 lock sl(lock_table_lock);
27 return lock_table[lid]; // creates the lock if it doesn't already exist
// Constructs a lock client for the server at xdst.
//   xdst: destination string passed to both the rpcc and rsm_client
//         constructors.
//   _lu:  stored in lu; next_xid starts at 0.
30 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
31 cl = unique_ptr<rpcc>(new rpcc(xdst));
33 LOG << "lock_client: call bind";
// Seed the PRNG with the current time XOR the previously chosen port.
35 srandom((uint32_t)time(NULL)^last_port);
// Random value modulo 32000 with bit 10 forced on, so the port is at least
// 1024. NOTE(review): nothing visible here checks that the port is free.
36 rlock_port = ((random()%32000) | (0x1 << 10));
// The client's identity string is its loopback address plus the chosen port.
37 id = "127.0.0.1:" + std::to_string(rlock_port);
38 last_port = rlock_port;
// RPC server on the chosen port, with revoke_handler and retry_handler
// registered for the rlock_protocol revoke and retry procedures.
39 rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
40 rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
41 rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
42 rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
// Start the background thread that runs releaser().
43 releaser_thread = thread(&lock_client::releaser, this);
// Entry point of releaser_thread (started by the constructor). Takes a lock
// id off release_fifo, sends a release RPC for it through rsmc, and then
// marks the local state as none.
47 void lock_client::releaser() {
49 lock_protocol::lockid_t lid;
// Fetch the next lock id that was queued for release.
50 release_fifo.deq(&lid);
51 LOG << "Releaser: " << lid;
53 lock_state & st = get_lock_state(lid);
// Every site that enqueues a lid first sets the state to locked with held_by
// equal to the releaser thread's id; verify that invariant here.
55 VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
56 st.state = lock_state::releasing;
// Send the release to the server, identifying this client by id and the
// lock's xid.
60 rsmc->call(lock_protocol::release, r, lid, id, st.xid);
// The lock is no longer cached locally.
65 st.state = lock_state::none;
66 LOG << "Lock " << lid << ": none";
// Issues a stat RPC for lid, identifying this client by id, and verifies
// that the call returned lock_protocol::OK.
//   lid: id of the lock to query.
// NOTE(review): this call goes through cl, whereas the acquire and release
// RPCs in this file go through rsmc — confirm that is intended.
71 int lock_client::stat(lock_protocol::lockid_t lid) {
74 auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
75 VERIFY (ret == lock_protocol::OK);
// Acquires lock lid for the calling thread.
//   lid: id of the lock to acquire.
// Returns lock_protocol::OK once the lock is marked locked.
// The caller is appended to the lock's wanted_by queue; if the lock is not
// cached locally (state none or retrying) an acquire RPC is sent to the
// server, and once the lock is free it is handed to whichever thread is at
// the front of wanted_by.
79 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
80 lock_state & st = get_lock_state(lid);
82 auto self = std::this_thread::get_id();
84 // check for reentrancy
// The caller must neither hold the lock already nor already be queued for it.
85 VERIFY(st.state != lock_state::locked || st.held_by != self);
86 VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
87 == st.wanted_by.end());
// Join the back of the waiter queue.
89 st.wanted_by.push_back(self);
92 if (st.state != lock_state::free)
93 LOG << "Lock " << lid << ": not free";
// The lock is not cached here (none) or the server asked us to try again
// (retrying): go to the server for it.
95 if (st.state == lock_state::none || st.state == lock_state::retrying) {
96 if (st.state == lock_state::none) {
100 st.state = lock_state::acquiring;
101 LOG << "Lock " << lid << ": acquiring";
102 lock_protocol::status result;
106 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
109 LOG << "acquire returned " << result;
// An OK reply means the server granted the lock; it is now cached and free.
110 if (result == lock_protocol::OK) {
111 st.state = lock_state::free;
112 LOG << "Lock " << lid << ": free";
// At minimum the caller itself is still queued.
116 VERIFY(st.wanted_by.size() != 0);
117 if (st.state == lock_state::free) {
119 auto front = st.wanted_by.front();
// The releaser thread is next in line (queued by revoke_handler): mark the
// lock as held by it and queue the lock id for release to the server.
120 if (front == releaser_thread.get_id()) {
121 st.wanted_by.pop_front();
122 st.state = lock_state::locked;
123 st.held_by = releaser_thread.get_id();
124 LOG << "Queuing " << lid << " for release";
125 release_fifo.enq(lid);
// The caller is next in line: take the lock.
126 } else if (front == self) {
127 st.wanted_by.pop_front();
128 st.state = lock_state::locked;
141 LOG << "Lock " << lid << ": locked";
142 return lock_protocol::OK;
// Releases lock lid, which the calling thread must currently hold.
//   lid: id of the lock to release.
// Returns lock_protocol::OK.
// The lock becomes free locally; if the releaser thread is at the front of
// wanted_by, the lock is marked as held by it and queued for release to the
// server.
145 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
146 lock_state & st = get_lock_state(lid);
148 auto self = std::this_thread::get_id();
// Only the thread that holds the lock may release it.
149 VERIFY(st.state == lock_state::locked && st.held_by == self);
150 st.state = lock_state::free;
151 LOG << "Lock " << lid << ": free";
152 if (st.wanted_by.size()) {
153 auto front = st.wanted_by.front();
// A revoke is pending and is next in line: hand the lock to the releaser
// thread and queue the lock id on release_fifo.
154 if (front == releaser_thread.get_id()) {
155 st.state = lock_state::locked;
156 st.held_by = releaser_thread.get_id();
157 st.wanted_by.pop_front();
158 LOG << "Queuing " << lid << " for release";
159 release_fifo.enq(lid);
163 LOG << "Finished signaling.";
164 return lock_protocol::OK;
// RPC handler registered for rlock_protocol::revoke.
//   (int &): unnamed reply argument, unused in the visible code.
//   lid:     id of the lock being revoked.
//   xid:     only logged in the visible code.
// Returns rlock_protocol::OK on every visible path.
167 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
168 LOG << "Revoke handler " << lid << " " << xid;
169 lock_state & st = get_lock_state(lid);
// Already being released, or not cached here: nothing to do.
172 if (st.state == lock_state::releasing || st.state == lock_state::none)
173 return rlock_protocol::OK;
// The lock is free and either nobody is waiting or the releaser thread is
// first in line: give it to the releaser thread now and queue the release.
175 if (st.state == lock_state::free &&
176 (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
178 st.state = lock_state::locked;
179 st.held_by = releaser_thread.get_id();
180 if (st.wanted_by.size())
181 st.wanted_by.pop_front();
182 release_fifo.enq(lid);
// Queue the releaser thread as a waiter so that acquire()/release() hand the
// lock to it once it reaches the front of wanted_by (presumably the
// alternative to the branch above — confirm).
185 st.wanted_by.push_back(releaser_thread.get_id());
187 return rlock_protocol::OK;
// RPC handler registered for rlock_protocol::retry.
//   (int &): unnamed reply argument, unused.
//   lid:     id of the lock whose acquire should be retried.
//   (xid):   unnamed and unused.
// Moves the lock from acquiring to retrying and wakes one waiting thread.
// Returns rlock_protocol::OK.
190 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
191 lock_state & st = get_lock_state(lid);
// A retry is only expected while an acquire for this lock is outstanding.
193 VERIFY(st.state == lock_state::acquiring);
194 st.state = lock_state::retrying;
// NOTE(review): the message below says "none" although the state was just
// set to retrying — confirm whether the log text is stale.
195 LOG << "Lock " << lid << ": none";
196 st.signal(); // only one thread needs to wake up
197 return rlock_protocol::OK;
// Allocates a lock_client for dst with new and returns it as a
// t4_lock_client pointer.
//   dst: destination string forwarded to the lock_client constructor.
// The object is freed by passing the pointer to t4_lock_client_delete().
200 t4_lock_client *t4_lock_client_new(const char *dst) {
201 return (t4_lock_client *)new lock_client(dst);
// Deletes the lock_client behind a t4_lock_client pointer.
//   client: pointer cast back to lock_client * before delete (as returned by
//           t4_lock_client_new()).
204 void t4_lock_client_delete(t4_lock_client *client) {
205 delete (lock_client *)client;
// Forwards to lock_client::acquire() on the object behind client.
//   client: pointer cast back to lock_client *.
//   lid:    id of the lock to acquire.
// Returns the status produced by lock_client::acquire().
208 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
209 return ((lock_client *)client)->acquire(lid);
// Forwards to lock_client::release() on the object behind client.
//   client: pointer cast back to lock_client *.
//   lid:    id of the lock to release.
// Returns the status produced by lock_client::release().
212 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
213 return ((lock_client *)client)->release(lid);
// Forwards to lock_client::stat() on the object behind client.
//   client: pointer cast back to lock_client *.
//   lid:    id of the lock to query.
// Returns the value produced by lock_client::stat().
216 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
217 return ((lock_client *)client)->stat(lid);