MOAR TEMPLATE MAGIC
authorPeter Iannucci <iannucci@mit.edu>
Sat, 28 Sep 2013 03:37:44 +0000 (23:37 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Sat, 28 Sep 2013 03:41:01 +0000 (23:41 -0400)
29 files changed:
Makefile
config.cc
handle.cc
lock_demo.cc
lock_server.cc
lock_server.h
lock_smain.cc
log.cc
paxos.cc
paxos.h
paxos_protocol.h
rpc/connection.cc
rpc/connection.h
rpc/jsl_log.cc [deleted file]
rpc/jsl_log.h [deleted file]
rpc/marshall.h
rpc/pollmgr.cc
rpc/rpc.cc
rpc/rpctest.cc
rsm.cc
rsm.h
rsm_client.h
rsm_protocol.h
rsmtest_client.cc
start.sh [deleted file]
stop.sh [deleted file]
threaded_log.cc
threaded_log.h
types.h

index 3da8e03..363cbb3 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ EXTRA_TARGETS ?=
 
 all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
 
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o
        rm -f $@
        ar cq $@ $^
        ranlib rpc/librpc.a
index d1cd70a..038a100 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -61,15 +61,11 @@ void config::get_view(unsigned instance, vector<string> &m, lock &) {
 }
 
 vector<string> config::members(const string &value) const {
-    istringstream ist(value);
-    using it = istream_iterator<string>;
-    return {it(ist), it()};
+    return explode(value);
 }
 
-string config::value(const vector<string> &m) const {
-    ostringstream ost;
-    copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
-    return ost.str();
+string config::value(const vector<string> &members) const {
+    return implode(members);
 }
 
 void config::reconstruct(lock &cfg_mutex_lock) {
@@ -77,7 +73,7 @@ void config::reconstruct(lock &cfg_mutex_lock) {
     my_view_id = paxos.instance();
     if (my_view_id > 0) {
         get_view(my_view_id, mems, cfg_mutex_lock);
-        LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
+        LOG("config::reconstruct: " << my_view_id << " " << mems);
     }
 }
 
@@ -86,7 +82,7 @@ void config::paxos_commit(unsigned instance, const string &value) {
     lock cfg_mutex_lock(cfg_mutex);
 
     vector<string> newmem = members(value);
-    LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
+    LOG("config::paxos_commit: " << instance << ": " << newmem);
 
     for (auto mem : mems) {
         LOG("config::paxos_commit: is " << mem << " still a member?");
@@ -114,17 +110,20 @@ bool config::ismember(const string &m, unsigned vid) {
 
 bool config::add(const string &new_m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
-    if (vid != my_view_id)
+    LOG("adding " << new_m << " to " << vid);
+    if (vid != my_view_id) {
+        LOG("that's not my view id, " << my_view_id << "!");
         return false;
-    LOG("config::add " << new_m);
+    }
     vector<string> m = mems;
     m.push_back(new_m);
     vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
+    LOG("calling down to paxos layer");
     cfg_mutex_lock.unlock();
     bool r = paxos.run(nextvid, cmems, value(m));
     cfg_mutex_lock.lock();
-    LOG("config::add: proposer returned " << (r ? "success" : "failure"));
+    LOG("paxos proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
@@ -156,7 +155,7 @@ void config::heartbeater() [[noreturn]] {
         unsigned vid = my_view_id;
         vector<string> cmems;
         get_view(vid, cmems, cfg_mutex_lock);
-        LOG("heartbeater: current membership " << print_members(cmems));
+        LOG("heartbeater: current membership " << cmems);
 
         if (!isamember(me, cmems)) {
             LOG("heartbeater: not member yet; skip hearbeat");
index 3b6e1fa..d32c895 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -62,10 +62,11 @@ hinfo * handle_mgr::acquire_handle(string m) {
     if (hmap.find(m) == hmap.end()) {
         h = new hinfo(m);
         hmap[m] = h;
+        h->refcnt++;
     } else if (!hmap[m]->del) {
         h = hmap[m];
+        h->refcnt++;
     }
-    h->refcnt++;
     return h;
 }
 
index 72fddf8..714c4e3 100644 (file)
@@ -4,7 +4,7 @@ char log_thread_prefix = 'd';
 
 int main(int argc, char *argv[]) {
     if(argc != 2) {
-        fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
+        cerr << "Usage: " << argv[0] << " [host:]port" << endl;
         return 1;
     }
 
index 379838a..d5e85a5 100644 (file)
@@ -23,18 +23,9 @@ lock_state& lock_state::operator=(const lock_state& o) {
     return *this;
 }
 
-marshall & operator<<(marshall &m, const lock_state &d) {
-       return m << d.held << d.held_by << d.wanted_by;
-}
-
-unmarshall & operator>>(unmarshall &u, lock_state &d) {
-       return u >> d.held >> d.held_by >> d.wanted_by;
-}
-
 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
-    // by the semantics of map, this will create
-    // the lock if it doesn't already exist
+    // this will create the lock if it doesn't already exist
     return lock_table[lid];
 }
 
index 381c527..5c182e0 100644 (file)
@@ -19,8 +19,12 @@ public:
     map<callback_t, lock_protocol::xid_t> old_requests;
     mutex m;
     lock_state& operator=(const lock_state&);
+
+    MEMBERS(held, held_by, wanted_by)
 };
 
+MARSHALLABLE(lock_state)
+
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 
 class lock_server : public rsm_state_transfer {
index 5f859a8..3bd7376 100644 (file)
@@ -13,7 +13,7 @@ int main(int argc, char *argv[]) {
     srandom((uint32_t)getpid());
 
     if(argc != 3){
-        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+        cerr << "Usage: " << argv[0] << " [master:]port [me:]port" << endl;
         exit(1);
     }
 
diff --git a/log.cc b/log.cc
index 95c40e3..de00d67 100644 (file)
--- a/log.cc
+++ b/log.cc
@@ -27,19 +27,19 @@ void log::logread(void) {
             pxs->instance_h = instance;
             LOG("logread: instance: " << instance << " w. v = " <<
                     pxs->values[instance]);
-            pxs->v_a.clear();
-            pxs->n_h.n = 0;
-            pxs->n_a.n = 0;
+            pxs->accepted_value.clear();
+            pxs->promise.n = 0;
+            pxs->accepted.n = 0;
         } else if (type == "propseen") {
-            from >> pxs->n_h.n >> pxs->n_h.m;
-            LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
+            from >> pxs->promise.n >> pxs->promise.m;
+            LOG("logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")");
         } else if (type == "accepted") {
             string v;
-            from >> pxs->n_a.n >> pxs->n_a.m;
+            from >> pxs->accepted.n >> pxs->accepted.m;
             from.get();
             getline(from, v);
-            pxs->v_a = v;
-            LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
+            pxs->accepted_value = v;
+            LOG("logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value);
         } else {
             LOG("logread: unknown log record");
             VERIFY(0);
@@ -72,15 +72,15 @@ void log::loginstance(unsigned instance, string v) {
     f.close();
 }
 
-// an acceptor should call logprop(n_h) when it
+// an acceptor should call logprop(promise) when it
 // receives a prepare to which it responds prepare_ok().
-void log::logprop(prop_t n_h) {
+void log::logprop(prop_t promise) {
     ofstream f(name, std::ios::app);
-    f << "propseen " << n_h.n << " " << n_h.m << "\n";
+    f << "propseen " << promise.n << " " << promise.m << "\n";
     f.close();
 }
 
-// an acceptor should call logaccept(n_a, v_a) when it
+// an acceptor should call logaccept(accepted, accepted_value) when it
 // receives an accept RPC to which it replies accept_ok().
 void log::logaccept(prop_t n, string v) {
     ofstream f(name, std::ios::app);
index 095d56a..f9d5785 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,12 +1,6 @@
 #include "paxos.h"
 #include "handle.h"
 
-string print_members(const nodes_t &nodes) {
-    ostringstream ost;
-    copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
-    return ost.str();
-}
-
 bool isamember(const node_t & m, const nodes_t & nodes) {
     return find(nodes.begin(), nodes.end(), m) != nodes.end();
 }
@@ -45,20 +39,20 @@ proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
 {
     lock ml(proposer_mutex);
-    LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
+    LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
     if (!stable) {  // already running proposer?
-        LOG("proposer::run: already running");
+        LOG("paxos proposer already running");
         return false;
     }
     stable = false;
     bool r = false;
-    my_n.n = std::max(n_h.n, my_n.n) + 1;
+    proposal.n = std::max(promise.n, proposal.n) + 1;
     nodes_t accepts;
     value_t v = newv;
     if (prepare(instance, accepts, cur_nodes, v)) {
 
         if (majority(cur_nodes, accepts)) {
-            LOG("paxos::run: received a majority of prepare responses");
+            LOG("received a majority of prepare responses");
 
             breakpoint1();
 
@@ -67,20 +61,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
             accept(instance, accepts, nodes, v);
 
             if (majority(cur_nodes, accepts)) {
-                LOG("paxos::run: received a majority of accept responses");
+                LOG("received a majority of accept responses");
 
                 breakpoint2();
 
                 decide(instance, accepts, v);
                 r = true;
             } else {
-                LOG("paxos::run: no majority of accept responses");
+                LOG("no majority of accept responses");
             }
         } else {
-            LOG("paxos::run: no majority of prepare responses");
+            LOG("no majority of prepare responses");
         }
     } else {
-        LOG("paxos::run: prepare is rejected " << stable);
+        LOG("prepare is rejected " << stable);
     }
     stable = true;
     return r;
@@ -88,6 +82,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
 
 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         const nodes_t & nodes, value_t & v) {
+    LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
     prepareres res;
     prop_t highest_n_a{0, ""};
     for (auto i : nodes) {
@@ -96,17 +91,19 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         if (!r)
             continue;
         auto status = (paxos_protocol::status)r->call_timeout(
-                paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
+                paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 LOG("commiting old instance!");
                 commit(instance, res.v_a);
                 return false;
             }
+            LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
+                    "v_a=\"" << res.v_a << "\"");
             if (res.accept) {
                 accepts.push_back(i);
                 if (res.n_a >= highest_n_a) {
-                    LOG("found a newer accepted proposal");
+                    LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
                     v = res.v_a;
                     highest_n_a = res.n_a;
                 }
@@ -125,7 +122,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
             continue;
         bool accept = false;
         int status = r->call_timeout(
-                paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
+                paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v);
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
@@ -144,21 +141,26 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
 
 paxos_protocol::status
 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+    LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
     lock ml(acceptor_mutex);
     r.oldinstance = false;
     r.accept = false;
-    r.n_a = n_a;
-    r.v_a = v_a;
+    r.n_a = accepted;
+    r.v_a = accepted_value;
     if (instance <= instance_h) {
+        LOG("old instance " << instance << " has value " << values[instance]);
         r.oldinstance = true;
         r.v_a = values[instance];
-    } else if (n > n_h) {
-        n_h = n;
-        l.logprop(n_h);
+    } else if (n > promise) {
+        LOG("looks good to me");
+        promise = n;
+        l.logprop(promise);
         r.accept = true;
     } else {
         LOG("I totally rejected this request.  Ha.");
     }
+    LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
+        "v_a=\"" << r.v_a << "\"");
     return paxos_protocol::OK;
 }
 
@@ -167,10 +169,10 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t
     lock ml(acceptor_mutex);
     r = false;
     if (instance == instance_h + 1) {
-        if (n >= n_h) {
-            n_a = n;
-            v_a = v;
-            l.logaccept(n_a, v_a);
+        if (n >= promise) {
+            accepted = n;
+            accepted_value = v;
+            l.logaccept(accepted, accepted_value);
             r = true;
         }
         return paxos_protocol::OK;
@@ -182,10 +184,10 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t
 paxos_protocol::status
 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
     lock ml(acceptor_mutex);
-    LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
+    LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
     if (instance == instance_h + 1) {
-        VERIFY(v_a == v);
-        commit(instance, v_a, ml);
+        VERIFY(accepted_value == v);
+        commit(instance, accepted_value, ml);
     } else if (instance <= instance_h) {
         // we are ahead; ignore.
     } else {
@@ -207,8 +209,8 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock &
         values[instance] = value;
         l.loginstance(instance, value);
         instance_h = instance;
-        n_a = n_h = {0, me};
-        v_a.clear();
+        accepted = promise = {0, me};
+        accepted_value.clear();
         if (delegate) {
             pxs_mutex_lock.unlock();
             delegate->paxos_commit(instance, value);
diff --git a/paxos.h b/paxos.h
index 116403d..426dfef 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -20,7 +20,6 @@ class paxos_change {
 
 extern bool isamember(const node_t & m, const nodes_t & nodes);
 extern bool majority(const nodes_t & l1, const nodes_t & l2);
-extern string print_members(const nodes_t & nodes);
 
 class proposer_acceptor {
     private:
@@ -37,12 +36,12 @@ class proposer_acceptor {
 
         // Proposer state
         bool stable = true;
-        prop_t my_n = {0, me};      // number of the last proposal used in this instance
+        prop_t proposal = {0, me};  // number of the last proposal used in this instance
 
         // Acceptor state
-        prop_t n_h = {0, me};       // number of the highest proposal seen in a prepare
-        prop_t n_a = {0, me};       // number of highest proposal accepted
-        value_t v_a;                // value of highest proposal accepted
+        prop_t promise = {0, me};   // number of the highest proposal seen in a prepare
+        prop_t accepted = {0, me};  // number of highest proposal accepted
+        value_t accepted_value;     // value of highest proposal accepted
         unsigned instance_h = 0;    // number of the highest instance we have decided
         map<unsigned,value_t> values;   // vals of each instance
 
index c24f155..5e8afdd 100644 (file)
@@ -7,8 +7,13 @@
 struct prop_t {
     unsigned n;
     string m;
+
+    MEMBERS(n, m)
+    LEXICOGRAPHIC_COMPARISON(prop_t)
 };
 
+MARSHALLABLE(prop_t)
+
 class paxos_protocol {
     public:
         enum status : status_t { OK, ERR };
@@ -24,20 +29,11 @@ class paxos_protocol {
             bool accept;
             prop_t n_a;
             string v_a;
+
+            MEMBERS(oldinstance, accept, n_a, v_a)
         };
 };
 
-inline unmarshall & operator>>(unmarshall &u, prop_t &a) { return u >> a.n >> a.m; }
-inline marshall & operator<<(marshall &m, prop_t a) { return m << a.n << a.m; }
-inline bool operator>(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) > tie(b.n, b.m); }
-inline bool operator>=(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) >= tie(b.n, b.m); }
-
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) {
-    return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) {
-    return m << r.oldinstance << r.accept << r.n_a << r.v_a;
-}
+MARSHALLABLE(paxos_protocol::prepareres)
 
 #endif
index 3f60c69..86d4ec5 100644 (file)
@@ -7,15 +7,13 @@
 #include <errno.h>
 #include <signal.h>
 #include <unistd.h>
-#include "jsl_log.h"
 #include <sys/socket.h>
 
 #define MAX_PDU (10<<20) //maximum PDF is 10M
 
 connection::connection(chanmgr *m1, int f1, int l1)
-: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
+: mgr_(m1), fd_(f1), lossy_(l1)
 {
-
     int flags = fcntl(fd_, F_GETFL, NULL);
     flags |= O_NONBLOCK;
     fcntl(fd_, F_SETFL, flags);
@@ -27,8 +25,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
 }
 
-connection::~connection()
-{
+connection::~connection() {
     VERIFY(dead_);
     if (rpdu_.buf)
         free(rpdu_.buf);
@@ -36,23 +33,17 @@ connection::~connection()
     close(fd_);
 }
 
-void
-connection::incref()
-{
+void connection::incref() {
     lock rl(ref_m_);
     refno_++;
 }
 
-bool
-connection::isdead()
-{
+bool connection::isdead() {
     lock ml(m_);
     return dead_;
 }
 
-void
-connection::closeconn()
-{
+void connection::closeconn() {
     {
         lock ml(m_);
         if (!dead_) {
@@ -67,9 +58,7 @@ connection::closeconn()
     PollMgr::Instance()->block_remove_fd(fd_);
 }
 
-void
-connection::decref()
-{
+void connection::decref() {
     bool dead = false;
     {
         lock rl(ref_m_);
@@ -80,21 +69,11 @@ connection::decref()
             dead = dead_;
         }
     }
-    if (dead) {
+    if (dead)
         delete this;
-    }
-}
-
-int
-connection::ref()
-{
-    lock rl(ref_m_);
-       return refno_;
 }
 
-int
-connection::compare(connection *another)
-{
+int connection::compare(connection *another) {
     if (create_time_ > another->create_time_)
         return 1;
     if (create_time_ < another->create_time_)
@@ -102,261 +81,244 @@ connection::compare(connection *another)
     return 0;
 }
 
-bool
-connection::send(char *b, size_t sz)
-{
+bool connection::send(char *b, size_t sz) {
     lock ml(m_);
-       waiters_++;
-       while (!dead_ && wpdu_.buf) {
+    waiters_++;
+    while (!dead_ && wpdu_.buf) {
         send_wait_.wait(ml);
-       }
-       waiters_--;
-       if (dead_) {
-               return false;
-       }
-       wpdu_.buf = b;
-       wpdu_.sz = sz;
-       wpdu_.solong = 0;
-
-       if (lossy_) {
-               if ((random()%100) < lossy_) {
-                       jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
-                       shutdown(fd_,SHUT_RDWR);
-               }
-       }
-
-       if (!writepdu()) {
-               dead_ = true;
+    }
+    waiters_--;
+    if (dead_) {
+        return false;
+    }
+    wpdu_.buf = b;
+    wpdu_.sz = sz;
+    wpdu_.solong = 0;
+
+    if (lossy_) {
+        if ((random()%100) < lossy_) {
+            IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+            shutdown(fd_,SHUT_RDWR);
+        }
+    }
+
+    if (!writepdu()) {
+        dead_ = true;
         ml.unlock();
-               PollMgr::Instance()->block_remove_fd(fd_);
+        PollMgr::Instance()->block_remove_fd(fd_);
         ml.lock();
-       } else {
-               if (wpdu_.solong == wpdu_.sz) {
-               } else {
-                       //should be rare to need to explicitly add write callback
-                       PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
-                       while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
+    } else {
+        if (wpdu_.solong == wpdu_.sz) {
+        } else {
+            //should be rare to need to explicitly add write callback
+            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+            while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
                 send_complete_.wait(ml);
-                       }
-               }
-       }
-       bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
-       wpdu_.solong = wpdu_.sz = 0;
-       wpdu_.buf = NULL;
-       if (waiters_ > 0)
+            }
+        }
+    }
+    bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
+    wpdu_.solong = wpdu_.sz = 0;
+    wpdu_.buf = NULL;
+    if (waiters_ > 0)
         send_wait_.notify_all();
-       return ret;
+    return ret;
 }
 
 //fd_ is ready to be written
-void
-connection::write_cb(int s)
-{
+void connection::write_cb(int s) {
     lock ml(m_);
-       VERIFY(!dead_);
-       VERIFY(fd_ == s);
-       if (wpdu_.sz == 0) {
-               PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
-               return;
-       }
-       if (!writepdu()) {
-               PollMgr::Instance()->del_callback(fd_, CB_RDWR);
-               dead_ = true;
-       } else {
-               VERIFY(wpdu_.solong != size_t_max);
-               if (wpdu_.solong < wpdu_.sz) {
-                       return;
-               }
+    VERIFY(!dead_);
+    VERIFY(fd_ == s);
+    if (wpdu_.sz == 0) {
+        PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+        return;
+    }
+    if (!writepdu()) {
+        PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+        dead_ = true;
+    } else {
+        VERIFY(wpdu_.solong != size_t_max);
+        if (wpdu_.solong < wpdu_.sz) {
+            return;
+        }
     }
-       send_complete_.notify_one();
+    send_complete_.notify_one();
 }
 
 //fd_ is ready to be read
-void
-connection::read_cb(int s)
-{
+void connection::read_cb(int s) {
     lock ml(m_);
-       VERIFY(fd_ == s);
-       if (dead_)  {
-               return;
-       }
-
-       bool succ = true;
-       if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
-               succ = readpdu();
-       }
-
-       if (!succ) {
-               PollMgr::Instance()->del_callback(fd_,CB_RDWR);
-               dead_ = true;
-               send_complete_.notify_one();
-       }
-
-       if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
-               if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
-                       //chanmgr has successfully consumed the pdu
-                       rpdu_.buf = NULL;
-                       rpdu_.sz = rpdu_.solong = 0;
-               }
-       }
+    VERIFY(fd_ == s);
+    if (dead_)  {
+        return;
+    }
+
+    bool succ = true;
+    if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+        succ = readpdu();
+    }
+
+    if (!succ) {
+        PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+        dead_ = true;
+        send_complete_.notify_one();
+    }
+
+    if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
+        if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+            //chanmgr has successfully consumed the pdu
+            rpdu_.buf = NULL;
+            rpdu_.sz = rpdu_.solong = 0;
+        }
+    }
 }
 
-bool
-connection::writepdu()
-{
-       VERIFY(wpdu_.solong != size_t_max);
-       if (wpdu_.solong == wpdu_.sz)
-               return true;
-
-       if (wpdu_.solong == 0) {
-               uint32_t sz = htonl((uint32_t)wpdu_.sz);
-               bcopy(&sz,wpdu_.buf,sizeof(sz));
-       }
-       ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
-       if (n < 0) {
-               if (errno != EAGAIN) {
-                       jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
-                       wpdu_.solong = size_t_max;
-                       wpdu_.sz = 0;
-               }
-               return (errno == EAGAIN);
-       }
-       wpdu_.solong += (size_t)n;
-       return true;
+bool connection::writepdu() {
+    VERIFY(wpdu_.solong != size_t_max);
+    if (wpdu_.solong == wpdu_.sz)
+        return true;
+
+    if (wpdu_.solong == 0) {
+        uint32_t sz = htonl((uint32_t)wpdu_.sz);
+        bcopy(&sz,wpdu_.buf,sizeof(sz));
+    }
+    ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+    if (n < 0) {
+        if (errno != EAGAIN) {
+            IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
+            wpdu_.solong = size_t_max;
+            wpdu_.sz = 0;
+        }
+        return (errno == EAGAIN);
+    }
+    wpdu_.solong += (size_t)n;
+    return true;
 }
 
-bool
-connection::readpdu()
-{
-       if (!rpdu_.sz) {
-               uint32_t sz1;
-               ssize_t n = read(fd_, &sz1, sizeof(sz1));
-
-               if (n == 0) {
-                       return false;
-               }
-
-               if (n < 0) {
-                       VERIFY(errno!=EAGAIN);
-                       return false;
-               }
-
-               if (n > 0 && n != sizeof(sz1)) {
-                       jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
-                       return false;
-               }
-
-               size_t sz = ntohl(sz1);
-
-               if (sz > MAX_PDU) {
-                       char *tmpb = (char *)&sz1;
-                       jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %lu network order=%x %x %x %x %x\n", sz,
-                                       sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
-                       return false;
-               }
-
-               rpdu_.sz = sz;
-               VERIFY(rpdu_.buf == NULL);
-               rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
-               VERIFY(rpdu_.buf);
-               bcopy(&sz1,rpdu_.buf,sizeof(sz1));
-               rpdu_.solong = sizeof(sz1);
-       }
-
-       ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
-       if (n <= 0) {
-               if (errno == EAGAIN)
-                       return true;
-               if (rpdu_.buf)
-                       free(rpdu_.buf);
-               rpdu_.buf = NULL;
-               rpdu_.sz = rpdu_.solong = 0;
-               return (errno == EAGAIN);
-       }
-       rpdu_.solong += (size_t)n;
-       return true;
+bool connection::readpdu() {
+    if (!rpdu_.sz) {
+        uint32_t sz1;
+        ssize_t n = read(fd_, &sz1, sizeof(sz1));
+
+        if (n == 0) {
+            return false;
+        }
+
+        if (n < 0) {
+            VERIFY(errno!=EAGAIN);
+            return false;
+        }
+
+        if (n > 0 && n != sizeof(sz1)) {
+            IF_LEVEL(0) LOG("connection::readpdu short read of sz");
+            return false;
+        }
+
+        size_t sz = ntohl(sz1);
+
+        if (sz > MAX_PDU) {
+            IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1);
+            return false;
+        }
+
+        rpdu_.sz = sz;
+        VERIFY(rpdu_.buf == NULL);
+        rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
+        VERIFY(rpdu_.buf);
+        bcopy(&sz1,rpdu_.buf,sizeof(sz1));
+        rpdu_.solong = sizeof(sz1);
+    }
+
+    ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+    if (n <= 0) {
+        if (errno == EAGAIN)
+            return true;
+        if (rpdu_.buf)
+            free(rpdu_.buf);
+        rpdu_.buf = NULL;
+        rpdu_.sz = rpdu_.solong = 0;
+        return (errno == EAGAIN);
+    }
+    rpdu_.solong += (size_t)n;
+    return true;
 }
 
 tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
 : mgr_(m1), lossy_(lossytest)
 {
-       struct sockaddr_in sin;
-       memset(&sin, 0, sizeof(sin));
-       sin.sin_family = AF_INET;
-       sin.sin_port = htons(port);
-
-       tcp_ = socket(AF_INET, SOCK_STREAM, 0);
-       if (tcp_ < 0) {
-               perror("tcpsconn::tcpsconn accept_loop socket:");
-               VERIFY(0);
-       }
-
-       int yes = 1;
-       setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
-       setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
-
-       if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
-               perror("accept_loop tcp bind:");
-               VERIFY(0);
-       }
-
-       if (listen(tcp_, 1000) < 0) {
-               perror("tcpsconn::tcpsconn listen:");
-               VERIFY(0);
-       }
+    struct sockaddr_in sin;
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(port);
+
+    tcp_ = socket(AF_INET, SOCK_STREAM, 0);
+    if (tcp_ < 0) {
+        perror("tcpsconn::tcpsconn accept_loop socket:");
+        VERIFY(0);
+    }
+
+    int yes = 1;
+    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
+    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+
+    if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+        perror("accept_loop tcp bind:");
+        VERIFY(0);
+    }
+
+    if (listen(tcp_, 1000) < 0) {
+        perror("tcpsconn::tcpsconn listen:");
+        VERIFY(0);
+    }
 
     socklen_t addrlen = sizeof(sin);
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
     port_ = ntohs(sin.sin_port);
 
-       jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
-               sin.sin_port);
+    IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
 
-       if (pipe(pipe_) < 0) {
-               perror("accept_loop pipe:");
-               VERIFY(0);
-       }
+    if (pipe(pipe_) < 0) {
+        perror("accept_loop pipe:");
+        VERIFY(0);
+    }
 
-       int flags = fcntl(pipe_[0], F_GETFL, NULL);
-       flags |= O_NONBLOCK;
-       fcntl(pipe_[0], F_SETFL, flags);
+    int flags = fcntl(pipe_[0], F_GETFL, NULL);
+    flags |= O_NONBLOCK;
+    fcntl(pipe_[0], F_SETFL, flags);
 
     th_ = thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
 {
-       VERIFY(close(pipe_[1]) == 0);
+    VERIFY(close(pipe_[1]) == 0);
     th_.join();
 
-       //close all the active connections
-       map<int, connection *>::iterator i;
-       for (i = conns_.begin(); i != conns_.end(); i++) {
-               i->second->closeconn();
-               i->second->decref();
-       }
+    //close all the active connections
+    map<int, connection *>::iterator i;
+    for (i = conns_.begin(); i != conns_.end(); i++) {
+        i->second->closeconn();
+        i->second->decref();
+    }
 }
 
-void
-tcpsconn::process_accept()
-{
-       sockaddr_in sin;
-       socklen_t slen = sizeof(sin);
-       int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
-       if (s1 < 0) {
-               perror("tcpsconn::accept_conn error");
-               throw thread_exit_exception();
-       }
-
-       jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
-                       s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
-       connection *ch = new connection(mgr_, s1, lossy_);
+void tcpsconn::process_accept() {
+    sockaddr_in sin;
+    socklen_t slen = sizeof(sin);
+    int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
+    if (s1 < 0) {
+        perror("tcpsconn::accept_conn error");
+        throw thread_exit_exception();
+    }
+
+    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+    connection *ch = new connection(mgr_, s1, lossy_);
 
     // garbage collect all dead connections with refcount of 1
     for (auto i = conns_.begin(); i != conns_.end();) {
         if (i->second->isdead() && i->second->ref() == 1) {
-            jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
-                    i->second->channo());
+            IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
             i->second->decref();
             // Careful not to reuse i right after erase. (i++) will
             // be evaluated before the erase call because in C++,
@@ -367,14 +329,12 @@ tcpsconn::process_accept()
             ++i;
     }
 
-       conns_[ch->channo()] = ch;
+    conns_[ch->channo()] = ch;
 }
 
-void
-tcpsconn::accept_conn()
-{
-       fd_set rfds;
-       int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
+void tcpsconn::accept_conn() {
+    fd_set rfds;
+    int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
 
     try {
         while (1) {
@@ -389,7 +349,7 @@ tcpsconn::accept_conn()
                     continue;
                 } else {
                     perror("accept_conn select:");
-                    jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
+                    IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
                     VERIFY(0);
                 }
             }
@@ -411,20 +371,16 @@ tcpsconn::accept_conn()
     }
 }
 
-connection *
-connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
-{
-       int s = socket(AF_INET, SOCK_STREAM, 0);
-       int yes = 1;
-       setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
-       if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-               jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
-                               inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
-               close(s);
-               return NULL;
-       }
-       jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n",
-                       s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
-       return new connection(mgr, s, lossy);
+connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
+    int s = socket(AF_INET, SOCK_STREAM, 0);
+    int yes = 1;
+    setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+    if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
+        IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+        close(s);
+        return NULL;
+    }
+    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+    return new connection(mgr, s, lossy);
 }
 
index 261cf9f..2a01e46 100644 (file)
@@ -43,7 +43,7 @@ class connection : public aio_callback {
 
         void incref();
         void decref();
-        int ref();
+        int ref() { lock rl(ref_m_); return refno_; }
 
         int compare(connection *another);
     private:
@@ -53,15 +53,15 @@ class connection : public aio_callback {
 
         chanmgr *mgr_;
         const int fd_;
-        bool dead_;
+        bool dead_ = false;
 
         charbuf wpdu_;
         charbuf rpdu_;
 
         time_point<steady_clock> create_time_;
 
-        int waiters_;
-        int refno_;
+        int waiters_ = 0;
+        int refno_ = 1;
         const int lossy_;
 
         mutex m_;
diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc
deleted file mode 100644 (file)
index 9399b09..0000000
+++ /dev/null
@@ -1 +0,0 @@
-int JSL_DEBUG_LEVEL = 0;
diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h
deleted file mode 100644 (file)
index 66a2dd3..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-#ifndef jsl_log_h
-#define jsl_log_h
-
-enum dbcode {
-    JSL_DBG_OFF = 0,
-    JSL_DBG_1 = 1, // Critical
-    JSL_DBG_2 = 2, // Error
-    JSL_DBG_3 = 3, // Info
-    JSL_DBG_4 = 4, // Debugging
-};
-
-extern int JSL_DEBUG_LEVEL;
-
-#define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);}
-
-#endif
index 20b9c07..98856e4 100644 (file)
@@ -52,6 +52,8 @@ typedef int rpc_sz_t;
 #define DEFAULT_RPC_SZ 1024
 #define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
 
+struct pass { template <typename... Args> inline pass(Args&&...) {} };
+
 class marshall {
     private:
         char *buf_;     // Base of the raw bytes buffer (dynamically readjusted)
@@ -67,10 +69,7 @@ class marshall {
             }
         }
     public:
-        struct pass { template <typename... Args> inline pass(Args&&...) {} };
-
         template <typename... Args>
-
         marshall(const Args&... args) {
             buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
             VERIFY(buf_);
@@ -407,4 +406,33 @@ template <class F, class ErrorHandler, class Signature>
 struct marshalled_func<F, ErrorHandler, function<Signature>> :
     public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
 
+template <class ...Args, size_t ...Indices> unmarshall &
+tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
+    (void)pass{(u >> get<Indices>(t))...};
+    return u;
+}
+
+template <class... Args> unmarshall &
+operator>>(unmarshall & u, tuple<Args &...> && t) {
+    using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+    return tuple_unmarshall_imp(u, t, Indices());
+}
+
+template <class ...Args, size_t ...Indices> marshall &
+tuple_marshall_imp(marshall & m, tuple<Args...> & t, tuple_indices<Indices...>) {
+    (void)pass{(m << get<Indices>(t))...};
+    return m;
+}
+
+template <class... Args> marshall &
+operator<<(marshall & m, tuple<Args...> && t) {
+    using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+    return tuple_marshall_imp(m, t, Indices());
+}
+
+// for structs or classes containing a MEMBERS declaration
+#define MARSHALLABLE(_c_) \
+inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
+inline marshall & operator<<(marshall &m, _c_ a) { return m << a._tuple_(); }
+
 #endif
index 023a7aa..15fba26 100644 (file)
@@ -3,7 +3,6 @@
 #include <fcntl.h>
 #include <unistd.h>
 
-#include "jsl_log.h"
 #include "pollmgr.h"
 
 PollMgr *PollMgr::instance = NULL;
@@ -12,41 +11,41 @@ static std::once_flag pollmgr_is_initialized;
 static void
 PollMgrInit()
 {
-       PollMgr::instance = new PollMgr();
+    PollMgr::instance = new PollMgr();
 }
 
 PollMgr *
 PollMgr::Instance()
 {
     std::call_once(pollmgr_is_initialized, PollMgrInit);
-       return instance;
+    return instance;
 }
 
 PollMgr::PollMgr() : pending_change_(false)
 {
-       bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
-       aio_ = new SelectAIO();
-       //aio_ = new EPollAIO();
+    bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
+    aio_ = new SelectAIO();
+    //aio_ = new EPollAIO();
 
     th_ = std::thread(&PollMgr::wait_loop, this);
 }
 
 PollMgr::~PollMgr() [[noreturn]]
 {
-       //never kill me!!!
-       VERIFY(0);
+    //never kill me!!!
+    VERIFY(0);
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
-       VERIFY(fd < MAX_POLL_FDS);
+    VERIFY(fd < MAX_POLL_FDS);
 
     lock ml(m_);
-       aio_->watch_fd(fd, flag);
+    aio_->watch_fd(fd, flag);
 
-       VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
-       callbacks_[fd] = ch;
+    VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
+    callbacks_[fd] = ch;
 }
 
 //remove all callbacks related to fd
@@ -56,82 +55,82 @@ void
 PollMgr::block_remove_fd(int fd)
 {
     lock ml(m_);
-       aio_->unwatch_fd(fd, CB_RDWR);
-       pending_change_ = true;
+    aio_->unwatch_fd(fd, CB_RDWR);
+    pending_change_ = true;
     changedone_c_.wait(ml);
-       callbacks_[fd] = NULL;
+    callbacks_[fd] = NULL;
 }
 
 void
 PollMgr::del_callback(int fd, poll_flag flag)
 {
     lock ml(m_);
-       if (aio_->unwatch_fd(fd, flag)) {
-               callbacks_[fd] = NULL;
-       }
+    if (aio_->unwatch_fd(fd, flag)) {
+        callbacks_[fd] = NULL;
+    }
 }
 
 bool
 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
 {
     lock ml(m_);
-       if (!callbacks_[fd] || callbacks_[fd]!=c)
-               return false;
+    if (!callbacks_[fd] || callbacks_[fd]!=c)
+        return false;
 
-       return aio_->is_watched(fd, flag);
+    return aio_->is_watched(fd, flag);
 }
 
 void
 PollMgr::wait_loop() [[noreturn]]
 {
 
-       std::vector<int> readable;
-       std::vector<int> writable;
+    std::vector<int> readable;
+    std::vector<int> writable;
 
-       while (1) {
-               {
+    while (1) {
+        {
             lock ml(m_);
-                       if (pending_change_) {
-                               pending_change_ = false;
+            if (pending_change_) {
+                pending_change_ = false;
                 changedone_c_.notify_all();
-                       }
-               }
-               readable.clear();
-               writable.clear();
-               aio_->wait_ready(&readable,&writable);
-
-               if (!readable.size() && !writable.size()) {
-                       continue;
-               } 
-               //no locking of m_
-               //because no add_callback() and del_callback should 
-               //modify callbacks_[fd] while the fd is not dead
-               for (unsigned int i = 0; i < readable.size(); i++) {
-                       int fd = readable[i];
-                       if (callbacks_[fd])
-                               callbacks_[fd]->read_cb(fd);
-               }
-
-               for (unsigned int i = 0; i < writable.size(); i++) {
-                       int fd = writable[i];
-                       if (callbacks_[fd])
-                               callbacks_[fd]->write_cb(fd);
-               }
-       }
+            }
+        }
+        readable.clear();
+        writable.clear();
+        aio_->wait_ready(&readable,&writable);
+
+        if (!readable.size() && !writable.size()) {
+            continue;
+        } 
+        //no locking of m_
+        //because no add_callback() and del_callback should 
+        //modify callbacks_[fd] while the fd is not dead
+        for (unsigned int i = 0; i < readable.size(); i++) {
+            int fd = readable[i];
+            if (callbacks_[fd])
+                callbacks_[fd]->read_cb(fd);
+        }
+
+        for (unsigned int i = 0; i < writable.size(); i++) {
+            int fd = writable[i];
+            if (callbacks_[fd])
+                callbacks_[fd]->write_cb(fd);
+        }
+    }
 }
 
 SelectAIO::SelectAIO() : highfds_(0)
 {
-       FD_ZERO(&rfds_);
-       FD_ZERO(&wfds_);
+    FD_ZERO(&rfds_);
+    FD_ZERO(&wfds_);
 
-       VERIFY(pipe(pipefd_) == 0);
-       FD_SET(pipefd_[0], &rfds_);
-       highfds_ = pipefd_[0];
+    VERIFY(pipe(pipefd_) == 0);
+    FD_SET(pipefd_[0], &rfds_);
+    highfds_ = pipefd_[0];
 
-       int flags = fcntl(pipefd_[0], F_GETFL, NULL);
-       flags |= O_NONBLOCK;
-       fcntl(pipefd_[0], F_SETFL, flags);
+    int flags = fcntl(pipefd_[0], F_GETFL, NULL);
+    flags |= O_NONBLOCK;
+    fcntl(pipefd_[0], F_SETFL, flags);
 }
 
 SelectAIO::~SelectAIO()
@@ -142,210 +141,210 @@ void
 SelectAIO::watch_fd(int fd, poll_flag flag)
 {
     lock ml(m_);
-       if (highfds_ <= fd) 
-               highfds_ = fd;
-
-       if (flag == CB_RDONLY) {
-               FD_SET(fd,&rfds_);
-       }else if (flag == CB_WRONLY) {
-               FD_SET(fd,&wfds_);
-       }else {
-               FD_SET(fd,&rfds_);
-               FD_SET(fd,&wfds_);
-       }
-
-       char tmp = 1;
-       VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    if (highfds_ <= fd) 
+        highfds_ = fd;
+
+    if (flag == CB_RDONLY) {
+        FD_SET(fd,&rfds_);
+    }else if (flag == CB_WRONLY) {
+        FD_SET(fd,&wfds_);
+    }else {
+        FD_SET(fd,&rfds_);
+        FD_SET(fd,&wfds_);
+    }
+
+    char tmp = 1;
+    VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
 }
 
 bool
 SelectAIO::is_watched(int fd, poll_flag flag)
 {
     lock ml(m_);
-       if (flag == CB_RDONLY) {
-               return FD_ISSET(fd,&rfds_);
-       }else if (flag == CB_WRONLY) {
-               return FD_ISSET(fd,&wfds_);
-       }else{
-               return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
-       }
+    if (flag == CB_RDONLY) {
+        return FD_ISSET(fd,&rfds_);
+    }else if (flag == CB_WRONLY) {
+        return FD_ISSET(fd,&wfds_);
+    }else{
+        return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
+    }
 }
 
 bool 
 SelectAIO::unwatch_fd(int fd, poll_flag flag)
 {
     lock ml(m_);
-       if (flag == CB_RDONLY) {
-               FD_CLR(fd, &rfds_);
-       }else if (flag == CB_WRONLY) {
-               FD_CLR(fd, &wfds_);
-       }else if (flag == CB_RDWR) {
-               FD_CLR(fd, &wfds_);
-               FD_CLR(fd, &rfds_);
-       }else{
-               VERIFY(0);
-       }
-
-       if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
-               if (fd == highfds_) {
-                       int newh = pipefd_[0];
-                       for (int i = 0; i <= highfds_; i++) {
-                               if (FD_ISSET(i, &rfds_)) {
-                                       newh = i;
-                               }else if (FD_ISSET(i, &wfds_)) {
-                                       newh = i;
-                               }
-                       }
-                       highfds_ = newh;
-               }
-       }
-       if (flag == CB_RDWR) {
-               char tmp = 1;
-               VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
-       }
-       return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
+    if (flag == CB_RDONLY) {
+        FD_CLR(fd, &rfds_);
+    }else if (flag == CB_WRONLY) {
+        FD_CLR(fd, &wfds_);
+    }else if (flag == CB_RDWR) {
+        FD_CLR(fd, &wfds_);
+        FD_CLR(fd, &rfds_);
+    }else{
+        VERIFY(0);
+    }
+
+    if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
+        if (fd == highfds_) {
+            int newh = pipefd_[0];
+            for (int i = 0; i <= highfds_; i++) {
+                if (FD_ISSET(i, &rfds_)) {
+                    newh = i;
+                }else if (FD_ISSET(i, &wfds_)) {
+                    newh = i;
+                }
+            }
+            highfds_ = newh;
+        }
+    }
+    if (flag == CB_RDWR) {
+        char tmp = 1;
+        VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    }
+    return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
 }
 
 void
 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
 {
-       fd_set trfds, twfds;
-       int high;
+    fd_set trfds, twfds;
+    int high;
 
-       {
+    {
         lock ml(m_);
-               trfds = rfds_;
-               twfds = wfds_;
-               high = highfds_;
-       }
-
-       int ret = select(high+1, &trfds, &twfds, NULL, NULL);
-
-       if (ret < 0) {
-               if (errno == EINTR) {
-                       return;
-               } else {
-                       perror("select:");
-                       jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
-                       VERIFY(0);
-               }
-       }
-
-       for (int fd = 0; fd <= high; fd++) {
-               if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
-                       char tmp;
-                       VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
-                       VERIFY(tmp==1);
-               }else {
-                       if (FD_ISSET(fd, &twfds)) {
-                               writable->push_back(fd);
-                       }
-                       if (FD_ISSET(fd, &trfds)) {
-                               readable->push_back(fd);
-                       }
-               }
-       }
+        trfds = rfds_;
+        twfds = wfds_;
+        high = highfds_;
+    }
+
+    int ret = select(high+1, &trfds, &twfds, NULL, NULL);
+
+    if (ret < 0) {
+        if (errno == EINTR) {
+            return;
+        } else {
+            perror("select:");
+            IF_LEVEL(0) LOG("PollMgr::select_loop failure errno " << errno);
+            VERIFY(0);
+        }
+    }
+
+    for (int fd = 0; fd <= high; fd++) {
+        if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+            char tmp;
+            VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+            VERIFY(tmp==1);
+        }else {
+            if (FD_ISSET(fd, &twfds)) {
+                writable->push_back(fd);
+            }
+            if (FD_ISSET(fd, &trfds)) {
+                readable->push_back(fd);
+            }
+        }
+    }
 }
 
 #ifdef __linux__ 
 
 EPollAIO::EPollAIO()
 {
-       pollfd_ = epoll_create(MAX_POLL_FDS);
-       VERIFY(pollfd_ >= 0);
-       bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
+    pollfd_ = epoll_create(MAX_POLL_FDS);
+    VERIFY(pollfd_ >= 0);
+    bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
 }
 
 EPollAIO::~EPollAIO()
 {
-       close(pollfd_);
+    close(pollfd_);
 }
 
 static inline
 int poll_flag_to_event(poll_flag flag)
 {
-       int f;
-       if (flag == CB_RDONLY) {
-               f = EPOLLIN;
-       }else if (flag == CB_WRONLY) {
-               f = EPOLLOUT;
-       }else { //flag == CB_RDWR
-               f = EPOLLIN | EPOLLOUT;
-       }
-       return f;
+    int f;
+    if (flag == CB_RDONLY) {
+        f = EPOLLIN;
+    }else if (flag == CB_WRONLY) {
+        f = EPOLLOUT;
+    }else { //flag == CB_RDWR
+        f = EPOLLIN | EPOLLOUT;
+    }
+    return f;
 }
 
 void
 EPollAIO::watch_fd(int fd, poll_flag flag)
 {
-       VERIFY(fd < MAX_POLL_FDS);
+    VERIFY(fd < MAX_POLL_FDS);
 
-       struct epoll_event ev;
-       int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-       fdstatus_[fd] |= (int)flag;
+    struct epoll_event ev;
+    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+    fdstatus_[fd] |= (int)flag;
 
-       ev.events = EPOLLET;
-       ev.data.fd = fd;
+    ev.events = EPOLLET;
+    ev.data.fd = fd;
 
-       if (fdstatus_[fd] & CB_RDONLY) {
-               ev.events |= EPOLLIN;
-       }
-       if (fdstatus_[fd] & CB_WRONLY) {
-               ev.events |= EPOLLOUT;
-       }
+    if (fdstatus_[fd] & CB_RDONLY) {
+        ev.events |= EPOLLIN;
+    }
+    if (fdstatus_[fd] & CB_WRONLY) {
+        ev.events |= EPOLLOUT;
+    }
 
-       if (flag == CB_RDWR) {
-               VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
-       }
+    if (flag == CB_RDWR) {
+        VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
+    }
 
-       VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
 }
 
 bool 
 EPollAIO::unwatch_fd(int fd, poll_flag flag)
 {
-       VERIFY(fd < MAX_POLL_FDS);
-       fdstatus_[fd] &= ~(int)flag;
-
-       struct epoll_event ev;
-       int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
-
-       ev.events = EPOLLET;
-       ev.data.fd = fd;
-
-       if (fdstatus_[fd] & CB_RDONLY) {
-               ev.events |= EPOLLIN;
-       }
-       if (fdstatus_[fd] & CB_WRONLY) {
-               ev.events |= EPOLLOUT;
-       }
-
-       if (flag == CB_RDWR) {
-               VERIFY(op == EPOLL_CTL_DEL);
-       }
-       VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
-       return (op == EPOLL_CTL_DEL);
+    VERIFY(fd < MAX_POLL_FDS);
+    fdstatus_[fd] &= ~(int)flag;
+
+    struct epoll_event ev;
+    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+
+    ev.events = EPOLLET;
+    ev.data.fd = fd;
+
+    if (fdstatus_[fd] & CB_RDONLY) {
+        ev.events |= EPOLLIN;
+    }
+    if (fdstatus_[fd] & CB_WRONLY) {
+        ev.events |= EPOLLOUT;
+    }
+
+    if (flag == CB_RDWR) {
+        VERIFY(op == EPOLL_CTL_DEL);
+    }
+    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+    return (op == EPOLL_CTL_DEL);
 }
 
 bool
 EPollAIO::is_watched(int fd, poll_flag flag)
 {
-       VERIFY(fd < MAX_POLL_FDS);
-       return ((fdstatus_[fd] & CB_MASK) == flag);
+    VERIFY(fd < MAX_POLL_FDS);
+    return ((fdstatus_[fd] & CB_MASK) == flag);
 }
 
 void
 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
 {
-       int nfds = epoll_wait(pollfd_, ready_,  MAX_POLL_FDS, -1);
-       for (int i = 0; i < nfds; i++) {
-               if (ready_[i].events & EPOLLIN) {
-                       readable->push_back(ready_[i].data.fd);
-               }
-               if (ready_[i].events & EPOLLOUT) {
-                       writable->push_back(ready_[i].data.fd);
-               }
-       }
+    int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+    for (int i = 0; i < nfds; i++) {
+        if (ready_[i].events & EPOLLIN) {
+            readable->push_back(ready_[i].data.fd);
+        }
+        if (ready_[i].events & EPOLLOUT) {
+            writable->push_back(ready_[i].data.fd);
+        }
+    }
 }
 
 #endif
index ad3fcd9..9f1d90c 100644 (file)
@@ -63,8 +63,6 @@
 #include <netdb.h>
 #include <unistd.h>
 
-#include "jsl_log.h"
-
 const rpcc::TO rpcc::to_max = { 120000 };
 const rpcc::TO rpcc::to_min = { 1000 };
 
@@ -95,16 +93,13 @@ rpcc::rpcc(const string & d, bool retrans) :
     // xid starts with 1 and latest received reply starts with 0
     xid_rep_window_.push_back(0);
 
-    jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
-            clt_nonce_, lossytest_);
+    IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
-rpcc::~rpcc()
-{
-    jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
-            clt_nonce_, chan_?chan_->channo():-1);
+rpcc::~rpcc() {
+    IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
     if(chan_){
         chan_->closeconn();
         chan_->decref();
@@ -112,9 +107,7 @@ rpcc::~rpcc()
     VERIFY(calls_.size() == 0);
 }
 
-int
-rpcc::bind(TO to)
-{
+int rpcc::bind(TO to) {
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
@@ -122,22 +115,19 @@ rpcc::bind(TO to)
         bind_done_ = true;
         srv_nonce_ = r;
     } else {
-        jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
-                inet_ntoa(dst_.sin_addr), ret);
+        IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
     }
     return ret;
 };
 
 // Cancel all outstanding calls
-    void
-rpcc::cancel(void)
-{
+void rpcc::cancel(void) {
     lock ml(m_);
     LOG("rpcc::cancel: force callers to fail");
     for(auto &p : calls_){
         caller *ca = p.second;
 
-        jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
+        IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail");
         {
             lock cl(ca->m);
             ca->done = true;
@@ -153,10 +143,7 @@ rpcc::cancel(void)
     LOG("rpcc::cancel: done");
 }
 
-int
-rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
-        TO to)
-{
+int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
 
     caller ca(0, &rep);
     int xid_rep;
@@ -165,7 +152,7 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
 
         if((proc != rpc_const::bind && !bind_done_) ||
                 (proc == rpc_const::bind && bind_done_)){
-            jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
+            IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice");
             return rpc_const::bind_failure;
         }
 
@@ -206,10 +193,9 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
                     ch->send(req.cstr(), req.size());
                 }
-                else jsl_log(JSL_DBG_1, "not reachable\n");
-                jsl_log(JSL_DBG_2,
-                        "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
-                        clt_nonce_, proc, ca.xid, clt_nonce_);
+                else IF_LEVEL(1) LOG("not reachable");
+                IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc <<
+                                " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
             }
             transmit = false; // only send once on a given channel
         }
@@ -226,14 +212,14 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
         {
             lock cal(ca.m);
             while (!ca.done){
-                jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
+                IF_LEVEL(2) LOG("rpcc:call1: wait");
                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
-                    jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
+                    IF_LEVEL(2) LOG("rpcc::call1: timeout");
                     break;
                 }
             }
             if(ca.done){
-                jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
+                IF_LEVEL(2) LOG("rpcc::call1: reply received");
                 break;
             }
         }
@@ -273,10 +259,9 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
 
     lock cal(ca.m);
 
-    jsl_log(JSL_DBG_2,
-            "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
-            clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
-            ntohs(dst_.sin_port), ca.done, ca.intret);
+    IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc <<
+                    " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
+                    ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
     if(ch)
         ch->decref();
@@ -316,7 +301,7 @@ rpcc::got_pdu(connection *, char *b, size_t sz)
     rep.unpack_reply_header(&h);
 
     if(!rep.ok()){
-        jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
+        IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!");
         return true;
     }
 
@@ -325,7 +310,7 @@ rpcc::got_pdu(connection *, char *b, size_t sz)
     update_xid_rep(h.xid);
 
     if(calls_.find(h.xid) == calls_.end()){
-        jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
+        IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request");
         return true;
     }
     caller *ca = calls_[h.xid];
@@ -335,8 +320,7 @@ rpcc::got_pdu(connection *, char *b, size_t sz)
         ca->un->take_in(rep);
         ca->intret = h.ret;
         if(ca->intret < 0){
-            jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
-                    h.xid, ca->intret);
+            IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret);
         }
         ca->done = 1;
     }
@@ -373,7 +357,7 @@ rpcs::rpcs(unsigned int p1, size_t count)
 {
     set_rand_seed();
     nonce_ = (unsigned int)random();
-    jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
+    IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_);
 
     char *loss_env = getenv("RPC_LOSSY");
     if(loss_env != NULL){
@@ -398,7 +382,7 @@ bool
 rpcs::got_pdu(connection *c, char *b, size_t sz)
 {
         if(!reachable_){
-            jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
+            IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable");
             return true;
         }
 
@@ -440,8 +424,8 @@ rpcs::updatestat(proc_t proc)
             if(clt.second.size() > maxrep)
                 maxrep = clt.second.size();
         }
-        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
-                        (int) reply_window_.size()-1, totalrep, maxrep);
+        IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
+                        totalrep << " max per client " << maxrep);
         curr_counts_ = counting_;
     }
 }
@@ -458,23 +442,21 @@ rpcs::dispatch(djob_t *j)
     proc_t proc = h.proc;
 
     if(!req.ok()){
-        jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
+        IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!");
         c->decref();
         return;
     }
 
-    jsl_log(JSL_DBG_2,
-            "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
-            h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
+    IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+                    dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
 
     marshall rep;
     reply_header rh(h.xid,0);
 
     // is client sending to an old instance of server?
     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
-        jsl_log(JSL_DBG_2,
-                "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
-                h.srv_nonce, nonce_, h.proc);
+        IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce <<
+                        " (current " << nonce_ << ") proc " << hex << h.proc);
         rh.ret = rpc_const::oldsrv_failure;
         rep.pack_reply_header(rh);
         c->send(rep.cstr(),rep.size());
@@ -486,10 +468,9 @@ rpcs::dispatch(djob_t *j)
     {
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
-            fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
-                proc);
+            cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl;
             c->decref();
-                        VERIFY(0);
+            VERIFY(0);
             return;
         }
 
@@ -508,9 +489,8 @@ rpcs::dispatch(djob_t *j)
             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
-                jsl_log(JSL_DBG_2,
-                        "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
-                        h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
+                IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid <<
+                                " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
             }
         }
 
@@ -542,10 +522,9 @@ rpcs::dispatch(djob_t *j)
 
             rh.ret = (*f)(req, rep);
             if (rh.ret == rpc_const::unmarshal_args_failure) {
-                fprintf(stderr, "rpcs::dispatch: failed to"
-                        " unmarshall the arguments. You are"
-                        " probably calling RPC 0x%x with wrong"
-                        " types of arguments.\n", proc);
+                cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " <<
+                        "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+                        "types of arguments." << endl;
                 VERIFY(0);
             }
             VERIFY(rh.ret >= 0);
@@ -553,9 +532,8 @@ rpcs::dispatch(djob_t *j)
             rep.pack_reply_header(rh);
             rep.take_buf(&b1,&sz1);
 
-            jsl_log(JSL_DBG_2,
-                    "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
-                    sz1, h.xid, proc, rh.ret, h.clt_nonce);
+            IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " <<
+                            h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
 
             if(h.clt_nonce > 0){
                 // only record replies for clients that require at-most-once logic
@@ -584,8 +562,7 @@ rpcs::dispatch(djob_t *j)
             c->send(b1, sz1);
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
-            jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
-                    h.xid, h.clt_nonce);
+            IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce);
             rh.ret = rpc_const::atmostonce_failure;
             rep.pack_reply_header(rh);
             c->send(rep.cstr(),rep.size());
@@ -674,7 +651,7 @@ rpcs::add_reply(unsigned int clt_nonce, int xid,
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
-        fprintf(stderr, "Could not find reply struct in add_reply");
+        cerr << "Could not find reply struct in add_reply" << endl;
         l.insert(it, reply_t(xid, b, sz));
     } else {
         *it = reply_t(xid, b, sz);
@@ -694,7 +671,7 @@ void rpcs::free_reply_window(void) {
 }
 
 int rpcs::rpcbind(unsigned int &r, int) {
-    jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
+    IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_);
     r = nonce_;
     return 0;
 }
@@ -838,7 +815,7 @@ sockaddr_in make_sockaddr(const string &host, const string &port) {
         struct hostent *hp = gethostbyname(host.c_str());
 
         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
-            fprintf(stderr, "cannot find host name %s\n", host.c_str());
+            cerr << "cannot find host name " << host << endl;
             exit(1);
         }
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
index c69d317..7217b25 100644 (file)
@@ -7,7 +7,6 @@
 #include <getopt.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include "jsl_log.h"
 
 #define NUM_CL 2
 
@@ -22,11 +21,11 @@ int port;
 // to simplify rpcs::reg(). a server process can have handlers
 // from multiple classes.
 class srv {
-       public:
-               int handle_22(string & r, const string a, const string b);
-               int handle_fast(int &r, const int a);
-               int handle_slow(int &r, const int a);
-               int handle_bigrep(string &r, const size_t a);
+    public:
+        int handle_22(string & r, const string a, const string b);
+        int handle_fast(int &r, const int a);
+        int handle_slow(int &r, const int a);
+        int handle_bigrep(string &r, const size_t a);
 };
 
 // a handler. a and b are arguments, r is the result.
@@ -39,395 +38,395 @@ class srv {
 int
 srv::handle_22(string &r, const string a, string b)
 {
-       r = a + b;
-       return 0;
+    r = a + b;
+    return 0;
 }
 
 int
 srv::handle_fast(int &r, const int a)
 {
-       r = a + 1;
-       return 0;
+    r = a + 1;
+    return 0;
 }
 
 int
 srv::handle_slow(int &r, const int a)
 {
-       usleep(random() % 5000);
-       r = a + 2;
-       return 0;
+    usleep(random() % 5000);
+    r = a + 2;
+    return 0;
 }
 
 int
 srv::handle_bigrep(string &r, const size_t len)
 {
-       r = string((size_t)len, 'x');
-       return 0;
+    r = string((size_t)len, 'x');
+    return 0;
 }
 
 srv service;
 
 void startserver()
 {
-       server = new rpcs((unsigned int)port);
-       server->reg(22, &srv::handle_22, &service);
-       server->reg(23, &srv::handle_fast, &service);
-       server->reg(24, &srv::handle_slow, &service);
-       server->reg(25, &srv::handle_bigrep, &service);
+    server = new rpcs((unsigned int)port);
+    server->reg(22, &srv::handle_22, &service);
+    server->reg(23, &srv::handle_fast, &service);
+    server->reg(24, &srv::handle_slow, &service);
+    server->reg(25, &srv::handle_bigrep, &service);
 }
 
 void
 testmarshall()
 {
-       marshall m;
-       request_header rh{1,2,3,4,5};
-       m.pack_req_header(rh);
-       VERIFY(m.size()==RPC_HEADER_SZ);
-       int i = 12345;
-       unsigned long long l = 1223344455L;
-       string s = "hallo....";
-       m << i;
-       m << l;
-       m << s;
-
-       char *b;
-       size_t sz;
-       m.take_buf(&b,&sz);
-       VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
-
-       unmarshall un(b,sz);
-       request_header rh1;
-       un.unpack_req_header(&rh1);
-       VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
-       int i1;
-       unsigned long long l1;
-       string s1;
-       un >> i1;
-       un >> l1;
-       un >> s1;
-       VERIFY(un.okdone());
-       VERIFY(i1==i && l1==l && s1==s);
+    marshall m;
+    request_header rh{1,2,3,4,5};
+    m.pack_req_header(rh);
+    VERIFY(m.size()==RPC_HEADER_SZ);
+    int i = 12345;
+    unsigned long long l = 1223344455L;
+    string s = "hallo....";
+    m << i;
+    m << l;
+    m << s;
+
+    char *b;
+    size_t sz;
+    m.take_buf(&b,&sz);
+    VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+
+    unmarshall un(b,sz);
+    request_header rh1;
+    un.unpack_req_header(&rh1);
+    VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
+    int i1;
+    unsigned long long l1;
+    string s1;
+    un >> i1;
+    un >> l1;
+    un >> s1;
+    VERIFY(un.okdone());
+    VERIFY(i1==i && l1==l && s1==s);
 }
 
 void
 client1(size_t cl)
 {
-       // test concurrency.
-       size_t which_cl = cl % NUM_CL;
-
-       for(int i = 0; i < 100; i++){
-               int arg = (random() % 2000);
-               string rep;
-               int ret = clients[which_cl]->call(25, rep, arg);
-               VERIFY(ret == 0);
-               if ((int)rep.size()!=arg)
-                       cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
-               VERIFY((int)rep.size() == arg);
-       }
-
-       // test rpc replies coming back not in the order of
-       // the original calls -- i.e. does xid reply dispatch work.
-       for(int i = 0; i < 100; i++){
-               int which = (random() % 2);
-               int arg = (random() % 1000);
-               int rep;
-
-               auto start = std::chrono::steady_clock::now();
-
-               int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
-               auto end = std::chrono::steady_clock::now();
-               auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
-               if (ret != 0)
-                       cout << diff << " ms have elapsed!!!" << endl;
-               VERIFY(ret == 0);
-               VERIFY(rep == (which ? arg+1 : arg+2));
-       }
+    // test concurrency.
+    size_t which_cl = cl % NUM_CL;
+
+    for(int i = 0; i < 100; i++){
+        int arg = (random() % 2000);
+        string rep;
+        int ret = clients[which_cl]->call(25, rep, arg);
+        VERIFY(ret == 0);
+        if ((int)rep.size()!=arg)
+            cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
+        VERIFY((int)rep.size() == arg);
+    }
+
+    // test rpc replies coming back not in the order of
+    // the original calls -- i.e. does xid reply dispatch work.
+    for(int i = 0; i < 100; i++){
+        int which = (random() % 2);
+        int arg = (random() % 1000);
+        int rep;
+
+        auto start = std::chrono::steady_clock::now();
+
+        int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+        auto end = std::chrono::steady_clock::now();
+        auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+        if (ret != 0)
+            cout << diff << " ms have elapsed!!!" << endl;
+        VERIFY(ret == 0);
+        VERIFY(rep == (which ? arg+1 : arg+2));
+    }
 }
 
 void
 client2(size_t cl)
 {
-       size_t which_cl = cl % NUM_CL;
-
-       time_t t1;
-       time(&t1);
-
-       while(time(0) - t1 < 10){
-               int arg = (random() % 2000);
-               string rep;
-               int ret = clients[which_cl]->call(25, rep, arg);
-               if ((int)rep.size()!=arg)
-                       cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
-               VERIFY((int)rep.size() == arg);
-       }
+    size_t which_cl = cl % NUM_CL;
+
+    time_t t1;
+    time(&t1);
+
+    while(time(0) - t1 < 10){
+        int arg = (random() % 2000);
+        string rep;
+        int ret = clients[which_cl]->call(25, rep, arg);
+        if ((int)rep.size()!=arg)
+            cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
+        VERIFY((int)rep.size() == arg);
+    }
 }
 
 void
 client3(void *xx)
 {
-       rpcc *c = (rpcc *) xx;
+    rpcc *c = (rpcc *) xx;
 
-       for(int i = 0; i < 4; i++){
-               int rep;
-               int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
-               VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
-       }
+    for(int i = 0; i < 4; i++){
+        int rep = 0;
+        int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
+        VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+    }
 }
 
 
 void
 simple_tests(rpcc *c)
 {
-       cout << "simple_tests" << endl;
-       // an RPC call to procedure #22.
-       // rpcc::call() looks at the argument types to decide how
-       // to marshall the RPC call packet, and how to unmarshall
-       // the reply packet.
-       string rep;
-       int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
-       VERIFY(intret == 0); // this is what handle_22 returns
-       VERIFY(rep == "hello goodbye");
-       cout << "   -- string concat RPC .. ok" << endl;
-
-       // small request, big reply (perhaps req via UDP, reply via TCP)
-       intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
-       VERIFY(intret == 0);
-       VERIFY(rep.size() == 70000);
-       cout << "   -- small request, big reply .. ok" << endl;
-
-       // specify a timeout value to an RPC that should succeed (udp)
-       int xx = 0;
-       intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
-       VERIFY(intret == 0 && xx == 78);
-       cout << "   -- no spurious timeout .. ok" << endl;
-
-       // specify a timeout value to an RPC that should succeed (tcp)
-       {
-               string arg(1000, 'x');
-               string rep2;
-               c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
-               VERIFY(rep2.size() == 1001);
-               cout << "   -- no spurious timeout .. ok" << endl;
-       }
-
-       // huge RPC
-       string big(1000000, 'x');
-       intret = c->call(22, rep, big, (string)"z");
-       VERIFY(rep.size() == 1000001);
-       cout << "   -- huge 1M rpc request .. ok" << endl;
-
-       // specify a timeout value to an RPC that should timeout (udp)
+    cout << "simple_tests" << endl;
+    // an RPC call to procedure #22.
+    // rpcc::call() looks at the argument types to decide how
+    // to marshall the RPC call packet, and how to unmarshall
+    // the reply packet.
+    string rep;
+    int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == 0); // this is what handle_22 returns
+    VERIFY(rep == "hello goodbye");
+    cout << "   -- string concat RPC .. ok" << endl;
+
+    // small request, big reply (perhaps req via UDP, reply via TCP)
+    intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
+    VERIFY(intret == 0);
+    VERIFY(rep.size() == 70000);
+    cout << "   -- small request, big reply .. ok" << endl;
+
+    // specify a timeout value to an RPC that should succeed (udp)
+    int xx = 0;
+    intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
+    VERIFY(intret == 0 && xx == 78);
+    cout << "   -- no spurious timeout .. ok" << endl;
+
+    // specify a timeout value to an RPC that should succeed (tcp)
+    {
+        string arg(1000, 'x');
+        string rep2;
+        c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
+        VERIFY(rep2.size() == 1001);
+        cout << "   -- no spurious timeout .. ok" << endl;
+    }
+
+    // huge RPC
+    string big(1000000, 'x');
+    intret = c->call(22, rep, big, (string)"z");
+    VERIFY(rep.size() == 1000001);
+    cout << "   -- huge 1M rpc request .. ok" << endl;
+
+    // specify a timeout value to an RPC that should timeout (udp)
     string non_existent = "127.0.0.1:7661";
-       rpcc *c1 = new rpcc(non_existent);
-       time_t t0 = time(0);
-       intret = c1->bind(rpcc::to(3000));
-       time_t t1 = time(0);
-       VERIFY(intret < 0 && (t1 - t0) <= 4);
-       cout << "   -- rpc timeout .. ok" << endl;
-       cout << "simple_tests OK" << endl;
+    rpcc *c1 = new rpcc(non_existent);
+    time_t t0 = time(0);
+    intret = c1->bind(rpcc::to(3000));
+    time_t t1 = time(0);
+    VERIFY(intret < 0 && (t1 - t0) <= 4);
+    cout << "   -- rpc timeout .. ok" << endl;
+    cout << "simple_tests OK" << endl;
 }
 
 void 
 concurrent_test(size_t nt)
 {
-       // create threads that make lots of calls in parallel,
-       // to test thread synchronization for concurrent calls
-       // and dispatches.
-       cout << "start concurrent_test (" << nt << " threads) ...";
+    // create threads that make lots of calls in parallel,
+    // to test thread synchronization for concurrent calls
+    // and dispatches.
+    cout << "start concurrent_test (" << nt << " threads) ...";
 
     vector<thread> th(nt);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i] = thread(client1, i);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       cout << " OK" << endl;
+    cout << " OK" << endl;
 }
 
 void 
 lossy_test()
 {
-       cout << "start lossy_test ...";
-       VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+    cout << "start lossy_test ...";
+    VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
 
-       if (server) {
-               delete server;
-               startserver();
-       }
+    if (server) {
+        delete server;
+        startserver();
+    }
 
-       for (int i = 0; i < NUM_CL; i++) {
-               delete clients[i];
-               clients[i] = new rpcc(dst);
-               VERIFY(clients[i]->bind()==0);
-       }
+    for (int i = 0; i < NUM_CL; i++) {
+        delete clients[i];
+        clients[i] = new rpcc(dst);
+        VERIFY(clients[i]->bind()==0);
+    }
 
-       size_t nt = 1;
+    size_t nt = 1;
 
     vector<thread> th(nt);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i] = thread(client2, i);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       cout << ".. OK" << endl;
-       VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
+    cout << ".. OK" << endl;
+    VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
 }
 
 void 
 failure_test()
 {
-       rpcc *client1;
-       rpcc *client = clients[0];
+    rpcc *client1;
+    rpcc *client = clients[0];
 
-       cout << "failure_test" << endl;
+    cout << "failure_test" << endl;
 
-       delete server;
+    delete server;
 
-       client1 = new rpcc(dst);
-       VERIFY (client1->bind(rpcc::to(3000)) < 0);
-       cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
+    client1 = new rpcc(dst);
+    VERIFY (client1->bind(rpcc::to(3000)) < 0);
+    cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
-       delete client1;
+    delete client1;
 
-       startserver();
+    startserver();
 
-       string rep;
-       int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
-       VERIFY(intret == rpc_const::oldsrv_failure);
-       cout << "   -- call recovered server with old client .. failed ok" << endl;
+    string rep;
+    int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == rpc_const::oldsrv_failure);
+    cout << "   -- call recovered server with old client .. failed ok" << endl;
 
-       delete client;
+    delete client;
 
-       clients[0] = client = new rpcc(dst);
-       VERIFY (client->bind() >= 0);
-       VERIFY (client->bind() < 0);
+    clients[0] = client = new rpcc(dst);
+    VERIFY (client->bind() >= 0);
+    VERIFY (client->bind() < 0);
 
-       intret = client->call(22, rep, (string)"hello", (string)" goodbye");
-       VERIFY(intret == 0);
-       VERIFY(rep == "hello goodbye");
+    intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == 0);
+    VERIFY(rep == "hello goodbye");
 
-       cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
+    cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
 
 
-       size_t nt = 10;
-       cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
+    size_t nt = 10;
+    cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
 
     vector<thread> th(nt);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i] = thread(client3, client);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       cout << "ok" << endl;
+    cout << "ok" << endl;
 
-       delete server;
-       delete client;
+    delete server;
+    delete client;
 
-       startserver();
-       clients[0] = client = new rpcc(dst);
-       VERIFY (client->bind() >= 0);
-       cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
+    startserver();
+    clients[0] = client = new rpcc(dst);
+    VERIFY (client->bind() >= 0);
+    cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
 
-       cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
+    cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i] = thread(client3, client);
 
-       for(size_t i = 0; i < nt; i++)
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       cout << "ok" << endl;
+    cout << "ok" << endl;
 
-       cout << "failure_test OK" << endl;
+    cout << "failure_test OK" << endl;
 }
 
 int
 main(int argc, char *argv[])
 {
 
-       setvbuf(stdout, NULL, _IONBF, 0);
-       setvbuf(stderr, NULL, _IONBF, 0);
-       int debug_level = 0;
-
-       bool isclient = false;
-       bool isserver = false;
-
-       srandom((uint32_t)getpid());
-       port = 20000 + (getpid() % 10000);
-
-       int ch = 0;
-       while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
-               switch (ch) {
-                       case 'c':
-                               isclient = true;
-                               break;
-                       case 's':
-                               isserver = true;
-                               break;
-                       case 'd':
-                               debug_level = atoi(optarg);
-                               break;
-                       case 'p':
-                               port = atoi(optarg);
-                               break;
-                       case 'l':
-                               VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+    setvbuf(stdout, NULL, _IONBF, 0);
+    setvbuf(stderr, NULL, _IONBF, 0);
+    int debug_level = 0;
+
+    bool isclient = false;
+    bool isserver = false;
+
+    srandom((uint32_t)getpid());
+    port = 20000 + (getpid() % 10000);
+
+    int ch = 0;
+    while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
+        switch (ch) {
+            case 'c':
+                isclient = true;
+                break;
+            case 's':
+                isserver = true;
+                break;
+            case 'd':
+                debug_level = atoi(optarg);
+                break;
+            case 'p':
+                port = atoi(optarg);
+                break;
+            case 'l':
+                VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+                break;
+            default:
                 break;
-                       default:
-                               break;
-               }
-       }
+        }
+    }
 
