Major clean-ups. Migrating to C++11.
authorPeter Iannucci <iannucci@mit.edu>
Mon, 16 Sep 2013 01:01:51 +0000 (21:01 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Mon, 16 Sep 2013 01:01:51 +0000 (21:01 -0400)
55 files changed:
Makefile
config.cc
config.h
gettime.cc [deleted file]
gettime.h [deleted file]
handle.cc
handle.h
lang/algorithm.h [deleted file]
lock.h [new file with mode: 0644]
lock_client.cc
lock_client.h
lock_client_cache_rsm.cc
lock_client_cache_rsm.h
lock_demo.cc
lock_protocol.h
lock_server.cc
lock_server.h
lock_server_cache_rsm.cc
lock_server_cache_rsm.h
lock_smain.cc
lock_tester.cc
mutex.cc [deleted file]
mutex.h [deleted file]
paxos.cc
paxos.h
paxos_protocol.h
random.cc [deleted file]
random.h [deleted file]
rpc/connection.cc
rpc/connection.h
rpc/fifo.h
rpc/jsl_log.cc
rpc/jsl_log.h
rpc/marshall.h
rpc/method_thread.h [deleted file]
rpc/pollmgr.cc
rpc/pollmgr.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rpc/slock.h [deleted file]
rpc/thr_pool.cc
rpc/thr_pool.h
rsm.cc
rsm.h
rsm_client.cc
rsm_client.h
rsm_protocol.h
rsm_tester.cc
rsmtest_client.cc
rsmtest_client.h
srlock.cc [deleted file]
srlock.h [deleted file]
tprintf.cc
tprintf.h

index edf9fd0..4a239bc 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,14 +1,11 @@
-CXXFLAGS =  -g -MMD -Wall -I. -I./rpc
-LDFLAGS = -L. -L/usr/local/lib
-LDLIBS = -lpthread
-LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt)
-LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl)
+CXXFLAGS = -g -MMD -Werror -I. -std=c++11
+LDFLAGS = 
 CXX = g++
 CC = g++
 
 all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest
 
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o gettime.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o
        rm -f $@
        ar cq $@ $^
        ranlib rpc/librpc.a
@@ -18,10 +15,10 @@ rpc/rpctest: rpc/rpctest.o rpc/librpc.a
 lock_demo=lock_demo.o lock_client.o
 lock_demo : $(lock_demo) rpc/librpc.a
 
-lock_tester=lock_tester.o lock_client.o mutex.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
+lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
 lock_tester : $(lock_tester) rpc/librpc.a
 
-lock_server=lock_server.o lock_smain.o mutex.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server=lock_server.o lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
 lock_server : $(lock_server) rpc/librpc.a
 
 rsm_tester=rsm_tester.o rsmtest_client.o
index 0f9ab4c..5127cb2 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,3 +1,4 @@
+#include <thread>
 #include <sstream>
 #include <iostream>
 #include <stdio.h>
 // all views, the other nodes can bring this re-joined node up to
 // date.
 
-static void *
-heartbeatthread(void *x)
+config::config(
+        const std::string &_first,
+        const std::string &_me,
+        config_view_change *_vc)
+    : my_view_id(0), first(_first), me(_me), vc(_vc)
 {
-  config *r = (config *) x;
-  r->heartbeater();
-  return 0;
-}
-
-config::config(std::string _first, std::string _me, config_view_change *_vc) 
-  : myvid (0), first (_first), me (_me), vc (_vc)
-{
-  VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
-  VERIFY(pthread_cond_init(&config_cond, NULL) == 0);  
-
-  std::ostringstream ost;
-  ost << me;
-
-  acc = new acceptor(this, me == _first, me, ost.str());
-  pro = new proposer(this, acc, me);
+    paxos_acceptor = new acceptor(this, me == _first, me, me);
+    paxos_proposer = new proposer(this, paxos_acceptor, me);
 
-  // XXX hack; maybe should have its own port number
-  pxsrpc = acc->get_rpcs();
-  pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+    // XXX hack; maybe should have its own port number
+    paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
 
-  {
-      ScopedLock ml(&cfg_mutex);
-
-      reconstruct();
-
-      pthread_t th;
-      VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
-  }
+    {
+        lock ml(cfg_mutex);
+        reconstruct();
+        std::thread(&config::heartbeater, this).detach();
+    }
 }
 
 void
-config::restore(std::string s)
+config::restore(const std::string &s)
 {
-  ScopedLock ml(&cfg_mutex);
-  acc->restore(s);
-  reconstruct();
+    lock ml(cfg_mutex);
+    paxos_acceptor->restore(s);
+    reconstruct();
 }
 
-std::vector<std::string>
-config::get_view(unsigned instance)
+void
+config::get_view(unsigned instance, std::vector<std::string> &m)
 {
-  ScopedLock ml(&cfg_mutex);
-  return get_view_wo(instance);
+    lock ml(cfg_mutex);
+    get_view_wo(instance, m);
 }
 
 // caller should hold cfg_mutex
-std::vector<std::string>
-config::get_view_wo(unsigned instance)
+void
+config::get_view_wo(unsigned instance, std::vector<std::string> &m)
 {
-  std::string value = acc->value(instance);
-  tprintf("get_view(%d): returns %s\n", instance, value.c_str());
-  return members(value);
+    std::string value = paxos_acceptor->value(instance);
+    tprintf("get_view(%d): returns %s\n", instance, value.c_str());
+    members(value, m);
 }
 
-std::vector<std::string>
-config::members(std::string value)
+void
+config::members(const std::string &value, std::vector<std::string> &view) const
 {
-  std::istringstream ist(value);
-  std::string m;
-  std::vector<std::string> view;
-  while (ist >> m) {
-    view.push_back(m);
-  }
-  return view;
+    std::istringstream ist(value);
+    std::string m;
+    view.clear();
+    while (ist >> m) {
+        view.push_back(m);
+    }
 }
 
 std::string
-config::value(std::vector<std::string> m)
+config::value(const std::vector<std::string> &m) const
 {
-  std::ostringstream ost;
-  for (unsigned i = 0; i < m.size(); i++)  {
-    ost << m[i];
-    ost << " ";
-  }
-  return ost.str();
+    std::ostringstream ost;
+    for (unsigned i = 0; i < m.size(); i++)  {
+        ost << m[i];
+        ost << " ";
+    }
+    return ost.str();
 }
 
 // caller should hold cfg_mutex
 void
 config::reconstruct()
 {
-  if (acc->instance() > 0) {
-    std::string m;
-    myvid = acc->instance();
-    mems = get_view_wo(myvid);
-    tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
-  }
+    if (paxos_acceptor->instance() > 0) {
+        std::string m;
+        my_view_id = paxos_acceptor->instance();
+        get_view_wo(my_view_id, mems);
+        tprintf("config::reconstruct: %d %s\n",
+                my_view_id, print_members(mems).c_str());
+    }
 }
 
 // Called by Paxos's acceptor.
 void
-config::paxos_commit(unsigned instance, std::string value)
+config::paxos_commit(unsigned instance, const std::string &value)
 {
-  std::string m;
-  std::vector<std::string> newmem;
-  ScopedLock ml(&cfg_mutex);
-
-  newmem = members(value);
-  tprintf("config::paxos_commit: %d: %s\n", instance, 
-        print_members(newmem).c_str());
-
-  for (unsigned i = 0; i < mems.size(); i++) {
-    tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
-    if (!isamember(mems[i], newmem) && me != mems[i]) {
-      tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
-      mgr.delete_handle(mems[i]);
+    std::string m;
+    std::vector<std::string> newmem;
+    lock ml(cfg_mutex);
+
+    members(value, newmem);
+    tprintf("config::paxos_commit: %d: %s\n", instance,
+                 print_members(newmem).c_str());
+
+    for (unsigned i = 0; i < mems.size(); i++) {
+        tprintf("config::paxos_commit: is %s still a member?\n",
+                mems[i].c_str());
+        if (!isamember(mems[i], newmem) && me != mems[i]) {
+            tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
+            mgr.delete_handle(mems[i]);
+        }
     }
-  }
 
-  mems = newmem;
-  myvid = instance;
-  if (vc) {
-    unsigned vid = myvid;
-    VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-    vc->commit_change(vid);
-    VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  }
+    mems = newmem;
+    my_view_id = instance;
+    if (vc) {
+        ml.unlock();
+        vc->commit_change(instance);
+        ml.lock();
+    }
 }
 
 bool
-config::ismember(std::string m, unsigned vid)
+config::ismember(const std::string &m, unsigned vid)
 {
-  bool r;
-  ScopedLock ml(&cfg_mutex);
-  std::vector<std::string> v = get_view_wo(vid);
-  r = isamember(m, v);
-  return r;
+    lock ml(cfg_mutex);
+    std::vector<std::string> v;
+    get_view_wo(vid, v);
+    return isamember(m, v);
 }
 
 bool
-config::add(std::string new_m, unsigned vid)
+config::add(const std::string &new_m, unsigned vid)
 {
-  std::vector<std::string> m;
-  std::vector<std::string> curm;
-  ScopedLock ml(&cfg_mutex);
-  if (vid != myvid)
-    return false;
-  tprintf("config::add %s\n", new_m.c_str());
-  m = mems;
-  m.push_back(new_m);
-  curm = mems;
-  std::string v = value(m);
-  int nextvid = myvid + 1;
-  VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-  bool r = pro->run(nextvid, curm, v);
-  VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  if (r) {
-    tprintf("config::add: proposer returned success\n");
-  } else {
-    tprintf("config::add: proposer returned failure\n");
-  }
-  return r;
+    std::vector<std::string> m;
+    std::vector<std::string> curm;
+    lock ml(cfg_mutex);
+    if (vid != my_view_id)
+        return false;
+    tprintf("config::add %s\n", new_m.c_str());
+    m = mems;
+    m.push_back(new_m);
+    curm = mems;
+    std::string v = value(m);
+    int nextvid = my_view_id + 1;
+    bool r;
+    {
+        ml.unlock();
+        r = paxos_proposer->run(nextvid, curm, v);
+        ml.lock();
+    }
+    tprintf("config::add: proposer returned %s\n",
+            r ? "success" : "failure");
+    return r;
 }
 
 // caller should hold cfg_mutex
 bool
-config::remove_wo(std::string m)
+config::remove(const std::string &m)
 {
-  tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
-  std::vector<std::string> n;
-  for (unsigned i = 0; i < mems.size(); i++) {
-    if (mems[i] != m) n.push_back(mems[i]);
-  }
-  std::string v = value(n);
-  std::vector<std::string> cmems = mems;
-  int nextvid = myvid + 1;
-  VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-  bool r = pro->run(nextvid, cmems, v);
-  VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  if (r) {
-    tprintf("config::remove: proposer returned success\n");
-  } else {
-    tprintf("config::remove: proposer returned failure\n");
-  }
-  return r;
+    adopt_lock ml(cfg_mutex);
+    tprintf("config::remove: my_view_id %d remove? %s\n",
+            my_view_id, m.c_str());
+    std::vector<std::string> n;
+    for (unsigned i = 0; i < mems.size(); i++) {
+        if (mems[i] != m)
+            n.push_back(mems[i]);
+    }
+    std::string v = value(n);
+    std::vector<std::string> cmems = mems;
+    int nextvid = my_view_id + 1;
+    bool r;
+    {
+        ml.unlock();
+        r = paxos_proposer->run(nextvid, cmems, v);
+        ml.lock();
+    }
+    tprintf("config::remove: proposer returned %s\n",
+            r ? "success" : "failure");
+    return r;
 }
 
 void
 config::heartbeater()
 {
-  struct timeval now;
-  struct timespec next_timeout;
-  std::string m;
-  heartbeat_t h;
-  bool stable;
-  unsigned vid;
-  std::vector<std::string> cmems;
-  ScopedLock ml(&cfg_mutex);
-  
-  while (1) {
-
-    gettimeofday(&now, NULL);
-    next_timeout.tv_sec = now.tv_sec + 3;
-    next_timeout.tv_nsec = 0;
-    tprintf("heartbeater: go to sleep\n");
-    pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
-
-    stable = true;
-    vid = myvid;
-    cmems = get_view_wo(vid);
-    tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
-
-    if (!isamember(me, cmems)) {
-      tprintf("heartbeater: not member yet; skip hearbeat\n");
-      continue;
-    }
-
-    //find the node with the smallest id
-    m = me;
-    for (unsigned i = 0; i < cmems.size(); i++) {
-      if (m > cmems[i])
-       m = cmems[i];
-    }
-
-    if (m == me) {
-      //if i am the one with smallest id, ping the rest of the nodes
-      for (unsigned i = 0; i < cmems.size(); i++) {
-       if (cmems[i] != me) {
-         if ((h = doheartbeat(cmems[i])) != OK) {
-           stable = false;
-           m = cmems[i];
-           break;
-         }
-       }
-      }
-    } else {
-      //the rest of the nodes ping the one with smallest id
-       if ((h = doheartbeat(m)) != OK) 
-           stable = false;
-    }
-
-    if (!stable && vid == myvid) {
-      remove_wo(m);
+    std::string m;
+    heartbeat_t h;
+    bool stable;
+    unsigned vid;
+    std::vector<std::string> cmems;
+    lock ml(cfg_mutex);
+
+    while (1) {
+        auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
+        tprintf("heartbeater: go to sleep\n");
+        config_cond.wait_until(ml, next_timeout);
+
+        stable = true;
+        vid = my_view_id;
+        get_view_wo(vid, cmems);
+        tprintf("heartbeater: current membership %s\n",
+                print_members(cmems).c_str());
+
+        if (!isamember(me, cmems)) {
+            tprintf("heartbeater: not member yet; skip hearbeat\n");
+            continue;
+        }
+
+        // who has the smallest ID?
+        m = me;
+        for (unsigned i = 0; i < cmems.size(); i++) {
+            if (m > cmems[i])
+                m = cmems[i];
+        }
+
+        if (m == me) {
+            // ping the other nodes
+            for (unsigned i = 0; i < cmems.size(); i++) {
+                if (cmems[i] != me) {
+                    if ((h = doheartbeat(cmems[i])) != OK) {
+                        stable = false;
+                        m = cmems[i];
+                        break;
+                    }
+                }
+            }
+        } else {
+            // ping the node with the smallest ID
+            if ((h = doheartbeat(m)) != OK)
+                stable = false;
+        }
+
+        if (!stable && vid == my_view_id) {
+            remove(m);
+        }
     }
-  }
 }
 
 paxos_protocol::status
 config::heartbeat(std::string m, unsigned vid, int &r)
 {
-  ScopedLock ml(&cfg_mutex);
-  int ret = paxos_protocol::ERR;
-  r = (int) myvid;
-  tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
-  if (vid == myvid) {
-    ret = paxos_protocol::OK;
-  } else if (pro->isrunning()) {
-    VERIFY (vid == myvid + 1 || vid + 1 == myvid);
-    ret = paxos_protocol::OK;
-  } else {
-    ret = paxos_protocol::ERR;
-  }
-  return ret;
+    lock ml(cfg_mutex);
+    int ret = paxos_protocol::ERR;
+    r = (int) my_view_id;
+    tprintf("heartbeat from %s(%d) my_view_id %d\n",
+            m.c_str(), vid, my_view_id);
+    if (vid == my_view_id) {
+        ret = paxos_protocol::OK;
+    } else if (paxos_proposer->isrunning()) {
+        VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
+        ret = paxos_protocol::OK;
+    } else {
+        ret = paxos_protocol::ERR;
+    }
+    return ret;
 }
 
 config::heartbeat_t
-config::doheartbeat(std::string m)
+config::doheartbeat(const std::string &m)
 {
-  int ret = rpc_const::timeout_failure;
-  int r;
-  unsigned vid = myvid;
-  heartbeat_t res = OK;
-
-  tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
-  handle h(m);
-  VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-  rpcc *cl = h.safebind();
-  if (cl) {
-    ret = cl->call(paxos_protocol::heartbeat, me, vid, r, 
-                  rpcc::to(1000));
-  } 
-  VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  if (ret != paxos_protocol::OK) {
-    if (ret == rpc_const::atmostonce_failure || 
-       ret == rpc_const::oldsrv_failure) {
-      mgr.delete_handle(m);
-    } else {
-      tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", 
-            m.c_str(), ret, vid, r);
-      if (ret < 0) res = FAILURE;
-      else res = VIEWERR;
+    adopt_lock ml(cfg_mutex);
+    int ret = rpc_const::timeout_failure;
+    int r;
+    unsigned vid = my_view_id;
+    heartbeat_t res = OK;
+
+    tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+    handle h(m);
+    {
+        ml.unlock();
+        rpcc *cl = h.safebind();
+        if (cl) {
+            ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
+                                         rpcc::to(1000));
+        }
+        ml.lock();
+    }
+    if (ret != paxos_protocol::OK) {
+        if (ret == rpc_const::atmostonce_failure ||
+            ret == rpc_const::oldsrv_failure) {
+            mgr.delete_handle(m);
+        } else {
+            tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
+                         m.c_str(), ret, vid, r);
+            if (ret < 0) res = FAILURE;
+            else res = VIEWERR;
+        }
     }
-  }
-  tprintf("doheartbeat done %d\n", res);
-  return res;
+    tprintf("doheartbeat done %d\n", res);
+    return res;
 }
 
index 9d7b023..cd276fb 100644 (file)
--- a/config.h
+++ b/config.h
@@ -4,51 +4,55 @@
 #include <string>
 #include <vector>
 #include "paxos.h"
+#include "lock.h"
 
 class config_view_change {
- public:
-  virtual void commit_change(unsigned vid) = 0;
-  virtual ~config_view_change() {};
+    public:
+        virtual void commit_change(unsigned view_id) = 0;
+        virtual ~config_view_change() {};
 };
 
 class config : public paxos_change {
- private:
-  acceptor *acc;
-  proposer *pro;
-  rpcs *pxsrpc;
-  unsigned myvid;
-  std::string first;
-  std::string me;
-  config_view_change *vc;
-  std::vector<std::string> mems;
-  pthread_mutex_t cfg_mutex;
-  pthread_cond_t heartbeat_cond;
-  pthread_cond_t config_cond;
-  paxos_protocol::status heartbeat(std::string m, unsigned instance, int &r);
-  std::string value(std::vector<std::string> mems);
-  std::vector<std::string> members(std::string v);
-  std::vector<std::string> get_view_wo(unsigned instance);
-  bool remove_wo(std::string);
-  void reconstruct();
-  typedef enum {
-    OK,        // response and same view #
-    VIEWERR,   // response but different view #
-    FAILURE,   // no response
-  } heartbeat_t;
-  heartbeat_t doheartbeat(std::string m);
- public:
-  config(std::string _first, std::string _me, config_view_change *_vc);
-  unsigned vid() { return myvid; }
-  std::string myaddr() { return me; };
-  std::string dump() { return acc->dump(); };
-  std::vector<std::string> get_view(unsigned instance);
-  void restore(std::string s);
-  bool add(std::string, unsigned vid);
-  bool ismember(std::string m, unsigned vid);
-  void heartbeater(void);
-  void paxos_commit(unsigned instance, std::string v);
-  rpcs *get_rpcs() { return acc->get_rpcs(); }
-  void breakpoint(int b) { pro->breakpoint(b); }
+    private:
+        acceptor *paxos_acceptor;
+        proposer *paxos_proposer;
+        unsigned my_view_id;
+        std::string first;
+        std::string me;
+        config_view_change *vc;
+        std::vector<std::string> mems;
+        mutex cfg_mutex;
+        std::condition_variable config_cond;
+        paxos_protocol::status heartbeat(
+                std::string m,
+                unsigned instance,
+                int &r);
+        std::string value(const std::vector<std::string> &mems) const;
+        void members(const std::string &v, std::vector<std::string> &m) const;
+        void get_view_wo(unsigned instance, std::vector<std::string> &m);
+        bool remove(const std::string &);
+        void reconstruct();
+        typedef enum {
+            OK,        // response and same view #
+            VIEWERR,   // response but different view #
+            FAILURE,   // no response
+        } heartbeat_t;
+        heartbeat_t doheartbeat(const std::string &m);
+    public:
+        config(const std::string &_first,
+               const std::string &_me,
+               config_view_change *_vc);
+        unsigned view_id() { return my_view_id; }
+        const std::string &myaddr() const { return me; };
+        std::string dump() { return paxos_acceptor->dump(); };
+        void get_view(unsigned instance, std::vector<std::string> &m);
+        void restore(const std::string &s);
+        bool add(const std::string &, unsigned view_id);
+        bool ismember(const std::string &m, unsigned view_id);
+        void heartbeater(void);
+        void paxos_commit(unsigned instance, const std::string &v);
+        rpcs *get_rpcs() { return paxos_acceptor->get_rpcs(); }
+        void breakpoint(int b) { paxos_proposer->breakpoint(b); }
 };
 
 #endif
