X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/0989f6feac9c8e83847165c4abee5273463eaa63..5987a5357ccfd818d540a7aa58eb498a1be07aa1:/lock_server.cc diff --git a/lock_server.cc b/lock_server.cc index 0f82080..f5a1fc4 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,37 +1,214 @@ -#include "rpc/rpc.h" -#include -#include +// the caching lock server implementation + +#include "lock_server.h" +#include #include #include -#include "lock_server_cache_rsm.h" -#include "paxos.h" -#include "rsm.h" - -// Main loop of lock_server +#include +#include "lang/verify.h" +#include "handle.h" +#include "tprintf.h" +#include "rpc/marshall.h" +#include "lock.h" -char tprintf_thread_prefix = 's'; +using std::ostringstream; +using std::istringstream; +using std::vector; -int -main(int argc, char *argv[]) +lock_state::lock_state(): + held(false) { - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); +} + +lock_state::lock_state(const lock_state &other) { + *this = other; +} + +lock_state& lock_state::operator=(const lock_state& o) { + held = o.held; + held_by = o.held_by; + wanted_by = o.wanted_by; + old_requests = o.old_requests; + return *this; +} - srandom(getpid()); +template +ostringstream & operator<<(ostringstream &o, const pair &d) { + o << "<" << d.first << "," << d.second << ">"; + return o; +} + +marshall & operator<<(marshall &m, const lock_state &d) { + return m << d.held << d.held_by << d.wanted_by; +} - if(argc != 3){ - fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); - exit(1); +unmarshall & operator>>(unmarshall &u, lock_state &d) { + return u >> d.held >> d.held_by >> d.wanted_by; +} + +lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { + lock sl(lock_table_lock); + // by the semantics of map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) { + std::thread(&lock_server::revoker, this).detach(); + std::thread(&lock_server::retryer, this).detach(); + rsm->set_state_transfer(this); +} + +void lock_server::revoker() { + while (1) { + lock_protocol::lockid_t lid; + revoke_fifo.deq(&lid); + LOG("Revoking " << lid); + if (rsm && !rsm->amiprimary()) + continue; + + lock_state &st = get_lock_state(lid); + holder held_by; + { + lock sl(st.m); + held_by = st.held_by; + } + + rpcc *proxy = NULL; + // try a few times? + //int t=5; + //while (t-- && !proxy) + proxy = handle(held_by.first).safebind(); + if (proxy) { + int r; + rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second); + LOG("Revoke returned " << ret); + } } +} - rsm rsm(argv[1], argv[2]); - lock_server_cache_rsm ls(&rsm); - rsm.set_state_transfer(&ls); +void lock_server::retryer() { + while (1) { + lock_protocol::lockid_t lid; + retry_fifo.deq(&lid); + if (rsm && !rsm->amiprimary()) + continue; - rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls); - rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls); - rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls); + LOG("Sending retry for " << lid); + lock_state &st = get_lock_state(lid); + holder front; + { + lock sl(st.m); + if (st.wanted_by.empty()) + continue; + front = st.wanted_by.front(); + } - while(1) - sleep(1000); + rlock_protocol::status ret = -1; + + rpcc *proxy = NULL; + // try a few times? + //int t=5; + //while (t-- && !proxy) + proxy = handle(front.first).safebind(); + if (proxy) { + int r; + ret = proxy->call(rlock_protocol::retry, r, lid, front.second); + LOG("Retry returned " << ret); + } + } } + +int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { + LOG_FUNC_ENTER_SERVER; + holder h = holder(id, xid); + lock_state &st = get_lock_state(lid); + lock sl(st.m); + + // deal with duplicated requests + if (st.old_requests.count(id)) { + lock_protocol::xid_t old_xid = st.old_requests[id]; + if (old_xid > xid) + return lock_protocol::RPCERR; + else if (old_xid == xid) { + if (st.held && st.held_by == h) { + LOG("Client " << id << " sent duplicate acquire xid=" << xid); + return lock_protocol::OK; + } + } + } + + // grant the lock if it's available and I'm next in line + if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) { + if (!st.wanted_by.empty()) + st.wanted_by.pop_front(); + st.old_requests[id] = xid; + + st.held = true; + st.held_by = h; + LOG("Lock " << lid << " held by " << h.first); + if (st.wanted_by.size()) + revoke_fifo.enq(lid); + return lock_protocol::OK; + } + + // get in line + bool found = false; + for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { + if (i->first == id) { + // make sure client is obeying serialization + if (i->second != xid) { + LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); + return lock_protocol::RPCERR; + } + found = true; + break; + } + } + if (!found) + st.wanted_by.push_back(h); + + LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); + + // send revoke if we're first in line + if (st.wanted_by.front() == h) + revoke_fifo.enq(lid); + + return lock_protocol::RETRY; +} + +int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { + LOG_FUNC_ENTER_SERVER; + lock_state &st = get_lock_state(lid); + lock sl(st.m); + if (st.held && st.held_by == holder(id, xid)) { + st.held = false; + LOG("Lock " << lid << " not held"); + } + if (st.wanted_by.size()) + retry_fifo.enq(lid); + return lock_protocol::OK; +} + +string lock_server::marshal_state() { + lock sl(lock_table_lock); + marshall rep; + rep << nacquire; + rep << lock_table; + return rep.str(); +} + +void lock_server::unmarshal_state(string state) { + lock sl(lock_table_lock); + unmarshall rep(state); + rep >> nacquire; + rep >> lock_table; +} + +lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { + printf("stat request\n"); + VERIFY(0); + r = nacquire; + return lock_protocol::OK; +} +