-       if (!isserver && !isclient)  {
-               isserver = isclient = true;
-       }
+    if (!isserver && !isclient)  {
+        isserver = isclient = true;
+    }
 
-       if (debug_level > 0) {
-               JSL_DEBUG_LEVEL = debug_level;
-               jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
-       }
+    if (debug_level > 0) {
+        DEBUG_LEVEL = debug_level;
+        IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+    }
 
-       testmarshall();
+    testmarshall();
 
-       if (isserver) {
-               cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
-               startserver();
-       }
+    if (isserver) {
+        cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+        startserver();
+    }
 
-       if (isclient) {
-               // server's address.
+    if (isclient) {
+        // server's address.
         dst = "127.0.0.1:" + std::to_string(port);
 
 
-               // start the client.  bind it to the server.
-               // starts a thread to listen for replies and hand them to
-               // the correct waiting caller thread. there should probably
-               // be only one rpcc per process. you probably need one
-               // rpcc per server.
-               for (int i = 0; i < NUM_CL; i++) {
-                       clients[i] = new rpcc(dst);
-                       VERIFY (clients[i]->bind() == 0);
-               }
+        // start the client.  bind it to the server.
+        // starts a thread to listen for replies and hand them to
+        // the correct waiting caller thread. there should probably
+        // be only one rpcc per process. you probably need one
+        // rpcc per server.
+        for (int i = 0; i < NUM_CL; i++) {
+            clients[i] = new rpcc(dst);
+            VERIFY (clients[i]->bind() == 0);
+        }
 
-               simple_tests(clients[0]);
-               concurrent_test(10);
-               lossy_test();
-               if (isserver) {
-                       failure_test();
-               }
+        simple_tests(clients[0]);
+        concurrent_test(10);
+        lossy_test();
+        if (isserver) {
+            failure_test();
+        }
 
-               cout << "rpctest OK" << endl;
+        cout << "rpctest OK" << endl;
 
-               exit(0);
-       }
+        exit(0);
+    }
 