diff --git a/gettime.cc b/gettime.cc
deleted file mode 100644 (file)
index 926c510..0000000
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c), MM Weiss
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without modification, 
- * are permitted provided that the following conditions are met:
- * 
- *     1. Redistributions of source code must retain the above copyright notice, 
- *     this list of conditions and the following disclaimer.
- *     
- *     2. Redistributions in binary form must reproduce the above copyright notice, 
- *     this list of conditions and the following disclaimer in the documentation 
- *     and/or other materials provided with the distribution.
- *     
- *     3. Neither the name of the MM Weiss nor the names of its contributors 
- *     may be used to endorse or promote products derived from this software without 
- *     specific prior written permission.
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY 
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
- * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 
- * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 
- * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR 
- * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
- * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-/*
- *  clock_gettime_stub.c
- *  gcc -Wall -c clock_gettime_stub.c
- *  posix realtime functions; MacOS user space glue
- */
-/*  @comment
- *  other possible implementation using intel builtin rdtsc
- *  rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html
- *  
- *  we could get the ticks by doing this
- * 
- *  __asm __volatile("mov %%ebx, %%esi\n\t"
- *             "cpuid\n\t"
- *             "xchg %%esi, %%ebx\n\t"
- *             "rdtsc"
- *             : "=a" (a),
- *               "=d" (d)
- *     );
- *  we could even replace our tricky sched_yield call by assembly code to get a better accurency,
- *  anyway the following C stub will satisfy 99% of apps using posix clock_gettime call, 
- *  moreover, the setter version (clock_settime) could be easly written using mach primitives:
- *  http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time)
- *  
- *  hackers don't be crackers, don't you use a flush toilet?
- * 
- *
- *  @see draft: ./posix-realtime-stub/posix-realtime-stub.c
- *
- */
-
-#ifdef __APPLE__
-
-#pragma weak clock_gettime
-
-#include <sys/time.h>
-#include <sys/resource.h>
-#include <mach/mach.h>
-#include <mach/clock.h>
-#include <mach/mach_time.h>
-#include <errno.h>
-#include <unistd.h>
-#include <sched.h>
-
-typedef enum {
-       CLOCK_REALTIME,
-       CLOCK_MONOTONIC,
-       CLOCK_PROCESS_CPUTIME_ID,
-       CLOCK_THREAD_CPUTIME_ID
-} clockid_t;
-
-static mach_timebase_info_data_t __clock_gettime_inf;
-
-int clock_gettime(clockid_t clk_id, struct timespec *tp) {
-       kern_return_t   ret;
-       clock_serv_t    clk;
-       clock_id_t clk_serv_id;
-       mach_timespec_t tm;
-       
-       uint64_t start, end, delta, nano;
-       
-       int retval = -1;
-       switch (clk_id) {
-               case CLOCK_REALTIME:
-               case CLOCK_MONOTONIC:
-                       clk_serv_id = clk_id == CLOCK_REALTIME ? CALENDAR_CLOCK : SYSTEM_CLOCK;
-                       if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) {
-                               if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) {
-                                       tp->tv_sec  = tm.tv_sec;
-                                       tp->tv_nsec = tm.tv_nsec;
-                                       retval = 0;
-                               }
-                       }
-                       if (KERN_SUCCESS != ret) {
-                               errno = EINVAL;
-                               retval = -1;
-                       }
-               break;
-               case CLOCK_PROCESS_CPUTIME_ID:
-               case CLOCK_THREAD_CPUTIME_ID:
-                       start = mach_absolute_time();
-                       if (clk_id == CLOCK_PROCESS_CPUTIME_ID) {
-                               getpid();
-                       } else {
-                               sched_yield();
-                       }
-                       end = mach_absolute_time();
-                       delta = end - start;    
-                       if (0 == __clock_gettime_inf.denom) {
-                               mach_timebase_info(&__clock_gettime_inf);
-                       }
-                       nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom;
-                       tp->tv_sec = nano * 1e-9;  
-                       tp->tv_nsec = nano - (tp->tv_sec * 1e9);
-                       retval = 0;
-               break;
-               default:
-                       errno = EINVAL;
-                       retval = -1;
-       }
-       return retval;
-}
-
-#endif // __APPLE__
diff --git a/gettime.h b/gettime.h
deleted file mode 100644 (file)
index f10def4..0000000
--- a/gettime.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#ifndef gettime_h
-#define gettime_h
-
-#ifdef __APPLE__
-typedef enum {
-       CLOCK_REALTIME,
-       CLOCK_MONOTONIC,
-       CLOCK_PROCESS_CPUTIME_ID,
-       CLOCK_THREAD_CPUTIME_ID
-} clockid_t;
-
-int clock_gettime(clockid_t clk_id, struct timespec *tp);
-#endif
-
-#endif
index a233d5b..e998b3c 100644 (file)
--- a/handle.cc
+++ b/handle.cc
 #include "handle.h"
 #include <stdio.h>
 #include "tprintf.h"
+#include "lock.h"
 
 handle_mgr mgr;
 
 handle::handle(std::string m) 
 {
-  h = mgr.get_handle(m);
+    h = mgr.get_handle(m);
 }
 
 rpcc *
 handle::safebind()
 {
-  if (!h)
-    return NULL;
-  ScopedLock ml(&h->cl_mutex);
-  if (h->del)
-    return NULL;
-  if (h->cl)
+    if (!h)
+        return NULL;
+    lock ml(h->cl_mutex);
+    if (h->del)
+        return NULL;
+    if (h->cl)
+        return h->cl;
+    sockaddr_in dstsock;
+    make_sockaddr(h->m.c_str(), &dstsock);
+    rpcc *cl = new rpcc(dstsock);
+    tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
+    int ret;
+    // Starting with lab 6, our test script assumes that the failure
+    // can be detected by paxos and rsm layer within few seconds. We have
+    // to set the timeout with a small value to support the assumption.
+    // 
+    // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of
+    // lab 6 and lab 7 because the rpc layer may delay your RPC request, 
+    // and cause a time out failure. Please make sure RPC_LOSSY is set to 0.
+    ret = cl->bind(rpcc::to(1000));
+    if (ret < 0) {
+        tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
+        delete cl;
+        h->del = true;
+    } else {
+        tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
+        h->cl = cl;
+    }
     return h->cl;
-  sockaddr_in dstsock;
-  make_sockaddr(h->m.c_str(), &dstsock);
-  rpcc *cl = new rpcc(dstsock);
-  tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
-  int ret;
-  // Starting with lab 6, our test script assumes that the failure
-  // can be detected by paxos and rsm layer within few seconds. We have
-  // to set the timeout with a small value to support the assumption.
-  // 
-  // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of
-  // lab 6 and lab 7 because the rpc layer may delay your RPC request, 
-  // and cause a time out failure. Please make sure RPC_LOSSY is set to 0.
-  ret = cl->bind(rpcc::to(1000));
-  if (ret < 0) {
-    tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
-    delete cl;
-    h->del = true;
-  } else {
-    tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
-    h->cl = cl;
-  }
-  return h->cl;
 }
 
 handle::~handle() 
 {
-  if (h) mgr.done_handle(h);
+    if (h) mgr.done_handle(h);
 }
 
 handle_mgr::handle_mgr()
 {
-  VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0);
 }
 
 struct hinfo *
 handle_mgr::get_handle(std::string m)
 {
-  ScopedLock ml(&handle_mutex);
-  struct hinfo *h = 0;
-  if (hmap.find(m) == hmap.end()) {
-    h = new hinfo;
-    h->cl = NULL;
-    h->del = false;
-    h->refcnt = 1;
-    h->m = m;
-    pthread_mutex_init(&h->cl_mutex, NULL);
-    hmap[m] = h;
-  } else if (!hmap[m]->del) {
-    h = hmap[m];
-    h->refcnt ++;
-  }
-  return h;
+    lock ml(handle_mutex);
+    struct hinfo *h = 0;
+    if (hmap.find(m) == hmap.end()) {
+        h = new hinfo;
+        h->cl = NULL;
+        h->del = false;
+        h->refcnt = 1;
+        h->m = m;
+        hmap[m] = h;
+    } else if (!hmap[m]->del) {
+        h = hmap[m];
+        h->refcnt ++;
+    }
+    return h;
 }
 
 void 
 handle_mgr::done_handle(struct hinfo *h)
 {
-  ScopedLock ml(&handle_mutex);
-  h->refcnt--;
-  if (h->refcnt == 0 && h->del)
-    delete_handle_wo(h->m);
+    lock ml(handle_mutex);
+    h->refcnt--;
+    if (h->refcnt == 0 && h->del)
+        delete_handle_wo(h->m);
 }
 
 void
 handle_mgr::delete_handle(std::string m)
 {
-  ScopedLock ml(&handle_mutex);
-  delete_handle_wo(m);
+    lock ml(handle_mutex);
+    delete_handle_wo(m);
 }
 
 // Must be called with handle_mutex locked.
 void
 handle_mgr::delete_handle_wo(std::string m)
 {
-  if (hmap.find(m) == hmap.end()) {
-    tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
-  } else {
-    tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
-          hmap[m]->refcnt);
-    struct hinfo *h = hmap[m];
-    if (h->refcnt == 0) {
-      if (h->cl) {
-        h->cl->cancel();
-        delete h->cl;
-      }
-      pthread_mutex_destroy(&h->cl_mutex);
-      hmap.erase(m);
-      delete h;
+    if (hmap.find(m) == hmap.end()) {
+        tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
     } else {
-      h->del = true;
+        tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
+                hmap[m]->refcnt);
+        struct hinfo *h = hmap[m];
+        if (h->refcnt == 0) {
+            if (h->cl) {
+                h->cl->cancel();
+                delete h->cl;
+            }
+            hmap.erase(m);
+            delete h;
+        } else {
+            h->del = true;
+        }
     }
-  }
 }
index ecd8884..6b042fb 100644 (file)
--- a/handle.h
+++ b/handle.h
 
 #include <string>
 #include <vector>
-#include "rpc.h"
+#include "rpc/rpc.h"
 
 struct hinfo {
   rpcc *cl;
   int refcnt;
   bool del;
   std::string m;
-  pthread_mutex_t cl_mutex;
+  std::mutex cl_mutex;
 };
 
 class handle {
- private:
-  struct hinfo *h;
- public:
-  handle(std::string m);
-  ~handle();
-  /* safebind will try to bind with the rpc server on the first call.
-   * Since bind may block, the caller probably should not hold a mutex
-   * when calling safebind.
-   *
-   * return: 
-   *   if the first safebind succeeded, all later calls would return
-   *   a rpcc object; otherwise, all later calls would return NULL.
-   *
-   * Example:
-   *   handle h(dst);
-   *   XXX_protocol::status ret;
-   *   if (h.safebind()) {
-   *     ret = h.safebind()->call(...);
-   *   }
-   *   if (!h.safebind() || ret != XXX_protocol::OK) {
-   *     // handle failure
-   *   }
-   */
-  rpcc *safebind();
+    private:
+        struct hinfo *h;
+    public:
+        handle(std::string m);
+        ~handle();
+        /* safebind will try to bind with the rpc server on the first call.
+         * Since bind may block, the caller probably should not hold a mutex
+         * when calling safebind.
+         *
+         * return: 
+         *   if the first safebind succeeded, all later calls would return
+         *   a rpcc object; otherwise, all later calls would return NULL.
+         *
+         * Example:
+         *   handle h(dst);
+         *   XXX_protocol::status ret;
+         *   if (h.safebind()) {
+         *     ret = h.safebind()->call(...);
+         *   }
+         *   if (!h.safebind() || ret != XXX_protocol::OK) {
+         *     // handle failure
+         *   }
+         */
+        rpcc *safebind();
 };
 
 class handle_mgr {
- private:
-  pthread_mutex_t handle_mutex;
-  std::map<std::string, struct hinfo *> hmap;
- public:
-  handle_mgr();
-  struct hinfo *get_handle(std::string m);
-  void done_handle(struct hinfo *h);
-  void delete_handle(std::string m);
-  void delete_handle_wo(std::string m);
+    private:
+        std::mutex handle_mutex;
+        std::map<std::string, struct hinfo *> hmap;
+    public:
+        handle_mgr();
+        struct hinfo *get_handle(std::string m);
+        void done_handle(struct hinfo *h);
+        void delete_handle(std::string m);
+        void delete_handle_wo(std::string m);
 };
 
 extern class handle_mgr mgr;
diff --git a/lang/algorithm.h b/lang/algorithm.h
deleted file mode 100644 (file)
index a487094..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-// compile time version of min and max
-
-#ifndef algorithm_h
-#define algorithm_h
-
-template <int A, int B>
-struct static_max
-{
-    static const int value = A > B ? A : B;
-};
-
-template <int A, int B>
-struct static_min
-{
-    static const int value = A < B ? A : B;
-};
-
-#endif
diff --git a/lock.h b/lock.h
new file mode 100644 (file)
index 0000000..00d4374
--- /dev/null
+++ b/lock.h
@@ -0,0 +1,19 @@
+#ifndef lock_h
+#define lock_h
+
+#include <thread>
+#include <mutex>
+
+using std::mutex;
+using lock = std::unique_lock<std::mutex>;
+
+class adopt_lock : public lock {
+public:
+    inline adopt_lock(class mutex &m) : std::unique_lock<std::mutex>(m, std::adopt_lock) {
+    }
+    inline ~adopt_lock() {
+        release();
+    }
+};
+
+#endif
index 6e9fab1..11bc476 100644 (file)
@@ -1,7 +1,7 @@
 // RPC stubs for clients to talk to lock_server
 
 #include "lock_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 
 #include <sstream>
index df22711..b1176c4 100644 (file)
@@ -7,7 +7,7 @@
 
 #include <string>
 #include "lock_protocol.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <vector>
 
 // Client interface to the lock server
@@ -39,7 +39,7 @@ typedef enum {
 
 typedef int t4_status;
 
-typedef unsigned long long t4_lockid_t;
+typedef const char * t4_lockid_t;
 
 t4_lock_client *t4_lock_client_new(const char *dst);
 void t4_lock_client_delete(t4_lock_client *);
index 8d64e50..80bc87f 100644 (file)
@@ -2,7 +2,7 @@
 // see lock_client.cache.h for protocol details.
 
 #include "lock_client_cache_rsm.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <sstream>
 #include <iostream>
 #include <algorithm>
@@ -10,6 +10,9 @@
 #include "tprintf.h"
 
 #include "rsm_client.h"
+#include "lock.h"
+
+using std::ostringstream;
 
 lock_state::lock_state():
     state(none)
@@ -17,32 +20,29 @@ lock_state::lock_state():
 }
 
 void lock_state::wait() {
-    pthread_t self = pthread_self();
-    c[self].wait(m);
+    auto self = std::this_thread::get_id();
+    {
+        adopt_lock ml(m);
+        c[self].wait(ml);
+    }
     c.erase(self);
 }
 
 void lock_state::signal() {
     // signal anyone
     if (c.begin() != c.end())
-        c.begin()->second.signal();
+        c.begin()->second.notify_one();
 }
 
