1 // RPC stubs for clients to talk to lock_server, and cache the locks
2 // see lock_client.cache.h for protocol details.
4 #include "lock_client_cache_rsm.h"
11 #include "rsm_client.h"
// Default constructor for lock_state. The trailing ':' introduces a
// member-initializer list.
13 lock_state::lock_state():
// wait(): takes no arguments and returns nothing.
// Captures the calling thread's id; presumably this id keys the caller's
// entry in the container `c` that signal() walks -- TODO confirm.
18 void lock_state::wait() {
19 pthread_t self = pthread_self();
// signal(): wake a single waiter, if there is one.
// When the container `c` is non-empty, calls signal() on the mapped value
// (`second`) of its first element; when `c` is empty this is a no-op.
24 void lock_state::signal() {
// Guard against dereferencing begin() of an empty container.
26 if (c.begin() != c.end())
27 c.begin()->second.signal();
// signal(who): overload taking a specific thread id.
// Presumably signals only the entry belonging to `who` -- TODO confirm
// against the body.
30 void lock_state::signal(pthread_t who) {
// Thread entry point with the void*(void*) signature pthread_create expects.
// `x` is the lock_client_cache_rsm instance: the constructor passes `this`
// as the thread argument when it starts the releaser thread.
35 static void * releasethread(void *x) {
// Recover the typed client pointer from the opaque thread argument.
36 lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
// Definition of the static member last_port, initially 0.
// The constructor mixes it into the srand() seed and then overwrites it
// with the port it picked.
41 int lock_client_cache_rsm::last_port = 0;
// Returns a reference to the lock_state entry for `lid` in lock_table.
// The lookup itself runs with lock_table_lock held via a function-local
// ScopedLock; the reference handed back is used by callers after this
// function has returned.
43 lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
44 ScopedLock sl(lock_table_lock);
45 // by the semantics of std::map, this will create
46 // the lock if it doesn't already exist
47 return lock_table[lid];
// Constructs a caching lock client.
//   xdst - destination string, forwarded to the lock_client base class and
//          to the rsm_client created below.
//   _lu  - lock_release_user pointer, stored in member `lu`.
// Visible steps: pick a pseudo-random port, write "hname:port" into `host`,
// create an rpcs object on that port and register the revoke/retry
// handlers, create the rsm_client, and start the releaser thread.
50 lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
// Seed with the current time XOR the previously chosen port (last_port).
51 srand(time(NULL)^last_port);
// rand()%32000 with bit 10 forced on, so the port is always >= 1024.
52 rlock_port = ((rand()%32000) | (0x1 << 10));
// NOTE(review): the gethostname() check is commented out; where hname gets
// its value is not established by this line -- confirm.
54 // VERIFY(gethostname(hname, 100) == 0);
57 host << hname << ":" << rlock_port;
// Remember the chosen port; it feeds the srand() seed on the next construction.
59 last_port = rlock_port;
// NOTE(review): rlsrpc is a local raw pointer from `new`; this function
// shows no matching delete and no member that stores it -- confirm the
// intended lifetime.
60 rpcs *rlsrpc = new rpcs(rlock_port);
// Register this object's handlers for the revoke and retry RPCs.
61 rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
62 rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
// xid_mutex is held from here; presumably it protects initialization of
// the xid counter -- TODO confirm.
64 ScopedLock sl(xid_mutex);
67 rsmc = new rsm_client(xdst);
// Start the background releaser thread; `this` becomes releasethread's argument.
68 int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
// Body of the releaser thread: takes lock ids off release_fifo and sends
// the lock back through rsmc with a lock_protocol::release call.
72 void lock_client_cache_rsm::releaser() {
74 lock_protocol::lockid_t lid;
// Take the next lock id queued for release.
75 release_fifo.deq(&lid);
76 LOG("Releaser: " << lid);
78 lock_state &st = get_lock_state(lid);
// Whoever enqueued lid must already have marked the lock as locked and
// held by the releaser thread (acquire/release/revoke_handler all do so
// before calling release_fifo.enq).
80 VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
81 st.state = lock_state::releasing;
// ScopedUnlock on st.m: presumably drops the per-lock mutex for the
// duration of the RPC below -- TODO confirm ScopedUnlock semantics.
83 ScopedUnlock su(st.m);
// NOTE(review): the call's return value is not checked on this line.
85 rsmc->call(lock_protocol::release, lid, id, st.xid, r);
// After the release call the client no longer owns the lock.
87 st.state = lock_state::none;
88 LOG("Lock " << lid << ": none");
// Acquires lock `lid` for the calling thread.
// The caller is appended to the lock's wanted_by queue. If the lock state
// is none or retrying, an acquire RPC is sent through rsmc. Once the state
// is free, the thread at the front of wanted_by gets the lock: either the
// releaser thread (the lock is then queued for release) or the caller.
// Returns lock_protocol::OK on the visible return path.
// NOTE(review): thread ids are compared with == / != throughout; POSIX
// treats pthread_t as opaque and provides pthread_equal() for comparison.
93 lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
94 lock_state &st = get_lock_state(lid);
96 pthread_t self = pthread_self();
98 // check for reentrancy
// The caller must neither hold this lock already nor be queued for it.
99 VERIFY(st.state != lock_state::locked || st.held_by != self);
100 VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
// Join the back of the wanted_by queue.
102 st.wanted_by.push_back(self);
105 if (st.state != lock_state::free)
106 LOG("Lock " << lid << ": not free");
// States none and retrying are the ones that lead to an acquire RPC.
108 if (st.state == lock_state::none || st.state == lock_state::retrying) {
// Only a transition from none takes xid_mutex; presumably a fresh xid is
// assigned under it -- TODO confirm.
109 if (st.state == lock_state::none) {
110 ScopedLock sl(xid_mutex);
113 st.state = lock_state::acquiring;
114 LOG("Lock " << lid << ": acquiring");
115 lock_protocol::status result;
// ScopedUnlock on st.m: presumably releases the per-lock mutex while the
// RPC is in flight -- TODO confirm.
117 ScopedUnlock su(st.m);
119 result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
121 LOG("acquire returned " << result);
// An OK result moves the lock to free, i.e. cached here and not yet held
// by any thread.
122 if (result == lock_protocol::OK) {
123 st.state = lock_state::free;
124 LOG("Lock " << lid << ": free");
// The caller itself was pushed above, so the queue cannot be empty.
128 VERIFY(st.wanted_by.size() != 0);
129 if (st.state == lock_state::free) {
131 pthread_t front = st.wanted_by.front();
// Releaser thread is first in line: mark the lock as held by it and queue
// the lock id for release.
132 if (front == releaser_thread) {
133 st.wanted_by.pop_front();
134 st.state = lock_state::locked;
135 st.held_by = releaser_thread;
136 LOG("Queuing " << lid << " for release");
137 release_fifo.enq(lid);
// The caller is first in line: leave the queue and take the lock.
138 } else if (front == self) {
139 st.wanted_by.pop_front();
140 st.state = lock_state::locked;
153 LOG("Lock " << lid << ": locked");
154 return lock_protocol::OK;
// Releases lock `lid`, which the calling thread must currently hold.
// The lock goes back to free. If the releaser thread is at the front of
// wanted_by, the lock is handed to it immediately and the lock id is
// queued on release_fifo. Returns lock_protocol::OK.
157 lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
158 lock_state &st = get_lock_state(lid);
160 pthread_t self = pthread_self();
// Only the thread that holds the lock may release it.
161 VERIFY(st.state == lock_state::locked && st.held_by == self);
162 st.state = lock_state::free;
163 LOG("Lock " << lid << ": free");
// Someone is waiting: inspect the head of the queue.
164 if (st.wanted_by.size()) {
165 pthread_t front = st.wanted_by.front();
// Head of the queue is the releaser thread: give it the lock and schedule
// the release.
166 if (front == releaser_thread) {
167 st.state = lock_state::locked;
168 st.held_by = releaser_thread;
169 st.wanted_by.pop_front();
170 LOG("Queuing " << lid << " for release");
171 release_fifo.enq(lid);
175 LOG("Finished signaling.");
176 return lock_protocol::OK;
// RPC handler registered for rlock_protocol::revoke.
//   lid - lock being revoked.
//   xid - logged below; not otherwise used in the visible lines.
// Returns rlock_protocol::OK on every visible path.
179 rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
180 LOG("Revoke handler " << lid << " " << xid);
181 lock_state &st = get_lock_state(lid);
// Nothing to do when the lock is already being released or is not owned.
184 if (st.state == lock_state::releasing || st.state == lock_state::none)
185 return rlock_protocol::OK;
// Lock is cached but idle, and either nobody waits or the releaser thread
// is already first in line: hand it to the releaser thread right away.
187 if (st.state == lock_state::free &&
188 (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
190 st.state = lock_state::locked;
191 st.held_by = releaser_thread;
// Drop the releaser's queue entry if it had one.
192 if (st.wanted_by.size())
193 st.wanted_by.pop_front();
194 release_fifo.enq(lid);
// Queue the releaser thread behind the current waiters; presumably this is
// the alternative branch to the immediate hand-off above -- TODO confirm.
197 st.wanted_by.push_back(releaser_thread);
199 return rlock_protocol::OK;
// RPC handler registered for rlock_protocol::retry.
//   lid - lock whose acquire should be retried.
//   xid - not used in the visible lines.
// Requires the lock to be in state acquiring, moves it to retrying, wakes
// one waiter, and returns rlock_protocol::OK.
202 rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
203 lock_state &st = get_lock_state(lid);
205 VERIFY(st.state == lock_state::acquiring);
206 st.state = lock_state::retrying;
// NOTE(review): the message says "none" although the state was just set to
// retrying -- confirm whether the log text is intended.
207 LOG("Lock " << lid << ": none");
208 st.signal(); // only one thread needs to wake up
209 return rlock_protocol::OK;