-       while (1) {
-               sleep(1);
-       }
+    while (1) {
+        sleep(1);
+    }
 }
diff --git a/rsm.cc b/rsm.cc
index 00cae81..f12f9db 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -446,17 +446,18 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
-    LOG("joinreq: src " << m << " last (" << last.vid << "," << last.seqno << ") mylast (" <<
+    LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
             last_myvs.vid << "," << last_myvs.seqno << ")");
     if (cfg->ismember(m, vid_commit)) {
-        LOG("joinreq: is still a member");
+        LOG(m << " is still a member -- nothing to do");
         log = cfg->dump();
     } else if (cfg->myaddr() != primary) {
-        LOG("joinreq: busy");
+        LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
         ret = rsm_protocol::BUSY;
     } else {
         // We cache vid_commit to avoid adding m to a view which already contains
         // m due to race condition
+        LOG("calling down to config layer");
         unsigned vid_cache = vid_commit;
         bool succ;
         {
@@ -466,9 +467,9 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
         }
         if (cfg->ismember(m, cfg->view_id())) {
             log = cfg->dump();
-            LOG("joinreq: ret " << ret << " log " << log);
+            LOG("ret " << ret << " log " << log);
         } else {
-            LOG("joinreq: failed; proposer couldn't add " << succ);
+            LOG("failed; proposer couldn't add " << succ);
             ret = rsm_protocol::BUSY;
         }
     }
