More clean-ups and cool template stuff
authorPeter Iannucci <iannucci@mit.edu>
Thu, 26 Sep 2013 23:20:14 +0000 (19:20 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Thu, 26 Sep 2013 23:27:52 +0000 (19:27 -0400)
32 files changed:
Makefile
Makefile.osx
config.cc
config.h
handle.cc
handle.h
lock.h
lock_client.cc
lock_client.h
lock_demo.cc
lock_protocol.h
lock_server.cc
lock_smain.cc
lock_tester.cc
log.cc
paxos.cc
paxos.h
paxos_protocol.h
rpc/marshall.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm.h
rsm_client.cc
rsm_client.h
rsm_protocol.h
rsm_tester.cc
rsmtest_client.cc
rsmtest_client.h
threaded_log.cc [moved from tprintf.cc with 88% similarity]
threaded_log.h [moved from tprintf.h with 88% similarity]

index 6bd396f..3da8e03 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -11,18 +11,18 @@ rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_lo
        ar cq $@ $^
        ranlib rpc/librpc.a
 
-rpc/rpctest: rpc/rpctest.o tprintf.o rpc/librpc.a
+rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
 
-lock_demo=lock_demo.o lock_client.o tprintf.o rsm_client.o handle.o
+lock_demo=lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o
 lock_demo : $(lock_demo) rpc/librpc.a
 
-lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o
+lock_tester=lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o
 lock_tester : $(lock_tester) rpc/librpc.a
 
-lock_server=lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server.o
+lock_server=lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o
 lock_server : $(lock_server) rpc/librpc.a
 
-rsm_tester=rsm_tester.o rsmtest_client.o tprintf.o
+rsm_tester=rsm_tester.o rsmtest_client.o threaded_log.o
 rsm_tester: $(rsm_tester) rpc/librpc.a
 
 %.o: %.cc
index 75f2e2b..91ce5ee 100644 (file)
@@ -1,5 +1,5 @@
-#PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations
-#PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors
+PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations
+PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors
 CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY)
 LDFLAGS = -stdlib=libc++
 CXX = clang++
index 9319abe..7bac4a9 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,20 +1,11 @@
 #include <thread>
 #include <sstream>
-#include <iostream>
-#include <stdio.h>
 #include "config.h"
 #include "paxos.h"
 #include "handle.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 #include "lang/verify.h"
 
-using namespace std::chrono;
-using std::string;
-using std::vector;
-using std::thread;
-using std::ostringstream;
-using std::istringstream;
-
 // The config module maintains views. As a node joins or leaves a
 // view, the next view will be the same as previous view, except with
 // the new node added or removed. The first view contains only node
@@ -47,277 +38,193 @@ using std::istringstream;
 // all views, the other nodes can bring this re-joined node up to
 // date.
 
-config::config(
-        const string &_first,
-        const string &_me,
-        config_view_change *_vc)
-    : my_view_id(0), first(_first), me(_me), vc(_vc)
+config::config(const string &_first, const string &_me, config_view_change *_vc)
+    : my_view_id(0), first(_first), me(_me), vc(_vc),
+      paxos_acceptor(this, me == _first, me, me),
+      paxos_proposer(this, &paxos_acceptor, me)
 {
-    paxos_acceptor = new acceptor(this, me == _first, me, me);
-    paxos_proposer = new proposer(this, paxos_acceptor, me);
-
-    // XXX hack; maybe should have its own port number
-    paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
-
-    {
-        lock ml(cfg_mutex);
-        reconstruct(ml);
-        thread(&config::heartbeater, this).detach();
-    }
+    get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
+    lock cfg_mutex_lock(cfg_mutex);
+    reconstruct(cfg_mutex_lock);
+    thread(&config::heartbeater, this).detach();
 }
 
