From: Peter Iannucci Date: Fri, 20 Sep 2013 16:52:40 +0000 (-0400) Subject: More renaming X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/5987a5357ccfd818d540a7aa58eb498a1be07aa1?hp=0989f6feac9c8e83847165c4abee5273463eaa63 More renaming --- diff --git a/Makefile b/Makefile index 9d3ae06..6bd396f 100644 --- a/Makefile +++ b/Makefile @@ -16,10 +16,10 @@ rpc/rpctest: rpc/rpctest.o tprintf.o rpc/librpc.a lock_demo=lock_demo.o lock_client.o tprintf.o rsm_client.o handle.o lock_demo : $(lock_demo) rpc/librpc.a -lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o +lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_tester : $(lock_tester) rpc/librpc.a -lock_server=lock_server.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o +lock_server=lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server.o lock_server : $(lock_server) rpc/librpc.a rsm_tester=rsm_tester.o rsmtest_client.o tprintf.o diff --git a/lock_client.cc b/lock_client.cc index a71a206..d996b40 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -1,44 +1,229 @@ -// RPC stubs for clients to talk to lock_server +// RPC stubs for clients to talk to lock_server, and cache the locks. #include "lock_client.h" #include "rpc/rpc.h" -#include - #include #include +#include #include +#include "tprintf.h" +#include -lock_client::lock_client(std::string dst) +#include "rsm_client.h" +#include "lock.h" + +using std::ostringstream; + +lock_state::lock_state(): + state(none) { +} + +void lock_state::wait() { + auto self = std::this_thread::get_id(); + { + adopt_lock ml(m); + c[self].wait(ml); + } + c.erase(self); +} + +void lock_state::signal() { + // signal anyone + if (c.begin() != c.end()) + c.begin()->second.notify_one(); +} + +void lock_state::signal(std::thread::id who) { + if (c.count(who)) + c[who].notify_one(); +} + +int lock_client::last_port = 0; + +lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { + lock sl(lock_table_lock); + // by the semantics of std::map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) { sockaddr_in dstsock; - make_sockaddr(dst.c_str(), &dstsock); + make_sockaddr(xdst.c_str(), &dstsock); cl = new rpcc(dstsock); if (cl->bind() < 0) { printf("lock_client: call bind\n"); } + + srand(time(NULL)^last_port); + rlock_port = ((rand()%32000) | (0x1 << 10)); + const char *hname; + // VERIFY(gethostname(hname, 100) == 0); + hname = "127.0.0.1"; + ostringstream host; + host << hname << ":" << rlock_port; + id = host.str(); + last_port = rlock_port; + rpcs *rlsrpc = new rpcs(rlock_port); + rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this); + rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this); + { + lock sl(xid_mutex); + xid = 0; + } + rsmc = new rsm_client(xdst); + releaser_thread = std::thread(&lock_client::releaser, this); } -int -lock_client::stat(lock_protocol::lockid_t lid) -{ +void lock_client::releaser() { + while (1) { + lock_protocol::lockid_t lid; + release_fifo.deq(&lid); + LOG("Releaser: " << lid); + + lock_state &st = get_lock_state(lid); + lock sl(st.m); + VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id()); + st.state = lock_state::releasing; + { + sl.unlock(); + int r; + rsmc->call(lock_protocol::release, r, lid, id, st.xid); + sl.lock(); + } + st.state = lock_state::none; + LOG("Lock " << lid << ": none"); + st.signal(); + } +} + +int lock_client::stat(lock_protocol::lockid_t lid) { + VERIFY(0); int r; lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid); VERIFY (ret == lock_protocol::OK); return r; } -lock_protocol::status -lock_client::acquire(lock_protocol::lockid_t lid) -{ - int r; - return cl->call(lock_protocol::acquire, r, cl->id(), lid); +lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { + lock_state &st = get_lock_state(lid); + lock sl(st.m); + auto self = std::this_thread::get_id(); + + // check for reentrancy + VERIFY(st.state != lock_state::locked || st.held_by != self); + VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end()); + + st.wanted_by.push_back(self); + + while (1) { + if (st.state != lock_state::free) + LOG("Lock " << lid << ": not free"); + + if (st.state == lock_state::none || st.state == lock_state::retrying) { + if (st.state == lock_state::none) { + lock sl(xid_mutex); + st.xid = xid++; + } + st.state = lock_state::acquiring; + LOG("Lock " << lid << ": acquiring"); + lock_protocol::status result; + { + sl.unlock(); + int r; + result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); + sl.lock(); + } + LOG("acquire returned " << result); + if (result == lock_protocol::OK) { + st.state = lock_state::free; + LOG("Lock " << lid << ": free"); + } + } + + VERIFY(st.wanted_by.size() != 0); + if (st.state == lock_state::free) { + // is it for me? + auto front = st.wanted_by.front(); + if (front == releaser_thread.get_id()) { + st.wanted_by.pop_front(); + st.state = lock_state::locked; + st.held_by = releaser_thread.get_id(); + LOG("Queuing " << lid << " for release"); + release_fifo.enq(lid); + } else if (front == self) { + st.wanted_by.pop_front(); + st.state = lock_state::locked; + st.held_by = self; + break; + } else { + st.signal(front); + } + } + + LOG("waiting..."); + st.wait(); + LOG("wait ended"); + } + + LOG("Lock " << lid << ": locked"); + return lock_protocol::OK; } -lock_protocol::status -lock_client::release(lock_protocol::lockid_t lid) -{ - int r; - return cl->call(lock_protocol::release, r, cl->id(), lid); +lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { + lock_state &st = get_lock_state(lid); + lock sl(st.m); + auto self = std::this_thread::get_id(); + VERIFY(st.state == lock_state::locked && st.held_by == self); + st.state = lock_state::free; + LOG("Lock " << lid << ": free"); + if (st.wanted_by.size()) { + auto front = st.wanted_by.front(); + if (front == releaser_thread.get_id()) { + st.state = lock_state::locked; + st.held_by = releaser_thread.get_id(); + st.wanted_by.pop_front(); + LOG("Queuing " << lid << " for release"); + release_fifo.enq(lid); + } else + st.signal(front); + } + LOG("Finished signaling."); + return lock_protocol::OK; +} + +rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { + LOG("Revoke handler " << lid << " " << xid); + lock_state &st = get_lock_state(lid); + lock sl(st.m); + + if (st.state == lock_state::releasing || st.state == lock_state::none) + return rlock_protocol::OK; + + if (st.state == lock_state::free && + (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) { + // gimme + st.state = lock_state::locked; + st.held_by = releaser_thread.get_id(); + if (st.wanted_by.size()) + st.wanted_by.pop_front(); + release_fifo.enq(lid); + } else { + // get in line + st.wanted_by.push_back(releaser_thread.get_id()); + } + return rlock_protocol::OK; +} + +rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { + lock_state &st = get_lock_state(lid); + lock sl(st.m); + VERIFY(st.state == lock_state::acquiring); + st.state = lock_state::retrying; + LOG("Lock " << lid << ": none"); + st.signal(); // only one thread needs to wake up + return rlock_protocol::OK; } t4_lock_client *t4_lock_client_new(const char *dst) { @@ -60,4 +245,3 @@ t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) { t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) { return ((lock_client *)client)->stat(lid); } - diff --git a/lock_client.h b/lock_client.h index b1176c4..7b5edf6 100644 --- a/lock_client.h +++ b/lock_client.h @@ -1,6 +1,7 @@ // lock client interface. #ifndef lock_client_h + #define lock_client_h #ifdef __cplusplus @@ -8,21 +9,76 @@ #include #include "lock_protocol.h" #include "rpc/rpc.h" -#include +#include "lang/verify.h" +#include "rpc/fifo.h" +#include "rsm_client.h" + +class lock_release_user { + public: + virtual void dorelease(lock_protocol::lockid_t) = 0; + virtual ~lock_release_user() {}; +}; + +using std::string; +using std::thread; +using std::list; +using std::map; + +typedef string callback; -// Client interface to the lock server +class lock_state { +public: + lock_state(); + enum { + none = 0, + retrying, + free, + locked, + acquiring, + releasing + } state; + std::thread::id held_by; + list wanted_by; + mutex m; + map c; + lock_protocol::xid_t xid; + void wait(); + void signal(); + void signal(std::thread::id who); +}; + +typedef map lock_map; + +// Clients that caches locks. The server can revoke locks using +// lock_revoke_server. class lock_client { - protected: - rpcc *cl; - public: - lock_client(std::string d); - virtual ~lock_client() {}; - virtual lock_protocol::status acquire(lock_protocol::lockid_t); - virtual lock_protocol::status release(lock_protocol::lockid_t); - virtual lock_protocol::status stat(lock_protocol::lockid_t); + private: + rpcc *cl; + std::thread releaser_thread; + rsm_client *rsmc; + class lock_release_user *lu; + int rlock_port; + string hostname; + string id; + mutex xid_mutex; + lock_protocol::xid_t xid; + fifo release_fifo; + mutex lock_table_lock; + lock_map lock_table; + lock_state &get_lock_state(lock_protocol::lockid_t lid); + public: + static int last_port; + lock_client(string xdst, class lock_release_user *l = 0); + ~lock_client() {}; + lock_protocol::status acquire(lock_protocol::lockid_t); + lock_protocol::status release(lock_protocol::lockid_t); + int stat(lock_protocol::lockid_t); + void releaser(); + rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); + rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); }; -#endif +#endif // C++ extern "C" { diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc deleted file mode 100644 index 2a6d6e3..0000000 --- a/lock_client_cache_rsm.cc +++ /dev/null @@ -1,212 +0,0 @@ -// RPC stubs for clients to talk to lock_server, and cache the locks -// see lock_client.cache.h for protocol details. - -#include "lock_client_cache_rsm.h" -#include "rpc/rpc.h" -#include -#include -#include -#include -#include "tprintf.h" - -#include "rsm_client.h" -#include "lock.h" - -using std::ostringstream; - -lock_state::lock_state(): - state(none) -{ -} - -void lock_state::wait() { - auto self = std::this_thread::get_id(); - { - adopt_lock ml(m); - c[self].wait(ml); - } - c.erase(self); -} - -void lock_state::signal() { - // signal anyone - if (c.begin() != c.end()) - c.begin()->second.notify_one(); -} - -void lock_state::signal(std::thread::id who) { - if (c.count(who)) - c[who].notify_one(); -} - -int lock_client_cache_rsm::last_port = 0; - -lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { - lock sl(lock_table_lock); - // by the semantics of std::map, this will create - // the lock if it doesn't already exist - return lock_table[lid]; -} - -lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) { - srand(time(NULL)^last_port); - rlock_port = ((rand()%32000) | (0x1 << 10)); - const char *hname; - // VERIFY(gethostname(hname, 100) == 0); - hname = "127.0.0.1"; - ostringstream host; - host << hname << ":" << rlock_port; - id = host.str(); - last_port = rlock_port; - rpcs *rlsrpc = new rpcs(rlock_port); - rlsrpc->reg(rlock_protocol::revoke, &lock_client_cache_rsm::revoke_handler, this); - rlsrpc->reg(rlock_protocol::retry, &lock_client_cache_rsm::retry_handler, this); - { - lock sl(xid_mutex); - xid = 0; - } - rsmc = new rsm_client(xdst); - releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this); -} - -void lock_client_cache_rsm::releaser() { - while (1) { - lock_protocol::lockid_t lid; - release_fifo.deq(&lid); - LOG("Releaser: " << lid); - - lock_state &st = get_lock_state(lid); - lock sl(st.m); - VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id()); - st.state = lock_state::releasing; - { - sl.unlock(); - int r; - rsmc->call(lock_protocol::release, r, lid, id, st.xid); - sl.lock(); - } - st.state = lock_state::none; - LOG("Lock " << lid << ": none"); - st.signal(); - } -} - -lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) { - lock_state &st = get_lock_state(lid); - lock sl(st.m); - auto self = std::this_thread::get_id(); - - // check for reentrancy - VERIFY(st.state != lock_state::locked || st.held_by != self); - VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end()); - - st.wanted_by.push_back(self); - - while (1) { - if (st.state != lock_state::free) - LOG("Lock " << lid << ": not free"); - - if (st.state == lock_state::none || st.state == lock_state::retrying) { - if (st.state == lock_state::none) { - lock sl(xid_mutex); - st.xid = xid++; - } - st.state = lock_state::acquiring; - LOG("Lock " << lid << ": acquiring"); - lock_protocol::status result; - { - sl.unlock(); - int r; - result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); - sl.lock(); - } - LOG("acquire returned " << result); - if (result == lock_protocol::OK) { - st.state = lock_state::free; - LOG("Lock " << lid << ": free"); - } - } - - VERIFY(st.wanted_by.size() != 0); - if (st.state == lock_state::free) { - // is it for me? - auto front = st.wanted_by.front(); - if (front == releaser_thread.get_id()) { - st.wanted_by.pop_front(); - st.state = lock_state::locked; - st.held_by = releaser_thread.get_id(); - LOG("Queuing " << lid << " for release"); - release_fifo.enq(lid); - } else if (front == self) { - st.wanted_by.pop_front(); - st.state = lock_state::locked; - st.held_by = self; - break; - } else { - st.signal(front); - } - } - - LOG("waiting..."); - st.wait(); - LOG("wait ended"); - } - - LOG("Lock " << lid << ": locked"); - return lock_protocol::OK; -} - -lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) { - lock_state &st = get_lock_state(lid); - lock sl(st.m); - auto self = std::this_thread::get_id(); - VERIFY(st.state == lock_state::locked && st.held_by == self); - st.state = lock_state::free; - LOG("Lock " << lid << ": free"); - if (st.wanted_by.size()) { - auto front = st.wanted_by.front(); - if (front == releaser_thread.get_id()) { - st.state = lock_state::locked; - st.held_by = releaser_thread.get_id(); - st.wanted_by.pop_front(); - LOG("Queuing " << lid << " for release"); - release_fifo.enq(lid); - } else - st.signal(front); - } - LOG("Finished signaling."); - return lock_protocol::OK; -} - -rlock_protocol::status lock_client_cache_rsm::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { - LOG("Revoke handler " << lid << " " << xid); - lock_state &st = get_lock_state(lid); - lock sl(st.m); - - if (st.state == lock_state::releasing || st.state == lock_state::none) - return rlock_protocol::OK; - - if (st.state == lock_state::free && - (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) { - // gimme - st.state = lock_state::locked; - st.held_by = releaser_thread.get_id(); - if (st.wanted_by.size()) - st.wanted_by.pop_front(); - release_fifo.enq(lid); - } else { - // get in line - st.wanted_by.push_back(releaser_thread.get_id()); - } - return rlock_protocol::OK; -} - -rlock_protocol::status lock_client_cache_rsm::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { - lock_state &st = get_lock_state(lid); - lock sl(st.m); - VERIFY(st.state == lock_state::acquiring); - st.state = lock_state::retrying; - LOG("Lock " << lid << ": none"); - st.signal(); // only one thread needs to wake up - return rlock_protocol::OK; -} diff --git a/lock_client_cache_rsm.h b/lock_client_cache_rsm.h deleted file mode 100644 index 815224c..0000000 --- a/lock_client_cache_rsm.h +++ /dev/null @@ -1,81 +0,0 @@ -// lock client interface. - -#ifndef lock_client_cache_rsm_h - -#define lock_client_cache_rsm_h - -#include -#include "lock_protocol.h" -#include "rpc/rpc.h" -#include "lock_client.h" -#include "lang/verify.h" -#include "rpc/fifo.h" -#include "rsm_client.h" - -class lock_release_user { - public: - virtual void dorelease(lock_protocol::lockid_t) = 0; - virtual ~lock_release_user() {}; -}; - -using std::string; -using std::thread; -using std::list; -using std::map; - -typedef string callback; - -class lock_state { -public: - lock_state(); - enum { - none = 0, - retrying, - free, - locked, - acquiring, - releasing - } state; - std::thread::id held_by; - list wanted_by; - mutex m; - map c; - lock_protocol::xid_t xid; - void wait(); - void signal(); - void signal(std::thread::id who); -}; - -typedef map lock_map; - -class lock_client_cache_rsm; - -// Clients that caches locks. The server can revoke locks using -// lock_revoke_server. -class lock_client_cache_rsm : public lock_client { - private: - std::thread releaser_thread; - rsm_client *rsmc; - class lock_release_user *lu; - int rlock_port; - string hostname; - string id; - mutex xid_mutex; - lock_protocol::xid_t xid; - fifo release_fifo; - mutex lock_table_lock; - lock_map lock_table; - lock_state &get_lock_state(lock_protocol::lockid_t lid); - public: - static int last_port; - lock_client_cache_rsm(string xdst, class lock_release_user *l = 0); - virtual ~lock_client_cache_rsm() {}; - lock_protocol::status acquire(lock_protocol::lockid_t); - virtual lock_protocol::status release(lock_protocol::lockid_t); - void releaser(); - rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); - rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); -}; - - -#endif diff --git a/lock_server.cc b/lock_server.cc index 0f82080..f5a1fc4 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,37 +1,214 @@ -#include "rpc/rpc.h" -#include -#include +// the caching lock server implementation + +#include "lock_server.h" +#include #include #include -#include "lock_server_cache_rsm.h" -#include "paxos.h" -#include "rsm.h" - -// Main loop of lock_server +#include +#include "lang/verify.h" +#include "handle.h" +#include "tprintf.h" +#include "rpc/marshall.h" +#include "lock.h" -char tprintf_thread_prefix = 's'; +using std::ostringstream; +using std::istringstream; +using std::vector; -int -main(int argc, char *argv[]) +lock_state::lock_state(): + held(false) { - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); +} + +lock_state::lock_state(const lock_state &other) { + *this = other; +} + +lock_state& lock_state::operator=(const lock_state& o) { + held = o.held; + held_by = o.held_by; + wanted_by = o.wanted_by; + old_requests = o.old_requests; + return *this; +} - srandom(getpid()); +template +ostringstream & operator<<(ostringstream &o, const pair &d) { + o << "<" << d.first << "," << d.second << ">"; + return o; +} + +marshall & operator<<(marshall &m, const lock_state &d) { + return m << d.held << d.held_by << d.wanted_by; +} - if(argc != 3){ - fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); - exit(1); +unmarshall & operator>>(unmarshall &u, lock_state &d) { + return u >> d.held >> d.held_by >> d.wanted_by; +} + +lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { + lock sl(lock_table_lock); + // by the semantics of map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) { + std::thread(&lock_server::revoker, this).detach(); + std::thread(&lock_server::retryer, this).detach(); + rsm->set_state_transfer(this); +} + +void lock_server::revoker() { + while (1) { + lock_protocol::lockid_t lid; + revoke_fifo.deq(&lid); + LOG("Revoking " << lid); + if (rsm && !rsm->amiprimary()) + continue; + + lock_state &st = get_lock_state(lid); + holder held_by; + { + lock sl(st.m); + held_by = st.held_by; + } + + rpcc *proxy = NULL; + // try a few times? + //int t=5; + //while (t-- && !proxy) + proxy = handle(held_by.first).safebind(); + if (proxy) { + int r; + rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second); + LOG("Revoke returned " << ret); + } } +} - rsm rsm(argv[1], argv[2]); - lock_server_cache_rsm ls(&rsm); - rsm.set_state_transfer(&ls); +void lock_server::retryer() { + while (1) { + lock_protocol::lockid_t lid; + retry_fifo.deq(&lid); + if (rsm && !rsm->amiprimary()) + continue; - rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls); - rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls); - rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls); + LOG("Sending retry for " << lid); + lock_state &st = get_lock_state(lid); + holder front; + { + lock sl(st.m); + if (st.wanted_by.empty()) + continue; + front = st.wanted_by.front(); + } - while(1) - sleep(1000); + rlock_protocol::status ret = -1; + + rpcc *proxy = NULL; + // try a few times? + //int t=5; + //while (t-- && !proxy) + proxy = handle(front.first).safebind(); + if (proxy) { + int r; + ret = proxy->call(rlock_protocol::retry, r, lid, front.second); + LOG("Retry returned " << ret); + } + } } + +int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { + LOG_FUNC_ENTER_SERVER; + holder h = holder(id, xid); + lock_state &st = get_lock_state(lid); + lock sl(st.m); + + // deal with duplicated requests + if (st.old_requests.count(id)) { + lock_protocol::xid_t old_xid = st.old_requests[id]; + if (old_xid > xid) + return lock_protocol::RPCERR; + else if (old_xid == xid) { + if (st.held && st.held_by == h) { + LOG("Client " << id << " sent duplicate acquire xid=" << xid); + return lock_protocol::OK; + } + } + } + + // grant the lock if it's available and I'm next in line + if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) { + if (!st.wanted_by.empty()) + st.wanted_by.pop_front(); + st.old_requests[id] = xid; + + st.held = true; + st.held_by = h; + LOG("Lock " << lid << " held by " << h.first); + if (st.wanted_by.size()) + revoke_fifo.enq(lid); + return lock_protocol::OK; + } + + // get in line + bool found = false; + for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { + if (i->first == id) { + // make sure client is obeying serialization + if (i->second != xid) { + LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); + return lock_protocol::RPCERR; + } + found = true; + break; + } + } + if (!found) + st.wanted_by.push_back(h); + + LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); + + // send revoke if we're first in line + if (st.wanted_by.front() == h) + revoke_fifo.enq(lid); + + return lock_protocol::RETRY; +} + +int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { + LOG_FUNC_ENTER_SERVER; + lock_state &st = get_lock_state(lid); + lock sl(st.m); + if (st.held && st.held_by == holder(id, xid)) { + st.held = false; + LOG("Lock " << lid << " not held"); + } + if (st.wanted_by.size()) + retry_fifo.enq(lid); + return lock_protocol::OK; +} + +string lock_server::marshal_state() { + lock sl(lock_table_lock); + marshall rep; + rep << nacquire; + rep << lock_table; + return rep.str(); +} + +void lock_server::unmarshal_state(string state) { + lock sl(lock_table_lock); + unmarshall rep(state); + rep >> nacquire; + rep >> lock_table; +} + +lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { + printf("stat request\n"); + VERIFY(0); + r = nacquire; + return lock_protocol::OK; +} + diff --git a/lock_server_cache_rsm.h b/lock_server.h similarity index 88% rename from lock_server_cache_rsm.h rename to lock_server.h index c33b51e..2aa8445 100644 --- a/lock_server_cache_rsm.h +++ b/lock_server.h @@ -1,5 +1,5 @@ -#ifndef lock_server_cache_rsm_h -#define lock_server_cache_rsm_h +#ifndef lock_server_h +#define lock_server_h #include @@ -34,7 +34,7 @@ public: typedef map lock_map; -class lock_server_cache_rsm : public rsm_state_transfer { +class lock_server : public rsm_state_transfer { private: int nacquire; mutex lock_table_lock; @@ -44,7 +44,7 @@ class lock_server_cache_rsm : public rsm_state_transfer { fifo revoke_fifo; class rsm *rsm; public: - lock_server_cache_rsm(class rsm *rsm = 0); + lock_server(class rsm *rsm = 0); lock_protocol::status stat(int &, lock_protocol::lockid_t); void revoker(); void retryer(); diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc deleted file mode 100644 index 8f3cf2b..0000000 --- a/lock_server_cache_rsm.cc +++ /dev/null @@ -1,213 +0,0 @@ -// the caching lock server implementation - -#include "lock_server_cache_rsm.h" -#include -#include -#include -#include -#include "lang/verify.h" -#include "handle.h" -#include "tprintf.h" -#include "rpc/marshall.h" -#include "lock.h" - -using std::ostringstream; -using std::istringstream; -using std::vector; - -lock_state::lock_state(): - held(false) -{ -} - -lock_state::lock_state(const lock_state &other) { - *this = other; -} - -lock_state& lock_state::operator=(const lock_state& o) { - held = o.held; - held_by = o.held_by; - wanted_by = o.wanted_by; - old_requests = o.old_requests; - return *this; -} - -template -ostringstream & operator<<(ostringstream &o, const pair &d) { - o << "<" << d.first << "," << d.second << ">"; - return o; -} - -marshall & operator<<(marshall &m, const lock_state &d) { - return m << d.held << d.held_by << d.wanted_by; -} - -unmarshall & operator>>(unmarshall &u, lock_state &d) { - return u >> d.held >> d.held_by >> d.wanted_by; -} - -lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { - lock sl(lock_table_lock); - // by the semantics of map, this will create - // the lock if it doesn't already exist - return lock_table[lid]; -} - -lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) { - std::thread(&lock_server_cache_rsm::revoker, this).detach(); - std::thread(&lock_server_cache_rsm::retryer, this).detach(); - rsm->set_state_transfer(this); -} - -void lock_server_cache_rsm::revoker() { - while (1) { - lock_protocol::lockid_t lid; - revoke_fifo.deq(&lid); - LOG("Revoking " << lid); - if (rsm && !rsm->amiprimary()) - continue; - - lock_state &st = get_lock_state(lid); - holder held_by; - { - lock sl(st.m); - held_by = st.held_by; - } - - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(held_by.first).safebind(); - if (proxy) { - int r; - rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second); - LOG("Revoke returned " << ret); - } - } -} - -void lock_server_cache_rsm::retryer() { - while (1) { - lock_protocol::lockid_t lid; - retry_fifo.deq(&lid); - if (rsm && !rsm->amiprimary()) - continue; - - LOG("Sending retry for " << lid); - lock_state &st = get_lock_state(lid); - holder front; - { - lock sl(st.m); - if (st.wanted_by.empty()) - continue; - front = st.wanted_by.front(); - } - - rlock_protocol::status ret = -1; - - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(front.first).safebind(); - if (proxy) { - int r; - ret = proxy->call(rlock_protocol::retry, r, lid, front.second); - LOG("Retry returned " << ret); - } - } -} - -int lock_server_cache_rsm::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { - LOG_FUNC_ENTER_SERVER; - holder h = holder(id, xid); - lock_state &st = get_lock_state(lid); - lock sl(st.m); - - // deal with duplicated requests - if (st.old_requests.count(id)) { - lock_protocol::xid_t old_xid = st.old_requests[id]; - if (old_xid > xid) - return lock_protocol::RPCERR; - else if (old_xid == xid) { - if (st.held && st.held_by == h) { - LOG("Client " << id << " sent duplicate acquire xid=" << xid); - return lock_protocol::OK; - } - } - } - - // grant the lock if it's available and I'm next in line - if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) { - if (!st.wanted_by.empty()) - st.wanted_by.pop_front(); - st.old_requests[id] = xid; - - st.held = true; - st.held_by = h; - LOG("Lock " << lid << " held by " << h.first); - if (st.wanted_by.size()) - revoke_fifo.enq(lid); - return lock_protocol::OK; - } - - // get in line - bool found = false; - for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { - if (i->first == id) { - // make sure client is obeying serialization - if (i->second != xid) { - LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); - return lock_protocol::RPCERR; - } - found = true; - break; - } - } - if (!found) - st.wanted_by.push_back(h); - - LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); - - // send revoke if we're first in line - if (st.wanted_by.front() == h) - revoke_fifo.enq(lid); - - return lock_protocol::RETRY; -} - -int lock_server_cache_rsm::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { - LOG_FUNC_ENTER_SERVER; - lock_state &st = get_lock_state(lid); - lock sl(st.m); - if (st.held && st.held_by == holder(id, xid)) { - st.held = false; - LOG("Lock " << lid << " not held"); - } - if (st.wanted_by.size()) - retry_fifo.enq(lid); - return lock_protocol::OK; -} - -string lock_server_cache_rsm::marshal_state() { - lock sl(lock_table_lock); - marshall rep; - rep << nacquire; - rep << lock_table; - return rep.str(); -} - -void lock_server_cache_rsm::unmarshal_state(string state) { - lock sl(lock_table_lock); - unmarshall rep(state); - rep >> nacquire; - rep >> lock_table; -} - -lock_protocol::status lock_server_cache_rsm::stat(int &r, lock_protocol::lockid_t lid) { - printf("stat request\n"); - r = nacquire; - return lock_protocol::OK; -} - diff --git a/lock_smain.cc b/lock_smain.cc new file mode 100644 index 0000000..086186e --- /dev/null +++ b/lock_smain.cc @@ -0,0 +1,37 @@ +#include "rpc/rpc.h" +#include +#include +#include +#include +#include "lock_server.h" +#include "paxos.h" +#include "rsm.h" + +// Main loop of lock_server + +char tprintf_thread_prefix = 's'; + +int +main(int argc, char *argv[]) +{ + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + + srandom(getpid()); + + if(argc != 3){ + fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); + exit(1); + } + + rsm rsm(argv[1], argv[2]); + lock_server ls(&rsm); + rsm.set_state_transfer(&ls); + + rsm.reg(lock_protocol::acquire, &lock_server::acquire, &ls); + rsm.reg(lock_protocol::release, &lock_server::release, &ls); + rsm.reg(lock_protocol::stat, &lock_server::stat, &ls); + + while(1) + sleep(1000); +} diff --git a/lock_tester.cc b/lock_tester.cc index d063cdc..5c78c90 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -10,7 +10,6 @@ #include #include #include "lang/verify.h" -#include "lock_client_cache_rsm.h" #include "tprintf.h" #include #include @@ -21,7 +20,7 @@ char tprintf_thread_prefix = 'c'; // must be >= 2 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. std::string dst; -lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt]; +lock_client **lc = new lock_client * [nt]; lock_protocol::lockid_t a = "1"; lock_protocol::lockid_t b = "2"; lock_protocol::lockid_t c = "3"; @@ -174,7 +173,7 @@ main(int argc, char *argv[]) } tprintf("cache lock client\n"); - for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst); + for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst); if(!test || test == 1){ test1(); diff --git a/rsm.cc b/rsm.cc index 66c8d97..bdeabaa 100644 --- a/rsm.cc +++ b/rsm.cc @@ -189,12 +189,12 @@ bool rsm::sync_with_backups() { adopt_lock ml(rsm_mutex); ml.unlock(); { - // Make sure that the state of lock_server_cache_rsm is stable during + // Make sure that the state of lock_server is stable during // synchronization; otherwise, the primary's state may be more recent // than replicas after the synchronization. lock ml(invoke_mutex); // By acquiring and releasing the invoke_mutex once, we make sure that - // the state of lock_server_cache_rsm will not be changed until all + // the state of lock_server will not be changed until all // replicas are synchronized. The reason is that client_invoke arrives // after this point of time will see inviewchange == true, and returns // BUSY.