@@ -486,7 +487,7 @@ rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
     r = m;
-    LOG("rsm::client_members return " << print_members(m) << " m " << primary);
+    LOG("rsm::client_members return " << m << " m " << primary);
     return rsm_client_protocol::OK;
 }
 
diff --git a/rsm.h b/rsm.h
index f2eb5bd..18ac8af 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -61,7 +61,6 @@ class rsm : public config_view_change {
         bool statetransferdone(string m, lock & rsm_mutex_lock);
         bool join(string m, lock & rsm_mutex_lock);
         void set_primary(unsigned vid);
-        string find_highest(viewstamp &vs, string &m, unsigned &vid);
         bool sync_with_backups(lock & rsm_mutex_lock);
         bool sync_with_primary(lock & rsm_mutex_lock);
         void net_repair(bool heal, lock & rsm_mutex_lock);
index bb21f24..4a80f60 100644 (file)
@@ -40,19 +40,19 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
     if (intret < 0) return intret;
     u >> res;
     if (!u.okdone()) {
-        fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
-                "You probably forgot to set the reply string in "
-                "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
-                "type\n", proc);
+        cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
+        cerr << "You probably forgot to set the reply string in "
+                "rsm::client_invoke, or you may have called RPC 0x" << hex <<
+                proc << " with the wrong return type" << endl;
         VERIFY(0);
         return rpc_const::unmarshal_reply_failure;
     }
     unmarshall u1(res);
     u1 >> r;
     if(!u1.okdone()) {
-        fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
-                "You are probably calling RPC 0x%x with wrong return "
-                "type.\n", proc);
+        cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
+        cerr << "You are probably calling RPC 0x" << hex << proc <<
+                " with the wrong return type." << endl;
         VERIFY(0);
         return rpc_const::unmarshal_reply_failure;
     }
