// the caching lock server implementation
-#include "types.h"
-#include "lock_server.h"
+#include "include/lock_server.h"
#include <unistd.h>
#include <arpa/inet.h>
-#include "handle.h"
-lock_state::lock_state():
- held(false)
-{
+lock_server::lock_state::lock_state(const lock_state & other) {
+ held = other.held;
+ held_by = other.held_by;
+ wanted_by = other.wanted_by;
+ old_requests = other.old_requests;
}
-lock_state::lock_state(const lock_state &other) {
- *this = other;
-}
-
-lock_state& lock_state::operator=(const lock_state& o) {
- held = o.held;
- held_by = o.held_by;
- wanted_by = o.wanted_by;
- old_requests = o.old_requests;
- return *this;
-}
-
-lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
// this will create the lock if it doesn't already exist
return lock_table[lid];
}
-lock_server::lock_server(rsm *r) : rsm_ (r) {
+lock_server::lock_server(rsm & r) : rsm_ (&r) {
thread(&lock_server::revoker, this).detach();
thread(&lock_server::retryer, this).detach();
- rsm_->set_state_transfer(this);
+ r.set_state_transfer(this);
+
+ r.reg(lock_protocol::acquire, &lock_server::acquire, this);
+ r.reg(lock_protocol::release, &lock_server::release, this);
}
-void lock_server::revoker() [[noreturn]] {
+void lock_server::revoker () {
while (1) {
- lock_protocol::lockid_t lid;
- revoke_fifo.deq(&lid);
- LOG("Revoking " << lid);
+ lock_protocol::lockid_t lid = revoke_fifo.deq();
+ LOG << "Revoking " << lid;
if (rsm_ && !rsm_->amiprimary())
continue;
- lock_state &st = get_lock_state(lid);
- holder_t held_by;
- {
- lock sl(st.m);
- held_by = st.held_by;
- }
+ lock_state & st = get_lock_state(lid);
+ lock sl(st.m);
+ holder_t held_by = st.held_by;
+ sl.unlock();
- rpcc *proxy = NULL;
- // try a few times?
- //int t=5;
- //while (t-- && !proxy)
- proxy = handle(held_by.first).safebind();
- if (proxy) {
+ if (auto cl = rpcc::bind_cached(held_by.first)) {
int r;
- auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
- LOG("Revoke returned " << ret);
+ auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
+ LOG << "Revoke returned " << ret;
}
}
}
-void lock_server::retryer() [[noreturn]] {
+void lock_server::retryer() {
while (1) {
- lock_protocol::lockid_t lid;
- retry_fifo.deq(&lid);
+ lock_protocol::lockid_t lid = retry_fifo.deq();
if (rsm_ && !rsm_->amiprimary())
continue;
- LOG("Sending retry for " << lid);
- lock_state &st = get_lock_state(lid);
+ LOG << "Sending retry for " << lid;
+ lock_state & st = get_lock_state(lid);
holder_t front;
{
lock sl(st.m);
front = st.wanted_by.front();
}
- rpcc *proxy = NULL;
- // try a few times?
- //int t=5;
- //while (t-- && !proxy)
- proxy = handle(front.first).safebind();
- if (proxy) {
+ if (auto cl = rpcc::bind_cached(front.first)) {
int r;
- auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
- LOG("Retry returned " << ret);
+ auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
+ LOG << "Retry returned " << ret;
}
}
}
-int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
- LOG("lid=" << lid << " client=" << id << "," << xid);
+lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+ LOG << "lid=" << lid << " client=" << id << "," << xid;
holder_t h = holder_t(id, xid);
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
// deal with duplicated requests
return lock_protocol::RPCERR;
else if (old_xid == xid) {
if (st.held && st.held_by == h) {
- LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+ LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
return lock_protocol::OK;
}
}
st.held = true;
st.held_by = h;
- LOG("Lock " << lid << " held by " << h.first);
+ LOG << "Lock " << lid << " held by " << h.first;
if (st.wanted_by.size())
revoke_fifo.enq(lid);
return lock_protocol::OK;
if (p.first == id) {
// make sure client is obeying serialization
if (p.second != xid) {
- LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
+ LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
return lock_protocol::RPCERR;
}
found = true;
if (!found)
st.wanted_by.push_back(h);
- LOG("wanted_by=" << st.wanted_by);
+ LOG << "wanted_by=" << st.wanted_by;
// send revoke if we're first in line
if (st.wanted_by.front() == h)
return lock_protocol::RETRY;
}
-int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
- LOG("lid=" << lid << " client=" << id << "," << xid);
- lock_state &st = get_lock_state(lid);
+lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+ LOG << "lid=" << lid << " client=" << id << "," << xid;
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
if (st.held && st.held_by == holder_t(id, xid)) {
st.held = false;
- LOG("Lock " << lid << " not held");
+ LOG << "Lock " << lid << " not held";
}
if (st.wanted_by.size())
retry_fifo.enq(lid);
return lock_protocol::OK;
}
-string lock_server::marshal_state() {
+string lock_server::marshall_state() {
lock sl(lock_table_lock);
- marshall rep;
- rep << nacquire << lock_table;
- return rep.content();
+ return marshall(nacquire, lock_table);
}
-void lock_server::unmarshal_state(const string & state) {
+void lock_server::unmarshall_state(const string & state) {
lock sl(lock_table_lock);
- unmarshall rep(state, false);
- rep >> nacquire >> lock_table;
-}
-
-lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
- LOG("stat request for " << lid);
- VERIFY(0);
- r = nacquire;
- return lock_protocol::OK;
+ unmarshall(state, nacquire, lock_table);
}
-