From: Peter Iannucci Date: Thu, 26 Sep 2013 23:20:14 +0000 (-0400) Subject: More clean-ups and cool template stuff X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/2546a41ad36fdc9ef6471cb35a1d56930ae1b527?ds=sidebyside More clean-ups and cool template stuff --- diff --git a/Makefile b/Makefile index 6bd396f..3da8e03 100644 --- a/Makefile +++ b/Makefile @@ -11,18 +11,18 @@ rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_lo ar cq $@ $^ ranlib rpc/librpc.a -rpc/rpctest: rpc/rpctest.o tprintf.o rpc/librpc.a +rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a -lock_demo=lock_demo.o lock_client.o tprintf.o rsm_client.o handle.o +lock_demo=lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o lock_demo : $(lock_demo) rpc/librpc.a -lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o +lock_tester=lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o lock_tester : $(lock_tester) rpc/librpc.a -lock_server=lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server.o +lock_server=lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o lock_server : $(lock_server) rpc/librpc.a -rsm_tester=rsm_tester.o rsmtest_client.o tprintf.o +rsm_tester=rsm_tester.o rsmtest_client.o threaded_log.o rsm_tester: $(rsm_tester) rpc/librpc.a %.o: %.cc diff --git a/Makefile.osx b/Makefile.osx index 75f2e2b..91ce5ee 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -1,5 +1,5 @@ -#PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations -#PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors +PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations +PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) LDFLAGS = -stdlib=libc++ CXX = clang++ diff --git a/config.cc b/config.cc index 9319abe..7bac4a9 100644 --- a/config.cc +++ b/config.cc @@ -1,20 +1,11 @@ #include #include -#include -#include #include "config.h" #include "paxos.h" #include "handle.h" -#include "tprintf.h" +#include "threaded_log.h" #include "lang/verify.h" -using namespace std::chrono; -using std::string; -using std::vector; -using std::thread; -using std::ostringstream; -using std::istringstream; - // The config module maintains views. As a node joins or leaves a // view, the next view will be the same as previous view, except with // the new node added or removed. The first view contains only node @@ -47,277 +38,193 @@ using std::istringstream; // all views, the other nodes can bring this re-joined node up to // date. -config::config( - const string &_first, - const string &_me, - config_view_change *_vc) - : my_view_id(0), first(_first), me(_me), vc(_vc) +config::config(const string &_first, const string &_me, config_view_change *_vc) + : my_view_id(0), first(_first), me(_me), vc(_vc), + paxos_acceptor(this, me == _first, me, me), + paxos_proposer(this, &paxos_acceptor, me) { - paxos_acceptor = new acceptor(this, me == _first, me, me); - paxos_proposer = new proposer(this, paxos_acceptor, me); - - // XXX hack; maybe should have its own port number - paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); - - { - lock ml(cfg_mutex); - reconstruct(ml); - thread(&config::heartbeater, this).detach(); - } + get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); + lock cfg_mutex_lock(cfg_mutex); + reconstruct(cfg_mutex_lock); + thread(&config::heartbeater, this).detach(); } -void -config::restore(const string &s) -{ - lock ml(cfg_mutex); - paxos_acceptor->restore(s); - reconstruct(ml); +void config::restore(const string &s) { + lock cfg_mutex_lock(cfg_mutex); + paxos_acceptor.restore(s); + reconstruct(cfg_mutex_lock); } -void -config::get_view(unsigned instance, vector &m) -{ - lock ml(cfg_mutex); - get_view(instance, m, ml); +void config::get_view(unsigned instance, vector &m) { + lock cfg_mutex_lock(cfg_mutex); + get_view(instance, m, cfg_mutex_lock); } -// caller should hold cfg_mutex -void -config::get_view(unsigned instance, vector &m, lock &) -{ - string value = paxos_acceptor->value(instance); - tprintf("get_view(%d): returns %s\n", instance, value.c_str()); - members(value, m); +void config::get_view(unsigned instance, vector &m, lock &) { + string value = paxos_acceptor.value(instance); + LOG("get_view(" << instance << "): returns " << value); + m = members(value); } -void -config::members(const string &value, vector &view) const -{ +vector config::members(const string &value) const { istringstream ist(value); - string m; - view.clear(); - while (ist >> m) - view.push_back(m); + using it = istream_iterator; + return {it(ist), it()}; } -string -config::value(const vector &m) const -{ +string config::value(const vector &m) const { ostringstream ost; - for (unsigned i = 0; i < m.size(); i++) { - ost << m[i]; - ost << " "; - } + copy(m.begin(), m.end(), ostream_iterator(ost, " ")); return ost.str(); } -void -config::reconstruct(lock &cfg_mutex_lock) -{ +void config::reconstruct(lock &cfg_mutex_lock) { VERIFY(cfg_mutex_lock); - if (paxos_acceptor->instance() > 0) { - my_view_id = paxos_acceptor->instance(); + if (paxos_acceptor.instance() > 0) { + my_view_id = paxos_acceptor.instance(); get_view(my_view_id, mems, cfg_mutex_lock); - tprintf("config::reconstruct: %d %s\n", - my_view_id, print_members(mems).c_str()); + LOG("config::reconstruct: " << my_view_id << " " << print_members(mems)); } } // Called by Paxos's acceptor. -void -config::paxos_commit(unsigned instance, const string &value) -{ - vector newmem; - lock ml(cfg_mutex); +void config::paxos_commit(unsigned instance, const string &value) { + lock cfg_mutex_lock(cfg_mutex); - members(value, newmem); - tprintf("config::paxos_commit: %d: %s\n", instance, - print_members(newmem).c_str()); + vector newmem = members(value); + LOG("config::paxos_commit: " << instance << ": " << print_members(newmem)); - for (unsigned i = 0; i < mems.size(); i++) { - tprintf("config::paxos_commit: is %s still a member?\n", - mems[i].c_str()); - if (!isamember(mems[i], newmem) && me != mems[i]) { - tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); - mgr.delete_handle(mems[i]); + for (auto mem : mems) { + LOG("config::paxos_commit: is " << mem << " still a member?"); + if (!isamember(mem, newmem) && me != mem) { + LOG("config::paxos_commit: delete " << mem); + invalidate_handle(mem); } } mems = newmem; my_view_id = instance; if (vc) { - ml.unlock(); + cfg_mutex_lock.unlock(); vc->commit_change(instance); - ml.lock(); + cfg_mutex_lock.lock(); } } -bool -config::ismember(const string &m, unsigned vid) -{ - lock ml(cfg_mutex); +bool config::ismember(const string &m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); vector v; - get_view(vid, v, ml); + get_view(vid, v, cfg_mutex_lock); return isamember(m, v); } -bool -config::add(const string &new_m, unsigned vid) -{ - vector m; - vector curm; - lock ml(cfg_mutex); +bool config::add(const string &new_m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); if (vid != my_view_id) return false; - tprintf("config::add %s\n", new_m.c_str()); - m = mems; + LOG("config::add " << new_m); + vector m = mems; m.push_back(new_m); - curm = mems; - string v = value(m); + vector cmems = mems; unsigned nextvid = my_view_id + 1; - bool r; - { - ml.unlock(); - r = paxos_proposer->run(nextvid, curm, v); - ml.lock(); - } - tprintf("config::add: proposer returned %s\n", - r ? "success" : "failure"); + cfg_mutex_lock.unlock(); + bool r = paxos_proposer.run(nextvid, cmems, value(m)); + cfg_mutex_lock.lock(); + LOG("config::add: proposer returned " << (r ? "success" : "failure")); return r; } // caller should hold cfg_mutex -bool -config::remove(const string &m) -{ - adopt_lock ml(cfg_mutex); - tprintf("config::remove: my_view_id %d remove? %s\n", - my_view_id, m.c_str()); +bool config::remove(const string &m, lock &cfg_mutex_lock) { + LOG("config::remove: my_view_id " << my_view_id << " remove? " << m); vector n; - for (unsigned i = 0; i < mems.size(); i++) { - if (mems[i] != m) - n.push_back(mems[i]); + for (auto mem : mems) { + if (mem != m) + n.push_back(mem); } - string v = value(n); vector cmems = mems; unsigned nextvid = my_view_id + 1; - bool r; - { - ml.unlock(); - r = paxos_proposer->run(nextvid, cmems, v); - ml.lock(); - } - tprintf("config::remove: proposer returned %s\n", - r ? "success" : "failure"); + cfg_mutex_lock.unlock(); + bool r = paxos_proposer.run(nextvid, cmems, value(n)); + cfg_mutex_lock.lock(); + LOG("config::remove: proposer returned " << (r ? "success" : "failure")); return r; } -void -config::heartbeater() [[noreturn]] -{ - string m; - heartbeat_t h; - bool stable; - unsigned vid; - vector cmems; - lock ml(cfg_mutex); +void config::heartbeater() [[noreturn]] { + lock cfg_mutex_lock(cfg_mutex); while (1) { auto next_timeout = steady_clock::now() + seconds(3); - tprintf("heartbeater: go to sleep\n"); - config_cond.wait_until(ml, next_timeout); + LOG("heartbeater: go to sleep"); + config_cond.wait_until(cfg_mutex_lock, next_timeout); - stable = true; - vid = my_view_id; - get_view(vid, cmems, ml); - tprintf("heartbeater: current membership %s\n", - print_members(cmems).c_str()); + unsigned vid = my_view_id; + vector cmems; + get_view(vid, cmems, cfg_mutex_lock); + LOG("heartbeater: current membership " << print_members(cmems)); if (!isamember(me, cmems)) { - tprintf("heartbeater: not member yet; skip hearbeat\n"); + LOG("heartbeater: not member yet; skip hearbeat"); continue; } // who has the smallest ID? - m = me; - for (unsigned i = 0; i < cmems.size(); i++) { - if (m > cmems[i]) - m = cmems[i]; - } + string m = min(me, *min_element(cmems.begin(), cmems.end())); if (m == me) { // ping the other nodes - for (unsigned i = 0; i < cmems.size(); i++) { - if (cmems[i] != me) { - if ((h = doheartbeat(cmems[i])) != OK) { - stable = false; - m = cmems[i]; - break; - } - } + for (string mem : cmems) { + if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK) + continue; + if (vid == my_view_id) + remove(mem, cfg_mutex_lock); + break; } } else { // ping the node with the smallest ID - if ((h = doheartbeat(m)) != OK) - stable = false; - } - - if (!stable && vid == my_view_id) { - remove(m); + if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id) + remove(m, cfg_mutex_lock); } } } -paxos_protocol::status -config::heartbeat(int &r, string m, unsigned vid) -{ - lock ml(cfg_mutex); - int ret = paxos_protocol::ERR; +paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); r = (int) my_view_id; - tprintf("heartbeat from %s(%d) my_view_id %d\n", - m.c_str(), vid, my_view_id); - if (vid == my_view_id) { - ret = paxos_protocol::OK; - } else if (paxos_proposer->isrunning()) { + LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); + if (vid == my_view_id) + return paxos_protocol::OK; + else if (paxos_proposer.isrunning()) { VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); - ret = paxos_protocol::OK; - } else { - ret = paxos_protocol::ERR; + return paxos_protocol::OK; } - return ret; + return paxos_protocol::ERR; } -config::heartbeat_t -config::doheartbeat(const string &m) -{ - adopt_lock ml(cfg_mutex); - int ret = rpc_const::timeout_failure; - int r = 0; +config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { unsigned vid = my_view_id; - heartbeat_t res = OK; - - tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); + LOG("doheartbeater to " << m << " (" << vid << ")"); handle h(m); - { - ml.unlock(); - rpcc *cl = h.safebind(); - if (cl) { - ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid); - } - ml.lock(); - } - if (ret != paxos_protocol::OK) { - if (ret == rpc_const::atmostonce_failure || - ret == rpc_const::oldsrv_failure) { - mgr.delete_handle(m); - } else { - tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", - m.c_str(), ret, vid, r); - if (ret < 0) res = FAILURE; - else res = VIEWERR; - } + + cfg_mutex_lock.unlock(); + int r = 0, ret = rpc_const::bind_failure; + if (rpcc *cl = h.safebind()) + ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid); + cfg_mutex_lock.lock(); + + heartbeat_t res = OK; + switch (ret) { + case paxos_protocol::OK: + break; + case rpc_const::atmostonce_failure: + case rpc_const::oldsrv_failure: + invalidate_handle(m); + break; + default: + LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); + res = (ret < 0) ? FAILURE : VIEWERR; } - tprintf("doheartbeat done %d\n", res); + LOG("doheartbeat done " << res); return res; } - diff --git a/config.h b/config.h index fcf1566..074cbe9 100644 --- a/config.h +++ b/config.h @@ -6,6 +6,19 @@ #include "paxos.h" #include "lock.h" +using std::chrono::steady_clock; +using std::chrono::seconds; +using std::string; +using std::vector; +using std::thread; +using std::ostringstream; +using std::istringstream; +using std::ostream_iterator; +using std::istream_iterator; +using std::copy; +using std::min; +using std::min_element; + class config_view_change { public: virtual void commit_change(unsigned view_id) = 0; @@ -14,42 +27,41 @@ class config_view_change { class config : public paxos_change { private: - acceptor *paxos_acceptor; - proposer *paxos_proposer; unsigned my_view_id; - std::string first; - std::string me; + string first; + string me; config_view_change *vc; - std::vector mems; + acceptor paxos_acceptor; + proposer paxos_proposer; + vector mems; mutex cfg_mutex; - std::condition_variable config_cond; - paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance); - std::string value(const std::vector &mems) const; - void members(const std::string &v, std::vector &m) const; - void get_view(unsigned instance, std::vector &m, lock &cfg_mutex_lock); - bool remove(const std::string &); + cond config_cond; + paxos_protocol::status heartbeat(int &r, string m, unsigned instance); + string value(const vector &mems) const; + vector members(const string &v) const; + void get_view(unsigned instance, vector &m, lock &cfg_mutex_lock); + bool remove(const string &, lock &cfg_mutex_lock); void reconstruct(lock &cfg_mutex_lock); typedef enum { OK, // response and same view # VIEWERR, // response but different view # FAILURE, // no response } heartbeat_t; - heartbeat_t doheartbeat(const std::string &m); + heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock); public: - config(const std::string &_first, - const std::string &_me, - config_view_change *_vc); + config(const string &_first, const string &_me, config_view_change *_vc); unsigned view_id() { return my_view_id; } - const std::string &myaddr() const { return me; } - std::string dump() { return paxos_acceptor->dump(); } - void get_view(unsigned instance, std::vector &m); - void restore(const std::string &s); - bool add(const std::string &, unsigned view_id); - bool ismember(const std::string &m, unsigned view_id); + const string &myaddr() const { return me; } + string dump() { return paxos_acceptor.dump(); } + void get_view(unsigned instance, vector &m); + void restore(const string &s); + bool add(const string &, unsigned view_id); + bool ismember(const string &m, unsigned view_id); void heartbeater(void); - void paxos_commit(unsigned instance, const std::string &v); - rpcs *get_rpcs() { return paxos_acceptor->get_rpcs(); } - void breakpoint(int b) { paxos_proposer->breakpoint(b); } + void paxos_commit(unsigned instance, const string &v); + // XXX hack; maybe should have its own port number + rpcs *get_rpcs() { return paxos_acceptor.get_rpcs(); } + void breakpoint(int b) { paxos_proposer.breakpoint(b); } }; #endif diff --git a/handle.cc b/handle.cc index ff38a56..d048ead 100644 --- a/handle.cc +++ b/handle.cc @@ -1,110 +1,108 @@ #include "handle.h" -#include -#include "tprintf.h" +#include "threaded_log.h" #include "lock.h" +#include -handle_mgr mgr; +using std::map; -handle::handle(std::string m) -{ - h = mgr.get_handle(m); -} +class hinfo { +public: + rpcc *cl = nullptr; + int refcnt = 0; + bool del = false; + string m; + mutex client_mutex; + hinfo(const string & m_) : m(m_) {} +}; + +class handle_mgr { + private: + mutex mgr_mutex; + map hmap; + void delete_handle(const string & m, lock & handle_mutex_lock); + public: + hinfo *acquire_handle(string m); + void release_handle(hinfo *h); + void delete_handle(const string & m); +}; + +static handle_mgr mgr; -rpcc * -handle::safebind() -{ +handle::handle(const string & m) : h(mgr.acquire_handle(m)) {} + +rpcc * handle::safebind() { if (!h) - return NULL; - lock ml(h->cl_mutex); + return nullptr; + lock ml(h->client_mutex); if (h->del) - return NULL; + return nullptr; if (h->cl) return h->cl; - sockaddr_in dstsock; - make_sockaddr(h->m.c_str(), &dstsock); - rpcc *cl = new rpcc(dstsock); - tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); - int ret; + rpcc *cl = new rpcc(h->m); + LOG("handler_mgr::acquire_handle trying to bind..." << h->m); // The test script assumes that the failure can be detected by paxos and // rsm layer within few seconds. We have to set the timeout with a small // value to support the assumption. // // With RPC_LOSSY=5, tests may fail due to delays and time outs. - ret = cl->bind(rpcc::to(1000)); + int ret = cl->bind(rpcc::to(1000)); if (ret < 0) { - tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); + LOG("handle_mgr::acquire_handle bind failure! " << h->m << " " << ret); delete cl; h->del = true; } else { - tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str()); + LOG("handle_mgr::acquire_handle bind succeeded " << h->m); h->cl = cl; } return h->cl; } -handle::~handle() -{ - if (h) mgr.done_handle(h); -} - -handle_mgr::handle_mgr() -{ +handle::~handle() { + if (h) mgr.release_handle(h); } -struct hinfo * -handle_mgr::get_handle(std::string m) -{ - lock ml(handle_mutex); - struct hinfo *h = 0; +hinfo * handle_mgr::acquire_handle(string m) { + lock ml(mgr_mutex); + hinfo *h = nullptr; if (hmap.find(m) == hmap.end()) { - h = new hinfo; - h->cl = NULL; - h->del = false; - h->refcnt = 1; - h->m = m; + h = new hinfo(m); hmap[m] = h; } else if (!hmap[m]->del) { h = hmap[m]; - h->refcnt ++; } + h->refcnt++; return h; } -void -handle_mgr::done_handle(struct hinfo *h) -{ - lock ml(handle_mutex); - h->refcnt--; - if (h->refcnt == 0 && h->del) - delete_handle_wo(h->m); +void handle_mgr::release_handle(hinfo *h) { + lock ml(mgr_mutex); + if (--h->refcnt == 0 && h->del) + delete_handle(h->m, ml); } -void -handle_mgr::delete_handle(std::string m) -{ - lock ml(handle_mutex); - delete_handle_wo(m); +void handle_mgr::delete_handle(const string & m) { + lock ml(mgr_mutex); + delete_handle(m, ml); } -// Must be called with handle_mutex locked. -void -handle_mgr::delete_handle_wo(std::string m) -{ +void handle_mgr::delete_handle(const string & m, lock &) { if (hmap.find(m) == hmap.end()) { - tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str()); - } else { - tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(), - hmap[m]->refcnt); - struct hinfo *h = hmap[m]; - if (h->refcnt == 0) { - if (h->cl) { - h->cl->cancel(); - delete h->cl; - } - hmap.erase(m); - delete h; - } else { - h->del = true; - } + LOG("handle_mgr::delete_handle: cl " << m << " isn't in cl list"); + return; } + LOG("handle_mgr::delete_handle: cl " << m << " refcnt " << hmap[m]->refcnt); + hinfo *h = hmap[m]; + if (h->refcnt == 0) { + if (h->cl) { + h->cl->cancel(); + delete h->cl; + } + hmap.erase(m); + delete h; + } else + h->del = true; +} + +void invalidate_handle(const string & m) { + mgr.delete_handle(m); } diff --git a/handle.h b/handle.h index 6b042fb..a06b156 100644 --- a/handle.h +++ b/handle.h @@ -23,23 +23,18 @@ #ifndef handle_h #define handle_h -#include -#include #include "rpc/rpc.h" +#include -struct hinfo { - rpcc *cl; - int refcnt; - bool del; - std::string m; - std::mutex cl_mutex; -}; +using std::string; + +class hinfo; class handle { private: - struct hinfo *h; + hinfo *h; public: - handle(std::string m); + handle(const string & m); ~handle(); /* safebind will try to bind with the rpc server on the first call. * Since bind may block, the caller probably should not hold a mutex @@ -62,18 +57,6 @@ class handle { rpcc *safebind(); }; -class handle_mgr { - private: - std::mutex handle_mutex; - std::map hmap; - public: - handle_mgr(); - struct hinfo *get_handle(std::string m); - void done_handle(struct hinfo *h); - void delete_handle(std::string m); - void delete_handle_wo(std::string m); -}; - -extern class handle_mgr mgr; +void invalidate_handle(const string & m); #endif diff --git a/lock.h b/lock.h index 1789ec3..1d62c39 100644 --- a/lock.h +++ b/lock.h @@ -8,13 +8,4 @@ using std::mutex; using lock = std::unique_lock; using cond = std::condition_variable; -class adopt_lock : public lock { -public: - explicit inline adopt_lock(class mutex &m) : std::unique_lock(m, std::adopt_lock) { - } - inline ~adopt_lock() { - release(); - } -}; - #endif diff --git a/lock_client.cc b/lock_client.cc index 035d80b..22e57f1 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -2,29 +2,16 @@ #include "lock_client.h" #include "rpc/rpc.h" -#include -#include #include -#include -#include "tprintf.h" +#include "threaded_log.h" #include #include "rsm_client.h" #include "lock.h" -using std::ostringstream; - -lock_state::lock_state(): - state(none) -{ -} - -void lock_state::wait() { +void lock_state::wait(lock & mutex_lock) { auto self = std::this_thread::get_id(); - { - adopt_lock ml(m); - c[self].wait(ml); - } + c[self].wait(mutex_lock); c.erase(self); } @@ -34,46 +21,34 @@ void lock_state::signal() { c.begin()->second.notify_one(); } -void lock_state::signal(std::thread::id who) { +void lock_state::signal(thread::id who) { if (c.count(who)) c[who].notify_one(); } +typedef map lock_map; + unsigned int lock_client::last_port = 0; lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); - // by the semantics of std::map, this will create - // the lock if it doesn't already exist - return lock_table[lid]; + return lock_table[lid]; // creates the lock if it doesn't already exist } -lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) { - sockaddr_in dstsock; - make_sockaddr(xdst.c_str(), &dstsock); - cl = new rpcc(dstsock); - if (cl->bind() < 0) { +lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) { + cl = new rpcc(xdst); + if (cl->bind() < 0) LOG("lock_client: call bind"); - } srandom((uint32_t)time(NULL)^last_port); rlock_port = ((random()%32000) | (0x1 << 10)); - const char *hname; - // VERIFY(gethostname(hname, 100) == 0); - hname = "127.0.0.1"; - ostringstream host; - host << hname << ":" << rlock_port; - id = host.str(); + id = "127.0.0.1:" + std::to_string(rlock_port); last_port = rlock_port; rpcs *rlsrpc = new rpcs(rlock_port); rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this); rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this); - { - lock sl(xid_mutex); - next_xid = 0; - } rsmc = new rsm_client(xdst); - releaser_thread = std::thread(&lock_client::releaser, this); + releaser_thread = thread(&lock_client::releaser, this); } void lock_client::releaser() [[noreturn]] { @@ -103,7 +78,7 @@ void lock_client::releaser() [[noreturn]] { int lock_client::stat(lock_protocol::lockid_t lid) { VERIFY(0); int r; - lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid); + auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid); VERIFY (ret == lock_protocol::OK); return r; } @@ -134,7 +109,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { { sl.unlock(); int r; - result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); + result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); sl.lock(); } LOG("acquire returned " << result); @@ -165,7 +140,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { } LOG("waiting..."); - st.wait(); + st.wait(sl); LOG("wait ended"); } @@ -241,7 +216,7 @@ t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) { } t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) { - return ((lock_client *)client)->acquire(lid); + return ((lock_client *)client)->release(lid); } t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) { diff --git a/lock_client.h b/lock_client.h index 541cc23..3290d1a 100644 --- a/lock_client.h +++ b/lock_client.h @@ -12,6 +12,7 @@ #include "lang/verify.h" #include "rpc/fifo.h" #include "rsm_client.h" +#include "lock.h" class lock_release_user { public: @@ -20,15 +21,12 @@ class lock_release_user { }; using std::string; +using std::map; using std::thread; using std::list; -using std::map; - -typedef string callback; class lock_state { public: - lock_state(); enum { none = 0, retrying, @@ -36,15 +34,15 @@ public: locked, acquiring, releasing - } state; - std::thread::id held_by; - list wanted_by; + } state = none; + thread::id held_by; + list wanted_by; mutex m; - map c; + map c; lock_protocol::xid_t xid; - void wait(); + void wait(lock & mutex_lock); void signal(); - void signal(std::thread::id who); + void signal(thread::id who); }; typedef map lock_map; @@ -54,7 +52,7 @@ typedef map lock_map; class lock_client { private: rpcc *cl; - std::thread releaser_thread; + thread releaser_thread; rsm_client *rsmc; class lock_release_user *lu; unsigned int rlock_port; diff --git a/lock_demo.cc b/lock_demo.cc index 3a85949..3b38cdf 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -1,11 +1,9 @@ #include "lock_client.h" -#include "tprintf.h" +#include "threaded_log.h" -char tprintf_thread_prefix = 'd'; +char log_thread_prefix = 'd'; -int -main(int argc, char *argv[]) -{ +int main(int argc, char *argv[]) { if(argc != 2) { fprintf(stderr, "Usage: %s [host:]port\n", argv[0]); return 1; diff --git a/lock_protocol.h b/lock_protocol.h index 61f0998..900897a 100644 --- a/lock_protocol.h +++ b/lock_protocol.h @@ -6,13 +6,14 @@ #include "rpc/rpc.h" #include +using std::string; + class lock_protocol { public: - enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR }; - typedef int status; - typedef std::string lockid_t; - typedef unsigned long long xid_t; - enum rpc_numbers { + enum status : status_t { OK, RETRY, RPCERR, NOENT, IOERR }; + using lockid_t = string; + using xid_t = uint64_t; + enum rpc_numbers : proc_t { acquire = 0x7001, release, stat @@ -21,9 +22,8 @@ class lock_protocol { class rlock_protocol { public: - enum xxstatus { OK, RPCERR }; - typedef int status; - enum rpc_numbers { + enum status : status_t { OK, RPCERR }; + enum rpc_numbers : proc_t { revoke = 0x8001, retry = 0x8002 }; diff --git a/lock_server.cc b/lock_server.cc index a82231e..cac6a90 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -6,7 +6,7 @@ #include #include "lang/verify.h" #include "handle.h" -#include "tprintf.h" +#include "threaded_log.h" #include "rpc/marshall.h" #include "lock.h" @@ -74,7 +74,7 @@ void lock_server::revoker() [[noreturn]] { proxy = handle(held_by.first).safebind(); if (proxy) { int r; - rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second); + auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second); LOG("Revoke returned " << ret); } } @@ -97,8 +97,6 @@ void lock_server::retryer() [[noreturn]] { front = st.wanted_by.front(); } - rlock_protocol::status ret = -1; - rpcc *proxy = NULL; // try a few times? //int t=5; @@ -106,7 +104,7 @@ void lock_server::retryer() [[noreturn]] { proxy = handle(front.first).safebind(); if (proxy) { int r; - ret = proxy->call(rlock_protocol::retry, r, lid, front.second); + auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second); LOG("Retry returned " << ret); } } diff --git a/lock_smain.cc b/lock_smain.cc index 363f886..d62a25b 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -1,18 +1,16 @@ #include "rpc/rpc.h" #include #include -#include "tprintf.h" +#include "threaded_log.h" #include #include "lock_server.h" #include "rsm.h" // Main loop of lock_server -char tprintf_thread_prefix = 's'; +char log_thread_prefix = 's'; -int -main(int argc, char *argv[]) -{ +int main(int argc, char *argv[]) { setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); diff --git a/lock_tester.cc b/lock_tester.cc index f4e68bd..ac9175b 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -10,12 +10,12 @@ #include #include #include "lang/verify.h" -#include "tprintf.h" +#include "threaded_log.h" #include #include #include "lock.h" -char tprintf_thread_prefix = 'c'; +char log_thread_prefix = 'c'; // must be >= 2 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. @@ -31,12 +31,10 @@ lock_protocol::lockid_t c = "3"; int ct[256]; std::mutex count_mutex; -void -check_grant(lock_protocol::lockid_t lid) -{ +void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; - if(ct[x] != 0){ + if (ct[x] != 0) { fprintf(stderr, "error: server granted %s twice\n", lid.c_str()); fprintf(stdout, "error: server granted %s twice\n", lid.c_str()); exit(1); @@ -44,22 +42,18 @@ check_grant(lock_protocol::lockid_t lid) ct[x] += 1; } -void -check_release(lock_protocol::lockid_t lid) -{ +void check_release(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; - if(ct[x] != 1){ + if (ct[x] != 1) { fprintf(stderr, "error: client released un-held lock %s\n", lid.c_str()); exit(1); } ct[x] -= 1; } -void -test1(void) -{ - tprintf ("acquire a release a acquire a release a\n"); +void test1(void) { + LOG_NONMEMBER("acquire a release a acquire a release a"); lc[0]->acquire(a); check_grant(a); lc[0]->release(a); @@ -69,7 +63,7 @@ test1(void) lc[0]->release(a); check_release(a); - tprintf ("acquire a acquire b release b release a\n"); + LOG_NONMEMBER("acquire a acquire b release b release a"); lc[0]->acquire(a); check_grant(a); lc[0]->acquire(b); @@ -80,63 +74,51 @@ test1(void) check_release(a); } -void * -test2(int i) -{ - tprintf ("test2: client %d acquire a release a\n", i); +void test2(int i) { + LOG_NONMEMBER("test2: client " << i << " acquire a release a"); lc[i]->acquire(a); - tprintf ("test2: client %d acquire done\n", i); + LOG_NONMEMBER("test2: client " << i << " acquire done"); check_grant(a); sleep(1); - tprintf ("test2: client %d release\n", i); + LOG_NONMEMBER("test2: client " << i << " release"); check_release(a); lc[i]->release(a); - tprintf ("test2: client %d release done\n", i); - return 0; + LOG_NONMEMBER("test2: client " << i << " release done"); } -void * -test3(int i) -{ - tprintf ("test3: client %d acquire a release a concurrent\n", i); +void test3(int i) { + LOG_NONMEMBER("test3: client " << i << " acquire a release a concurrent"); for (int j = 0; j < 10; j++) { lc[i]->acquire(a); check_grant(a); - tprintf ("test3: client %d got lock\n", i); + LOG_NONMEMBER("test3: client " << i << " got lock"); check_release(a); lc[i]->release(a); } - return 0; } -void * -test4(int i) -{ - tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i); +void test4(int i) { + LOG_NONMEMBER("test4: thread " << i << " acquire a release a concurrent; same clnt"); for (int j = 0; j < 10; j++) { lc[0]->acquire(a); check_grant(a); - tprintf ("test4: thread %d on client 0 got lock\n", i); + LOG_NONMEMBER("test4: thread " << i << " on client 0 got lock"); check_release(a); lc[0]->release(a); } - return 0; } -void * -test5(int i) -{ - tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i); +void test5(int i) { + LOG_NONMEMBER("test5: client " << i << " acquire a release a concurrent; same and diff clnt"); for (int j = 0; j < 10; j++) { if (i < 5) lc[0]->acquire(a); else lc[1]->acquire(a); check_grant(a); - tprintf ("test5: client %d got lock\n", i); + LOG_NONMEMBER("test5: client " << i << " got lock"); check_release(a); if (i < 5) lc[0]->release(a); else lc[1]->release(a); } - return 0; } int @@ -149,7 +131,7 @@ main(int argc, char *argv[]) setvbuf(stderr, NULL, _IONBF, 0); srandom((uint32_t)getpid()); - if(argc < 2) { + if (argc < 2) { fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); exit(1); } @@ -158,20 +140,20 @@ main(int argc, char *argv[]) if (argc > 2) { test = atoi(argv[2]); - if(test < 1 || test > 5){ - tprintf("Test number must be between 1 and 5\n"); + if (test < 1 || test > 5) { + LOG_NONMEMBER("Test number must be between 1 and 5"); exit(1); } } - tprintf("cache lock client\n"); + LOG_NONMEMBER("cache lock client"); for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst); - if(!test || test == 1){ + if (!test || test == 1) { test1(); } - if(!test || test == 2){ + if (!test || test == 2) { // test2 for (int i = 0; i < nt; i++) th[i] = std::thread(test2, i); @@ -179,36 +161,33 @@ main(int argc, char *argv[]) th[i].join(); } - if(!test || test == 3){ - tprintf("test 3\n"); + if (!test || test == 3) { + LOG_NONMEMBER("test 3"); - // test3 for (int i = 0; i < nt; i++) th[i] = std::thread(test3, i); for (int i = 0; i < nt; i++) th[i].join(); } - if(!test || test == 4){ - tprintf("test 4\n"); + if (!test || test == 4) { + LOG_NONMEMBER("test 4"); - // test 4 for (int i = 0; i < 2; i++) th[i] = std::thread(test4, i); for (int i = 0; i < 2; i++) th[i].join(); } - if(!test || test == 5){ - tprintf("test 5\n"); + if (!test || test == 5) { + LOG_NONMEMBER("test 5"); - // test 5 for (int i = 0; i < nt; i++) th[i] = std::thread(test5, i); for (int i = 0; i < nt; i++) th[i].join(); } - tprintf ("%s: passed all tests successfully\n", argv[0]); + LOG_NONMEMBER(argv[0] << ": passed all tests successfully"); } diff --git a/log.cc b/log.cc index baf3c2f..627b7ac 100644 --- a/log.cc +++ b/log.cc @@ -1,132 +1,102 @@ #include "paxos.h" #include #include -#include "tprintf.h" +#include "threaded_log.h" // Paxos must maintain some durable state (i.e., that survives power // failures) to run Paxos correct. This module implements a log with // all durable state to run Paxos. Since the values chosen correspond // to views, the log contains all views since the beginning of time. -log::log(acceptor *_acc, std::string _me) - : pxs (_acc) -{ - name = "paxos-" + _me + ".log"; - logread(); +log::log(acceptor *_acc, std::string _me) : pxs (_acc) { + name = "paxos-" + _me + ".log"; + logread(); } -void -log::logread(void) -{ - std::ifstream from; - std::string type; - unsigned instance; +void log::logread(void) { + std::ifstream from; + std::string type; + unsigned instance; - from.open(name.c_str()); - LOG("logread"); - while (from >> type) { - if (type == "done") { - std::string v; - from >> instance; - from.get(); - getline(from, v); - pxs->values[instance] = v; - pxs->instance_h = instance; - LOG("logread: instance: " << instance << " w. v = " << - pxs->values[instance]); - pxs->v_a.clear(); - pxs->n_h.n = 0; - pxs->n_a.n = 0; - } else if (type == "propseen") { - from >> pxs->n_h.n; - from >> pxs->n_h.m; - LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")"); - } else if (type == "accepted") { - std::string v; - from >> pxs->n_a.n; - from >> pxs->n_a.m; - from.get(); - getline(from, v); - pxs->v_a = v; - LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a); - } else { - LOG("logread: unknown log record"); - VERIFY(0); - } - } - from.close(); + from.open(name.c_str()); + LOG("logread"); + while (from >> type) { + if (type == "done") { + std::string v; + from >> instance; + from.get(); + getline(from, v); + pxs->values[instance] = v; + pxs->instance_h = instance; + LOG("logread: instance: " << instance << " w. v = " << + pxs->values[instance]); + pxs->v_a.clear(); + pxs->n_h.n = 0; + pxs->n_a.n = 0; + } else if (type == "propseen") { + from >> pxs->n_h.n; + from >> pxs->n_h.m; + LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")"); + } else if (type == "accepted") { + std::string v; + from >> pxs->n_a.n; + from >> pxs->n_a.m; + from.get(); + getline(from, v); + pxs->v_a = v; + LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a); + } else { + LOG("logread: unknown log record"); + VERIFY(0); + } + } + from.close(); } -std::string -log::dump() -{ - std::ifstream from; - std::string res; - std::string v; - from.open(name.c_str()); - while (getline(from, v)) { - res = res + v + "\n"; - } - from.close(); - return res; +std::string log::dump() { + std::ifstream from; + std::string res; + std::string v; + from.open(name.c_str()); + while (getline(from, v)) + res += v + "\n"; + from.close(); + return res; } -void -log::restore(std::string s) -{ - std::ofstream f; - LOG("restore: " << s); - f.open(name.c_str(), std::ios::trunc); - f << s; - f.close(); +void log::restore(std::string s) { + std::ofstream f; + LOG("restore: " << s); + f.open(name.c_str(), std::ios::trunc); + f << s; + f.close(); } // XXX should be an atomic operation -void -log::loginstance(unsigned instance, std::string v) -{ - std::ofstream f; - f.open(name.c_str(), std::ios::app); - f << "done"; - f << " "; - f << instance; - f << " "; - f << v; - f << "\n"; - f.close(); +void log::loginstance(unsigned instance, std::string v) { + std::ofstream f(name, std::ios::app); + f << "done " << instance << " " << v << "\n"; + f.close(); } // an acceptor should call logprop(n_h) when it // receives a prepare to which it responds prepare_ok(). -void -log::logprop(prop_t n_h) -{ - std::ofstream f; - f.open(name.c_str(), std::ios::app); - f << "propseen"; - f << " "; - f << n_h.n; - f << " "; - f << n_h.m; - f << "\n"; - f.close(); +void log::logprop(prop_t n_h) { + std::ofstream f; + f.open(name.c_str(), std::ios::app); + f << "propseen"; + f << " "; + f << n_h.n; + f << " "; + f << n_h.m; + f << "\n"; + f.close(); } // an acceptor should call logaccept(n_a, v_a) when it // receives an accept RPC to which it replies accept_ok(). -void -log::logaccept(prop_t n, std::string v) -{ - std::ofstream f; - f.open(name.c_str(), std::ios::app); - f << "accepted"; - f << " "; - f << n.n; - f << " "; - f << n.m; - f << " "; - f << v; - f << "\n"; - f.close(); +void log::logaccept(prop_t n, std::string v) { + std::ofstream f(name, std::ios::app); + f << "accepted " << n.n << " " << n.m << " " << v << "\n"; + f.close(); } - diff --git a/paxos.cc b/paxos.cc index 83bf4f1..46d9c1c 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,10 +1,11 @@ #include "paxos.h" #include "handle.h" -#include -#include "tprintf.h" +#include "threaded_log.h" #include "lang/verify.h" #include "lock.h" +using std::stoi; + // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made // Simple". To kick off an instance of Paxos, the caller supplies a @@ -22,9 +23,9 @@ bool operator>= (const prop_t &a, const prop_t &b) { return (a.n > b.n || (a.n == b.n && a.m >= b.m)); } -std::string -print_members(const std::vector &nodes) { - std::string s; +string +print_members(const vector &nodes) { + string s; s.clear(); for (unsigned i = 0; i < nodes.size(); i++) { s += nodes[i]; @@ -35,7 +36,7 @@ print_members(const std::vector &nodes) { } -bool isamember(const std::string & m, const std::vector & nodes) { +bool isamember(const string & m, const vector & nodes) { for (auto n : nodes) { if (n == m) return 1; @@ -51,8 +52,7 @@ bool proposer::isrunning() { } // check if the servers in l2 contains a majority of servers in l1 -bool proposer::majority(const std::vector &l1, - const std::vector &l2) { +bool proposer::majority(const vector &l1, const vector &l2) { unsigned n = 0; for (unsigned i = 0; i < l1.size(); i++) { @@ -62,8 +62,7 @@ bool proposer::majority(const std::vector &l1, return n >= (l1.size() >> 1) + 1; } -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, - const std::string &_me) +proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me) : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), stable (true) { @@ -76,19 +75,17 @@ void proposer::setn() my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; } -bool proposer::run(unsigned instance, const std::vector & cur_nodes, - const std::string & newv) +bool proposer::run(unsigned instance, const vector & cur_nodes, const string & newv) { - std::vector accepts; - std::vector nodes; - std::string v; + vector accepts; + vector nodes; + string v; bool r = false; lock ml(pxs_mutex); - tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", - print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); + LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable); if (!stable) { // already running proposer? - tprintf("proposer::run: already running\n"); + LOG("proposer::run: already running"); return false; } stable = false; @@ -98,7 +95,7 @@ bool proposer::run(unsigned instance, const std::vector & cur_nodes if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of prepare responses\n"); + LOG("paxos::manager: received a majority of prepare responses"); if (v.size() == 0) v = newv; @@ -110,20 +107,20 @@ bool proposer::run(unsigned instance, const std::vector & cur_nodes accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of accept responses\n"); + LOG("paxos::manager: received a majority of accept responses"); breakpoint2(); decide(instance, accepts, v); r = true; } else { - tprintf("paxos::manager: no majority of accept responses\n"); + LOG("paxos::manager: no majority of accept responses"); } } else { - tprintf("paxos::manager: no majority of prepare responses\n"); + LOG("paxos::manager: no majority of prepare responses"); } } else { - tprintf("paxos::manager: prepare is rejected %d\n", stable); + LOG("paxos::manager: prepare is rejected " << stable); } stable = true; return r; @@ -135,9 +132,9 @@ bool proposer::run(unsigned instance, const std::vector & cur_nodes // otherwise fill in accepts with set of nodes that accepted, // set v to the v_a with the highest n_a, and return true. bool -proposer::prepare(unsigned instance, std::vector & accepts, - const std::vector & nodes, - std::string & v) +proposer::prepare(unsigned instance, vector & accepts, + const vector & nodes, + string & v) { struct paxos_protocol::preparearg arg = { instance, my_n }; struct paxos_protocol::prepareres res; @@ -150,14 +147,14 @@ proposer::prepare(unsigned instance, std::vector & accepts, int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); if (status == paxos_protocol::OK) { if (res.oldinstance) { - tprintf("commiting old instance!\n"); + LOG("commiting old instance!"); acc->commit(instance, res.v_a); return false; } if (res.accept) { accepts.push_back(i); if (res.n_a >= n_a) { - tprintf("found a newer accepted proposal\n"); + LOG("found a newer accepted proposal"); v = res.v_a; n_a = res.n_a; } @@ -170,8 +167,8 @@ proposer::prepare(unsigned instance, std::vector & accepts, // run() calls this to send out accept RPCs to accepts. // fill in accepts with list of nodes that accepted. void -proposer::accept(unsigned instance, std::vector & accepts, - const std::vector & nodes, const std::string & v) +proposer::accept(unsigned instance, vector & accepts, + const vector & nodes, const string & v) { struct paxos_protocol::acceptarg arg = { instance, my_n, v }; rpcc *r; @@ -187,8 +184,8 @@ proposer::accept(unsigned instance, std::vector & accepts, } void -proposer::decide(unsigned instance, const std::vector & accepts, - const std::string & v) +proposer::decide(unsigned instance, const vector & accepts, + const string & v) { struct paxos_protocol::decidearg arg = { instance, v }; rpcc *r; @@ -201,8 +198,8 @@ proposer::decide(unsigned instance, const std::vector & accepts, } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me, - const std::string & _value) +acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me, + const string & _value) : cfg(_cfg), me (_me), instance_h(0) { n_h.n = 0; @@ -219,14 +216,14 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _m instance_h = 1; } - pxs = new rpcs((uint32_t)std::stoi(_me)); + pxs = new rpcs((uint32_t)stoi(_me)); pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); } paxos_protocol::status -acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, +acceptor::preparereq(paxos_protocol::prepareres & r, const string &, paxos_protocol::preparearg a) { lock ml(pxs_mutex); @@ -242,13 +239,13 @@ acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, l->logprop(n_h); r.accept = true; } else { - tprintf("I totally rejected this request. Ha.\n"); + LOG("I totally rejected this request. Ha."); } return paxos_protocol::OK; } paxos_protocol::status -acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) +acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a) { lock ml(pxs_mutex); r = false; @@ -263,11 +260,10 @@ acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) // the src argument is only for debugging paxos_protocol::status -acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) +acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a) { lock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); + LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a); if (a.instance == instance_h + 1) { VERIFY(v_a == a.v); commit(a.instance, v_a, ml); @@ -281,11 +277,11 @@ acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) } void -acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock) +acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock) { - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); + LOG("acceptor::commit: instance=" << instance << " has v=" << value); if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); + LOG("commit: highestaccepteinstance = " << instance); values[instance] = value; l->loginstance(instance, value); instance_h = instance; @@ -303,20 +299,20 @@ acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_ } void -acceptor::commit(unsigned instance, const std::string & value) +acceptor::commit(unsigned instance, const string & value) { lock ml(pxs_mutex); commit(instance, value, ml); } -std::string +string acceptor::dump() { return l->dump(); } void -acceptor::restore(const std::string & s) +acceptor::restore(const string & s) { l->restore(s); l->logread(); @@ -331,7 +327,7 @@ void proposer::breakpoint1() { if (break1) { - tprintf("Dying at breakpoint 1!\n"); + LOG("Dying at breakpoint 1!"); exit(1); } } @@ -341,7 +337,7 @@ void proposer::breakpoint2() { if (break2) { - tprintf("Dying at breakpoint 2!\n"); + LOG("Dying at breakpoint 2!"); exit(1); } } @@ -350,10 +346,10 @@ void proposer::breakpoint(int b) { if (b == 3) { - tprintf("Proposer: breakpoint 1\n"); + LOG("Proposer: breakpoint 1"); break1 = true; } else if (b == 4) { - tprintf("Proposer: breakpoint 2\n"); + LOG("Proposer: breakpoint 2"); break2 = true; } } diff --git a/paxos.h b/paxos.h index 9650de1..8561dd5 100644 --- a/paxos.h +++ b/paxos.h @@ -3,15 +3,19 @@ #include #include +#include #include "rpc/rpc.h" #include "paxos_protocol.h" #include "log.h" #include "lock.h" +using std::string; +using std::map; +using std::vector; class paxos_change { public: - virtual void paxos_commit(unsigned instance, const std::string & v) = 0; + virtual void paxos_commit(unsigned instance, const string & v) = 0; virtual ~paxos_change() {} }; @@ -20,49 +24,49 @@ class acceptor { log *l; rpcs *pxs; paxos_change *cfg; - std::string me; + string me; mutex pxs_mutex; // Acceptor state prop_t n_h; // number of the highest proposal seen in a prepare prop_t n_a; // number of highest proposal accepted - std::string v_a; // value of highest proposal accepted + string v_a; // value of highest proposal accepted unsigned instance_h; // number of the highest instance we have decided - std::map values; // vals of each instance + map values; // vals of each instance - void commit(unsigned instance, const std::string & v, lock & pxs_mutex_lock); + void commit(unsigned instance, const string & v, lock & pxs_mutex_lock); paxos_protocol::status preparereq(paxos_protocol::prepareres & r, - const std::string & src, paxos_protocol::preparearg a); - paxos_protocol::status acceptreq(bool & r, const std::string & src, + const string & src, paxos_protocol::preparearg a); + paxos_protocol::status acceptreq(bool & r, const string & src, paxos_protocol::acceptarg a); - paxos_protocol::status decidereq(int & r, const std::string & src, + paxos_protocol::status decidereq(int & r, const string & src, paxos_protocol::decidearg a); friend class log; public: - acceptor(class paxos_change *cfg, bool _first, const std::string & _me, - const std::string & _value); + acceptor(class paxos_change *cfg, bool _first, const string & _me, + const string & _value); ~acceptor() {} - void commit(unsigned instance, const std::string & v); + void commit(unsigned instance, const string & v); unsigned instance() { return instance_h; } - const std::string & value(unsigned instance) { return values[instance]; } - std::string dump(); - void restore(const std::string &); + const string & value(unsigned instance) { return values[instance]; } + string dump(); + void restore(const string &); rpcs *get_rpcs() { return pxs; } prop_t get_n_h() { return n_h; } unsigned get_instance_h() { return instance_h; } }; -extern bool isamember(const std::string & m, const std::vector & nodes); -extern std::string print_members(const std::vector & nodes); +extern bool isamember(const string & m, const vector & nodes); +extern string print_members(const vector & nodes); class proposer { private: log *l; paxos_change *cfg; acceptor *acc; - std::string me; + string me; bool break1; bool break2; @@ -73,23 +77,23 @@ class proposer { prop_t my_n; // number of the last proposal used in this instance void setn(); - bool prepare(unsigned instance, std::vector & accepts, - const std::vector & nodes, - std::string & v); - void accept(unsigned instance, std::vector & accepts, - const std::vector & nodes, const std::string & v); - void decide(unsigned instance, const std::vector & accepts, - const std::string & v); + bool prepare(unsigned instance, vector & accepts, + const vector & nodes, + string & v); + void accept(unsigned instance, vector & accepts, + const vector & nodes, const string & v); + void decide(unsigned instance, const vector & accepts, + const string & v); void breakpoint1(); void breakpoint2(); - bool majority(const std::vector & l1, const std::vector & l2); + bool majority(const vector & l1, const vector & l2); friend class log; public: - proposer(class paxos_change *cfg, class acceptor *_acceptor, const std::string &_me); + proposer(class paxos_change *cfg, class acceptor *_acceptor, const string &_me); ~proposer() {} - bool run(unsigned instance, const std::vector & cnodes, const std::string & v); + bool run(unsigned instance, const vector & cnodes, const string & v); bool isrunning(); void breakpoint(int b); }; diff --git a/paxos_protocol.h b/paxos_protocol.h index 734ca51..f2bdb3f 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -4,130 +4,82 @@ #include "rpc/rpc.h" struct prop_t { - unsigned n; - std::string m; + unsigned n; + std::string m; }; class paxos_protocol { - public: - enum xxstatus { OK, ERR }; - typedef int status; - enum rpc_numbers { - preparereq = 0x11001, - acceptreq, - decidereq, - heartbeat, - }; - - struct preparearg { - unsigned instance; - prop_t n; - }; - - struct prepareres { - bool oldinstance; - bool accept; - prop_t n_a; - std::string v_a; - }; - - struct acceptarg { - unsigned instance; - prop_t n; - std::string v; - }; - - struct decidearg { - unsigned instance; - std::string v; - }; - + public: + enum status : status_t { OK, ERR }; + enum rpc_numbers : proc_t { + preparereq = 0x11001, + acceptreq, + decidereq, + heartbeat, + }; + + struct preparearg { + unsigned instance; + prop_t n; + }; + + struct prepareres { + bool oldinstance; + bool accept; + prop_t n_a; + std::string v_a; + }; + + struct acceptarg { + unsigned instance; + prop_t n; + std::string v; + }; + + struct decidearg { + unsigned instance; + std::string v; + }; }; -inline unmarshall & -operator>>(unmarshall &u, prop_t &a) -{ - u >> a.n; - u >> a.m; - return u; +inline unmarshall & operator>>(unmarshall &u, prop_t &a) { + return u >> a.n >> a.m; } -inline marshall & -operator<<(marshall &m, prop_t a) -{ - m << a.n; - m << a.m; - return m; +inline marshall & operator<<(marshall &m, prop_t a) { + return m << a.n << a.m; } -inline unmarshall & -operator>>(unmarshall &u, paxos_protocol::preparearg &a) -{ - u >> a.instance; - u >> a.n; - return u; +inline unmarshall & operator>>(unmarshall &u, paxos_protocol::preparearg &a) { + return u >> a.instance >> a.n; } -inline marshall & -operator<<(marshall &m, paxos_protocol::preparearg a) -{ - m << a.instance; - m << a.n; - return m; +inline marshall & operator<<(marshall &m, paxos_protocol::preparearg a) { + return m << a.instance << a.n; } -inline unmarshall & -operator>>(unmarshall &u, paxos_protocol::prepareres &r) -{ - u >> r.oldinstance; - u >> r.accept; - u >> r.n_a; - u >> r.v_a; - return u; +inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) { + return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a; } -inline marshall & -operator<<(marshall &m, paxos_protocol::prepareres r) -{ - m << r.oldinstance; - m << r.accept; - m << r.n_a; - m << r.v_a; - return m; +inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) { + return m << r.oldinstance << r.accept << r.n_a << r.v_a; } -inline unmarshall & -operator>>(unmarshall &u, paxos_protocol::acceptarg &a) -{ - u >> a.instance; - u >> a.n; - u >> a.v; - return u; +inline unmarshall & operator>>(unmarshall &u, paxos_protocol::acceptarg &a) { + return u >> a.instance >> a.n >> a.v; } -inline marshall & -operator<<(marshall &m, paxos_protocol::acceptarg a) -{ - m << a.instance; - m << a.n; - m << a.v; - return m; +inline marshall & operator<<(marshall &m, paxos_protocol::acceptarg a) { + return m << a.instance << a.n << a.v; } -inline unmarshall & -operator>>(unmarshall &u, paxos_protocol::decidearg &a) -{ - u >> a.instance; - u >> a.v; - return u; +inline unmarshall & operator>>(unmarshall &u, paxos_protocol::decidearg &a) { + return u >> a.instance >> a.v; } -inline marshall & -operator<<(marshall &m, paxos_protocol::decidearg a) -{ - m << a.instance; - m << a.v; - return m; +inline marshall & operator<<(marshall &m, paxos_protocol::decidearg a) { + return m << a.instance << a.v; } #endif diff --git a/rpc/marshall.h b/rpc/marshall.h index 676a682..abeaae7 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -12,11 +12,14 @@ #include #include "lang/verify.h" +using proc_t = uint32_t; +using status_t = int32_t; + struct request_header { - request_header(int x=0, int p=0, unsigned c=0, unsigned s=0, int xi=0) : + request_header(int x=0, proc_t p=0, unsigned c=0, unsigned s=0, int xi=0) : xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} int xid; - int proc; + proc_t proc; unsigned int clt_nonce; unsigned int srv_nonce; int xid_rep; @@ -134,7 +137,14 @@ marshall& operator<<(marshall &, int16_t); marshall& operator<<(marshall &, uint64_t); marshall& operator<<(marshall &, const std::string &); -template marshall & +template +struct is_enumerable : std::false_type {}; + +template struct is_enumerable().cbegin(), std::declval().cend(), void()) +> : std::true_type {}; + +template typename std::enable_if::value, marshall>::type & operator<<(marshall &m, const A &x) { m << (unsigned int) x.size(); for (const auto &a : x) @@ -144,9 +154,17 @@ operator<<(marshall &m, const A &x) { template marshall & operator<<(marshall &m, const std::pair &d) { - m << d.first; - m << d.second; - return m; + return m << d.first << d.second; +} + +template +using enum_type_t = typename std::enable_if::value, typename std::underlying_type::type>::type; +template constexpr inline enum_type_t from_enum(E e) noexcept { return (enum_type_t)e; } +template constexpr inline E to_enum(enum_type_t value) noexcept { return (E)value; } + +template typename std::enable_if::value, marshall>::type & +operator<<(marshall &m, E e) { + return m << from_enum(e); } class unmarshall; @@ -162,6 +180,8 @@ unmarshall& operator>>(unmarshall &, size_t &); unmarshall& operator>>(unmarshall &, uint64_t &); unmarshall& operator>>(unmarshall &, int64_t &); unmarshall& operator>>(unmarshall &, std::string &); +template typename std::enable_if::value, unmarshall>::type & +operator>>(unmarshall &u, E &e); class unmarshall { private: @@ -235,7 +255,8 @@ class unmarshall { } }; -template unmarshall & operator>>(unmarshall &u, A &x) { +template typename std::enable_if::value, unmarshall>::type & +operator>>(unmarshall &u, A &x) { unsigned n = u.grab(); x.clear(); while (n--) @@ -257,6 +278,12 @@ operator>>(unmarshall &u, std::pair &d) { return u >> d.first >> d.second; } +template typename std::enable_if::value, unmarshall>::type & +operator>>(unmarshall &u, E &e) { + e = to_enum(u.grab>()); + return u; +} + typedef std::function handler; // @@ -311,17 +338,17 @@ struct VerifyOnFailure { // One for function pointers... -template -typename std::enable_if::value, int>::type -invoke(F f, void *, R & r, args_type & t, tuple_indices) { +template +typename std::enable_if::value, RV>::type +invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { return f(r, std::move(std::get(t))...); } // And one for pointers to member functions... -template -typename std::enable_if::value, int>::type -invoke(F f, C *c, R & r, args_type & t, tuple_indices) { +template +typename std::enable_if::value, RV>::type +invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices) { return (c->*f)(r, std::move(std::get(t))...); } @@ -339,8 +366,8 @@ template -struct marshalled_func_imp { +template +struct marshalled_func_imp { static inline handler *wrap(F f, C *c=nullptr) { // This type definition corresponds to an empty struct with // template parameters running from 0 up to (# args) - 1. @@ -351,20 +378,20 @@ struct marshalled_func_imp { using ArgsStorage = std::tuple::type...>; // Allocate a handler (i.e. std::function) to hold the lambda // which will unmarshall RPCs and call f. - return new handler([=](unmarshall &u, marshall &m) -> int { + return new handler([=](unmarshall &u, marshall &m) -> RV { // Unmarshall each argument with the correct type and store the // result in a tuple. ArgsStorage t = {u.grab::type>()...}; // Verify successful unmarshalling of the entire input stream. if (!u.okdone()) - return ErrorHandler::unmarshall_args_failure(); + return (RV)ErrorHandler::unmarshall_args_failure(); // Allocate space for the RPC response -- will be passed into the // function as an lvalue reference. R r; // Perform the invocation. Note that Indices() calls the default // constructor of the empty struct with the special template // parameters. - int b = invoke(f, c, r, t, Indices()); + RV b = invoke(RV(), f, c, r, t, Indices()); // Marshall the response. m << r; // Make like a tree. @@ -381,13 +408,13 @@ struct marshalled_func_imp { template struct marshalled_func; -template -struct marshalled_func : - public marshalled_func_imp {}; +template +struct marshalled_func : + public marshalled_func_imp {}; -template -struct marshalled_func : - public marshalled_func_imp {}; +template +struct marshalled_func : + public marshalled_func_imp {}; template struct marshalled_func> : diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 5e43547..c6d93c8 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -64,9 +64,11 @@ #include "lock.h" #include "jsl_log.h" -#include "tprintf.h" +#include "threaded_log.h" #include "lang/verify.h" +using std::stoi; + const rpcc::TO rpcc::to_max = { 120000 }; const rpcc::TO rpcc::to_min = { 1000 }; @@ -86,8 +88,8 @@ void set_rand_seed() srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); } -rpcc::rpcc(sockaddr_in d, bool retrans) : - dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), +rpcc::rpcc(const string & d, bool retrans) : + dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) { if(retrans){ @@ -146,7 +148,7 @@ rpcc::bind(TO to) rpcc::cancel(void) { lock ml(m_); - tprintf("rpcc::cancel: force callers to fail"); + LOG("rpcc::cancel: force callers to fail"); for(auto &p : calls_){ caller *ca = p.second; @@ -163,11 +165,11 @@ rpcc::cancel(void) destroy_wait_ = true; destroy_wait_c_.wait(ml); } - tprintf("rpcc::cancel: done"); + LOG("rpcc::cancel: done"); } int -rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, +rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { @@ -189,7 +191,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, ca.xid = xid_++; calls_[ca.xid] = &ca; - req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); + req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); xid_rep = xid_rep_window_.front(); } @@ -428,7 +430,7 @@ rpcs::got_pdu(connection *c, char *b, size_t sz) } void -rpcs::reg1(unsigned int proc, handler *h) +rpcs::reg1(proc_t proc, handler *h) { lock pl(procs_m_); VERIFY(procs_.count(proc) == 0); @@ -437,18 +439,18 @@ rpcs::reg1(unsigned int proc, handler *h) } void -rpcs::updatestat(unsigned int proc) +rpcs::updatestat(proc_t proc) { lock cl(count_m_); counts_[proc]++; curr_counts_--; if(curr_counts_ == 0){ - tprintf("RPC STATS: "); + LOG("RPC STATS: "); for (auto i = counts_.begin(); i != counts_.end(); i++) - tprintf("%x:%lu ", i->first, i->second); + LOG(std::hex << i->first << ":" << std::dec << i->second); lock rwl(reply_window_m_); - std::map >::iterator clt; + map >::iterator clt; size_t totalrep = 0, maxrep = 0; for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ @@ -471,7 +473,7 @@ rpcs::dispatch(djob_t *j) request_header h; req.unpack_req_header(&h); - unsigned int proc = (unsigned int)h.proc; + proc_t proc = h.proc; if(!req.ok()){ jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); @@ -630,14 +632,14 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, { lock rwl(reply_window_m_); - std::list &l = reply_window_[clt_nonce]; + list &l = reply_window_[clt_nonce]; VERIFY(l.size() > 0); VERIFY(xid >= xid_rep); int past_xid_rep = l.begin()->xid; - std::list::iterator start = l.begin(), it; + list::iterator start = l.begin(), it; it = ++start; if (past_xid_rep < xid_rep || past_xid_rep == -1) { @@ -685,8 +687,8 @@ rpcs::add_reply(unsigned int clt_nonce, int xid, { lock rwl(reply_window_m_); // remember the RPC reply value - std::list &l = reply_window_[clt_nonce]; - std::list::iterator it = l.begin(); + list &l = reply_window_[clt_nonce]; + list::iterator it = l.begin(); // skip to our place in the list for (it++; it != l.end() && it->xid < xid; it++); // there should already be an entry, so whine if there isn't @@ -748,7 +750,7 @@ marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; } marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; } marshall & -operator<<(marshall &m, const std::string &s) { +operator<<(marshall &m, const string &s) { m << (unsigned int) s.size(); m.rawbytes(s.data(), s.size()); return m; @@ -797,7 +799,7 @@ unmarshall::rawbyte() } void -unmarshall::rawbytes(std::string &ss, size_t n) +unmarshall::rawbytes(string &ss, size_t n) { VERIFY(ensure(n)); ss.assign(buf_+index_, n); @@ -825,7 +827,7 @@ unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes(x); ret unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes(xx); x = xx; return u; } unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes(x); return u; } unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes(x); return u; } -unmarshall & operator>>(unmarshall &u, std::string &s) { +unmarshall & operator>>(unmarshall &u, string &s) { unsigned sz = u.grab(); if(u.ok()) u.rawbytes(s, sz); @@ -839,24 +841,23 @@ bool operator<(const sockaddr_in &a, const sockaddr_in &b){ } /*---------------auxilary function--------------*/ -void -make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) { +sockaddr_in make_sockaddr(const string &hostandport) { auto colon = hostandport.find(':'); - if (colon == std::string::npos) - make_sockaddr("127.0.0.1", hostandport, dst); + if (colon == string::npos) + return make_sockaddr("127.0.0.1", hostandport); else - make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst); + return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1)); } -void -make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) { - bzero(dst, sizeof(*dst)); - dst->sin_family = AF_INET; +sockaddr_in make_sockaddr(const string &host, const string &port) { + sockaddr_in dst; + bzero(&dst, sizeof(dst)); + dst.sin_family = AF_INET; struct in_addr a{inet_addr(host.c_str())}; if(a.s_addr != INADDR_NONE) - dst->sin_addr.s_addr = a.s_addr; + dst.sin_addr.s_addr = a.s_addr; else { struct hostent *hp = gethostbyname(host.c_str()); @@ -865,7 +866,8 @@ make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_ exit(1); } memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); - dst->sin_addr.s_addr = a.s_addr; + dst.sin_addr.s_addr = a.s_addr; } - dst->sin_port = hton((uint16_t)std::stoi(port)); + dst.sin_port = hton((uint16_t)stoi(port)); + return dst; } diff --git a/rpc/rpc.h b/rpc/rpc.h index c0420a5..d81a5dd 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -10,10 +10,11 @@ #include "thr_pool.h" #include "marshall.h" #include "connection.h" +#include "lock.h" -#ifdef DMALLOC -#include "dmalloc.h" -#endif +using std::string; +using std::map; +using std::list; class rpc_const { public: @@ -43,8 +44,8 @@ class rpcc : public chanmgr { unmarshall *un; int intret; bool done; - std::mutex m; - std::condition_variable c; + mutex m; + cond c; }; void get_refconn(connection **ch); @@ -62,27 +63,27 @@ class rpcc : public chanmgr { connection *chan_; - std::mutex m_; // protect insert/delete to calls[] - std::mutex chan_m_; + mutex m_; // protect insert/delete to calls[] + mutex chan_m_; bool destroy_wait_; - std::condition_variable destroy_wait_c_; + cond destroy_wait_c_; - std::map calls_; - std::list xid_rep_window_; + map calls_; + list xid_rep_window_; struct request { request() { clear(); } void clear() { buf.clear(); xid = -1; } bool isvalid() { return xid != -1; } - std::string buf; + string buf; int xid; }; struct request dup_req_; int xid_rep_done_; public: - rpcc(sockaddr_in d, bool retrans=true); + rpcc(const string & d, bool retrans=true); ~rpcc(); struct TO { @@ -99,27 +100,26 @@ class rpcc : public chanmgr { void set_reachable(bool r) { reachable_ = r; } void cancel(); - - int islossy() { return lossytest_ > 0; } - int call1(unsigned int proc, + int islossy() { return lossytest_ > 0; } + + int call1(proc_t proc, marshall &req, unmarshall &rep, TO to); bool got_pdu(connection *c, char *b, size_t sz); - template - int call_m(unsigned int proc, marshall &req, R & r, TO to); + int call_m(proc_t proc, marshall &req, R & r, TO to); template - inline int call(unsigned int proc, R & r, const Args&... args); + inline int call(proc_t proc, R & r, const Args&... args); template - inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args); + inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args); }; template int -rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) +rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) { unmarshall u; int intret = call1(proc, req, u, to); @@ -136,13 +136,13 @@ rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) } template inline int -rpcc::call(unsigned int proc, R & r, const Args&... args) +rpcc::call(proc_t proc, R & r, const Args&... args) { return call_timeout(proc, rpcc::to_max, r, args...); } template inline int -rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args) +rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args) { marshall m{args...}; return call_m(proc, m, r, to); @@ -191,7 +191,7 @@ class rpcs : public chanmgr { // provide at most once semantics by maintaining a window of replies // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - std::map > reply_window_; + map > reply_window_; void free_reply_window(void); void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz); @@ -200,26 +200,26 @@ class rpcs : public chanmgr { int xid, int rep_xid, char **b, size_t *sz); - void updatestat(unsigned int proc); + void updatestat(proc_t proc); // latest connection to the client - std::map conns_; + map conns_; // counting const size_t counting_; size_t curr_counts_; - std::map counts_; + map counts_; int lossytest_; bool reachable_; // map proc # to function - std::map procs_; + map procs_; - std::mutex procs_m_; // protect insert/delete to procs[] - std::mutex count_m_; //protect modification of counts - std::mutex reply_window_m_; // protect reply window et al - std::mutex conss_m_; // protect conns_ + mutex procs_m_; // protect insert/delete to procs[] + mutex count_m_; //protect modification of counts + mutex reply_window_m_; // protect reply window et al + mutex conss_m_; // protect conns_ protected: @@ -233,7 +233,7 @@ class rpcs : public chanmgr { void dispatch(djob_t *); // internal handler registration - void reg1(unsigned int proc, handler *); + void reg1(proc_t proc, handler *); ThrPool* dispatchpool_; tcpsconn* listener_; @@ -249,7 +249,7 @@ class rpcs : public chanmgr { bool got_pdu(connection *c, char *b, size_t sz); - template void reg(unsigned int proc, F f, C *c=nullptr); + template void reg(proc_t proc, F f, C *c=nullptr); }; struct ReturnOnFailure { @@ -258,12 +258,11 @@ struct ReturnOnFailure { } }; -template void rpcs::reg(unsigned int proc, F f, C *c) { +template void rpcs::reg(proc_t proc, F f, C *c) { reg1(proc, marshalled_func::wrap(f, c)); } -void make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst); -void make_sockaddr(const std::string &host, const std::string &port, struct - sockaddr_in *dst); +sockaddr_in make_sockaddr(const string &hostandport); +sockaddr_in make_sockaddr(const string &host, const string &port); #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index dbb10c6..bf8a56c 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -3,9 +3,10 @@ #include "rpc.h" #include -#include +#include +#include +#include #include -#include #include #include #include @@ -14,11 +15,17 @@ #define NUM_CL 2 -char tprintf_thread_prefix = 'r'; +char log_thread_prefix = 'r'; + +using std::string; +using std::cout; +using std::endl; +using std::vector; +using std::thread; rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object -struct sockaddr_in dst; //server's ip address +string dst; //server's ip address int port; // server-side handlers. they must be methods of some class @@ -26,10 +33,10 @@ int port; // from multiple classes. class srv { public: - int handle_22(std::string & r, const std::string a, const std::string b); + int handle_22(string & r, const string a, const string b); int handle_fast(int &r, const int a); int handle_slow(int &r, const int a); - int handle_bigrep(std::string &r, const size_t a); + int handle_bigrep(string &r, const size_t a); }; // a handler. a and b are arguments, r is the result. @@ -40,7 +47,7 @@ class srv { // at these argument types, so this function definition // does what a .x file does in SunRPC. int -srv::handle_22(std::string &r, const std::string a, std::string b) +srv::handle_22(string &r, const string a, string b) { r = a + b; return 0; @@ -62,9 +69,9 @@ srv::handle_slow(int &r, const int a) } int -srv::handle_bigrep(std::string &r, const size_t len) +srv::handle_bigrep(string &r, const size_t len) { - r = std::string((size_t)len, 'x'); + r = string((size_t)len, 'x'); return 0; } @@ -88,7 +95,7 @@ testmarshall() VERIFY(m.size()==RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; - std::string s = std::string("hallo...."); + string s = "hallo...."; m << i; m << l; m << s; @@ -104,7 +111,7 @@ testmarshall() VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; unsigned long long l1; - std::string s1; + string s1; un >> i1; un >> l1; un >> s1; @@ -120,12 +127,11 @@ client1(size_t cl) for(int i = 0; i < 100; i++){ int arg = (random() % 2000); - std::string rep; + string rep; int ret = clients[which_cl]->call(25, rep, arg); VERIFY(ret == 0); - if ((int)rep.size()!=arg) { - printf("repsize wrong %d!=%d\n", (int)rep.size(), arg); - } + if ((int)rep.size()!=arg) + cout << "repsize wrong " << rep.size() << "!=" << arg << endl; VERIFY((int)rep.size() == arg); } @@ -142,7 +148,7 @@ client1(size_t cl) auto end = std::chrono::steady_clock::now(); auto diff = std::chrono::duration_cast(end - start).count(); if (ret != 0) - printf("%d ms have elapsed!!!\n", (int)diff); + cout << diff << " ms have elapsed!!!" << endl; VERIFY(ret == 0); VERIFY(rep == (which ? arg+1 : arg+2)); } @@ -158,12 +164,10 @@ client2(size_t cl) while(time(0) - t1 < 10){ int arg = (random() % 2000); - std::string rep; + string rep; int ret = clients[which_cl]->call(25, rep, arg); - if ((int)rep.size()!=arg) { - printf("ask for %d reply got %d ret %d\n", - arg, (int)rep.size(), ret); - } + if ((int)rep.size()!=arg) + cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl; VERIFY((int)rep.size() == arg); } } @@ -184,57 +188,53 @@ client3(void *xx) void simple_tests(rpcc *c) { - printf("simple_tests\n"); + cout << "simple_tests" << endl; // an RPC call to procedure #22. // rpcc::call() looks at the argument types to decide how // to marshall the RPC call packet, and how to unmarshall // the reply packet. - std::string rep; - int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye"); + string rep; + int intret = c->call(22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == 0); // this is what handle_22 returns VERIFY(rep == "hello goodbye"); - printf(" -- string concat RPC .. ok\n"); + cout << " -- string concat RPC .. ok" << endl; // small request, big reply (perhaps req via UDP, reply via TCP) intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); VERIFY(intret == 0); VERIFY(rep.size() == 70000); - printf(" -- small request, big reply .. ok\n"); + cout << " -- small request, big reply .. ok" << endl; // specify a timeout value to an RPC that should succeed (udp) int xx = 0; intret = c->call_timeout(23, rpcc::to(3000), xx, 77); VERIFY(intret == 0 && xx == 78); - printf(" -- no spurious timeout .. ok\n"); + cout << " -- no spurious timeout .. ok" << endl; // specify a timeout value to an RPC that should succeed (tcp) { - std::string arg(1000, 'x'); - std::string rep2; - c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x"); + string arg(1000, 'x'); + string rep2; + c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x"); VERIFY(rep2.size() == 1001); - printf(" -- no spurious timeout .. ok\n"); + cout << " -- no spurious timeout .. ok" << endl; } // huge RPC - std::string big(1000000, 'x'); - intret = c->call(22, rep, big, (std::string)"z"); + string big(1000000, 'x'); + intret = c->call(22, rep, big, (string)"z"); VERIFY(rep.size() == 1000001); - printf(" -- huge 1M rpc request .. ok\n"); + cout << " -- huge 1M rpc request .. ok" << endl; // specify a timeout value to an RPC that should timeout (udp) - struct sockaddr_in non_existent; - memset(&non_existent, 0, sizeof(non_existent)); - non_existent.sin_family = AF_INET; - non_existent.sin_addr.s_addr = inet_addr("127.0.0.1"); - non_existent.sin_port = htons(7661); + string non_existent = "127.0.0.1:7661"; rpcc *c1 = new rpcc(non_existent); time_t t0 = time(0); intret = c1->bind(rpcc::to(3000)); time_t t1 = time(0); VERIFY(intret < 0 && (t1 - t0) <= 4); - printf(" -- rpc timeout .. ok\n"); - printf("simple_tests OK\n"); + cout << " -- rpc timeout .. ok" << endl; + cout << "simple_tests OK" << endl; } void @@ -243,23 +243,23 @@ concurrent_test(size_t nt) // create threads that make lots of calls in parallel, // to test thread synchronization for concurrent calls // and dispatches. - printf("start concurrent_test (%lu threads) ...", nt); + cout << "start concurrent_test (" << nt << " threads) ..."; - std::vector th(nt); + vector th(nt); for(size_t i = 0; i < nt; i++) - th[i] = std::thread(client1, i); + th[i] = thread(client1, i); for(size_t i = 0; i < nt; i++) th[i].join(); - printf(" OK\n"); + cout << " OK" << endl; } void lossy_test() { - printf("start lossy_test ..."); + cout << "start lossy_test ..."; VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); if (server) { @@ -275,15 +275,15 @@ lossy_test() size_t nt = 1; - std::vector th(nt); + vector th(nt); for(size_t i = 0; i < nt; i++) - th[i] = std::thread(client2, i); + th[i] = thread(client2, i); for(size_t i = 0; i < nt; i++) th[i].join(); - printf(".. OK\n"); + cout << ".. OK" << endl; VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); } @@ -293,22 +293,22 @@ failure_test() rpcc *client1; rpcc *client = clients[0]; - printf("failure_test\n"); + cout << "failure_test" << endl; delete server; client1 = new rpcc(dst); VERIFY (client1->bind(rpcc::to(3000)) < 0); - printf(" -- create new client and try to bind to failed server .. failed ok\n"); + cout << " -- create new client and try to bind to failed server .. failed ok" << endl; delete client1; startserver(); - std::string rep; - int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); + string rep; + int intret = client->call(22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == rpc_const::oldsrv_failure); - printf(" -- call recovered server with old client .. failed ok\n"); + cout << " -- call recovered server with old client .. failed ok" << endl; delete client; @@ -316,25 +316,25 @@ failure_test() VERIFY (client->bind() >= 0); VERIFY (client->bind() < 0); - intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); + intret = client->call(22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == 0); VERIFY(rep == "hello goodbye"); - printf(" -- delete existing rpc client, create replacement rpc client .. ok\n"); + cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl; size_t nt = 10; - printf(" -- concurrent test on new rpc client w/ %lu threads ..", nt); + cout << " -- concurrent test on new rpc client w/ " << nt << " threads .."; - std::vector th(nt); + vector th(nt); for(size_t i = 0; i < nt; i++) - th[i] = std::thread(client3, client); + th[i] = thread(client3, client); for(size_t i = 0; i < nt; i++) th[i].join(); - printf("ok\n"); + cout << "ok" << endl; delete server; delete client; @@ -342,19 +342,19 @@ failure_test() startserver(); clients[0] = client = new rpcc(dst); VERIFY (client->bind() >= 0); - printf(" -- delete existing rpc client and server, create replacements.. ok\n"); + cout << " -- delete existing rpc client and server, create replacements.. ok" << endl; - printf(" -- concurrent test on new client and server w/ %lu threads ..", nt); + cout << " -- concurrent test on new client and server w/ " << nt << " threads .."; for(size_t i = 0; i < nt; i++) - th[i] = std::thread(client3, client); + th[i] = thread(client3, client); for(size_t i = 0; i < nt; i++) th[i].join(); - printf("ok\n"); + cout << "ok" << endl; - printf("failure_test OK\n"); + cout << "failure_test OK" << endl; } int @@ -406,16 +406,13 @@ main(int argc, char *argv[]) testmarshall(); if (isserver) { - printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ); + cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl; startserver(); } if (isclient) { // server's address. - memset(&dst, 0, sizeof(dst)); - dst.sin_family = AF_INET; - dst.sin_addr.s_addr = inet_addr("127.0.0.1"); - dst.sin_port = htons(port); + dst = "127.0.0.1:" + std::to_string(port); // start the client. bind it to the server. @@ -435,7 +432,7 @@ main(int argc, char *argv[]) failure_test(); } - printf("rpctest OK\n"); + cout << "rpctest OK" << endl; exit(0); } diff --git a/rsm.cc b/rsm.cc index 8e597d6..1321f7e 100644 --- a/rsm.cc +++ b/rsm.cc @@ -86,7 +86,7 @@ #include "handle.h" #include "rsm.h" -#include "tprintf.h" +#include "threaded_log.h" #include "lang/verify.h" #include "rsm_client.h" #include "lock.h" @@ -140,9 +140,9 @@ void rsm::recovery() [[noreturn]] { while (!cfg->ismember(cfg->myaddr(), vid_commit)) { // XXX iannucci 2013/09/15 -- I don't understand whether accessing // cfg->view_id in this manner involves a race. I suspect not. - if (join(primary)) { + if (join(primary, ml)) { LOG("recovery: joined"); - commit_change_wo(cfg->view_id()); + commit_change(cfg->view_id(), ml); } else { ml.unlock(); std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary? @@ -152,9 +152,9 @@ void rsm::recovery() [[noreturn]] { vid_insync = vid_commit; LOG("recovery: sync vid_insync " << vid_insync); if (primary == cfg->myaddr()) { - r = sync_with_backups(); + r = sync_with_backups(ml); } else { - r = sync_with_primary(); + r = sync_with_primary(ml); } LOG("recovery: sync done"); @@ -173,40 +173,39 @@ void rsm::recovery() [[noreturn]] { } } -bool rsm::sync_with_backups() { - adopt_lock ml(rsm_mutex); - ml.unlock(); +bool rsm::sync_with_backups(lock & rsm_mutex_lock) { + rsm_mutex_lock.unlock(); { // Make sure that the state of lock_server is stable during // synchronization; otherwise, the primary's state may be more recent // than replicas after the synchronization. - lock ml2(invoke_mutex); + lock invoke_mutex_lock(invoke_mutex); // By acquiring and releasing the invoke_mutex once, we make sure that // the state of lock_server will not be changed until all // replicas are synchronized. The reason is that client_invoke arrives // after this point of time will see inviewchange == true, and returns // BUSY. } - ml.lock(); + rsm_mutex_lock.lock(); // Start accepting synchronization request (statetransferreq) now! insync = true; cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end())); - sync_cond.wait(ml); + sync_cond.wait(rsm_mutex_lock); insync = false; return true; } -bool rsm::sync_with_primary() { +bool rsm::sync_with_primary(lock & rsm_mutex_lock) { // Remember the primary of vid_insync std::string m = primary; while (vid_insync == vid_commit) { - if (statetransfer(m)) + if (statetransfer(m, rsm_mutex_lock)) break; } - return statetransferdone(m); + return statetransferdone(m, rsm_mutex_lock); } @@ -214,55 +213,50 @@ bool rsm::sync_with_primary() { * Call to transfer state from m to the local node. * Assumes that rsm_mutex is already held. */ -bool rsm::statetransfer(std::string m) +bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) { rsm_protocol::transferres r; handle h(m); int ret = 0; - tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n", - m.c_str(), last_myvs.vid, last_myvs.seqno); + LOG("rsm::statetransfer: contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); rpcc *cl; { - adopt_lock ml(rsm_mutex); - ml.unlock(); + rsm_mutex_lock.unlock(); cl = h.safebind(); if (cl) { ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000), r, cfg->myaddr(), last_myvs, vid_insync); } - ml.lock(); + rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(), - (long unsigned) cl, ret); + LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret); return false; } if (stf && last_myvs != r.last) { stf->unmarshal_state(r.state); } last_myvs = r.last; - tprintf("rsm::statetransfer transfer from %s success, vs(%d,%d)\n", - m.c_str(), last_myvs.vid, last_myvs.seqno); + LOG("rsm::statetransfer transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); return true; } -bool rsm::statetransferdone(std::string m) { - adopt_lock ml(rsm_mutex); - ml.unlock(); +bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) { + rsm_mutex_lock.unlock(); handle h(m); rpcc *cl = h.safebind(); bool done = false; if (cl) { int r; - rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync); + auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync); done = (ret == rsm_protocol::OK); } - ml.lock(); + rsm_mutex_lock.lock(); return done; } -bool rsm::join(std::string m) { +bool rsm::join(std::string m, lock & rsm_mutex_lock) { handle h(m); int ret = 0; rsm_protocol::joinres r; @@ -270,14 +264,13 @@ bool rsm::join(std::string m) { LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"); rpcc *cl; { - adopt_lock ml(rsm_mutex); - ml.unlock(); + rsm_mutex_lock.unlock(); cl = h.safebind(); if (cl != 0) { ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r, cfg->myaddr(), last_myvs); } - ml.lock(); + rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { @@ -295,16 +288,16 @@ bool rsm::join(std::string m) { */ void rsm::commit_change(unsigned vid) { lock ml(rsm_mutex); - commit_change_wo(vid); + commit_change(vid, ml); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); } -void rsm::commit_change_wo(unsigned vid) { +void rsm::commit_change(unsigned vid, lock &) { if (vid <= vid_commit) return; - tprintf("commit_change: new view (%d) last vs (%d,%d) %s insync %d\n", - vid, last_myvs.vid, last_myvs.seqno, primary.c_str(), insync); + LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," << + last_myvs.seqno << ") " << primary << " insync " << insync); vid_commit = vid; inviewchange = true; set_primary(vid); @@ -316,13 +309,13 @@ void rsm::commit_change_wo(unsigned vid) { void rsm::execute(int procno, std::string req, std::string &r) { - tprintf("execute\n"); + LOG("execute"); handler *h = procs[procno]; VERIFY(h); unmarshall args(req); marshall rep; std::string reps; - rsm_protocol::status ret = (*h)(args, rep); + auto ret = (rsm_protocol::status)(*h)(args, rep); marshall rep1; rep1 << ret; rep1 << rep.str(); @@ -367,14 +360,14 @@ rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std:: rpcc *cl = h.safebind(); if (!cl) return rsm_client_protocol::BUSY; - rsm_protocol::status ret; int ignored_rval; - ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req); + auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req); LOG("Invoke returned " << ret); if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; breakpoint1(); - partition1(); + lock rsm_mutex_lock(rsm_mutex); + partition1(rsm_mutex_lock); } } execute(procno, req, r); @@ -427,16 +420,14 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); - int ret = rsm_protocol::OK; - tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), - last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); - if (!insync || vid != vid_insync) { + LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << + last_myvs.vid << "," << last_myvs.seqno << ")"); + if (!insync || vid != vid_insync) return rsm_protocol::BUSY; - } if (stf && last != last_myvs) r.state = stf->marshal_state(); r.last = last_myvs; - return ret; + return rsm_protocol::OK; } /** @@ -457,16 +448,16 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { // joinreq to the RSM's current primary; this is the // handler for that RPC. rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) { - int ret = rsm_protocol::OK; + auto ret = rsm_protocol::OK; lock ml(rsm_mutex); - tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(), - last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); + LOG("joinreq: src " << m << " last (" << last.vid << "," << last.seqno << ") mylast (" << + last_myvs.vid << "," << last_myvs.seqno << ")"); if (cfg->ismember(m, vid_commit)) { - tprintf("joinreq: is still a member\n"); + LOG("joinreq: is still a member"); r.log = cfg->dump(); } else if (cfg->myaddr() != primary) { - tprintf("joinreq: busy\n"); + LOG("joinreq: busy"); ret = rsm_protocol::BUSY; } else { // We cache vid_commit to avoid adding m to a view which already contains @@ -480,9 +471,9 @@ rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, views } if (cfg->ismember(m, cfg->view_id())) { r.log = cfg->dump(); - tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str()); + LOG("joinreq: ret " << ret << " log " << r.log); } else { - tprintf("joinreq: failed; proposer couldn't add %d\n", succ); + LOG("joinreq: failed; proposer couldn't add " << succ); ret = rsm_protocol::BUSY; } } @@ -500,8 +491,7 @@ rsm_client_protocol::status rsm::client_members(std::vector &r, int cfg->get_view(vid_commit, m); m.push_back(primary); r = m; - tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(), - primary.c_str()); + LOG("rsm::client_members return " << print_members(m) << " m " << primary); return rsm_client_protocol::OK; } @@ -515,7 +505,7 @@ void rsm::set_primary(unsigned vid) { VERIFY (c.size() > 0); if (isamember(primary,c)) { - tprintf("set_primary: primary stays %s\n", primary.c_str()); + LOG("set_primary: primary stays " << primary); return; } @@ -523,7 +513,7 @@ void rsm::set_primary(unsigned vid) { for (unsigned i = 0; i < p.size(); i++) { if (isamember(p[i], c)) { primary = p[i]; - tprintf("set_primary: primary is %s\n", primary.c_str()); + LOG("set_primary: primary is " << primary); return; } } @@ -541,25 +531,25 @@ bool rsm::amiprimary() { // Simulate partitions // assumes caller holds rsm_mutex -void rsm::net_repair_wo(bool heal) { +void rsm::net_repair(bool heal, lock &) { std::vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); - tprintf("rsm::net_repair_wo: %s %d\n", m[i].c_str(), heal); + LOG("rsm::net_repair: " << m[i] << " " << heal); if (h.safebind()) h.safebind()->set_reachable(heal); } } rsmrpc->set_reachable(heal); } -rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) { +rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) { lock ml(rsm_mutex); - tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n", - heal, dopartition, partitioned); + LOG("rsm::test_net_repairreq: " << heal << " (dopartition " << + dopartition << ", partitioned " << partitioned << ")"); if (heal) { - net_repair_wo(heal); + net_repair(heal, ml); partitioned = false; } else { dopartition = true; @@ -573,30 +563,30 @@ rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) { void rsm::breakpoint1() { if (break1) { - tprintf("Dying at breakpoint 1 in rsm!\n"); + LOG("Dying at breakpoint 1 in rsm!"); exit(1); } } void rsm::breakpoint2() { if (break2) { - tprintf("Dying at breakpoint 2 in rsm!\n"); + LOG("Dying at breakpoint 2 in rsm!"); exit(1); } } -void rsm::partition1() { +void rsm::partition1(lock & rsm_mutex_lock) { if (dopartition) { - net_repair_wo(false); + net_repair(false, rsm_mutex_lock); dopartition = false; partitioned = true; } } -rsm_test_protocol::status rsm::breakpointreq(int &r, int b) { +rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); - tprintf("rsm::breakpointreq: %d\n", b); + LOG("rsm::breakpointreq: " << b); if (b == 1) break1 = true; else if (b == 2) break2 = true; else if (b == 3 || b == 4) cfg->breakpoint(b); diff --git a/rsm.h b/rsm.h index 87734e1..73fa606 100644 --- a/rsm.h +++ b/rsm.h @@ -45,8 +45,8 @@ class rsm : public config_view_change { rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid); rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src, viewstamp last); - rsm_test_protocol::status test_net_repairreq(int &r, int heal); - rsm_test_protocol::status breakpointreq(int &r, int b); + rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal); + rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b); std::mutex rsm_mutex; std::mutex invoke_mutex; @@ -56,18 +56,18 @@ class rsm : public config_view_change { void execute(int procno, std::string req, std::string &r); rsm_client_protocol::status client_invoke(std::string &r, int procno, std::string req); - bool statetransfer(std::string m); - bool statetransferdone(std::string m); - bool join(std::string m); + bool statetransfer(std::string m, lock & rsm_mutex_lock); + bool statetransferdone(std::string m, lock & rsm_mutex_lock); + bool join(std::string m, lock & rsm_mutex_lock); void set_primary(unsigned vid); std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid); - bool sync_with_backups(); - bool sync_with_primary(); - void net_repair_wo(bool heal); + bool sync_with_backups(lock & rsm_mutex_lock); + bool sync_with_primary(lock & rsm_mutex_lock); + void net_repair(bool heal, lock & rsm_mutex_lock); void breakpoint1(); void breakpoint2(); - void partition1(); - void commit_change_wo(unsigned vid); + void partition1(lock & rsm_mutex_lock); + void commit_change(unsigned vid, lock & rsm_mutex_lock); public: rsm (std::string _first, std::string _me); ~rsm() {} diff --git a/rsm_client.cc b/rsm_client.cc index b68ef0c..bff32c2 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -6,31 +6,21 @@ #include #include "lang/verify.h" #include "lock.h" -#include "tprintf.h" +#include "threaded_log.h" -rsm_client::rsm_client(std::string dst) { +rsm_client::rsm_client(std::string dst) : primary(dst) { LOG("create rsm_client"); - std::vector mems; - - sockaddr_in dstsock; - make_sockaddr(dst.c_str(), &dstsock); - primary = dst; - - { - lock ml(rsm_client_mutex); - VERIFY (init_members()); - } + lock ml(rsm_client_mutex); + VERIFY (init_members(ml)); LOG("rsm_client: done"); } -// Assumes caller holds rsm_client_mutex -void rsm_client::primary_failure() { +void rsm_client::primary_failure(lock &) { primary = known_mems.back(); known_mems.pop_back(); } rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) { - int ret = 0; lock ml(rsm_client_mutex); while (1) { LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary); @@ -38,8 +28,9 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con ml.unlock(); rpcc *cl = h.safebind(); + auto ret = rsm_client_protocol::OK; if (cl) - ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req); + ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req); ml.lock(); if (!cl) @@ -47,7 +38,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret); if (ret == rsm_client_protocol::OK) - break; + return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { LOG("rsm is busy " << primary); sleep(3); @@ -55,29 +46,27 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con } if (ret == rsm_client_protocol::NOTPRIMARY) { LOG("primary " << primary << " isn't the primary--let's get a complete list of mems"); - if (init_members()) + if (init_members(ml)) continue; } prim_fail: LOG("primary " << primary << " failed ret " << std::dec << ret); - primary_failure(); + primary_failure(ml); LOG("rsm_client::invoke: retry new primary " << primary); } - return ret; } -bool rsm_client::init_members() { +bool rsm_client::init_members(lock & rsm_client_mutex_lock) { LOG("rsm_client::init_members get members!"); handle h(primary); int ret = rsm_client_protocol::ERR; rpcc *cl; { - adopt_lock ml(rsm_client_mutex); - ml.unlock(); + rsm_client_mutex_lock.unlock(); cl = h.safebind(); if (cl) ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0); - ml.lock(); + rsm_client_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) return false; diff --git a/rsm_client.h b/rsm_client.h index 5d0cd71..814616f 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -20,8 +20,8 @@ class rsm_client { std::string primary; std::vector known_mems; std::mutex rsm_client_mutex; - void primary_failure(); - bool init_members(); + void primary_failure(lock & rsm_client_mutex_lock); + bool init_members(lock & rsm_client_mutex_lock); public: rsm_client(std::string dst); rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req); @@ -34,32 +34,32 @@ class rsm_client { template int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { - std::string rep; - std::string res; - int intret = invoke(proc, rep, req.cstr()); - VERIFY( intret == rsm_client_protocol::OK ); - unmarshall u(rep); - u >> intret; - if (intret < 0) return intret; - u >> res; - if (!u.okdone()) { - fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" - "You probably forgot to set the reply string in " - "rsm::client_invoke, or you may call RPC 0x%x with wrong return " - "type\n", proc); - VERIFY(0); - return rpc_const::unmarshal_reply_failure; - } - unmarshall u1(res); - u1 >> r; - if(!u1.okdone()) { - fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" - "You are probably calling RPC 0x%x with wrong return " - "type.\n", proc); - VERIFY(0); - return rpc_const::unmarshal_reply_failure; - } - return intret; + std::string rep; + std::string res; + int intret = invoke(proc, rep, req.cstr()); + VERIFY( intret == rsm_client_protocol::OK ); + unmarshall u(rep); + u >> intret; + if (intret < 0) return intret; + u >> res; + if (!u.okdone()) { + fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" + "You probably forgot to set the reply string in " + "rsm::client_invoke, or you may call RPC 0x%x with wrong return " + "type\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + unmarshall u1(res); + u1 >> r; + if(!u1.okdone()) { + fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" + "You are probably calling RPC 0x%x with wrong return " + "type.\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; } template diff --git a/rsm_protocol.h b/rsm_protocol.h index 68ff061..6b508d8 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -3,110 +3,85 @@ #include "rpc/rpc.h" - class rsm_client_protocol { - public: - enum xxstatus { OK, ERR, NOTPRIMARY, BUSY}; - typedef int status; - enum rpc_numbers { - invoke = 0x9001, - members, - }; + public: + enum status : status_t {OK, ERR, NOTPRIMARY, BUSY}; + enum rpc_numbers : proc_t { + invoke = 0x9001, + members, + }; }; - struct viewstamp { - viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : - vid(_vid), seqno(_seqno) {} - unsigned int vid; - unsigned int seqno; - inline void operator++(int) { seqno++; } + viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : vid(_vid), seqno(_seqno) {} + unsigned int vid; + unsigned int seqno; + inline void operator++(int) { seqno++; } }; class rsm_protocol { - public: - enum xxstatus { OK, ERR, BUSY}; - typedef int status; - enum rpc_numbers { - invoke = 0x10001, - transferreq, - transferdonereq, - joinreq, - }; - - struct transferres { - std::string state; - viewstamp last; - }; - - struct joinres { - std::string log; - }; + public: + enum status : status_t { OK, ERR, BUSY}; + enum rpc_numbers : proc_t { + invoke = 0x10001, + transferreq, + transferdonereq, + joinreq, + }; + + struct transferres { + std::string state; + viewstamp last; + }; + + struct joinres { + std::string log; + }; }; inline bool operator==(viewstamp a, viewstamp b) { - return a.vid == b.vid && a.seqno == b.seqno; + return a.vid == b.vid && a.seqno == b.seqno; } inline bool operator>(viewstamp a, viewstamp b) { - return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno); + return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno); } inline bool operator!=(viewstamp a, viewstamp b) { - return a.vid != b.vid || a.seqno != b.seqno; + return a.vid != b.vid || a.seqno != b.seqno; } -inline marshall& operator<<(marshall &m, viewstamp v) -{ - m << v.vid; - m << v.seqno; - return m; +inline marshall& operator<<(marshall &m, viewstamp v) { + return m << v.vid << v.seqno; } inline unmarshall& operator>>(unmarshall &u, viewstamp &v) { - u >> v.vid; - u >> v.seqno; - return u; + return u >> v.vid >> v.seqno; } -inline marshall & -operator<<(marshall &m, rsm_protocol::transferres r) -{ - m << r.state; - m << r.last; - return m; +inline marshall & operator<<(marshall &m, rsm_protocol::transferres r) { + return m << r.state << r.last; } -inline unmarshall & -operator>>(unmarshall &u, rsm_protocol::transferres &r) -{ - u >> r.state; - u >> r.last; - return u; +inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) { + return u >> r.state >> r.last; } -inline marshall & -operator<<(marshall &m, rsm_protocol::joinres r) -{ - m << r.log; - return m; +inline marshall & operator<<(marshall &m, rsm_protocol::joinres r) { + return m << r.log; } -inline unmarshall & -operator>>(unmarshall &u, rsm_protocol::joinres &r) -{ - u >> r.log; - return u; +inline unmarshall & operator>>(unmarshall &u, rsm_protocol::joinres &r) { + return u >> r.log; } class rsm_test_protocol { - public: - enum xxstatus { OK, ERR}; - typedef int status; - enum rpc_numbers { - net_repair = 0x12001, - breakpoint = 0x12002, - }; + public: + enum status : status_t {OK, ERR}; + enum rpc_numbers : proc_t { + net_repair = 0x12001, + breakpoint = 0x12002, + }; }; #endif diff --git a/rsm_tester.cc b/rsm_tester.cc index 3ff4733..31b8c1a 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -11,7 +11,7 @@ #include #include -char tprintf_thread_prefix = 't'; +char log_thread_prefix = 't'; int main(int argc, char *argv[]) diff --git a/rsmtest_client.cc b/rsmtest_client.cc index c61194e..0c56f8a 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -8,30 +8,21 @@ #include #include -rsmtest_client::rsmtest_client(std::string dst) -{ - sockaddr_in dstsock; - make_sockaddr(dst.c_str(), &dstsock); - cl = new rpcc(dstsock); - if (cl->bind() < 0) { +rsmtest_client::rsmtest_client(std::string dst) : cl(dst) { + if (cl.bind() < 0) printf("rsmtest_client: call bind\n"); - } } -int -rsmtest_client::net_repair(int heal) -{ - int r; - int ret = cl->call(rsm_test_protocol::net_repair, r, heal); +rsm_test_protocol::status rsmtest_client::net_repair(int heal) { + rsm_test_protocol::status r; + auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::net_repair, r, heal); VERIFY (ret == rsm_test_protocol::OK); return r; } -int -rsmtest_client::breakpoint(int b) -{ - int r; - int ret = cl->call(rsm_test_protocol::breakpoint, r, b); +rsm_test_protocol::status rsmtest_client::breakpoint(int b) { + rsm_test_protocol::status r; + auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::breakpoint, r, b); VERIFY (ret == rsm_test_protocol::OK); return r; } diff --git a/rsmtest_client.h b/rsmtest_client.h index dc7388e..51f8511 100644 --- a/rsmtest_client.h +++ b/rsmtest_client.h @@ -10,7 +10,7 @@ // Client interface to the rsmtest server class rsmtest_client { protected: - rpcc *cl; + rpcc cl; public: rsmtest_client(std::string d); virtual ~rsmtest_client() {} diff --git a/tprintf.cc b/threaded_log.cc similarity index 88% rename from tprintf.cc rename to threaded_log.cc index 93a6070..57ddc08 100644 --- a/tprintf.cc +++ b/threaded_log.cc @@ -1,6 +1,6 @@ #include #include -#include "tprintf.h" +#include "threaded_log.h" std::mutex cerr_mutex; std::map thread_name_map; diff --git a/tprintf.h b/threaded_log.h similarity index 88% rename from tprintf.h rename to threaded_log.h index 41539fe..6918220 100644 --- a/tprintf.h +++ b/threaded_log.h @@ -1,5 +1,5 @@ -#ifndef tprintf_h -#define tprintf_h +#ifndef threaded_log_h +#define threaded_log_h #include #include @@ -12,7 +12,7 @@ extern std::map thread_name_map; extern int next_thread_num; extern std::map instance_name_map; extern int next_instance_num; -extern char tprintf_thread_prefix; +extern char log_thread_prefix; template struct iterator_pair : public std::pair { @@ -51,7 +51,7 @@ std::ostream & operator<<(std::ostream &o, const iterator_pair &d) { _tid_ = thread_name_map[_thread_] = ++next_thread_num; \ auto _utime_ = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \ std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \ - std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << _tid_; \ + std::cerr << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << _tid_; \ std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \ } #define LOG_THIS_POINTER { \ @@ -98,13 +98,4 @@ std::ostream & operator<<(std::ostream &o, const iterator_pair &d) { LOG_SUFFIX; \ } -#define tprintf(...) { \ - char *buf = nullptr; \ - int len = asprintf(&buf, __VA_ARGS__); \ - if (buf[len-1]=='\n') \ - buf[len-1] = '\0'; \ - LOG_NONMEMBER(buf); \ - free(buf); \ -} - #endif