From: Peter Iannucci Date: Fri, 27 Sep 2013 06:58:46 +0000 (-0400) Subject: More cleaning X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/5a5c578e2e358a121cdb9234a6cb11c4ecfbf323?ds=sidebyside More cleaning --- diff --git a/config.cc b/config.cc index 7bac4a9..d1cd70a 100644 --- a/config.cc +++ b/config.cc @@ -1,10 +1,5 @@ -#include -#include #include "config.h" -#include "paxos.h" #include "handle.h" -#include "threaded_log.h" -#include "lang/verify.h" // The config module maintains views. As a node joins or leaves a // view, the next view will be the same as previous view, except with @@ -40,8 +35,7 @@ config::config(const string &_first, const string &_me, config_view_change *_vc) : my_view_id(0), first(_first), me(_me), vc(_vc), - paxos_acceptor(this, me == _first, me, me), - paxos_proposer(this, &paxos_acceptor, me) + paxos(this, me == _first, me, me) { get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); lock cfg_mutex_lock(cfg_mutex); @@ -51,7 +45,7 @@ config::config(const string &_first, const string &_me, config_view_change *_vc) void config::restore(const string &s) { lock cfg_mutex_lock(cfg_mutex); - paxos_acceptor.restore(s); + paxos.restore(s); reconstruct(cfg_mutex_lock); } @@ -61,7 +55,7 @@ void config::get_view(unsigned instance, vector &m) { } void config::get_view(unsigned instance, vector &m, lock &) { - string value = paxos_acceptor.value(instance); + string value = paxos.value(instance); LOG("get_view(" << instance << "): returns " << value); m = members(value); } @@ -80,8 +74,8 @@ string config::value(const vector &m) const { void config::reconstruct(lock &cfg_mutex_lock) { VERIFY(cfg_mutex_lock); - if (paxos_acceptor.instance() > 0) { - my_view_id = paxos_acceptor.instance(); + my_view_id = paxos.instance(); + if (my_view_id > 0) { get_view(my_view_id, mems, cfg_mutex_lock); LOG("config::reconstruct: " << my_view_id << " " << print_members(mems)); } @@ -128,7 +122,7 @@ bool config::add(const string &new_m, unsigned vid) { vector cmems = mems; unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); - bool r = paxos_proposer.run(nextvid, cmems, value(m)); + bool r = paxos.run(nextvid, cmems, value(m)); cfg_mutex_lock.lock(); LOG("config::add: proposer returned " << (r ? "success" : "failure")); return r; @@ -145,7 +139,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) { vector cmems = mems; unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); - bool r = paxos_proposer.run(nextvid, cmems, value(n)); + bool r = paxos.run(nextvid, cmems, value(n)); cfg_mutex_lock.lock(); LOG("config::remove: proposer returned " << (r ? "success" : "failure")); return r; @@ -195,7 +189,7 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); if (vid == my_view_id) return paxos_protocol::OK; - else if (paxos_proposer.isrunning()) { + else if (paxos.isrunning()) { VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); return paxos_protocol::OK; } diff --git a/config.h b/config.h index 074cbe9..dc06b8a 100644 --- a/config.h +++ b/config.h @@ -1,23 +1,8 @@ #ifndef config_h #define config_h -#include -#include +#include "types.h" #include "paxos.h" -#include "lock.h" - -using std::chrono::steady_clock; -using std::chrono::seconds; -using std::string; -using std::vector; -using std::thread; -using std::ostringstream; -using std::istringstream; -using std::ostream_iterator; -using std::istream_iterator; -using std::copy; -using std::min; -using std::min_element; class config_view_change { public: @@ -31,8 +16,7 @@ class config : public paxos_change { string first; string me; config_view_change *vc; - acceptor paxos_acceptor; - proposer paxos_proposer; + proposer_acceptor paxos; vector mems; mutex cfg_mutex; cond config_cond; @@ -52,16 +36,15 @@ class config : public paxos_change { config(const string &_first, const string &_me, config_view_change *_vc); unsigned view_id() { return my_view_id; } const string &myaddr() const { return me; } - string dump() { return paxos_acceptor.dump(); } + string dump() { return paxos.dump(); } void get_view(unsigned instance, vector &m); void restore(const string &s); bool add(const string &, unsigned view_id); bool ismember(const string &m, unsigned view_id); void heartbeater(void); void paxos_commit(unsigned instance, const string &v); - // XXX hack; maybe should have its own port number - rpcs *get_rpcs() { return paxos_acceptor.get_rpcs(); } - void breakpoint(int b) { paxos_proposer.breakpoint(b); } + rpcs *get_rpcs() { return paxos.get_rpcs(); } + void breakpoint(int b) { paxos.breakpoint(b); } }; #endif diff --git a/handle.cc b/handle.cc index d048ead..3b6e1fa 100644 --- a/handle.cc +++ b/handle.cc @@ -1,9 +1,4 @@ #include "handle.h" -#include "threaded_log.h" -#include "lock.h" -#include - -using std::map; class hinfo { public: diff --git a/handle.h b/handle.h index a06b156..a513b56 100644 --- a/handle.h +++ b/handle.h @@ -23,10 +23,8 @@ #ifndef handle_h #define handle_h +#include "types.h" #include "rpc/rpc.h" -#include - -using std::string; class hinfo; diff --git a/lang/verify.h b/lang/verify.h index 2b092d2..823a48d 100644 --- a/lang/verify.h +++ b/lang/verify.h @@ -3,8 +3,8 @@ #ifndef verify_client_h #define verify_client_h -#include -#include +#include +#include #ifdef NDEBUG #define VERIFY(expr) do { if (!(expr)) abort(); } while (0) diff --git a/lock.h b/lock.h deleted file mode 100644 index 1d62c39..0000000 --- a/lock.h +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef lock_h -#define lock_h - -#include -#include - -using std::mutex; -using lock = std::unique_lock; -using cond = std::condition_variable; - -#endif diff --git a/lock_client.cc b/lock_client.cc index 22e57f1..99dcb5b 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -1,14 +1,8 @@ // RPC stubs for clients to talk to lock_server, and cache the locks. #include "lock_client.h" -#include "rpc/rpc.h" -#include -#include "threaded_log.h" #include -#include "rsm_client.h" -#include "lock.h" - void lock_state::wait(lock & mutex_lock) { auto self = std::this_thread::get_id(); c[self].wait(mutex_lock); @@ -42,7 +36,7 @@ lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), n srandom((uint32_t)time(NULL)^last_port); rlock_port = ((random()%32000) | (0x1 << 10)); - id = "127.0.0.1:" + std::to_string(rlock_port); + id = "127.0.0.1:" + to_string(rlock_port); last_port = rlock_port; rpcs *rlsrpc = new rpcs(rlock_port); rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this); diff --git a/lock_client.h b/lock_client.h index 3290d1a..36ee3a2 100644 --- a/lock_client.h +++ b/lock_client.h @@ -1,18 +1,14 @@ // lock client interface. #ifndef lock_client_h - #define lock_client_h #ifdef __cplusplus -#include +#include "types.h" #include "lock_protocol.h" -#include "rpc/rpc.h" -#include "lang/verify.h" #include "rpc/fifo.h" #include "rsm_client.h" -#include "lock.h" class lock_release_user { public: @@ -20,11 +16,6 @@ class lock_release_user { virtual ~lock_release_user() {} }; -using std::string; -using std::map; -using std::thread; -using std::list; - class lock_state { public: enum { diff --git a/lock_demo.cc b/lock_demo.cc index 3b38cdf..72fddf8 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -1,5 +1,4 @@ #include "lock_client.h" -#include "threaded_log.h" char log_thread_prefix = 'd'; diff --git a/lock_protocol.h b/lock_protocol.h index 900897a..1e45ddc 100644 --- a/lock_protocol.h +++ b/lock_protocol.h @@ -3,10 +3,8 @@ #ifndef lock_protocol_h #define lock_protocol_h +#include "types.h" #include "rpc/rpc.h" -#include - -using std::string; class lock_protocol { public: @@ -16,7 +14,7 @@ class lock_protocol { enum rpc_numbers : proc_t { acquire = 0x7001, release, - stat + stat, }; }; @@ -25,7 +23,7 @@ class rlock_protocol { enum status : status_t { OK, RPCERR }; enum rpc_numbers : proc_t { revoke = 0x8001, - retry = 0x8002 + retry, }; }; #endif diff --git a/lock_server.cc b/lock_server.cc index cac6a90..379838a 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,18 +1,10 @@ // the caching lock server implementation +#include "types.h" #include "lock_server.h" -#include #include #include -#include "lang/verify.h" #include "handle.h" -#include "threaded_log.h" -#include "rpc/marshall.h" -#include "lock.h" - -using std::ostringstream; -using std::istringstream; -using std::vector; lock_state::lock_state(): held(false) @@ -61,7 +53,7 @@ void lock_server::revoker() [[noreturn]] { continue; lock_state &st = get_lock_state(lid); - holder held_by; + holder_t held_by; { lock sl(st.m); held_by = st.held_by; @@ -89,7 +81,7 @@ void lock_server::retryer() [[noreturn]] { LOG("Sending retry for " << lid); lock_state &st = get_lock_state(lid); - holder front; + holder_t front; { lock sl(st.m); if (st.wanted_by.empty()) @@ -111,8 +103,8 @@ void lock_server::retryer() [[noreturn]] { } int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { - LOG_FUNC_ENTER_SERVER; - holder h = holder(id, xid); + LOG("lid=" << lid << " client=" << id << "," << xid); + holder_t h = holder_t(id, xid); lock_state &st = get_lock_state(lid); lock sl(st.m); @@ -145,11 +137,11 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro // get in line bool found = false; - for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { - if (i->first == id) { + for (auto p : st.wanted_by) { + if (p.first == id) { // make sure client is obeying serialization - if (i->second != xid) { - LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); + if (p.second != xid) { + LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second); return lock_protocol::RPCERR; } found = true; @@ -159,7 +151,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro if (!found) st.wanted_by.push_back(h); - LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end())); + LOG("wanted_by=" << st.wanted_by); // send revoke if we're first in line if (st.wanted_by.front() == h) @@ -168,11 +160,11 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro return lock_protocol::RETRY; } -int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { - LOG_FUNC_ENTER_SERVER; +int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) { + LOG("lid=" << lid << " client=" << id << "," << xid); lock_state &st = get_lock_state(lid); lock sl(st.m); - if (st.held && st.held_by == holder(id, xid)) { + if (st.held && st.held_by == holder_t(id, xid)) { st.held = false; LOG("Lock " << lid << " not held"); } diff --git a/lock_server.h b/lock_server.h index 2aa8445..381c527 100644 --- a/lock_server.h +++ b/lock_server.h @@ -1,33 +1,22 @@ #ifndef lock_server_h #define lock_server_h -#include - -#include -#include +#include "types.h" #include "lock_protocol.h" -#include "rpc/rpc.h" -#include "rsm_state_transfer.h" #include "rsm.h" #include "rpc/fifo.h" -#include "lock.h" - -using std::string; -using std::pair; -using std::list; -using std::map; -typedef string callback; -typedef pair holder; +typedef string callback_t; +typedef pair holder_t; class lock_state { public: lock_state(); lock_state(const lock_state &other); bool held; - holder held_by; - list wanted_by; - map old_requests; + holder_t held_by; + list wanted_by; + map old_requests; mutex m; lock_state& operator=(const lock_state&); }; diff --git a/lock_smain.cc b/lock_smain.cc index d62a25b..5f859a8 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -1,10 +1,6 @@ -#include "rpc/rpc.h" +#include "lock_server.h" #include -#include -#include "threaded_log.h" #include -#include "lock_server.h" -#include "rsm.h" // Main loop of lock_server diff --git a/lock_tester.cc b/lock_tester.cc index ac9175b..c192128 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -2,18 +2,10 @@ // Lock server tester // -#include "lock_protocol.h" #include "lock_client.h" -#include "rpc/rpc.h" #include -#include -#include -#include -#include "lang/verify.h" -#include "threaded_log.h" #include #include -#include "lock.h" char log_thread_prefix = 'c'; @@ -35,8 +27,8 @@ void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; if (ct[x] != 0) { - fprintf(stderr, "error: server granted %s twice\n", lid.c_str()); - fprintf(stdout, "error: server granted %s twice\n", lid.c_str()); + cout << "error: server granted " << lid << " twice" << endl; + cerr << "error: server granted " << lid << " twice" << endl; exit(1); } ct[x] += 1; @@ -46,7 +38,7 @@ void check_release(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; if (ct[x] != 1) { - fprintf(stderr, "error: client released un-held lock %s\n", lid.c_str()); + cerr << "error: client released un-held lock " << lid << endl; exit(1); } ct[x] -= 1; @@ -132,7 +124,7 @@ main(int argc, char *argv[]) srandom((uint32_t)getpid()); if (argc < 2) { - fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); + cerr << "Usage: " << argv[0] << " [host:]port [test]" << endl; exit(1); } diff --git a/log.cc b/log.cc index 627b7ac..95c40e3 100644 --- a/log.cc +++ b/log.cc @@ -1,28 +1,25 @@ +#include "log.h" #include "paxos.h" -#include -#include -#include "threaded_log.h" // Paxos must maintain some durable state (i.e., that survives power // failures) to run Paxos correct. This module implements a log with // all durable state to run Paxos. Since the values chosen correspond // to views, the log contains all views since the beginning of time. -log::log(acceptor *_acc, std::string _me) : pxs (_acc) { +log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) { name = "paxos-" + _me + ".log"; logread(); } void log::logread(void) { - std::ifstream from; - std::string type; + ifstream from(name); + string type; unsigned instance; - from.open(name.c_str()); LOG("logread"); while (from >> type) { if (type == "done") { - std::string v; + string v; from >> instance; from.get(); getline(from, v); @@ -34,13 +31,11 @@ void log::logread(void) { pxs->n_h.n = 0; pxs->n_a.n = 0; } else if (type == "propseen") { - from >> pxs->n_h.n; - from >> pxs->n_h.m; + from >> pxs->n_h.n >> pxs->n_h.m; LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")"); } else if (type == "accepted") { - std::string v; - from >> pxs->n_a.n; - from >> pxs->n_a.m; + string v; + from >> pxs->n_a.n >> pxs->n_a.m; from.get(); getline(from, v); pxs->v_a = v; @@ -53,28 +48,26 @@ void log::logread(void) { from.close(); } -std::string log::dump() { - std::ifstream from; - std::string res; - std::string v; - from.open(name.c_str()); +string log::dump() { + ifstream from(name); + string res; + string v; while (getline(from, v)) res += v + "\n"; from.close(); return res; } -void log::restore(std::string s) { - std::ofstream f; +void log::restore(string s) { LOG("restore: " << s); - f.open(name.c_str(), std::ios::trunc); + ofstream f(name, std::ios::trunc); f << s; f.close(); } // XXX should be an atomic operation -void log::loginstance(unsigned instance, std::string v) { - std::ofstream f(name, std::ios::app); +void log::loginstance(unsigned instance, string v) { + ofstream f(name, std::ios::app); f << "done " << instance << " " << v << "\n"; f.close(); } @@ -82,21 +75,15 @@ void log::loginstance(unsigned instance, std::string v) { // an acceptor should call logprop(n_h) when it // receives a prepare to which it responds prepare_ok(). void log::logprop(prop_t n_h) { - std::ofstream f; - f.open(name.c_str(), std::ios::app); - f << "propseen"; - f << " "; - f << n_h.n; - f << " "; - f << n_h.m; - f << "\n"; + ofstream f(name, std::ios::app); + f << "propseen " << n_h.n << " " << n_h.m << "\n"; f.close(); } // an acceptor should call logaccept(n_a, v_a) when it // receives an accept RPC to which it replies accept_ok(). -void log::logaccept(prop_t n, std::string v) { - std::ofstream f(name, std::ios::app); +void log::logaccept(prop_t n, string v) { + ofstream f(name, std::ios::app); f << "accepted " << n.n << " " << n.m << " " << v << "\n"; f.close(); } diff --git a/log.h b/log.h index 5bd2779..e8acd4a 100644 --- a/log.h +++ b/log.h @@ -1,28 +1,27 @@ #ifndef log_h #define log_h -#include -#include +#include "types.h" +#include "paxos_protocol.h" - -class acceptor; +class proposer_acceptor; class log { - private: - std::string name; - acceptor *pxs; - public: - log (acceptor*, std::string _me); - std::string dump(); - void restore(std::string s); - void logread(void); - /* Log a committed paxos instance*/ - void loginstance(unsigned instance, std::string v); - /* Log the highest proposal number that the local paxos acceptor has ever seen */ - void logprop(prop_t n_h); - /* Log the proposal (proposal number and value) that the local paxos acceptor - accept has ever accepted */ - void logaccept(prop_t n_a, std::string v); + private: + string name; + proposer_acceptor *pxs; + public: + log (proposer_acceptor*, string _me); + string dump(); + void restore(string s); + void logread(void); + // Log a committed paxos instance + void loginstance(unsigned instance, string v); + // Log the highest proposal number that the local paxos acceptor has ever seen + void logprop(prop_t n_h); + // Log the proposal (proposal number and value) that the local paxos acceptor + // accept has ever accepted + void logaccept(prop_t n_a, string v); }; #endif /* log_h */ diff --git a/paxos.cc b/paxos.cc index 46d9c1c..095d56a 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,10 +1,21 @@ #include "paxos.h" #include "handle.h" -#include "threaded_log.h" -#include "lang/verify.h" -#include "lock.h" -using std::stoi; +string print_members(const nodes_t &nodes) { + ostringstream ost; + copy(nodes.begin(), nodes.end(), ostream_iterator(ost, ", ")); + return ost.str(); +} + +bool isamember(const node_t & m, const nodes_t & nodes) { + return find(nodes.begin(), nodes.end(), m) != nodes.end(); +} + +// check if l2 contains a majority of the elements of l1 +bool majority(const nodes_t &l1, const nodes_t &l2) { + auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2)); + return overlap >= (l1.size() >> 1) + 1; +} // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made @@ -15,148 +26,89 @@ using std::stoi; // paxos_commit to inform higher layers of the agreed value for this // instance. -bool operator> (const prop_t &a, const prop_t &b) { - return (a.n > b.n || (a.n == b.n && a.m > b.m)); -} - -bool operator>= (const prop_t &a, const prop_t &b) { - return (a.n > b.n || (a.n == b.n && a.m >= b.m)); -} - -string -print_members(const vector &nodes) { - string s; - s.clear(); - for (unsigned i = 0; i < nodes.size(); i++) { - s += nodes[i]; - if (i < (nodes.size()-1)) - s += ","; - } - return s; -} - - -bool isamember(const string & m, const vector & nodes) { - for (auto n : nodes) { - if (n == m) - return 1; - } - return 0; -} - -bool proposer::isrunning() { - bool r; - lock ml(pxs_mutex); - r = !stable; - return r; -} - -// check if the servers in l2 contains a majority of servers in l1 -bool proposer::majority(const vector &l1, const vector &l2) { - unsigned n = 0; - - for (unsigned i = 0; i < l1.size(); i++) { - if (isamember(l1[i], l2)) - n++; - } - return n >= (l1.size() >> 1) + 1; -} - -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me) - : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), - stable (true) +proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, + bool _first, const node_t & _me, const value_t & _value) + : delegate(_delegate), me (_me) { - my_n.n = 0; - my_n.m = me; -} + // at this point, the log has already been replayed + if (instance_h == 0 && _first) { + values[1] = _value; + l.loginstance(1, _value); + instance_h = 1; + } -void proposer::setn() -{ - my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; + pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this); + pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this); + pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this); } -bool proposer::run(unsigned instance, const vector & cur_nodes, const string & newv) +bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { - vector accepts; - vector nodes; - string v; - bool r = false; - - lock ml(pxs_mutex); + lock ml(proposer_mutex); LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable); if (!stable) { // already running proposer? LOG("proposer::run: already running"); return false; } stable = false; - setn(); - accepts.clear(); - v.clear(); + bool r = false; + my_n.n = std::max(n_h.n, my_n.n) + 1; + nodes_t accepts; + value_t v = newv; if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - LOG("paxos::manager: received a majority of prepare responses"); - - if (v.size() == 0) - v = newv; + LOG("paxos::run: received a majority of prepare responses"); breakpoint1(); - nodes = accepts; - accepts.clear(); + nodes_t nodes; + nodes.swap(accepts); accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - LOG("paxos::manager: received a majority of accept responses"); + LOG("paxos::run: received a majority of accept responses"); breakpoint2(); decide(instance, accepts, v); r = true; } else { - LOG("paxos::manager: no majority of accept responses"); + LOG("paxos::run: no majority of accept responses"); } } else { - LOG("paxos::manager: no majority of prepare responses"); + LOG("paxos::run: no majority of prepare responses"); } } else { - LOG("paxos::manager: prepare is rejected " << stable); + LOG("paxos::run: prepare is rejected " << stable); } stable = true; return r; } -// proposer::run() calls prepare to send prepare RPCs to nodes -// and collect responses. if one of those nodes -// replies with an oldinstance, return false. -// otherwise fill in accepts with set of nodes that accepted, -// set v to the v_a with the highest n_a, and return true. -bool -proposer::prepare(unsigned instance, vector & accepts, - const vector & nodes, - string & v) -{ - struct paxos_protocol::preparearg arg = { instance, my_n }; - struct paxos_protocol::prepareres res; - prop_t n_a = { 0, "" }; - rpcc *r; +bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, + const nodes_t & nodes, value_t & v) { + prepareres res; + prop_t highest_n_a{0, ""}; for (auto i : nodes) { handle h(i); - if (!(r = h.safebind())) + rpcc *r = h.safebind(); + if (!r) continue; - int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); + auto status = (paxos_protocol::status)r->call_timeout( + paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n); if (status == paxos_protocol::OK) { if (res.oldinstance) { LOG("commiting old instance!"); - acc->commit(instance, res.v_a); + commit(instance, res.v_a); return false; } if (res.accept) { accepts.push_back(i); - if (res.n_a >= n_a) { + if (res.n_a >= highest_n_a) { LOG("found a newer accepted proposal"); v = res.v_a; - n_a = res.n_a; + highest_n_a = res.n_a; } } } @@ -164,79 +116,45 @@ proposer::prepare(unsigned instance, vector & accepts, return true; } -// run() calls this to send out accept RPCs to accepts. -// fill in accepts with list of nodes that accepted. -void -proposer::accept(unsigned instance, vector & accepts, - const vector & nodes, const string & v) -{ - struct paxos_protocol::acceptarg arg = { instance, my_n, v }; - rpcc *r; +void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, + const nodes_t & nodes, const value_t & v) { for (auto i : nodes) { handle h(i); - if (!(r = h.safebind())) + rpcc *r = h.safebind(); + if (!r) continue; bool accept = false; - int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); + int status = r->call_timeout( + paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v); if (status == paxos_protocol::OK && accept) accepts.push_back(i); } } -void -proposer::decide(unsigned instance, const vector & accepts, - const string & v) -{ - struct paxos_protocol::decidearg arg = { instance, v }; - rpcc *r; +void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) { for (auto i : accepts) { handle h(i); - if (!(r = h.safebind())) + rpcc *r = h.safebind(); + if (!r) continue; int res = 0; - r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg); + r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v); } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me, - const string & _value) - : cfg(_cfg), me (_me), instance_h(0) -{ - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - - l = new log (this, me); - - if (instance_h == 0 && _first) { - values[1] = _value; - l->loginstance(1, _value); - instance_h = 1; - } - - pxs = new rpcs((uint32_t)stoi(_me)); - pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); - pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); - pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); -} - paxos_protocol::status -acceptor::preparereq(paxos_protocol::prepareres & r, const string &, - paxos_protocol::preparearg a) -{ - lock ml(pxs_mutex); +proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { + lock ml(acceptor_mutex); r.oldinstance = false; r.accept = false; r.n_a = n_a; r.v_a = v_a; - if (a.instance <= instance_h) { + if (instance <= instance_h) { r.oldinstance = true; - r.v_a = values[a.instance]; - } else if (a.n > n_h) { - n_h = a.n; - l->logprop(n_h); + r.v_a = values[instance]; + } else if (n > n_h) { + n_h = n; + l.logprop(n_h); r.accept = true; } else { LOG("I totally rejected this request. Ha."); @@ -245,106 +163,76 @@ acceptor::preparereq(paxos_protocol::prepareres & r, const string &, } paxos_protocol::status -acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a) -{ - lock ml(pxs_mutex); +proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) { + lock ml(acceptor_mutex); r = false; - if (a.n >= n_h) { - n_a = a.n; - v_a = a.v; - l->logaccept(n_a, v_a); - r = true; + if (instance == instance_h + 1) { + if (n >= n_h) { + n_a = n; + v_a = v; + l.logaccept(n_a, v_a); + r = true; + } + return paxos_protocol::OK; + } else { + return paxos_protocol::ERR; } - return paxos_protocol::OK; } -// the src argument is only for debugging paxos_protocol::status -acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a) -{ - lock ml(pxs_mutex); - LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a); - if (a.instance == instance_h + 1) { - VERIFY(v_a == a.v); - commit(a.instance, v_a, ml); - } else if (a.instance <= instance_h) { - // we are ahead ignore. +proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { + lock ml(acceptor_mutex); + LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a); + if (instance == instance_h + 1) { + VERIFY(v_a == v); + commit(instance, v_a, ml); + } else if (instance <= instance_h) { + // we are ahead; ignore. } else { - // we are behind + // we are behind. VERIFY(0); } return paxos_protocol::OK; } -void -acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock) -{ +void proposer_acceptor::commit(unsigned instance, const value_t & value) { + lock ml(acceptor_mutex); + commit(instance, value, ml); +} + +void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) { LOG("acceptor::commit: instance=" << instance << " has v=" << value); if (instance > instance_h) { - LOG("commit: highestaccepteinstance = " << instance); + LOG("commit: highestacceptedinstance = " << instance); values[instance] = value; - l->loginstance(instance, value); + l.loginstance(instance, value); instance_h = instance; - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; + n_a = n_h = {0, me}; v_a.clear(); - if (cfg) { + if (delegate) { pxs_mutex_lock.unlock(); - cfg->paxos_commit(instance, value); + delegate->paxos_commit(instance, value); pxs_mutex_lock.lock(); } } } -void -acceptor::commit(unsigned instance, const string & value) -{ - lock ml(pxs_mutex); - commit(instance, value, ml); -} - -string -acceptor::dump() -{ - return l->dump(); -} - -void -acceptor::restore(const string & s) -{ - l->restore(s); - l->logread(); -} - - - // For testing purposes - -// Call this from your code between phases prepare and accept of proposer -void -proposer::breakpoint1() -{ +void proposer_acceptor::breakpoint1() { if (break1) { LOG("Dying at breakpoint 1!"); exit(1); } } -// Call this from your code between phases accept and decide of proposer -void -proposer::breakpoint2() -{ +void proposer_acceptor::breakpoint2() { if (break2) { LOG("Dying at breakpoint 2!"); exit(1); } } -void -proposer::breakpoint(int b) -{ +void proposer_acceptor::breakpoint(int b) { if (b == 3) { LOG("Proposer: breakpoint 1"); break1 = true; diff --git a/paxos.h b/paxos.h index 8561dd5..116403d 100644 --- a/paxos.h +++ b/paxos.h @@ -1,103 +1,79 @@ #ifndef paxos_h #define paxos_h -#include -#include -#include +#include "types.h" #include "rpc/rpc.h" #include "paxos_protocol.h" #include "log.h" -#include "lock.h" -using std::string; -using std::map; -using std::vector; +using prepareres = paxos_protocol::prepareres; + +using node_t = string; +using nodes_t = vector; +using value_t = string; class paxos_change { public: - virtual void paxos_commit(unsigned instance, const string & v) = 0; + virtual void paxos_commit(unsigned instance, const value_t & v) = 0; virtual ~paxos_change() {} }; -class acceptor { +extern bool isamember(const node_t & m, const nodes_t & nodes); +extern bool majority(const nodes_t & l1, const nodes_t & l2); +extern string print_members(const nodes_t & nodes); + +class proposer_acceptor { private: - log *l; - rpcs *pxs; - paxos_change *cfg; - string me; - mutex pxs_mutex; + mutex proposer_mutex; + mutex acceptor_mutex; - // Acceptor state - prop_t n_h; // number of the highest proposal seen in a prepare - prop_t n_a; // number of highest proposal accepted - string v_a; // value of highest proposal accepted - unsigned instance_h; // number of the highest instance we have decided - map values; // vals of each instance - - void commit(unsigned instance, const string & v, lock & pxs_mutex_lock); - paxos_protocol::status preparereq(paxos_protocol::prepareres & r, - const string & src, paxos_protocol::preparearg a); - paxos_protocol::status acceptreq(bool & r, const string & src, - paxos_protocol::acceptarg a); - paxos_protocol::status decidereq(int & r, const string & src, - paxos_protocol::decidearg a); + paxos_change *delegate; + node_t me; - friend class log; + rpcs pxs = {(uint32_t)std::stoi(me)}; - public: - acceptor(class paxos_change *cfg, bool _first, const string & _me, - const string & _value); - ~acceptor() {} - void commit(unsigned instance, const string & v); - unsigned instance() { return instance_h; } - const string & value(unsigned instance) { return values[instance]; } - string dump(); - void restore(const string &); - rpcs *get_rpcs() { return pxs; } - prop_t get_n_h() { return n_h; } - unsigned get_instance_h() { return instance_h; } -}; + bool break1 = false; + bool break2 = false; -extern bool isamember(const string & m, const vector & nodes); -extern string print_members(const vector & nodes); + // Proposer state + bool stable = true; + prop_t my_n = {0, me}; // number of the last proposal used in this instance -class proposer { - private: - log *l; - paxos_change *cfg; - acceptor *acc; - string me; - bool break1; - bool break2; + // Acceptor state + prop_t n_h = {0, me}; // number of the highest proposal seen in a prepare + prop_t n_a = {0, me}; // number of highest proposal accepted + value_t v_a; // value of highest proposal accepted + unsigned instance_h = 0; // number of the highest instance we have decided + map values; // vals of each instance - mutex pxs_mutex; + friend class log; + log l = {this, me}; - // Proposer state - bool stable; - prop_t my_n; // number of the last proposal used in this instance - - void setn(); - bool prepare(unsigned instance, vector & accepts, - const vector & nodes, - string & v); - void accept(unsigned instance, vector & accepts, - const vector & nodes, const string & v); - void decide(unsigned instance, const vector & accepts, - const string & v); + void commit(unsigned instance, const value_t & v); + void commit(unsigned instance, const value_t & v, lock & pxs_mutex_lock); + + paxos_protocol::status preparereq(prepareres & r, const node_t & src, unsigned instance, prop_t n); + paxos_protocol::status acceptreq(bool & r, const node_t & src, unsigned instance, prop_t n, const value_t & v); + paxos_protocol::status decidereq(int & r, const node_t & src, unsigned instance, const value_t & v); + + bool prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v); + void accept(unsigned instance, nodes_t & accepts, const nodes_t & nodes, const value_t & v); + void decide(unsigned instance, const nodes_t & accepts, const value_t & v); void breakpoint1(); void breakpoint2(); - bool majority(const vector & l1, const vector & l2); - friend class log; public: - proposer(class paxos_change *cfg, class acceptor *_acceptor, const string &_me); - ~proposer() {} - bool run(unsigned instance, const vector & cnodes, const string & v); - bool isrunning(); + proposer_acceptor(paxos_change *delegate, bool _first, const node_t & _me, const value_t & _value); + unsigned instance() { return instance_h; } + const value_t & value(unsigned instance) { return values[instance]; } + string dump() { return l.dump(); } + void restore(const string &s) { l.restore(s); l.logread(); } + rpcs *get_rpcs() { return &pxs; } + + bool run(unsigned instance, const nodes_t & cnodes, const value_t & v); + bool isrunning() { lock ml(proposer_mutex); return !stable; } void breakpoint(int b); }; - - -#endif /* paxos_h */ +#endif diff --git a/paxos_protocol.h b/paxos_protocol.h index f2bdb3f..c24f155 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -1,11 +1,12 @@ #ifndef paxos_protocol_h #define paxos_protocol_h +#include "types.h" #include "rpc/rpc.h" struct prop_t { unsigned n; - std::string m; + string m; }; class paxos_protocol { @@ -18,45 +19,18 @@ class paxos_protocol { heartbeat, }; - struct preparearg { - unsigned instance; - prop_t n; - }; - struct prepareres { bool oldinstance; bool accept; prop_t n_a; - std::string v_a; - }; - - struct acceptarg { - unsigned instance; - prop_t n; - std::string v; - }; - - struct decidearg { - unsigned instance; - std::string v; + string v_a; }; }; -inline unmarshall & operator>>(unmarshall &u, prop_t &a) { - return u >> a.n >> a.m; -} - -inline marshall & operator<<(marshall &m, prop_t a) { - return m << a.n << a.m; -} - -inline unmarshall & operator>>(unmarshall &u, paxos_protocol::preparearg &a) { - return u >> a.instance >> a.n; -} - -inline marshall & operator<<(marshall &m, paxos_protocol::preparearg a) { - return m << a.instance << a.n; -} +inline unmarshall & operator>>(unmarshall &u, prop_t &a) { return u >> a.n >> a.m; } +inline marshall & operator<<(marshall &m, prop_t a) { return m << a.n << a.m; } +inline bool operator>(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) > tie(b.n, b.m); } +inline bool operator>=(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) >= tie(b.n, b.m); } inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) { return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a; @@ -66,20 +40,4 @@ inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) { return m << r.oldinstance << r.accept << r.n_a << r.v_a; } -inline unmarshall & operator>>(unmarshall &u, paxos_protocol::acceptarg &a) { - return u >> a.instance >> a.n >> a.v; -} - -inline marshall & operator<<(marshall &m, paxos_protocol::acceptarg a) { - return m << a.instance << a.n << a.v; -} - -inline unmarshall & operator>>(unmarshall &u, paxos_protocol::decidearg &a) { - return u >> a.instance >> a.v; -} - -inline marshall & operator<<(marshall &m, paxos_protocol::decidearg a) { - return m << a.instance << a.v; -} - #endif diff --git a/rpc/connection.cc b/rpc/connection.cc index db6a3ea..3f60c69 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,20 +1,17 @@ +// std::bind and syscall bind have the same name, so don't use std::bind in this file +#define LIBT4_NO_FUNCTIONAL +#include "connection.h" #include #include -#include #include #include #include #include - -#include "connection.h" -#include "pollmgr.h" #include "jsl_log.h" -#include "lang/verify.h" -#include "lock.h" +#include #define MAX_PDU (10<<20) //maximum PDF is 10M - connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1) { @@ -25,7 +22,7 @@ connection::connection(chanmgr *m1, int f1, int l1) signal(SIGPIPE, SIG_IGN); - create_time_ = std::chrono::steady_clock::now(); + create_time_ = steady_clock::now(); PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } @@ -324,7 +321,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) flags |= O_NONBLOCK; fcntl(pipe_[0], F_SETFL, flags); - th_ = std::thread(&tcpsconn::accept_conn, this); + th_ = thread(&tcpsconn::accept_conn, this); } tcpsconn::~tcpsconn() @@ -333,7 +330,7 @@ tcpsconn::~tcpsconn() th_.join(); //close all the active connections - std::map::iterator i; + map::iterator i; for (i = conns_.begin(); i != conns_.end(); i++) { i->second->closeconn(); i->second->decref(); @@ -356,8 +353,7 @@ tcpsconn::process_accept() connection *ch = new connection(mgr_, s1, lossy_); // garbage collect all dead connections with refcount of 1 - std::map::iterator i; - for (i = conns_.begin(); i != conns_.end();) { + for (auto i = conns_.begin(); i != conns_.end();) { if (i->second->isdead() && i->second->ref() == 1) { jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", i->second->channo()); diff --git a/rpc/connection.h b/rpc/connection.h index f529a35..261cf9f 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -1,22 +1,16 @@ #ifndef connection_h #define connection_h +#include "types.h" #include -#include #include #include #include -#include - -#include -#include - #include "pollmgr.h" -constexpr size_t size_t_max = std::numeric_limits::max(); +constexpr size_t size_t_max = numeric_limits::max(); -class thread_exit_exception : std::exception { -}; +class thread_exit_exception : exception {}; class connection; @@ -64,16 +58,16 @@ class connection : public aio_callback { charbuf wpdu_; charbuf rpdu_; - std::chrono::time_point create_time_; + time_point create_time_; int waiters_; int refno_; const int lossy_; - std::mutex m_; - std::mutex ref_m_; - std::condition_variable send_complete_; - std::condition_variable send_wait_; + mutex m_; + mutex ref_m_; + cond send_complete_; + cond send_wait_; }; class tcpsconn { @@ -84,14 +78,14 @@ class tcpsconn { void accept_conn(); private: unsigned int port_; - std::mutex m_; - std::thread th_; + mutex m_; + thread th_; int pipe_[2]; int tcp_; //file desciptor for accepting connection chanmgr *mgr_; int lossy_; - std::map conns_; + map conns_; void process_accept(); }; diff --git a/rpc/fifo.h b/rpc/fifo.h index dde514d..215ec5b 100644 --- a/rpc/fifo.h +++ b/rpc/fifo.h @@ -1,8 +1,7 @@ #ifndef fifo_h #define fifo_h -#include -#include "lock.h" +#include "types.h" // blocks enq() and deq() when queue is FULL or EMPTY template @@ -17,7 +16,7 @@ class fifo { } private: - std::list q_; + list q_; mutex m_; cond non_empty_c_; // q went non-empty cond has_space_c_; // q is not longer overfull diff --git a/rpc/marshall.h b/rpc/marshall.h index abeaae7..20b9c07 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -1,16 +1,10 @@ #ifndef marshall_h #define marshall_h -#include -#include -#include -#include -#include -#include -#include +#include "types.h" +#include #include -#include -#include "lang/verify.h" +#include using proc_t = uint32_t; using status_t = int32_t; @@ -56,7 +50,7 @@ typedef int rpc_sz_t; //size of initial buffer allocation #define DEFAULT_RPC_SZ 1024 -#define RPC_HEADER_SZ (std::max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) +#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) class marshall { private: @@ -66,7 +60,7 @@ class marshall { inline void reserve(size_t n) { if((index_+n) > capacity_){ - capacity_ += std::max(capacity_, n); + capacity_ += max(capacity_, n); VERIFY (buf_ != NULL); buf_ = (char *)realloc(buf_, capacity_); VERIFY(buf_); @@ -106,12 +100,12 @@ class marshall { } // Return the current content (excluding header) as a string - std::string get_content() { - return std::string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ); + string get_content() { + return string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ); } // Return the current content (excluding header) as a string - std::string str() { + string str() { return get_content(); } @@ -135,16 +129,9 @@ marshall& operator<<(marshall &, int8_t); marshall& operator<<(marshall &, uint16_t); marshall& operator<<(marshall &, int16_t); marshall& operator<<(marshall &, uint64_t); -marshall& operator<<(marshall &, const std::string &); +marshall& operator<<(marshall &, const string &); -template -struct is_enumerable : std::false_type {}; - -template struct is_enumerable().cbegin(), std::declval().cend(), void()) -> : std::true_type {}; - -template typename std::enable_if::value, marshall>::type & +template typename enable_if::value, marshall>::type & operator<<(marshall &m, const A &x) { m << (unsigned int) x.size(); for (const auto &a : x) @@ -153,16 +140,16 @@ operator<<(marshall &m, const A &x) { } template marshall & -operator<<(marshall &m, const std::pair &d) { +operator<<(marshall &m, const pair &d) { return m << d.first << d.second; } template -using enum_type_t = typename std::enable_if::value, typename std::underlying_type::type>::type; +using enum_type_t = typename enable_if::value, typename underlying_type::type>::type; template constexpr inline enum_type_t from_enum(E e) noexcept { return (enum_type_t)e; } template constexpr inline E to_enum(enum_type_t value) noexcept { return (E)value; } -template typename std::enable_if::value, marshall>::type & +template typename enable_if::value, marshall>::type & operator<<(marshall &m, E e) { return m << from_enum(e); } @@ -179,8 +166,8 @@ unmarshall& operator>>(unmarshall &, int32_t &); unmarshall& operator>>(unmarshall &, size_t &); unmarshall& operator>>(unmarshall &, uint64_t &); unmarshall& operator>>(unmarshall &, int64_t &); -unmarshall& operator>>(unmarshall &, std::string &); -template typename std::enable_if::value, unmarshall>::type & +unmarshall& operator>>(unmarshall &, string &); +template typename enable_if::value, unmarshall>::type & operator>>(unmarshall &u, E &e); class unmarshall { @@ -194,7 +181,7 @@ class unmarshall { public: unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {} unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {} - unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false) + unmarshall(const string &s) : buf_(NULL),sz_(0),index_(0),ok_(false) { //take the content which does not exclude a RPC header from a string take_content(s); @@ -207,7 +194,7 @@ class unmarshall { void take_in(unmarshall &another); //take the content which does not exclude a RPC header from a string - void take_content(const std::string &s) { + void take_content(const string &s) { sz_ = s.size()+RPC_HEADER_SZ; buf_ = (char *)realloc(buf_,sz_); VERIFY(buf_); @@ -221,7 +208,7 @@ class unmarshall { bool okdone() const { return ok_ && index_ == sz_; } uint8_t rawbyte(); - void rawbytes(std::string &s, size_t n); + void rawbytes(string &s, size_t n); template void rawbytes(T &t); size_t ind() { return index_;} @@ -255,7 +242,7 @@ class unmarshall { } }; -template typename std::enable_if::value, unmarshall>::type & +template typename enable_if::value, unmarshall>::type & operator>>(unmarshall &u, A &x) { unsigned n = u.grab(); x.clear(); @@ -265,26 +252,26 @@ operator>>(unmarshall &u, A &x) { } template unmarshall & -operator>>(unmarshall &u, std::map &x) { +operator>>(unmarshall &u, map &x) { unsigned n = u.grab(); x.clear(); while (n--) - x.emplace(u.grab>()); + x.emplace(u.grab>()); return u; } template unmarshall & -operator>>(unmarshall &u, std::pair &d) { +operator>>(unmarshall &u, pair &d) { return u >> d.first >> d.second; } -template typename std::enable_if::value, unmarshall>::type & +template typename enable_if::value, unmarshall>::type & operator>>(unmarshall &u, E &e) { e = to_enum(u.grab>()); return u; } -typedef std::function handler; +typedef function handler; // // Automatic marshalling wrappers for RPC handlers @@ -294,7 +281,7 @@ typedef std::function handler; // C++11 does neither of these two things for us: // 1) Declare variables using a parameter pack expansion, like so // Args ...args; -// 2) Call a function with a std::tuple of the arguments it expects +// 2) Call a function with a tuple of the arguments it expects // // We implement an 'invoke' function for functions of the RPC handler // signature, i.e. int(R & r, const Args...) @@ -339,17 +326,17 @@ struct VerifyOnFailure { // One for function pointers... template -typename std::enable_if::value, RV>::type +typename enable_if::value, RV>::type invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { - return f(r, std::move(std::get(t))...); + return f(r, move(get(t))...); } // And one for pointers to member functions... template -typename std::enable_if::value, RV>::type +typename enable_if::value, RV>::type invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices) { - return (c->*f)(r, std::move(std::get(t))...); + return (c->*f)(r, move(get(t))...); } // The class marshalled_func_imp uses partial template specialization to @@ -373,15 +360,15 @@ struct marshalled_func_imp { // template parameters running from 0 up to (# args) - 1. using Indices = typename make_tuple_indices::type; // This type definition represents storage for f's unmarshalled - // arguments. std::decay is (most notably) stripping off const + // arguments. decay is (most notably) stripping off const // qualifiers. - using ArgsStorage = std::tuple::type...>; - // Allocate a handler (i.e. std::function) to hold the lambda + using ArgsStorage = tuple::type...>; + // Allocate a handler (i.e. function) to hold the lambda // which will unmarshall RPCs and call f. return new handler([=](unmarshall &u, marshall &m) -> RV { // Unmarshall each argument with the correct type and store the // result in a tuple. - ArgsStorage t = {u.grab::type>()...}; + ArgsStorage t = {u.grab::type>()...}; // Verify successful unmarshalling of the entire input stream. if (!u.okdone()) return (RV)ErrorHandler::unmarshall_args_failure(); @@ -417,7 +404,7 @@ struct marshalled_func : public marshalled_func_imp {}; template -struct marshalled_func> : +struct marshalled_func> : public marshalled_func_imp {}; #endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 919a286..023a7aa 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,11 +1,10 @@ +#include "types.h" #include #include #include #include "jsl_log.h" -#include "lang/verify.h" #include "pollmgr.h" -#include "lock.h" PollMgr *PollMgr::instance = NULL; static std::once_flag pollmgr_is_initialized; diff --git a/rpc/pollmgr.h b/rpc/pollmgr.h index 89d1660..a082843 100644 --- a/rpc/pollmgr.h +++ b/rpc/pollmgr.h @@ -1,9 +1,8 @@ #ifndef pollmgr_h #define pollmgr_h +#include "types.h" #include -#include -#include #ifdef __linux__ #include @@ -24,7 +23,7 @@ class aio_mgr { virtual void watch_fd(int fd, poll_flag flag) = 0; virtual bool unwatch_fd(int fd, poll_flag flag) = 0; virtual bool is_watched(int fd, poll_flag flag) = 0; - virtual void wait_ready(std::vector *readable, std::vector *writable) = 0; + virtual void wait_ready(vector *readable, vector *writable) = 0; virtual ~aio_mgr() {} }; @@ -55,9 +54,9 @@ class PollMgr { static int useless; private: - std::mutex m_; - std::condition_variable changedone_c_; - std::thread th_; + mutex m_; + cond changedone_c_; + thread th_; aio_callback *callbacks_[MAX_POLL_FDS]; aio_mgr *aio_; @@ -73,7 +72,7 @@ class SelectAIO : public aio_mgr { void watch_fd(int fd, poll_flag flag); bool unwatch_fd(int fd, poll_flag flag); bool is_watched(int fd, poll_flag flag); - void wait_ready(std::vector *readable, std::vector *writable); + void wait_ready(vector *readable, vector *writable); private: @@ -82,7 +81,7 @@ class SelectAIO : public aio_mgr { int highfds_; int pipefd_[2]; - std::mutex m_; + mutex m_; }; @@ -94,7 +93,7 @@ class EPollAIO : public aio_mgr { void watch_fd(int fd, poll_flag flag); bool unwatch_fd(int fd, poll_flag flag); bool is_watched(int fd, poll_flag flag); - void wait_ready(std::vector *readable, std::vector *writable); + void wait_ready(vector *readable, vector *writable); private: int pollfd_; diff --git a/rpc/rpc.cc b/rpc/rpc.cc index c6d93c8..ad3fcd9 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -54,6 +54,7 @@ x exited worker threads). */ +#include "types.h" #include "rpc.h" #include @@ -61,30 +62,14 @@ #include #include #include -#include "lock.h" #include "jsl_log.h" -#include "threaded_log.h" -#include "lang/verify.h" - -using std::stoi; const rpcc::TO rpcc::to_max = { 120000 }; const rpcc::TO rpcc::to_min = { 1000 }; -rpcc::caller::caller(int xxid, unmarshall *xun) -: xid(xxid), un(xun), done(false) -{ -} - -rpcc::caller::~caller() -{ -} - -inline -void set_rand_seed() -{ - auto now = std::chrono::time_point_cast(std::chrono::steady_clock::now()); +inline void set_rand_seed() { + auto now = time_point_cast(steady_clock::now()); srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); } @@ -196,10 +181,8 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, } TO curr_to; - std::chrono::time_point finaldeadline = - std::chrono::steady_clock::now() + - std::chrono::milliseconds(to.to), - nextdeadline; + auto finaldeadline = steady_clock::now() + milliseconds(to.to), + nextdeadline = finaldeadline; curr_to.to = to_min.to; @@ -231,20 +214,20 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, transmit = false; // only send once on a given channel } - if(finaldeadline == std::chrono::time_point::min()) + if(finaldeadline == time_point::min()) break; - nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to); + nextdeadline = steady_clock::now() + milliseconds(curr_to.to); if(nextdeadline > finaldeadline) { nextdeadline = finaldeadline; - finaldeadline = std::chrono::time_point::min(); + finaldeadline = time_point::min(); } { lock cal(ca.m); while (!ca.done){ jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); - if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){ + if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){ jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); break; } @@ -421,7 +404,7 @@ rpcs::got_pdu(connection *c, char *b, size_t sz) djob_t *j = new djob_t(c, b, sz); c->incref(); - bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j)); + bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j)); if(!succ || !reachable_){ c->decref(); delete j; @@ -447,16 +430,15 @@ rpcs::updatestat(proc_t proc) if(curr_counts_ == 0){ LOG("RPC STATS: "); for (auto i = counts_.begin(); i != counts_.end(); i++) - LOG(std::hex << i->first << ":" << std::dec << i->second); + LOG(hex << i->first << ":" << dec << i->second); lock rwl(reply_window_m_); - map >::iterator clt; size_t totalrep = 0, maxrep = 0; - for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ - totalrep += clt->second.size(); - if(clt->second.size() > maxrep) - maxrep = clt->second.size(); + for (auto clt : reply_window_) { + totalrep += clt.second.size(); + if(clt.second.size() > maxrep) + maxrep = clt.second.size(); } jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n", (int) reply_window_.size()-1, totalrep, maxrep); @@ -639,8 +621,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, int past_xid_rep = l.begin()->xid; - list::iterator start = l.begin(), it; - it = ++start; + list::iterator start = l.begin(), it = ++start; if (past_xid_rep < xid_rep || past_xid_rep == -1) { // scan for deletion candidates @@ -700,24 +681,19 @@ rpcs::add_reply(unsigned int clt_nonce, int xid, } } -void -rpcs::free_reply_window(void) -{ +void rpcs::free_reply_window(void) { lock rwl(reply_window_m_); - for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ - for (auto it = clt->second.begin(); it != clt->second.end(); it++){ - if (it->cb_present) - free(it->buf); + for (auto clt : reply_window_) { + for (auto it : clt.second){ + if (it.cb_present) + free(it.buf); } - clt->second.clear(); + clt.second.clear(); } reply_window_.clear(); } -// rpc handler -int -rpcs::rpcbind(unsigned int &r, int) -{ +int rpcs::rpcbind(unsigned int &r, int) { jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); r = nonce_; return 0; diff --git a/rpc/rpc.h b/rpc/rpc.h index d81a5dd..f01af09 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -1,20 +1,13 @@ #ifndef rpc_h #define rpc_h +#include "types.h" #include #include -#include -#include -#include #include "thr_pool.h" #include "marshall.h" #include "connection.h" -#include "lock.h" - -using std::string; -using std::map; -using std::list; class rpc_const { public: @@ -37,13 +30,12 @@ class rpcc : public chanmgr { //manages per rpc info struct caller { - caller(int xxid, unmarshall *un); - ~caller(); + caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {} int xid; unmarshall *un; int intret; - bool done; + bool done = false; mutex m; cond c; }; @@ -126,9 +118,8 @@ rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) if (intret < 0) return intret; u >> r; if (u.okdone() != true) { - fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." - "You are probably calling RPC 0x%x with wrong return " - "type.\n", proc); + cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " << + "calling RPC 0x" << hex << proc << " with the wrong return type." << endl; VERIFY(0); return rpc_const::unmarshal_reply_failure; } diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index bf8a56c..c69d317 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -1,28 +1,18 @@ // RPC test and pseudo-documentation. // generates print statements on failures, but eventually says "rpctest OK" +#include "types.h" #include "rpc.h" #include -#include -#include -#include -#include #include #include #include #include "jsl_log.h" -#include "lang/verify.h" #define NUM_CL 2 char log_thread_prefix = 'r'; -using std::string; -using std::cout; -using std::endl; -using std::vector; -using std::thread; - rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object string dst; //server's ip address diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index ff3557c..8b9691b 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -1,7 +1,4 @@ #include "thr_pool.h" -#include -#include -#include "lang/verify.h" // if blocking, then addJob() blocks when queue is full // otherwise, addJob() simply returns false when queue is full diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h index 4ea1bd4..94ce237 100644 --- a/rpc/thr_pool.h +++ b/rpc/thr_pool.h @@ -1,9 +1,7 @@ #ifndef thr_pool_h #define thr_pool_h -#include -#include - +#include "types.h" #include "fifo.h" typedef std::function job_t; diff --git a/rsm.cc b/rsm.cc index 1321f7e..00cae81 100644 --- a/rsm.cc +++ b/rsm.cc @@ -78,18 +78,13 @@ // The rule is that a module releases its internal locks before it // upcalls, but can keep its locks when calling down. -#include -#include -#include #include #include +#include "types.h" #include "handle.h" #include "rsm.h" -#include "threaded_log.h" -#include "lang/verify.h" #include "rsm_client.h" -#include "lock.h" rsm::rsm(std::string _first, std::string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), @@ -191,7 +186,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { insync = true; cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); - LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end())); + LOG("rsm::sync_with_backups " << backups); sync_cond.wait(rsm_mutex_lock); insync = false; return true; @@ -259,7 +254,7 @@ bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) { bool rsm::join(std::string m, lock & rsm_mutex_lock) { handle h(m); int ret = 0; - rsm_protocol::joinres r; + string log; LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"); rpcc *cl; @@ -267,7 +262,7 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); cl = h.safebind(); if (cl != 0) { - ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r, + ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), log, cfg->myaddr(), last_myvs); } rsm_mutex_lock.lock(); @@ -277,8 +272,8 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) { LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret); return false; } - LOG("rsm::join: succeeded " << r.log); - cfg->restore(r.log); + LOG("rsm::join: succeeded " << log); + cfg->restore(log); return true; } @@ -447,7 +442,7 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { // a node that wants to join an RSM as a server sends a // joinreq to the RSM's current primary; this is the // handler for that RPC. -rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) { +rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { auto ret = rsm_protocol::OK; lock ml(rsm_mutex); @@ -455,7 +450,7 @@ rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, views last_myvs.vid << "," << last_myvs.seqno << ")"); if (cfg->ismember(m, vid_commit)) { LOG("joinreq: is still a member"); - r.log = cfg->dump(); + log = cfg->dump(); } else if (cfg->myaddr() != primary) { LOG("joinreq: busy"); ret = rsm_protocol::BUSY; @@ -470,8 +465,8 @@ rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, views ml.lock(); } if (cfg->ismember(m, cfg->view_id())) { - r.log = cfg->dump(); - LOG("joinreq: ret " << ret << " log " << r.log); + log = cfg->dump(); + LOG("joinreq: ret " << ret << " log " << log); } else { LOG("joinreq: failed; proposer couldn't add " << succ); ret = rsm_protocol::BUSY; diff --git a/rsm.h b/rsm.h index 73fa606..f2eb5bd 100644 --- a/rsm.h +++ b/rsm.h @@ -3,19 +3,24 @@ #ifndef rsm_h #define rsm_h -#include -#include +#include "types.h" #include "rsm_protocol.h" -#include "rsm_state_transfer.h" #include "rpc/rpc.h" #include #include "config.h" +class rsm_state_transfer { + public: + virtual string marshal_state() = 0; + virtual void unmarshal_state(string) = 0; + virtual ~rsm_state_transfer() {} +}; + class rsm : public config_view_change { private: void reg1(int proc, handler *); protected: - std::map procs; + map procs; config *cfg; class rsm_state_transfer *stf; rpcs *rsmrpc; @@ -23,12 +28,12 @@ class rsm : public config_view_change { // On primary: viewstamp for the next request from rsm_client viewstamp myvs; viewstamp last_myvs; // Viewstamp of the last executed request - std::string primary; + string primary; bool insync; bool inviewchange; unsigned vid_commit; // Latest view id that is known to rsm layer unsigned vid_insync; // The view id that this node is synchronizing for - std::vector backups; // A list of unsynchronized backups + vector backups; // A list of unsynchronized backups // For testing purposes rpcs *testsvr; @@ -37,30 +42,26 @@ class rsm : public config_view_change { bool break1; bool break2; - - rsm_client_protocol::status client_members(std::vector &r, int i); - rsm_protocol::status invoke(int &, int proc, viewstamp vs, std::string mreq); - rsm_protocol::status transferreq(rsm_protocol::transferres &r, std::string src, + rsm_client_protocol::status client_members(vector &r, int i); + rsm_protocol::status invoke(int &, int proc, viewstamp vs, string mreq); + rsm_protocol::status transferreq(rsm_protocol::transferres &r, string src, viewstamp last, unsigned vid); - rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid); - rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src, + rsm_protocol::status transferdonereq(int &, string m, unsigned vid); + rsm_protocol::status joinreq(string & log, string src, viewstamp last); rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal); rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b); - std::mutex rsm_mutex; - std::mutex invoke_mutex; - std::condition_variable recovery_cond; - std::condition_variable sync_cond; + mutex rsm_mutex, invoke_mutex; + cond recovery_cond, sync_cond; - void execute(int procno, std::string req, std::string &r); - rsm_client_protocol::status client_invoke(std::string &r, int procno, - std::string req); - bool statetransfer(std::string m, lock & rsm_mutex_lock); - bool statetransferdone(std::string m, lock & rsm_mutex_lock); - bool join(std::string m, lock & rsm_mutex_lock); + void execute(int procno, string req, string &r); + rsm_client_protocol::status client_invoke(string &r, int procno, string req); + bool statetransfer(string m, lock & rsm_mutex_lock); + bool statetransferdone(string m, lock & rsm_mutex_lock); + bool join(string m, lock & rsm_mutex_lock); void set_primary(unsigned vid); - std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid); + string find_highest(viewstamp &vs, string &m, unsigned &vid); bool sync_with_backups(lock & rsm_mutex_lock); bool sync_with_primary(lock & rsm_mutex_lock); void net_repair(bool heal, lock & rsm_mutex_lock); @@ -69,7 +70,7 @@ class rsm : public config_view_change { void partition1(lock & rsm_mutex_lock); void commit_change(unsigned vid, lock & rsm_mutex_lock); public: - rsm (std::string _first, std::string _me); + rsm (string _first, string _me); ~rsm() {} bool amiprimary(); diff --git a/rsm_client.cc b/rsm_client.cc index bff32c2..310d1ad 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -1,14 +1,10 @@ +#include "types.h" #include "rsm_client.h" -#include #include -#include #include #include -#include "lang/verify.h" -#include "lock.h" -#include "threaded_log.h" -rsm_client::rsm_client(std::string dst) : primary(dst) { +rsm_client::rsm_client(string dst) : primary(dst) { LOG("create rsm_client"); lock ml(rsm_client_mutex); VERIFY (init_members(ml)); @@ -20,10 +16,10 @@ void rsm_client::primary_failure(lock &) { known_mems.pop_back(); } -rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) { +rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) { lock ml(rsm_client_mutex); while (1) { - LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary); + LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary); handle h(primary); ml.unlock(); @@ -36,7 +32,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con if (!cl) goto prim_fail; - LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret); + LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary << " ret " << dec << ret); if (ret == rsm_client_protocol::OK) return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { @@ -50,7 +46,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con continue; } prim_fail: - LOG("primary " << primary << " failed ret " << std::dec << ret); + LOG("primary " << primary << " failed ret " << dec << ret); primary_failure(ml); LOG("rsm_client::invoke: retry new primary " << primary); } diff --git a/rsm_client.h b/rsm_client.h index 814616f..bb21f24 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -1,11 +1,8 @@ #ifndef rsm_client_h #define rsm_client_h -#include "rpc/rpc.h" +#include "types.h" #include "rsm_protocol.h" -#include -#include - // // rsm client interface. diff --git a/rsm_protocol.h b/rsm_protocol.h index 6b508d8..53908f3 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -1,6 +1,7 @@ #ifndef rsm_protocol_h #define rsm_protocol_h +#include "types.h" #include "rpc/rpc.h" class rsm_client_protocol { @@ -23,33 +24,21 @@ class rsm_protocol { public: enum status : status_t { OK, ERR, BUSY}; enum rpc_numbers : proc_t { - invoke = 0x10001, + invoke = 0xa001, transferreq, transferdonereq, joinreq, }; struct transferres { - std::string state; + string state; viewstamp last; }; - - struct joinres { - std::string log; - }; }; -inline bool operator==(viewstamp a, viewstamp b) { - return a.vid == b.vid && a.seqno == b.seqno; -} - -inline bool operator>(viewstamp a, viewstamp b) { - return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno); -} - -inline bool operator!=(viewstamp a, viewstamp b) { - return a.vid != b.vid || a.seqno != b.seqno; -} +inline bool operator==(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) == tie(b.vid, b.seqno); } +inline bool operator>(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) > tie(b.vid, b.seqno); } +inline bool operator!=(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) != tie(b.vid, b.seqno); } inline marshall& operator<<(marshall &m, viewstamp v) { return m << v.vid << v.seqno; @@ -67,14 +56,6 @@ inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) { return u >> r.state >> r.last; } -inline marshall & operator<<(marshall &m, rsm_protocol::joinres r) { - return m << r.log; -} - -inline unmarshall & operator>>(unmarshall &u, rsm_protocol::joinres &r) { - return u >> r.log; -} - class rsm_test_protocol { public: enum status : status_t {OK, ERR}; diff --git a/rsm_state_transfer.h b/rsm_state_transfer.h deleted file mode 100644 index 62a130c..0000000 --- a/rsm_state_transfer.h +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef rsm_state_transfer_h -#define rsm_state_transfer_h - -class rsm_state_transfer { - public: - virtual std::string marshal_state() = 0; - virtual void unmarshal_state(std::string) = 0; - virtual ~rsm_state_transfer() {} -}; - -#endif diff --git a/rsm_tester.cc b/rsm_tester.cc index 31b8c1a..1a8b833 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -2,34 +2,27 @@ // RSM test client // +#include "types.h" #include "rsm_protocol.h" #include "rsmtest_client.h" -#include "rpc/rpc.h" -#include -#include -#include -#include -#include char log_thread_prefix = 't'; -int -main(int argc, char *argv[]) -{ +int main(int argc, char *argv[]) { if(argc != 4){ - fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]); - exit(1); + cerr << "Usage: " << argv[0] << " [host:]port [partition] arg" << endl; + return 1; } rsmtest_client *lc = new rsmtest_client(argv[1]); - std::string command(argv[2]); + string command(argv[2]); if (command == "partition") { - printf("net_repair returned %d\n", lc->net_repair(atoi(argv[3]))); + cout << "net_repair returned " << lc->net_repair(stoi(argv[3])); } else if (command == "breakpoint") { - int b = atoi(argv[3]); - printf("breakpoint %d returned %d\n", b, lc->breakpoint(b)); + int b = stoi(argv[3]); + cout << "breakpoint " << b << " returned " << lc->breakpoint(b); } else { - fprintf(stderr, "Unknown command %s\n", argv[2]); + cerr << "Unknown command " << argv[2] << endl; } return 0; } diff --git a/rsmtest_client.cc b/rsmtest_client.cc index 0c56f8a..cb3ce8c 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -1,16 +1,11 @@ // RPC stubs for clients to talk to rsmtest_server #include "rsmtest_client.h" -#include "rpc/rpc.h" #include -#include -#include -#include - rsmtest_client::rsmtest_client(std::string dst) : cl(dst) { if (cl.bind() < 0) - printf("rsmtest_client: call bind\n"); + cout << "rsmtest_client: call bind" << endl; } rsm_test_protocol::status rsmtest_client::net_repair(int heal) { diff --git a/rsmtest_client.h b/rsmtest_client.h index 51f8511..ad3d4c1 100644 --- a/rsmtest_client.h +++ b/rsmtest_client.h @@ -3,9 +3,8 @@ #ifndef rsmtest_client_h #define rsmtest_client_h -#include +#include "types.h" #include "rsm_protocol.h" -#include "rpc/rpc.h" // Client interface to the rsmtest server class rsmtest_client { diff --git a/threaded_log.cc b/threaded_log.cc index 57ddc08..6a213b1 100644 --- a/threaded_log.cc +++ b/threaded_log.cc @@ -1,9 +1,7 @@ -#include -#include #include "threaded_log.h" -std::mutex cerr_mutex; -std::map thread_name_map; +mutex cerr_mutex; +map thread_name_map; int next_thread_num = 0; -std::map instance_name_map; +map instance_name_map; int next_instance_num = 0; diff --git a/threaded_log.h b/threaded_log.h index 6918220..ebb2222 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -1,101 +1,61 @@ #ifndef threaded_log_h #define threaded_log_h -#include -#include -#include -#include -#include "lock.h" +#include "types.h" extern mutex cerr_mutex; -extern std::map thread_name_map; +extern map thread_name_map; extern int next_thread_num; -extern std::map instance_name_map; +extern map instance_name_map; extern int next_instance_num; extern char log_thread_prefix; -template -struct iterator_pair : public std::pair { - explicit iterator_pair(const A & first, const A & second) : std::pair(first, second) {} -}; - -template -const struct iterator_pair make_iterator_pair(const A & first, const A & second) { - return iterator_pair(first, second); -} - -template -std::ostream & operator<<(std::ostream &o, const std::pair &d) { - o << "<" << d.first << "," << d.second << ">"; - return o; +namespace std { + // This... is an awful hack. But sticking this in std:: makes it possible for + // ostream_iterator to use it. + template + ostream & operator<<(ostream &o, const pair &d) { + return o << "<" << d.first << "," << d.second << ">"; + } } template -std::ostream & operator<<(std::ostream &o, const iterator_pair &d) { +typename enable_if::value && !is_same::value, ostream>::type & +operator<<(ostream &o, const A &a) { o << "["; - for (auto i=d.first; i!=d.second; i++) { - o << *i; - auto j(i); - if (++j != d.second) - o << ", "; - } + auto oit = ostream_iterator(o, ", "); + copy(a.begin(), a.end(), oit); o << "]"; return o; } #define LOG_PREFIX { \ - cerr_mutex.lock(); \ auto _thread_ = std::this_thread::get_id(); \ int _tid_ = thread_name_map[_thread_]; \ if (_tid_==0) \ _tid_ = thread_name_map[_thread_] = ++next_thread_num; \ - auto _utime_ = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \ - std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \ - std::cerr << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << _tid_; \ - std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \ + auto _utime_ = duration_cast(system_clock::now().time_since_epoch()).count() % 1000000000; \ + cerr << setfill('0') << dec << left << setw(9) << _utime_ << " "; \ + cerr << setfill(' ') << log_thread_prefix << left << setw(2) << _tid_; \ + cerr << " " << setw(20) << __FILE__ << " " << setw(18) << __func__; \ } #define LOG_THIS_POINTER { \ int _self_ = instance_name_map[this]; \ if (_self_==0) \ _self_ = instance_name_map[this] = ++next_instance_num; \ - std::cerr << "#" << std::setw(2) << _self_; \ -} -#define LOG_SUFFIX { \ - cerr_mutex.unlock(); \ + cerr << "#" << setw(2) << _self_; \ } #define LOG_NONMEMBER(_x_) { \ + lock _cel_(cerr_mutex); \ LOG_PREFIX; \ - std::cerr << _x_ << std::endl; \ - LOG_SUFFIX; \ + cerr << _x_ << endl; \ } #define LOG(_x_) { \ + lock _cel_(cerr_mutex); \ LOG_PREFIX; \ LOG_THIS_POINTER; \ - std::cerr << _x_ << std::endl; \ - LOG_SUFFIX; \ -} -#define LOG_FUNC_ENTER { \ - LOG_PREFIX; \ - LOG_THIS_POINTER; \ - std::cerr << "lid=" << lid; \ - std::cerr << std::endl; \ - LOG_SUFFIX; \ -} -#define LOG_FUNC_ENTER_SERVER { \ - LOG_PREFIX; \ - LOG_THIS_POINTER; \ - std::cerr << "lid=" << lid; \ - std::cerr << " client=" << id << "," << xid; \ - std::cerr << std::endl; \ - LOG_SUFFIX; \ -} -#define LOG_FUNC_EXIT { \ - LOG_PREFIX; \ - LOG_THIS_POINTER; \ - std::cerr << "return" << lid; \ - std::cerr << std::endl; \ - LOG_SUFFIX; \ + cerr << _x_ << endl; \ } #endif diff --git a/types.h b/types.h new file mode 100644 index 0000000..9739a2a --- /dev/null +++ b/types.h @@ -0,0 +1,113 @@ +#ifndef types_h +#define types_h + +#include +using std::copy; +using std::move; +using std::max; +using std::min; +using std::min_element; +using std::find; +using std::count_if; + +#include +using std::chrono::seconds; +using std::chrono::milliseconds; +using std::chrono::microseconds; +using std::chrono::nanoseconds; +using std::chrono::steady_clock; +using std::chrono::system_clock; +using std::chrono::duration_cast; +using std::chrono::time_point_cast; +using std::chrono::time_point; + +#include +using std::exception; + +#include +using std::ofstream; +using std::ifstream; + +#ifndef LIBT4_NO_FUNCTIONAL +#include +using std::function; +using std::bind; +using std::placeholders::_1; +#endif + +#include +#include +using std::cout; +using std::cerr; +using std::endl; +using std::dec; +using std::hex; +using std::left; +using std::setw; +using std::setfill; +using std::setprecision; +using std::ostream; +using std::istream; +using std::ostream_iterator; +using std::istream_iterator; + +#include +using std::numeric_limits; + +#include +using std::list; + +#include +using std::map; + +#include +using std::mutex; +using lock = std::unique_lock; +using cond = std::condition_variable; +using std::cv_status; + +#include +using std::ostringstream; +using std::istringstream; + +#include +using std::string; +using std::to_string; +using std::stoi; + +#include +using std::thread; + +#include +using std::tuple; +using std::get; +using std::tie; + +#include +using std::decay; +using std::true_type; +using std::false_type; +using std::is_enum; +using std::is_member_function_pointer; +using std::is_same; +using std::underlying_type; +using std::enable_if; + +#include +using std::pair; +using std::declval; + +#include +using std::vector; + + +template struct is_iterable : false_type {}; + +template struct is_iterable().cbegin(), declval().cend(), void()) +> : true_type {}; + +#include "lang/verify.h" +#include "threaded_log.h" + +#endif