X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5987a5357ccfd818d540a7aa58eb498a1be07aa1..ab6c1548ac2b1907bca92c8ce43e919c1a649a6f:/lock_server.cc diff --git a/lock_server.cc b/lock_server.cc index f5a1fc4..141a598 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,30 +1,19 @@ // the caching lock server implementation #include "lock_server.h" -#include -#include #include #include -#include "lang/verify.h" -#include "handle.h" -#include "tprintf.h" -#include "rpc/marshall.h" -#include "lock.h" - -using std::ostringstream; -using std::istringstream; -using std::vector; lock_state::lock_state(): held(false) { } -lock_state::lock_state(const lock_state &other) { +lock_state::lock_state(const lock_state & other) { *this = other; } -lock_state& lock_state::operator=(const lock_state& o) { +lock_state & lock_state::operator=(const lock_state & o) { held = o.held; held_by = o.held_by; wanted_by = o.wanted_by; @@ -32,57 +21,39 @@ lock_state& lock_state::operator=(const lock_state& o) { return *this; } -template -ostringstream & operator<<(ostringstream &o, const pair &d) { - o << "<" << d.first << "," << d.second << ">"; - return o; -} - -marshall & operator<<(marshall &m, const lock_state &d) { - return m << d.held << d.held_by << d.wanted_by; -} - -unmarshall & operator>>(unmarshall &u, lock_state &d) { - return u >> d.held >> d.held_by >> d.wanted_by; -} - lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); - // by the semantics of map, this will create - // the lock if it doesn't already exist + // this will create the lock if it doesn't already exist return lock_table[lid]; } -lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) { - std::thread(&lock_server::revoker, this).detach(); - std::thread(&lock_server::retryer, this).detach(); - rsm->set_state_transfer(this); +lock_server::lock_server(rsm & r) : rsm_ (&r) { + thread(&lock_server::revoker, this).detach(); + thread(&lock_server::retryer, this).detach(); + r.set_state_transfer(this); + + r.reg(lock_protocol::acquire, &lock_server::acquire, this); + r.reg(lock_protocol::release, &lock_server::release, this); + r.reg(lock_protocol::stat, &lock_server::stat, this); } -void lock_server::revoker() { +void lock_server::revoker () { while (1) { lock_protocol::lockid_t lid; revoke_fifo.deq(&lid); - LOG("Revoking " << lid); - if (rsm && !rsm->amiprimary()) + LOG << "Revoking " << lid; + if (rsm_ && !rsm_->amiprimary()) continue; - lock_state &st = get_lock_state(lid); - holder held_by; - { - lock sl(st.m); - held_by = st.held_by; - } + lock_state & st = get_lock_state(lid); + lock sl(st.m); + holder_t held_by = st.held_by; + sl.unlock(); - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(held_by.first).safebind(); - if (proxy) { + if (auto cl = rpcc::bind_cached(held_by.first)) { int r; - rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second); - LOG("Revoke returned " << ret); + auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second); + LOG << "Revoke returned " << ret; } } } @@ -91,12 +62,12 @@ void lock_server::retryer() { while (1) { lock_protocol::lockid_t lid; retry_fifo.deq(&lid); - if (rsm && !rsm->amiprimary()) + if (rsm_ && !rsm_->amiprimary()) continue; - LOG("Sending retry for " << lid); - lock_state &st = get_lock_state(lid); - holder front; + LOG << "Sending retry for " << lid; + lock_state & st = get_lock_state(lid); + holder_t front; { lock sl(st.m); if (st.wanted_by.empty()) @@ -104,25 +75,18 @@ void lock_server::retryer() { front = st.wanted_by.front(); } - rlock_protocol::status ret = -1; - - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(front.first).safebind(); - if (proxy) { + if (auto cl = rpcc::bind_cached(front.first)) { int r; - ret = proxy->call(rlock_protocol::retry, r, lid, front.second); - LOG("Retry returned " << ret); + auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second); + LOG << "Retry returned " << ret; } } } -int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { - LOG_FUNC_ENTER_SERVER; - holder h = holder(id, xid); - lock_state &st = get_lock_state(lid); +lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { + LOG << "lid=" << lid << " client=" << id << "," << xid; + holder_t h = holder_t(id, xid); + lock_state & st = get_lock_state(lid); lock sl(st.m); // deal with duplicated requests @@ -132,7 +96,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr return lock_protocol::RPCERR; else if (old_xid == xid) { if (st.held && st.held_by == h) { - LOG("Client " << id << " sent duplicate acquire xid=" << xid); + LOG << "Client " << id << " sent duplicate acquire xid=" << xid; return lock_protocol::OK; } } @@ -146,7 +110,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr st.held = true; st.held_by = h; - LOG("Lock " << lid << " held by " << h.first); + LOG << "Lock " << lid << " held by " << h.first; if (st.wanted_by.size()) revoke_fifo.enq(lid); return lock_protocol::OK; @@ -154,11 +118,11 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr // get in line bool found = false; - for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { - if (i->first == id) { + for (auto p : st.wanted_by) { + if (p.first == id) { // make sure client is obeying serialization - if (i->second != xid) { - LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); + if (p.second != xid) { + LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second; return lock_protocol::RPCERR; } found = true; @@ -168,7 +132,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr if (!found) st.wanted_by.push_back(h); - LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); + LOG << "wanted_by=" << st.wanted_by; // send revoke if we're first in line if (st.wanted_by.front() == h) @@ -177,13 +141,13 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr return lock_protocol::RETRY; } -int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { - LOG_FUNC_ENTER_SERVER; - lock_state &st = get_lock_state(lid); +lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { + LOG << "lid=" << lid << " client=" << id << "," << xid; + lock_state & st = get_lock_state(lid); lock sl(st.m); - if (st.held && st.held_by == holder(id, xid)) { + if (st.held && st.held_by == holder_t(id, xid)) { st.held = false; - LOG("Lock " << lid << " not held"); + LOG << "Lock " << lid << " not held"; } if (st.wanted_by.size()) retry_fifo.enq(lid); @@ -192,21 +156,16 @@ int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_ string lock_server::marshal_state() { lock sl(lock_table_lock); - marshall rep; - rep << nacquire; - rep << lock_table; - return rep.str(); + return marshall(nacquire, lock_table).content(); } -void lock_server::unmarshal_state(string state) { +void lock_server::unmarshal_state(const string & state) { lock sl(lock_table_lock); - unmarshall rep(state); - rep >> nacquire; - rep >> lock_table; + unmarshall(state, false, nacquire, lock_table); } -lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { - printf("stat request\n"); +lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) { + LOG << "stat request for " << lid; VERIFY(0); r = nacquire; return lock_protocol::OK;