-void lock_state::signal(pthread_t who) {
+void lock_state::signal(std::thread::id who) {
     if (c.count(who))
-        c[who].signal();
-}
-
-static void * releasethread(void *x) {
-    lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
-    cc->releaser();
-    return 0;
+        c[who].notify_one();
 }
 
 int lock_client_cache_rsm::last_port = 0;
 
 lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
-    ScopedLock sl(lock_table_lock);
+    lock sl(lock_table_lock);
     // by the semantics of std::map, this will create
     // the lock if it doesn't already exist
     return lock_table[lid];
@@ -62,12 +62,11 @@ lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_use
     rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
     rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
     {
-        ScopedLock sl(xid_mutex);
+        lock sl(xid_mutex);
         xid = 0;
     }
     rsmc = new rsm_client(xdst);
-    int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
-    VERIFY (r == 0);
+    releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this);
 }
 
 void lock_client_cache_rsm::releaser() {
@@ -77,13 +76,14 @@ void lock_client_cache_rsm::releaser() {
         LOG("Releaser: " << lid);
 
         lock_state &st = get_lock_state(lid);
-        ScopedLock sl(st.m);
-        VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
+        lock sl(st.m);
+        VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
         st.state = lock_state::releasing;
         {
-            ScopedUnlock su(st.m);
+            sl.unlock();
             int r;
             rsmc->call(lock_protocol::release, lid, id, st.xid, r);
+            sl.lock();
         }
         st.state = lock_state::none;
         LOG("Lock " << lid << ": none");
@@ -93,8 +93,8 @@ void lock_client_cache_rsm::releaser() {
 
 lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
     lock_state &st = get_lock_state(lid);
-    ScopedLock sl(st.m);
-    pthread_t self = pthread_self();
+    lock sl(st.m);
+    auto self = std::this_thread::get_id();
 
     // check for reentrancy
     VERIFY(st.state != lock_state::locked || st.held_by != self);
@@ -108,16 +108,17 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid
 
         if (st.state == lock_state::none || st.state == lock_state::retrying) {
             if (st.state == lock_state::none) {
-                ScopedLock sl(xid_mutex);
+                lock sl(xid_mutex);
                 st.xid = xid++;
             }
             st.state = lock_state::acquiring;
             LOG("Lock " << lid << ": acquiring");
             lock_protocol::status result;
             {
-                ScopedUnlock su(st.m);
+                sl.unlock();
                 int r;
                 result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
+                sl.lock();
             }
             LOG("acquire returned " << result);
             if (result == lock_protocol::OK) {
@@ -129,11 +130,11 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid
         VERIFY(st.wanted_by.size() != 0);
         if (st.state == lock_state::free) {
             // is it for me?
-            pthread_t front = st.wanted_by.front();
-            if (front == releaser_thread) {
+            auto front = st.wanted_by.front();
+            if (front == releaser_thread.get_id()) {
                 st.wanted_by.pop_front();
                 st.state = lock_state::locked;
-                st.held_by = releaser_thread;
+                st.held_by = releaser_thread.get_id();
                 LOG("Queuing " << lid << " for release");
                 release_fifo.enq(lid);
             } else if (front == self) {
@@ -157,16 +158,16 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid
 
 lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
     lock_state &st = get_lock_state(lid);
-    ScopedLock sl(st.m);
-    pthread_t self = pthread_self();
+    lock sl(st.m);
+    auto self = std::this_thread::get_id();
     VERIFY(st.state == lock_state::locked && st.held_by == self);
     st.state = lock_state::free;
     LOG("Lock " << lid << ": free");
     if (st.wanted_by.size()) {
-        pthread_t front = st.wanted_by.front();
-        if (front == releaser_thread) {
+        auto front = st.wanted_by.front();
+        if (front == releaser_thread.get_id()) {
             st.state = lock_state::locked;
-            st.held_by = releaser_thread;
+            st.held_by = releaser_thread.get_id();
             st.wanted_by.pop_front();
             LOG("Queuing " << lid << " for release");
             release_fifo.enq(lid);
@@ -180,29 +181,29 @@ lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid
 rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
     LOG("Revoke handler " << lid << " " << xid);
     lock_state &st = get_lock_state(lid);
-    ScopedLock sl(st.m);
+    lock sl(st.m);
 
     if (st.state == lock_state::releasing || st.state == lock_state::none)
         return rlock_protocol::OK;
 
     if (st.state == lock_state::free &&
-        (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
+        (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
         // gimme
         st.state = lock_state::locked;
-        st.held_by = releaser_thread;
+        st.held_by = releaser_thread.get_id();
         if (st.wanted_by.size())
             st.wanted_by.pop_front();
         release_fifo.enq(lid);
     } else {
         // get in line
-        st.wanted_by.push_back(releaser_thread);
+        st.wanted_by.push_back(releaser_thread.get_id());
     }
     return rlock_protocol::OK;
 }
 
 rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
     lock_state &st = get_lock_state(lid);
-    ScopedLock sl(st.m);
+    lock sl(st.m);
     VERIFY(st.state == lock_state::acquiring);
     st.state = lock_state::retrying;
     LOG("Lock " << lid << ": none");
index 28b0323..049d18a 100644 (file)
@@ -6,23 +6,22 @@
 
 #include <string>
 #include "lock_protocol.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include "lock_client.h"
 #include "lang/verify.h"
-#include "mutex.h"
 #include "rpc/fifo.h"
 #include "rsm_client.h"
 
-// Classes that inherit lock_release_user can override dorelease so that
-// that they will be called when lock_client releases a lock.
-// You will not need to do anything with this class until Lab 5.
 class lock_release_user {
     public:
         virtual void dorelease(lock_protocol::lockid_t) = 0;
         virtual ~lock_release_user() {};
 };
 
-using namespace std;
+using std::string;
+using std::thread;
+using std::list;
+using std::map;
 
 typedef string callback;
 
@@ -37,14 +36,14 @@ public:
         acquiring,
         releasing
     } state;
-    pthread_t held_by;
-    list<pthread_t> wanted_by;
+    std::thread::id held_by;
+    list<std::thread::id> wanted_by;
     mutex m;
-    map<pthread_t, cond> c;
+    map<std::thread::id, std::condition_variable> c;
     lock_protocol::xid_t xid;
     void wait();
     void signal();
-    void signal(pthread_t who);
+    void signal(std::thread::id who);
 };
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
@@ -55,7 +54,7 @@ class lock_client_cache_rsm;
 // lock_revoke_server.
 class lock_client_cache_rsm : public lock_client {
     private:
-        pthread_t releaser_thread;
+        std::thread releaser_thread;
         rsm_client *rsmc;
         class lock_release_user *lu;
         int rlock_port;
index 4d74acb..4285269 100644 (file)
@@ -4,27 +4,20 @@
 
 #include "lock_protocol.h"
 #include "lock_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include <vector>
 #include <stdlib.h>
 #include <stdio.h>
 
-std::string dst;
-lock_client *lc;
-
 int
 main(int argc, char *argv[])
 {
-  int r;
-
-  if(argc != 2){
-    fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
-    exit(1);
-  }
+    if(argc != 2) {
+        fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
+        return 1;
+    }
 
-  dst = argv[1];
-  lc = new lock_client(dst);
-  r = lc->stat(1);
-  printf ("stat returned %d\n", r);
+    lock_client *lc = new lock_client(argv[1]);
+    printf ("stat returned %d\n", lc->stat("1"));
 }
index 60df0ef..61f0998 100644 (file)
@@ -3,13 +3,14 @@
 #ifndef lock_protocol_h
 #define lock_protocol_h
 
-#include "rpc.h"
+#include "rpc/rpc.h"
+#include <string>
 
 class lock_protocol {
     public:
         enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR };
         typedef int status;
-        typedef unsigned long long lockid_t;
+        typedef std::string lockid_t;
         typedef unsigned long long xid_t;
         enum rpc_numbers {
             acquire = 0x7001,
index 3801814..be0c1c9 100644 (file)
@@ -5,6 +5,7 @@
 #include <stdio.h>
 #include <unistd.h>
 #include <arpa/inet.h>
+#include "lock.h"
 
 lock_server::lock_server():
     nacquire (0)
@@ -14,11 +15,10 @@ lock_server::lock_server():
 // caller must hold lock_lock
 mutex &
 lock_server::get_lock(lock_protocol::lockid_t lid) {
-    lock_lock.acquire();
+    lock ml(lock_lock);
     // by the semantics of std::map, this will create
     // the mutex if it doesn't already exist
     mutex &l = locks[lid];
-    lock_lock.release();
     return l;
 }
 
@@ -34,13 +34,13 @@ lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r)
 lock_protocol::status
 lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r)
 {
-    get_lock(lid).acquire();
+    get_lock(lid).lock();
     return lock_protocol::OK;
 }
 
 lock_protocol::status
 lock_server::release(int clt, lock_protocol::lockid_t lid, int &r)
 {
-    get_lock(lid).release();
+    get_lock(lid).unlock();
     return lock_protocol::OK;
 }
index e24e359..f03a717 100644 (file)
@@ -7,13 +7,11 @@
 #include <string>
 #include "lock_protocol.h"
 #include "lock_client.h"
-#include "rpc.h"
-#include <pthread.h>
+#include "rpc/rpc.h"
 #include <list>
 #include <map>
-#include "mutex.h"
 
-using namespace std;
+using std::map;
 
 typedef map<lock_protocol::lockid_t, mutex> lock_map;
 
index 5149049..c3f75e8 100644 (file)
@@ -9,12 +9,25 @@
 #include "handle.h"
 #include "tprintf.h"
 #include "rpc/marshall.h"
+#include "lock.h"
+
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
 
 lock_state::lock_state():
     held(false)
 {
 }
 
+lock_state& lock_state::operator=(const lock_state& o) {
+    held = o.held;
+    held_by = o.held_by;
+    wanted_by = o.wanted_by;
+    old_requests = o.old_requests;
+    return *this;
+}
+
 template <class A, class B>
 ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
     o << "<" << d.first << "," << d.second << ">";
@@ -66,28 +79,15 @@ unmarshall & operator>>(unmarshall &u, lock_state &d) {
 
 
 lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
-    ScopedLock sl(lock_table_lock);
+    lock sl(lock_table_lock);
     // by the semantics of map, this will create
     // the lock if it doesn't already exist
     return lock_table[lid];
 }
 
-static void *revokethread(void *x) {
-    lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
-    sc->revoker();
-    return 0;
-}
-
-static void *retrythread(void *x) {
-    lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
-    sc->retryer();
-    return 0;
-}
-
 lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
-    pthread_t th;
-    VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0);
-    VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0);
+    std::thread(&lock_server_cache_rsm::revoker, this).detach();
+    std::thread(&lock_server_cache_rsm::retryer, this).detach();
     rsm->set_state_transfer(this);
 }
 
@@ -102,7 +102,7 @@ void lock_server_cache_rsm::revoker() {
         lock_state &st = get_lock_state(lid);
         holder held_by;
         {
-            ScopedLock sl(st.m);
+            lock sl(st.m);
             held_by = st.held_by;
         }
 
@@ -130,7 +130,7 @@ void lock_server_cache_rsm::retryer() {
         lock_state &st = get_lock_state(lid);
         holder front;
         {
-            ScopedLock sl(st.m);
+            lock sl(st.m);
             if (st.wanted_by.empty())
                 continue;
             front = st.wanted_by.front();
@@ -155,7 +155,7 @@ int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_
     LOG_FUNC_ENTER_SERVER;
     holder h = holder(id, xid);
     lock_state &st = get_lock_state(lid);
-    ScopedLock sl(st.m);
+    lock sl(st.m);
 
     // deal with duplicated requests
     if (st.old_requests.count(id)) {
@@ -212,7 +212,7 @@ int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_
 int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
     LOG_FUNC_ENTER_SERVER;
     lock_state &st = get_lock_state(lid);
-    ScopedLock sl(st.m);
+    lock sl(st.m);
     if (st.held && st.held_by == holder(id, xid)) {
         st.held = false;
         LOG("Lock " << lid << " not held");
@@ -223,7 +223,7 @@ int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, loc
 }
 
 string lock_server_cache_rsm::marshal_state() {
-    ScopedLock sl(lock_table_lock);
+    lock sl(lock_table_lock);
     marshall rep;
     rep << nacquire;
     rep << lock_table;
@@ -231,7 +231,7 @@ string lock_server_cache_rsm::marshal_state() {
 }
 
 void lock_server_cache_rsm::unmarshal_state(string state) {
-    ScopedLock sl(lock_table_lock);
+    lock sl(lock_table_lock);
     unmarshall rep(state);
     rep >> nacquire;
     rep >> lock_table;
index eb86bd0..4a33361 100644 (file)
@@ -6,13 +6,16 @@
 #include <map>
 #include <vector>
 #include "lock_protocol.h"
-#include "rpc.h"
-#include "mutex.h"
+#include "rpc/rpc.h"
 #include "rsm_state_transfer.h"
 #include "rsm.h"
 #include "rpc/fifo.h"
+#include "lock.h"
 
-using namespace std;
+using std::string;
+using std::pair;
+using std::list;
+using std::map;
 
 typedef string callback;
 typedef pair<callback, lock_protocol::xid_t> holder;
@@ -25,6 +28,7 @@ public:
     list<holder> wanted_by;
     map<callback, lock_protocol::xid_t> old_requests;
     mutex m;
+    lock_state& operator=(const lock_state&);
 };
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
index 4cc8136..c256cbe 100644 (file)
@@ -1,4 +1,4 @@
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include <stdlib.h>
 #include <stdio.h>
@@ -7,8 +7,6 @@
 #include "paxos.h"
 #include "rsm.h"
 
-#include "jsl_log.h"
-
 // Main loop of lock_server
 
 char tprintf_thread_prefix = 's';
@@ -16,57 +14,30 @@ char tprintf_thread_prefix = 's';
 int
 main(int argc, char *argv[])
 {
-  int count = 0;
-
-  setvbuf(stdout, NULL, _IONBF, 0);
-  setvbuf(stderr, NULL, _IONBF, 0);
-
-  srandom(getpid());
+    int count = 0;
 
-  if(argc != 3){
-    fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
-    exit(1);
-  }
+    setvbuf(stdout, NULL, _IONBF, 0);
+    setvbuf(stderr, NULL, _IONBF, 0);
 
-  char *count_env = getenv("RPC_COUNT");
-  if(count_env != NULL){
-    count = atoi(count_env);
-  }
+    srandom(getpid());
 
-  //jsl_set_debug(2);
-  // Comment out the next line to switch between the ordinary lock
-  // server and the RSM.  In Lab 6, we disable the lock server and
-  // implement Paxos.  In Lab 7, we will make the lock server use your
-  // RSM layer.
-#define        RSM
-#ifdef RSM
-// You must comment out the next line once you are done with Step One.
-//#define STEP_ONE 
-#ifdef STEP_ONE
-  rpcs server(atoi(argv[1]));
-  lock_server_cache_rsm ls;
-  server.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
-  server.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
-  server.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
-#else
-  rsm rsm(argv[1], argv[2]);
-  lock_server_cache_rsm ls(&rsm);
-  rsm.set_state_transfer((rsm_state_transfer *)&ls);
-  rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
-  rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
-  rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
-#endif // STEP_ONE
-#endif // RSM
+    if(argc != 3){
+        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+        exit(1);
+    }
 
-#ifndef RSM
-  lock_server_cache ls;
-  rpcs server(atoi(argv[1]), count);
-  server.reg(lock_protocol::stat, &ls, &lock_server_cache::stat);
-  server.reg(lock_protocol::acquire, &ls, &lock_server_cache::acquire);
-  server.reg(lock_protocol::release, &ls, &lock_server_cache::release);
-#endif
+    char *count_env = getenv("RPC_COUNT");
+    if(count_env != NULL){
+        count = atoi(count_env);
+    }
 
+    rsm rsm(argv[1], argv[2]);
+    lock_server_cache_rsm ls(&rsm);
+    rsm.set_state_transfer((rsm_state_transfer *)&ls);
+    rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
+    rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
+    rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
 
-  while(1)
-    sleep(1000);
+    while(1)
+        sleep(1000);
 }
index 8e736ce..d063cdc 100644 (file)
@@ -4,8 +4,7 @@
 
 #include "lock_protocol.h"
 #include "lock_client.h"
-#include "rpc.h"
-#include "jsl_log.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include <vector>
 #include <stdlib.h>
 #include "tprintf.h"
 #include <sys/types.h>
 #include <unistd.h>
+#include "lock.h"
 
 char tprintf_thread_prefix = 'c';
 
 // must be >= 2
-int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
+const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
 std::string dst;
 lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt];
-lock_protocol::lockid_t a = 1;
-lock_protocol::lockid_t b = 2;
-lock_protocol::lockid_t c = 3;
+lock_protocol::lockid_t a = "1";
+lock_protocol::lockid_t b = "2";
+lock_protocol::lockid_t c = "3";
 
 // check_grant() and check_release() check that the lock server
 // doesn't grant the same lock to both clients.
 // it assumes that lock names are distinct in the first byte.
 int ct[256];
-pthread_mutex_t count_mutex;
+std::mutex count_mutex;
 
 void
 check_grant(lock_protocol::lockid_t lid)
 {
-  ScopedLock ml(&count_mutex);
-  int x = lid & 0xff;
-  if(ct[x] != 0){
-    fprintf(stderr, "error: server granted %016llx twice\n", lid);
-    fprintf(stdout, "error: server granted %016llx twice\n", lid);
-    exit(1);
-  }
-  ct[x] += 1;
+    lock ml(count_mutex);
+    int x = lid[0] & 0x0f;
+    if(ct[x] != 0){
+        fprintf(stderr, "error: server granted %s twice\n", lid.c_str());
+        fprintf(stdout, "error: server granted %s twice\n", lid.c_str());
+        exit(1);
+    }
+    ct[x] += 1;
 }
 
 void
 check_release(lock_protocol::lockid_t lid)
 {
-  ScopedLock ml(&count_mutex);
-  int x = lid & 0xff;
-  if(ct[x] != 1){
-    fprintf(stderr, "error: client released un-held lock %016llx\n",  lid);
-    exit(1);
-  }
-  ct[x] -= 1;
+    lock ml(count_mutex);
+    int x = lid[0] & 0x0f;
+    if(ct[x] != 1){
+        fprintf(stderr, "error: client released un-held lock %s\n",  lid.c_str());
+        exit(1);
+    }
+    ct[x] -= 1;
 }
 
 void
@@ -84,159 +84,151 @@ test1(void)
 void *
 test2(void *x) 
 {
-  int i = * (int *) x;
-
-  tprintf ("test2: client %d acquire a release a\n", i);
-  lc[i]->acquire(a);
-  tprintf ("test2: client %d acquire done\n", i);
-  check_grant(a);
-  sleep(1);
-  tprintf ("test2: client %d release\n", i);
-  check_release(a);
-  lc[i]->release(a);
-  tprintf ("test2: client %d release done\n", i);
-  return 0;
-}
+    int i = * (int *) x;
 
-void *
-test3(void *x)
-{
-  int i = * (int *) x;
-
-  tprintf ("test3: client %d acquire a release a concurrent\n", i);
-  for (int j = 0; j < 10; j++) {
+    tprintf ("test2: client %d acquire a release a\n", i);
     lc[i]->acquire(a);
+    tprintf ("test2: client %d acquire done\n", i);
     check_grant(a);
-    tprintf ("test3: client %d got lock\n", i);
+    sleep(1);
+    tprintf ("test2: client %d release\n", i);
     check_release(a);
     lc[i]->release(a);
-  }
-  return 0;
+    tprintf ("test2: client %d release done\n", i);
+    return 0;
 }
 
 void *
-test4(void *x)
+test3(void *x)
 {
-  int i = * (int *) x;
+    int i = * (int *) x;
+
+    tprintf ("test3: client %d acquire a release a concurrent\n", i);
+    for (int j = 0; j < 10; j++) {
+        lc[i]->acquire(a);
+        check_grant(a);
+        tprintf ("test3: client %d got lock\n", i);
+        check_release(a);
+        lc[i]->release(a);
+    }
+    return 0;
+}
 
-  tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
-  for (int j = 0; j < 10; j++) {
-    lc[0]->acquire(a);
-    check_grant(a);
-    tprintf ("test4: thread %d on client 0 got lock\n", i);
-    check_release(a);
-    lc[0]->release(a);
-  }
-  return 0;
+void *
+test4(void *x)
+{
+    int i = * (int *) x;
+
+    tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
+    for (int j = 0; j < 10; j++) {
+        lc[0]->acquire(a);
+        check_grant(a);
+        tprintf ("test4: thread %d on client 0 got lock\n", i);
+        check_release(a);
+        lc[0]->release(a);
+    }
+    return 0;
 }
 
 void *
 test5(void *x)
 {
-  int i = * (int *) x;
-
-  tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
-  for (int j = 0; j < 10; j++) {
-    if (i < 5)  lc[0]->acquire(a);
-    else  lc[1]->acquire(a);
-    check_grant(a);
-    tprintf ("test5: client %d got lock\n", i);
-    check_release(a);
-    if (i < 5) lc[0]->release(a);
-    else lc[1]->release(a);
-  }
-  return 0;
+    int i = * (int *) x;
+
+    tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
+    for (int j = 0; j < 10; j++) {
+        if (i < 5)  lc[0]->acquire(a);
+        else  lc[1]->acquire(a);
+        check_grant(a);
+        tprintf ("test5: client %d got lock\n", i);
+        check_release(a);
+        if (i < 5) lc[0]->release(a);
+        else lc[1]->release(a);
+    }
+    return 0;
 }
 
 int
 main(int argc, char *argv[])
 {
-    int r;
-    pthread_t th[nt];
+    std::thread th[nt];
     int test = 0;
 
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
     srandom(getpid());
 
-    //jsl_set_debug(2);
-
     if(argc < 2) {
-      fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
-      exit(1);
+        fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
+        exit(1);
     }
 
     dst = argv[1]; 
 
     if (argc > 2) {
-      test = atoi(argv[2]);
-      if(test < 1 || test > 5){
-        tprintf("Test number must be between 1 and 5\n");
-        exit(1);
-      }
+        test = atoi(argv[2]);
+        if(test < 1 || test > 5){
+            tprintf("Test number must be between 1 and 5\n");
+            exit(1);
+        }
     }
 
-    VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0);
     tprintf("cache lock client\n");
     for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst);
 
     if(!test || test == 1){
-      test1();
+        test1();
     }
 
     if(!test || test == 2){
-      // test2
-      for (int i = 0; i < nt; i++) {
-       int *a = new int (i);
-       r = pthread_create(&th[i], NULL, test2, (void *) a);
-       VERIFY (r == 0);
-      }
-      for (int i = 0; i < nt; i++) {
-       pthread_join(th[i], NULL);
-      }
+        // test2
+        for (int i = 0; i < nt; i++) {
+            int *a = new int (i);
+            th[i] = std::thread(test2, a);
+        }
+        for (int i = 0; i < nt; i++) {
+            th[i].join();
+        }
     }
 
     if(!test || test == 3){
-      tprintf("test 3\n");
-      
-      // test3
-      for (int i = 0; i < nt; i++) {
-       int *a = new int (i);
-       r = pthread_create(&th[i], NULL, test3, (void *) a);
-       VERIFY (r == 0);
-      }
-      for (int i = 0; i < nt; i++) {
-       pthread_join(th[i], NULL);
-      }
+        tprintf("test 3\n");
+
+        // test3
+        for (int i = 0; i < nt; i++) {
+            int *a = new int (i);
+            th[i] = std::thread(test3, a);
+        }
+        for (int i = 0; i < nt; i++) {
+            th[i].join();
+        }
     }
 
     if(!test || test == 4){
-      tprintf("test 4\n");
-      
-      // test 4
-      for (int i = 0; i < 2; i++) {
-       int *a = new int (i);
-       r = pthread_create(&th[i], NULL, test4, (void *) a);
-       VERIFY (r == 0);
-      }
-      for (int i = 0; i < 2; i++) {
-       pthread_join(th[i], NULL);
-      }
+        tprintf("test 4\n");
+
+        // test 4
+        for (int i = 0; i < 2; i++) {
+            int *a = new int (i);
+            th[i] = std::thread(test4, a);
+        }
+        for (int i = 0; i < 2; i++) {
+            th[i].join();
+        }
     }
 
     if(!test || test == 5){
-      tprintf("test 5\n");
-      
-      // test 5
-      
-      for (int i = 0; i < nt; i++) {
-       int *a = new int (i);
-       r = pthread_create(&th[i], NULL, test5, (void *) a);
-       VERIFY (r == 0);
-      }
-      for (int i = 0; i < nt; i++) {
-       pthread_join(th[i], NULL);
-      }
+        tprintf("test 5\n");
+
+        // test 5
+
+        for (int i = 0; i < nt; i++) {
+            int *a = new int (i);
+            th[i] = std::thread(test5, a);
+        }
+        for (int i = 0; i < nt; i++) {
+            th[i].join();
+        }
     }
 
     tprintf ("%s: passed all tests successfully\n", argv[0]);
diff --git a/mutex.cc b/mutex.cc
deleted file mode 100644 (file)
index 4e54cc0..0000000
--- a/mutex.cc
+++ /dev/null
@@ -1,42 +0,0 @@
-#include "mutex.h"
-#include "lang/verify.h"
-
-mutex::mutex() {
-    VERIFY(pthread_mutex_init(&m, NULL) == 0);
-}
-
-mutex::~mutex() {
-    VERIFY(pthread_mutex_destroy(&m) == 0);
-}
-
-void mutex::acquire() {
-    VERIFY(pthread_mutex_lock(&m) == 0);
-}
-
-void mutex::release() {
-    VERIFY(pthread_mutex_unlock(&m) == 0);
-}
-
-mutex::operator pthread_mutex_t *() {
-    return &m;
-}
-
-cond::cond() {
-    VERIFY(pthread_cond_init(&c, NULL) == 0);
-}
-
-cond::~cond() {
-    VERIFY(pthread_cond_destroy(&c) == 0);
-}
-
-void cond::wait(mutex &m) {
-    VERIFY(pthread_cond_wait(&c, m) == 0);
-}
-
-void cond::signal() {
-    VERIFY(pthread_cond_signal(&c) == 0);
-}
-
-void cond::broadcast() {
-    VERIFY(pthread_cond_broadcast(&c) == 0);
-}
diff --git a/mutex.h b/mutex.h
deleted file mode 100644 (file)
index 228a3bb..0000000
--- a/mutex.h
+++ /dev/null
@@ -1,28 +0,0 @@
-#ifndef mutex_h
-#define mutex_h
-
-#include <pthread.h>
-
-class mutex {
- protected:
-  pthread_mutex_t m;
- public:
-  mutex();
-  ~mutex();
-  void acquire();
-  void release();
-  operator pthread_mutex_t *();
-};
-
-class cond {
- protected:
-  pthread_cond_t c;
- public:
-  cond();
-  ~cond();
-  void wait(mutex &m);
-  void signal();
-  void broadcast();
-};
-
-#endif
index c3a445f..89f1714 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,9 +1,9 @@
 #include "paxos.h"
 #include "handle.h"
-// #include <signal.h>
 #include <stdio.h>
 #include "tprintf.h"
 #include "lang/verify.h"
+#include "lock.h"
 
 // This module implements the proposer and acceptor of the Paxos
 // distributed algorithm as described by Lamport's "Paxos Made
@@ -52,7 +52,7 @@ bool
 proposer::isrunning()
 {
   bool r;
-  ScopedLock ml(pxs_mutex);
+  lock ml(pxs_mutex);
   r = !stable;
   return r;
 }
@@ -94,7 +94,7 @@ proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv
   std::string v;
   bool r = false;
 
-  ScopedLock ml(pxs_mutex);
+  lock ml(pxs_mutex);
   tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
         print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
   if (!stable) {  // already running proposer?
@@ -241,7 +241,7 @@ paxos_protocol::status
 acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
     paxos_protocol::prepareres &r)
 {
-    ScopedLock ml(pxs_mutex);
+    lock ml(pxs_mutex);
     r.oldinstance = false;
     r.accept = false;
     r.n_a = n_a;
@@ -262,7 +262,7 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
 paxos_protocol::status
 acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
 {
-    ScopedLock ml(pxs_mutex);
+    lock ml(pxs_mutex);
     r = false;
     if (a.n >= n_h) {
         n_a = a.n;
@@ -274,52 +274,53 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
 }
 
 // the src argument is only for debug purpose
-paxos_protocol::status
+    paxos_protocol::status
 acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
 {
-  ScopedLock ml(pxs_mutex);
-  tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", 
-        a.instance, instance_h, v_a.c_str());
-  if (a.instance == instance_h + 1) {
-    VERIFY(v_a == a.v);
-    commit_wo(a.instance, v_a);
-  } else if (a.instance <= instance_h) {
-    // we are ahead ignore.
-  } else {
-    // we are behind
-    VERIFY(0);
-  }
-  return paxos_protocol::OK;
+    lock ml(pxs_mutex);
+    tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", 
+            a.instance, instance_h, v_a.c_str());
+    if (a.instance == instance_h + 1) {
+        VERIFY(v_a == a.v);
+        commit_wo(a.instance, v_a);
+    } else if (a.instance <= instance_h) {
+        // we are ahead ignore.
+    } else {
+        // we are behind
+        VERIFY(0);
+    }
+    return paxos_protocol::OK;
 }
 
 void
 acceptor::commit_wo(unsigned instance, std::string value)
 {
-  //assume pxs_mutex is held
-  tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
-  if (instance > instance_h) {
-    tprintf("commit: highestaccepteinstance = %d\n", instance);
-    values[instance] = value;
-    l->loginstance(instance, value);
-    instance_h = instance;
-    n_h.n = 0;
-    n_h.m = me;
-    n_a.n = 0;
-    n_a.m = me;
-    v_a.clear();
-    if (cfg) {
-      pxs_mutex.release();
-      cfg->paxos_commit(instance, value);
-      pxs_mutex.acquire();
+    //assume pxs_mutex is held
+    adopt_lock ml(pxs_mutex);
+    tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+    if (instance > instance_h) {
+        tprintf("commit: highestaccepteinstance = %d\n", instance);
+        values[instance] = value;
+        l->loginstance(instance, value);
+        instance_h = instance;
+        n_h.n = 0;
+        n_h.m = me;
+        n_a.n = 0;
+        n_a.m = me;
+        v_a.clear();
+        if (cfg) {
+            ml.unlock();
+            cfg->paxos_commit(instance, value);
+            ml.lock();
+        }
     }
-  }
 }
 
 void
 acceptor::commit(unsigned instance, std::string value)
 {
-  ScopedLock ml(pxs_mutex);
-  commit_wo(instance, value);
+    lock ml(pxs_mutex);
+    commit_wo(instance, value);
 }
 
 std::string
diff --git a/paxos.h b/paxos.h
index 7188edb..c7b1af4 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -3,15 +3,14 @@
 
 #include <string>
 #include <vector>
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include "paxos_protocol.h"
 #include "log.h"
-#include "mutex.h"
 
 
 class paxos_change {
  public:
-  virtual void paxos_commit(unsigned instance, std::string v) = 0;
+  virtual void paxos_commit(unsigned instance, const std::string &v) = 0;
   virtual ~paxos_change() {};
 };
 
@@ -21,7 +20,7 @@ class acceptor {
   rpcs *pxs;
   paxos_change *cfg;
   std::string me;
-  mutex pxs_mutex;
+  std::mutex pxs_mutex;
 
   // Acceptor state
   prop_t n_h;          // number of the highest proposal seen in a prepare
@@ -67,7 +66,7 @@ class proposer {
   bool break1;
   bool break2;
 
-  mutex pxs_mutex;
+  std::mutex pxs_mutex;
 
   // Proposer state
   bool stable;
index 9c2703e..734ca51 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef paxos_protocol_h
 #define paxos_protocol_h
 
-#include "rpc.h"
+#include "rpc/rpc.h"
 
 struct prop_t {
   unsigned n;
diff --git a/random.cc b/random.cc
deleted file mode 100644 (file)
index d344958..0000000
--- a/random.cc
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "mutex.h"
-#include <stdlib.h>
-#include "rpc/slock.h"
-#include <stdint.h>
-
-static mutex rand_m;
-
-void srand_safe(unsigned int seed) {
-    ScopedLock s(rand_m);
-    srandom(seed);
-}
-
-// RAND_MAX is guaranteed to be at least 32767
-// but math with rand() is annoying.
-// Here's a canned solution from
-// http://www.azillionmonkeys.com/qed/random.html
-// which gives uniform numbers in [0, r)
-
-#define RS_SCALE (1.0 / (1.0 + RAND_MAX))
-
-double drand (void) {
-    double d;
-    do {
-       d = (((random() * RS_SCALE) + random()) * RS_SCALE + random()) * RS_SCALE;
-    } while (d >= 1); /* Round off */
-    return d;
-}
-
-#define irand(x) ((unsigned int) ((x) * drand()))
-
-// use this to construct a 32-bit thread-safe RNG
-#define RAND32 ((uint32_t)irand(1ul<<32))
-uint32_t rand32_safe() {
-    ScopedLock s(rand_m);
-    return RAND32;
-}
diff --git a/random.h b/random.h
deleted file mode 100644 (file)
index fce86ef..0000000
--- a/random.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef random_h
-#define random_h
-
-void srand_safe(unsigned int seed);
-uint32_t rand32_safe();
-
-#endif
index c22ad45..fec7a4c 100644 (file)
 #include <signal.h>
 #include <unistd.h>
 
-#include "method_thread.h"
 #include "connection.h"
-#include "slock.h"
 #include "pollmgr.h"
 #include "jsl_log.h"
-#include "gettime.h"
 #include "lang/verify.h"
+#include "lock.h"
 
 #define MAX_PDU (10<<20) //maximum PDF is 10M
 
 
-connection::connection(chanmgr *m1, int f1, int l1) 
+connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
 {
 
-       int flags = fcntl(fd_, F_GETFL, NULL);
-       flags |= O_NONBLOCK;
-       fcntl(fd_, F_SETFL, flags);
+    int flags = fcntl(fd_, F_GETFL, NULL);
+    flags |= O_NONBLOCK;
+    fcntl(fd_, F_SETFL, flags);
+
+    signal(SIGPIPE, SIG_IGN);
 
-       signal(SIGPIPE, SIG_IGN);
-       VERIFY(pthread_mutex_init(&m_,0)==0);
-       VERIFY(pthread_mutex_init(&ref_m_,0)==0);
-       VERIFY(pthread_cond_init(&send_wait_,0)==0);
-       VERIFY(pthread_cond_init(&send_complete_,0)==0);
-        VERIFY(gettimeofday(&create_time_, NULL) == 0); 
+    VERIFY(gettimeofday(&create_time_, NULL) == 0);
 
-       PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
 }
 
 connection::~connection()
 {
-       VERIFY(dead_);
-       VERIFY(pthread_mutex_destroy(&m_)== 0);
-       VERIFY(pthread_mutex_destroy(&ref_m_)== 0);
-       VERIFY(pthread_cond_destroy(&send_wait_) == 0);
-       VERIFY(pthread_cond_destroy(&send_complete_) == 0);
-       if (rpdu_.buf)
-               free(rpdu_.buf);
-       VERIFY(!wpdu_.buf);
-       close(fd_);
+    VERIFY(dead_);
+    if (rpdu_.buf)
+        free(rpdu_.buf);
+    VERIFY(!wpdu_.buf);
+    close(fd_);
 }
 
 void
 connection::incref()
 {
-       ScopedLock ml(&ref_m_);
-       refno_++;
+    lock rl(ref_m_);
+    refno_++;
 }
 
 bool
 connection::isdead()
 {
-       ScopedLock ml(&m_);
-       return dead_;
+    lock ml(m_);
+    return dead_;
 }
 
 void
 connection::closeconn()
 {
-       {
-               ScopedLock ml(&m_);
-               if (!dead_) {
-                       dead_ = true;
-                       shutdown(fd_,SHUT_RDWR);
-               }else{
-                       return;
-               }
-       }
-       //after block_remove_fd, select will never wait on fd_ 
-       //and no callbacks will be active
-       PollMgr::Instance()->block_remove_fd(fd_);
+    {
+        lock ml(m_);
+        if (!dead_) {
+            dead_ = true;
+            shutdown(fd_,SHUT_RDWR);
+        } else {
+            return;
+        }
+    }
+    //after block_remove_fd, select will never wait on fd_
+    //and no callbacks will be active
+    PollMgr::Instance()->block_remove_fd(fd_);
 }
 
 void
 connection::decref()
 {
-       VERIFY(pthread_mutex_lock(&ref_m_)==0);
-       refno_ --;
-       VERIFY(refno_>=0);
-       if (refno_==0) {
-               VERIFY(pthread_mutex_lock(&m_)==0);
-               if (dead_) {
-                       VERIFY(pthread_mutex_unlock(&ref_m_)==0);
-                       VERIFY(pthread_mutex_unlock(&m_)==0);
-                       delete this;
-                       return;
-               }
-               VERIFY(pthread_mutex_unlock(&m_)==0);
-       }
-       pthread_mutex_unlock(&ref_m_);
+    bool dead = false;
+    {
+        lock rl(ref_m_);
+        refno_--;
+        VERIFY(refno_>=0);
+        if (refno_==0) {
+            lock ml(m_);
+            dead = dead_;
+        }
+    }
+    if (dead) {
+        delete this;
+    }
 }
 
 int
 connection::ref()
 {
-       ScopedLock rl(&ref_m_);
+    lock rl(ref_m_);
        return refno_;
 }
 
 int
 connection::compare(connection *another)
 {
-        if (create_time_.tv_sec > another->create_time_.tv_sec)
-                return 1;
-        if (create_time_.tv_sec < another->create_time_.tv_sec)
-                return -1;
-        if (create_time_.tv_usec > another->create_time_.tv_usec)
-                return 1;
-        if (create_time_.tv_usec < another->create_time_.tv_usec)
-                return -1;
-        return 0;
+    if (create_time_.tv_sec > another->create_time_.tv_sec)
+        return 1;
+    if (create_time_.tv_sec < another->create_time_.tv_sec)
+        return -1;
+    if (create_time_.tv_usec > another->create_time_.tv_usec)
+        return 1;
+    if (create_time_.tv_usec < another->create_time_.tv_usec)
+        return -1;
+    return 0;
 }
 
 bool
 connection::send(char *b, int sz)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        waiters_++;
        while (!dead_ && wpdu_.buf) {
-               VERIFY(pthread_cond_wait(&send_wait_, &m_)==0);
+        send_wait_.wait(ml);
        }
        waiters_--;
        if (dead_) {
@@ -145,16 +134,16 @@ connection::send(char *b, int sz)
 
        if (!writepdu()) {
                dead_ = true;
-               VERIFY(pthread_mutex_unlock(&m_) == 0);
+        ml.unlock();
                PollMgr::Instance()->block_remove_fd(fd_);
-               VERIFY(pthread_mutex_lock(&m_) == 0);
+        ml.lock();
        }else{
                if (wpdu_.solong == wpdu_.sz) {
                }else{
                        //should be rare to need to explicitly add write callback
                        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
                        while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
-                               VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
+                send_complete_.wait(ml);
                        }
                }
        }
@@ -162,7 +151,7 @@ connection::send(char *b, int sz)
        wpdu_.solong = wpdu_.sz = 0;
        wpdu_.buf = NULL;
        if (waiters_ > 0)
-               pthread_cond_broadcast(&send_wait_);
+        send_wait_.notify_all();
        return ret;
 }
 
@@ -170,7 +159,7 @@ connection::send(char *b, int sz)
 void
 connection::write_cb(int s)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        VERIFY(!dead_);
        VERIFY(fd_ == s);
        if (wpdu_.sz == 0) {
@@ -180,20 +169,20 @@ connection::write_cb(int s)
        if (!writepdu()) {
                PollMgr::Instance()->del_callback(fd_, CB_RDWR);
                dead_ = true;
-       }else{
+       } else {
                VERIFY(wpdu_.solong >= 0);
                if (wpdu_.solong < wpdu_.sz) {
                        return;
                }
-       } 
-       pthread_cond_signal(&send_complete_);
+    }
+       send_complete_.notify_one();
 }
 
 //fd_ is ready to be read
 void
 connection::read_cb(int s)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        VERIFY(fd_ == s);
        if (dead_)  {
                return;
@@ -207,7 +196,7 @@ connection::read_cb(int s)
        if (!succ) {
                PollMgr::Instance()->del_callback(fd_,CB_RDWR);
                dead_ = true;
-               pthread_cond_signal(&send_complete_);
+               send_complete_.notify_one();
        }
 
        if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
@@ -268,7 +257,7 @@ connection::readpdu()
 
                if (sz > MAX_PDU) {
                        char *tmpb = (char *)&sz1;
-                       jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, 
+                       jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
                                        sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
                        return false;
                }
@@ -295,12 +284,9 @@ connection::readpdu()
        return true;
 }
 
-tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) 
+tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
 : mgr_(m1), lossy_(lossytest)
 {
-
-       VERIFY(pthread_mutex_init(&m_,NULL) == 0);
-
        struct sockaddr_in sin;
        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
@@ -326,11 +312,11 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
                VERIFY(0);
        }
 
-        socklen_t addrlen = sizeof(sin);
-        VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
-        port_ = ntohs(sin.sin_port);
+    socklen_t addrlen = sizeof(sin);
+    VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
+    port_ = ntohs(sin.sin_port);
 
-       jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, 
+       jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
                sin.sin_port);
 
        if (pipe(pipe_) < 0) {
@@ -342,20 +328,20 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
        flags |= O_NONBLOCK;
        fcntl(pipe_[0], F_SETFL, flags);
 
-       VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); 
+    th_ = std::thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
 {
        VERIFY(close(pipe_[1]) == 0);
-       VERIFY(pthread_join(th_, NULL) == 0);
+    th_.join();
 
        //close all the active connections
        std::map<int, connection *>::iterator i;
        for (i = conns_.begin(); i != conns_.end(); i++) {
                i->second->closeconn();
                i->second->decref();
-       }       
+       }
 }
 
 void
@@ -363,13 +349,13 @@ tcpsconn::process_accept()
 {
        sockaddr_in sin;
        socklen_t slen = sizeof(sin);
-       int s1 = accept(tcp_, (sockaddr *)&sin, &slen); 
+       int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
        if (s1 < 0) {
                perror("tcpsconn::accept_conn error");
-               pthread_exit(NULL);
+               throw thread_exit_exception();
        }
 
-       jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", 
+       jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
                        s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
        connection *ch = new connection(mgr_, s1, lossy_);
 
@@ -398,34 +384,41 @@ tcpsconn::accept_conn()
        fd_set rfds;
        int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
 
-       while (1) { 
-               FD_ZERO(&rfds);
-               FD_SET(pipe_[0], &rfds);
-               FD_SET(tcp_, &rfds);
-
-               int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
-               if (ret < 0) {
-                       if (errno == EINTR) {
-                               continue;
-                       } else {
-                               perror("accept_conn select:");
-                               jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
-                               VERIFY(0);
-                       }
-               }
-
-               if (FD_ISSET(pipe_[0], &rfds)) {
-                       close(pipe_[0]);
-                       close(tcp_);
-                       return;
-               }
-               else if (FD_ISSET(tcp_, &rfds)) {
-                       process_accept();
-               } else {
-                       VERIFY(0);
-               }
-       }
+    try {
+
+        while (1) {
+            FD_ZERO(&rfds);
+            FD_SET(pipe_[0], &rfds);
+            FD_SET(tcp_, &rfds);
+
+            int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+            if (ret < 0) {
+                if (errno == EINTR) {
+                    continue;
+                } else {
+                    perror("accept_conn select:");
+                    jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
+                    VERIFY(0);
+                }
+            }
+
+            if (FD_ISSET(pipe_[0], &rfds)) {
+                close(pipe_[0]);
+                close(tcp_);
+                return;
+            }
+            else if (FD_ISSET(tcp_, &rfds)) {
+                process_accept();
+            } else {
+                VERIFY(0);
+            }
+        }
+    }
+    catch (thread_exit_exception e)
+    {
+        return;
+    }
 }
 
 connection *
@@ -435,7 +428,7 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
        int yes = 1;
        setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
        if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-               jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", 
+               jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
                                inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
                close(s);
                return NULL;
@@ -445,4 +438,3 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
        return new connection(mgr, s, lossy);
 }
 
-
index da48cf4..1ef4470 100644 (file)
 #ifndef connection_h
-#define connection_h 1
+#define connection_h
 
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <cstddef>
+#include <thread>
 
 #include <map>
 
 #include "pollmgr.h"
 
+class thread_exit_exception : std::exception {
+};
+
 class connection;
 
 class chanmgr {
-       public:
-               virtual bool got_pdu(connection *c, char *b, int sz) = 0;
-               virtual ~chanmgr() {}
+    public:
+        virtual bool got_pdu(connection *c, char *b, int sz) = 0;
+        virtual ~chanmgr() {}
 };
 
 class connection : public aio_callback {
-       public:
-               struct charbuf {
-                       charbuf(): buf(NULL), sz(0), solong(0) {}
-                       charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
-                       char *buf;
-                       int sz;
-                       int solong; //amount of bytes written or read so far
-               };
-
-               connection(chanmgr *m1, int f1, int lossytest=0);
-               ~connection();
-
-               int channo() { return fd_; }
-               bool isdead();
-               void closeconn();
-
-               bool send(char *b, int sz);
-               void write_cb(int s);
-               void read_cb(int s);
-
-               void incref();
-               void decref();
-               int ref();
-                
-                int compare(connection *another);
-       private:
-
-               bool readpdu();
-               bool writepdu();
-
-               chanmgr *mgr_;
-               const int fd_;
-               bool dead_;
-
-               charbuf wpdu_;
-               charbuf rpdu_;
-                
-                struct timeval create_time_;
-
-               int waiters_;
-               int refno_;
-               const int lossy_;
-
-               pthread_mutex_t m_;
-               pthread_mutex_t ref_m_;
-               pthread_cond_t send_complete_;
-               pthread_cond_t send_wait_;
+    public:
+        struct charbuf {
+            charbuf(): buf(NULL), sz(0), solong(0) {}
+            charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
+            char *buf;
+            int sz;
+            int solong; //amount of bytes written or read so far
+        };
+
+        connection(chanmgr *m1, int f1, int lossytest=0);
+        ~connection();
+
+        int channo() { return fd_; }
+        bool isdead();
+        void closeconn();
+
+        bool send(char *b, int sz);
+        void write_cb(int s);
+        void read_cb(int s);
+
+        void incref();
+        void decref();
+        int ref();
+
+        int compare(connection *another);
+    private:
+
+        bool readpdu();
+        bool writepdu();
+
+        chanmgr *mgr_;
+        const int fd_;
+        bool dead_;
+
+        charbuf wpdu_;
+        charbuf rpdu_;
+
+        struct timeval create_time_;
+
+        int waiters_;
+        int refno_;
+        const int lossy_;
+
+        std::mutex m_;
+        std::mutex ref_m_;
+        std::condition_variable send_complete_;
+        std::condition_variable send_wait_;
 };
 
 class tcpsconn {
-       public:
-               tcpsconn(chanmgr *m1, int port, int lossytest=0);
-               ~tcpsconn();
-                inline int port() { return port_; }
-               void accept_conn();
-       private:
-                int port_;
-               pthread_mutex_t m_;
-               pthread_t th_;
-               int pipe_[2];
-
-               int tcp_; //file desciptor for accepting connection
-               chanmgr *mgr_;
-               int lossy_;
-               std::map<int, connection *> conns_;
-
-               void process_accept();
+    public:
+        tcpsconn(chanmgr *m1, int port, int lossytest=0);
+        ~tcpsconn();
+        inline int port() { return port_; }
+        void accept_conn();
+    private:
+        int port_;
+        std::mutex m_;
+        std::thread th_;
+        int pipe_[2];
+
+        int tcp_; //file desciptor for accepting connection
+        chanmgr *mgr_;
+        int lossy_;
+        std::map<int, connection *> conns_;
+
+        void process_accept();
 };
 
 struct bundle {
-       bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
-       chanmgr *mgr;
-       int tcp;
-       int lossy;
+    bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
+    chanmgr *mgr;
+    int tcp;
+    int lossy;
 };
 
-void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0);
 connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
 #endif
index 979cf62..d190c26 100644 (file)
@@ -9,81 +9,69 @@
 #include <sys/time.h>
 #include <time.h>
 #include <errno.h>
-#include "slock.h"
 #include "lang/verify.h"
+#include "lock.h"
 
 template<class T>
 class fifo {
        public:
                fifo(int m=0);
-               ~fifo();
                bool enq(T, bool blocking=true);
                void deq(T *);
                bool size();
 
        private:
                std::list<T> q_;
-               pthread_mutex_t m_;
-               pthread_cond_t non_empty_c_; // q went non-empty
-               pthread_cond_t has_space_c_; // q is not longer overfull
+        mutex m_;
+        std::condition_variable non_empty_c_; // q went non-empty
+               std::condition_variable has_space_c_; // q is not longer overfull
                unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit
 };
 
 template<class T>
 fifo<T>::fifo(int limit) : max_(limit)
 {
-       VERIFY(pthread_mutex_init(&m_, 0) == 0);
-       VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0);
-       VERIFY(pthread_cond_init(&has_space_c_, 0) == 0);
-}
-
-template<class T>
-fifo<T>::~fifo()
-{
-       //fifo is to be deleted only when no threads are using it!
-       VERIFY(pthread_mutex_destroy(&m_)==0);
-       VERIFY(pthread_cond_destroy(&non_empty_c_) == 0);
-       VERIFY(pthread_cond_destroy(&has_space_c_) == 0);
 }
 
 template<class T> bool
 fifo<T>::size()
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        return q_.size();
 }
 
 template<class T> bool
 fifo<T>::enq(T e, bool blocking)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        while (1) {
                if (!max_ || q_.size() < max_) {
                        q_.push_back(e);
                        break;
                }
-               if (blocking)
-                       VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0);
+               if (blocking) {
+            has_space_c_.wait(ml);
+        }
                else
                        return false;
        }
-       VERIFY(pthread_cond_signal(&non_empty_c_) == 0);
+    non_empty_c_.notify_one();
        return true;
 }
 
 template<class T> void
 fifo<T>::deq(T *e)
 {
-       ScopedLock ml(&m_);
+       lock ml(m_);
 
        while(1) {
                if(q_.empty()){
-                       VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0);
+            non_empty_c_.wait(ml);
                } else {
                        *e = q_.front();
                        q_.pop_front();
                        if (max_ && q_.size() < max_) {
-                               VERIFY(pthread_cond_signal(&has_space_c_)==0);
+                has_space_c_.notify_one();
                        }
                        break;
                }
index 06e5c2c..de02fc2 100644 (file)
@@ -3,7 +3,7 @@
 int JSL_DEBUG_LEVEL = 0;
 void
 jsl_set_debug(int level) {
-       JSL_DEBUG_LEVEL = level;
+    JSL_DEBUG_LEVEL = level;
 }
 
 
index 7f92998..c6ea812 100644 (file)
@@ -1,25 +1,18 @@
-#ifndef __JSL_LOG_H__
-#define __JSL_LOG_H__ 1
+#ifndef jsl_log_h
+#define jsl_log_h
 
 enum dbcode {
-       JSL_DBG_OFF = 0,
-       JSL_DBG_1 = 1, // Critical
-       JSL_DBG_2 = 2, // Error
-       JSL_DBG_3 = 3, // Info
-       JSL_DBG_4 = 4, // Debugging
+    JSL_DBG_OFF = 0,
+    JSL_DBG_1 = 1, // Critical
+    JSL_DBG_2 = 2, // Error
+    JSL_DBG_3 = 3, // Info
+    JSL_DBG_4 = 4, // Debugging
 };
 
 extern int JSL_DEBUG_LEVEL;
 
-#define jsl_log(level,...)                                    \
-       do {                                                        \
-               if(JSL_DEBUG_LEVEL < abs(level))                                                                                \
-               {;}                                                       \
-               else {                                                    \
-                       { printf(__VA_ARGS__);}                                                                                                         \
-               }                                                         \
-       } while(0)
+#define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);}
 
 void jsl_set_debug(int level);
 
-#endif // __JSL_LOG_H__
+#endif
index e0370d1..644a220 100644 (file)
@@ -11,7 +11,6 @@
 #include <cstddef>
 #include <inttypes.h>
 #include "lang/verify.h"
-#include "lang/algorithm.h"
 
 struct req_header {
        req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0):
@@ -32,16 +31,15 @@ struct reply_header {
 typedef uint64_t rpc_checksum_t;
 typedef int rpc_sz_t;
 
-enum {
-       //size of initial buffer allocation 
-       DEFAULT_RPC_SZ = 1024,
+//size of initial buffer allocation 
+#define DEFAULT_RPC_SZ 1024
+#define RPC_HEADER_SZ_NO_CHECKSUM (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
 #if RPC_CHECKSUMMING
-       //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
-       RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t)
+//size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
+#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM + sizeof(rpc_checksum_t))
 #else
-               RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t)
+#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM)
 #endif
-};
 
 class marshall {
        private:
diff --git a/rpc/method_thread.h b/rpc/method_thread.h
deleted file mode 100644 (file)
index bcbc08b..0000000
+++ /dev/null
@@ -1,164 +0,0 @@
-#ifndef method_thread_h
-#define method_thread_h
-
-// method_thread(): start a thread that runs an object method.
-// returns a pthread_t on success, and zero on error.
-
-#include <pthread.h>
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include "lang/verify.h"
-
-static pthread_t
-method_thread_parent(void *(*fn)(void *), void *arg, bool detach)
-{
-       pthread_t th;
-       pthread_attr_t attr;
-       pthread_attr_init(&attr);
-       // set stack size to 100K, so we don't run out of memory
-       pthread_attr_setstacksize(&attr, 100*1024);
-       int err = pthread_create(&th, &attr, fn, arg);
-       pthread_attr_destroy(&attr);
-       if (err != 0) {
-               fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err));
-               exit(1);
-       }
-
-       if (detach) {
-               // don't keep thread state around after exit, to avoid
-               // running out of threads. set detach==false if you plan
-               // to pthread_join.
-               VERIFY(pthread_detach(th) == 0);
-       }
-
-       return th;
-}
-
-static void
-method_thread_child()
-{
-       // defer pthread_cancel() by default. check explicitly by
-       // enabling then pthread_testcancel().
-       int oldstate, oldtype;
-       VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0);
-       VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0);
-}
-
-template <class C> pthread_t 
-method_thread(C *o, bool detach, void (C::*m)())
-{
-       class XXX {
-               public:
-                       C *o;
-                       void (C::*m)();
-                       static void *yyy(void *vvv) {
-                               XXX *x = (XXX*)vvv;
-                               C *o = x->o;
-                               void (C::*m)() = x->m;
-                               delete x;
-                               method_thread_child();
-                               (o->*m)();
-                               return 0;
-                       }
-       };
-       XXX *x = new XXX;
-       x->o = o;
-       x->m = m;
-       return method_thread_parent(&XXX::yyy, (void *) x, detach);
-}
-
-template <class C, class A> pthread_t
-method_thread(C *o, bool detach, void (C::*m)(A), A a)
-{
-       class XXX {
-               public:
-                       C *o;
-                       void (C::*m)(A a);
-                       A a;
-                       static void *yyy(void *vvv) {
-                               XXX *x = (XXX*)vvv;
-                               C *o = x->o;
-                               void (C::*m)(A ) = x->m;
-                               A a = x->a;
-                               delete x;
-                               method_thread_child();
-                               (o->*m)(a);
-                               return 0;
-                       }
-       };
-       XXX *x = new XXX;
-       x->o = o;
-       x->m = m;
-       x->a = a;
-       return method_thread_parent(&XXX::yyy, (void *) x, detach);
-}
-
-namespace {
-       // ~xavid: this causes a bizzare compile error on OS X.5 when
-       //         it's declared in the function, so I moved it out here.
-       template <class C, class A1, class A2>
-               class XXX {
-                       public:
-                               C *o;
-                               void (C::*m)(A1 a1, A2 a2);
-                               A1 a1;
-                               A2  a2;
-                               static void *yyy(void *vvv) {
-                                       XXX *x = (XXX*)vvv;
-                                       C *o = x->o;
-                                       void (C::*m)(A1 , A2 ) = x->m;
-                                       A1 a1 = x->a1;
-                                       A2 a2 = x->a2;
-                                       delete x;
-                                       method_thread_child();
-                                       (o->*m)(a1, a2);
-                                       return 0;
-                               }
-               };
-}
-
-template <class C, class A1, class A2> pthread_t
-method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2)
-{
-       XXX<C,A1,A2> *x = new XXX<C,A1,A2>;
-       x->o = o;
-       x->m = m;
-       x->a1 = a1;
-       x->a2 = a2;
-       return method_thread_parent(&XXX<C,A1,A2>::yyy, (void *) x, detach);
-}
-
-template <class C, class A1, class A2, class A3> pthread_t
-method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3)
-{
-       class XXX {
-               public:
-                       C *o;
-                       void (C::*m)(A1 a1, A2 a2, A3 a3);
-                       A1 a1;
-                       A2  a2;
-                       A3 a3;
-                       static void *yyy(void *vvv) {
-                               XXX *x = (XXX*)vvv;
-                               C *o = x->o;
-                               void (C::*m)(A1 , A2 , A3 ) = x->m;
-                               A1 a1 = x->a1;
-                               A2 a2 = x->a2;
-                               A3 a3 = x->a3;
-                               delete x;
-                               method_thread_child();
-                               (o->*m)(a1, a2, a3);
-                               return 0;
-                       }
-       };
-       XXX *x = new XXX;
-       x->o = o;
-       x->m = m;
-       x->a1 = a1;
-       x->a2 = a2;
-       x->a3 = a3;
-       return method_thread_parent(&XXX::yyy, (void *) x, detach);
-}
-
-#endif
index f73a3a5..33aeae2 100644 (file)
@@ -1,16 +1,14 @@
-#include <sys/time.h>
 #include <errno.h>
 #include <fcntl.h>
 #include <unistd.h>
 
