X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/a5f10a497bebfc680bf418193f1fd9f1ad7cc417..refs/heads/iannucci:/lock_server.cc?ds=inline diff --git a/lock_server.cc b/lock_server.cc index d7367bd..1241409 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,77 +1,59 @@ // the caching lock server implementation -#include "types.h" -#include "lock_server.h" +#include "include/lock_server.h" #include #include -#include "handle.h" -lock_state::lock_state(): - held(false) -{ +lock_server::lock_state::lock_state(const lock_state & other) { + held = other.held; + held_by = other.held_by; + wanted_by = other.wanted_by; + old_requests = other.old_requests; } -lock_state::lock_state(const lock_state &other) { - *this = other; -} - -lock_state& lock_state::operator=(const lock_state& o) { - held = o.held; - held_by = o.held_by; - wanted_by = o.wanted_by; - old_requests = o.old_requests; - return *this; -} - -lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { +lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); // this will create the lock if it doesn't already exist return lock_table[lid]; } -lock_server::lock_server(rsm *r) : rsm_ (r) { +lock_server::lock_server(rsm & r) : rsm_ (&r) { thread(&lock_server::revoker, this).detach(); thread(&lock_server::retryer, this).detach(); - rsm_->set_state_transfer(this); + r.set_state_transfer(this); + + r.reg(lock_protocol::acquire, &lock_server::acquire, this); + r.reg(lock_protocol::release, &lock_server::release, this); } -void lock_server::revoker() [[noreturn]] { +void lock_server::revoker () { while (1) { - lock_protocol::lockid_t lid; - revoke_fifo.deq(&lid); - LOG("Revoking " << lid); + lock_protocol::lockid_t lid = revoke_fifo.deq(); + LOG << "Revoking " << lid; if (rsm_ && !rsm_->amiprimary()) continue; - lock_state &st = get_lock_state(lid); - holder_t held_by; - { - lock sl(st.m); - held_by = st.held_by; - } + lock_state & st = get_lock_state(lid); + lock sl(st.m); + holder_t held_by = st.held_by; + sl.unlock(); - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(held_by.first).safebind(); - if (proxy) { + if (auto cl = rpcc::bind_cached(held_by.first)) { int r; - auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second); - LOG("Revoke returned " << ret); + auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second); + LOG << "Revoke returned " << ret; } } } -void lock_server::retryer() [[noreturn]] { +void lock_server::retryer() { while (1) { - lock_protocol::lockid_t lid; - retry_fifo.deq(&lid); + lock_protocol::lockid_t lid = retry_fifo.deq(); if (rsm_ && !rsm_->amiprimary()) continue; - LOG("Sending retry for " << lid); - lock_state &st = get_lock_state(lid); + LOG << "Sending retry for " << lid; + lock_state & st = get_lock_state(lid); holder_t front; { lock sl(st.m); @@ -80,23 +62,18 @@ void lock_server::retryer() [[noreturn]] { front = st.wanted_by.front(); } - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(front.first).safebind(); - if (proxy) { + if (auto cl = rpcc::bind_cached(front.first)) { int r; - auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second); - LOG("Retry returned " << ret); + auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second); + LOG << "Retry returned " << ret; } } } -int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { - LOG("lid=" << lid << " client=" << id << "," << xid); +lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { + LOG << "lid=" << lid << " client=" << id << "," << xid; holder_t h = holder_t(id, xid); - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); // deal with duplicated requests @@ -106,7 +83,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & return lock_protocol::RPCERR; else if (old_xid == xid) { if (st.held && st.held_by == h) { - LOG("Client " << id << " sent duplicate acquire xid=" << xid); + LOG << "Client " << id << " sent duplicate acquire xid=" << xid; return lock_protocol::OK; } } @@ -120,7 +97,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & st.held = true; st.held_by = h; - LOG("Lock " << lid << " held by " << h.first); + LOG << "Lock " << lid << " held by " << h.first; if (st.wanted_by.size()) revoke_fifo.enq(lid); return lock_protocol::OK; @@ -132,7 +109,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & if (p.first == id) { // make sure client is obeying serialization if (p.second != xid) { - LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second); + LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second; return lock_protocol::RPCERR; } found = true; @@ -142,7 +119,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & if (!found) st.wanted_by.push_back(h); - LOG("wanted_by=" << st.wanted_by); + LOG << "wanted_by=" << st.wanted_by; // send revoke if we're first in line if (st.wanted_by.front() == h) @@ -151,36 +128,25 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & return lock_protocol::RETRY; } -int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { - LOG("lid=" << lid << " client=" << id << "," << xid); - lock_state &st = get_lock_state(lid); +lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { + LOG << "lid=" << lid << " client=" << id << "," << xid; + lock_state & st = get_lock_state(lid); lock sl(st.m); if (st.held && st.held_by == holder_t(id, xid)) { st.held = false; - LOG("Lock " << lid << " not held"); + LOG << "Lock " << lid << " not held"; } if (st.wanted_by.size()) retry_fifo.enq(lid); return lock_protocol::OK; } -string lock_server::marshal_state() { +string lock_server::marshall_state() { lock sl(lock_table_lock); - marshall rep; - rep << nacquire << lock_table; - return rep.content(); + return marshall(nacquire, lock_table); } -void lock_server::unmarshal_state(const string & state) { +void lock_server::unmarshall_state(const string & state) { lock sl(lock_table_lock); - unmarshall rep(state, false); - rep >> nacquire >> lock_table; -} - -lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { - LOG("stat request for " << lid); - VERIFY(0); - r = nacquire; - return lock_protocol::OK; + unmarshall(state, nacquire, lock_table); } -