index 53908f3..1601bcb 100644 (file)
@@ -18,8 +18,13 @@ struct viewstamp {
     unsigned int vid;
     unsigned int seqno;
     inline void operator++(int) { seqno++; }
+
+    MEMBERS(vid, seqno)
+    LEXICOGRAPHIC_COMPARISON(viewstamp)
 };
 
+MARSHALLABLE(viewstamp)
+
 class rsm_protocol {
     public:
         enum status : status_t { OK, ERR, BUSY};
@@ -33,28 +38,12 @@ class rsm_protocol {
         struct transferres {
             string state;
             viewstamp last;
+
+            MEMBERS(state, last)
         };
 };
 
-inline bool operator==(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) == tie(b.vid, b.seqno); }
-inline bool operator>(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) > tie(b.vid, b.seqno); }
-inline bool operator!=(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) != tie(b.vid, b.seqno); }
-
-inline marshall& operator<<(marshall &m, viewstamp v) {
-    return m << v.vid << v.seqno;
-}
-
-inline unmarshall& operator>>(unmarshall &u, viewstamp &v) {
-    return u >> v.vid >> v.seqno;
-}
-
-inline marshall & operator<<(marshall &m, rsm_protocol::transferres r) {
-    return m << r.state << r.last;
-}
-
-inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) {
-    return u >> r.state >> r.last;
-}
+MARSHALLABLE(rsm_protocol::transferres)
 
 class rsm_test_protocol {
     public:
index cb3ce8c..3965fa9 100644 (file)
@@ -21,5 +21,3 @@ rsm_test_protocol::status rsmtest_client::breakpoint(int b) {
     VERIFY (ret == rsm_test_protocol::OK);
     return r;
 }
-
-
diff --git a/start.sh b/start.sh
deleted file mode 100755 (executable)
index d3cf93b..0000000
--- a/start.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/env bash
-
-ulimit -c unlimited
-
-NUM_LS=${1:-0}
-
-BASE_PORT=$RANDOM
-BASE_PORT=$[BASE_PORT+2000]
-LOCK_PORT=$[BASE_PORT+6]
-
-if [ $NUM_LS -gt 1 ]; then
-    x=0
-    rm config
-    while [ $x -lt $NUM_LS ]; do
-      port=$[LOCK_PORT+2*x]
-      x=$[x+1]
-      echo $port >> config
-    done
-    x=0
-    while [ $x -lt $NUM_LS ]; do
-      port=$[LOCK_PORT+2*x]
-      x=$[x+1]
-      echo "starting ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &"
-      ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &
-      sleep 1
-    done
-else
-    echo "starting ./lock_server $LOCK_PORT > lock_server.log 2>&1 &"
-    ./lock_server $LOCK_PORT > lock_server.log 2>&1 &
-    sleep 1
-fi
diff --git a/stop.sh b/stop.sh
deleted file mode 100755 (executable)
index 2d0a1f6..0000000
--- a/stop.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/usr/bin/env bash
-
-pkill -u $USER lock_server
index 6a213b1..c44266e 100644 (file)
@@ -5,3 +5,4 @@ map<thread::id, int> thread_name_map;
 int next_thread_num = 0;
 map<void *, int> instance_name_map;
 int next_instance_num = 0;
+int DEBUG_LEVEL = 0;
index ebb2222..706e2b7 100644 (file)
@@ -11,7 +11,7 @@ extern int next_instance_num;
 extern char log_thread_prefix;
 
 namespace std {
-    // This... is an awful hack.  But sticking this in std:: makes it possible for
+    // This is an awful hack.  But sticking this in std:: makes it possible for
     // ostream_iterator to use it.
     template <class A, class B>
     ostream & operator<<(ostream &o, const pair<A,B> &d) {
@@ -22,11 +22,7 @@ namespace std {
 template <class A>
 typename enable_if<is_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
 operator<<(ostream &o, const A &a) {
-    o << "[";
-    auto oit = ostream_iterator<typename A::value_type>(o, ", ");
-    copy(a.begin(), a.end(), oit);
-    o << "]";
-    return o;
+    return o << "[" << implode(a, ", ") << "]";
 }
 
 #define LOG_PREFIX { \
@@ -58,4 +54,8 @@ operator<<(ostream &o, const A &a) {
     cerr << _x_ << endl; \
 }
 
+extern int DEBUG_LEVEL;
+
+#define IF_LEVEL(level) if(DEBUG_LEVEL >= abs(level))
+
 #endif
diff --git a/types.h b/types.h
index 9739a2a..e6b5895 100644 (file)
--- a/types.h
+++ b/types.h
@@ -107,7 +107,42 @@ template<class A> struct is_iterable<A,
     decltype(declval<A&>().cbegin(), declval<A&>().cend(), void())
 > : true_type {};
 
+template <class C>
+inline typename enable_if<is_iterable<C>::value, string>::type
+implode(const C & v, string delim=" ") {
+    if (v.begin() == v.end())
+        return string();
+    ostringstream oss;
+    auto last = prev(v.end());
+    copy(v.begin(), last, ostream_iterator<typename C::value_type>(oss, delim.c_str()));
+    oss << *last;
+    return oss.str();
+}
+
+inline vector<string> explode(const string &s, string delim=" ") {
+    vector<string> out;
+    size_t start = 0, end = 0;
+    while ((end = s.find(delim, start)) != string::npos) {
+        out.push_back(s.substr(start, end - start));
+        start = end + 1;
+    }
+    out.push_back(s.substr(start));
+    return out;
+}
+
 #include "lang/verify.h"
 #include "threaded_log.h"
 
+#define MEMBERS(...) \
+inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \
+inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); }
+
+#define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \
+inline bool operator _op_(const _c_ &b) const { return _tuple_() _op_ b._tuple_(); }
+
+#define LEXICOGRAPHIC_COMPARISON(_c_) \
+LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \
+LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \
+LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
+
 #endif