-#include "slock.h"
 #include "jsl_log.h"
-#include "method_thread.h"
 #include "lang/verify.h"
 #include "pollmgr.h"
+#include "lock.h"
 
 PollMgr *PollMgr::instance = NULL;
-static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
+static std::once_flag pollmgr_is_initialized;
 
 void
 PollMgrInit()
@@ -21,7 +19,7 @@ PollMgrInit()
 PollMgr *
 PollMgr::Instance()
 {
-       pthread_once(&pollmgr_is_initialized, PollMgrInit);
+    std::call_once(pollmgr_is_initialized, PollMgrInit);
        return instance;
 }
 
@@ -31,9 +29,7 @@ PollMgr::PollMgr() : pending_change_(false)
        aio_ = new SelectAIO();
        //aio_ = new EPollAIO();
 
-       VERIFY(pthread_mutex_init(&m_, NULL) == 0);
-       VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
-       VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
+    th_ = std::thread(&PollMgr::wait_loop, this);
 }
 
 PollMgr::~PollMgr()
@@ -47,7 +43,7 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
        VERIFY(fd < MAX_POLL_FDS);
 
-       ScopedLock ml(&m_);
+    lock ml(m_);
        aio_->watch_fd(fd, flag);
 
        VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
@@ -60,17 +56,17 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 void
 PollMgr::block_remove_fd(int fd)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        aio_->unwatch_fd(fd, CB_RDWR);
        pending_change_ = true;
