From: Peter Iannucci Date: Sat, 28 Sep 2013 03:37:44 +0000 (-0400) Subject: MOAR TEMPLATE MAGIC X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/24bebc0ecf83446c7371eff69042322aab34976a MOAR TEMPLATE MAGIC --- diff --git a/Makefile b/Makefile index 3da8e03..363cbb3 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ EXTRA_TARGETS ?= all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS) -rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o +rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rm -f $@ ar cq $@ $^ ranlib rpc/librpc.a diff --git a/config.cc b/config.cc index d1cd70a..038a100 100644 --- a/config.cc +++ b/config.cc @@ -61,15 +61,11 @@ void config::get_view(unsigned instance, vector &m, lock &) { } vector config::members(const string &value) const { - istringstream ist(value); - using it = istream_iterator; - return {it(ist), it()}; + return explode(value); } -string config::value(const vector &m) const { - ostringstream ost; - copy(m.begin(), m.end(), ostream_iterator(ost, " ")); - return ost.str(); +string config::value(const vector &members) const { + return implode(members); } void config::reconstruct(lock &cfg_mutex_lock) { @@ -77,7 +73,7 @@ void config::reconstruct(lock &cfg_mutex_lock) { my_view_id = paxos.instance(); if (my_view_id > 0) { get_view(my_view_id, mems, cfg_mutex_lock); - LOG("config::reconstruct: " << my_view_id << " " << print_members(mems)); + LOG("config::reconstruct: " << my_view_id << " " << mems); } } @@ -86,7 +82,7 @@ void config::paxos_commit(unsigned instance, const string &value) { lock cfg_mutex_lock(cfg_mutex); vector newmem = members(value); - LOG("config::paxos_commit: " << instance << ": " << print_members(newmem)); + LOG("config::paxos_commit: " << instance << ": " << newmem); for (auto mem : mems) { LOG("config::paxos_commit: is " << mem << " still a member?"); @@ -114,17 +110,20 @@ bool config::ismember(const string &m, unsigned vid) { bool config::add(const string &new_m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); - if (vid != my_view_id) + LOG("adding " << new_m << " to " << vid); + if (vid != my_view_id) { + LOG("that's not my view id, " << my_view_id << "!"); return false; - LOG("config::add " << new_m); + } vector m = mems; m.push_back(new_m); vector cmems = mems; unsigned nextvid = my_view_id + 1; + LOG("calling down to paxos layer"); cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, value(m)); cfg_mutex_lock.lock(); - LOG("config::add: proposer returned " << (r ? "success" : "failure")); + LOG("paxos proposer returned " << (r ? "success" : "failure")); return r; } @@ -156,7 +155,7 @@ void config::heartbeater() [[noreturn]] { unsigned vid = my_view_id; vector cmems; get_view(vid, cmems, cfg_mutex_lock); - LOG("heartbeater: current membership " << print_members(cmems)); + LOG("heartbeater: current membership " << cmems); if (!isamember(me, cmems)) { LOG("heartbeater: not member yet; skip hearbeat"); diff --git a/handle.cc b/handle.cc index 3b6e1fa..d32c895 100644 --- a/handle.cc +++ b/handle.cc @@ -62,10 +62,11 @@ hinfo * handle_mgr::acquire_handle(string m) { if (hmap.find(m) == hmap.end()) { h = new hinfo(m); hmap[m] = h; + h->refcnt++; } else if (!hmap[m]->del) { h = hmap[m]; + h->refcnt++; } - h->refcnt++; return h; } diff --git a/lock_demo.cc b/lock_demo.cc index 72fddf8..714c4e3 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -4,7 +4,7 @@ char log_thread_prefix = 'd'; int main(int argc, char *argv[]) { if(argc != 2) { - fprintf(stderr, "Usage: %s [host:]port\n", argv[0]); + cerr << "Usage: " << argv[0] << " [host:]port" << endl; return 1; } diff --git a/lock_server.cc b/lock_server.cc index 379838a..d5e85a5 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -23,18 +23,9 @@ lock_state& lock_state::operator=(const lock_state& o) { return *this; } -marshall & operator<<(marshall &m, const lock_state &d) { - return m << d.held << d.held_by << d.wanted_by; -} - -unmarshall & operator>>(unmarshall &u, lock_state &d) { - return u >> d.held >> d.held_by >> d.wanted_by; -} - lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); - // by the semantics of map, this will create - // the lock if it doesn't already exist + // this will create the lock if it doesn't already exist return lock_table[lid]; } diff --git a/lock_server.h b/lock_server.h index 381c527..5c182e0 100644 --- a/lock_server.h +++ b/lock_server.h @@ -19,8 +19,12 @@ public: map old_requests; mutex m; lock_state& operator=(const lock_state&); + + MEMBERS(held, held_by, wanted_by) }; +MARSHALLABLE(lock_state) + typedef map lock_map; class lock_server : public rsm_state_transfer { diff --git a/lock_smain.cc b/lock_smain.cc index 5f859a8..3bd7376 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -13,7 +13,7 @@ int main(int argc, char *argv[]) { srandom((uint32_t)getpid()); if(argc != 3){ - fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); + cerr << "Usage: " << argv[0] << " [master:]port [me:]port" << endl; exit(1); } diff --git a/log.cc b/log.cc index 95c40e3..de00d67 100644 --- a/log.cc +++ b/log.cc @@ -27,19 +27,19 @@ void log::logread(void) { pxs->instance_h = instance; LOG("logread: instance: " << instance << " w. v = " << pxs->values[instance]); - pxs->v_a.clear(); - pxs->n_h.n = 0; - pxs->n_a.n = 0; + pxs->accepted_value.clear(); + pxs->promise.n = 0; + pxs->accepted.n = 0; } else if (type == "propseen") { - from >> pxs->n_h.n >> pxs->n_h.m; - LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")"); + from >> pxs->promise.n >> pxs->promise.m; + LOG("logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")"); } else if (type == "accepted") { string v; - from >> pxs->n_a.n >> pxs->n_a.m; + from >> pxs->accepted.n >> pxs->accepted.m; from.get(); getline(from, v); - pxs->v_a = v; - LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a); + pxs->accepted_value = v; + LOG("logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value); } else { LOG("logread: unknown log record"); VERIFY(0); @@ -72,15 +72,15 @@ void log::loginstance(unsigned instance, string v) { f.close(); } -// an acceptor should call logprop(n_h) when it +// an acceptor should call logprop(promise) when it // receives a prepare to which it responds prepare_ok(). -void log::logprop(prop_t n_h) { +void log::logprop(prop_t promise) { ofstream f(name, std::ios::app); - f << "propseen " << n_h.n << " " << n_h.m << "\n"; + f << "propseen " << promise.n << " " << promise.m << "\n"; f.close(); } -// an acceptor should call logaccept(n_a, v_a) when it +// an acceptor should call logaccept(accepted, accepted_value) when it // receives an accept RPC to which it replies accept_ok(). void log::logaccept(prop_t n, string v) { ofstream f(name, std::ios::app); diff --git a/paxos.cc b/paxos.cc index 095d56a..f9d5785 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,12 +1,6 @@ #include "paxos.h" #include "handle.h" -string print_members(const nodes_t &nodes) { - ostringstream ost; - copy(nodes.begin(), nodes.end(), ostream_iterator(ost, ", ")); - return ost.str(); -} - bool isamember(const node_t & m, const nodes_t & nodes) { return find(nodes.begin(), nodes.end(), m) != nodes.end(); } @@ -45,20 +39,20 @@ proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { lock ml(proposer_mutex); - LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable); + LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable); if (!stable) { // already running proposer? - LOG("proposer::run: already running"); + LOG("paxos proposer already running"); return false; } stable = false; bool r = false; - my_n.n = std::max(n_h.n, my_n.n) + 1; + proposal.n = std::max(promise.n, proposal.n) + 1; nodes_t accepts; value_t v = newv; if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - LOG("paxos::run: received a majority of prepare responses"); + LOG("received a majority of prepare responses"); breakpoint1(); @@ -67,20 +61,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - LOG("paxos::run: received a majority of accept responses"); + LOG("received a majority of accept responses"); breakpoint2(); decide(instance, accepts, v); r = true; } else { - LOG("paxos::run: no majority of accept responses"); + LOG("no majority of accept responses"); } } else { - LOG("paxos::run: no majority of prepare responses"); + LOG("no majority of prepare responses"); } } else { - LOG("paxos::run: prepare is rejected " << stable); + LOG("prepare is rejected " << stable); } stable = true; return r; @@ -88,6 +82,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v) { + LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"); prepareres res; prop_t highest_n_a{0, ""}; for (auto i : nodes) { @@ -96,17 +91,19 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, if (!r) continue; auto status = (paxos_protocol::status)r->call_timeout( - paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n); + paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal); if (status == paxos_protocol::OK) { if (res.oldinstance) { LOG("commiting old instance!"); commit(instance, res.v_a); return false; } + LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " << + "v_a=\"" << res.v_a << "\""); if (res.accept) { accepts.push_back(i); if (res.n_a >= highest_n_a) { - LOG("found a newer accepted proposal"); + LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"); v = res.v_a; highest_n_a = res.n_a; } @@ -125,7 +122,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, continue; bool accept = false; int status = r->call_timeout( - paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v); + paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v); if (status == paxos_protocol::OK && accept) accepts.push_back(i); } @@ -144,21 +141,26 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const paxos_protocol::status proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { + LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")"); lock ml(acceptor_mutex); r.oldinstance = false; r.accept = false; - r.n_a = n_a; - r.v_a = v_a; + r.n_a = accepted; + r.v_a = accepted_value; if (instance <= instance_h) { + LOG("old instance " << instance << " has value " << values[instance]); r.oldinstance = true; r.v_a = values[instance]; - } else if (n > n_h) { - n_h = n; - l.logprop(n_h); + } else if (n > promise) { + LOG("looks good to me"); + promise = n; + l.logprop(promise); r.accept = true; } else { LOG("I totally rejected this request. Ha."); } + LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << + "v_a=\"" << r.v_a << "\""); return paxos_protocol::OK; } @@ -167,10 +169,10 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t lock ml(acceptor_mutex); r = false; if (instance == instance_h + 1) { - if (n >= n_h) { - n_a = n; - v_a = v; - l.logaccept(n_a, v_a); + if (n >= promise) { + accepted = n; + accepted_value = v; + l.logaccept(accepted, accepted_value); r = true; } return paxos_protocol::OK; @@ -182,10 +184,10 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t paxos_protocol::status proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { lock ml(acceptor_mutex); - LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a); + LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value); if (instance == instance_h + 1) { - VERIFY(v_a == v); - commit(instance, v_a, ml); + VERIFY(accepted_value == v); + commit(instance, accepted_value, ml); } else if (instance <= instance_h) { // we are ahead; ignore. } else { @@ -207,8 +209,8 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & values[instance] = value; l.loginstance(instance, value); instance_h = instance; - n_a = n_h = {0, me}; - v_a.clear(); + accepted = promise = {0, me}; + accepted_value.clear(); if (delegate) { pxs_mutex_lock.unlock(); delegate->paxos_commit(instance, value); diff --git a/paxos.h b/paxos.h index 116403d..426dfef 100644 --- a/paxos.h +++ b/paxos.h @@ -20,7 +20,6 @@ class paxos_change { extern bool isamember(const node_t & m, const nodes_t & nodes); extern bool majority(const nodes_t & l1, const nodes_t & l2); -extern string print_members(const nodes_t & nodes); class proposer_acceptor { private: @@ -37,12 +36,12 @@ class proposer_acceptor { // Proposer state bool stable = true; - prop_t my_n = {0, me}; // number of the last proposal used in this instance + prop_t proposal = {0, me}; // number of the last proposal used in this instance // Acceptor state - prop_t n_h = {0, me}; // number of the highest proposal seen in a prepare - prop_t n_a = {0, me}; // number of highest proposal accepted - value_t v_a; // value of highest proposal accepted + prop_t promise = {0, me}; // number of the highest proposal seen in a prepare + prop_t accepted = {0, me}; // number of highest proposal accepted + value_t accepted_value; // value of highest proposal accepted unsigned instance_h = 0; // number of the highest instance we have decided map values; // vals of each instance diff --git a/paxos_protocol.h b/paxos_protocol.h index c24f155..5e8afdd 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -7,8 +7,13 @@ struct prop_t { unsigned n; string m; + + MEMBERS(n, m) + LEXICOGRAPHIC_COMPARISON(prop_t) }; +MARSHALLABLE(prop_t) + class paxos_protocol { public: enum status : status_t { OK, ERR }; @@ -24,20 +29,11 @@ class paxos_protocol { bool accept; prop_t n_a; string v_a; + + MEMBERS(oldinstance, accept, n_a, v_a) }; }; -inline unmarshall & operator>>(unmarshall &u, prop_t &a) { return u >> a.n >> a.m; } -inline marshall & operator<<(marshall &m, prop_t a) { return m << a.n << a.m; } -inline bool operator>(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) > tie(b.n, b.m); } -inline bool operator>=(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) >= tie(b.n, b.m); } - -inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) { - return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a; -} - -inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) { - return m << r.oldinstance << r.accept << r.n_a << r.v_a; -} +MARSHALLABLE(paxos_protocol::prepareres) #endif diff --git a/rpc/connection.cc b/rpc/connection.cc index 3f60c69..86d4ec5 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -7,15 +7,13 @@ #include #include #include -#include "jsl_log.h" #include #define MAX_PDU (10<<20) //maximum PDF is 10M connection::connection(chanmgr *m1, int f1, int l1) -: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1) +: mgr_(m1), fd_(f1), lossy_(l1) { - int flags = fcntl(fd_, F_GETFL, NULL); flags |= O_NONBLOCK; fcntl(fd_, F_SETFL, flags); @@ -27,8 +25,7 @@ connection::connection(chanmgr *m1, int f1, int l1) PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } -connection::~connection() -{ +connection::~connection() { VERIFY(dead_); if (rpdu_.buf) free(rpdu_.buf); @@ -36,23 +33,17 @@ connection::~connection() close(fd_); } -void -connection::incref() -{ +void connection::incref() { lock rl(ref_m_); refno_++; } -bool -connection::isdead() -{ +bool connection::isdead() { lock ml(m_); return dead_; } -void -connection::closeconn() -{ +void connection::closeconn() { { lock ml(m_); if (!dead_) { @@ -67,9 +58,7 @@ connection::closeconn() PollMgr::Instance()->block_remove_fd(fd_); } -void -connection::decref() -{ +void connection::decref() { bool dead = false; { lock rl(ref_m_); @@ -80,21 +69,11 @@ connection::decref() dead = dead_; } } - if (dead) { + if (dead) delete this; - } -} - -int -connection::ref() -{ - lock rl(ref_m_); - return refno_; } -int -connection::compare(connection *another) -{ +int connection::compare(connection *another) { if (create_time_ > another->create_time_) return 1; if (create_time_ < another->create_time_) @@ -102,261 +81,244 @@ connection::compare(connection *another) return 0; } -bool -connection::send(char *b, size_t sz) -{ +bool connection::send(char *b, size_t sz) { lock ml(m_); - waiters_++; - while (!dead_ && wpdu_.buf) { + waiters_++; + while (!dead_ && wpdu_.buf) { send_wait_.wait(ml); - } - waiters_--; - if (dead_) { - return false; - } - wpdu_.buf = b; - wpdu_.sz = sz; - wpdu_.solong = 0; - - if (lossy_) { - if ((random()%100) < lossy_) { - jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_); - shutdown(fd_,SHUT_RDWR); - } - } - - if (!writepdu()) { - dead_ = true; + } + waiters_--; + if (dead_) { + return false; + } + wpdu_.buf = b; + wpdu_.sz = sz; + wpdu_.solong = 0; + + if (lossy_) { + if ((random()%100) < lossy_) { + IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_); + shutdown(fd_,SHUT_RDWR); + } + } + + if (!writepdu()) { + dead_ = true; ml.unlock(); - PollMgr::Instance()->block_remove_fd(fd_); + PollMgr::Instance()->block_remove_fd(fd_); ml.lock(); - } else { - if (wpdu_.solong == wpdu_.sz) { - } else { - //should be rare to need to explicitly add write callback - PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); - while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) { + } else { + if (wpdu_.solong == wpdu_.sz) { + } else { + //should be rare to need to explicitly add write callback + PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) { send_complete_.wait(ml); - } - } - } - bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); - wpdu_.solong = wpdu_.sz = 0; - wpdu_.buf = NULL; - if (waiters_ > 0) + } + } + } + bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); + wpdu_.solong = wpdu_.sz = 0; + wpdu_.buf = NULL; + if (waiters_ > 0) send_wait_.notify_all(); - return ret; + return ret; } //fd_ is ready to be written -void -connection::write_cb(int s) -{ +void connection::write_cb(int s) { lock ml(m_); - VERIFY(!dead_); - VERIFY(fd_ == s); - if (wpdu_.sz == 0) { - PollMgr::Instance()->del_callback(fd_,CB_WRONLY); - return; - } - if (!writepdu()) { - PollMgr::Instance()->del_callback(fd_, CB_RDWR); - dead_ = true; - } else { - VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong < wpdu_.sz) { - return; - } + VERIFY(!dead_); + VERIFY(fd_ == s); + if (wpdu_.sz == 0) { + PollMgr::Instance()->del_callback(fd_,CB_WRONLY); + return; + } + if (!writepdu()) { + PollMgr::Instance()->del_callback(fd_, CB_RDWR); + dead_ = true; + } else { + VERIFY(wpdu_.solong != size_t_max); + if (wpdu_.solong < wpdu_.sz) { + return; + } } - send_complete_.notify_one(); + send_complete_.notify_one(); } //fd_ is ready to be read -void -connection::read_cb(int s) -{ +void connection::read_cb(int s) { lock ml(m_); - VERIFY(fd_ == s); - if (dead_) { - return; - } - - bool succ = true; - if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { - succ = readpdu(); - } - - if (!succ) { - PollMgr::Instance()->del_callback(fd_,CB_RDWR); - dead_ = true; - send_complete_.notify_one(); - } - - if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { - if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { - //chanmgr has successfully consumed the pdu - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; - } - } + VERIFY(fd_ == s); + if (dead_) { + return; + } + + bool succ = true; + if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { + succ = readpdu(); + } + + if (!succ) { + PollMgr::Instance()->del_callback(fd_,CB_RDWR); + dead_ = true; + send_complete_.notify_one(); + } + + if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { + if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { + //chanmgr has successfully consumed the pdu + rpdu_.buf = NULL; + rpdu_.sz = rpdu_.solong = 0; + } + } } -bool -connection::writepdu() -{ - VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong == wpdu_.sz) - return true; - - if (wpdu_.solong == 0) { - uint32_t sz = htonl((uint32_t)wpdu_.sz); - bcopy(&sz,wpdu_.buf,sizeof(sz)); - } - ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); - if (n < 0) { - if (errno != EAGAIN) { - jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno); - wpdu_.solong = size_t_max; - wpdu_.sz = 0; - } - return (errno == EAGAIN); - } - wpdu_.solong += (size_t)n; - return true; +bool connection::writepdu() { + VERIFY(wpdu_.solong != size_t_max); + if (wpdu_.solong == wpdu_.sz) + return true; + + if (wpdu_.solong == 0) { + uint32_t sz = htonl((uint32_t)wpdu_.sz); + bcopy(&sz,wpdu_.buf,sizeof(sz)); + } + ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + if (n < 0) { + if (errno != EAGAIN) { + IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno); + wpdu_.solong = size_t_max; + wpdu_.sz = 0; + } + return (errno == EAGAIN); + } + wpdu_.solong += (size_t)n; + return true; } -bool -connection::readpdu() -{ - if (!rpdu_.sz) { - uint32_t sz1; - ssize_t n = read(fd_, &sz1, sizeof(sz1)); - - if (n == 0) { - return false; - } - - if (n < 0) { - VERIFY(errno!=EAGAIN); - return false; - } - - if (n > 0 && n != sizeof(sz1)) { - jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n"); - return false; - } - - size_t sz = ntohl(sz1); - - if (sz > MAX_PDU) { - char *tmpb = (char *)&sz1; - jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %lu network order=%x %x %x %x %x\n", sz, - sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); - return false; - } - - rpdu_.sz = sz; - VERIFY(rpdu_.buf == NULL); - rpdu_.buf = (char *)malloc(sz+sizeof(sz1)); - VERIFY(rpdu_.buf); - bcopy(&sz1,rpdu_.buf,sizeof(sz1)); - rpdu_.solong = sizeof(sz1); - } - - ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); - if (n <= 0) { - if (errno == EAGAIN) - return true; - if (rpdu_.buf) - free(rpdu_.buf); - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; - return (errno == EAGAIN); - } - rpdu_.solong += (size_t)n; - return true; +bool connection::readpdu() { + if (!rpdu_.sz) { + uint32_t sz1; + ssize_t n = read(fd_, &sz1, sizeof(sz1)); + + if (n == 0) { + return false; + } + + if (n < 0) { + VERIFY(errno!=EAGAIN); + return false; + } + + if (n > 0 && n != sizeof(sz1)) { + IF_LEVEL(0) LOG("connection::readpdu short read of sz"); + return false; + } + + size_t sz = ntohl(sz1); + + if (sz > MAX_PDU) { + IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1); + return false; + } + + rpdu_.sz = sz; + VERIFY(rpdu_.buf == NULL); + rpdu_.buf = (char *)malloc(sz+sizeof(sz1)); + VERIFY(rpdu_.buf); + bcopy(&sz1,rpdu_.buf,sizeof(sz1)); + rpdu_.solong = sizeof(sz1); + } + + ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + if (n <= 0) { + if (errno == EAGAIN) + return true; + if (rpdu_.buf) + free(rpdu_.buf); + rpdu_.buf = NULL; + rpdu_.sz = rpdu_.solong = 0; + return (errno == EAGAIN); + } + rpdu_.solong += (size_t)n; + return true; } tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) : mgr_(m1), lossy_(lossytest) { - struct sockaddr_in sin; - memset(&sin, 0, sizeof(sin)); - sin.sin_family = AF_INET; - sin.sin_port = htons(port); - - tcp_ = socket(AF_INET, SOCK_STREAM, 0); - if (tcp_ < 0) { - perror("tcpsconn::tcpsconn accept_loop socket:"); - VERIFY(0); - } - - int yes = 1; - setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); - setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - - if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { - perror("accept_loop tcp bind:"); - VERIFY(0); - } - - if (listen(tcp_, 1000) < 0) { - perror("tcpsconn::tcpsconn listen:"); - VERIFY(0); - } + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + tcp_ = socket(AF_INET, SOCK_STREAM, 0); + if (tcp_ < 0) { + perror("tcpsconn::tcpsconn accept_loop socket:"); + VERIFY(0); + } + + int yes = 1; + setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + + if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { + perror("accept_loop tcp bind:"); + VERIFY(0); + } + + if (listen(tcp_, 1000) < 0) { + perror("tcpsconn::tcpsconn listen:"); + VERIFY(0); + } socklen_t addrlen = sizeof(sin); VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); port_ = ntohs(sin.sin_port); - jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, - sin.sin_port); + IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port); - if (pipe(pipe_) < 0) { - perror("accept_loop pipe:"); - VERIFY(0); - } + if (pipe(pipe_) < 0) { + perror("accept_loop pipe:"); + VERIFY(0); + } - int flags = fcntl(pipe_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipe_[0], F_SETFL, flags); + int flags = fcntl(pipe_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipe_[0], F_SETFL, flags); th_ = thread(&tcpsconn::accept_conn, this); } tcpsconn::~tcpsconn() { - VERIFY(close(pipe_[1]) == 0); + VERIFY(close(pipe_[1]) == 0); th_.join(); - //close all the active connections - map::iterator i; - for (i = conns_.begin(); i != conns_.end(); i++) { - i->second->closeconn(); - i->second->decref(); - } + //close all the active connections + map::iterator i; + for (i = conns_.begin(); i != conns_.end(); i++) { + i->second->closeconn(); + i->second->decref(); + } } -void -tcpsconn::process_accept() -{ - sockaddr_in sin; - socklen_t slen = sizeof(sin); - int s1 = accept(tcp_, (sockaddr *)&sin, &slen); - if (s1 < 0) { - perror("tcpsconn::accept_conn error"); - throw thread_exit_exception(); - } - - jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", - s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); - connection *ch = new connection(mgr_, s1, lossy_); +void tcpsconn::process_accept() { + sockaddr_in sin; + socklen_t slen = sizeof(sin); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + if (s1 < 0) { + perror("tcpsconn::accept_conn error"); + throw thread_exit_exception(); + } + + IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port)); + connection *ch = new connection(mgr_, s1, lossy_); // garbage collect all dead connections with refcount of 1 for (auto i = conns_.begin(); i != conns_.end();) { if (i->second->isdead() && i->second->ref() == 1) { - jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", - i->second->channo()); + IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo()); i->second->decref(); // Careful not to reuse i right after erase. (i++) will // be evaluated before the erase call because in C++, @@ -367,14 +329,12 @@ tcpsconn::process_accept() ++i; } - conns_[ch->channo()] = ch; + conns_[ch->channo()] = ch; } -void -tcpsconn::accept_conn() -{ - fd_set rfds; - int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; +void tcpsconn::accept_conn() { + fd_set rfds; + int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; try { while (1) { @@ -389,7 +349,7 @@ tcpsconn::accept_conn() continue; } else { perror("accept_conn select:"); - jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno); VERIFY(0); } } @@ -411,20 +371,16 @@ tcpsconn::accept_conn() } } -connection * -connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) -{ - int s = socket(AF_INET, SOCK_STREAM, 0); - int yes = 1; - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", - inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); - close(s); - return NULL; - } - jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n", - s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); - return new connection(mgr, s, lossy); +connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { + int s = socket(AF_INET, SOCK_STREAM, 0); + int yes = 1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + close(s); + return NULL; + } + IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + return new connection(mgr, s, lossy); } diff --git a/rpc/connection.h b/rpc/connection.h index 261cf9f..2a01e46 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -43,7 +43,7 @@ class connection : public aio_callback { void incref(); void decref(); - int ref(); + int ref() { lock rl(ref_m_); return refno_; } int compare(connection *another); private: @@ -53,15 +53,15 @@ class connection : public aio_callback { chanmgr *mgr_; const int fd_; - bool dead_; + bool dead_ = false; charbuf wpdu_; charbuf rpdu_; time_point create_time_; - int waiters_; - int refno_; + int waiters_ = 0; + int refno_ = 1; const int lossy_; mutex m_; diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc deleted file mode 100644 index 9399b09..0000000 --- a/rpc/jsl_log.cc +++ /dev/null @@ -1 +0,0 @@ -int JSL_DEBUG_LEVEL = 0; diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h deleted file mode 100644 index 66a2dd3..0000000 --- a/rpc/jsl_log.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef jsl_log_h -#define jsl_log_h - -enum dbcode { - JSL_DBG_OFF = 0, - JSL_DBG_1 = 1, // Critical - JSL_DBG_2 = 2, // Error - JSL_DBG_3 = 3, // Info - JSL_DBG_4 = 4, // Debugging -}; - -extern int JSL_DEBUG_LEVEL; - -#define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);} - -#endif diff --git a/rpc/marshall.h b/rpc/marshall.h index 20b9c07..98856e4 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -52,6 +52,8 @@ typedef int rpc_sz_t; #define DEFAULT_RPC_SZ 1024 #define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) +struct pass { template inline pass(Args&&...) {} }; + class marshall { private: char *buf_; // Base of the raw bytes buffer (dynamically readjusted) @@ -67,10 +69,7 @@ class marshall { } } public: - struct pass { template inline pass(Args&&...) {} }; - template - marshall(const Args&... args) { buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); VERIFY(buf_); @@ -407,4 +406,33 @@ template struct marshalled_func> : public marshalled_func_imp {}; +template unmarshall & +tuple_unmarshall_imp(unmarshall & u, tuple t, tuple_indices) { + (void)pass{(u >> get(t))...}; + return u; +} + +template unmarshall & +operator>>(unmarshall & u, tuple && t) { + using Indices = typename make_tuple_indices::type; + return tuple_unmarshall_imp(u, t, Indices()); +} + +template marshall & +tuple_marshall_imp(marshall & m, tuple & t, tuple_indices) { + (void)pass{(m << get(t))...}; + return m; +} + +template marshall & +operator<<(marshall & m, tuple && t) { + using Indices = typename make_tuple_indices::type; + return tuple_marshall_imp(m, t, Indices()); +} + +// for structs or classes containing a MEMBERS declaration +#define MARSHALLABLE(_c_) \ +inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \ +inline marshall & operator<<(marshall &m, _c_ a) { return m << a._tuple_(); } + #endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 023a7aa..15fba26 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -3,7 +3,6 @@ #include #include -#include "jsl_log.h" #include "pollmgr.h" PollMgr *PollMgr::instance = NULL; @@ -12,41 +11,41 @@ static std::once_flag pollmgr_is_initialized; static void PollMgrInit() { - PollMgr::instance = new PollMgr(); + PollMgr::instance = new PollMgr(); } PollMgr * PollMgr::Instance() { std::call_once(pollmgr_is_initialized, PollMgrInit); - return instance; + return instance; } PollMgr::PollMgr() : pending_change_(false) { - bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); - aio_ = new SelectAIO(); - //aio_ = new EPollAIO(); + bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); + aio_ = new SelectAIO(); + //aio_ = new EPollAIO(); th_ = std::thread(&PollMgr::wait_loop, this); } PollMgr::~PollMgr() [[noreturn]] { - //never kill me!!! - VERIFY(0); + //never kill me!!! + VERIFY(0); } void PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { - VERIFY(fd < MAX_POLL_FDS); + VERIFY(fd < MAX_POLL_FDS); lock ml(m_); - aio_->watch_fd(fd, flag); + aio_->watch_fd(fd, flag); - VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); - callbacks_[fd] = ch; + VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); + callbacks_[fd] = ch; } //remove all callbacks related to fd @@ -56,82 +55,82 @@ void PollMgr::block_remove_fd(int fd) { lock ml(m_); - aio_->unwatch_fd(fd, CB_RDWR); - pending_change_ = true; + aio_->unwatch_fd(fd, CB_RDWR); + pending_change_ = true; changedone_c_.wait(ml); - callbacks_[fd] = NULL; + callbacks_[fd] = NULL; } void PollMgr::del_callback(int fd, poll_flag flag) { lock ml(m_); - if (aio_->unwatch_fd(fd, flag)) { - callbacks_[fd] = NULL; - } + if (aio_->unwatch_fd(fd, flag)) { + callbacks_[fd] = NULL; + } } bool PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) { lock ml(m_); - if (!callbacks_[fd] || callbacks_[fd]!=c) - return false; + if (!callbacks_[fd] || callbacks_[fd]!=c) + return false; - return aio_->is_watched(fd, flag); + return aio_->is_watched(fd, flag); } void PollMgr::wait_loop() [[noreturn]] { - std::vector readable; - std::vector writable; + std::vector readable; + std::vector writable; - while (1) { - { + while (1) { + { lock ml(m_); - if (pending_change_) { - pending_change_ = false; + if (pending_change_) { + pending_change_ = false; changedone_c_.notify_all(); - } - } - readable.clear(); - writable.clear(); - aio_->wait_ready(&readable,&writable); - - if (!readable.size() && !writable.size()) { - continue; - } - //no locking of m_ - //because no add_callback() and del_callback should - //modify callbacks_[fd] while the fd is not dead - for (unsigned int i = 0; i < readable.size(); i++) { - int fd = readable[i]; - if (callbacks_[fd]) - callbacks_[fd]->read_cb(fd); - } - - for (unsigned int i = 0; i < writable.size(); i++) { - int fd = writable[i]; - if (callbacks_[fd]) - callbacks_[fd]->write_cb(fd); - } - } + } + } + readable.clear(); + writable.clear(); + aio_->wait_ready(&readable,&writable); + + if (!readable.size() && !writable.size()) { + continue; + } + //no locking of m_ + //because no add_callback() and del_callback should + //modify callbacks_[fd] while the fd is not dead + for (unsigned int i = 0; i < readable.size(); i++) { + int fd = readable[i]; + if (callbacks_[fd]) + callbacks_[fd]->read_cb(fd); + } + + for (unsigned int i = 0; i < writable.size(); i++) { + int fd = writable[i]; + if (callbacks_[fd]) + callbacks_[fd]->write_cb(fd); + } + } } SelectAIO::SelectAIO() : highfds_(0) { - FD_ZERO(&rfds_); - FD_ZERO(&wfds_); + FD_ZERO(&rfds_); + FD_ZERO(&wfds_); - VERIFY(pipe(pipefd_) == 0); - FD_SET(pipefd_[0], &rfds_); - highfds_ = pipefd_[0]; + VERIFY(pipe(pipefd_) == 0); + FD_SET(pipefd_[0], &rfds_); + highfds_ = pipefd_[0]; - int flags = fcntl(pipefd_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipefd_[0], F_SETFL, flags); + int flags = fcntl(pipefd_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipefd_[0], F_SETFL, flags); } SelectAIO::~SelectAIO() @@ -142,210 +141,210 @@ void SelectAIO::watch_fd(int fd, poll_flag flag) { lock ml(m_); - if (highfds_ <= fd) - highfds_ = fd; - - if (flag == CB_RDONLY) { - FD_SET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - FD_SET(fd,&wfds_); - }else { - FD_SET(fd,&rfds_); - FD_SET(fd,&wfds_); - } - - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + if (highfds_ <= fd) + highfds_ = fd; + + if (flag == CB_RDONLY) { + FD_SET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + FD_SET(fd,&wfds_); + }else { + FD_SET(fd,&rfds_); + FD_SET(fd,&wfds_); + } + + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); } bool SelectAIO::is_watched(int fd, poll_flag flag) { lock ml(m_); - if (flag == CB_RDONLY) { - return FD_ISSET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - return FD_ISSET(fd,&wfds_); - }else{ - return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); - } + if (flag == CB_RDONLY) { + return FD_ISSET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + return FD_ISSET(fd,&wfds_); + }else{ + return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); + } } bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { lock ml(m_); - if (flag == CB_RDONLY) { - FD_CLR(fd, &rfds_); - }else if (flag == CB_WRONLY) { - FD_CLR(fd, &wfds_); - }else if (flag == CB_RDWR) { - FD_CLR(fd, &wfds_); - FD_CLR(fd, &rfds_); - }else{ - VERIFY(0); - } - - if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { - if (fd == highfds_) { - int newh = pipefd_[0]; - for (int i = 0; i <= highfds_; i++) { - if (FD_ISSET(i, &rfds_)) { - newh = i; - }else if (FD_ISSET(i, &wfds_)) { - newh = i; - } - } - highfds_ = newh; - } - } - if (flag == CB_RDWR) { - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); - } - return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); + if (flag == CB_RDONLY) { + FD_CLR(fd, &rfds_); + }else if (flag == CB_WRONLY) { + FD_CLR(fd, &wfds_); + }else if (flag == CB_RDWR) { + FD_CLR(fd, &wfds_); + FD_CLR(fd, &rfds_); + }else{ + VERIFY(0); + } + + if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { + if (fd == highfds_) { + int newh = pipefd_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_)) { + newh = i; + }else if (FD_ISSET(i, &wfds_)) { + newh = i; + } + } + highfds_ = newh; + } + } + if (flag == CB_RDWR) { + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + } + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); } void SelectAIO::wait_ready(std::vector *readable, std::vector *writable) { - fd_set trfds, twfds; - int high; + fd_set trfds, twfds; + int high; - { + { lock ml(m_); - trfds = rfds_; - twfds = wfds_; - high = highfds_; - } - - int ret = select(high+1, &trfds, &twfds, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - return; - } else { - perror("select:"); - jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); - VERIFY(0); - } - } - - for (int fd = 0; fd <= high; fd++) { - if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { - char tmp; - VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); - VERIFY(tmp==1); - }else { - if (FD_ISSET(fd, &twfds)) { - writable->push_back(fd); - } - if (FD_ISSET(fd, &trfds)) { - readable->push_back(fd); - } - } - } + trfds = rfds_; + twfds = wfds_; + high = highfds_; + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + return; + } else { + perror("select:"); + IF_LEVEL(0) LOG("PollMgr::select_loop failure errno " << errno); + VERIFY(0); + } + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(tmp==1); + }else { + if (FD_ISSET(fd, &twfds)) { + writable->push_back(fd); + } + if (FD_ISSET(fd, &trfds)) { + readable->push_back(fd); + } + } + } } #ifdef __linux__ EPollAIO::EPollAIO() { - pollfd_ = epoll_create(MAX_POLL_FDS); - VERIFY(pollfd_ >= 0); - bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); + pollfd_ = epoll_create(MAX_POLL_FDS); + VERIFY(pollfd_ >= 0); + bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); } EPollAIO::~EPollAIO() { - close(pollfd_); + close(pollfd_); } static inline int poll_flag_to_event(poll_flag flag) { - int f; - if (flag == CB_RDONLY) { - f = EPOLLIN; - }else if (flag == CB_WRONLY) { - f = EPOLLOUT; - }else { //flag == CB_RDWR - f = EPOLLIN | EPOLLOUT; - } - return f; + int f; + if (flag == CB_RDONLY) { + f = EPOLLIN; + }else if (flag == CB_WRONLY) { + f = EPOLLOUT; + }else { //flag == CB_RDWR + f = EPOLLIN | EPOLLOUT; + } + return f; } void EPollAIO::watch_fd(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); + VERIFY(fd < MAX_POLL_FDS); - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - fdstatus_[fd] |= (int)flag; + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (int)flag; - ev.events = EPOLLET; - ev.data.fd = fd; + ev.events = EPOLLET; + ev.data.fd = fd; - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } - if (flag == CB_RDWR) { - VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); - } + if (flag == CB_RDWR) { + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); } bool EPollAIO::unwatch_fd(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); - fdstatus_[fd] &= ~(int)flag; - - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; - - ev.events = EPOLLET; - ev.data.fd = fd; - - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } - - if (flag == CB_RDWR) { - VERIFY(op == EPOLL_CTL_DEL); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); - return (op == EPOLL_CTL_DEL); + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(int)flag; + + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(op == EPOLL_CTL_DEL); + } + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + return (op == EPOLL_CTL_DEL); } bool EPollAIO::is_watched(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); - return ((fdstatus_[fd] & CB_MASK) == flag); + VERIFY(fd < MAX_POLL_FDS); + return ((fdstatus_[fd] & CB_MASK) == flag); } void EPollAIO::wait_ready(std::vector *readable, std::vector *writable) { - int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); - for (int i = 0; i < nfds; i++) { - if (ready_[i].events & EPOLLIN) { - readable->push_back(ready_[i].data.fd); - } - if (ready_[i].events & EPOLLOUT) { - writable->push_back(ready_[i].data.fd); - } - } + int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + for (int i = 0; i < nfds; i++) { + if (ready_[i].events & EPOLLIN) { + readable->push_back(ready_[i].data.fd); + } + if (ready_[i].events & EPOLLOUT) { + writable->push_back(ready_[i].data.fd); + } + } } #endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index ad3fcd9..9f1d90c 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -63,8 +63,6 @@ #include #include -#include "jsl_log.h" - const rpcc::TO rpcc::to_max = { 120000 }; const rpcc::TO rpcc::to_min = { 1000 }; @@ -95,16 +93,13 @@ rpcc::rpcc(const string & d, bool retrans) : // xid starts with 1 and latest received reply starts with 0 xid_rep_window_.push_back(0); - jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", - clt_nonce_, lossytest_); + IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_); } // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future -rpcc::~rpcc() -{ - jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", - clt_nonce_, chan_?chan_->channo():-1); +rpcc::~rpcc() { + IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1)); if(chan_){ chan_->closeconn(); chan_->decref(); @@ -112,9 +107,7 @@ rpcc::~rpcc() VERIFY(calls_.size() == 0); } -int -rpcc::bind(TO to) -{ +int rpcc::bind(TO to) { unsigned int r; int ret = call_timeout(rpc_const::bind, to, r, 0); if(ret == 0){ @@ -122,22 +115,19 @@ rpcc::bind(TO to) bind_done_ = true; srv_nonce_ = r; } else { - jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", - inet_ntoa(dst_.sin_addr), ret); + IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret); } return ret; }; // Cancel all outstanding calls - void -rpcc::cancel(void) -{ +void rpcc::cancel(void) { lock ml(m_); LOG("rpcc::cancel: force callers to fail"); for(auto &p : calls_){ caller *ca = p.second; - jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); + IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail"); { lock cl(ca->m); ca->done = true; @@ -153,10 +143,7 @@ rpcc::cancel(void) LOG("rpcc::cancel: done"); } -int -rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, - TO to) -{ +int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { caller ca(0, &rep); int xid_rep; @@ -165,7 +152,7 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, if((proc != rpc_const::bind && !bind_done_) || (proc == rpc_const::bind && bind_done_)){ - jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n"); + IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice"); return rpc_const::bind_failure; } @@ -206,10 +193,9 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); ch->send(req.cstr(), req.size()); } - else jsl_log(JSL_DBG_1, "not reachable\n"); - jsl_log(JSL_DBG_2, - "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n", - clt_nonce_, proc, ca.xid, clt_nonce_); + else IF_LEVEL(1) LOG("not reachable"); + IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc << + " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_); } transmit = false; // only send once on a given channel } @@ -226,14 +212,14 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, { lock cal(ca.m); while (!ca.done){ - jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); + IF_LEVEL(2) LOG("rpcc:call1: wait"); if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){ - jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); + IF_LEVEL(2) LOG("rpcc::call1: timeout"); break; } } if(ca.done){ - jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n"); + IF_LEVEL(2) LOG("rpcc::call1: reply received"); break; } } @@ -273,10 +259,9 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, lock cal(ca.m); - jsl_log(JSL_DBG_2, - "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n", - clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), - ntohs(dst_.sin_port), ca.done, ca.intret); + IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc << + " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << + ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); if(ch) ch->decref(); @@ -316,7 +301,7 @@ rpcc::got_pdu(connection *, char *b, size_t sz) rep.unpack_reply_header(&h); if(!rep.ok()){ - jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n"); + IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!"); return true; } @@ -325,7 +310,7 @@ rpcc::got_pdu(connection *, char *b, size_t sz) update_xid_rep(h.xid); if(calls_.find(h.xid) == calls_.end()){ - jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid); + IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request"); return true; } caller *ca = calls_[h.xid]; @@ -335,8 +320,7 @@ rpcc::got_pdu(connection *, char *b, size_t sz) ca->un->take_in(rep); ca->intret = h.ret; if(ca->intret < 0){ - jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n", - h.xid, ca->intret); + IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret); } ca->done = 1; } @@ -373,7 +357,7 @@ rpcs::rpcs(unsigned int p1, size_t count) { set_rand_seed(); nonce_ = (unsigned int)random(); - jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); + IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_); char *loss_env = getenv("RPC_LOSSY"); if(loss_env != NULL){ @@ -398,7 +382,7 @@ bool rpcs::got_pdu(connection *c, char *b, size_t sz) { if(!reachable_){ - jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n"); + IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable"); return true; } @@ -440,8 +424,8 @@ rpcs::updatestat(proc_t proc) if(clt.second.size() > maxrep) maxrep = clt.second.size(); } - jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n", - (int) reply_window_.size()-1, totalrep, maxrep); + IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " << + totalrep << " max per client " << maxrep); curr_counts_ = counting_; } } @@ -458,23 +442,21 @@ rpcs::dispatch(djob_t *j) proc_t proc = h.proc; if(!req.ok()){ - jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); + IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!"); c->decref(); return; } - jsl_log(JSL_DBG_2, - "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n", - h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); + IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " << + dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce); marshall rep; reply_header rh(h.xid,0); // is client sending to an old instance of server? if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ - jsl_log(JSL_DBG_2, - "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n", - h.srv_nonce, nonce_, h.proc); + IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce << + " (current " << nonce_ << ") proc " << hex << h.proc); rh.ret = rpc_const::oldsrv_failure; rep.pack_reply_header(rh); c->send(rep.cstr(),rep.size()); @@ -486,10 +468,9 @@ rpcs::dispatch(djob_t *j) { lock pl(procs_m_); if(procs_.count(proc) < 1){ - fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n", - proc); + cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl; c->decref(); - VERIFY(0); + VERIFY(0); return; } @@ -508,9 +489,8 @@ rpcs::dispatch(djob_t *j) if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid - jsl_log(JSL_DBG_2, - "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", - h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1); + IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid << + " chan " << c->channo() << ", total clients " << (reply_window_.size()-1)); } } @@ -542,10 +522,9 @@ rpcs::dispatch(djob_t *j) rh.ret = (*f)(req, rep); if (rh.ret == rpc_const::unmarshal_args_failure) { - fprintf(stderr, "rpcs::dispatch: failed to" - " unmarshall the arguments. You are" - " probably calling RPC 0x%x with wrong" - " types of arguments.\n", proc); + cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " << + "probably calling RPC 0x" << hex << proc << " with the wrong " << + "types of arguments." << endl; VERIFY(0); } VERIFY(rh.ret >= 0); @@ -553,9 +532,8 @@ rpcs::dispatch(djob_t *j) rep.pack_reply_header(rh); rep.take_buf(&b1,&sz1); - jsl_log(JSL_DBG_2, - "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n", - sz1, h.xid, proc, rh.ret, h.clt_nonce); + IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " << + h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce); if(h.clt_nonce > 0){ // only record replies for clients that require at-most-once logic @@ -584,8 +562,7 @@ rpcs::dispatch(djob_t *j) c->send(b1, sz1); break; case FORGOTTEN: // very old request and we don't have the response anymore - jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n", - h.xid, h.clt_nonce); + IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce); rh.ret = rpc_const::atmostonce_failure; rep.pack_reply_header(rh); c->send(rep.cstr(),rep.size()); @@ -674,7 +651,7 @@ rpcs::add_reply(unsigned int clt_nonce, int xid, for (it++; it != l.end() && it->xid < xid; it++); // there should already be an entry, so whine if there isn't if (it == l.end() || it->xid != xid) { - fprintf(stderr, "Could not find reply struct in add_reply"); + cerr << "Could not find reply struct in add_reply" << endl; l.insert(it, reply_t(xid, b, sz)); } else { *it = reply_t(xid, b, sz); @@ -694,7 +671,7 @@ void rpcs::free_reply_window(void) { } int rpcs::rpcbind(unsigned int &r, int) { - jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); + IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_); r = nonce_; return 0; } @@ -838,7 +815,7 @@ sockaddr_in make_sockaddr(const string &host, const string &port) { struct hostent *hp = gethostbyname(host.c_str()); if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) { - fprintf(stderr, "cannot find host name %s\n", host.c_str()); + cerr << "cannot find host name " << host << endl; exit(1); } memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index c69d317..7217b25 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -7,7 +7,6 @@ #include #include #include -#include "jsl_log.h" #define NUM_CL 2 @@ -22,11 +21,11 @@ int port; // to simplify rpcs::reg(). a server process can have handlers // from multiple classes. class srv { - public: - int handle_22(string & r, const string a, const string b); - int handle_fast(int &r, const int a); - int handle_slow(int &r, const int a); - int handle_bigrep(string &r, const size_t a); + public: + int handle_22(string & r, const string a, const string b); + int handle_fast(int &r, const int a); + int handle_slow(int &r, const int a); + int handle_bigrep(string &r, const size_t a); }; // a handler. a and b are arguments, r is the result. @@ -39,395 +38,395 @@ class srv { int srv::handle_22(string &r, const string a, string b) { - r = a + b; - return 0; + r = a + b; + return 0; } int srv::handle_fast(int &r, const int a) { - r = a + 1; - return 0; + r = a + 1; + return 0; } int srv::handle_slow(int &r, const int a) { - usleep(random() % 5000); - r = a + 2; - return 0; + usleep(random() % 5000); + r = a + 2; + return 0; } int srv::handle_bigrep(string &r, const size_t len) { - r = string((size_t)len, 'x'); - return 0; + r = string((size_t)len, 'x'); + return 0; } srv service; void startserver() { - server = new rpcs((unsigned int)port); - server->reg(22, &srv::handle_22, &service); - server->reg(23, &srv::handle_fast, &service); - server->reg(24, &srv::handle_slow, &service); - server->reg(25, &srv::handle_bigrep, &service); + server = new rpcs((unsigned int)port); + server->reg(22, &srv::handle_22, &service); + server->reg(23, &srv::handle_fast, &service); + server->reg(24, &srv::handle_slow, &service); + server->reg(25, &srv::handle_bigrep, &service); } void testmarshall() { - marshall m; - request_header rh{1,2,3,4,5}; - m.pack_req_header(rh); - VERIFY(m.size()==RPC_HEADER_SZ); - int i = 12345; - unsigned long long l = 1223344455L; - string s = "hallo...."; - m << i; - m << l; - m << s; - - char *b; - size_t sz; - m.take_buf(&b,&sz); - VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); - - unmarshall un(b,sz); - request_header rh1; - un.unpack_req_header(&rh1); - VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); - int i1; - unsigned long long l1; - string s1; - un >> i1; - un >> l1; - un >> s1; - VERIFY(un.okdone()); - VERIFY(i1==i && l1==l && s1==s); + marshall m; + request_header rh{1,2,3,4,5}; + m.pack_req_header(rh); + VERIFY(m.size()==RPC_HEADER_SZ); + int i = 12345; + unsigned long long l = 1223344455L; + string s = "hallo...."; + m << i; + m << l; + m << s; + + char *b; + size_t sz; + m.take_buf(&b,&sz); + VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); + + unmarshall un(b,sz); + request_header rh1; + un.unpack_req_header(&rh1); + VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); + int i1; + unsigned long long l1; + string s1; + un >> i1; + un >> l1; + un >> s1; + VERIFY(un.okdone()); + VERIFY(i1==i && l1==l && s1==s); } void client1(size_t cl) { - // test concurrency. - size_t which_cl = cl % NUM_CL; - - for(int i = 0; i < 100; i++){ - int arg = (random() % 2000); - string rep; - int ret = clients[which_cl]->call(25, rep, arg); - VERIFY(ret == 0); - if ((int)rep.size()!=arg) - cout << "repsize wrong " << rep.size() << "!=" << arg << endl; - VERIFY((int)rep.size() == arg); - } - - // test rpc replies coming back not in the order of - // the original calls -- i.e. does xid reply dispatch work. - for(int i = 0; i < 100; i++){ - int which = (random() % 2); - int arg = (random() % 1000); - int rep; - - auto start = std::chrono::steady_clock::now(); - - int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); - auto end = std::chrono::steady_clock::now(); - auto diff = std::chrono::duration_cast(end - start).count(); - if (ret != 0) - cout << diff << " ms have elapsed!!!" << endl; - VERIFY(ret == 0); - VERIFY(rep == (which ? arg+1 : arg+2)); - } + // test concurrency. + size_t which_cl = cl % NUM_CL; + + for(int i = 0; i < 100; i++){ + int arg = (random() % 2000); + string rep; + int ret = clients[which_cl]->call(25, rep, arg); + VERIFY(ret == 0); + if ((int)rep.size()!=arg) + cout << "repsize wrong " << rep.size() << "!=" << arg << endl; + VERIFY((int)rep.size() == arg); + } + + // test rpc replies coming back not in the order of + // the original calls -- i.e. does xid reply dispatch work. + for(int i = 0; i < 100; i++){ + int which = (random() % 2); + int arg = (random() % 1000); + int rep; + + auto start = std::chrono::steady_clock::now(); + + int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); + auto end = std::chrono::steady_clock::now(); + auto diff = std::chrono::duration_cast(end - start).count(); + if (ret != 0) + cout << diff << " ms have elapsed!!!" << endl; + VERIFY(ret == 0); + VERIFY(rep == (which ? arg+1 : arg+2)); + } } void client2(size_t cl) { - size_t which_cl = cl % NUM_CL; - - time_t t1; - time(&t1); - - while(time(0) - t1 < 10){ - int arg = (random() % 2000); - string rep; - int ret = clients[which_cl]->call(25, rep, arg); - if ((int)rep.size()!=arg) - cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl; - VERIFY((int)rep.size() == arg); - } + size_t which_cl = cl % NUM_CL; + + time_t t1; + time(&t1); + + while(time(0) - t1 < 10){ + int arg = (random() % 2000); + string rep; + int ret = clients[which_cl]->call(25, rep, arg); + if ((int)rep.size()!=arg) + cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl; + VERIFY((int)rep.size() == arg); + } } void client3(void *xx) { - rpcc *c = (rpcc *) xx; + rpcc *c = (rpcc *) xx; - for(int i = 0; i < 4; i++){ - int rep; - int ret = c->call_timeout(24, rpcc::to(3000), rep, i); - VERIFY(ret == rpc_const::timeout_failure || rep == i+2); - } + for(int i = 0; i < 4; i++){ + int rep = 0; + int ret = c->call_timeout(24, rpcc::to(3000), rep, i); + VERIFY(ret == rpc_const::timeout_failure || rep == i+2); + } } void simple_tests(rpcc *c) { - cout << "simple_tests" << endl; - // an RPC call to procedure #22. - // rpcc::call() looks at the argument types to decide how - // to marshall the RPC call packet, and how to unmarshall - // the reply packet. - string rep; - int intret = c->call(22, rep, (string)"hello", (string)" goodbye"); - VERIFY(intret == 0); // this is what handle_22 returns - VERIFY(rep == "hello goodbye"); - cout << " -- string concat RPC .. ok" << endl; - - // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); - VERIFY(intret == 0); - VERIFY(rep.size() == 70000); - cout << " -- small request, big reply .. ok" << endl; - - // specify a timeout value to an RPC that should succeed (udp) - int xx = 0; - intret = c->call_timeout(23, rpcc::to(3000), xx, 77); - VERIFY(intret == 0 && xx == 78); - cout << " -- no spurious timeout .. ok" << endl; - - // specify a timeout value to an RPC that should succeed (tcp) - { - string arg(1000, 'x'); - string rep2; - c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x"); - VERIFY(rep2.size() == 1001); - cout << " -- no spurious timeout .. ok" << endl; - } - - // huge RPC - string big(1000000, 'x'); - intret = c->call(22, rep, big, (string)"z"); - VERIFY(rep.size() == 1000001); - cout << " -- huge 1M rpc request .. ok" << endl; - - // specify a timeout value to an RPC that should timeout (udp) + cout << "simple_tests" << endl; + // an RPC call to procedure #22. + // rpcc::call() looks at the argument types to decide how + // to marshall the RPC call packet, and how to unmarshall + // the reply packet. + string rep; + int intret = c->call(22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == 0); // this is what handle_22 returns + VERIFY(rep == "hello goodbye"); + cout << " -- string concat RPC .. ok" << endl; + + // small request, big reply (perhaps req via UDP, reply via TCP) + intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); + VERIFY(intret == 0); + VERIFY(rep.size() == 70000); + cout << " -- small request, big reply .. ok" << endl; + + // specify a timeout value to an RPC that should succeed (udp) + int xx = 0; + intret = c->call_timeout(23, rpcc::to(3000), xx, 77); + VERIFY(intret == 0 && xx == 78); + cout << " -- no spurious timeout .. ok" << endl; + + // specify a timeout value to an RPC that should succeed (tcp) + { + string arg(1000, 'x'); + string rep2; + c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x"); + VERIFY(rep2.size() == 1001); + cout << " -- no spurious timeout .. ok" << endl; + } + + // huge RPC + string big(1000000, 'x'); + intret = c->call(22, rep, big, (string)"z"); + VERIFY(rep.size() == 1000001); + cout << " -- huge 1M rpc request .. ok" << endl; + + // specify a timeout value to an RPC that should timeout (udp) string non_existent = "127.0.0.1:7661"; - rpcc *c1 = new rpcc(non_existent); - time_t t0 = time(0); - intret = c1->bind(rpcc::to(3000)); - time_t t1 = time(0); - VERIFY(intret < 0 && (t1 - t0) <= 4); - cout << " -- rpc timeout .. ok" << endl; - cout << "simple_tests OK" << endl; + rpcc *c1 = new rpcc(non_existent); + time_t t0 = time(0); + intret = c1->bind(rpcc::to(3000)); + time_t t1 = time(0); + VERIFY(intret < 0 && (t1 - t0) <= 4); + cout << " -- rpc timeout .. ok" << endl; + cout << "simple_tests OK" << endl; } void concurrent_test(size_t nt) { - // create threads that make lots of calls in parallel, - // to test thread synchronization for concurrent calls - // and dispatches. - cout << "start concurrent_test (" << nt << " threads) ..."; + // create threads that make lots of calls in parallel, + // to test thread synchronization for concurrent calls + // and dispatches. + cout << "start concurrent_test (" << nt << " threads) ..."; vector th(nt); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i] = thread(client1, i); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i].join(); - cout << " OK" << endl; + cout << " OK" << endl; } void lossy_test() { - cout << "start lossy_test ..."; - VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + cout << "start lossy_test ..."; + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); - if (server) { - delete server; - startserver(); - } + if (server) { + delete server; + startserver(); + } - for (int i = 0; i < NUM_CL; i++) { - delete clients[i]; - clients[i] = new rpcc(dst); - VERIFY(clients[i]->bind()==0); - } + for (int i = 0; i < NUM_CL; i++) { + delete clients[i]; + clients[i] = new rpcc(dst); + VERIFY(clients[i]->bind()==0); + } - size_t nt = 1; + size_t nt = 1; vector th(nt); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i] = thread(client2, i); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i].join(); - cout << ".. OK" << endl; - VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); + cout << ".. OK" << endl; + VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); } void failure_test() { - rpcc *client1; - rpcc *client = clients[0]; + rpcc *client1; + rpcc *client = clients[0]; - cout << "failure_test" << endl; + cout << "failure_test" << endl; - delete server; + delete server; - client1 = new rpcc(dst); - VERIFY (client1->bind(rpcc::to(3000)) < 0); - cout << " -- create new client and try to bind to failed server .. failed ok" << endl; + client1 = new rpcc(dst); + VERIFY (client1->bind(rpcc::to(3000)) < 0); + cout << " -- create new client and try to bind to failed server .. failed ok" << endl; - delete client1; + delete client1; - startserver(); + startserver(); - string rep; - int intret = client->call(22, rep, (string)"hello", (string)" goodbye"); - VERIFY(intret == rpc_const::oldsrv_failure); - cout << " -- call recovered server with old client .. failed ok" << endl; + string rep; + int intret = client->call(22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == rpc_const::oldsrv_failure); + cout << " -- call recovered server with old client .. failed ok" << endl; - delete client; + delete client; - clients[0] = client = new rpcc(dst); - VERIFY (client->bind() >= 0); - VERIFY (client->bind() < 0); + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + VERIFY (client->bind() < 0); - intret = client->call(22, rep, (string)"hello", (string)" goodbye"); - VERIFY(intret == 0); - VERIFY(rep == "hello goodbye"); + intret = client->call(22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == 0); + VERIFY(rep == "hello goodbye"); - cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl; + cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl; - size_t nt = 10; - cout << " -- concurrent test on new rpc client w/ " << nt << " threads .."; + size_t nt = 10; + cout << " -- concurrent test on new rpc client w/ " << nt << " threads .."; vector th(nt); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i] = thread(client3, client); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i].join(); - cout << "ok" << endl; + cout << "ok" << endl; - delete server; - delete client; + delete server; + delete client; - startserver(); - clients[0] = client = new rpcc(dst); - VERIFY (client->bind() >= 0); - cout << " -- delete existing rpc client and server, create replacements.. ok" << endl; + startserver(); + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + cout << " -- delete existing rpc client and server, create replacements.. ok" << endl; - cout << " -- concurrent test on new client and server w/ " << nt << " threads .."; + cout << " -- concurrent test on new client and server w/ " << nt << " threads .."; - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i] = thread(client3, client); - for(size_t i = 0; i < nt; i++) + for(size_t i = 0; i < nt; i++) th[i].join(); - cout << "ok" << endl; + cout << "ok" << endl; - cout << "failure_test OK" << endl; + cout << "failure_test OK" << endl; } int main(int argc, char *argv[]) { - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); - int debug_level = 0; - - bool isclient = false; - bool isserver = false; - - srandom((uint32_t)getpid()); - port = 20000 + (getpid() % 10000); - - int ch = 0; - while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { - switch (ch) { - case 'c': - isclient = true; - break; - case 's': - isserver = true; - break; - case 'd': - debug_level = atoi(optarg); - break; - case 'p': - port = atoi(optarg); - break; - case 'l': - VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + int debug_level = 0; + + bool isclient = false; + bool isserver = false; + + srandom((uint32_t)getpid()); + port = 20000 + (getpid() % 10000); + + int ch = 0; + while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { + switch (ch) { + case 'c': + isclient = true; + break; + case 's': + isserver = true; + break; + case 'd': + debug_level = atoi(optarg); + break; + case 'p': + port = atoi(optarg); + break; + case 'l': + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + break; + default: break; - default: - break; - } - } + } + } - if (!isserver && !isclient) { - isserver = isclient = true; - } + if (!isserver && !isclient) { + isserver = isclient = true; + } - if (debug_level > 0) { - JSL_DEBUG_LEVEL = debug_level; - jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level); - } + if (debug_level > 0) { + DEBUG_LEVEL = debug_level; + IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level); + } - testmarshall(); + testmarshall(); - if (isserver) { - cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl; - startserver(); - } + if (isserver) { + cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl; + startserver(); + } - if (isclient) { - // server's address. + if (isclient) { + // server's address. dst = "127.0.0.1:" + std::to_string(port); - // start the client. bind it to the server. - // starts a thread to listen for replies and hand them to - // the correct waiting caller thread. there should probably - // be only one rpcc per process. you probably need one - // rpcc per server. - for (int i = 0; i < NUM_CL; i++) { - clients[i] = new rpcc(dst); - VERIFY (clients[i]->bind() == 0); - } + // start the client. bind it to the server. + // starts a thread to listen for replies and hand them to + // the correct waiting caller thread. there should probably + // be only one rpcc per process. you probably need one + // rpcc per server. + for (int i = 0; i < NUM_CL; i++) { + clients[i] = new rpcc(dst); + VERIFY (clients[i]->bind() == 0); + } - simple_tests(clients[0]); - concurrent_test(10); - lossy_test(); - if (isserver) { - failure_test(); - } + simple_tests(clients[0]); + concurrent_test(10); + lossy_test(); + if (isserver) { + failure_test(); + } - cout << "rpctest OK" << endl; + cout << "rpctest OK" << endl; - exit(0); - } + exit(0); + } - while (1) { - sleep(1); - } + while (1) { + sleep(1); + } } diff --git a/rsm.cc b/rsm.cc index 00cae81..f12f9db 100644 --- a/rsm.cc +++ b/rsm.cc @@ -446,17 +446,18 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { auto ret = rsm_protocol::OK; lock ml(rsm_mutex); - LOG("joinreq: src " << m << " last (" << last.vid << "," << last.seqno << ") mylast (" << + LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" << last_myvs.vid << "," << last_myvs.seqno << ")"); if (cfg->ismember(m, vid_commit)) { - LOG("joinreq: is still a member"); + LOG(m << " is still a member -- nothing to do"); log = cfg->dump(); } else if (cfg->myaddr() != primary) { - LOG("joinreq: busy"); + LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!"); ret = rsm_protocol::BUSY; } else { // We cache vid_commit to avoid adding m to a view which already contains // m due to race condition + LOG("calling down to config layer"); unsigned vid_cache = vid_commit; bool succ; { @@ -466,9 +467,9 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { } if (cfg->ismember(m, cfg->view_id())) { log = cfg->dump(); - LOG("joinreq: ret " << ret << " log " << log); + LOG("ret " << ret << " log " << log); } else { - LOG("joinreq: failed; proposer couldn't add " << succ); + LOG("failed; proposer couldn't add " << succ); ret = rsm_protocol::BUSY; } } @@ -486,7 +487,7 @@ rsm_client_protocol::status rsm::client_members(std::vector &r, int cfg->get_view(vid_commit, m); m.push_back(primary); r = m; - LOG("rsm::client_members return " << print_members(m) << " m " << primary); + LOG("rsm::client_members return " << m << " m " << primary); return rsm_client_protocol::OK; } diff --git a/rsm.h b/rsm.h index f2eb5bd..18ac8af 100644 --- a/rsm.h +++ b/rsm.h @@ -61,7 +61,6 @@ class rsm : public config_view_change { bool statetransferdone(string m, lock & rsm_mutex_lock); bool join(string m, lock & rsm_mutex_lock); void set_primary(unsigned vid); - string find_highest(viewstamp &vs, string &m, unsigned &vid); bool sync_with_backups(lock & rsm_mutex_lock); bool sync_with_primary(lock & rsm_mutex_lock); void net_repair(bool heal, lock & rsm_mutex_lock); diff --git a/rsm_client.h b/rsm_client.h index bb21f24..4a80f60 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -40,19 +40,19 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { if (intret < 0) return intret; u >> res; if (!u.okdone()) { - fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" - "You probably forgot to set the reply string in " - "rsm::client_invoke, or you may call RPC 0x%x with wrong return " - "type\n", proc); + cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl; + cerr << "You probably forgot to set the reply string in " + "rsm::client_invoke, or you may have called RPC 0x" << hex << + proc << " with the wrong return type" << endl; VERIFY(0); return rpc_const::unmarshal_reply_failure; } unmarshall u1(res); u1 >> r; if(!u1.okdone()) { - fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" - "You are probably calling RPC 0x%x with wrong return " - "type.\n", proc); + cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl; + cerr << "You are probably calling RPC 0x" << hex << proc << + " with the wrong return type." << endl; VERIFY(0); return rpc_const::unmarshal_reply_failure; } diff --git a/rsm_protocol.h b/rsm_protocol.h index 53908f3..1601bcb 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -18,8 +18,13 @@ struct viewstamp { unsigned int vid; unsigned int seqno; inline void operator++(int) { seqno++; } + + MEMBERS(vid, seqno) + LEXICOGRAPHIC_COMPARISON(viewstamp) }; +MARSHALLABLE(viewstamp) + class rsm_protocol { public: enum status : status_t { OK, ERR, BUSY}; @@ -33,28 +38,12 @@ class rsm_protocol { struct transferres { string state; viewstamp last; + + MEMBERS(state, last) }; }; -inline bool operator==(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) == tie(b.vid, b.seqno); } -inline bool operator>(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) > tie(b.vid, b.seqno); } -inline bool operator!=(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) != tie(b.vid, b.seqno); } - -inline marshall& operator<<(marshall &m, viewstamp v) { - return m << v.vid << v.seqno; -} - -inline unmarshall& operator>>(unmarshall &u, viewstamp &v) { - return u >> v.vid >> v.seqno; -} - -inline marshall & operator<<(marshall &m, rsm_protocol::transferres r) { - return m << r.state << r.last; -} - -inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) { - return u >> r.state >> r.last; -} +MARSHALLABLE(rsm_protocol::transferres) class rsm_test_protocol { public: diff --git a/rsmtest_client.cc b/rsmtest_client.cc index cb3ce8c..3965fa9 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -21,5 +21,3 @@ rsm_test_protocol::status rsmtest_client::breakpoint(int b) { VERIFY (ret == rsm_test_protocol::OK); return r; } - - diff --git a/start.sh b/start.sh deleted file mode 100755 index d3cf93b..0000000 --- a/start.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env bash - -ulimit -c unlimited - -NUM_LS=${1:-0} - -BASE_PORT=$RANDOM -BASE_PORT=$[BASE_PORT+2000] -LOCK_PORT=$[BASE_PORT+6] - -if [ $NUM_LS -gt 1 ]; then - x=0 - rm config - while [ $x -lt $NUM_LS ]; do - port=$[LOCK_PORT+2*x] - x=$[x+1] - echo $port >> config - done - x=0 - while [ $x -lt $NUM_LS ]; do - port=$[LOCK_PORT+2*x] - x=$[x+1] - echo "starting ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &" - ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 & - sleep 1 - done -else - echo "starting ./lock_server $LOCK_PORT > lock_server.log 2>&1 &" - ./lock_server $LOCK_PORT > lock_server.log 2>&1 & - sleep 1 -fi diff --git a/stop.sh b/stop.sh deleted file mode 100755 index 2d0a1f6..0000000 --- a/stop.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash - -pkill -u $USER lock_server diff --git a/threaded_log.cc b/threaded_log.cc index 6a213b1..c44266e 100644 --- a/threaded_log.cc +++ b/threaded_log.cc @@ -5,3 +5,4 @@ map thread_name_map; int next_thread_num = 0; map instance_name_map; int next_instance_num = 0; +int DEBUG_LEVEL = 0; diff --git a/threaded_log.h b/threaded_log.h index ebb2222..706e2b7 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -11,7 +11,7 @@ extern int next_instance_num; extern char log_thread_prefix; namespace std { - // This... is an awful hack. But sticking this in std:: makes it possible for + // This is an awful hack. But sticking this in std:: makes it possible for // ostream_iterator to use it. template ostream & operator<<(ostream &o, const pair &d) { @@ -22,11 +22,7 @@ namespace std { template typename enable_if::value && !is_same::value, ostream>::type & operator<<(ostream &o, const A &a) { - o << "["; - auto oit = ostream_iterator(o, ", "); - copy(a.begin(), a.end(), oit); - o << "]"; - return o; + return o << "[" << implode(a, ", ") << "]"; } #define LOG_PREFIX { \ @@ -58,4 +54,8 @@ operator<<(ostream &o, const A &a) { cerr << _x_ << endl; \ } +extern int DEBUG_LEVEL; + +#define IF_LEVEL(level) if(DEBUG_LEVEL >= abs(level)) + #endif diff --git a/types.h b/types.h index 9739a2a..e6b5895 100644 --- a/types.h +++ b/types.h @@ -107,7 +107,42 @@ template struct is_iterable().cbegin(), declval().cend(), void()) > : true_type {}; +template +inline typename enable_if::value, string>::type +implode(const C & v, string delim=" ") { + if (v.begin() == v.end()) + return string(); + ostringstream oss; + auto last = prev(v.end()); + copy(v.begin(), last, ostream_iterator(oss, delim.c_str())); + oss << *last; + return oss.str(); +} + +inline vector explode(const string &s, string delim=" ") { + vector out; + size_t start = 0, end = 0; + while ((end = s.find(delim, start)) != string::npos) { + out.push_back(s.substr(start, end - start)); + start = end + 1; + } + out.push_back(s.substr(start)); + return out; +} + #include "lang/verify.h" #include "threaded_log.h" +#define MEMBERS(...) \ +inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \ +inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } + +#define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \ +inline bool operator _op_(const _c_ &b) const { return _tuple_() _op_ b._tuple_(); } + +#define LEXICOGRAPHIC_COMPARISON(_c_) \ +LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \ +LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \ +LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=) + #endif