1 // RPC stubs for clients to talk to lock_server, and cache the locks.
3 #include "include/lock_client.h"
6 void lock_client::lock_state::wait(lock & mutex_lock) {
7 auto self = std::this_thread::get_id();
8 c[self].wait(mutex_lock);
12 void lock_client::lock_state::signal() {
14 if (c.begin() != c.end())
15 c.begin()->second.notify_one();
18 void lock_client::lock_state::signal(thread::id who) {
23 lock_client::lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
24 lock sl(lock_table_lock);
25 return lock_table[lid]; // creates the lock if it doesn't already exist
28 lock_client::lock_client(string xdst) {
29 rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
30 id = "127.0.0.1:" + std::to_string(rlock_port);
31 rlsrpc = std::make_unique<rpcs>(rlock_port);
32 rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
33 rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
34 rsmc = std::make_unique<rsm_client>(xdst);
35 releaser_thread = thread(&lock_client::releaser, this);
39 lock_client::~lock_client() {
40 release_fifo.enq(nothing<lock_protocol::lockid_t>());
41 releaser_thread.join();
44 void lock_client::releaser() {
45 while (auto mlid = release_fifo.deq()) {
46 lock_protocol::lockid_t lid = mlid;
47 LOG << "Releaser: " << lid;
49 lock_state & st = get_lock_state(lid);
51 VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
52 st.state = lock_state::releasing;
56 rsmc->call(lock_protocol::release, r, lid, id, st.xid);
59 st.state = lock_state::none;
60 LOG << "Lock " << lid << ": none";
63 LOG << "Releaser stopping";
66 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
67 lock_state & st = get_lock_state(lid);
69 auto self = std::this_thread::get_id();
71 // check for reentrancy
72 VERIFY(st.state != lock_state::locked || st.held_by != self);
73 VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
74 == st.wanted_by.end());
76 st.wanted_by.push_back(self);
79 if (st.state != lock_state::free)
80 LOG << "Lock " << lid << ": not free";
82 if (st.state == lock_state::none || st.state == lock_state::retrying) {
83 if (st.state == lock_state::none) {
87 st.state = lock_state::acquiring;
88 LOG << "Lock " << lid << ": acquiring";
89 lock_protocol::status result;
93 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
96 LOG << "acquire returned " << result;
97 if (result == lock_protocol::OK) {
98 st.state = lock_state::free;
99 LOG << "Lock " << lid << ": free";
103 VERIFY(st.wanted_by.size() != 0);
104 if (st.state == lock_state::free) {
106 auto front = st.wanted_by.front();
107 if (front == releaser_thread.get_id()) {
108 st.wanted_by.pop_front();
109 st.state = lock_state::locked;
110 st.held_by = releaser_thread.get_id();
111 LOG << "Queuing " << lid << " for release";
112 release_fifo.enq(just(lid));
113 } else if (front == self) {
114 st.wanted_by.pop_front();
115 st.state = lock_state::locked;
128 LOG << "Lock " << lid << ": locked";
129 return lock_protocol::OK;
132 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
133 lock_state & st = get_lock_state(lid);
135 auto self = std::this_thread::get_id();
136 VERIFY(st.state == lock_state::locked && st.held_by == self);
137 st.state = lock_state::free;
138 LOG << "Lock " << lid << ": free";
139 if (st.wanted_by.size()) {
140 auto front = st.wanted_by.front();
141 if (front == releaser_thread.get_id()) {
142 st.state = lock_state::locked;
143 st.held_by = releaser_thread.get_id();
144 st.wanted_by.pop_front();
145 LOG << "Queuing " << lid << " for release";
146 release_fifo.enq(just(lid));
150 LOG << "Finished signaling.";
151 return lock_protocol::OK;
154 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
155 LOG << "Revoke handler " << lid << " " << xid;
156 lock_state & st = get_lock_state(lid);
159 if (st.state == lock_state::releasing || st.state == lock_state::none)
160 return rlock_protocol::OK;
162 if (st.state == lock_state::free &&
163 (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
165 st.state = lock_state::locked;
166 st.held_by = releaser_thread.get_id();
167 if (st.wanted_by.size())
168 st.wanted_by.pop_front();
169 release_fifo.enq(just(lid));
172 st.wanted_by.push_back(releaser_thread.get_id());
174 return rlock_protocol::OK;
177 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
178 lock_state & st = get_lock_state(lid);
180 VERIFY(st.state == lock_state::acquiring);
181 st.state = lock_state::retrying;
182 LOG << "Lock " << lid << ": none";
183 st.signal(); // only one thread needs to wake up
184 return rlock_protocol::OK;
187 t4_lock_client *t4_lock_client_new(const char *dst) {
188 return (t4_lock_client *)new lock_client(dst);
191 void t4_lock_client_delete(t4_lock_client *client) {
192 delete (lock_client *)client;
195 t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
196 return ((lock_client *)client)->acquire(lid);
199 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
200 return ((lock_client *)client)->release(lid);