-       VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
+    changedone_c_.wait(ml);
        callbacks_[fd] = NULL;
 }
 
 void
 PollMgr::del_callback(int fd, poll_flag flag)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        if (aio_->unwatch_fd(fd, flag)) {
                callbacks_[fd] = NULL;
        }
@@ -79,7 +75,7 @@ PollMgr::del_callback(int fd, poll_flag flag)
 bool
 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        if (!callbacks_[fd] || callbacks_[fd]!=c)
                return false;
 
@@ -95,10 +91,10 @@ PollMgr::wait_loop()
 
        while (1) {
                {
-                       ScopedLock ml(&m_);
+            lock ml(m_);
                        if (pending_change_) {
                                pending_change_ = false;
-                               VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
+                changedone_c_.notify_all();
                        }
                }
                readable.clear();
@@ -137,19 +133,16 @@ SelectAIO::SelectAIO() : highfds_(0)
        int flags = fcntl(pipefd_[0], F_GETFL, NULL);
        flags |= O_NONBLOCK;
        fcntl(pipefd_[0], F_SETFL, flags);
-
-       VERIFY(pthread_mutex_init(&m_, NULL) == 0);
 }
 
 SelectAIO::~SelectAIO()
 {
-       VERIFY(pthread_mutex_destroy(&m_) == 0);
 }
 
 void
 SelectAIO::watch_fd(int fd, poll_flag flag)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        if (highfds_ <= fd) 
                highfds_ = fd;
 
@@ -169,7 +162,7 @@ SelectAIO::watch_fd(int fd, poll_flag flag)
 bool
 SelectAIO::is_watched(int fd, poll_flag flag)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        if (flag == CB_RDONLY) {
                return FD_ISSET(fd,&rfds_);
        }else if (flag == CB_WRONLY) {
@@ -182,7 +175,7 @@ SelectAIO::is_watched(int fd, poll_flag flag)
 bool 
 SelectAIO::unwatch_fd(int fd, poll_flag flag)
 {
-       ScopedLock ml(&m_);
+    lock ml(m_);
        if (flag == CB_RDONLY) {
                FD_CLR(fd, &rfds_);
        }else if (flag == CB_WRONLY) {
@@ -221,11 +214,10 @@ SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
        int high;
 
        {
-               ScopedLock ml(&m_);
+        lock ml(m_);
                trfds = rfds_;
                twfds = wfds_;
                high = highfds_;
-
        }
 
        int ret = select(high+1, &trfds, &twfds, NULL, NULL);
index c0a5748..89d1660 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <sys/select.h>
 #include <vector>
+#include <thread>
 
 #ifdef __linux__
 #include <sys/epoll.h>
@@ -54,9 +55,9 @@ class PollMgr {
                static int useless;
 
        private:
-               pthread_mutex_t m_;
-               pthread_cond_t changedone_c_;
-               pthread_t th_;
+        std::mutex m_;
+        std::condition_variable changedone_c_;
+        std::thread th_;
 
                aio_callback *callbacks_[MAX_POLL_FDS];
                aio_mgr *aio_;
@@ -81,7 +82,7 @@ class SelectAIO : public aio_mgr {
                int highfds_;
                int pipefd_[2];
 
-               pthread_mutex_t m_;
+        std::mutex m_;
 
 };
 
index cd03073..5de3984 100644 (file)
  object.  A connection object is deleted only when the underlying connection is
  dead and the reference count reaches zero.
 
- The previous version of the RPC library uses pthread_cancel* routines 
- to implement the deletion of rpcc and rpcs objects. The idea is to cancel 
- all active threads that might be holding a reference to an object before 
- deleting that object. However, pthread_cancel is not robust and there are
- always bugs where outstanding references to deleted objects persist.
- This version of the RPC library does not do pthread_cancel, but explicitly 
- joins exited threads to make sure no outstanding references exist before 
- deleting objects.
+ This version of the RPC library explicitly joins exited threads to make sure
+ no outstanding references exist before deleting objects.
 
  To delete a rpcc object safely, the users of the library must ensure that
  there are no outstanding calls on the rpcc object.
  3.  delete the dispatch thread pool which involves waiting for current active
  RPC handlers to finish.  It is interesting how a thread pool can be deleted
  without using thread cancellation. The trick is to inject x "poison pills" for
- a thread pool of x threads. Upon getting a poison pill instead of a normal 
+ a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
  x exited worker threads).
  */
 
 #include "rpc.h"
-#include "method_thread.h"
-#include "slock.h"
 
 #include <sys/types.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
-#include <time.h>
 #include <netdb.h>
 #include <unistd.h>
+#include "lock.h"
 
 #include "jsl_log.h"
-#include "gettime.h"
 #include "lang/verify.h"
 
 const rpcc::TO rpcc::to_max = { 120000 };
@@ -81,363 +72,346 @@ const rpcc::TO rpcc::to_min = { 1000 };
 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
 : xid(xxid), un(xun), done(false)
 {
-       VERIFY(pthread_mutex_init(&m,0) == 0);
-       VERIFY(pthread_cond_init(&c, 0) == 0);
 }
 
 rpcc::caller::~caller()
 {
-       VERIFY(pthread_mutex_destroy(&m) == 0);
-       VERIFY(pthread_cond_destroy(&c) == 0);
 }
 
 inline
 void set_rand_seed()
 {
-       struct timespec ts;
-       clock_gettime(CLOCK_REALTIME, &ts);
-       srandom((int)ts.tv_nsec^((int)getpid()));
+    auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
+    srandom((int)now.time_since_epoch().count()^((int)getpid()));
 }
 
-rpcc::rpcc(sockaddr_in d, bool retrans) : 
-       dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), 
-       retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+rpcc::rpcc(sockaddr_in d, bool retrans) :
+    dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
 {
-       VERIFY(pthread_mutex_init(&m_, 0) == 0);
-       VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
-       VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
-
-       if(retrans){
-               set_rand_seed();
-               clt_nonce_ = random();
-       } else {
-               // special client nonce 0 means this client does not 
-               // require at-most-once logic from the server
-               // because it uses tcp and never retries a failed connection
-               clt_nonce_ = 0;
-       }
-
-       char *loss_env = getenv("RPC_LOSSY");
-       if(loss_env != NULL){
-               lossytest_ = atoi(loss_env);
-       }
-
-       // xid starts with 1 and latest received reply starts with 0
-       xid_rep_window_.push_back(0);
-
-       jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", 
-                       clt_nonce_, lossytest_); 
+    if(retrans){
+        set_rand_seed();
+        clt_nonce_ = random();
+    } else {
+        // special client nonce 0 means this client does not
+        // require at-most-once logic from the server
+        // because it uses tcp and never retries a failed connection
+        clt_nonce_ = 0;
+    }
+
+    char *loss_env = getenv("RPC_LOSSY");
+    if(loss_env != NULL){
+        lossytest_ = atoi(loss_env);
+    }
+
+    // xid starts with 1 and latest received reply starts with 0
+    xid_rep_window_.push_back(0);
+
+    jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
+            clt_nonce_, lossytest_);
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc()
 {
-       jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", 
-                       clt_nonce_, chan_?chan_->channo():-1); 
-       if(chan_){
-               chan_->closeconn();
-               chan_->decref();
-       }
-       VERIFY(calls_.size() == 0);
-       VERIFY(pthread_mutex_destroy(&m_) == 0);
-       VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
+    jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
+            clt_nonce_, chan_?chan_->channo():-1);
+    if(chan_){
+        chan_->closeconn();
+        chan_->decref();
+    }
+    VERIFY(calls_.size() == 0);
 }
 
 int
 rpcc::bind(TO to)
 {
-       int r;
-       int ret = call(rpc_const::bind, 0, r, to);
-       if(ret == 0){
-               ScopedLock ml(&m_);
-               bind_done_ = true;
-               srv_nonce_ = r;
-       } else {
-               jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", 
-                               inet_ntoa(dst_.sin_addr), ret);
-       }
-       return ret;
+    int r;
+    int ret = call(rpc_const::bind, 0, r, to);
+    if(ret == 0){
+        lock ml(m_);
+        bind_done_ = true;
+        srv_nonce_ = r;
+    } else {
+        jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
+                inet_ntoa(dst_.sin_addr), ret);
+    }
+    return ret;
 };
 
 // Cancel all outstanding calls
-void
+    void
 rpcc::cancel(void)
 {
-  ScopedLock ml(&m_);
-  printf("rpcc::cancel: force callers to fail\n");
-  std::map<int,caller*>::iterator iter;
-  for(iter = calls_.begin(); iter != calls_.end(); iter++){
-    caller *ca = iter->second;
+    lock ml(m_);
+    printf("rpcc::cancel: force callers to fail\n");
+    std::map<int,caller*>::iterator iter;
+    for(iter = calls_.begin(); iter != calls_.end(); iter++){
+        caller *ca = iter->second;
 
-    jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
-    {
-      ScopedLock cl(&ca->m);
-      ca->done = true;
-      ca->intret = rpc_const::cancel_failure;
-      VERIFY(pthread_cond_signal(&ca->c) == 0);
+        jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
+        {
+            lock cl(ca->m);
+            ca->done = true;
+            ca->intret = rpc_const::cancel_failure;
+            ca->c.notify_one();
+        }
     }
-  }
 
-  while (calls_.size () > 0){
-    destroy_wait_ = true;
-    VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
-  }
-  printf("rpcc::cancel: done\n");
+    while (calls_.size () > 0){
+        destroy_wait_ = true;
+        destroy_wait_c_.wait(ml);
+    }
+    printf("rpcc::cancel: done\n");
 }
 
 int
 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
-               TO to)
+        TO to)
 {
 
-       caller ca(0, &rep);
+    caller ca(0, &rep);
         int xid_rep;
-       {
-               ScopedLock ml(&m_);
+    {
+        lock ml(m_);
 
-               if((proc != rpc_const::bind && !bind_done_) ||
-                               (proc == rpc_const::bind && bind_done_)){
-                       jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
-                       return rpc_const::bind_failure;
-               }
+        if((proc != rpc_const::bind && !bind_done_) ||
+                (proc == rpc_const::bind && bind_done_)){
+            jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
+            return rpc_const::bind_failure;
+        }
 
-               if(destroy_wait_){
-                 return rpc_const::cancel_failure;
-               }
+        if(destroy_wait_){
+          return rpc_const::cancel_failure;
+        }
 
-               ca.xid = xid_++;
-               calls_[ca.xid] = &ca;
+        ca.xid = xid_++;
+        calls_[ca.xid] = &ca;
 
-               req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
+        req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
                              xid_rep_window_.front());
-               req.pack_req_header(h);
+        req.pack_req_header(h);
                 xid_rep = xid_rep_window_.front();
-       }
-
-       TO curr_to;
-       struct timespec now, nextdeadline, finaldeadline; 
-
-       clock_gettime(CLOCK_REALTIME, &now);
-       add_timespec(now, to.to, &finaldeadline); 
-       curr_to.to = to_min.to;
-
-       bool transmit = true;
-       connection *ch = NULL;
-
-       while (1){
-               if(transmit){
-                       get_refconn(&ch);
-                       if(ch){
-                               if(reachable_) {
-                                        request forgot;
-                                        {
-                                                ScopedLock ml(&m_);
-                                                if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
-                                                        forgot = dup_req_;
-                                                        dup_req_.clear();
-                                                }
-                                        }
-                                        if (forgot.isvalid()) 
-                                                ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
-                                        ch->send(req.cstr(), req.size());
-                                }
-                               else jsl_log(JSL_DBG_1, "not reachable\n");
-                               jsl_log(JSL_DBG_2, 
-                                               "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", 
-                                               clt_nonce_, proc, ca.xid, clt_nonce_); 
-                       }
-                       transmit = false; // only send once on a given channel
-               }
-
-               if(!finaldeadline.tv_sec)
-                       break;
-
-               clock_gettime(CLOCK_REALTIME, &now);
-               add_timespec(now, curr_to.to, &nextdeadline); 
-               if(cmp_timespec(nextdeadline,finaldeadline) > 0){
-                       nextdeadline = finaldeadline;
-                       finaldeadline.tv_sec = 0;
-               }
-
-               {
-                       ScopedLock cal(&ca.m);
-                       while (!ca.done){
-                               jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
-                               if(pthread_cond_timedwait(&ca.c, &ca.m,
-                                                 &nextdeadline) == ETIMEDOUT){
-                                       jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
-                                       break;
-                               }
-                       }
-                       if(ca.done){
-                               jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
-                               break;
-                       }
-               }
-
-               if(retrans_ && (!ch || ch->isdead())){
-                       // since connection is dead, retransmit
-                        // on the new connection 
-                       transmit = true; 
-               }
-               curr_to.to <<= 1;
-       }
-
-       { 
-                // no locking of ca.m since only this thread changes ca.xid 
-               ScopedLock ml(&m_);
-               calls_.erase(ca.xid);
-               // may need to update the xid again here, in case the
-               // packet times out before it's even sent by the channel.
-               // I don't think there's any harm in maybe doing it twice
-               update_xid_rep(ca.xid);
-
-               if(destroy_wait_){
-                 VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
-               }
-       }
-
-        if (ca.done && lossytest_)
+    }
+
+    TO curr_to;
+    std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
+        std::chrono::steady_clock::now() +
+        std::chrono::milliseconds(to.to),
+        nextdeadline;
+
+    curr_to.to = to_min.to;
+
+    bool transmit = true;
+    connection *ch = NULL;
+
+    while (1){
+        if(transmit){
+            get_refconn(&ch);
+            if(ch){
+                if(reachable_) {
+                    request forgot;
+                    {
+                        lock ml(m_);
+                        if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
+                            forgot = dup_req_;
+                            dup_req_.clear();
+                        }
+                    }
+                    if (forgot.isvalid())
+                        ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
+                    ch->send(req.cstr(), req.size());
+                }
+                else jsl_log(JSL_DBG_1, "not reachable\n");
+                jsl_log(JSL_DBG_2,
+                        "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
+                        clt_nonce_, proc, ca.xid, clt_nonce_);
+            }
+            transmit = false; // only send once on a given channel
+        }
+
+        if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
+            break;
+
+        nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
+        if(nextdeadline > finaldeadline) {
+            nextdeadline = finaldeadline;
+            finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
+        }
+
         {
-                ScopedLock ml(&m_);
-                if (!dup_req_.isvalid()) {
-                        dup_req_.buf.assign(req.cstr(), req.size());
-                        dup_req_.xid = ca.xid;
+            lock cal(ca.m);
+            while (!ca.done){
+                jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
+                if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
+                    jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
+                    break;
                 }
-                if (xid_rep > xid_rep_done_)
-                        xid_rep_done_ = xid_rep;
+            }
+            if(ca.done){
+                jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
+                break;
+            }
         }
 