-void
-config::restore(const string &s)
-{
-    lock ml(cfg_mutex);
-    paxos_acceptor->restore(s);
-    reconstruct(ml);
+void config::restore(const string &s) {
+    lock cfg_mutex_lock(cfg_mutex);
+    paxos_acceptor.restore(s);
+    reconstruct(cfg_mutex_lock);
 }
 
-void
-config::get_view(unsigned instance, vector<string> &m)
-{
-    lock ml(cfg_mutex);
-    get_view(instance, m, ml);
+void config::get_view(unsigned instance, vector<string> &m) {
+    lock cfg_mutex_lock(cfg_mutex);
+    get_view(instance, m, cfg_mutex_lock);
 }
 
-// caller should hold cfg_mutex
-void
-config::get_view(unsigned instance, vector<string> &m, lock &)
-{
-    string value = paxos_acceptor->value(instance);
-    tprintf("get_view(%d): returns %s\n", instance, value.c_str());
-    members(value, m);
+void config::get_view(unsigned instance, vector<string> &m, lock &) {
+    string value = paxos_acceptor.value(instance);
+    LOG("get_view(" << instance << "): returns " << value);
+    m = members(value);
 }
 
-void
-config::members(const string &value, vector<string> &view) const
-{
+vector<string> config::members(const string &value) const {
     istringstream ist(value);
-    string m;
-    view.clear();
-    while (ist >> m)
-        view.push_back(m);
+    using it = istream_iterator<string>;
+    return {it(ist), it()};
 }
 
-string
-config::value(const vector<string> &m) const
-{
+string config::value(const vector<string> &m) const {
     ostringstream ost;
-    for (unsigned i = 0; i < m.size(); i++)  {
-        ost << m[i];
-        ost << " ";
-    }
+    copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
     return ost.str();
 }
 
-void
-config::reconstruct(lock &cfg_mutex_lock)
-{
+void config::reconstruct(lock &cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
-    if (paxos_acceptor->instance() > 0) {
-        my_view_id = paxos_acceptor->instance();
+    if (paxos_acceptor.instance() > 0) {
+        my_view_id = paxos_acceptor.instance();
         get_view(my_view_id, mems, cfg_mutex_lock);
-        tprintf("config::reconstruct: %d %s\n",
-                my_view_id, print_members(mems).c_str());
+        LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
     }
 }
 
 // Called by Paxos's acceptor.
-void
-config::paxos_commit(unsigned instance, const string &value)
-{
-    vector<string> newmem;
-    lock ml(cfg_mutex);
+void config::paxos_commit(unsigned instance, const string &value) {
+    lock cfg_mutex_lock(cfg_mutex);
 
-    members(value, newmem);
-    tprintf("config::paxos_commit: %d: %s\n", instance,
-                 print_members(newmem).c_str());
+    vector<string> newmem = members(value);
+    LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
 
-    for (unsigned i = 0; i < mems.size(); i++) {
-        tprintf("config::paxos_commit: is %s still a member?\n",
-                mems[i].c_str());
-        if (!isamember(mems[i], newmem) && me != mems[i]) {
-            tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
-            mgr.delete_handle(mems[i]);
+    for (auto mem : mems) {
+        LOG("config::paxos_commit: is " << mem << " still a member?");
+        if (!isamember(mem, newmem) && me != mem) {
+            LOG("config::paxos_commit: delete " << mem);
+            invalidate_handle(mem);
         }
     }
 
     mems = newmem;
     my_view_id = instance;
     if (vc) {
-        ml.unlock();
+        cfg_mutex_lock.unlock();
         vc->commit_change(instance);
-        ml.lock();
+        cfg_mutex_lock.lock();
     }
 }
 
-bool
-config::ismember(const string &m, unsigned vid)
-{
-    lock ml(cfg_mutex);
+bool config::ismember(const string &m, unsigned vid) {
+    lock cfg_mutex_lock(cfg_mutex);
     vector<string> v;
-    get_view(vid, v, ml);
+    get_view(vid, v, cfg_mutex_lock);
     return isamember(m, v);
 }
 
-bool
-config::add(const string &new_m, unsigned vid)
-{
-    vector<string> m;
-    vector<string> curm;
-    lock ml(cfg_mutex);
+bool config::add(const string &new_m, unsigned vid) {
+    lock cfg_mutex_lock(cfg_mutex);
     if (vid != my_view_id)
         return false;
-    tprintf("config::add %s\n", new_m.c_str());
-    m = mems;
+    LOG("config::add " << new_m);
+    vector<string> m = mems;
     m.push_back(new_m);
-    curm = mems;
-    string v = value(m);
+    vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
-    bool r;
-    {
-        ml.unlock();
-        r = paxos_proposer->run(nextvid, curm, v);
-        ml.lock();
-    }
-    tprintf("config::add: proposer returned %s\n",
-            r ? "success" : "failure");
+    cfg_mutex_lock.unlock();
+    bool r = paxos_proposer.run(nextvid, cmems, value(m));
+    cfg_mutex_lock.lock();
+    LOG("config::add: proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
 // caller should hold cfg_mutex
-bool
-config::remove(const string &m)
-{
-    adopt_lock ml(cfg_mutex);
-    tprintf("config::remove: my_view_id %d remove? %s\n",
-            my_view_id, m.c_str());
+bool config::remove(const string &m, lock &cfg_mutex_lock) {
+    LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
     vector<string> n;
-    for (unsigned i = 0; i < mems.size(); i++) {
-        if (mems[i] != m)
-            n.push_back(mems[i]);
+    for (auto mem : mems) {
+        if (mem != m)
+            n.push_back(mem);
     }
-    string v = value(n);
     vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
-    bool r;
-    {
-        ml.unlock();
-        r = paxos_proposer->run(nextvid, cmems, v);
-        ml.lock();
-    }
-    tprintf("config::remove: proposer returned %s\n",
-            r ? "success" : "failure");
+    cfg_mutex_lock.unlock();
+    bool r = paxos_proposer.run(nextvid, cmems, value(n));
+    cfg_mutex_lock.lock();
+    LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
-void
-config::heartbeater() [[noreturn]]
-{
-    string m;
-    heartbeat_t h;
-    bool stable;
-    unsigned vid;
-    vector<string> cmems;
-    lock ml(cfg_mutex);
+void config::heartbeater() [[noreturn]] {
+    lock cfg_mutex_lock(cfg_mutex);
 
     while (1) {
         auto next_timeout = steady_clock::now() + seconds(3);
-        tprintf("heartbeater: go to sleep\n");
-        config_cond.wait_until(ml, next_timeout);
+        LOG("heartbeater: go to sleep");
+        config_cond.wait_until(cfg_mutex_lock, next_timeout);
 
-        stable = true;
-        vid = my_view_id;
-        get_view(vid, cmems, ml);
-        tprintf("heartbeater: current membership %s\n",
-                print_members(cmems).c_str());
+        unsigned vid = my_view_id;
+        vector<string> cmems;
+        get_view(vid, cmems, cfg_mutex_lock);
+        LOG("heartbeater: current membership " << print_members(cmems));
 
         if (!isamember(me, cmems)) {
-            tprintf("heartbeater: not member yet; skip hearbeat\n");
+            LOG("heartbeater: not member yet; skip hearbeat");
             continue;
         }
 
         // who has the smallest ID?
-        m = me;
-        for (unsigned i = 0; i < cmems.size(); i++) {
-            if (m > cmems[i])
-                m = cmems[i];
-        }
+        string m = min(me, *min_element(cmems.begin(), cmems.end()));
 
         if (m == me) {
             // ping the other nodes
-            for (unsigned i = 0; i < cmems.size(); i++) {
-                if (cmems[i] != me) {
-                    if ((h = doheartbeat(cmems[i])) != OK) {
-                        stable = false;
-                        m = cmems[i];
-                        break;
-                    }
-                }
+            for (string mem : cmems) {
+                if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
+                    continue;
+                if (vid == my_view_id)
+                    remove(mem, cfg_mutex_lock);
+                break;
             }
         } else {
             // ping the node with the smallest ID
-            if ((h = doheartbeat(m)) != OK)
-                stable = false;
-        }
-
-        if (!stable && vid == my_view_id) {
-            remove(m);
+            if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
+                remove(m, cfg_mutex_lock);
         }
     }
 }
 
-paxos_protocol::status
-config::heartbeat(int &r, string m, unsigned vid)
-{
-    lock ml(cfg_mutex);
-    int ret = paxos_protocol::ERR;
+paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+    lock cfg_mutex_lock(cfg_mutex);
     r = (int) my_view_id;
-    tprintf("heartbeat from %s(%d) my_view_id %d\n",
-            m.c_str(), vid, my_view_id);
-    if (vid == my_view_id) {
-        ret = paxos_protocol::OK;
-    } else if (paxos_proposer->isrunning()) {
+    LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+    if (vid == my_view_id)
+        return paxos_protocol::OK;
+    else if (paxos_proposer.isrunning()) {
         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
-        ret = paxos_protocol::OK;
-    } else {
-        ret = paxos_protocol::ERR;
+        return paxos_protocol::OK;
     }
-    return ret;
+    return paxos_protocol::ERR;
 }
 
-config::heartbeat_t
-config::doheartbeat(const string &m)
-{
-    adopt_lock ml(cfg_mutex);
-    int ret = rpc_const::timeout_failure;
-    int r = 0;
+config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     unsigned vid = my_view_id;
-    heartbeat_t res = OK;
-
-    tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+    LOG("doheartbeater to " << m << " (" << vid << ")");
     handle h(m);
-    {
-        ml.unlock();
-        rpcc *cl = h.safebind();
-        if (cl) {
-            ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
-        }
-        ml.lock();
-    }
-    if (ret != paxos_protocol::OK) {
-        if (ret == rpc_const::atmostonce_failure ||
-            ret == rpc_const::oldsrv_failure) {
-            mgr.delete_handle(m);
-        } else {
-            tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
-                         m.c_str(), ret, vid, r);
-            if (ret < 0) res = FAILURE;
-            else res = VIEWERR;
-        }
+
+    cfg_mutex_lock.unlock();
+    int r = 0, ret = rpc_const::bind_failure;
+    if (rpcc *cl = h.safebind())
+        ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
+    cfg_mutex_lock.lock();
+
+    heartbeat_t res = OK;
+    switch (ret) {
+        case paxos_protocol::OK:
+            break;
+        case rpc_const::atmostonce_failure:
+        case rpc_const::oldsrv_failure:
+            invalidate_handle(m);
+            break;
+        default:
+            LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+            res = (ret < 0) ? FAILURE : VIEWERR;
     }
-    tprintf("doheartbeat done %d\n", res);
+    LOG("doheartbeat done " << res);
     return res;
 }
-
index fcf1566..074cbe9 100644 (file)
--- a/config.h
+++ b/config.h
@@ -6,6 +6,19 @@
 #include "paxos.h"
 #include "lock.h"
 
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::string;
+using std::vector;
+using std::thread;
+using std::ostringstream;
+using std::istringstream;
+using std::ostream_iterator;
+using std::istream_iterator;
+using std::copy;
+using std::min;
+using std::min_element;
+
 class config_view_change {
     public:
         virtual void commit_change(unsigned view_id) = 0;
@@ -14,42 +27,41 @@ class config_view_change {
 
 class config : public paxos_change {
     private:
-        acceptor *paxos_acceptor;
-        proposer *paxos_proposer;
         unsigned my_view_id;
-        std::string first;
-        std::string me;
+        string first;
+        string me;
         config_view_change *vc;
-        std::vector<std::string> mems;
+        acceptor paxos_acceptor;
+        proposer paxos_proposer;
+        vector<string> mems;
         mutex cfg_mutex;
-        std::condition_variable config_cond;
-        paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance);
-        std::string value(const std::vector<std::string> &mems) const;
-        void members(const std::string &v, std::vector<std::string> &m) const;
-        void get_view(unsigned instance, std::vector<std::string> &m, lock &cfg_mutex_lock);
-        bool remove(const std::string &);
+        cond config_cond;
+        paxos_protocol::status heartbeat(int &r, string m, unsigned instance);
+        string value(const vector<string> &mems) const;
+        vector<string> members(const string &v) const;
+        void get_view(unsigned instance, vector<string> &m, lock &cfg_mutex_lock);
+        bool remove(const string &, lock &cfg_mutex_lock);
         void reconstruct(lock &cfg_mutex_lock);
         typedef enum {
             OK,        // response and same view #
             VIEWERR,   // response but different view #
             FAILURE,   // no response
         } heartbeat_t;
-        heartbeat_t doheartbeat(const std::string &m);
+        heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock);
     public:
-        config(const std::string &_first,
-               const std::string &_me,
-               config_view_change *_vc);
+        config(const string &_first, const string &_me, config_view_change *_vc);
         unsigned view_id() { return my_view_id; }
-        const std::string &myaddr() const { return me; }
-        std::string dump() { return paxos_acceptor->dump(); }
-        void get_view(unsigned instance, std::vector<std::string> &m);
-        void restore(const std::string &s);
-        bool add(const std::string &, unsigned view_id);
-        bool ismember(const std::string &m, unsigned view_id);
+        const string &myaddr() const { return me; }
+        string dump() { return paxos_acceptor.dump(); }
+        void get_view(unsigned instance, vector<string> &m);
+        void restore(const string &s);
+        bool add(const string &, unsigned view_id);
+        bool ismember(const string &m, unsigned view_id);
         void heartbeater(void);
-        void paxos_commit(unsigned instance, const std::string &v);
-        rpcs *get_rpcs() { return paxos_acceptor->get_rpcs(); }
-        void breakpoint(int b) { paxos_proposer->breakpoint(b); }
+        void paxos_commit(unsigned instance, const string &v);
+        // XXX hack; maybe should have its own port number
+        rpcs *get_rpcs() { return paxos_acceptor.get_rpcs(); }
+        void breakpoint(int b) { paxos_proposer.breakpoint(b); }
 };
 
 #endif
index ff38a56..d048ead 100644 (file)
--- a/handle.cc
+++ b/handle.cc
 #include "handle.h"
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
 #include "lock.h"
+#include <map>
 
-handle_mgr mgr;
+using std::map;
 
-handle::handle(std::string m) 
-{
-    h = mgr.get_handle(m);
-}
+class hinfo {
+public:
+    rpcc *cl = nullptr;
+    int refcnt = 0;
+    bool del = false;
+    string m;
+    mutex client_mutex;
+    hinfo(const string & m_) : m(m_) {}
+};
+
+class handle_mgr {
+    private:
+        mutex mgr_mutex;
+        map<string, hinfo *> hmap;
+        void delete_handle(const string & m, lock & handle_mutex_lock);
+    public:
+        hinfo *acquire_handle(string m);
+        void release_handle(hinfo *h);
+        void delete_handle(const string & m);
+};
+
+static handle_mgr mgr;
 
-rpcc *
-handle::safebind()
-{
+handle::handle(const string & m) : h(mgr.acquire_handle(m)) {}
+
+rpcc * handle::safebind() {
     if (!h)
-        return NULL;
-    lock ml(h->cl_mutex);
+        return nullptr;
+    lock ml(h->client_mutex);
     if (h->del)
-        return NULL;
+        return nullptr;
     if (h->cl)
         return h->cl;
-    sockaddr_in dstsock;
-    make_sockaddr(h->m.c_str(), &dstsock);
-    rpcc *cl = new rpcc(dstsock);
-    tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
-    int ret;
+    rpcc *cl = new rpcc(h->m);
+    LOG("handler_mgr::acquire_handle trying to bind..." << h->m);
     // The test script assumes that the failure can be detected by paxos and
     // rsm layer within few seconds. We have to set the timeout with a small
     // value to support the assumption.
     // 
     // With RPC_LOSSY=5, tests may fail due to delays and time outs.
-    ret = cl->bind(rpcc::to(1000));
+    int ret = cl->bind(rpcc::to(1000));
     if (ret < 0) {
-        tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
+        LOG("handle_mgr::acquire_handle bind failure! " << h->m << " " << ret);
         delete cl;
         h->del = true;
     } else {
-        tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
+        LOG("handle_mgr::acquire_handle bind succeeded " << h->m);
         h->cl = cl;
     }
     return h->cl;
 }
 
-handle::~handle() 
-{
-    if (h) mgr.done_handle(h);
-}
-
-handle_mgr::handle_mgr()
-{
+handle::~handle() {
+    if (h) mgr.release_handle(h);
 }
 
-struct hinfo *
-handle_mgr::get_handle(std::string m)
-{
-    lock ml(handle_mutex);
-    struct hinfo *h = 0;
+hinfo * handle_mgr::acquire_handle(string m) {
+    lock ml(mgr_mutex);
+    hinfo *h = nullptr;
     if (hmap.find(m) == hmap.end()) {
-        h = new hinfo;
-        h->cl = NULL;
-        h->del = false;
-        h->refcnt = 1;
-        h->m = m;
+        h = new hinfo(m);
         hmap[m] = h;
     } else if (!hmap[m]->del) {
         h = hmap[m];
-        h->refcnt ++;
     }
+    h->refcnt++;
     return h;
 }
 
-void 
-handle_mgr::done_handle(struct hinfo *h)
-{
-    lock ml(handle_mutex);
-    h->refcnt--;
-    if (h->refcnt == 0 && h->del)
-        delete_handle_wo(h->m);
+void handle_mgr::release_handle(hinfo *h) {
+    lock ml(mgr_mutex);
+    if (--h->refcnt == 0 && h->del)
+        delete_handle(h->m, ml);
 }
 
-void
-handle_mgr::delete_handle(std::string m)
-{
-    lock ml(handle_mutex);
-    delete_handle_wo(m);
+void handle_mgr::delete_handle(const string & m) {
+    lock ml(mgr_mutex);
+    delete_handle(m, ml);
 }
 
-// Must be called with handle_mutex locked.
-void
-handle_mgr::delete_handle_wo(std::string m)
-{
+void handle_mgr::delete_handle(const string & m, lock &) {
     if (hmap.find(m) == hmap.end()) {
-        tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
-    } else {
-        tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
-                hmap[m]->refcnt);
-        struct hinfo *h = hmap[m];
-        if (h->refcnt == 0) {
-            if (h->cl) {
-                h->cl->cancel();
-                delete h->cl;
-            }
-            hmap.erase(m);
-            delete h;
-        } else {
-            h->del = true;
-        }
+        LOG("handle_mgr::delete_handle: cl " << m << " isn't in cl list");
+        return;
     }
+    LOG("handle_mgr::delete_handle: cl " << m << " refcnt " << hmap[m]->refcnt);
+    hinfo *h = hmap[m];
+    if (h->refcnt == 0) {
+        if (h->cl) {
+            h->cl->cancel();
+            delete h->cl;
+        }
+        hmap.erase(m);
+        delete h;
+    } else
+        h->del = true;
+}
+
+void invalidate_handle(const string & m) {
+    mgr.delete_handle(m);
 }
index 6b042fb..a06b156 100644 (file)
--- a/handle.h
+++ b/handle.h
 #ifndef handle_h
 #define handle_h
 
-#include <string>
-#include <vector>
 #include "rpc/rpc.h"
+#include <string>
 
-struct hinfo {
-  rpcc *cl;
-  int refcnt;
-  bool del;
-  std::string m;
-  std::mutex cl_mutex;
-};
+using std::string;
+
+class hinfo;
 
 class handle {
     private:
-        struct hinfo *h;
+        hinfo *h;
     public:
-        handle(std::string m);
+        handle(const string & m);
         ~handle();
         /* safebind will try to bind with the rpc server on the first call.
          * Since bind may block, the caller probably should not hold a mutex
@@ -62,18 +57,6 @@ class handle {
         rpcc *safebind();
 };
 
-class handle_mgr {
-    private:
-        std::mutex handle_mutex;
-        std::map<std::string, struct hinfo *> hmap;
-    public:
-        handle_mgr();
-        struct hinfo *get_handle(std::string m);
-        void done_handle(struct hinfo *h);
-        void delete_handle(std::string m);
-        void delete_handle_wo(std::string m);
-};
-
-extern class handle_mgr mgr;
+void invalidate_handle(const string & m);
 
 #endif
diff --git a/lock.h b/lock.h
index 1789ec3..1d62c39 100644 (file)
--- a/lock.h
+++ b/lock.h
@@ -8,13 +8,4 @@ using std::mutex;
 using lock = std::unique_lock<std::mutex>;
 using cond = std::condition_variable;
 
-class adopt_lock : public lock {
-public:
-    explicit inline adopt_lock(class mutex &m) : std::unique_lock<std::mutex>(m, std::adopt_lock) {
-    }
-    inline ~adopt_lock() {
-        release();
-    }
-};
-
 #endif
index 035d80b..22e57f1 100644 (file)
@@ -2,29 +2,16 @@
 
 #include "lock_client.h"
 #include "rpc/rpc.h"
-#include <sstream>
-#include <iostream>
 #include <algorithm>
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
 #include <arpa/inet.h>
 
 #include "rsm_client.h"
 #include "lock.h"
 
-using std::ostringstream;
-
-lock_state::lock_state():
-    state(none)
-{
-}
-
-void lock_state::wait() {
+void lock_state::wait(lock & mutex_lock) {
     auto self = std::this_thread::get_id();
-    {
-        adopt_lock ml(m);
-        c[self].wait(ml);
-    }
+    c[self].wait(mutex_lock);
     c.erase(self);
 }
 
@@ -34,46 +21,34 @@ void lock_state::signal() {
         c.begin()->second.notify_one();
 }
 
-void lock_state::signal(std::thread::id who) {
+void lock_state::signal(thread::id who) {
     if (c.count(who))
         c[who].notify_one();
 }
 
+typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+
 unsigned int lock_client::last_port = 0;
 
 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
-    // by the semantics of std::map, this will create
-    // the lock if it doesn't already exist
-    return lock_table[lid];
+    return lock_table[lid]; // creates the lock if it doesn't already exist
 }
 
-lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) {
-    sockaddr_in dstsock;
-    make_sockaddr(xdst.c_str(), &dstsock);
-    cl = new rpcc(dstsock);
-    if (cl->bind() < 0) {
+lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) {
+    cl = new rpcc(xdst);
+    if (cl->bind() < 0)
         LOG("lock_client: call bind");
-    }
 
     srandom((uint32_t)time(NULL)^last_port);
     rlock_port = ((random()%32000) | (0x1 << 10));
-    const char *hname;
-    // VERIFY(gethostname(hname, 100) == 0);
-    hname = "127.0.0.1";
-    ostringstream host;
-    host << hname << ":" << rlock_port;
-    id = host.str();
+    id = "127.0.0.1:" + std::to_string(rlock_port);
     last_port = rlock_port;
     rpcs *rlsrpc = new rpcs(rlock_port);
     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
-    {
-        lock sl(xid_mutex);
-        next_xid = 0;
-    }
     rsmc = new rsm_client(xdst);
-    releaser_thread = std::thread(&lock_client::releaser, this);
+    releaser_thread = thread(&lock_client::releaser, this);
 }
 
 void lock_client::releaser() [[noreturn]] {
@@ -103,7 +78,7 @@ void lock_client::releaser() [[noreturn]] {
 int lock_client::stat(lock_protocol::lockid_t lid) {
     VERIFY(0);
     int r;
-    lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid);
+    auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
     VERIFY (ret == lock_protocol::OK);
     return r;
 }
@@ -134,7 +109,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
             {
                 sl.unlock();
                 int r;
-                result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
+                result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
                 sl.lock();
             }
             LOG("acquire returned " << result);
@@ -165,7 +140,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
         }
 
         LOG("waiting...");
-        st.wait();
+        st.wait(sl);
         LOG("wait ended");
     }
 
@@ -241,7 +216,7 @@ t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
 }
 
 t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
-    return ((lock_client *)client)->acquire(lid);
+    return ((lock_client *)client)->release(lid);
 }
 
 t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
index 541cc23..3290d1a 100644 (file)
@@ -12,6 +12,7 @@
 #include "lang/verify.h"
 #include "rpc/fifo.h"
 #include "rsm_client.h"
+#include "lock.h"
 
 class lock_release_user {
     public:
@@ -20,15 +21,12 @@ class lock_release_user {
 };
 
 using std::string;
+using std::map;
 using std::thread;
 using std::list;
-using std::map;
-
-typedef string callback;
 
 class lock_state {
 public:
-    lock_state();
     enum {
         none = 0,
         retrying,
@@ -36,15 +34,15 @@ public:
         locked,
         acquiring,
         releasing
-    } state;
-    std::thread::id held_by;
-    list<std::thread::id> wanted_by;
+    } state = none;
+    thread::id held_by;
+    list<thread::id> wanted_by;
     mutex m;
-    map<std::thread::id, std::condition_variable> c;
+    map<thread::id, cond> c;
     lock_protocol::xid_t xid;
-    void wait();
+    void wait(lock & mutex_lock);
     void signal();
-    void signal(std::thread::id who);
+    void signal(thread::id who);
 };
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
@@ -54,7 +52,7 @@ typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 class lock_client {
     private:
         rpcc *cl;
-        std::thread releaser_thread;
+        thread releaser_thread;
         rsm_client *rsmc;
         class lock_release_user *lu;
         unsigned int rlock_port;
index 3a85949..3b38cdf 100644 (file)
@@ -1,11 +1,9 @@
 #include "lock_client.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 
-char tprintf_thread_prefix = 'd';
+char log_thread_prefix = 'd';
 
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
     if(argc != 2) {
         fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
         return 1;
index 61f0998..900897a 100644 (file)
@@ -6,13 +6,14 @@
 #include "rpc/rpc.h"
 #include <string>
 
+using std::string;
+
 class lock_protocol {
     public:
-        enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR };
-        typedef int status;
-        typedef std::string lockid_t;
-        typedef unsigned long long xid_t;
-        enum rpc_numbers {
+        enum status : status_t { OK, RETRY, RPCERR, NOENT, IOERR };
+        using lockid_t = string;
+        using xid_t = uint64_t;
+        enum rpc_numbers : proc_t {
             acquire = 0x7001,
             release,
             stat
@@ -21,9 +22,8 @@ class lock_protocol {
 
 class rlock_protocol {
     public:
-        enum xxstatus { OK, RPCERR };
-        typedef int status;
-        enum rpc_numbers {
+        enum status : status_t { OK, RPCERR };
+        enum rpc_numbers : proc_t {
             revoke = 0x8001,
             retry = 0x8002
         };
index a82231e..cac6a90 100644 (file)
@@ -6,7 +6,7 @@
 #include <arpa/inet.h>
 #include "lang/verify.h"
 #include "handle.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 #include "rpc/marshall.h"
 #include "lock.h"
 
@@ -74,7 +74,7 @@ void lock_server::revoker() [[noreturn]] {
         proxy = handle(held_by.first).safebind();
         if (proxy) {
             int r;
-            rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+            auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
             LOG("Revoke returned " << ret);
         }
     }
@@ -97,8 +97,6 @@ void lock_server::retryer() [[noreturn]] {
             front = st.wanted_by.front();
         }
 
-        rlock_protocol::status ret = -1;
-
         rpcc *proxy = NULL;
         // try a few times?
         //int t=5;
@@ -106,7 +104,7 @@ void lock_server::retryer() [[noreturn]] {
         proxy = handle(front.first).safebind();
         if (proxy) {
             int r;
-            ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
+            auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
             LOG("Retry returned " << ret);
         }
     }
index 363f886..d62a25b 100644 (file)
@@ -1,18 +1,16 @@
 #include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include <stdlib.h>
-#include "tprintf.h"
+#include "threaded_log.h"
 #include <unistd.h>
 #include "lock_server.h"
 #include "rsm.h"
 
 // Main loop of lock_server
 
-char tprintf_thread_prefix = 's';
+char log_thread_prefix = 's';
 
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
 
index f4e68bd..ac9175b 100644 (file)
 #include <stdlib.h>
 #include <stdio.h>
 #include "lang/verify.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 #include <sys/types.h>
 #include <unistd.h>
 #include "lock.h"
 
-char tprintf_thread_prefix = 'c';
+char log_thread_prefix = 'c';
 
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
@@ -31,12 +31,10 @@ lock_protocol::lockid_t c = "3";
 int ct[256];
 std::mutex count_mutex;
 
-void
-check_grant(lock_protocol::lockid_t lid)
-{
+void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
-    if(ct[x] != 0){
+    if (ct[x] != 0) {
         fprintf(stderr, "error: server granted %s twice\n", lid.c_str());
         fprintf(stdout, "error: server granted %s twice\n", lid.c_str());
         exit(1);
@@ -44,22 +42,18 @@ check_grant(lock_protocol::lockid_t lid)
     ct[x] += 1;
 }
 
-void
-check_release(lock_protocol::lockid_t lid)
-{
+void check_release(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
-    if(ct[x] != 1){
+    if (ct[x] != 1) {
         fprintf(stderr, "error: client released un-held lock %s\n",  lid.c_str());
         exit(1);
     }
     ct[x] -= 1;
 }
 
-void
-test1(void)
-{
-    tprintf ("acquire a release a acquire a release a\n");
+void test1(void) {
+    LOG_NONMEMBER("acquire a release a acquire a release a");
     lc[0]->acquire(a);
     check_grant(a);
     lc[0]->release(a);
@@ -69,7 +63,7 @@ test1(void)
     lc[0]->release(a);
     check_release(a);
 
-    tprintf ("acquire a acquire b release b release a\n");
+    LOG_NONMEMBER("acquire a acquire b release b release a");
     lc[0]->acquire(a);
     check_grant(a);
     lc[0]->acquire(b);
@@ -80,63 +74,51 @@ test1(void)
     check_release(a);
 }
 
-void *
-test2(int i) 
-{
-    tprintf ("test2: client %d acquire a release a\n", i);
+void test2(int i) {
+    LOG_NONMEMBER("test2: client " << i << " acquire a release a");
     lc[i]->acquire(a);
-    tprintf ("test2: client %d acquire done\n", i);
+    LOG_NONMEMBER("test2: client " << i << " acquire done");
     check_grant(a);
     sleep(1);
-    tprintf ("test2: client %d release\n", i);
+    LOG_NONMEMBER("test2: client " << i << " release");
     check_release(a);
     lc[i]->release(a);
-    tprintf ("test2: client %d release done\n", i);
-    return 0;
+    LOG_NONMEMBER("test2: client " << i << " release done");
 }
 
-void *
-test3(int i)
-{
-    tprintf ("test3: client %d acquire a release a concurrent\n", i);
+void test3(int i) {
+    LOG_NONMEMBER("test3: client " << i << " acquire a release a concurrent");
     for (int j = 0; j < 10; j++) {
         lc[i]->acquire(a);
         check_grant(a);
-        tprintf ("test3: client %d got lock\n", i);
+        LOG_NONMEMBER("test3: client " << i << " got lock");
         check_release(a);
         lc[i]->release(a);
     }
-    return 0;
 }
 
-void *
-test4(int i)
-{
-    tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
+void test4(int i) {
+    LOG_NONMEMBER("test4: thread " << i << " acquire a release a concurrent; same clnt");
     for (int j = 0; j < 10; j++) {
         lc[0]->acquire(a);
         check_grant(a);
-        tprintf ("test4: thread %d on client 0 got lock\n", i);
+        LOG_NONMEMBER("test4: thread " << i << " on client 0 got lock");
         check_release(a);
         lc[0]->release(a);
     }
-    return 0;
 }
 
-void *
-test5(int i)
-{
-    tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
+void test5(int i) {
+    LOG_NONMEMBER("test5: client " << i << " acquire a release a concurrent; same and diff clnt");
     for (int j = 0; j < 10; j++) {
         if (i < 5)  lc[0]->acquire(a);
         else  lc[1]->acquire(a);
         check_grant(a);
-        tprintf ("test5: client %d got lock\n", i);
+        LOG_NONMEMBER("test5: client " << i << " got lock");
         check_release(a);
         if (i < 5) lc[0]->release(a);
         else lc[1]->release(a);
     }
-    return 0;
 }
 
 int
@@ -149,7 +131,7 @@ main(int argc, char *argv[])
     setvbuf(stderr, NULL, _IONBF, 0);
     srandom((uint32_t)getpid());
 
-    if(argc < 2) {
+    if (argc < 2) {
         fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
         exit(1);
     }
@@ -158,20 +140,20 @@ main(int argc, char *argv[])
 
     if (argc > 2) {
         test = atoi(argv[2]);
-        if(test < 1 || test > 5){
-            tprintf("Test number must be between 1 and 5\n");
+        if (test < 1 || test > 5) {
+            LOG_NONMEMBER("Test number must be between 1 and 5");
             exit(1);
         }
     }
 
-    tprintf("cache lock client\n");
+    LOG_NONMEMBER("cache lock client");
     for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst);
 
-    if(!test || test == 1){
+    if (!test || test == 1) {
         test1();
     }
 
-    if(!test || test == 2){
+    if (!test || test == 2) {
         // test2
         for (int i = 0; i < nt; i++)
             th[i] = std::thread(test2, i);
@@ -179,36 +161,33 @@ main(int argc, char *argv[])
             th[i].join();
     }
 
-    if(!test || test == 3){
-        tprintf("test 3\n");
+    if (!test || test == 3) {
+        LOG_NONMEMBER("test 3");
 
-        // test3
         for (int i = 0; i < nt; i++)
             th[i] = std::thread(test3, i);
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
 
-    if(!test || test == 4){
-        tprintf("test 4\n");
+    if (!test || test == 4) {
+        LOG_NONMEMBER("test 4");
 
-        // test 4
         for (int i = 0; i < 2; i++)
             th[i] = std::thread(test4, i);
         for (int i = 0; i < 2; i++)
             th[i].join();
     }
 
-    if(!test || test == 5){
-        tprintf("test 5\n");
+    if (!test || test == 5) {
+        LOG_NONMEMBER("test 5");
 
-        // test 5
         for (int i = 0; i < nt; i++)
             th[i] = std::thread(test5, i);
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
 
-    tprintf ("%s: passed all tests successfully\n", argv[0]);
+    LOG_NONMEMBER(argv[0] << ": passed all tests successfully");
 
 }
diff --git a/log.cc b/log.cc
index baf3c2f..627b7ac 100644 (file)
--- a/log.cc
+++ b/log.cc
 #include "paxos.h"
 #include <fstream>
 #include <iostream>
-#include "tprintf.h"
+#include "threaded_log.h"
 
 // Paxos must maintain some durable state (i.e., that survives power
 // failures) to run Paxos correct.  This module implements a log with
 // all durable state to run Paxos.  Since the values chosen correspond
 // to views, the log contains all views since the beginning of time.
 
-log::log(acceptor *_acc, std::string _me)
-  : pxs (_acc)
-{
-  name = "paxos-" + _me + ".log";
-  logread();
+log::log(acceptor *_acc, std::string _me) : pxs (_acc) {
+    name = "paxos-" + _me + ".log";
+    logread();
 }
 
-void
-log::logread(void)
-{
-  std::ifstream from;
-  std::string type;
-  unsigned instance;
+void log::logread(void) {
+    std::ifstream from;
+    std::string type;
+    unsigned instance;
 
-  from.open(name.c_str());
-  LOG("logread");
-  while (from >> type) {
-    if (type == "done") {
-      std::string v;
-      from >> instance;
-      from.get();
-      getline(from, v);
-      pxs->values[instance] = v;
-      pxs->instance_h = instance;
-      LOG("logread: instance: " << instance << " w. v = " <<
-              pxs->values[instance]);
-      pxs->v_a.clear();
-      pxs->n_h.n = 0;
-      pxs->n_a.n = 0;
-    } else if (type == "propseen") {
-      from >> pxs->n_h.n;
-      from >> pxs->n_h.m;
-      LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
-    } else if (type == "accepted") {
-      std::string v;
-      from >> pxs->n_a.n;
-      from >> pxs->n_a.m;
-      from.get();
-      getline(from, v);
-      pxs->v_a = v;
-      LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
-    } else {
-      LOG("logread: unknown log record");
-      VERIFY(0);
-    }
-  } 
-  from.close();
+    from.open(name.c_str());
+    LOG("logread");
+    while (from >> type) {
+        if (type == "done") {
+            std::string v;
+            from >> instance;
+            from.get();
+            getline(from, v);
+            pxs->values[instance] = v;
+            pxs->instance_h = instance;
+            LOG("logread: instance: " << instance << " w. v = " <<
+                    pxs->values[instance]);
+            pxs->v_a.clear();
+            pxs->n_h.n = 0;
+            pxs->n_a.n = 0;
+        } else if (type == "propseen") {
+            from >> pxs->n_h.n;
+            from >> pxs->n_h.m;
+            LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
+        } else if (type == "accepted") {
+            std::string v;
+            from >> pxs->n_a.n;
+            from >> pxs->n_a.m;
+            from.get();
+            getline(from, v);
+            pxs->v_a = v;
+            LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
+        } else {
+            LOG("logread: unknown log record");
+            VERIFY(0);
+        }
+    } 
+    from.close();
 }
 
-std::string 
-log::dump()
-{
-  std::ifstream from;
-  std::string res;
-  std::string v;
-  from.open(name.c_str());
-  while (getline(from, v)) {
-    res = res + v + "\n";
-  }
-  from.close();
-  return res;
+std::string log::dump() {
+    std::ifstream from;
+    std::string res;
+    std::string v;
+    from.open(name.c_str());
+    while (getline(from, v))
+        res += v + "\n";
+    from.close();
+    return res;
 }
 
-void
-log::restore(std::string s)
-{
-  std::ofstream f;
-  LOG("restore: " << s);
-  f.open(name.c_str(), std::ios::trunc);
-  f << s;
-  f.close();
+void log::restore(std::string s) {
+    std::ofstream f;
+    LOG("restore: " << s);
+    f.open(name.c_str(), std::ios::trunc);
+    f << s;
+    f.close();
 }
 
 // XXX should be an atomic operation
-void
-log::loginstance(unsigned instance, std::string v)
-{
-  std::ofstream f;
-  f.open(name.c_str(), std::ios::app);
-  f << "done";
-  f << " ";
-  f << instance;
-  f << " ";
-  f << v;
-  f << "\n";
-  f.close();
+void log::loginstance(unsigned instance, std::string v) {
+    std::ofstream f(name, std::ios::app);
+    f << "done " << instance << " " << v << "\n";
+    f.close();
 }
 
 // an acceptor should call logprop(n_h) when it
 // receives a prepare to which it responds prepare_ok().
-void
-log::logprop(prop_t n_h)
-{
-  std::ofstream f;
-  f.open(name.c_str(), std::ios::app);
-  f << "propseen";
-  f << " ";
-  f << n_h.n;
-  f << " ";
-  f << n_h.m;
-  f << "\n";
-  f.close();
+void log::logprop(prop_t n_h) {
+    std::ofstream f;
+    f.open(name.c_str(), std::ios::app);
+    f << "propseen";
+    f << " ";
+    f << n_h.n;
+    f << " ";
+    f << n_h.m;
+    f << "\n";
+    f.close();
 }
 
 // an acceptor should call logaccept(n_a, v_a) when it
 // receives an accept RPC to which it replies accept_ok().
-void
-log::logaccept(prop_t n, std::string v)
-{
-  std::ofstream f;
-  f.open(name.c_str(), std::ios::app);
-  f << "accepted";
-  f << " ";
-  f << n.n;
-  f << " ";
-  f << n.m;
-  f << " ";
-  f << v;
-  f << "\n";
-  f.close();
+void log::logaccept(prop_t n, std::string v) {
+    std::ofstream f(name, std::ios::app);
+    f << "accepted " << n.n << " " << n.m << " " << v << "\n";
+    f.close();
 }
-
index 83bf4f1..46d9c1c 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,10 +1,11 @@
 #include "paxos.h"
 #include "handle.h"
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
 #include "lang/verify.h"
 #include "lock.h"
 
+using std::stoi;
+
 // This module implements the proposer and acceptor of the Paxos
 // distributed algorithm as described by Lamport's "Paxos Made
 // Simple".  To kick off an instance of Paxos, the caller supplies a
@@ -22,9 +23,9 @@ bool operator>= (const prop_t &a, const prop_t &b) {
     return (a.n > b.n || (a.n == b.n && a.m >= b.m));
 }
 
-std::string
-print_members(const std::vector<std::string> &nodes) {
-    std::string s;
+string
+print_members(const vector<string> &nodes) {
+    string s;
     s.clear();
     for (unsigned i = 0; i < nodes.size(); i++) {
         s += nodes[i];
@@ -35,7 +36,7 @@ print_members(const std::vector<std::string> &nodes) {
 }
 
 
-bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+bool isamember(const string & m, const vector<string> & nodes) {
     for (auto n : nodes) {
         if (n == m)
             return 1;
@@ -51,8 +52,7 @@ bool proposer::isrunning() {
 }
 
 // check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const std::vector<std::string> &l1,
-               const std::vector<std::string> &l2) {
+bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
     unsigned n = 0;
 
     for (unsigned i = 0; i < l1.size(); i++) {
@@ -62,8 +62,7 @@ bool proposer::majority(const std::vector<std::string> &l1,
     return n >= (l1.size() >> 1) + 1;
 }
 
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
-        const std::string &_me)
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
     stable (true)
 {
@@ -76,19 +75,17 @@ void proposer::setn()
     my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
 }
 
-bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
-        const std::string & newv)
+bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
 {
-    std::vector<std::string> accepts;
-    std::vector<std::string> nodes;
-    std::string v;
+    vector<string> accepts;
+    vector<string> nodes;
+    string v;
     bool r = false;
 
     lock ml(pxs_mutex);
-    tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
-            print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+    LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
     if (!stable) {  // already running proposer?
-        tprintf("proposer::run: already running\n");
+        LOG("proposer::run: already running");
         return false;
     }
     stable = false;
@@ -98,7 +95,7 @@ bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes
     if (prepare(instance, accepts, cur_nodes, v)) {
 
         if (majority(cur_nodes, accepts)) {
-            tprintf("paxos::manager: received a majority of prepare responses\n");
+            LOG("paxos::manager: received a majority of prepare responses");
 
             if (v.size() == 0)
                 v = newv;
@@ -110,20 +107,20 @@ bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes
             accept(instance, accepts, nodes, v);
 
             if (majority(cur_nodes, accepts)) {
-                tprintf("paxos::manager: received a majority of accept responses\n");
+                LOG("paxos::manager: received a majority of accept responses");
 
                 breakpoint2();
 
                 decide(instance, accepts, v);
                 r = true;
             } else {
-                tprintf("paxos::manager: no majority of accept responses\n");
+                LOG("paxos::manager: no majority of accept responses");
             }
         } else {
-            tprintf("paxos::manager: no majority of prepare responses\n");
+            LOG("paxos::manager: no majority of prepare responses");
         }
     } else {
-        tprintf("paxos::manager: prepare is rejected %d\n", stable);
+        LOG("paxos::manager: prepare is rejected " << stable);
     }
     stable = true;
     return r;
@@ -135,9 +132,9 @@ bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes
 // otherwise fill in accepts with set of nodes that accepted,
 // set v to the v_a with the highest n_a, and return true.
 bool
-proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
-        const std::vector<std::string> & nodes,
-        std::string & v)
+proposer::prepare(unsigned instance, vector<string> & accepts,
+        const vector<string> & nodes,
+        string & v)
 {
     struct paxos_protocol::preparearg arg = { instance, my_n };
     struct paxos_protocol::prepareres res;
@@ -150,14 +147,14 @@ proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
         int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
-                tprintf("commiting old instance!\n");
+                LOG("commiting old instance!");
                 acc->commit(instance, res.v_a);
                 return false;
             }
             if (res.accept) {
                 accepts.push_back(i);
                 if (res.n_a >= n_a) {
-                    tprintf("found a newer accepted proposal\n");
+                    LOG("found a newer accepted proposal");
                     v = res.v_a;
                     n_a = res.n_a;
                 }
@@ -170,8 +167,8 @@ proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
 // run() calls this to send out accept RPCs to accepts.
 // fill in accepts with list of nodes that accepted.
 void
-proposer::accept(unsigned instance, std::vector<std::string> & accepts,
-        const std::vector<std::string> & nodes, const std::string & v)
+proposer::accept(unsigned instance, vector<string> & accepts,
+        const vector<string> & nodes, const string & v)
 {
     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
     rpcc *r;
@@ -187,8 +184,8 @@ proposer::accept(unsigned instance, std::vector<std::string> & accepts,
 }
 
 void
-proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
-        const std::string & v)
+proposer::decide(unsigned instance, const vector<string> & accepts,
+        const string & v)
 {
     struct paxos_protocol::decidearg arg = { instance, v };
     rpcc *r;
@@ -201,8 +198,8 @@ proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
-        const std::string & _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
+        const string & _value)
   : cfg(_cfg), me (_me), instance_h(0)
 {
     n_h.n = 0;
@@ -219,14 +216,14 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _m
         instance_h = 1;
     }
 
-    pxs = new rpcs((uint32_t)std::stoi(_me));
+    pxs = new rpcs((uint32_t)stoi(_me));
     pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
     pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
     pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
 }
 
 paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
         paxos_protocol::preparearg a)
 {
     lock ml(pxs_mutex);
@@ -242,13 +239,13 @@ acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
         l->logprop(n_h);
         r.accept = true;
     } else {
-        tprintf("I totally rejected this request.  Ha.\n");
+        LOG("I totally rejected this request.  Ha.");
     }
     return paxos_protocol::OK;
 }
 
 paxos_protocol::status
-acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
 {
     lock ml(pxs_mutex);
     r = false;
@@ -263,11 +260,10 @@ acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
 
 // the src argument is only for debugging
 paxos_protocol::status
-acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
+acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
 {
     lock ml(pxs_mutex);
-    tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
-            a.instance, instance_h, v_a.c_str());
+    LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
     if (a.instance == instance_h + 1) {
         VERIFY(v_a == a.v);
         commit(a.instance, v_a, ml);
@@ -281,11 +277,11 @@ acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
 }
 
 void
-acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
+acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
 {
-    tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+    LOG("acceptor::commit: instance=" << instance << " has v=" << value);
     if (instance > instance_h) {
-        tprintf("commit: highestaccepteinstance = %d\n", instance);
+        LOG("commit: highestaccepteinstance = " << instance);
         values[instance] = value;
         l->loginstance(instance, value);
         instance_h = instance;
@@ -303,20 +299,20 @@ acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_
 }
 
 void
-acceptor::commit(unsigned instance, const std::string & value)
+acceptor::commit(unsigned instance, const string & value)
 {
     lock ml(pxs_mutex);
     commit(instance, value, ml);
 }
 
-std::string
+string
 acceptor::dump()
 {
     return l->dump();
 }
 
 void
-acceptor::restore(const std::string & s)
+acceptor::restore(const string & s)
 {
     l->restore(s);
     l->logread();
@@ -331,7 +327,7 @@ void
 proposer::breakpoint1()
 {
     if (break1) {
-        tprintf("Dying at breakpoint 1!\n");
+        LOG("Dying at breakpoint 1!");
         exit(1);
     }
 }
@@ -341,7 +337,7 @@ void
 proposer::breakpoint2()
 {
     if (break2) {
-        tprintf("Dying at breakpoint 2!\n");
+        LOG("Dying at breakpoint 2!");
         exit(1);
     }
 }
@@ -350,10 +346,10 @@ void
 proposer::breakpoint(int b)
 {
     if (b == 3) {
-        tprintf("Proposer: breakpoint 1\n");
+        LOG("Proposer: breakpoint 1");
         break1 = true;
     } else if (b == 4) {
-        tprintf("Proposer: breakpoint 2\n");
+        LOG("Proposer: breakpoint 2");
         break2 = true;
     }
 }
diff --git a/paxos.h b/paxos.h
index 9650de1..8561dd5 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -3,15 +3,19 @@
 
 #include <string>
 #include <vector>
+#include <map>
 #include "rpc/rpc.h"
 #include "paxos_protocol.h"
 #include "log.h"
 #include "lock.h"
 
+using std::string;
+using std::map;
+using std::vector;
 
 class paxos_change {
     public:
-        virtual void paxos_commit(unsigned instance, const std::string & v) = 0;
+        virtual void paxos_commit(unsigned instance, const string & v) = 0;
         virtual ~paxos_change() {}
 };
 
@@ -20,49 +24,49 @@ class acceptor {
         log *l;
         rpcs *pxs;
         paxos_change *cfg;
-        std::string me;
+        string me;
         mutex pxs_mutex;
 
         // Acceptor state
         prop_t n_h;            // number of the highest proposal seen in a prepare
         prop_t n_a;            // number of highest proposal accepted
-        std::string v_a;       // value of highest proposal accepted
+        string v_a;    // value of highest proposal accepted
         unsigned instance_h;   // number of the highest instance we have decided
-        std::map<unsigned,std::string> values; // vals of each instance
+        map<unsigned,string> values;   // vals of each instance
 
-        void commit(unsigned instance, const std::string & v, lock & pxs_mutex_lock);
+        void commit(unsigned instance, const string & v, lock & pxs_mutex_lock);
         paxos_protocol::status preparereq(paxos_protocol::prepareres & r,
-                const std::string & src, paxos_protocol::preparearg a);
-        paxos_protocol::status acceptreq(bool & r, const std::string & src,
+                const string & src, paxos_protocol::preparearg a);
+        paxos_protocol::status acceptreq(bool & r, const string & src,
                 paxos_protocol::acceptarg a);
-        paxos_protocol::status decidereq(int & r, const std::string & src,
+        paxos_protocol::status decidereq(int & r, const string & src,
                 paxos_protocol::decidearg a);
 
         friend class log;
 
     public:
-        acceptor(class paxos_change *cfg, bool _first, const std::string & _me,
-                const std::string & _value);
+        acceptor(class paxos_change *cfg, bool _first, const string & _me,
+                const string & _value);
         ~acceptor() {}
-        void commit(unsigned instance, const std::string & v);
+        void commit(unsigned instance, const string & v);
         unsigned instance() { return instance_h; }
-        const std::string & value(unsigned instance) { return values[instance]; }
-        std::string dump();
-        void restore(const std::string &);
+        const string & value(unsigned instance) { return values[instance]; }
+        string dump();
+        void restore(const string &);
         rpcs *get_rpcs() { return pxs; }
         prop_t get_n_h() { return n_h; }
         unsigned get_instance_h() { return instance_h; }
 };
 
-extern bool isamember(const std::string & m, const std::vector<std::string> & nodes);
-extern std::string print_members(const std::vector<std::string> & nodes);
+extern bool isamember(const string & m, const vector<string> & nodes);
+extern string print_members(const vector<string> & nodes);
 
 class proposer {
     private:
         log *l;
         paxos_change *cfg;
         acceptor *acc;
-        std::string me;
+        string me;
         bool break1;
         bool break2;
 
@@ -73,23 +77,23 @@ class proposer {
         prop_t my_n;           // number of the last proposal used in this instance
 
         void setn();
-        bool prepare(unsigned instance, std::vector<std::string> & accepts,
-                const std::vector<std::string> & nodes,
-                std::string & v);
-        void accept(unsigned instance, std::vector<std::string> & accepts,
-                const std::vector<std::string> & nodes, const std::string & v);
-        void decide(unsigned instance, const std::vector<std::string> & accepts,
-                const std::string & v);
+        bool prepare(unsigned instance, vector<string> & accepts,
+                const vector<string> & nodes,
+                string & v);
+        void accept(unsigned instance, vector<string> & accepts,
+                const vector<string> & nodes, const string & v);
+        void decide(unsigned instance, const vector<string> & accepts,
+                const string & v);
 
         void breakpoint1();
         void breakpoint2();
-        bool majority(const std::vector<std::string> & l1, const std::vector<std::string> & l2);
+        bool majority(const vector<string> & l1, const vector<string> & l2);
 
         friend class log;
     public:
-        proposer(class paxos_change *cfg, class acceptor *_acceptor, const std::string &_me);
+        proposer(class paxos_change *cfg, class acceptor *_acceptor, const string &_me);
         ~proposer() {}
-        bool run(unsigned instance, const std::vector<std::string> & cnodes, const std::string & v);
+        bool run(unsigned instance, const vector<string> & cnodes, const string & v);
         bool isrunning();
         void breakpoint(int b);
 };
index 734ca51..f2bdb3f 100644 (file)
 #include "rpc/rpc.h"
 
 struct prop_t {
-  unsigned n;
-  std::string m;
+    unsigned n;
+    std::string m;
 };
 
 class paxos_protocol {
- public:
-  enum xxstatus { OK, ERR };
-  typedef int status;
-  enum rpc_numbers {
-    preparereq = 0x11001,
-    acceptreq,
-    decidereq,
-    heartbeat,
-  };
-
-  struct preparearg {
-    unsigned instance;
-    prop_t n;
-  };
-
-  struct prepareres {
-    bool oldinstance;
-    bool accept;
-    prop_t n_a;
-    std::string v_a;
-  };
-
-  struct acceptarg {
-    unsigned instance;
-    prop_t n;
-    std::string v;
-  };
-
-  struct decidearg {
-    unsigned instance;
-    std::string v;
-  };
-
+    public:
+        enum status : status_t { OK, ERR };
+        enum rpc_numbers : proc_t {
+            preparereq = 0x11001,
+            acceptreq,
+            decidereq,
+            heartbeat,
+        };
+
+        struct preparearg {
+            unsigned instance;
+            prop_t n;
+        };
+
+        struct prepareres {
+            bool oldinstance;
+            bool accept;
+            prop_t n_a;
+            std::string v_a;
+        };
+
+        struct acceptarg {
+            unsigned instance;
+            prop_t n;
+            std::string v;
+        };
+
+        struct decidearg {
+            unsigned instance;
+            std::string v;
+        };
 };
 
-inline unmarshall &
-operator>>(unmarshall &u, prop_t &a)
-{
-  u >> a.n;
-  u >> a.m;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, prop_t &a) {
+    return u >> a.n >> a.m;
 }
 
-inline marshall &
-operator<<(marshall &m, prop_t a)
-{
-  m << a.n;
-  m << a.m;
-  return m;
+inline marshall & operator<<(marshall &m, prop_t a) {
+    return m << a.n << a.m;
 }
 
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::preparearg &a)
-{
-  u >> a.instance;
-  u >> a.n;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::preparearg &a) {
+    return u >> a.instance >> a.n;
 }
 
-inline marshall &
-operator<<(marshall &m, paxos_protocol::preparearg a)
-{
-  m << a.instance;
-  m << a.n;
-  return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::preparearg a) {
+    return m << a.instance << a.n;
 }
 
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::prepareres &r)
-{
-  u >> r.oldinstance;
-  u >> r.accept;
-  u >> r.n_a;
-  u >> r.v_a;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) {
+    return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a;
 }
 
-inline marshall &
-operator<<(marshall &m, paxos_protocol::prepareres r)
-{
-  m << r.oldinstance;
-  m << r.accept;
-  m << r.n_a;
-  m << r.v_a;
-  return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) {
+    return m << r.oldinstance << r.accept << r.n_a << r.v_a;
 }
 
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::acceptarg &a)
-{
-  u >> a.instance;
-  u >> a.n;
-  u >> a.v;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::acceptarg &a) {
+    return u >> a.instance >> a.n >> a.v;
 }
 
-inline marshall &
-operator<<(marshall &m, paxos_protocol::acceptarg a)
-{
-  m << a.instance;
-  m << a.n;
-  m << a.v;
-  return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::acceptarg a) {
+    return m << a.instance << a.n << a.v;
 }
 
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::decidearg &a)
-{
-  u >> a.instance;
-  u >> a.v;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::decidearg &a) {
+    return u >> a.instance >> a.v;
 }
 
-inline marshall &
-operator<<(marshall &m, paxos_protocol::decidearg a)
-{
-  m << a.instance;
-  m << a.v;
-  return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::decidearg a) {
+    return m << a.instance << a.v;
 }
 
 #endif
index 676a682..abeaae7 100644 (file)
 #include <inttypes.h>
 #include "lang/verify.h"
 
+using proc_t = uint32_t;
+using status_t = int32_t;
+
 struct request_header {
-    request_header(int x=0, int p=0, unsigned c=0, unsigned s=0, int xi=0) :
+    request_header(int x=0, proc_t p=0, unsigned c=0, unsigned s=0, int xi=0) :
         xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
     int xid;
-    int proc;
+    proc_t proc;
     unsigned int clt_nonce;
     unsigned int srv_nonce;
     int xid_rep;
@@ -134,7 +137,14 @@ marshall& operator<<(marshall &, int16_t);
 marshall& operator<<(marshall &, uint64_t);
 marshall& operator<<(marshall &, const std::string &);
 
-template <class A> marshall &
+template <class A, typename I=void>
+struct is_enumerable : std::false_type {};
+
+template<class A> struct is_enumerable<A,
+    decltype(std::declval<A&>().cbegin(), std::declval<A&>().cend(), void())
+> : std::true_type {};
+
+template <class A> typename std::enable_if<is_enumerable<A>::value, marshall>::type &
 operator<<(marshall &m, const A &x) {
     m << (unsigned int) x.size();
     for (const auto &a : x)
@@ -144,9 +154,17 @@ operator<<(marshall &m, const A &x) {
 
 template <class A, class B> marshall &
 operator<<(marshall &m, const std::pair<A,B> &d) {
-    m << d.first;
-    m << d.second;
-    return m;
+    return m << d.first << d.second;
+}
+
+template<typename E>
+using enum_type_t = typename std::enable_if<std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
+template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
+template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
+
+template <class E> typename std::enable_if<std::is_enum<E>::value, marshall>::type &
+operator<<(marshall &m, E e) {
+    return m << from_enum(e);
 }
 
 class unmarshall;
@@ -162,6 +180,8 @@ unmarshall& operator>>(unmarshall &, size_t &);
 unmarshall& operator>>(unmarshall &, uint64_t &);
 unmarshall& operator>>(unmarshall &, int64_t &);
 unmarshall& operator>>(unmarshall &, std::string &);
+template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+operator>>(unmarshall &u, E &e);
 
 class unmarshall {
     private:
@@ -235,7 +255,8 @@ class unmarshall {
         }
 };
 
-template <class A> unmarshall & operator>>(unmarshall &u, A &x) {
+template <class A> typename std::enable_if<is_enumerable<A>::value, unmarshall>::type &
+operator>>(unmarshall &u, A &x) {
     unsigned n = u.grab<unsigned>();
     x.clear();
     while (n--)
@@ -257,6 +278,12 @@ operator>>(unmarshall &u, std::pair<A,B> &d) {
     return u >> d.first >> d.second;
 }
 
+template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+operator>>(unmarshall &u, E &e) {
+    e = to_enum<E>(u.grab<enum_type_t<E>>());
+    return u;
+}
+
 typedef std::function<int(unmarshall &, marshall &)> handler;
 
 //
@@ -311,17 +338,17 @@ struct VerifyOnFailure {
 
 // One for function pointers...
 
-template <class F, class R, class args_type, size_t ...Indices>
-typename std::enable_if<!std::is_member_function_pointer<F>::value, int>::type
-invoke(F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
+template <class F, class R, class RV, class args_type, size_t ...Indices>
+typename std::enable_if<!std::is_member_function_pointer<F>::value, RV>::type
+invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
     return f(r, std::move(std::get<Indices>(t))...);
 }
 
 // And one for pointers to member functions...
 
-template <class F, class C, class R, class args_type, size_t ...Indices>
-typename std::enable_if<std::is_member_function_pointer<F>::value, int>::type
-invoke(F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
+template <class F, class C, class RV, class R, class args_type, size_t ...Indices>
+typename std::enable_if<std::is_member_function_pointer<F>::value, RV>::type
+invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
     return (c->*f)(r, std::move(std::get<Indices>(t))...);
 }
 
@@ -339,8 +366,8 @@ template <class Functor, class Instance, class Signature,
 // the same pattern as Signature; this allows us to ignore the distinctions
 // between various types of callable objects at this level of abstraction.
 
-template <class F, class C, class ErrorHandler, class R, class... Args>
-struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
+template <class F, class C, class ErrorHandler, class R, class RV, class... Args>
+struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
     static inline handler *wrap(F f, C *c=nullptr) {
         // This type definition corresponds to an empty struct with
         // template parameters running from 0 up to (# args) - 1.
@@ -351,20 +378,20 @@ struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
         using ArgsStorage = std::tuple<typename std::decay<Args>::type...>;
         // Allocate a handler (i.e. std::function) to hold the lambda
         // which will unmarshall RPCs and call f.
-        return new handler([=](unmarshall &u, marshall &m) -> int {
+        return new handler([=](unmarshall &u, marshall &m) -> RV {
             // Unmarshall each argument with the correct type and store the
             // result in a tuple.
             ArgsStorage t = {u.grab<typename std::decay<Args>::type>()...};
             // Verify successful unmarshalling of the entire input stream.
             if (!u.okdone())
-                return ErrorHandler::unmarshall_args_failure();
+                return (RV)ErrorHandler::unmarshall_args_failure();
             // Allocate space for the RPC response -- will be passed into the
             // function as an lvalue reference.
             R r;
             // Perform the invocation.  Note that Indices() calls the default
             // constructor of the empty struct with the special template
             // parameters.
-            int b = invoke(f, c, r, t, Indices());
+            RV b = invoke(RV(), f, c, r, t, Indices());
             // Marshall the response.
             m << r;
             // Make like a tree.
@@ -381,13 +408,13 @@ struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
 template <class Functor, class ErrorHandler=VerifyOnFailure,
     class Signature=Functor> struct marshalled_func;
 
-template <class F, class ErrorHandler, class... Args>
-struct marshalled_func<F, ErrorHandler, int(*)(Args...)> :
-    public marshalled_func_imp<F, void, int(Args...), ErrorHandler> {};
+template <class F, class ErrorHandler, class RV, class... Args>
+struct marshalled_func<F, ErrorHandler, RV(*)(Args...)> :
+    public marshalled_func_imp<F, void, RV(Args...), ErrorHandler> {};
 
-template <class F, class ErrorHandler, class C, class... Args>
-struct marshalled_func<F, ErrorHandler, int(C::*)(Args...)> :
-    public marshalled_func_imp<F, C, int(Args...), ErrorHandler> {};
+template <class F, class ErrorHandler, class RV, class C, class... Args>
+struct marshalled_func<F, ErrorHandler, RV(C::*)(Args...)> :
+    public marshalled_func_imp<F, C, RV(Args...), ErrorHandler> {};
 
 template <class F, class ErrorHandler, class Signature>
 struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
index 5e43547..c6d93c8 100644 (file)
 #include "lock.h"
 
 #include "jsl_log.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 #include "lang/verify.h"
 
+using std::stoi;
+
 const rpcc::TO rpcc::to_max = { 120000 };
 const rpcc::TO rpcc::to_min = { 1000 };
 
@@ -86,8 +88,8 @@ void set_rand_seed()
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
-rpcc::rpcc(sockaddr_in d, bool retrans) :
-    dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+rpcc::rpcc(const string & d, bool retrans) :
+    dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
 {
     if(retrans){
@@ -146,7 +148,7 @@ rpcc::bind(TO to)
 rpcc::cancel(void)
 {
     lock ml(m_);
-    tprintf("rpcc::cancel: force callers to fail");
+    LOG("rpcc::cancel: force callers to fail");
     for(auto &p : calls_){
         caller *ca = p.second;
 
@@ -163,11 +165,11 @@ rpcc::cancel(void)
         destroy_wait_ = true;
         destroy_wait_c_.wait(ml);
     }
-    tprintf("rpcc::cancel: done");
+    LOG("rpcc::cancel: done");
 }
 
 int
-rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
+rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
         TO to)
 {
 
@@ -189,7 +191,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
         xid_rep = xid_rep_window_.front();
     }
 
@@ -428,7 +430,7 @@ rpcs::got_pdu(connection *c, char *b, size_t sz)
 }
 
 void
-rpcs::reg1(unsigned int proc, handler *h)
+rpcs::reg1(proc_t proc, handler *h)
 {
     lock pl(procs_m_);
     VERIFY(procs_.count(proc) == 0);
@@ -437,18 +439,18 @@ rpcs::reg1(unsigned int proc, handler *h)
 }
 
 void
-rpcs::updatestat(unsigned int proc)
+rpcs::updatestat(proc_t proc)
 {
     lock cl(count_m_);
     counts_[proc]++;
     curr_counts_--;
     if(curr_counts_ == 0){
-        tprintf("RPC STATS: ");
+        LOG("RPC STATS: ");
         for (auto i = counts_.begin(); i != counts_.end(); i++)
-            tprintf("%x:%lu ", i->first, i->second);
+            LOG(std::hex << i->first << ":" << std::dec << i->second);
 
         lock rwl(reply_window_m_);
-        std::map<unsigned int,std::list<reply_t> >::iterator clt;
+        map<unsigned int,list<reply_t> >::iterator clt;
 
         size_t totalrep = 0, maxrep = 0;
         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
@@ -471,7 +473,7 @@ rpcs::dispatch(djob_t *j)
 
     request_header h;
     req.unpack_req_header(&h);
-    unsigned int proc = (unsigned int)h.proc;
+    proc_t proc = h.proc;
 
     if(!req.ok()){
         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
@@ -630,14 +632,14 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 {
     lock rwl(reply_window_m_);
 
-    std::list<reply_t> &l = reply_window_[clt_nonce];
+    list<reply_t> &l = reply_window_[clt_nonce];
 
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);
 
     int past_xid_rep = l.begin()->xid;
 
-    std::list<reply_t>::iterator start = l.begin(), it;
+    list<reply_t>::iterator start = l.begin(), it;
     it = ++start;
 
     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
@@ -685,8 +687,8 @@ rpcs::add_reply(unsigned int clt_nonce, int xid,
 {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
-    std::list<reply_t> &l = reply_window_[clt_nonce];
-    std::list<reply_t>::iterator it = l.begin();
+    list<reply_t> &l = reply_window_[clt_nonce];
+    list<reply_t>::iterator it = l.begin();
     // skip to our place in the list
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
@@ -748,7 +750,7 @@ marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
 
 marshall &
-operator<<(marshall &m, const std::string &s) {
+operator<<(marshall &m, const string &s) {
     m << (unsigned int) s.size();
     m.rawbytes(s.data(), s.size());
     return m;
@@ -797,7 +799,7 @@ unmarshall::rawbyte()
 }
 
 void
-unmarshall::rawbytes(std::string &ss, size_t n)
+unmarshall::rawbytes(string &ss, size_t n)
 {
     VERIFY(ensure(n));
     ss.assign(buf_+index_, n);
@@ -825,7 +827,7 @@ unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); ret
 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, std::string &s) {
+unmarshall & operator>>(unmarshall &u, string &s) {
     unsigned sz = u.grab<unsigned>();
     if(u.ok())
         u.rawbytes(s, sz);
@@ -839,24 +841,23 @@ bool operator<(const sockaddr_in &a, const sockaddr_in &b){
 }
 
 /*---------------auxilary function--------------*/
-void
-make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
+sockaddr_in make_sockaddr(const string &hostandport) {
     auto colon = hostandport.find(':');
-    if (colon == std::string::npos)
-        make_sockaddr("127.0.0.1", hostandport, dst);
+    if (colon == string::npos)
+        return make_sockaddr("127.0.0.1", hostandport);
     else
-        make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst);
+        return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
 }
 
-void
-make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
-    bzero(dst, sizeof(*dst));
-    dst->sin_family = AF_INET;
+sockaddr_in make_sockaddr(const string &host, const string &port) {
+    sockaddr_in dst;
+    bzero(&dst, sizeof(dst));
+    dst.sin_family = AF_INET;
 
     struct in_addr a{inet_addr(host.c_str())};
 
     if(a.s_addr != INADDR_NONE)
-        dst->sin_addr.s_addr = a.s_addr;
+        dst.sin_addr.s_addr = a.s_addr;
     else {
         struct hostent *hp = gethostbyname(host.c_str());
 
@@ -865,7 +866,8 @@ make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_
             exit(1);
         }
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
-        dst->sin_addr.s_addr = a.s_addr;
+        dst.sin_addr.s_addr = a.s_addr;
     }
-    dst->sin_port = hton((uint16_t)std::stoi(port));
+    dst.sin_port = hton((uint16_t)stoi(port));
+    return dst;
 }
index c0420a5..d81a5dd 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
 #include "thr_pool.h"
 #include "marshall.h"
 #include "connection.h"
+#include "lock.h"
 
-#ifdef DMALLOC
-#include "dmalloc.h"
-#endif
+using std::string;
+using std::map;
+using std::list;
 
 class rpc_const {
     public:
@@ -43,8 +44,8 @@ class rpcc : public chanmgr {
             unmarshall *un;
             int intret;
             bool done;
-            std::mutex m;
-            std::condition_variable c;
+            mutex m;
+            cond c;
         };
 
         void get_refconn(connection **ch);
@@ -62,27 +63,27 @@ class rpcc : public chanmgr {
 
         connection *chan_;
 
-        std::mutex m_; // protect insert/delete to calls[]
-        std::mutex chan_m_;
+        mutex m_; // protect insert/delete to calls[]
+        mutex chan_m_;
 
         bool destroy_wait_;
-        std::condition_variable destroy_wait_c_;
+        cond destroy_wait_c_;
 
-        std::map<int, caller *> calls_;
-        std::list<int> xid_rep_window_;
+        map<int, caller *> calls_;
+        list<int> xid_rep_window_;
 
         struct request {
             request() { clear(); }
             void clear() { buf.clear(); xid = -1; }
             bool isvalid() { return xid != -1; }
-            std::string buf;
+            string buf;
             int xid;
         };
         struct request dup_req_;
         int xid_rep_done_;
     public:
 
-        rpcc(sockaddr_in d, bool retrans=true);
+        rpcc(const string & d, bool retrans=true);
         ~rpcc();
 
         struct TO {
@@ -99,27 +100,26 @@ class rpcc : public chanmgr {
         void set_reachable(bool r) { reachable_ = r; }
 
         void cancel();
-                
-                int islossy() { return lossytest_ > 0; }
 
-        int call1(unsigned int proc, 
+        int islossy() { return lossytest_ > 0; }
+
+        int call1(proc_t proc, 
                 marshall &req, unmarshall &rep, TO to);
 
         bool got_pdu(connection *c, char *b, size_t sz);
 
-
         template<class R>
-            int call_m(unsigned int proc, marshall &req, R & r, TO to);
+            int call_m(proc_t proc, marshall &req, R & r, TO to);
 
         template<class R, typename ...Args>
-            inline int call(unsigned int proc, R & r, const Args&... args);
+            inline int call(proc_t proc, R & r, const Args&... args);
 
         template<class R, typename ...Args>
-            inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
+            inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
 };
 
 template<class R> int 
-rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) 
+rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
 {
     unmarshall u;
     int intret = call1(proc, req, u, to);
@@ -136,13 +136,13 @@ rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to)
 }
 
 template<class R, typename... Args> inline int
-rpcc::call(unsigned int proc, R & r, const Args&... args)
+rpcc::call(proc_t proc, R & r, const Args&... args)
 {
     return call_timeout(proc, rpcc::to_max, r, args...);
 }
 
 template<class R, typename... Args> inline int
-rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
+rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
 {
     marshall m{args...};
     return call_m(proc, m, r, to);
@@ -191,7 +191,7 @@ class rpcs : public chanmgr {
     // provide at most once semantics by maintaining a window of replies
     // per client that that client hasn't acknowledged receiving yet.
         // indexed by client nonce.
-    std::map<unsigned int, std::list<reply_t> > reply_window_;
+    map<unsigned int, list<reply_t> > reply_window_;
 
     void free_reply_window(void);
     void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
@@ -200,26 +200,26 @@ class rpcs : public chanmgr {
             int xid, int rep_xid,
             char **b, size_t *sz);
 
-    void updatestat(unsigned int proc);
+    void updatestat(proc_t proc);
 
     // latest connection to the client
-    std::map<unsigned int, connection *> conns_;
+    map<unsigned int, connection *> conns_;
 
     // counting
     const size_t counting_;
     size_t curr_counts_;
-    std::map<unsigned int, size_t> counts_;
+    map<proc_t, size_t> counts_;
 
     int lossytest_; 
     bool reachable_;
 
     // map proc # to function
-    std::map<unsigned int, handler *> procs_;
+    map<proc_t, handler *> procs_;
 
-    std::mutex procs_m_; // protect insert/delete to procs[]
-    std::mutex count_m_;  //protect modification of counts
-    std::mutex reply_window_m_; // protect reply window et al
-    std::mutex conss_m_; // protect conns_
+    mutex procs_m_; // protect insert/delete to procs[]
+    mutex count_m_;  //protect modification of counts
+    mutex reply_window_m_; // protect reply window et al
+    mutex conss_m_; // protect conns_
 
 
     protected:
@@ -233,7 +233,7 @@ class rpcs : public chanmgr {
     void dispatch(djob_t *);
 
     // internal handler registration
-    void reg1(unsigned int proc, handler *);
+    void reg1(proc_t proc, handler *);
 
     ThrPool* dispatchpool_;
     tcpsconn* listener_;
@@ -249,7 +249,7 @@ class rpcs : public chanmgr {
 
     bool got_pdu(connection *c, char *b, size_t sz);
 
-    template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
+    template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
 };
 
 struct ReturnOnFailure {
@@ -258,12 +258,11 @@ struct ReturnOnFailure {
     }
 };
 
-template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
+template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
 }
 
-void make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst);
-void make_sockaddr(const std::string &host, const std::string &port, struct
-        sockaddr_in *dst);
+sockaddr_in make_sockaddr(const string &hostandport);
+sockaddr_in make_sockaddr(const string &host, const string &port);
 
 #endif
index dbb10c6..bf8a56c 100644 (file)
@@ -3,9 +3,10 @@
 
 #include "rpc.h"
 #include <arpa/inet.h>
-#include <stdio.h>
+#include <iostream>
+#include <vector>
+#include <thread>
 #include <stdlib.h>
-#include <string.h>
 #include <getopt.h>
 #include <sys/types.h>
 #include <unistd.h>
 
 #define NUM_CL 2
 
-char tprintf_thread_prefix = 'r';
+char log_thread_prefix = 'r';
+
+using std::string;
+using std::cout;
+using std::endl;
+using std::vector;
+using std::thread;
 
 rpcs *server;  // server rpc object
 rpcc *clients[NUM_CL];  // client rpc object
-struct sockaddr_in dst; //server's ip address
+string dst; //server's ip address
 int port;
 
 // server-side handlers. they must be methods of some class
@@ -26,10 +33,10 @@ int port;
 // from multiple classes.
 class srv {
        public:
-               int handle_22(std::string & r, const std::string a, const std::string b);
+               int handle_22(string & r, const string a, const string b);
                int handle_fast(int &r, const int a);
                int handle_slow(int &r, const int a);
-               int handle_bigrep(std::string &r, const size_t a);
+               int handle_bigrep(string &r, const size_t a);
 };
 
 // a handler. a and b are arguments, r is the result.
@@ -40,7 +47,7 @@ class srv {
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
 int
-srv::handle_22(std::string &r, const std::string a, std::string b)
+srv::handle_22(string &r, const string a, string b)
 {
        r = a + b;
        return 0;
@@ -62,9 +69,9 @@ srv::handle_slow(int &r, const int a)
 }
 
 int
-srv::handle_bigrep(std::string &r, const size_t len)
+srv::handle_bigrep(string &r, const size_t len)
 {
-       r = std::string((size_t)len, 'x');
+       r = string((size_t)len, 'x');
        return 0;
 }
 
@@ -88,7 +95,7 @@ testmarshall()
        VERIFY(m.size()==RPC_HEADER_SZ);
        int i = 12345;
        unsigned long long l = 1223344455L;
-       std::string s = std::string("hallo....");
+       string s = "hallo....";
        m << i;
        m << l;
        m << s;
@@ -104,7 +111,7 @@ testmarshall()
        VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
        int i1;
        unsigned long long l1;
-       std::string s1;
+       string s1;
        un >> i1;
        un >> l1;
        un >> s1;
@@ -120,12 +127,11 @@ client1(size_t cl)
 
        for(int i = 0; i < 100; i++){
                int arg = (random() % 2000);
-               std::string rep;
+               string rep;
                int ret = clients[which_cl]->call(25, rep, arg);
                VERIFY(ret == 0);
-               if ((int)rep.size()!=arg) {
-                       printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
-               }
+               if ((int)rep.size()!=arg)
+                       cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
                VERIFY((int)rep.size() == arg);
        }
 
@@ -142,7 +148,7 @@ client1(size_t cl)
                auto end = std::chrono::steady_clock::now();
                auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
                if (ret != 0)
-                       printf("%d ms have elapsed!!!\n", (int)diff);
+                       cout << diff << " ms have elapsed!!!" << endl;
                VERIFY(ret == 0);
                VERIFY(rep == (which ? arg+1 : arg+2));
        }
@@ -158,12 +164,10 @@ client2(size_t cl)
 
        while(time(0) - t1 < 10){
                int arg = (random() % 2000);
-               std::string rep;
+               string rep;
                int ret = clients[which_cl]->call(25, rep, arg);
-               if ((int)rep.size()!=arg) {
-                       printf("ask for %d reply got %d ret %d\n",
-                               arg, (int)rep.size(), ret);
-               }
+               if ((int)rep.size()!=arg)
+                       cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
                VERIFY((int)rep.size() == arg);
        }
 }
@@ -184,57 +188,53 @@ client3(void *xx)
 void
 simple_tests(rpcc *c)
 {
-       printf("simple_tests\n");
+       cout << "simple_tests" << endl;
        // an RPC call to procedure #22.
        // rpcc::call() looks at the argument types to decide how
        // to marshall the RPC call packet, and how to unmarshall
        // the reply packet.
-       std::string rep;
-       int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+       string rep;
+       int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
        VERIFY(intret == 0); // this is what handle_22 returns
        VERIFY(rep == "hello goodbye");
-       printf("   -- string concat RPC .. ok\n");
+       cout << "   -- string concat RPC .. ok" << endl;
 
        // small request, big reply (perhaps req via UDP, reply via TCP)
        intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
        VERIFY(intret == 0);
        VERIFY(rep.size() == 70000);
-       printf("   -- small request, big reply .. ok\n");
+       cout << "   -- small request, big reply .. ok" << endl;
 
        // specify a timeout value to an RPC that should succeed (udp)
        int xx = 0;
        intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
        VERIFY(intret == 0 && xx == 78);
-       printf("   -- no spurious timeout .. ok\n");
+       cout << "   -- no spurious timeout .. ok" << endl;
 
        // specify a timeout value to an RPC that should succeed (tcp)
        {
-               std::string arg(1000, 'x');
-               std::string rep2;
-               c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
+               string arg(1000, 'x');
+               string rep2;
+               c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
                VERIFY(rep2.size() == 1001);
-               printf("   -- no spurious timeout .. ok\n");
+               cout << "   -- no spurious timeout .. ok" << endl;
        }
 
        // huge RPC
-       std::string big(1000000, 'x');
-       intret = c->call(22, rep, big, (std::string)"z");
+       string big(1000000, 'x');
+       intret = c->call(22, rep, big, (string)"z");
        VERIFY(rep.size() == 1000001);
-       printf("   -- huge 1M rpc request .. ok\n");
+       cout << "   -- huge 1M rpc request .. ok" << endl;
 
        // specify a timeout value to an RPC that should timeout (udp)
-       struct sockaddr_in non_existent;
-       memset(&non_existent, 0, sizeof(non_existent));
-       non_existent.sin_family = AF_INET;
-       non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
-       non_existent.sin_port = htons(7661);
+    string non_existent = "127.0.0.1:7661";
        rpcc *c1 = new rpcc(non_existent);
        time_t t0 = time(0);
        intret = c1->bind(rpcc::to(3000));
        time_t t1 = time(0);
        VERIFY(intret < 0 && (t1 - t0) <= 4);
-       printf("   -- rpc timeout .. ok\n");
-       printf("simple_tests OK\n");
+       cout << "   -- rpc timeout .. ok" << endl;
+       cout << "simple_tests OK" << endl;
 }
 
 void 
@@ -243,23 +243,23 @@ concurrent_test(size_t nt)
        // create threads that make lots of calls in parallel,
        // to test thread synchronization for concurrent calls
        // and dispatches.
-       printf("start concurrent_test (%lu threads) ...", nt);
+       cout << "start concurrent_test (" << nt << " threads) ...";
 
-    std::vector<std::thread> th(nt);
+    vector<thread> th(nt);
 
        for(size_t i = 0; i < nt; i++)
-        th[i] = std::thread(client1, i);
+        th[i] = thread(client1, i);
 
        for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       printf(" OK\n");
+       cout << " OK" << endl;
 }
 
 void 
 lossy_test()
 {
-       printf("start lossy_test ...");
+       cout << "start lossy_test ...";
        VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
 
        if (server) {
@@ -275,15 +275,15 @@ lossy_test()
 
        size_t nt = 1;
 
-    std::vector<std::thread> th(nt);
+    vector<thread> th(nt);
 
        for(size_t i = 0; i < nt; i++)
-        th[i] = std::thread(client2, i);
+        th[i] = thread(client2, i);
 
        for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       printf(".. OK\n");
+       cout << ".. OK" << endl;
        VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
 }
 
@@ -293,22 +293,22 @@ failure_test()
        rpcc *client1;
        rpcc *client = clients[0];
 
-       printf("failure_test\n");
+       cout << "failure_test" << endl;
 
        delete server;
 
        client1 = new rpcc(dst);
        VERIFY (client1->bind(rpcc::to(3000)) < 0);
-       printf("   -- create new client and try to bind to failed server .. failed ok\n");
+       cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
        delete client1;
 
        startserver();
 
-       std::string rep;
-       int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+       string rep;
+       int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
        VERIFY(intret == rpc_const::oldsrv_failure);
-       printf("   -- call recovered server with old client .. failed ok\n");
+       cout << "   -- call recovered server with old client .. failed ok" << endl;
 
        delete client;
 
@@ -316,25 +316,25 @@ failure_test()
        VERIFY (client->bind() >= 0);
        VERIFY (client->bind() < 0);
 
-       intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+       intret = client->call(22, rep, (string)"hello", (string)" goodbye");
        VERIFY(intret == 0);
        VERIFY(rep == "hello goodbye");
 
-       printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
+       cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
 
 
        size_t nt = 10;
-       printf("   -- concurrent test on new rpc client w/ %lu threads ..", nt);
+       cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
 
-    std::vector<std::thread> th(nt);
+    vector<thread> th(nt);
 
        for(size_t i = 0; i < nt; i++)
-        th[i] = std::thread(client3, client);
+        th[i] = thread(client3, client);
 
        for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       printf("ok\n");
+       cout << "ok" << endl;
 
        delete server;
        delete client;
@@ -342,19 +342,19 @@ failure_test()
        startserver();
        clients[0] = client = new rpcc(dst);
        VERIFY (client->bind() >= 0);
-       printf("   -- delete existing rpc client and server, create replacements.. ok\n");
+       cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
 
-       printf("   -- concurrent test on new client and server w/ %lu threads ..", nt);
+       cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
 
        for(size_t i = 0; i < nt; i++)
-        th[i] = std::thread(client3, client);
+        th[i] = thread(client3, client);
 
        for(size_t i = 0; i < nt; i++)
         th[i].join();
 
-       printf("ok\n");
+       cout << "ok" << endl;
 
-       printf("failure_test OK\n");
+       cout << "failure_test OK" << endl;
 }
 
 int
@@ -406,16 +406,13 @@ main(int argc, char *argv[])
        testmarshall();
 
        if (isserver) {
-               printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
+               cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
                startserver();
        }
 
        if (isclient) {
                // server's address.
-               memset(&dst, 0, sizeof(dst));
-               dst.sin_family = AF_INET;
-               dst.sin_addr.s_addr = inet_addr("127.0.0.1");
-               dst.sin_port = htons(port);
+        dst = "127.0.0.1:" + std::to_string(port);
 
 
                // start the client.  bind it to the server.
@@ -435,7 +432,7 @@ main(int argc, char *argv[])
                        failure_test();
                }
 
-               printf("rpctest OK\n");
+               cout << "rpctest OK" << endl;
 
                exit(0);
        }
diff --git a/rsm.cc b/rsm.cc
index 8e597d6..1321f7e 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -86,7 +86,7 @@
 
 #include "handle.h"
 #include "rsm.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 #include "lang/verify.h"
 #include "rsm_client.h"
 #include "lock.h"
@@ -140,9 +140,9 @@ void rsm::recovery() [[noreturn]] {
         while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
             // cfg->view_id in this manner involves a race.  I suspect not.
-            if (join(primary)) {
+            if (join(primary, ml)) {
                 LOG("recovery: joined");
-                commit_change_wo(cfg->view_id());
+                commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
                 std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
@@ -152,9 +152,9 @@ void rsm::recovery() [[noreturn]] {
         vid_insync = vid_commit;
         LOG("recovery: sync vid_insync " << vid_insync);
         if (primary == cfg->myaddr()) {
-            r = sync_with_backups();
+            r = sync_with_backups(ml);
         } else {
-            r = sync_with_primary();
+            r = sync_with_primary(ml);
         }
         LOG("recovery: sync done");
 
@@ -173,40 +173,39 @@ void rsm::recovery() [[noreturn]] {
     }
 }
 
-bool rsm::sync_with_backups() {
-    adopt_lock ml(rsm_mutex);
-    ml.unlock();
+bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
+    rsm_mutex_lock.unlock();
     {
         // Make sure that the state of lock_server is stable during
         // synchronization; otherwise, the primary's state may be more recent
         // than replicas after the synchronization.
-        lock ml2(invoke_mutex);
+        lock invoke_mutex_lock(invoke_mutex);
         // By acquiring and releasing the invoke_mutex once, we make sure that
         // the state of lock_server will not be changed until all
         // replicas are synchronized. The reason is that client_invoke arrives
         // after this point of time will see inviewchange == true, and returns
         // BUSY.
     }
-    ml.lock();
+    rsm_mutex_lock.lock();
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
     cfg->get_view(vid_insync, backups);
     backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
     LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end()));
-    sync_cond.wait(ml);
+    sync_cond.wait(rsm_mutex_lock);
     insync = false;
     return true;
 }
 
 
-bool rsm::sync_with_primary() {
+bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
     // Remember the primary of vid_insync
     std::string m = primary;
     while (vid_insync == vid_commit) {
-        if (statetransfer(m))
+        if (statetransfer(m, rsm_mutex_lock))
             break;
     }
-    return statetransferdone(m);
+    return statetransferdone(m, rsm_mutex_lock);
 }
 
 
@@ -214,55 +213,50 @@ bool rsm::sync_with_primary() {
  * Call to transfer state from m to the local node.
  * Assumes that rsm_mutex is already held.
  */
-bool rsm::statetransfer(std::string m)
+bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
 {
     rsm_protocol::transferres r;
     handle h(m);
     int ret = 0;
-    tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n",
-            m.c_str(), last_myvs.vid, last_myvs.seqno);
+    LOG("rsm::statetransfer: contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
     rpcc *cl;
     {
-        adopt_lock ml(rsm_mutex);
-        ml.unlock();
+        rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl) {
             ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000),
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
-        ml.lock();
+        rsm_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
-        tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(),
-                (long unsigned) cl, ret);
+        LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
         return false;
     }
     if (stf && last_myvs != r.last) {
         stf->unmarshal_state(r.state);
     }
     last_myvs = r.last;
-    tprintf("rsm::statetransfer transfer from %s success, vs(%d,%d)\n",
-            m.c_str(), last_myvs.vid, last_myvs.seqno);
+    LOG("rsm::statetransfer transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
     return true;
 }
 
-bool rsm::statetransferdone(std::string m) {
-    adopt_lock ml(rsm_mutex);
-    ml.unlock();
+bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
+    rsm_mutex_lock.unlock();
     handle h(m);
     rpcc *cl = h.safebind();
     bool done = false;
     if (cl) {
         int r;
-        rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
+        auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
         done = (ret == rsm_protocol::OK);
     }
-    ml.lock();
+    rsm_mutex_lock.lock();
     return done;
 }
 
 
-bool rsm::join(std::string m) {
+bool rsm::join(std::string m, lock & rsm_mutex_lock) {
     handle h(m);
     int ret = 0;
     rsm_protocol::joinres r;
@@ -270,14 +264,13 @@ bool rsm::join(std::string m) {
     LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
     rpcc *cl;
     {
-        adopt_lock ml(rsm_mutex);
-        ml.unlock();
+        rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
             ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r,
                     cfg->myaddr(), last_myvs);
         }
-        ml.lock();
+        rsm_mutex_lock.lock();
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
@@ -295,16 +288,16 @@ bool rsm::join(std::string m) {
  */
 void rsm::commit_change(unsigned vid) {
     lock ml(rsm_mutex);
-    commit_change_wo(vid);
+    commit_change(vid, ml);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
         breakpoint2();
 }
 
-void rsm::commit_change_wo(unsigned vid) {
+void rsm::commit_change(unsigned vid, lock &) {
     if (vid <= vid_commit)
         return;
-    tprintf("commit_change: new view (%d)  last vs (%d,%d) %s insync %d\n",
-            vid, last_myvs.vid, last_myvs.seqno, primary.c_str(), insync);
+    LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
+            last_myvs.seqno << ") " << primary << " insync " << insync);
     vid_commit = vid;
     inviewchange = true;
     set_primary(vid);
@@ -316,13 +309,13 @@ void rsm::commit_change_wo(unsigned vid) {
 
 
 void rsm::execute(int procno, std::string req, std::string &r) {
-    tprintf("execute\n");
+    LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
     unmarshall args(req);
     marshall rep;
     std::string reps;
-    rsm_protocol::status ret = (*h)(args, rep);
+    auto ret = (rsm_protocol::status)(*h)(args, rep);
     marshall rep1;
     rep1 << ret;
     rep1 << rep.str();
@@ -367,14 +360,14 @@ rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::
             rpcc *cl = h.safebind();
             if (!cl)
                 return rsm_client_protocol::BUSY;
-            rsm_protocol::status ret;
             int ignored_rval;
-            ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req);
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
             breakpoint1();
-            partition1();
+            lock rsm_mutex_lock(rsm_mutex);
+            partition1(rsm_mutex_lock);
         }
     }
     execute(procno, req, r);
@@ -427,16 +420,14 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req)
 rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
-    int ret = rsm_protocol::OK;
-    tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
-            last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
-    if (!insync || vid != vid_insync) {
+    LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
+            last_myvs.vid << "," << last_myvs.seqno << ")");
+    if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
-    }
     if (stf && last != last_myvs)
         r.state = stf->marshal_state();
     r.last = last_myvs;
-    return ret;
+    return rsm_protocol::OK;
 }
 
 /**
@@ -457,16 +448,16 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
 // joinreq to the RSM's current primary; this is the
 // handler for that RPC.
 rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) {
-    int ret = rsm_protocol::OK;
+    auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
-    tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
-            last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
+    LOG("joinreq: src " << m << " last (" << last.vid << "," << last.seqno << ") mylast (" <<
+            last_myvs.vid << "," << last_myvs.seqno << ")");
     if (cfg->ismember(m, vid_commit)) {
-        tprintf("joinreq: is still a member\n");
+        LOG("joinreq: is still a member");
         r.log = cfg->dump();
     } else if (cfg->myaddr() != primary) {
-        tprintf("joinreq: busy\n");
+        LOG("joinreq: busy");
         ret = rsm_protocol::BUSY;
     } else {
         // We cache vid_commit to avoid adding m to a view which already contains
@@ -480,9 +471,9 @@ rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, views
         }
         if (cfg->ismember(m, cfg->view_id())) {
             r.log = cfg->dump();
-            tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str());
+            LOG("joinreq: ret " << ret << " log " << r.log);
         } else {
-            tprintf("joinreq: failed; proposer couldn't add %d\n", succ);
+            LOG("joinreq: failed; proposer couldn't add " << succ);
             ret = rsm_protocol::BUSY;
         }
     }
@@ -500,8 +491,7 @@ rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
     r = m;
-    tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
-            primary.c_str());
+    LOG("rsm::client_members return " << print_members(m) << " m " << primary);
     return rsm_client_protocol::OK;
 }
 
@@ -515,7 +505,7 @@ void rsm::set_primary(unsigned vid) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
-        tprintf("set_primary: primary stays %s\n", primary.c_str());
+        LOG("set_primary: primary stays " << primary);
         return;
     }
 
@@ -523,7 +513,7 @@ void rsm::set_primary(unsigned vid) {
     for (unsigned i = 0; i < p.size(); i++) {
         if (isamember(p[i], c)) {
             primary = p[i];
-            tprintf("set_primary: primary is %s\n", primary.c_str());
+            LOG("set_primary: primary is " << primary);
             return;
         }
     }
@@ -541,25 +531,25 @@ bool rsm::amiprimary() {
 // Simulate partitions
 
 // assumes caller holds rsm_mutex
-void rsm::net_repair_wo(bool heal) {
+void rsm::net_repair(bool heal, lock &) {
     std::vector<std::string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
             handle h(m[i]);
-            tprintf("rsm::net_repair_wo: %s %d\n", m[i].c_str(), heal);
+            LOG("rsm::net_repair: " << m[i] << " " << heal);
             if (h.safebind()) h.safebind()->set_reachable(heal);
         }
     }
     rsmrpc->set_reachable(heal);
 }
 
-rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
     lock ml(rsm_mutex);
-    tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
-            heal, dopartition, partitioned);
+    LOG("rsm::test_net_repairreq: " << heal << " (dopartition " <<
+            dopartition << ", partitioned " << partitioned << ")");
     if (heal) {
-        net_repair_wo(heal);
+        net_repair(heal, ml);
         partitioned = false;
     } else {
         dopartition = true;
@@ -573,30 +563,30 @@ rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) {
 
 void rsm::breakpoint1() {
     if (break1) {
-        tprintf("Dying at breakpoint 1 in rsm!\n");
+        LOG("Dying at breakpoint 1 in rsm!");
         exit(1);
     }
 }
 
 void rsm::breakpoint2() {
     if (break2) {
-        tprintf("Dying at breakpoint 2 in rsm!\n");
+        LOG("Dying at breakpoint 2 in rsm!");
         exit(1);
     }
 }
 
-void rsm::partition1() {
+void rsm::partition1(lock & rsm_mutex_lock) {
     if (dopartition) {
-        net_repair_wo(false);
+        net_repair(false, rsm_mutex_lock);
         dopartition = false;
         partitioned = true;
     }
 }
 
-rsm_test_protocol::status rsm::breakpointreq(int &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
-    tprintf("rsm::breakpointreq: %d\n", b);
+    LOG("rsm::breakpointreq: " << b);
     if (b == 1) break1 = true;
     else if (b == 2) break2 = true;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
diff --git a/rsm.h b/rsm.h
index 87734e1..73fa606 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -45,8 +45,8 @@ class rsm : public config_view_change {
         rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid);
         rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src,
                 viewstamp last);
-        rsm_test_protocol::status test_net_repairreq(int &r, int heal);
-        rsm_test_protocol::status breakpointreq(int &r, int b);
+        rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
+        rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
 
         std::mutex rsm_mutex;
         std::mutex invoke_mutex;
@@ -56,18 +56,18 @@ class rsm : public config_view_change {
         void execute(int procno, std::string req, std::string &r);
         rsm_client_protocol::status client_invoke(std::string &r, int procno,
                 std::string req);
-        bool statetransfer(std::string m);
-        bool statetransferdone(std::string m);
-        bool join(std::string m);
+        bool statetransfer(std::string m, lock & rsm_mutex_lock);
+        bool statetransferdone(std::string m, lock & rsm_mutex_lock);
+        bool join(std::string m, lock & rsm_mutex_lock);
         void set_primary(unsigned vid);
         std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid);
-        bool sync_with_backups();
-        bool sync_with_primary();
-        void net_repair_wo(bool heal);
+        bool sync_with_backups(lock & rsm_mutex_lock);
+        bool sync_with_primary(lock & rsm_mutex_lock);
+        void net_repair(bool heal, lock & rsm_mutex_lock);
         void breakpoint1();
         void breakpoint2();
-        void partition1();
-        void commit_change_wo(unsigned vid);
+        void partition1(lock & rsm_mutex_lock);
+        void commit_change(unsigned vid, lock & rsm_mutex_lock);
     public:
         rsm (std::string _first, std::string _me);
         ~rsm() {}
index b68ef0c..bff32c2 100644 (file)
@@ -6,31 +6,21 @@
 #include <unistd.h>
 #include "lang/verify.h"
 #include "lock.h"
-#include "tprintf.h"
+#include "threaded_log.h"
 
-rsm_client::rsm_client(std::string dst) {
+rsm_client::rsm_client(std::string dst) : primary(dst) {
     LOG("create rsm_client");
-    std::vector<std::string> mems;
-
-    sockaddr_in dstsock;
-    make_sockaddr(dst.c_str(), &dstsock);
-    primary = dst;
-
-    {
-        lock ml(rsm_client_mutex);
-        VERIFY (init_members());
-    }
+    lock ml(rsm_client_mutex);
+    VERIFY (init_members(ml));
     LOG("rsm_client: done");
 }
 
-// Assumes caller holds rsm_client_mutex
-void rsm_client::primary_failure() {
+void rsm_client::primary_failure(lock &) {
     primary = known_mems.back();
     known_mems.pop_back();
 }
 
 rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
-    int ret = 0;
     lock ml(rsm_client_mutex);
     while (1) {
         LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
@@ -38,8 +28,9 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con
 
         ml.unlock();
         rpcc *cl = h.safebind();
+        auto ret = rsm_client_protocol::OK;
         if (cl)
-            ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
+            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
         ml.lock();
 
         if (!cl)
@@ -47,7 +38,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con
 
         LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
         if (ret == rsm_client_protocol::OK)
-            break;
+            return rsm_protocol::OK;
         if (ret == rsm_client_protocol::BUSY) {
             LOG("rsm is busy " << primary);
             sleep(3);
@@ -55,29 +46,27 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con
         }
         if (ret == rsm_client_protocol::NOTPRIMARY) {
             LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
-            if (init_members())
+            if (init_members(ml))
                 continue;
         }
 prim_fail:
         LOG("primary " << primary << " failed ret " << std::dec << ret);
-        primary_failure();
+        primary_failure(ml);
         LOG("rsm_client::invoke: retry new primary " << primary);
     }
-    return ret;
 }
 
-bool rsm_client::init_members() {
+bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
     LOG("rsm_client::init_members get members!");
     handle h(primary);
     int ret = rsm_client_protocol::ERR;
     rpcc *cl;
     {
-        adopt_lock ml(rsm_client_mutex);
-        ml.unlock();
+        rsm_client_mutex_lock.unlock();
         cl = h.safebind();
         if (cl)
             ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0);
-        ml.lock();
+        rsm_client_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK)
         return false;
index 5d0cd71..814616f 100644 (file)
@@ -20,8 +20,8 @@ class rsm_client {
         std::string primary;
         std::vector<std::string> known_mems;
         std::mutex rsm_client_mutex;
-        void primary_failure();
-        bool init_members();
+        void primary_failure(lock & rsm_client_mutex_lock);
+        bool init_members(lock & rsm_client_mutex_lock);
     public:
         rsm_client(std::string dst);
         rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req);
@@ -34,32 +34,32 @@ class rsm_client {
 
 template<class R>
 int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
-       std::string rep;
-        std::string res;
-       int intret = invoke(proc, rep, req.cstr());
-        VERIFY( intret == rsm_client_protocol::OK );
-        unmarshall u(rep);
-       u >> intret;
-       if (intret < 0) return intret;
-        u >> res;
-        if (!u.okdone()) {
-                fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
-                       "You probably forgot to set the reply string in "
-                       "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
-                       "type\n", proc);
-                VERIFY(0);
-               return rpc_const::unmarshal_reply_failure;
-        }
-        unmarshall u1(res);
-        u1 >> r;
-       if(!u1.okdone()) {
-                fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
-                       "You are probably calling RPC 0x%x with wrong return "
-                       "type.\n", proc);
-                VERIFY(0);
-               return rpc_const::unmarshal_reply_failure;
-        }
-       return intret;
+    std::string rep;
+    std::string res;
+    int intret = invoke(proc, rep, req.cstr());
+    VERIFY( intret == rsm_client_protocol::OK );
+    unmarshall u(rep);
+    u >> intret;
+    if (intret < 0) return intret;
+    u >> res;
+    if (!u.okdone()) {
+        fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+                "You probably forgot to set the reply string in "
+                "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
+                "type\n", proc);
+        VERIFY(0);
+        return rpc_const::unmarshal_reply_failure;
+    }
+    unmarshall u1(res);
+    u1 >> r;
+    if(!u1.okdone()) {
+        fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+                "You are probably calling RPC 0x%x with wrong return "
+                "type.\n", proc);
+        VERIFY(0);
+        return rpc_const::unmarshal_reply_failure;
+    }
+    return intret;
 }
 
 template<class R, class ...Args>
index 68ff061..6b508d8 100644 (file)
 
 #include "rpc/rpc.h"
 
-
 class rsm_client_protocol {
- public:
-  enum xxstatus { OK, ERR, NOTPRIMARY, BUSY};
-  typedef int status;
-  enum rpc_numbers {
-    invoke = 0x9001,
-    members,
-  };
+    public:
+        enum status : status_t {OK, ERR, NOTPRIMARY, BUSY};
+        enum rpc_numbers : proc_t {
+            invoke = 0x9001,
+            members,
+        };
 };
 
-
 struct viewstamp {
-  viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) :
-      vid(_vid), seqno(_seqno) {}
-  unsigned int vid;
-  unsigned int seqno;
-  inline void operator++(int) { seqno++; }
+    viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : vid(_vid), seqno(_seqno) {}
+    unsigned int vid;
+    unsigned int seqno;
+    inline void operator++(int) { seqno++; }
 };
 
 class rsm_protocol {
- public:
-  enum xxstatus { OK, ERR, BUSY};
-  typedef int status;
-  enum rpc_numbers {
-    invoke = 0x10001,
-    transferreq,
-    transferdonereq,
-    joinreq,
-  };
-
-  struct transferres {
-    std::string state;
-    viewstamp last;
-  };
-  
-  struct joinres {
-    std::string log;
-  };
+    public:
+        enum status : status_t { OK, ERR, BUSY};
+        enum rpc_numbers : proc_t {
+            invoke = 0x10001,
+            transferreq,
+            transferdonereq,
+            joinreq,
+        };
+
+        struct transferres {
+            std::string state;
+            viewstamp last;
+        };
+
+        struct joinres {
+            std::string log;
+        };
 };
 
 inline bool operator==(viewstamp a, viewstamp b) {
-  return a.vid == b.vid && a.seqno == b.seqno;
+    return a.vid == b.vid && a.seqno == b.seqno;
 }
 
 inline bool operator>(viewstamp a, viewstamp b) {
-  return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
+    return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
 }
 
 inline bool operator!=(viewstamp a, viewstamp b) {
-  return a.vid != b.vid || a.seqno != b.seqno;
+    return a.vid != b.vid || a.seqno != b.seqno;
 }
 
-inline marshall& operator<<(marshall &m, viewstamp v)
-{
-  m << v.vid;
-  m << v.seqno;
-  return m;
+inline marshall& operator<<(marshall &m, viewstamp v) {
+    return m << v.vid << v.seqno;
 }
 
 inline unmarshall& operator>>(unmarshall &u, viewstamp &v) {
-  u >> v.vid;
-  u >> v.seqno;
-  return u;
+    return u >> v.vid >> v.seqno;
 }
 
-inline marshall &
-operator<<(marshall &m, rsm_protocol::transferres r)
-{
-  m << r.state;
-  m << r.last;
-  return m;
+inline marshall & operator<<(marshall &m, rsm_protocol::transferres r) {
+    return m << r.state << r.last;
 }
 
-inline unmarshall &
-operator>>(unmarshall &u, rsm_protocol::transferres &r)
-{
-  u >> r.state;
-  u >> r.last;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) {
+    return u >> r.state >> r.last;
 }
 
-inline marshall &
-operator<<(marshall &m, rsm_protocol::joinres r)
-{
-  m << r.log;
-  return m;
+inline marshall & operator<<(marshall &m, rsm_protocol::joinres r) {
+    return m << r.log;
 }
 
-inline unmarshall &
-operator>>(unmarshall &u, rsm_protocol::joinres &r)
-{
-  u >> r.log;
-  return u;
+inline unmarshall & operator>>(unmarshall &u, rsm_protocol::joinres &r) {
+    return u >> r.log;
 }
 
 class rsm_test_protocol {
- public:
-  enum xxstatus { OK, ERR};
-  typedef int status;
-  enum rpc_numbers {
-    net_repair = 0x12001,
-    breakpoint = 0x12002,
-  };
+    public:
+        enum status : status_t {OK, ERR};
+        enum rpc_numbers : proc_t {
+            net_repair = 0x12001,
+            breakpoint = 0x12002,
+        };
 };
 
 #endif 
index 3ff4733..31b8c1a 100644 (file)
@@ -11,7 +11,7 @@
 #include <stdio.h>
 #include <string>
 
-char tprintf_thread_prefix = 't';
+char log_thread_prefix = 't';
 
 int
 main(int argc, char *argv[])
index c61194e..0c56f8a 100644 (file)
@@ -8,30 +8,21 @@
 #include <iostream>
 #include <stdio.h>
 
-rsmtest_client::rsmtest_client(std::string dst)
-{
-    sockaddr_in dstsock;
-    make_sockaddr(dst.c_str(), &dstsock);
-    cl = new rpcc(dstsock);
-    if (cl->bind() < 0) {
+rsmtest_client::rsmtest_client(std::string dst) : cl(dst) {
+    if (cl.bind() < 0)
         printf("rsmtest_client: call bind\n");
-    }
 }
 
-int
-rsmtest_client::net_repair(int heal)
-{
-    int r;
-    int ret = cl->call(rsm_test_protocol::net_repair, r, heal);
+rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
+    rsm_test_protocol::status r;
+    auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::net_repair, r, heal);
     VERIFY (ret == rsm_test_protocol::OK);
     return r;
 }
 
-int
-rsmtest_client::breakpoint(int b)
-{
-    int r;
-    int ret = cl->call(rsm_test_protocol::breakpoint, r, b);
+rsm_test_protocol::status rsmtest_client::breakpoint(int b) {
+    rsm_test_protocol::status r;
+    auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::breakpoint, r, b);
     VERIFY (ret == rsm_test_protocol::OK);
     return r;
 }
index dc7388e..51f8511 100644 (file)
@@ -10,7 +10,7 @@
 // Client interface to the rsmtest server
 class rsmtest_client {
     protected:
-        rpcc *cl;
+        rpcc cl;
     public:
         rsmtest_client(std::string d);
         virtual ~rsmtest_client() {}
similarity index 88%
rename from tprintf.cc
rename to threaded_log.cc
index 93a6070..57ddc08 100644 (file)
@@ -1,6 +1,6 @@
 #include <sys/time.h>
 #include <stdint.h>
-#include "tprintf.h"
+#include "threaded_log.h"
 
 std::mutex cerr_mutex;
 std::map<std::thread::id, int> thread_name_map;
similarity index 88%
rename from tprintf.h
rename to threaded_log.h
index 41539fe..6918220 100644 (file)
--- a/tprintf.h
@@ -1,5 +1,5 @@
-#ifndef tprintf_h
-#define tprintf_h
+#ifndef threaded_log_h
+#define threaded_log_h
 
 #include <iomanip>
 #include <iostream>
@@ -12,7 +12,7 @@ extern std::map<std::thread::id, int> thread_name_map;
 extern int next_thread_num;
 extern std::map<void *, int> instance_name_map;
 extern int next_instance_num;
-extern char tprintf_thread_prefix;
+extern char log_thread_prefix;
 
 template <class A>
 struct iterator_pair : public std::pair<A, A> {
@@ -51,7 +51,7 @@ std::ostream & operator<<(std::ostream &o, const iterator_pair<A> &d) {
         _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
     auto _utime_ = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
     std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \
-    std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << _tid_; \
+    std::cerr << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << _tid_; \
     std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \
 }
 #define LOG_THIS_POINTER { \
@@ -98,13 +98,4 @@ std::ostream & operator<<(std::ostream &o, const iterator_pair<A> &d) {
     LOG_SUFFIX; \
 }
 
-#define tprintf(...) { \
-    char *buf = nullptr; \
-    int len = asprintf(&buf, __VA_ARGS__); \
-    if (buf[len-1]=='\n') \
-        buf[len-1] = '\0'; \
-    LOG_NONMEMBER(buf); \
-    free(buf); \
-}
-
 #endif