-       ScopedLock cal(&ca.m);
+        if(retrans_ && (!ch || ch->isdead())){
+            // since connection is dead, retransmit
+            // on the new connection
+            transmit = true;
+        }
+        curr_to.to <<= 1;
+    }
 
-       jsl_log(JSL_DBG_2, 
-                       "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", 
-                       clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
-                       ntohs(dst_.sin_port), ca.done, ca.intret);
+    {
+        // no locking of ca.m since only this thread changes ca.xid
+        lock ml(m_);
+        calls_.erase(ca.xid);
+        // may need to update the xid again here, in case the
+        // packet times out before it's even sent by the channel.
+        // I don't think there's any harm in maybe doing it twice
+        update_xid_rep(ca.xid);
+
+        if(destroy_wait_){
+          destroy_wait_c_.notify_one();
+        }
+    }
+
+    if (ca.done && lossytest_)
+    {
+        lock ml(m_);
+        if (!dup_req_.isvalid()) {
+            dup_req_.buf.assign(req.cstr(), req.size());
+            dup_req_.xid = ca.xid;
+        }
+        if (xid_rep > xid_rep_done_)
+            xid_rep_done_ = xid_rep;
+    }
 
-       if(ch)
-               ch->decref();
+    lock cal(ca.m);
 
-       // destruction of req automatically frees its buffer
-       return (ca.done? ca.intret : rpc_const::timeout_failure);
+    jsl_log(JSL_DBG_2,
+            "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
+            clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
+            ntohs(dst_.sin_port), ca.done, ca.intret);
+
+    if(ch)
+        ch->decref();
+
+    // destruction of req automatically frees its buffer
+    return (ca.done? ca.intret : rpc_const::timeout_failure);
 }
 
 void
 rpcc::get_refconn(connection **ch)
 {
-       ScopedLock ml(&chan_m_);
-       if(!chan_ || chan_->isdead()){
-               if(chan_)
-                       chan_->decref();
-               chan_ = connect_to_dst(dst_, this, lossytest_);
-       }
-       if(ch && chan_){
-               if(*ch){
-                       (*ch)->decref();
-               }
-               *ch = chan_;
-               (*ch)->incref();
-       }
+    lock ml(chan_m_);
+    if(!chan_ || chan_->isdead()){
+        if(chan_)
+            chan_->decref();
+        chan_ = connect_to_dst(dst_, this, lossytest_);
+    }
+    if(ch && chan_){
+        if(*ch){
+            (*ch)->decref();
+        }
+        *ch = chan_;
+        (*ch)->incref();
+    }
 }
 
-// PollMgr's thread is being used to 
-// make this upcall from connection object to rpcc. 
+// PollMgr's thread is being used to
+// make this upcall from connection object to rpcc.
 // this funtion must not block.
 //
-// this function keeps no reference for connection *c 
+// this function keeps no reference for connection *c
 bool
 rpcc::got_pdu(connection *c, char *b, int sz)
 {
-       unmarshall rep(b, sz);
-       reply_header h;
-       rep.unpack_reply_header(&h);
-
-       if(!rep.ok()){
-               jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
-               return true;
-       }
-
-       ScopedLock ml(&m_);
-
-       update_xid_rep(h.xid);
-
-       if(calls_.find(h.xid) == calls_.end()){
-               jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
-               return true;
-       }
-       caller *ca = calls_[h.xid];
-
-       ScopedLock cl(&ca->m);
-       if(!ca->done){
-               ca->un->take_in(rep);
-               ca->intret = h.ret;
-               if(ca->intret < 0){
-                       jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
-                                       h.xid, ca->intret);
-               }
-               ca->done = 1;
-       }
-       VERIFY(pthread_cond_broadcast(&ca->c) == 0);
-       return true;
+    unmarshall rep(b, sz);
+    reply_header h;
+    rep.unpack_reply_header(&h);
+
+    if(!rep.ok()){
+        jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
+        return true;
+    }
+
+    lock ml(m_);
+
+    update_xid_rep(h.xid);
+
+    if(calls_.find(h.xid) == calls_.end()){
+        jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
+        return true;
+    }
+    caller *ca = calls_[h.xid];
+
+    lock cl(ca->m);
+    if(!ca->done){
+        ca->un->take_in(rep);
+        ca->intret = h.ret;
+        if(ca->intret < 0){
+            jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
+                    h.xid, ca->intret);
+        }
+        ca->done = 1;
+    }
+    ca->c.notify_all();
+    return true;
 }
 
 // assumes thread holds mutex m
-void 
+void
 rpcc::update_xid_rep(unsigned int xid)
 {
-       std::list<unsigned int>::iterator it;
+    std::list<unsigned int>::iterator it;
 
-       if(xid <= xid_rep_window_.front()){
-               return;
-       }
+    if(xid <= xid_rep_window_.front()){
+        return;
+    }
 
-       for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
-               if(*it > xid){
-                       xid_rep_window_.insert(it, xid);
-                       goto compress;
-               }
-       }
-       xid_rep_window_.push_back(xid);
+    for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
+        if(*it > xid){
+            xid_rep_window_.insert(it, xid);
+            goto compress;
+        }
+    }
+    xid_rep_window_.push_back(xid);
 
 compress:
-       it = xid_rep_window_.begin();
-       for (it++; it != xid_rep_window_.end(); it++){
-               while (xid_rep_window_.front() + 1 == *it)
-                       xid_rep_window_.pop_front();
-       }
+    it = xid_rep_window_.begin();
+    for (it++; it != xid_rep_window_.end(); it++){
+        while (xid_rep_window_.front() + 1 == *it)
+            xid_rep_window_.pop_front();
+    }
 }
 
 
 rpcs::rpcs(unsigned int p1, int count)
   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
 {
-       VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
-       VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
-       VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
-       VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
-
-       set_rand_seed();
-       nonce_ = random();
-       jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
+    set_rand_seed();
+    nonce_ = random();
+    jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
 
-       char *loss_env = getenv("RPC_LOSSY");
-       if(loss_env != NULL){
-               lossytest_ = atoi(loss_env);
-       }
+    char *loss_env = getenv("RPC_LOSSY");
+    if(loss_env != NULL){
+        lossytest_ = atoi(loss_env);
+    }
 
-       reg(rpc_const::bind, this, &rpcs::rpcbind);
-       dispatchpool_ = new ThrPool(6,false);
+    reg(rpc_const::bind, this, &rpcs::rpcbind);
+    dispatchpool_ = new ThrPool(6,false);
 
-       listener_ = new tcpsconn(this, port_, lossytest_);
+    listener_ = new tcpsconn(this, port_, lossytest_);
 }
 
 rpcs::~rpcs()
 {
-       // must delete listener before dispatchpool
-       delete listener_;
-       delete dispatchpool_;
-       free_reply_window();
+    // must delete listener before dispatchpool
+    delete listener_;
+    delete dispatchpool_;
+    free_reply_window();
 }
 
 bool
@@ -448,149 +422,149 @@ rpcs::got_pdu(connection *c, char *b, int sz)
             return true;
         }
 
-       djob_t *j = new djob_t(c, b, sz);
-       c->incref();
-       bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
-       if(!succ || !reachable_){
-               c->decref();
-               delete j;
-       }
-       return succ; 
+    djob_t *j = new djob_t(c, b, sz);
+    c->incref();
+    bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
+    if(!succ || !reachable_){
+        c->decref();
+        delete j;
+    }
+    return succ;
 }
 
 void
 rpcs::reg1(unsigned int proc, handler *h)
 {
-       ScopedLock pl(&procs_m_);
-       VERIFY(procs_.count(proc) == 0);
-       procs_[proc] = h;
-       VERIFY(procs_.count(proc) >= 1);
+    lock pl(procs_m_);
+    VERIFY(procs_.count(proc) == 0);
+    procs_[proc] = h;
+    VERIFY(procs_.count(proc) >= 1);
 }
 
 void
 rpcs::updatestat(unsigned int proc)
 {
-       ScopedLock cl(&count_m_);
-       counts_[proc]++;
-       curr_counts_--;
-       if(curr_counts_ == 0){
-               std::map<int, int>::iterator i;
-               printf("RPC STATS: ");
-               for (i = counts_.begin(); i != counts_.end(); i++){
-                       printf("%x:%d ", i->first, i->second);
-               }
-               printf("\n");
-
-               ScopedLock rwl(&reply_window_m_);
-               std::map<unsigned int,std::list<reply_t> >::iterator clt;
-
-               unsigned int totalrep = 0, maxrep = 0;
-               for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-                       totalrep += clt->second.size();
-                       if(clt->second.size() > maxrep)
-                               maxrep = clt->second.size();
-               }
-               jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", 
+    lock cl(count_m_);
+    counts_[proc]++;
+    curr_counts_--;
+    if(curr_counts_ == 0){
+        std::map<int, int>::iterator i;
+        printf("RPC STATS: ");
+        for (i = counts_.begin(); i != counts_.end(); i++){
+            printf("%x:%d ", i->first, i->second);
+        }
+        printf("\n");
+
+        lock rwl(reply_window_m_);
+        std::map<unsigned int,std::list<reply_t> >::iterator clt;
+
+        unsigned int totalrep = 0, maxrep = 0;
+        for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+            totalrep += clt->second.size();
+            if(clt->second.size() > maxrep)
+                maxrep = clt->second.size();
+        }
+        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
                         (int) reply_window_.size()-1, totalrep, maxrep);
-               curr_counts_ = counting_;
-       }
+        curr_counts_ = counting_;
+    }
 }
 
 void
 rpcs::dispatch(djob_t *j)
 {
-       connection *c = j->conn;
-       unmarshall req(j->buf, j->sz);
-       delete j;
-
-       req_header h;
-       req.unpack_req_header(&h);
-       int proc = h.proc;
-
-       if(!req.ok()){
-               jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
-               c->decref();
-               return;
-       }
-
-       jsl_log(JSL_DBG_2,
-                       "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
-                       h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
-
-       marshall rep;
-       reply_header rh(h.xid,0);
-
-       // is client sending to an old instance of server?
-       if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
-               jsl_log(JSL_DBG_2,
-                               "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
-                               h.srv_nonce, nonce_, h.proc);
-               rh.ret = rpc_const::oldsrv_failure;
-               rep.pack_reply_header(rh);
-               c->send(rep.cstr(),rep.size());
-               return;
-       }
-
-       handler *f;
-       // is RPC proc a registered procedure?
-       {
-               ScopedLock pl(&procs_m_);
-               if(procs_.count(proc) < 1){
-                       fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
-                               proc);
-                       c->decref();
+    connection *c = j->conn;
+    unmarshall req(j->buf, j->sz);
+    delete j;
+
+    req_header h;
+    req.unpack_req_header(&h);
+    int proc = h.proc;
+
+    if(!req.ok()){
+        jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
+        c->decref();
+        return;
+    }
+
+    jsl_log(JSL_DBG_2,
+            "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
+            h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
+
+    marshall rep;
+    reply_header rh(h.xid,0);
+
+    // is client sending to an old instance of server?
+    if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
+        jsl_log(JSL_DBG_2,
+                "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
+                h.srv_nonce, nonce_, h.proc);
+        rh.ret = rpc_const::oldsrv_failure;
+        rep.pack_reply_header(rh);
+        c->send(rep.cstr(),rep.size());
+        return;
+    }
+
+    handler *f;
+    // is RPC proc a registered procedure?
+    {
+        lock pl(procs_m_);
+        if(procs_.count(proc) < 1){
+            fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
+                proc);
+            c->decref();
                         VERIFY(0);
-                       return;
-               }
-
-               f = procs_[proc];
-       }
-
-       rpcs::rpcstate_t stat;
-       char *b1;
-       int sz1;
-
-       if(h.clt_nonce){
-               // have i seen this client before?
-               {
-                       ScopedLock rwl(&reply_window_m_);
-                       // if we don't know about this clt_nonce, create a cleanup object
-                       if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
-                               VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+            return;
+        }
+
+        f = procs_[proc];
+    }
+
+    rpcs::rpcstate_t stat;
+    char *b1;
+    int sz1;
+
+    if(h.clt_nonce){
+        // have i seen this client before?
+        {
+            lock rwl(reply_window_m_);
+            // if we don't know about this clt_nonce, create a cleanup object
+            if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
+                VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
-                               jsl_log(JSL_DBG_2,
-                                               "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", 
-                                               h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
-                       }
-               }
-
-               // save the latest good connection to the client
-               {
-                       ScopedLock rwl(&conss_m_);
-                       if(conns_.find(h.clt_nonce) == conns_.end()){
-                               c->incref();
-                               conns_[h.clt_nonce] = c;
-                       } else if(conns_[h.clt_nonce]->compare(c) < 0){
-                               conns_[h.clt_nonce]->decref();
-                               c->incref();
-                               conns_[h.clt_nonce] = c;
-                       }
-               }
-
-               stat = checkduplicate_and_update(h.clt_nonce, h.xid,
+                jsl_log(JSL_DBG_2,
+                        "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
+                        h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
+            }
+        }
+
+        // save the latest good connection to the client
+        {
+            lock rwl(conss_m_);
+            if(conns_.find(h.clt_nonce) == conns_.end()){
+                c->incref();
+                conns_[h.clt_nonce] = c;
+            } else if(conns_[h.clt_nonce]->compare(c) < 0){
+                conns_[h.clt_nonce]->decref();
+                c->incref();
+                conns_[h.clt_nonce] = c;
+            }
+        }
+
+        stat = checkduplicate_and_update(h.clt_nonce, h.xid,
                                                  h.xid_rep, &b1, &sz1);
-       } else {
-               // this client does not require at most once logic
-               stat = NEW;
-       }
-
-       switch (stat){
-               case NEW: // new request
-                       if(counting_){
-                               updatestat(proc);
-                       }
-
-                       rh.ret = f->fn(req, rep);
+    } else {
+        // this client does not require at most once logic
+        stat = NEW;
+    }
+
+    switch (stat){
+        case NEW: // new request
+            if(counting_){
+                updatestat(proc);
+            }
+
+            rh.ret = f->fn(req, rep);
                         if (rh.ret == rpc_const::unmarshal_args_failure) {
                                 fprintf(stderr, "rpcs::dispatch: failed to"
                                        " unmarshall the arguments. You are"
@@ -598,50 +572,50 @@ rpcs::dispatch(djob_t *j)
                                        " types of arguments.\n", proc);
                                 VERIFY(0);
                         }
-                       VERIFY(rh.ret >= 0);
-
-                       rep.pack_reply_header(rh);
-                       rep.take_buf(&b1,&sz1);
-
-                       jsl_log(JSL_DBG_2,
-                                       "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
-                                       sz1, h.xid, proc, rh.ret, h.clt_nonce);
-
-                       if(h.clt_nonce > 0){
-                               // only record replies for clients that require at-most-once logic
-                               add_reply(h.clt_nonce, h.xid, b1, sz1);
-                       }
-
-                       // get the latest connection to the client
-                       {
-                               ScopedLock rwl(&conss_m_);
-                               if(c->isdead() && c != conns_[h.clt_nonce]){
-                                       c->decref();
-                                       c = conns_[h.clt_nonce];
-                                       c->incref();
-                               }
-                       }
-
-                       c->send(b1, sz1);
-                       if(h.clt_nonce == 0){
-                               // reply is not added to at-most-once window, free it
-                               free(b1);
-                       }
-                       break;
-               case INPROGRESS: // server is working on this request
-                       break;
-               case DONE: // duplicate and we still have the response
-                       c->send(b1, sz1);
-                       break;
-               case FORGOTTEN: // very old request and we don't have the response anymore
-                       jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", 
-                                       h.xid, h.clt_nonce);
-                       rh.ret = rpc_const::atmostonce_failure;
-                       rep.pack_reply_header(rh);
-                       c->send(rep.cstr(),rep.size());
-                       break;
-       }
-       c->decref();
+            VERIFY(rh.ret >= 0);
+
+            rep.pack_reply_header(rh);
+            rep.take_buf(&b1,&sz1);
+
+            jsl_log(JSL_DBG_2,
+                    "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
+                    sz1, h.xid, proc, rh.ret, h.clt_nonce);
+
+            if(h.clt_nonce > 0){
+                // only record replies for clients that require at-most-once logic
+                add_reply(h.clt_nonce, h.xid, b1, sz1);
+            }
+
+            // get the latest connection to the client
+            {
+                lock rwl(conss_m_);
+                if(c->isdead() && c != conns_[h.clt_nonce]){
+                    c->decref();
+                    c = conns_[h.clt_nonce];
+                    c->incref();
+                }
+            }
+
+            c->send(b1, sz1);
+            if(h.clt_nonce == 0){
+                // reply is not added to at-most-once window, free it
+                free(b1);
+            }
+            break;
+        case INPROGRESS: // server is working on this request
+            break;
+        case DONE: // duplicate and we still have the response
+            c->send(b1, sz1);
+            break;
+        case FORGOTTEN: // very old request and we don't have the response anymore
+            jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
+                    h.xid, h.clt_nonce);
+            rh.ret = rpc_const::atmostonce_failure;
+            rep.pack_reply_header(rh);
+            c->send(rep.cstr(),rep.size());
+            break;
+    }
+    c->decref();
 }
 
 // rpcs::dispatch calls this when an RPC request arrives.
@@ -658,11 +632,11 @@ rpcs::dispatch(djob_t *j)
 //   INPROGRESS: seen this xid, and still processing it.
 //   DONE: seen this xid, previous reply returned in *b and *sz.
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
-rpcs::rpcstate_t 
+rpcs::rpcstate_t
 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
-               unsigned int xid_rep, char **b, int *sz)
+        unsigned int xid_rep, char **b, int *sz)
 {
-       ScopedLock rwl(&reply_window_m_);
+    lock rwl(reply_window_m_);
 
     std::list<reply_t> &l = reply_window_[clt_nonce];
 
@@ -711,13 +685,13 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
 // and passes the return value in b and sz.
 // add_reply() should remember b and sz.
-// free_reply_window() and checkduplicate_and_update is responsible for 
+// free_reply_window() and checkduplicate_and_update is responsible for
 // calling free(b).
 void
 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
-               char *b, int sz)
+        char *b, int sz)
 {
-       ScopedLock rwl(&reply_window_m_);
+    lock rwl(reply_window_m_);
     // remember the RPC reply value
     std::list<reply_t> &l = reply_window_[clt_nonce];
     std::list<reply_t>::iterator it = l.begin();
@@ -735,353 +709,310 @@ rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
 void
 rpcs::free_reply_window(void)
 {
-       std::map<unsigned int,std::list<reply_t> >::iterator clt;
-       std::list<reply_t>::iterator it;
+    std::map<unsigned int,std::list<reply_t> >::iterator clt;
+    std::list<reply_t>::iterator it;
 
-       ScopedLock rwl(&reply_window_m_);
-       for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-               for (it = clt->second.begin(); it != clt->second.end(); it++){
-                       if (it->cb_present)
+    lock rwl(reply_window_m_);
+    for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+        for (it = clt->second.begin(); it != clt->second.end(); it++){
+            if (it->cb_present)
                 free(it->buf);
-               }
-               clt->second.clear();
-       }
-       reply_window_.clear();
+        }
+        clt->second.clear();
+    }
+    reply_window_.clear();
 }
 
 // rpc handler
-int 
+int
 rpcs::rpcbind(int a, int &r)
 {
-       jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
-       r = nonce_;
-       return 0;
+    jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
+    r = nonce_;
+    return 0;
 }
 
 void
 marshall::rawbyte(unsigned char x)
 {
-       if(_ind >= _capa){
-               _capa *= 2;
-               VERIFY (_buf != NULL);
-               _buf = (char *)realloc(_buf, _capa);
-               VERIFY(_buf);
-       }
-       _buf[_ind++] = x;
+    if(_ind >= _capa){
+        _capa *= 2;
+        VERIFY (_buf != NULL);
+        _buf = (char *)realloc(_buf, _capa);
+        VERIFY(_buf);
+    }
+    _buf[_ind++] = x;
 }
 
 void
 marshall::rawbytes(const char *p, int n)
 {
-       if((_ind+n) > _capa){
-               _capa = _capa > n? 2*_capa:(_capa+n);
-               VERIFY (_buf != NULL);
-               _buf = (char *)realloc(_buf, _capa);
-               VERIFY(_buf);
-       }
-       memcpy(_buf+_ind, p, n);
-       _ind += n;
+    if((_ind+n) > _capa){
+        _capa = _capa > n? 2*_capa:(_capa+n);
+        VERIFY (_buf != NULL);
+        _buf = (char *)realloc(_buf, _capa);
+        VERIFY(_buf);
+    }
+    memcpy(_buf+_ind, p, n);
+    _ind += n;
 }
 
 marshall &
 operator<<(marshall &m, bool x)
 {
-       m.rawbyte(x);
-       return m;
+    m.rawbyte(x);
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, unsigned char x)
 {
-       m.rawbyte(x);
-       return m;
+    m.rawbyte(x);
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, char x)
 {
-       m << (unsigned char) x;
-       return m;
+    m << (unsigned char) x;
+    return m;
 }
 
 
 marshall &
 operator<<(marshall &m, unsigned short x)
 {
-       m.rawbyte((x >> 8) & 0xff);
-       m.rawbyte(x & 0xff);
-       return m;
+    m.rawbyte((x >> 8) & 0xff);
+    m.rawbyte(x & 0xff);
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, short x)
 {
-       m << (unsigned short) x;
-       return m;
+    m << (unsigned short) x;
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, unsigned int x)
 {
-       // network order is big-endian
-       m.rawbyte((x >> 24) & 0xff);
-       m.rawbyte((x >> 16) & 0xff);
-       m.rawbyte((x >> 8) & 0xff);
-       m.rawbyte(x & 0xff);
-       return m;
+    // network order is big-endian
+    m.rawbyte((x >> 24) & 0xff);
+    m.rawbyte((x >> 16) & 0xff);
+    m.rawbyte((x >> 8) & 0xff);
+    m.rawbyte(x & 0xff);
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, int x)
 {
-       m << (unsigned int) x;
-       return m;
+    m << (unsigned int) x;
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, const std::string &s)
 {
-       m << (unsigned int) s.size();
-       m.rawbytes(s.data(), s.size());
-       return m;
+    m << (unsigned int) s.size();
+    m.rawbytes(s.data(), s.size());
+    return m;
 }
 
 marshall &
 operator<<(marshall &m, unsigned long long x)
 {
-       m << (unsigned int) (x >> 32);
-       m << (unsigned int) x;
-       return m;
+    m << (unsigned int) (x >> 32);
+    m << (unsigned int) x;
+    return m;
 }
 
 void
 marshall::pack(int x)
 {
-       rawbyte((x >> 24) & 0xff);
-       rawbyte((x >> 16) & 0xff);
-       rawbyte((x >> 8) & 0xff);
-       rawbyte(x & 0xff);
+    rawbyte((x >> 24) & 0xff);
+    rawbyte((x >> 16) & 0xff);
+    rawbyte((x >> 8) & 0xff);
+    rawbyte(x & 0xff);
 }
 
 void
 unmarshall::unpack(int *x)
 {
-       (*x) = (rawbyte() & 0xff) << 24;
-       (*x) |= (rawbyte() & 0xff) << 16;
-       (*x) |= (rawbyte() & 0xff) << 8;
-       (*x) |= rawbyte() & 0xff;
+    (*x) = (rawbyte() & 0xff) << 24;
+    (*x) |= (rawbyte() & 0xff) << 16;
+    (*x) |= (rawbyte() & 0xff) << 8;
+    (*x) |= rawbyte() & 0xff;
 }
 
 // take the contents from another unmarshall object
 void
 unmarshall::take_in(unmarshall &another)
 {
-       if(_buf)
-               free(_buf);
-       another.take_buf(&_buf, &_sz);
-       _ind = RPC_HEADER_SZ;
-       _ok = _sz >= RPC_HEADER_SZ?true:false;
+    if(_buf)
+        free(_buf);
+    another.take_buf(&_buf, &_sz);
+    _ind = RPC_HEADER_SZ;
+    _ok = _sz >= RPC_HEADER_SZ?true:false;
 }
 
 bool
 unmarshall::okdone()
 {
-       if(ok() && _ind == _sz){
-               return true;
-       } else {
-               return false;
-       }
+    if(ok() && _ind == _sz){
+        return true;
+    } else {
+        return false;
+    }
 }
 
 unsigned int
 unmarshall::rawbyte()
 {
-       char c = 0;
-       if(_ind >= _sz)
-               _ok = false;
-       else
-               c = _buf[_ind++];
-       return c;
+    char c = 0;
+    if(_ind >= _sz)
+        _ok = false;
+    else
+        c = _buf[_ind++];
+    return c;
 }
 
 unmarshall &
 operator>>(unmarshall &u, bool &x)
 {
-       x = (bool) u.rawbyte() ;
-       return u;
+    x = (bool) u.rawbyte() ;
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, unsigned char &x)
 {
-       x = (unsigned char) u.rawbyte() ;
-       return u;
+    x = (unsigned char) u.rawbyte() ;
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, char &x)
 {
-       x = (char) u.rawbyte();
-       return u;
+    x = (char) u.rawbyte();
+    return u;
 }
 
 
 unmarshall &
 operator>>(unmarshall &u, unsigned short &x)
 {
-       x = (u.rawbyte() & 0xff) << 8;
-       x |= u.rawbyte() & 0xff;
-       return u;
+    x = (u.rawbyte() & 0xff) << 8;
+    x |= u.rawbyte() & 0xff;
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, short &x)
 {
-       x = (u.rawbyte() & 0xff) << 8;
-       x |= u.rawbyte() & 0xff;
-       return u;
+    x = (u.rawbyte() & 0xff) << 8;
+    x |= u.rawbyte() & 0xff;
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, unsigned int &x)
 {
-       x = (u.rawbyte() & 0xff) << 24;
-       x |= (u.rawbyte() & 0xff) << 16;
-       x |= (u.rawbyte() & 0xff) << 8;
-       x |= u.rawbyte() & 0xff;
-       return u;
+    x = (u.rawbyte() & 0xff) << 24;
+    x |= (u.rawbyte() & 0xff) << 16;
+    x |= (u.rawbyte() & 0xff) << 8;
+    x |= u.rawbyte() & 0xff;
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, int &x)
 {
-       x = (u.rawbyte() & 0xff) << 24;
-       x |= (u.rawbyte() & 0xff) << 16;
-       x |= (u.rawbyte() & 0xff) << 8;
-       x |= u.rawbyte() & 0xff;
-       return u;
+    x = (u.rawbyte() & 0xff) << 24;
+    x |= (u.rawbyte() & 0xff) << 16;
+    x |= (u.rawbyte() & 0xff) << 8;
+    x |= u.rawbyte() & 0xff;
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, unsigned long long &x)
 {
-       unsigned int h, l;
-       u >> h;
-       u >> l;
-       x = l | ((unsigned long long) h << 32);
-       return u;
+    unsigned int h, l;
+    u >> h;
+    u >> l;
+    x = l | ((unsigned long long) h << 32);
+    return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, std::string &s)
 {
-       unsigned sz;
-       u >> sz;
-       if(u.ok())
-               u.rawbytes(s, sz);
-       return u;
+    unsigned sz;
+    u >> sz;
+    if(u.ok())
+        u.rawbytes(s, sz);
+    return u;
 }
 
 void
 unmarshall::rawbytes(std::string &ss, unsigned int n)
 {
-       if((_ind+n) > (unsigned)_sz){
-               _ok = false;
-       } else {
-               std::string tmps = std::string(_buf+_ind, n);
-               swap(ss, tmps);
-               VERIFY(ss.size() == n);
-               _ind += n;
-       }
+    if((_ind+n) > (unsigned)_sz){
+        _ok = false;
+    } else {
+        std::string tmps = std::string(_buf+_ind, n);
+        swap(ss, tmps);
+        VERIFY(ss.size() == n);
+        _ind += n;
+    }
 }
 
 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
-       return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
-                       ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
-                        ((a.sin_port < b.sin_port))));
+    return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
+            ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
+             ((a.sin_port < b.sin_port))));
 }
 
 /*---------------auxilary function--------------*/
 void
 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
 
-       char host[200];
-       const char *localhost = "127.0.0.1";
-       const char *port = index(hostandport, ':');
-       if(port == NULL){
-               memcpy(host, localhost, strlen(localhost)+1);
-               port = hostandport;
-       } else {
-               memcpy(host, hostandport, port-hostandport);
-               host[port-hostandport] = '\0';
-               port++;
-       }
+    char host[200];
+    const char *localhost = "127.0.0.1";
+    const char *port = index(hostandport, ':');
+    if(port == NULL){
+        memcpy(host, localhost, strlen(localhost)+1);
+        port = hostandport;
+    } else {
+        memcpy(host, hostandport, port-hostandport);
+        host[port-hostandport] = '\0';
+        port++;
+    }
 
-       make_sockaddr(host, port, dst);
+    make_sockaddr(host, port, dst);
 
 }
 
 void
 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
 
-       in_addr_t a;
-
-       bzero(dst, sizeof(*dst));
-       dst->sin_family = AF_INET;
-
-       a = inet_addr(host);
-       if(a != INADDR_NONE){
-               dst->sin_addr.s_addr = a;
-       } else {
-               struct hostent *hp = gethostbyname(host);
-               if(hp == 0 || hp->h_length != 4){
-                       fprintf(stderr, "cannot find host name %s\n", host);
-                       exit(1);
-               }
-               dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
-       }
-       dst->sin_port = htons(atoi(port));
-}
+    in_addr_t a;
 
-int
-cmp_timespec(const struct timespec &a, const struct timespec &b)
-{
-       if(a.tv_sec > b.tv_sec)
-               return 1;
-       else if(a.tv_sec < b.tv_sec)
-               return -1;
-       else {
-               if(a.tv_nsec > b.tv_nsec)
-                       return 1;
-               else if(a.tv_nsec < b.tv_nsec)
-                       return -1;
-               else
-                       return 0;
-       }
-}
+    bzero(dst, sizeof(*dst));
+    dst->sin_family = AF_INET;
 
-void
-add_timespec(const struct timespec &a, int b, struct timespec *result)
-{
-       // convert to millisec, add timeout, convert back
-       result->tv_sec = a.tv_sec + b/1000;
-       result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000;
-       VERIFY(result->tv_nsec >= 0);
-       while (result->tv_nsec > 1000000000){
-               result->tv_sec++;
-               result->tv_nsec-=1000000000;
-       }
-}
-
-int
-diff_timespec(const struct timespec &end, const struct timespec &start)
-{
-       int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
-       VERIFY(diff || end.tv_sec == start.tv_sec);
-       if(end.tv_nsec > start.tv_nsec){
-               diff += (end.tv_nsec-start.tv_nsec)/1000000;
-       } else {
-               diff -= (start.tv_nsec-end.tv_nsec)/1000000;
-       }
-       return diff;
+    a = inet_addr(host);
+    if(a != INADDR_NONE){
+        dst->sin_addr.s_addr = a;
+    } else {
+        struct hostent *hp = gethostbyname(host);
+        if(hp == 0 || hp->h_length != 4){
+            fprintf(stderr, "cannot find host name %s\n", host);
+            exit(1);
+        }
+        dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
+    }
+    dst->sin_port = htons(atoi(port));
 }
index fc61236..723121c 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -43,8 +43,8 @@ class rpcc : public chanmgr {
                        unmarshall *un;
                        int intret;
                        bool done;
-                       pthread_mutex_t m;
-                       pthread_cond_t c;
+            std::mutex m;
+            std::condition_variable c;
                };
 
                void get_refconn(connection **ch);
@@ -62,11 +62,11 @@ class rpcc : public chanmgr {
 
                connection *chan_;
 
-               pthread_mutex_t m_; // protect insert/delete to calls[]
-               pthread_mutex_t chan_m_;
+        std::mutex m_; // protect insert/delete to calls[]
+               std::mutex chan_m_;
 
                bool destroy_wait_;
-               pthread_cond_t destroy_wait_c_;
+        std::condition_variable destroy_wait_c_;
 
                std::map<int, caller *> calls_;
                std::list<unsigned int> xid_rep_window_;
@@ -328,10 +328,10 @@ class rpcs : public chanmgr {
        // map proc # to function
        std::map<int, handler *> procs_;
 
-       pthread_mutex_t procs_m_; // protect insert/delete to procs[]
-       pthread_mutex_t count_m_;  //protect modification of counts
-       pthread_mutex_t reply_window_m_; // protect reply window et al
-       pthread_mutex_t conss_m_; // protect conns_
+       std::mutex procs_m_; // protect insert/delete to procs[]
+       std::mutex count_m_;  //protect modification of counts
+       std::mutex reply_window_m_; // protect reply window et al
+       std::mutex conss_m_; // protect conns_
 
 
        protected:
@@ -626,8 +626,4 @@ void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
 void make_sockaddr(const char *host, const char *port,
                struct sockaddr_in *dst);
 
-int cmp_timespec(const struct timespec &a, const struct timespec &b);
-void add_timespec(const struct timespec &a, int b, struct timespec *result);
-int diff_timespec(const struct timespec &a, const struct timespec &b);
-
 #endif
index 4c66e43..d90e494 100644 (file)
@@ -10,7 +10,6 @@
 #include <sys/types.h>
 #include <unistd.h>
 #include "jsl_log.h"
-#include "gettime.h"
 #include "lang/verify.h"
 
 #define NUM_CL 2
@@ -19,7 +18,6 @@ rpcs *server;  // server rpc object
 rpcc *clients[NUM_CL];  // client rpc object
 struct sockaddr_in dst; //server's ip address
 int port;
-pthread_attr_t attr;
 
 // server-side handlers. they must be methods of some class
 // to simplify rpcs::reg(). a server process can have handlers
@@ -112,12 +110,11 @@ testmarshall()
        VERIFY(i1==i && l1==l && s1==s);
 }
 
-void *
-client1(void *xx)
+void
+client1(int cl)
 {
-
        // test concurrency.
-       int which_cl = ((unsigned long) xx ) % NUM_CL;
+       int which_cl = ((unsigned long) cl ) % NUM_CL;
 
        for(int i = 0; i < 100; i++){
                int arg = (random() % 2000);
@@ -137,25 +134,22 @@ client1(void *xx)
                int arg = (random() % 1000);
                int rep;
 
-               struct timespec start,end;
-               clock_gettime(CLOCK_REALTIME, &start);
+               auto start = std::chrono::steady_clock::now();
 
                int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
-               clock_gettime(CLOCK_REALTIME, &end);
-               int diff = diff_timespec(end, start);
+               auto end = std::chrono::steady_clock::now();
+               int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
                if (ret != 0)
                        printf("%d ms have elapsed!!!\n", diff);
                VERIFY(ret == 0);
                VERIFY(rep == (which ? arg+1 : arg+2));
        }
-
-       return 0;
 }
 
-void *
-client2(void *xx)
+void
+client2(int cl)
 {
-       int which_cl = ((unsigned long) xx ) % NUM_CL;
+       int which_cl = ((unsigned long) cl ) % NUM_CL;
 
        time_t t1;
        time(&t1);
@@ -170,10 +164,9 @@ client2(void *xx)
                }
                VERIFY((int)rep.size() == arg);
        }
-       return 0;
 }
 
-void *
+void
 client3(void *xx)
 {
        rpcc *c = (rpcc *) xx;
@@ -183,7 +176,6 @@ client3(void *xx)
                int ret = c->call(24, i, rep, rpcc::to(3000));
                VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
        }
-       return 0;
 }
 
 
@@ -267,18 +259,15 @@ concurrent_test(int nt)
        // create threads that make lots of calls in parallel,
        // to test thread synchronization for concurrent calls
        // and dispatches.
-       int ret;
-
        printf("start concurrent_test (%d threads) ...", nt);
 
-       pthread_t th[nt];
+    std::vector<std::thread> th(nt);
        for(int i = 0; i < nt; i++){
-               ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
-               VERIFY(ret == 0);
+        th[i] = std::thread(client1, i);
        }
 
        for(int i = 0; i < nt; i++){
-               VERIFY(pthread_join(th[i], NULL) == 0);
+        th[i].join();
        }
        printf(" OK\n");
 }
@@ -286,8 +275,6 @@ concurrent_test(int nt)
 void 
 lossy_test()
 {
-       int ret;
-
        printf("start lossy_test ...");
        VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
 
@@ -303,13 +290,12 @@ lossy_test()
        }
 
        int nt = 1;
-       pthread_t th[nt];
+    std::vector<std::thread> th(nt);
        for(int i = 0; i < nt; i++){
-               ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
-               VERIFY(ret == 0);
+        th[i] = std::thread(client2, i);
        }
        for(int i = 0; i < nt; i++){
-               VERIFY(pthread_join(th[i], NULL) == 0);
+        th[i].join();
        }
        printf(".. OK\n");
        VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
@@ -352,17 +338,15 @@ failure_test()
 
 
        int nt = 10;
-       int ret;
        printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
 
-       pthread_t th[nt];
+    std::vector<std::thread> th(nt);
        for(int i = 0; i < nt; i++){
-               ret = pthread_create(&th[i], &attr, client3, (void *) client);
-               VERIFY(ret == 0);
+        th[i] = std::thread(client3, client);
        }
 
        for(int i = 0; i < nt; i++){
-               VERIFY(pthread_join(th[i], NULL) == 0);
+        th[i].join();
        }
        printf("ok\n");
 
@@ -376,12 +360,11 @@ failure_test()
 
        printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
        for(int i = 0; i < nt; i++){
-               ret = pthread_create(&th[i], &attr, client3, (void *)client);
-               VERIFY(ret == 0);
+        th[i] = std::thread(client3, client);
        }
 
        for(int i = 0; i < nt; i++){
-               VERIFY(pthread_join(th[i], NULL) == 0);
+        th[i].join();
        }
        printf("ok\n");
 
@@ -436,12 +419,8 @@ main(int argc, char *argv[])
 
        testmarshall();
 
-       pthread_attr_init(&attr);
-       // set stack size to 32K, so we don't run out of memory
-       pthread_attr_setstacksize(&attr, 32*1024);
-
        if (isserver) {
-               printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
+               printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
                startserver();
        }
 
diff --git a/rpc/slock.h b/rpc/slock.h
deleted file mode 100644 (file)
index 7f419c4..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-#ifndef __SCOPED_LOCK__
-#define __SCOPED_LOCK__
-
-#include <pthread.h>
-#include "lang/verify.h"
-struct ScopedLock {
-       private:
-               pthread_mutex_t *m_;
-       public:
-               ScopedLock(pthread_mutex_t *m): m_(m) {
-                       VERIFY(pthread_mutex_lock(m_)==0);
-               }
-               ~ScopedLock() {
-                       VERIFY(pthread_mutex_unlock(m_)==0);
-               }
-};
-struct ScopedUnlock {
-       private:
-               pthread_mutex_t *m_;
-       public:
-               ScopedUnlock(pthread_mutex_t *m): m_(m) {
-                       VERIFY(pthread_mutex_unlock(m_)==0);
-               }
-               ~ScopedUnlock() {
-                       VERIFY(pthread_mutex_lock(m_)==0);
-               }
-};
-#endif  /*__SCOPED_LOCK__*/
index f9f32fa..26226cd 100644 (file)
@@ -1,10 +1,9 @@
-#include "slock.h"
 #include "thr_pool.h"
 #include <stdlib.h>
 #include <errno.h>
 #include "lang/verify.h"
 
-static void *
+static void
 do_worker(void *arg)
 {
        ThrPool *tp = (ThrPool *)arg;
@@ -15,7 +14,6 @@ do_worker(void *arg)
 
                (void)(j.f)(j.a);
        }
-       pthread_exit(NULL);
 }
 
 //if blocking, then addJob() blocks when queue is full
@@ -23,13 +21,8 @@ do_worker(void *arg)
 ThrPool::ThrPool(int sz, bool blocking)
 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
 {
-       pthread_attr_init(&attr_);
-       pthread_attr_setstacksize(&attr_, 128<<10);
-
        for (int i = 0; i < sz; i++) {
-               pthread_t t;
-               VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0);
-               th_.push_back(t);
+        th_.push_back(std::thread(do_worker, this));
        }
 }
 
@@ -39,19 +32,17 @@ ThrPool::~ThrPool()
 {
        for (int i = 0; i < nthreads_; i++) {
                job_t j;
-               j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit
+               j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit
                jobq_.enq(j);
        }
 
        for (int i = 0; i < nthreads_; i++) {
-               VERIFY(pthread_join(th_[i], NULL)==0);
+        th_[i].join();
        }
-
-       VERIFY(pthread_attr_destroy(&attr_)==0);
 }
 
 bool 
-ThrPool::addJob(void *(*f)(void *), void *a)
+ThrPool::addJob(void (*f)(void *), void *a)
 {
        job_t j;
        j.f = f;
index 5095961..4427aee 100644 (file)
@@ -1,17 +1,15 @@
-#ifndef __THR_POOL__
-#define __THR_POOL__
+#ifndef thr_pool_h
+#define thr_pool_h
 
-#include <pthread.h>
 #include <vector>
+#include <thread>
 
 #include "fifo.h"
 
 class ThrPool {
-
-
        public:
                struct job_t {
-                       void *(*f)(void *); //function point
+                       void (*f)(void *); //function point
                        void *a; //function arguments
                };
 
@@ -23,18 +21,17 @@ class ThrPool {
                bool takeJob(job_t *j);
 
        private:
-               pthread_attr_t attr_;
                int nthreads_;
                bool blockadd_;
 
 
                fifo<job_t> jobq_;
-               std::vector<pthread_t> th_;
+               std::vector<std::thread> th_;
 
-               bool addJob(void *(*f)(void *), void *a);
+               bool addJob(void (*f)(void *), void *a);
 };
 
-       template <class C, class A> bool 
+template <class C, class A> bool 
 ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
 {
 
@@ -43,14 +40,13 @@ ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
                        C *o;
                        void (C::*m)(A a);
                        A a;
-                       static void *func(void *vvv) {
+                       static void func(void *vvv) {
                                objfunc_wrapper *x = (objfunc_wrapper*)vvv;
                                C *o = x->o;
                                void (C::*m)(A ) = x->m;
                                A a = x->a;
                                (o->*m)(a);
                                delete x;
-                               return 0;
                        }
        };
 
diff --git a/rsm.cc b/rsm.cc
index 6841e3d..478e0e0 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
 #include "tprintf.h"
 #include "lang/verify.h"
 #include "rsm_client.h"
-
-static void *recoverythread(void *x) {
-    rsm *r = (rsm *) x;
-    r->recovery();
-    return 0;
-}
+#include "lock.h"
 
 rsm::rsm(std::string _first, std::string _me) :
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
     partitioned (false), dopartition(false), break1(false), break2(false)
 {
-    pthread_t th;
+    std::thread th;
 
     last_myvs.vid = 0;
     last_myvs.seqno = 0;
@@ -128,29 +123,32 @@ rsm::rsm(std::string _first, std::string _me) :
     testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
 
     {
-        ScopedLock ml(rsm_mutex);
-        VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0);
+        lock ml(rsm_mutex);
+        th = std::thread(&rsm::recovery, this);
     }
 }
 
 void rsm::reg1(int proc, handler *h) {
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     procs[proc] = h;
 }
 
 // The recovery thread runs this function
 void rsm::recovery() {
     bool r = true;
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
 
     while (1) {
         while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
+            // XXX iannucci 2013/09/15 -- I don't understand whether accessing
+            // cfg->view_id in this manner involves a race.  I suspect not.
             if (join(primary)) {
                 tprintf("recovery: joined\n");
-                commit_change_wo(cfg->vid());
+                commit_change_wo(cfg->view_id());
             } else {
-                ScopedUnlock su(rsm_mutex);
-                sleep (30); // XXX make another node in cfg primary?
+                ml.unlock();
+                std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
+                ml.lock();
             }
         }
         vid_insync = vid_commit;
@@ -173,7 +171,7 @@ void rsm::recovery() {
             inviewchange = false;
         }
         tprintf("recovery: go to sleep %d %d\n", insync, inviewchange);
-        recovery_cond.wait(rsm_mutex);
+        recovery_cond.wait(ml);
     }
 }
 
@@ -190,24 +188,26 @@ std::ostream & operator<<(std::ostream &o, const std::vector<A> &d) {
 }
 
 bool rsm::sync_with_backups() {
+    adopt_lock ml(rsm_mutex);
+    ml.unlock();
     {
-        ScopedUnlock su(rsm_mutex);
         // Make sure that the state of lock_server_cache_rsm is stable during
         // synchronization; otherwise, the primary's state may be more recent
         // than replicas after the synchronization.
-        ScopedLock ml(invoke_mutex);
+        lock ml(invoke_mutex);
         // By acquiring and releasing the invoke_mutex once, we make sure that
         // the state of lock_server_cache_rsm will not be changed until all
         // replicas are synchronized. The reason is that client_invoke arrives
         // after this point of time will see inviewchange == true, and returns
         // BUSY.
     }
+    ml.lock();
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
-    backups = std::vector<std::string>(cfg->get_view(vid_insync));
+    cfg->get_view(vid_insync, backups);
     backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
     LOG("rsm::sync_with_backups " << backups);
-    sync_cond.wait(rsm_mutex);
+    sync_cond.wait(ml);
     insync = false;
     return true;
 }
@@ -237,12 +237,14 @@ bool rsm::statetransfer(std::string m)
             m.c_str(), last_myvs.vid, last_myvs.seqno);
     rpcc *cl;
     {
-        ScopedUnlock su(rsm_mutex);
+        adopt_lock ml(rsm_mutex);
+        ml.unlock();
         cl = h.safebind();
         if (cl) {
             ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(),
                     last_myvs, vid_insync, r, rpcc::to(1000));
         }
+        ml.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
         tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(),
@@ -259,16 +261,18 @@ bool rsm::statetransfer(std::string m)
 }
 
 bool rsm::statetransferdone(std::string m) {
-    ScopedUnlock su(rsm_mutex);
+    adopt_lock ml(rsm_mutex);
+    ml.unlock();
     handle h(m);
     rpcc *cl = h.safebind();
-    if (!cl)
-        return false;
-    int r;
-    rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r);
-    if (ret != rsm_protocol::OK)
-        return false;
-    return true;
+    bool done = false;
+    if (cl) {
+        int r;
+        rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r);
+        done = (ret == rsm_protocol::OK);
+    }
+    ml.lock();
+    return done;
 }
 
 
@@ -281,12 +285,14 @@ bool rsm::join(std::string m) {
             last_myvs.seqno);
     rpcc *cl;
     {
-        ScopedUnlock su(rsm_mutex);
+        adopt_lock ml(rsm_mutex);
+        ml.unlock();
         cl = h.safebind();
         if (cl != 0) {
             ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs,
                     r, rpcc::to(120000));
         }
+        ml.lock();
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
@@ -304,7 +310,7 @@ bool rsm::join(std::string m) {
  * completed a view change
  */
 void rsm::commit_change(unsigned vid) {
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     commit_change_wo(vid);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
         breakpoint2();
@@ -318,8 +324,8 @@ void rsm::commit_change_wo(unsigned vid) {
     vid_commit = vid;
     inviewchange = true;
     set_primary(vid);
-    recovery_cond.signal();
-    sync_cond.signal();
+    recovery_cond.notify_one();
+    sync_cond.notify_one();
     if (cfg->ismember(cfg->myaddr(), vid_commit))
         breakpoint2();
 }
@@ -347,12 +353,12 @@ void rsm::execute(int procno, std::string req, std::string &r) {
 //
 rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) {
     LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
-    ScopedLock ml(invoke_mutex);
+    lock ml(invoke_mutex);
     std::vector<std::string> m;
     std::string myaddr;
     viewstamp vs;
     {
-        ScopedLock ml(rsm_mutex);
+        lock ml(rsm_mutex);
         LOG("Checking for inviewchange");
         if (inviewchange)
             return rsm_client_protocol::BUSY;
@@ -361,7 +367,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std:
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
         LOG("Assigning a viewstamp");
-        m = cfg->get_view(vid_commit);
+        cfg->get_view(vid_commit, m);
         // assign the RPC the next viewstamp number
         vs = myvs;
         myvs++;
@@ -401,11 +407,11 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std:
 
 rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) {
     LOG("rsm::invoke: procno 0x" << std::hex << proc);
-    ScopedLock ml(invoke_mutex);
+    lock ml(invoke_mutex);
     std::vector<std::string> m;
     std::string myaddr;
     {
-        ScopedLock ml(rsm_mutex);
+        lock ml(rsm_mutex);
         // check if !inviewchange
         LOG("Checking for view change");
         if (inviewchange)
@@ -415,7 +421,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
-        m = cfg->get_view(vid_commit);
+        cfg->get_view(vid_commit, m);
         if (find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
@@ -436,7 +442,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d
  */
 rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid,
         rsm_protocol::transferres &r) {
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     int ret = rsm_protocol::OK;
     tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
             last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
@@ -454,12 +460,12 @@ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned
  * for view vid
  */
 rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     backups.erase(find(backups.begin(), backups.end(), m));
     if (backups.empty())
-        sync_cond.signal();
+        sync_cond.notify_one();
     return rsm_protocol::OK;
 }
 
@@ -469,7 +475,7 @@ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
 rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
     int ret = rsm_protocol::OK;
 
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
             last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
     if (cfg->ismember(m, vid_commit)) {
@@ -484,10 +490,11 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j
         unsigned vid_cache = vid_commit;
         bool succ;
         {
-            ScopedUnlock su(rsm_mutex);
+            ml.unlock();
             succ = cfg->add(m, vid_cache);
+            ml.lock();
         }
-        if (cfg->ismember(m, cfg->vid())) {
+        if (cfg->ismember(m, cfg->view_id())) {
             r.log = cfg->dump();
             tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str());
         } else {
@@ -505,8 +512,8 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j
  */
 rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
     std::vector<std::string> m;
-    ScopedLock ml(rsm_mutex);
-    m = cfg->get_view(vid_commit);
+    lock ml(rsm_mutex);
+    cfg->get_view(vid_commit, m);
     m.push_back(primary);
     r = m;
     tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
@@ -518,8 +525,9 @@ rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string>
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
-    std::vector<std::string> c = cfg->get_view(vid);
-    std::vector<std::string> p = cfg->get_view(vid - 1);
+    std::vector<std::string> c, p;
+    cfg->get_view(vid, c);
+    cfg->get_view(vid - 1, p);
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
@@ -539,7 +547,7 @@ void rsm::set_primary(unsigned vid) {
 }
 
 bool rsm::amiprimary() {
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     return primary == cfg->myaddr() && !inviewchange;
 }
 
@@ -551,7 +559,7 @@ bool rsm::amiprimary() {
 // assumes caller holds rsm_mutex
 void rsm::net_repair_wo(bool heal) {
     std::vector<std::string> m;
-    m = cfg->get_view(vid_commit);
+    cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
             handle h(m[i]);
@@ -563,7 +571,7 @@ void rsm::net_repair_wo(bool heal) {
 }
 
 rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
             heal, dopartition, partitioned);
     if (heal) {
@@ -603,7 +611,7 @@ void rsm::partition1() {
 
 rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
     r = rsm_test_protocol::OK;
-    ScopedLock ml(rsm_mutex);
+    lock ml(rsm_mutex);
     tprintf("rsm::breakpointreq: %d\n", b);
     if (b == 1) break1 = true;
     else if (b == 2) break2 = true;
diff --git a/rsm.h b/rsm.h
index c5bf4fc..ec7632d 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -7,7 +7,7 @@
 #include <vector>
 #include "rsm_protocol.h"
 #include "rsm_state_transfer.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include "config.h"
 
@@ -51,10 +51,10 @@ class rsm : public config_view_change {
         rsm_test_protocol::status test_net_repairreq(int heal, int &r);
         rsm_test_protocol::status breakpointreq(int b, int &r);
 
-        mutex rsm_mutex;
-        mutex invoke_mutex;
-        cond recovery_cond;
-        cond sync_cond;
+        std::mutex rsm_mutex;
+        std::mutex invoke_mutex;
+        std::condition_variable recovery_cond;
+        std::condition_variable sync_cond;
 
         void execute(int procno, std::string req, std::string &r);
         rsm_client_protocol::status client_invoke(int procno, std::string req,
index 69fec20..9beb0b3 100644 (file)
@@ -5,19 +5,18 @@
 #include <handle.h>
 #include <unistd.h>
 #include "lang/verify.h"
-
+#include "lock.h"
 
 rsm_client::rsm_client(std::string dst) {
     printf("create rsm_client\n");
     std::vector<std::string> mems;
 
-    pthread_mutex_init(&rsm_client_mutex, NULL);
     sockaddr_in dstsock;
     make_sockaddr(dst.c_str(), &dstsock);
     primary = dst;
 
     {
-        ScopedLock ml(&rsm_client_mutex);
+        lock ml(rsm_client_mutex);
         VERIFY (init_members());
     }
     printf("rsm_client: done\n");
@@ -31,16 +30,16 @@ void rsm_client::primary_failure() {
 
 rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
     int ret;
-    ScopedLock ml(&rsm_client_mutex);
+    lock ml(rsm_client_mutex);
     while (1) {
         printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
         handle h(primary);
 
-        VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
+        ml.unlock();
         rpcc *cl = h.safebind();
         if (cl)
             ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
-        VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+        ml.lock();
 
         if (!cl)
             goto prim_fail;
@@ -71,14 +70,18 @@ prim_fail:
 bool rsm_client::init_members() {
     printf("rsm_client::init_members get members!\n");
     handle h(primary);
-    VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
     int ret;
-    rpcc *cl = h.safebind();
-    if (cl) {
-        ret = cl->call(rsm_client_protocol::members, 0, known_mems,
-                rpcc::to(1000));
+    rpcc *cl;
+    {
+        adopt_lock ml(rsm_client_mutex);
+        ml.unlock();
+        cl = h.safebind();
+        if (cl) {
+            ret = cl->call(rsm_client_protocol::members, 0, known_mems,
+                    rpcc::to(1000));
+        }
+        ml.lock();
     }
-    VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
     if (cl == 0 || ret != rsm_protocol::OK)
         return false;
     if (known_mems.size() < 1) {
index 3219179..039bc26 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef rsm_client_h
 #define rsm_client_h
 
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include "rsm_protocol.h"
 #include <string>
 #include <vector>
@@ -19,7 +19,7 @@ class rsm_client {
     protected:
         std::string primary;
         std::vector<std::string> known_mems;
-        pthread_mutex_t rsm_client_mutex;
+        std::mutex rsm_client_mutex;
         void primary_failure();
         bool init_members();
     public:
index a27ef83..d479d5d 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef rsm_protocol_h
 #define rsm_protocol_h
 
-#include "rpc.h"
+#include "rpc/rpc.h"
 
 
 class rsm_client_protocol {
index 08034e2..f172602 100644 (file)
@@ -4,37 +4,30 @@
 
 #include "rsm_protocol.h"
 #include "rsmtest_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include <vector>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string>
-using namespace std;
-
-rsmtest_client *lc;
 
 int
 main(int argc, char *argv[])
 {
-  int r;
-
-  if(argc != 4){
-    fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
-    exit(1);
-  }
+    if(argc != 4){
+        fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
+        exit(1);
+    }
 
-  lc = new rsmtest_client(argv[1]);
-  string command(argv[2]);
-  if (command == "partition") {
-    r = lc->net_repair(atoi(argv[3]));
-    printf ("net_repair returned %d\n", r);
-  } else if (command == "breakpoint") {
-    int b = atoi(argv[3]);
-    r = lc->breakpoint(b);
-    printf ("breakpoint %d returned %d\n", b, r);
-  } else {
-    fprintf(stderr, "Unknown command %s\n", argv[2]);
-  }
-  exit(0);
+    rsmtest_client *lc = new rsmtest_client(argv[1]);
+    std::string command(argv[2]);
+    if (command == "partition") {
+        printf("net_repair returned %d\n", lc->net_repair(atoi(argv[3])));
+    } else if (command == "breakpoint") {
+        int b = atoi(argv[3]);
+        printf("breakpoint %d returned %d\n", b, lc->breakpoint(b));
+    } else {
+        fprintf(stderr, "Unknown command %s\n", argv[2]);
+    }
+    return 0;
 }
index f008228..e27e8e5 100644 (file)
@@ -1,7 +1,7 @@
 // RPC stubs for clients to talk to rsmtest_server
 
 #include "rsmtest_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 #include <arpa/inet.h>
 
 #include <sstream>
 
 rsmtest_client::rsmtest_client(std::string dst)
 {
-  sockaddr_in dstsock;
-  make_sockaddr(dst.c_str(), &dstsock);
-  cl = new rpcc(dstsock);
-  if (cl->bind() < 0) {
-    printf("rsmtest_client: call bind\n");
-  }
+    sockaddr_in dstsock;
+    make_sockaddr(dst.c_str(), &dstsock);
+    cl = new rpcc(dstsock);
+    if (cl->bind() < 0) {
+        printf("rsmtest_client: call bind\n");
+    }
 }
 
 int
 rsmtest_client::net_repair(int heal)
 {
-  int r;
-  int ret = cl->call(rsm_test_protocol::net_repair, heal, r);
-  VERIFY (ret == rsm_test_protocol::OK);
-  return r;
+    int r;
+    int ret = cl->call(rsm_test_protocol::net_repair, heal, r);
+    VERIFY (ret == rsm_test_protocol::OK);
+    return r;
 }
 
 int
 rsmtest_client::breakpoint(int b)
 {
-  int r;
-  int ret = cl->call(rsm_test_protocol::breakpoint, b, r);
-  VERIFY (ret == rsm_test_protocol::OK);
-  return r;
+    int r;
+    int ret = cl->call(rsm_test_protocol::breakpoint, b, r);
+    VERIFY (ret == rsm_test_protocol::OK);
+    return r;
 }
 
 
index a175d9e..73a6cf1 100644 (file)
@@ -5,16 +5,16 @@
 
 #include <string>
 #include "rsm_protocol.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
 
 // Client interface to the rsmtest server
 class rsmtest_client {
- protected:
-  rpcc *cl;
- public:
-  rsmtest_client(std::string d);
-  virtual ~rsmtest_client() {};
-  virtual rsm_test_protocol::status net_repair(int heal);
-  virtual rsm_test_protocol::status breakpoint(int b);
+    protected:
+        rpcc *cl;
+    public:
+        rsmtest_client(std::string d);
+        virtual ~rsmtest_client() {};
+        virtual rsm_test_protocol::status net_repair(int heal);
+        virtual rsm_test_protocol::status breakpoint(int b);
 };
 #endif
diff --git a/srlock.cc b/srlock.cc
deleted file mode 100644 (file)
index 8966527..0000000
--- a/srlock.cc
+++ /dev/null
@@ -1,16 +0,0 @@
-#include "srlock.h"
-
-ScopedRemoteLock::ScopedRemoteLock(lock_client *lc, lock_protocol::lockid_t lid) :
-    lc_(lc), lid_(lid) {
-    lc_->acquire(lid_);
-    releaseOnFree = true;
-}
-
-void ScopedRemoteLock::retain() {
-    releaseOnFree = false;
-}
-
-ScopedRemoteLock::~ScopedRemoteLock() {
-    if (releaseOnFree)
-        lc_->release(lid_);
-}
diff --git a/srlock.h b/srlock.h
deleted file mode 100644 (file)
index 40bd8bd..0000000
--- a/srlock.h
+++ /dev/null
@@ -1,18 +0,0 @@
-#ifndef srlock_h
-#define srlock_h
-
-#include "lock_protocol.h"
-#include "lock_client.h"
-
-class ScopedRemoteLock {
-    protected:
-        lock_client *lc_;
-        lock_protocol::lockid_t lid_;
-        bool releaseOnFree;
-    public:
-        ScopedRemoteLock(lock_client *, lock_protocol::lockid_t);
-        void retain();
-        ~ScopedRemoteLock();
-};
-
-#endif
index 8f6c8a9..93a6070 100644 (file)
@@ -1,16 +1,9 @@
-#include "mutex.h"
 #include <sys/time.h>
 #include <stdint.h>
 #include "tprintf.h"
 
-uint64_t utime() {
-    struct timeval tp;
-    gettimeofday(&tp, NULL);
-    return (tp.tv_usec + (uint64_t)tp.tv_sec * 1000000) % 1000000000;
-}
-
-mutex cerr_mutex;
-std::map<pthread_t, int> thread_name_map;
+std::mutex cerr_mutex;
+std::map<std::thread::id, int> thread_name_map;
 int next_thread_num = 0;
 std::map<void *, int> instance_name_map;
 int next_instance_num = 0;
index 6a0cb2a..c61626a 100644 (file)
--- a/tprintf.h
+++ b/tprintf.h
@@ -1,27 +1,27 @@
-#ifndef TPRINTF_H
-#define TPRINTF_H
+#ifndef tprintf_h
+#define tprintf_h
 
 #include <iomanip>
 #include <iostream>
-#include "mutex.h"
-#include <time.h>
 #include <stdio.h>
 #include <map>
+#include "lock.h"
 
 extern mutex cerr_mutex;
-extern std::map<pthread_t, int> thread_name_map;
+extern std::map<std::thread::id, int> thread_name_map;
 extern int next_thread_num;
 extern std::map<void *, int> instance_name_map;
 extern int next_instance_num;
 extern char tprintf_thread_prefix;
 
 #define LOG_PREFIX { \
-    cerr_mutex.acquire(); \
-    pthread_t self = pthread_self(); \
+    cerr_mutex.lock(); \
+    auto self = std::this_thread::get_id(); \
     int tid = thread_name_map[self]; \
     if (tid==0) \
         tid = thread_name_map[self] = ++next_thread_num; \
-    std::cerr << std::left << std::setw(9) << utime() << " "; \
+    auto utime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
+    std::cerr << std::left << std::setw(9) << utime << " "; \
     std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \
     std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \
 }
@@ -32,7 +32,7 @@ extern char tprintf_thread_prefix;
     std::cerr << "#" << std::setw(2) << self; \
 }
 #define LOG_SUFFIX { \
-    cerr_mutex.release(); \
+    cerr_mutex.unlock(); \
 }
 
 #define LOG_NONMEMBER(x) { \
@@ -48,7 +48,7 @@ extern char tprintf_thread_prefix;
 }
 #define JOIN(from,to,sep) ({ \
     ostringstream oss; \
-    for(typeof(from) i=from;i!=to;i++) \
+    for(auto i=from;i!=to;i++) \
         oss << *i << sep; \
     oss.str(); \
 })
@@ -85,6 +85,4 @@ extern char tprintf_thread_prefix;
     LOG_NONMEMBER(buf); \
 }
 
-uint64_t utime();
-
 #endif