More cleaning
authorPeter Iannucci <iannucci@mit.edu>
Fri, 27 Sep 2013 06:58:46 +0000 (02:58 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Fri, 27 Sep 2013 07:02:22 +0000 (03:02 -0400)
42 files changed:
config.cc
config.h
handle.cc
handle.h
lang/verify.h
lock.h [deleted file]
lock_client.cc
lock_client.h
lock_demo.cc
lock_protocol.h
lock_server.cc
lock_server.h
lock_smain.cc
lock_tester.cc
log.cc
log.h
paxos.cc
paxos.h
paxos_protocol.h
rpc/connection.cc
rpc/connection.h
rpc/fifo.h
rpc/marshall.h
rpc/pollmgr.cc
rpc/pollmgr.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rpc/thr_pool.cc
rpc/thr_pool.h
rsm.cc
rsm.h
rsm_client.cc
rsm_client.h
rsm_protocol.h
rsm_state_transfer.h [deleted file]
rsm_tester.cc
rsmtest_client.cc
rsmtest_client.h
threaded_log.cc
threaded_log.h
types.h [new file with mode: 0644]

index 7bac4a9..d1cd70a 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,10 +1,5 @@
-#include <thread>
-#include <sstream>
 #include "config.h"
-#include "paxos.h"
 #include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
 
 // The config module maintains views. As a node joins or leaves a
 // view, the next view will be the same as previous view, except with
@@ -40,8 +35,7 @@
 
 config::config(const string &_first, const string &_me, config_view_change *_vc)
     : my_view_id(0), first(_first), me(_me), vc(_vc),
-      paxos_acceptor(this, me == _first, me, me),
-      paxos_proposer(this, &paxos_acceptor, me)
+      paxos(this, me == _first, me, me)
 {
     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
     lock cfg_mutex_lock(cfg_mutex);
@@ -51,7 +45,7 @@ config::config(const string &_first, const string &_me, config_view_change *_vc)
 
 void config::restore(const string &s) {
     lock cfg_mutex_lock(cfg_mutex);
-    paxos_acceptor.restore(s);
+    paxos.restore(s);
     reconstruct(cfg_mutex_lock);
 }
 
@@ -61,7 +55,7 @@ void config::get_view(unsigned instance, vector<string> &m) {
 }
 
 void config::get_view(unsigned instance, vector<string> &m, lock &) {
-    string value = paxos_acceptor.value(instance);
+    string value = paxos.value(instance);
     LOG("get_view(" << instance << "): returns " << value);
     m = members(value);
 }
@@ -80,8 +74,8 @@ string config::value(const vector<string> &m) const {
 
 void config::reconstruct(lock &cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
-    if (paxos_acceptor.instance() > 0) {
-        my_view_id = paxos_acceptor.instance();
+    my_view_id = paxos.instance();
+    if (my_view_id > 0) {
         get_view(my_view_id, mems, cfg_mutex_lock);
         LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
     }
@@ -128,7 +122,7 @@ bool config::add(const string &new_m, unsigned vid) {
     vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
-    bool r = paxos_proposer.run(nextvid, cmems, value(m));
+    bool r = paxos.run(nextvid, cmems, value(m));
     cfg_mutex_lock.lock();
     LOG("config::add: proposer returned " << (r ? "success" : "failure"));
     return r;
@@ -145,7 +139,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) {
     vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
-    bool r = paxos_proposer.run(nextvid, cmems, value(n));
+    bool r = paxos.run(nextvid, cmems, value(n));
     cfg_mutex_lock.lock();
     LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
     return r;
@@ -195,7 +189,7 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
     LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
     if (vid == my_view_id)
         return paxos_protocol::OK;
-    else if (paxos_proposer.isrunning()) {
+    else if (paxos.isrunning()) {
         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
         return paxos_protocol::OK;
     }
index 074cbe9..dc06b8a 100644 (file)
--- a/config.h
+++ b/config.h
@@ -1,23 +1,8 @@
 #ifndef config_h
 #define config_h
 
-#include <string>
-#include <vector>
+#include "types.h"
 #include "paxos.h"
-#include "lock.h"
-
-using std::chrono::steady_clock;
-using std::chrono::seconds;
-using std::string;
-using std::vector;
-using std::thread;
-using std::ostringstream;
-using std::istringstream;
-using std::ostream_iterator;
-using std::istream_iterator;
-using std::copy;
-using std::min;
-using std::min_element;
 
 class config_view_change {
     public:
@@ -31,8 +16,7 @@ class config : public paxos_change {
         string first;
         string me;
         config_view_change *vc;
-        acceptor paxos_acceptor;
-        proposer paxos_proposer;
+        proposer_acceptor paxos;
         vector<string> mems;
         mutex cfg_mutex;
         cond config_cond;
@@ -52,16 +36,15 @@ class config : public paxos_change {
         config(const string &_first, const string &_me, config_view_change *_vc);
         unsigned view_id() { return my_view_id; }
         const string &myaddr() const { return me; }
-        string dump() { return paxos_acceptor.dump(); }
+        string dump() { return paxos.dump(); }
         void get_view(unsigned instance, vector<string> &m);
         void restore(const string &s);
         bool add(const string &, unsigned view_id);
         bool ismember(const string &m, unsigned view_id);
         void heartbeater(void);
         void paxos_commit(unsigned instance, const string &v);
-        // XXX hack; maybe should have its own port number
-        rpcs *get_rpcs() { return paxos_acceptor.get_rpcs(); }
-        void breakpoint(int b) { paxos_proposer.breakpoint(b); }
+        rpcs *get_rpcs() { return paxos.get_rpcs(); }
+        void breakpoint(int b) { paxos.breakpoint(b); }
 };
 
 #endif
index d048ead..3b6e1fa 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -1,9 +1,4 @@
 #include "handle.h"
-#include "threaded_log.h"
-#include "lock.h"
-#include <map>
-
-using std::map;
 
 class hinfo {
 public:
index a06b156..a513b56 100644 (file)
--- a/handle.h
+++ b/handle.h
 #ifndef handle_h
 #define handle_h
 
+#include "types.h"
 #include "rpc/rpc.h"
-#include <string>
-
-using std::string;
 
 class hinfo;
 
index 2b092d2..823a48d 100644 (file)
@@ -3,8 +3,8 @@
 #ifndef verify_client_h
 #define verify_client_h
 
-#include <stdlib.h>
-#include <assert.h>
+#include <cstdlib>
+#include <cassert>
 
 #ifdef NDEBUG
 #define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
diff --git a/lock.h b/lock.h
deleted file mode 100644 (file)
index 1d62c39..0000000
--- a/lock.h
+++ /dev/null
@@ -1,11 +0,0 @@
-#ifndef lock_h
-#define lock_h
-
-#include <thread>
-#include <mutex>
-
-using std::mutex;
-using lock = std::unique_lock<std::mutex>;
-using cond = std::condition_variable;
-
-#endif
index 22e57f1..99dcb5b 100644 (file)
@@ -1,14 +1,8 @@
 // RPC stubs for clients to talk to lock_server, and cache the locks.
 
 #include "lock_client.h"
-#include "rpc/rpc.h"
-#include <algorithm>
-#include "threaded_log.h"
 #include <arpa/inet.h>
 
-#include "rsm_client.h"
-#include "lock.h"
-
 void lock_state::wait(lock & mutex_lock) {
     auto self = std::this_thread::get_id();
     c[self].wait(mutex_lock);
@@ -42,7 +36,7 @@ lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), n
 
     srandom((uint32_t)time(NULL)^last_port);
     rlock_port = ((random()%32000) | (0x1 << 10));
-    id = "127.0.0.1:" + std::to_string(rlock_port);
+    id = "127.0.0.1:" + to_string(rlock_port);
     last_port = rlock_port;
     rpcs *rlsrpc = new rpcs(rlock_port);
     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
index 3290d1a..36ee3a2 100644 (file)
@@ -1,18 +1,14 @@
 // lock client interface.
 
 #ifndef lock_client_h
-
 #define lock_client_h
 
 #ifdef __cplusplus
 
-#include <string>
+#include "types.h"
 #include "lock_protocol.h"
-#include "rpc/rpc.h"
-#include "lang/verify.h"
 #include "rpc/fifo.h"
 #include "rsm_client.h"
-#include "lock.h"
 
 class lock_release_user {
     public:
@@ -20,11 +16,6 @@ class lock_release_user {
         virtual ~lock_release_user() {}
 };
 
-using std::string;
-using std::map;
-using std::thread;
-using std::list;
-
 class lock_state {
 public:
     enum {
index 3b38cdf..72fddf8 100644 (file)
@@ -1,5 +1,4 @@
 #include "lock_client.h"
-#include "threaded_log.h"
 
 char log_thread_prefix = 'd';
 
index 900897a..1e45ddc 100644 (file)
@@ -3,10 +3,8 @@
 #ifndef lock_protocol_h
 #define lock_protocol_h
 
+#include "types.h"
 #include "rpc/rpc.h"
-#include <string>
-
-using std::string;
 
 class lock_protocol {
     public:
@@ -16,7 +14,7 @@ class lock_protocol {
         enum rpc_numbers : proc_t {
             acquire = 0x7001,
             release,
-            stat
+            stat,
         };
 };
 
@@ -25,7 +23,7 @@ class rlock_protocol {
         enum status : status_t { OK, RPCERR };
         enum rpc_numbers : proc_t {
             revoke = 0x8001,
-            retry = 0x8002
+            retry,
         };
 };
 #endif
index cac6a90..379838a 100644 (file)
@@ -1,18 +1,10 @@
 // the caching lock server implementation
 
+#include "types.h"
 #include "lock_server.h"
-#include <sstream>
 #include <unistd.h>
 #include <arpa/inet.h>
-#include "lang/verify.h"
 #include "handle.h"
-#include "threaded_log.h"
-#include "rpc/marshall.h"
-#include "lock.h"
-
-using std::ostringstream;
-using std::istringstream;
-using std::vector;
 
 lock_state::lock_state():
     held(false)
@@ -61,7 +53,7 @@ void lock_server::revoker() [[noreturn]] {
             continue;
 
         lock_state &st = get_lock_state(lid);
-        holder held_by;
+        holder_t held_by;
         {
             lock sl(st.m);
             held_by = st.held_by;
@@ -89,7 +81,7 @@ void lock_server::retryer() [[noreturn]] {
 
         LOG("Sending retry for " << lid);
         lock_state &st = get_lock_state(lid);
-        holder front;
+        holder_t front;
         {
             lock sl(st.m);
             if (st.wanted_by.empty())
@@ -111,8 +103,8 @@ void lock_server::retryer() [[noreturn]] {
 }
 
 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
-    LOG_FUNC_ENTER_SERVER;
-    holder h = holder(id, xid);
+    LOG("lid=" << lid << " client=" << id << "," << xid);
+    holder_t h = holder_t(id, xid);
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
 
@@ -145,11 +137,11 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro
 
     // get in line
     bool found = false;
-    for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
-        if (i->first == id) {
+    for (auto p : st.wanted_by) {
+        if (p.first == id) {
             // make sure client is obeying serialization
-            if (i->second != xid) {
-                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
+            if (p.second != xid) {
+                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
                 return lock_protocol::RPCERR;
             }
             found = true;
@@ -159,7 +151,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro
     if (!found)
         st.wanted_by.push_back(h);
 
-    LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
+    LOG("wanted_by=" << st.wanted_by);
 
     // send revoke if we're first in line
     if (st.wanted_by.front() == h)
@@ -168,11 +160,11 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro
     return lock_protocol::RETRY;
 }
 
-int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
-    LOG_FUNC_ENTER_SERVER;
+int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
+    LOG("lid=" << lid << " client=" << id << "," << xid);
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
-    if (st.held && st.held_by == holder(id, xid)) {
+    if (st.held && st.held_by == holder_t(id, xid)) {
         st.held = false;
         LOG("Lock " << lid << " not held");
     }
index 2aa8445..381c527 100644 (file)
@@ -1,33 +1,22 @@
 #ifndef lock_server_h
 #define lock_server_h
 
-#include <string>
-
-#include <map>
-#include <vector>
+#include "types.h"
 #include "lock_protocol.h"
-#include "rpc/rpc.h"
-#include "rsm_state_transfer.h"
 #include "rsm.h"
 #include "rpc/fifo.h"
-#include "lock.h"
-
-using std::string;
-using std::pair;
-using std::list;
-using std::map;
 
-typedef string callback;
-typedef pair<callback, lock_protocol::xid_t> holder;
+typedef string callback_t;
+typedef pair<callback_t, lock_protocol::xid_t> holder_t;
 
 class lock_state {
 public:
     lock_state();
     lock_state(const lock_state &other);
     bool held;
-    holder held_by;
-    list<holder> wanted_by;
-    map<callback, lock_protocol::xid_t> old_requests;
+    holder_t held_by;
+    list<holder_t> wanted_by;
+    map<callback_t, lock_protocol::xid_t> old_requests;
     mutex m;
     lock_state& operator=(const lock_state&);
 };
index d62a25b..5f859a8 100644 (file)
@@ -1,10 +1,6 @@
-#include "rpc/rpc.h"
+#include "lock_server.h"
 #include <arpa/inet.h>
-#include <stdlib.h>
-#include "threaded_log.h"
 #include <unistd.h>
-#include "lock_server.h"
-#include "rsm.h"
 
 // Main loop of lock_server
 
index ac9175b..c192128 100644 (file)
@@ -2,18 +2,10 @@
 // Lock server tester
 //
 
-#include "lock_protocol.h"
 #include "lock_client.h"
-#include "rpc/rpc.h"
 #include <arpa/inet.h>
-#include <vector>
-#include <stdlib.h>
-#include <stdio.h>
-#include "lang/verify.h"
-#include "threaded_log.h"
 #include <sys/types.h>
 #include <unistd.h>
-#include "lock.h"
 
 char log_thread_prefix = 'c';
 
@@ -35,8 +27,8 @@ void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 0) {
-        fprintf(stderr, "error: server granted %s twice\n", lid.c_str());
-        fprintf(stdout, "error: server granted %s twice\n", lid.c_str());
+        cout << "error: server granted " << lid << " twice" << endl;
+        cerr << "error: server granted " << lid << " twice" << endl;
         exit(1);
     }
     ct[x] += 1;
@@ -46,7 +38,7 @@ void check_release(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 1) {
-        fprintf(stderr, "error: client released un-held lock %s\n",  lid.c_str());
+        cerr << "error: client released un-held lock " << lid << endl;
         exit(1);
     }
     ct[x] -= 1;
@@ -132,7 +124,7 @@ main(int argc, char *argv[])
     srandom((uint32_t)getpid());
 
     if (argc < 2) {
-        fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
+        cerr << "Usage: " << argv[0] << " [host:]port [test]" << endl;
         exit(1);
     }
 
diff --git a/log.cc b/log.cc
index 627b7ac..95c40e3 100644 (file)
--- a/log.cc
+++ b/log.cc
@@ -1,28 +1,25 @@
+#include "log.h"
 #include "paxos.h"
-#include <fstream>
-#include <iostream>
-#include "threaded_log.h"
 
 // Paxos must maintain some durable state (i.e., that survives power
 // failures) to run Paxos correct.  This module implements a log with
 // all durable state to run Paxos.  Since the values chosen correspond
 // to views, the log contains all views since the beginning of time.
 
-log::log(acceptor *_acc, std::string _me) : pxs (_acc) {
+log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) {
     name = "paxos-" + _me + ".log";
     logread();
 }
 
 void log::logread(void) {
-    std::ifstream from;
-    std::string type;
+    ifstream from(name);
+    string type;
     unsigned instance;
 
-    from.open(name.c_str());
     LOG("logread");
     while (from >> type) {
         if (type == "done") {
-            std::string v;
+            string v;
             from >> instance;
             from.get();
             getline(from, v);
@@ -34,13 +31,11 @@ void log::logread(void) {
             pxs->n_h.n = 0;
             pxs->n_a.n = 0;
         } else if (type == "propseen") {
-            from >> pxs->n_h.n;
-            from >> pxs->n_h.m;
+            from >> pxs->n_h.n >> pxs->n_h.m;
             LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
         } else if (type == "accepted") {
-            std::string v;
-            from >> pxs->n_a.n;
-            from >> pxs->n_a.m;
+            string v;
+            from >> pxs->n_a.n >> pxs->n_a.m;
             from.get();
             getline(from, v);
             pxs->v_a = v;
@@ -53,28 +48,26 @@ void log::logread(void) {
     from.close();
 }
 
-std::string log::dump() {
-    std::ifstream from;
-    std::string res;
-    std::string v;
-    from.open(name.c_str());
+string log::dump() {
+    ifstream from(name);
+    string res;
+    string v;
     while (getline(from, v))
         res += v + "\n";
     from.close();
     return res;
 }
 
-void log::restore(std::string s) {
-    std::ofstream f;
+void log::restore(string s) {
     LOG("restore: " << s);
-    f.open(name.c_str(), std::ios::trunc);
+    ofstream f(name, std::ios::trunc);
     f << s;
     f.close();
 }
 
 // XXX should be an atomic operation
-void log::loginstance(unsigned instance, std::string v) {
-    std::ofstream f(name, std::ios::app);
+void log::loginstance(unsigned instance, string v) {
+    ofstream f(name, std::ios::app);
     f << "done " << instance << " " << v << "\n";
     f.close();
 }
@@ -82,21 +75,15 @@ void log::loginstance(unsigned instance, std::string v) {
 // an acceptor should call logprop(n_h) when it
 // receives a prepare to which it responds prepare_ok().
 void log::logprop(prop_t n_h) {
-    std::ofstream f;
-    f.open(name.c_str(), std::ios::app);
-    f << "propseen";
-    f << " ";
-    f << n_h.n;
-    f << " ";
-    f << n_h.m;
-    f << "\n";
+    ofstream f(name, std::ios::app);
+    f << "propseen " << n_h.n << " " << n_h.m << "\n";
     f.close();
 }
 
 // an acceptor should call logaccept(n_a, v_a) when it
 // receives an accept RPC to which it replies accept_ok().
-void log::logaccept(prop_t n, std::string v) {
-    std::ofstream f(name, std::ios::app);
+void log::logaccept(prop_t n, string v) {
+    ofstream f(name, std::ios::app);
     f << "accepted " << n.n << " " << n.m << " " << v << "\n";
     f.close();
 }
diff --git a/log.h b/log.h
index 5bd2779..e8acd4a 100644 (file)
--- a/log.h
+++ b/log.h
@@ -1,28 +1,27 @@
 #ifndef log_h
 #define log_h
 
-#include <string>
-#include <vector>
+#include "types.h"
+#include "paxos_protocol.h"
 
-
-class acceptor;
+class proposer_acceptor;
 
 class log {
- private:
-  std::string name;
-  acceptor *pxs;
- public:
-  log (acceptor*, std::string _me);
-  std::string dump();
-  void restore(std::string s);
-  void logread(void);
-  /* Log a committed paxos instance*/
-  void loginstance(unsigned instance, std::string v);
-  /* Log the highest proposal number that the local paxos acceptor has ever seen */
-  void logprop(prop_t n_h);
-  /* Log the proposal (proposal number and value) that the local paxos acceptor 
-     accept has ever accepted */
-  void logaccept(prop_t n_a, std::string v);
+    private:
+        string name;
+        proposer_acceptor *pxs;
+    public:
+        log (proposer_acceptor*, string _me);
+        string dump();
+        void restore(string s);
+        void logread(void);
+        // Log a committed paxos instance
+        void loginstance(unsigned instance, string v);
+        // Log the highest proposal number that the local paxos acceptor has ever seen
+        void logprop(prop_t n_h);
+        // Log the proposal (proposal number and value) that the local paxos acceptor 
+        // accept has ever accepted
+        void logaccept(prop_t n_a, string v);
 };
 
 #endif /* log_h */
index 46d9c1c..095d56a 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,10 +1,21 @@
 #include "paxos.h"
 #include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
-#include "lock.h"
 
-using std::stoi;
+string print_members(const nodes_t &nodes) {
+    ostringstream ost;
+    copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
+    return ost.str();
+}
+
+bool isamember(const node_t & m, const nodes_t & nodes) {
+    return find(nodes.begin(), nodes.end(), m) != nodes.end();
+}
+
+// check if l2 contains a majority of the elements of l1
+bool majority(const nodes_t &l1, const nodes_t &l2) {
+    auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+    return overlap >= (l1.size() >> 1) + 1;
+}
 
 // This module implements the proposer and acceptor of the Paxos
 // distributed algorithm as described by Lamport's "Paxos Made
@@ -15,148 +26,89 @@ using std::stoi;
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
-bool operator> (const prop_t &a, const prop_t &b) {
-    return (a.n > b.n || (a.n == b.n && a.m > b.m));
-}
-
-bool operator>= (const prop_t &a, const prop_t &b) {
-    return (a.n > b.n || (a.n == b.n && a.m >= b.m));
-}
-
-string
-print_members(const vector<string> &nodes) {
-    string s;
-    s.clear();
-    for (unsigned i = 0; i < nodes.size(); i++) {
-        s += nodes[i];
-        if (i < (nodes.size()-1))
-            s += ",";
-    }
-    return s;
-}
-
-
-bool isamember(const string & m, const vector<string> & nodes) {
-    for (auto n : nodes) {
-        if (n == m)
-            return 1;
-    }
-    return 0;
-}
-
-bool proposer::isrunning() {
-    bool r;
-    lock ml(pxs_mutex);
-    r = !stable;
-    return r;
-}
-
-// check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
-    unsigned n = 0;
-
-    for (unsigned i = 0; i < l1.size(); i++) {
-        if (isamember(l1[i], l2))
-            n++;
-    }
-    return n >= (l1.size() >> 1) + 1;
-}
-
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
-  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
-    stable (true)
+proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
+        bool _first, const node_t & _me, const value_t & _value)
+    : delegate(_delegate), me (_me)
 {
-    my_n.n = 0;
-    my_n.m = me;
-}
+    // at this point, the log has already been replayed
+    if (instance_h == 0 && _first) {
+        values[1] = _value;
+        l.loginstance(1, _value);
+        instance_h = 1;
+    }
 
-void proposer::setn()
-{
-    my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+    pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
+    pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
+    pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
 }
 
-bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
+bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
 {
-    vector<string> accepts;
-    vector<string> nodes;
-    string v;
-    bool r = false;
-
-    lock ml(pxs_mutex);
+    lock ml(proposer_mutex);
     LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
     if (!stable) {  // already running proposer?
         LOG("proposer::run: already running");
         return false;
     }
     stable = false;
-    setn();
-    accepts.clear();
-    v.clear();
+    bool r = false;
+    my_n.n = std::max(n_h.n, my_n.n) + 1;
+    nodes_t accepts;
+    value_t v = newv;
     if (prepare(instance, accepts, cur_nodes, v)) {
 
         if (majority(cur_nodes, accepts)) {
-            LOG("paxos::manager: received a majority of prepare responses");
-
-            if (v.size() == 0)
-                v = newv;
+            LOG("paxos::run: received a majority of prepare responses");
 
             breakpoint1();
 
-            nodes = accepts;
-            accepts.clear();
+            nodes_t nodes;
+            nodes.swap(accepts);
             accept(instance, accepts, nodes, v);
 
             if (majority(cur_nodes, accepts)) {
-                LOG("paxos::manager: received a majority of accept responses");
+                LOG("paxos::run: received a majority of accept responses");
 
                 breakpoint2();
 
                 decide(instance, accepts, v);
                 r = true;
             } else {
-                LOG("paxos::manager: no majority of accept responses");
+                LOG("paxos::run: no majority of accept responses");
             }
         } else {
-            LOG("paxos::manager: no majority of prepare responses");
+            LOG("paxos::run: no majority of prepare responses");
         }
     } else {
-        LOG("paxos::manager: prepare is rejected " << stable);
+        LOG("paxos::run: prepare is rejected " << stable);
     }
     stable = true;
     return r;
 }
 
-// proposer::run() calls prepare to send prepare RPCs to nodes
-// and collect responses. if one of those nodes
-// replies with an oldinstance, return false.
-// otherwise fill in accepts with set of nodes that accepted,
-// set v to the v_a with the highest n_a, and return true.
-bool
-proposer::prepare(unsigned instance, vector<string> & accepts,
-        const vector<string> & nodes,
-        string & v)
-{
-    struct paxos_protocol::preparearg arg = { instance, my_n };
-    struct paxos_protocol::prepareres res;
-    prop_t n_a = { 0, "" };
-    rpcc *r;
+bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
+        const nodes_t & nodes, value_t & v) {
+    prepareres res;
+    prop_t highest_n_a{0, ""};
     for (auto i : nodes) {
         handle h(i);
-        if (!(r = h.safebind()))
+        rpcc *r = h.safebind();
+        if (!r)
             continue;
-        int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
+        auto status = (paxos_protocol::status)r->call_timeout(
+                paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 LOG("commiting old instance!");
-                acc->commit(instance, res.v_a);
+                commit(instance, res.v_a);
                 return false;
             }
             if (res.accept) {
                 accepts.push_back(i);
-                if (res.n_a >= n_a) {
+                if (res.n_a >= highest_n_a) {
                     LOG("found a newer accepted proposal");
                     v = res.v_a;
-                    n_a = res.n_a;
+                    highest_n_a = res.n_a;
                 }
             }
         }
@@ -164,79 +116,45 @@ proposer::prepare(unsigned instance, vector<string> & accepts,
     return true;
 }
 
-// run() calls this to send out accept RPCs to accepts.
-// fill in accepts with list of nodes that accepted.
-void
-proposer::accept(unsigned instance, vector<string> & accepts,
-        const vector<string> & nodes, const string & v)
-{
-    struct paxos_protocol::acceptarg arg = { instance, my_n, v };
-    rpcc *r;
+void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
+        const nodes_t & nodes, const value_t & v) {
     for (auto i : nodes) {
         handle h(i);
-        if (!(r = h.safebind()))
+        rpcc *r = h.safebind();
+        if (!r)
             continue;
         bool accept = false;
-        int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
+        int status = r->call_timeout(
+                paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
 }
 
-void
-proposer::decide(unsigned instance, const vector<string> & accepts,
-        const string & v)
-{
-    struct paxos_protocol::decidearg arg = { instance, v };
-    rpcc *r;
+void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
     for (auto i : accepts) {
         handle h(i);
-        if (!(r = h.safebind()))
+        rpcc *r = h.safebind();
+        if (!r)
             continue;
         int res = 0;
-        r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
+        r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
-        const string & _value)
-  : cfg(_cfg), me (_me), instance_h(0)
-{
-    n_h.n = 0;
-    n_h.m = me;
-    n_a.n = 0;
-    n_a.m = me;
-    v_a.clear();
-
-    l = new log (this, me);
-
-    if (instance_h == 0 && _first) {
-        values[1] = _value;
-        l->loginstance(1, _value);
-        instance_h = 1;
-    }
-
-    pxs = new rpcs((uint32_t)stoi(_me));
-    pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
-    pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
-    pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
-}
-
 paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
-        paxos_protocol::preparearg a)
-{
-    lock ml(pxs_mutex);
+proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+    lock ml(acceptor_mutex);
     r.oldinstance = false;
     r.accept = false;
     r.n_a = n_a;
     r.v_a = v_a;
-    if (a.instance <= instance_h) {
+    if (instance <= instance_h) {
         r.oldinstance = true;
-        r.v_a = values[a.instance];
-    } else if (a.n > n_h) {
-        n_h = a.n;
-        l->logprop(n_h);
+        r.v_a = values[instance];
+    } else if (n > n_h) {
+        n_h = n;
+        l.logprop(n_h);
         r.accept = true;
     } else {
         LOG("I totally rejected this request.  Ha.");
@@ -245,106 +163,76 @@ acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
 }
 
 paxos_protocol::status
-acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
-{
-    lock ml(pxs_mutex);
+proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
+    lock ml(acceptor_mutex);
     r = false;
-    if (a.n >= n_h) {
-        n_a = a.n;
-        v_a = a.v;
-        l->logaccept(n_a, v_a);
-        r = true;
+    if (instance == instance_h + 1) {
+        if (n >= n_h) {
+            n_a = n;
+            v_a = v;
+            l.logaccept(n_a, v_a);
+            r = true;
+        }
+        return paxos_protocol::OK;
+    } else {
+        return paxos_protocol::ERR;
     }
-    return paxos_protocol::OK;
 }
 
-// the src argument is only for debugging
 paxos_protocol::status
-acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
-{
-    lock ml(pxs_mutex);
-    LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
-    if (a.instance == instance_h + 1) {
-        VERIFY(v_a == a.v);
-        commit(a.instance, v_a, ml);
-    } else if (a.instance <= instance_h) {
-        // we are ahead ignore.
+proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
+    lock ml(acceptor_mutex);
+    LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
+    if (instance == instance_h + 1) {
+        VERIFY(v_a == v);
+        commit(instance, v_a, ml);
+    } else if (instance <= instance_h) {
+        // we are ahead; ignore.
     } else {
-        // we are behind
+        // we are behind.
         VERIFY(0);
     }
     return paxos_protocol::OK;
 }
 
-void
-acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
-{
+void proposer_acceptor::commit(unsigned instance, const value_t & value) {
+    lock ml(acceptor_mutex);
+    commit(instance, value, ml);
+}
+
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
     LOG("acceptor::commit: instance=" << instance << " has v=" << value);
     if (instance > instance_h) {
-        LOG("commit: highestaccepteinstance = " << instance);
+        LOG("commit: highestacceptedinstance = " << instance);
         values[instance] = value;
-        l->loginstance(instance, value);
+        l.loginstance(instance, value);
         instance_h = instance;
-        n_h.n = 0;
-        n_h.m = me;
-        n_a.n = 0;
-        n_a.m = me;
+        n_a = n_h = {0, me};
         v_a.clear();
-        if (cfg) {
+        if (delegate) {
             pxs_mutex_lock.unlock();
-            cfg->paxos_commit(instance, value);
+            delegate->paxos_commit(instance, value);
             pxs_mutex_lock.lock();
         }
     }
 }
 
-void
-acceptor::commit(unsigned instance, const string & value)
-{
-    lock ml(pxs_mutex);
-    commit(instance, value, ml);
-}
-
-string
-acceptor::dump()
-{
-    return l->dump();
-}
-
-void
-acceptor::restore(const string & s)
-{
-    l->restore(s);
-    l->logread();
-}
-
-
-
 // For testing purposes
-
-// Call this from your code between phases prepare and accept of proposer
-void
-proposer::breakpoint1()
-{
+void proposer_acceptor::breakpoint1() {
     if (break1) {
         LOG("Dying at breakpoint 1!");
         exit(1);
     }
 }
 
-// Call this from your code between phases accept and decide of proposer
-void
-proposer::breakpoint2()
-{
+void proposer_acceptor::breakpoint2() {
     if (break2) {
         LOG("Dying at breakpoint 2!");
         exit(1);
     }
 }
 
-void
-proposer::breakpoint(int b)
-{
+void proposer_acceptor::breakpoint(int b) {
     if (b == 3) {
         LOG("Proposer: breakpoint 1");
         break1 = true;
diff --git a/paxos.h b/paxos.h
index 8561dd5..116403d 100644 (file)
--- a/paxos.h
+++ b/paxos.h
 #ifndef paxos_h
 #define paxos_h
 
-#include <string>
-#include <vector>
-#include <map>
+#include "types.h"
 #include "rpc/rpc.h"
 #include "paxos_protocol.h"
 #include "log.h"
-#include "lock.h"
 
-using std::string;
-using std::map;
-using std::vector;
+using prepareres = paxos_protocol::prepareres;
+
+using node_t = string;
+using nodes_t = vector<node_t>;
+using value_t = string;
 
 class paxos_change {
     public:
-        virtual void paxos_commit(unsigned instance, const string & v) = 0;
+        virtual void paxos_commit(unsigned instance, const value_t & v) = 0;
         virtual ~paxos_change() {}
 };
 
-class acceptor {
+extern bool isamember(const node_t & m, const nodes_t & nodes);
+extern bool majority(const nodes_t & l1, const nodes_t & l2);
+extern string print_members(const nodes_t & nodes);
+
+class proposer_acceptor {
     private:
-        log *l;
-        rpcs *pxs;
-        paxos_change *cfg;
-        string me;
-        mutex pxs_mutex;
+        mutex proposer_mutex;
+        mutex acceptor_mutex;
 
-        // Acceptor state
-        prop_t n_h;            // number of the highest proposal seen in a prepare
-        prop_t n_a;            // number of highest proposal accepted
-        string v_a;    // value of highest proposal accepted
-        unsigned instance_h;   // number of the highest instance we have decided
-        map<unsigned,string> values;   // vals of each instance
-
-        void commit(unsigned instance, const string & v, lock & pxs_mutex_lock);
-        paxos_protocol::status preparereq(paxos_protocol::prepareres & r,
-                const string & src, paxos_protocol::preparearg a);
-        paxos_protocol::status acceptreq(bool & r, const string & src,
-                paxos_protocol::acceptarg a);
-        paxos_protocol::status decidereq(int & r, const string & src,
-                paxos_protocol::decidearg a);
+        paxos_change *delegate;
+        node_t me;
 
-        friend class log;
+        rpcs pxs = {(uint32_t)std::stoi(me)};
 
-    public:
-        acceptor(class paxos_change *cfg, bool _first, const string & _me,
-                const string & _value);
-        ~acceptor() {}
-        void commit(unsigned instance, const string & v);
-        unsigned instance() { return instance_h; }
-        const string & value(unsigned instance) { return values[instance]; }
-        string dump();
-        void restore(const string &);
-        rpcs *get_rpcs() { return pxs; }
-        prop_t get_n_h() { return n_h; }
-        unsigned get_instance_h() { return instance_h; }
-};
+        bool break1 = false;
+        bool break2 = false;
 
-extern bool isamember(const string & m, const vector<string> & nodes);
-extern string print_members(const vector<string> & nodes);
+        // Proposer state
+        bool stable = true;
+        prop_t my_n = {0, me};      // number of the last proposal used in this instance
 
-class proposer {
-    private:
-        log *l;
-        paxos_change *cfg;
-        acceptor *acc;
-        string me;
-        bool break1;
-        bool break2;
+        // Acceptor state
+        prop_t n_h = {0, me};       // number of the highest proposal seen in a prepare
+        prop_t n_a = {0, me};       // number of highest proposal accepted
+        value_t v_a;                // value of highest proposal accepted
+        unsigned instance_h = 0;    // number of the highest instance we have decided
+        map<unsigned,value_t> values;   // vals of each instance
 
-        mutex pxs_mutex;
+        friend class log;
+        log l = {this, me};
 
-        // Proposer state
-        bool stable;
-        prop_t my_n;           // number of the last proposal used in this instance
-
-        void setn();
-        bool prepare(unsigned instance, vector<string> & accepts,
-                const vector<string> & nodes,
-                string & v);
-        void accept(unsigned instance, vector<string> & accepts,
-                const vector<string> & nodes, const string & v);
-        void decide(unsigned instance, const vector<string> & accepts,
-                const string & v);
+        void commit(unsigned instance, const value_t & v);
+        void commit(unsigned instance, const value_t & v, lock & pxs_mutex_lock);
+
+        paxos_protocol::status preparereq(prepareres & r, const node_t & src, unsigned instance, prop_t n);
+        paxos_protocol::status acceptreq(bool & r, const node_t & src, unsigned instance, prop_t n, const value_t & v);
+        paxos_protocol::status decidereq(int & r, const node_t & src, unsigned instance, const value_t & v);
+
+        bool prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v);
+        void accept(unsigned instance, nodes_t & accepts, const nodes_t & nodes, const value_t & v);
+        void decide(unsigned instance, const nodes_t & accepts, const value_t & v);
 
         void breakpoint1();
         void breakpoint2();
-        bool majority(const vector<string> & l1, const vector<string> & l2);
 
-        friend class log;
     public:
-        proposer(class paxos_change *cfg, class acceptor *_acceptor, const string &_me);
-        ~proposer() {}
-        bool run(unsigned instance, const vector<string> & cnodes, const string & v);
-        bool isrunning();
+        proposer_acceptor(paxos_change *delegate, bool _first, const node_t & _me, const value_t & _value);
+        unsigned instance() { return instance_h; }
+        const value_t & value(unsigned instance) { return values[instance]; }
+        string dump() { return l.dump(); }
+        void restore(const string &s) { l.restore(s); l.logread(); }
+        rpcs *get_rpcs() { return &pxs; }
+
+        bool run(unsigned instance, const nodes_t & cnodes, const value_t & v);
+        bool isrunning() { lock ml(proposer_mutex); return !stable; }
         void breakpoint(int b);
 };
 
-
-
-#endif /* paxos_h */
+#endif
index f2bdb3f..c24f155 100644 (file)
@@ -1,11 +1,12 @@
 #ifndef paxos_protocol_h
 #define paxos_protocol_h
 
+#include "types.h"
 #include "rpc/rpc.h"
 
 struct prop_t {
     unsigned n;
-    std::string m;
+    string m;
 };
 
 class paxos_protocol {
@@ -18,45 +19,18 @@ class paxos_protocol {
             heartbeat,
         };
 
-        struct preparearg {
-            unsigned instance;
-            prop_t n;
-        };
-
         struct prepareres {
             bool oldinstance;
             bool accept;
             prop_t n_a;
-            std::string v_a;
-        };
-
-        struct acceptarg {
-            unsigned instance;
-            prop_t n;
-            std::string v;
-        };
-
-        struct decidearg {
-            unsigned instance;
-            std::string v;
+            string v_a;
         };
 };
 
-inline unmarshall & operator>>(unmarshall &u, prop_t &a) {
-    return u >> a.n >> a.m;
-}
-
-inline marshall & operator<<(marshall &m, prop_t a) {
-    return m << a.n << a.m;
-}
-
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::preparearg &a) {
-    return u >> a.instance >> a.n;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::preparearg a) {
-    return m << a.instance << a.n;
-}
+inline unmarshall & operator>>(unmarshall &u, prop_t &a) { return u >> a.n >> a.m; }
+inline marshall & operator<<(marshall &m, prop_t a) { return m << a.n << a.m; }
+inline bool operator>(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) > tie(b.n, b.m); }
+inline bool operator>=(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) >= tie(b.n, b.m); }
 
 inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) {
     return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a;
@@ -66,20 +40,4 @@ inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) {
     return m << r.oldinstance << r.accept << r.n_a << r.v_a;
 }
 
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::acceptarg &a) {
-    return u >> a.instance >> a.n >> a.v;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::acceptarg a) {
-    return m << a.instance << a.n << a.v;
-}
-
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::decidearg &a) {
-    return u >> a.instance >> a.v;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::decidearg a) {
-    return m << a.instance << a.v;
-}
-
 #endif
index db6a3ea..3f60c69 100644 (file)
@@ -1,20 +1,17 @@
+// std::bind and syscall bind have the same name, so don't use std::bind in this file
+#define LIBT4_NO_FUNCTIONAL
+#include "connection.h"
 #include <fcntl.h>
 #include <sys/types.h>
-#include <sys/time.h>
 #include <netinet/tcp.h>
 #include <errno.h>
 #include <signal.h>
 #include <unistd.h>
-
-#include "connection.h"
-#include "pollmgr.h"
 #include "jsl_log.h"
-#include "lang/verify.h"
-#include "lock.h"
+#include <sys/socket.h>
 
 #define MAX_PDU (10<<20) //maximum PDF is 10M
 
-
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
 {
@@ -25,7 +22,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
 
     signal(SIGPIPE, SIG_IGN);
 
-    create_time_ = std::chrono::steady_clock::now();
+    create_time_ = steady_clock::now();
 
     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
 }
@@ -324,7 +321,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
        flags |= O_NONBLOCK;
        fcntl(pipe_[0], F_SETFL, flags);
 
-    th_ = std::thread(&tcpsconn::accept_conn, this);
+    th_ = thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
@@ -333,7 +330,7 @@ tcpsconn::~tcpsconn()
     th_.join();
 
        //close all the active connections
-       std::map<int, connection *>::iterator i;
+       map<int, connection *>::iterator i;
        for (i = conns_.begin(); i != conns_.end(); i++) {
                i->second->closeconn();
                i->second->decref();
@@ -356,8 +353,7 @@ tcpsconn::process_accept()
        connection *ch = new connection(mgr_, s1, lossy_);
 
     // garbage collect all dead connections with refcount of 1
-    std::map<int, connection *>::iterator i;
-    for (i = conns_.begin(); i != conns_.end();) {
+    for (auto i = conns_.begin(); i != conns_.end();) {
         if (i->second->isdead() && i->second->ref() == 1) {
             jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
                     i->second->channo());
index f529a35..261cf9f 100644 (file)
@@ -1,22 +1,16 @@
 #ifndef connection_h
 #define connection_h
 
+#include "types.h"
 #include <sys/types.h>
-#include <sys/socket.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <cstddef>
-#include <thread>
-
-#include <map>
-#include <limits>
-
 #include "pollmgr.h"
 
-constexpr size_t size_t_max = std::numeric_limits<size_t>::max();
+constexpr size_t size_t_max = numeric_limits<size_t>::max();
 
-class thread_exit_exception : std::exception {
-};
+class thread_exit_exception : exception {};
 
 class connection;
 
@@ -64,16 +58,16 @@ class connection : public aio_callback {
         charbuf wpdu_;
         charbuf rpdu_;
 
-        std::chrono::time_point<std::chrono::steady_clock> create_time_;
+        time_point<steady_clock> create_time_;
 
         int waiters_;
         int refno_;
         const int lossy_;
 
-        std::mutex m_;
-        std::mutex ref_m_;
-        std::condition_variable send_complete_;
-        std::condition_variable send_wait_;
+        mutex m_;
+        mutex ref_m_;
+        cond send_complete_;
+        cond send_wait_;
 };
 
 class tcpsconn {
@@ -84,14 +78,14 @@ class tcpsconn {
         void accept_conn();
     private:
         unsigned int port_;
-        std::mutex m_;
-        std::thread th_;
+        mutex m_;
+        thread th_;
         int pipe_[2];
 
         int tcp_; //file desciptor for accepting connection
         chanmgr *mgr_;
         int lossy_;
-        std::map<int, connection *> conns_;
+        map<int, connection *> conns_;
 
         void process_accept();
 };
index dde514d..215ec5b 100644 (file)
@@ -1,8 +1,7 @@
 #ifndef fifo_h
 #define fifo_h
 
-#include <list>
-#include "lock.h"
+#include "types.h"
 
 // blocks enq() and deq() when queue is FULL or EMPTY
 template<class T>
@@ -17,7 +16,7 @@ class fifo {
         }
 
        private:
-               std::list<T> q_;
+               list<T> q_;
         mutex m_;
         cond non_empty_c_; // q went non-empty
                cond has_space_c_; // q is not longer overfull
index abeaae7..20b9c07 100644 (file)
@@ -1,16 +1,10 @@
 #ifndef marshall_h
 #define marshall_h
 
-#include <iostream>
-#include <sstream>
-#include <string>
-#include <vector>
-#include <map>
-#include <stdlib.h>
-#include <string.h>
+#include "types.h"
+#include <cstring>
 #include <cstddef>
-#include <inttypes.h>
-#include "lang/verify.h"
+#include <cinttypes>
 
 using proc_t = uint32_t;
 using status_t = int32_t;
@@ -56,7 +50,7 @@ typedef int rpc_sz_t;
 
 //size of initial buffer allocation
 #define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ (std::max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
+#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
 
 class marshall {
     private:
@@ -66,7 +60,7 @@ class marshall {
 
         inline void reserve(size_t n) {
             if((index_+n) > capacity_){
-                capacity_ += std::max(capacity_, n);
+                capacity_ += max(capacity_, n);
                 VERIFY (buf_ != NULL);
                 buf_ = (char *)realloc(buf_, capacity_);
                 VERIFY(buf_);
@@ -106,12 +100,12 @@ class marshall {
         }
 
         // Return the current content (excluding header) as a string
-        std::string get_content() {
-            return std::string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
+        string get_content() {
+            return string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
         }
 
         // Return the current content (excluding header) as a string
-        std::string str() {
+        string str() {
             return get_content();
         }
 
@@ -135,16 +129,9 @@ marshall& operator<<(marshall &, int8_t);
 marshall& operator<<(marshall &, uint16_t);
 marshall& operator<<(marshall &, int16_t);
 marshall& operator<<(marshall &, uint64_t);
-marshall& operator<<(marshall &, const std::string &);
+marshall& operator<<(marshall &, const string &);
 
-template <class A, typename I=void>
-struct is_enumerable : std::false_type {};
-
-template<class A> struct is_enumerable<A,
-    decltype(std::declval<A&>().cbegin(), std::declval<A&>().cend(), void())
-> : std::true_type {};
-
-template <class A> typename std::enable_if<is_enumerable<A>::value, marshall>::type &
+template <class A> typename enable_if<is_iterable<A>::value, marshall>::type &
 operator<<(marshall &m, const A &x) {
     m << (unsigned int) x.size();
     for (const auto &a : x)
@@ -153,16 +140,16 @@ operator<<(marshall &m, const A &x) {
 }
 
 template <class A, class B> marshall &
-operator<<(marshall &m, const std::pair<A,B> &d) {
+operator<<(marshall &m, const pair<A,B> &d) {
     return m << d.first << d.second;
 }
 
 template<typename E>
-using enum_type_t = typename std::enable_if<std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
+using enum_type_t = typename enable_if<is_enum<E>::value, typename underlying_type<E>::type>::type;
 template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
 template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
 
-template <class E> typename std::enable_if<std::is_enum<E>::value, marshall>::type &
+template <class E> typename enable_if<is_enum<E>::value, marshall>::type &
 operator<<(marshall &m, E e) {
     return m << from_enum(e);
 }
@@ -179,8 +166,8 @@ unmarshall& operator>>(unmarshall &, int32_t &);
 unmarshall& operator>>(unmarshall &, size_t &);
 unmarshall& operator>>(unmarshall &, uint64_t &);
 unmarshall& operator>>(unmarshall &, int64_t &);
-unmarshall& operator>>(unmarshall &, std::string &);
-template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+unmarshall& operator>>(unmarshall &, string &);
+template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
 operator>>(unmarshall &u, E &e);
 
 class unmarshall {
@@ -194,7 +181,7 @@ class unmarshall {
     public:
         unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
         unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {}
-        unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
+        unmarshall(const string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
         {
             //take the content which does not exclude a RPC header from a string
             take_content(s);
@@ -207,7 +194,7 @@ class unmarshall {
         void take_in(unmarshall &another);
 
         //take the content which does not exclude a RPC header from a string
-        void take_content(const std::string &s) {
+        void take_content(const string &s) {
             sz_ = s.size()+RPC_HEADER_SZ;
             buf_ = (char *)realloc(buf_,sz_);
             VERIFY(buf_);
@@ -221,7 +208,7 @@ class unmarshall {
         bool okdone() const { return ok_ && index_ == sz_; }
 
         uint8_t rawbyte();
-        void rawbytes(std::string &s, size_t n);
+        void rawbytes(string &s, size_t n);
         template <class T> void rawbytes(T &t);
 
         size_t ind() { return index_;}
@@ -255,7 +242,7 @@ class unmarshall {
         }
 };
 
-template <class A> typename std::enable_if<is_enumerable<A>::value, unmarshall>::type &
+template <class A> typename enable_if<is_iterable<A>::value, unmarshall>::type &
 operator>>(unmarshall &u, A &x) {
     unsigned n = u.grab<unsigned>();
     x.clear();
@@ -265,26 +252,26 @@ operator>>(unmarshall &u, A &x) {
 }
 
 template <class A, class B> unmarshall &
-operator>>(unmarshall &u, std::map<A,B> &x) {
+operator>>(unmarshall &u, map<A,B> &x) {
     unsigned n = u.grab<unsigned>();
     x.clear();
     while (n--)
-        x.emplace(u.grab<std::pair<A,B>>());
+        x.emplace(u.grab<pair<A,B>>());
     return u;
 }
 
 template <class A, class B> unmarshall &
-operator>>(unmarshall &u, std::pair<A,B> &d) {
+operator>>(unmarshall &u, pair<A,B> &d) {
     return u >> d.first >> d.second;
 }
 
-template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
 operator>>(unmarshall &u, E &e) {
     e = to_enum<E>(u.grab<enum_type_t<E>>());
     return u;
 }
 
-typedef std::function<int(unmarshall &, marshall &)> handler;
+typedef function<int(unmarshall &, marshall &)> handler;
 
 //
 // Automatic marshalling wrappers for RPC handlers
@@ -294,7 +281,7 @@ typedef std::function<int(unmarshall &, marshall &)> handler;
 // C++11 does neither of these two things for us:
 // 1) Declare variables using a parameter pack expansion, like so
 //      Args ...args;
-// 2) Call a function with a std::tuple of the arguments it expects
+// 2) Call a function with a tuple of the arguments it expects
 //
 // We implement an 'invoke' function for functions of the RPC handler
 // signature, i.e. int(R & r, const Args...)
@@ -339,17 +326,17 @@ struct VerifyOnFailure {
 // One for function pointers...
 
 template <class F, class R, class RV, class args_type, size_t ...Indices>
-typename std::enable_if<!std::is_member_function_pointer<F>::value, RV>::type
+typename enable_if<!is_member_function_pointer<F>::value, RV>::type
 invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
-    return f(r, std::move(std::get<Indices>(t))...);
+    return f(r, move(get<Indices>(t))...);
 }
 
 // And one for pointers to member functions...
 
 template <class F, class C, class RV, class R, class args_type, size_t ...Indices>
-typename std::enable_if<std::is_member_function_pointer<F>::value, RV>::type
+typename enable_if<is_member_function_pointer<F>::value, RV>::type
 invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
-    return (c->*f)(r, std::move(std::get<Indices>(t))...);
+    return (c->*f)(r, move(get<Indices>(t))...);
 }
 
 // The class marshalled_func_imp uses partial template specialization to
@@ -373,15 +360,15 @@ struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
         // template parameters running from 0 up to (# args) - 1.
         using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
         // This type definition represents storage for f's unmarshalled
-        // arguments.  std::decay is (most notably) stripping off const
+        // arguments.  decay is (most notably) stripping off const
         // qualifiers.
-        using ArgsStorage = std::tuple<typename std::decay<Args>::type...>;
-        // Allocate a handler (i.e. std::function) to hold the lambda
+        using ArgsStorage = tuple<typename decay<Args>::type...>;
+        // Allocate a handler (i.e. function) to hold the lambda
         // which will unmarshall RPCs and call f.
         return new handler([=](unmarshall &u, marshall &m) -> RV {
             // Unmarshall each argument with the correct type and store the
             // result in a tuple.
-            ArgsStorage t = {u.grab<typename std::decay<Args>::type>()...};
+            ArgsStorage t = {u.grab<typename decay<Args>::type>()...};
             // Verify successful unmarshalling of the entire input stream.
             if (!u.okdone())
                 return (RV)ErrorHandler::unmarshall_args_failure();
@@ -417,7 +404,7 @@ struct marshalled_func<F, ErrorHandler, RV(C::*)(Args...)> :
     public marshalled_func_imp<F, C, RV(Args...), ErrorHandler> {};
 
 template <class F, class ErrorHandler, class Signature>
-struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
+struct marshalled_func<F, ErrorHandler, function<Signature>> :
     public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
 
 #endif
index 919a286..023a7aa 100644 (file)
@@ -1,11 +1,10 @@
+#include "types.h"
 #include <errno.h>
 #include <fcntl.h>
 #include <unistd.h>
 
 #include "jsl_log.h"
-#include "lang/verify.h"
 #include "pollmgr.h"
-#include "lock.h"
 
 PollMgr *PollMgr::instance = NULL;
 static std::once_flag pollmgr_is_initialized;
index 89d1660..a082843 100644 (file)
@@ -1,9 +1,8 @@
 #ifndef pollmgr_h
 #define pollmgr_h 
 
+#include "types.h"
 #include <sys/select.h>
-#include <vector>
-#include <thread>
 
 #ifdef __linux__
 #include <sys/epoll.h>
@@ -24,7 +23,7 @@ class aio_mgr {
                virtual void watch_fd(int fd, poll_flag flag) = 0;
                virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
                virtual bool is_watched(int fd, poll_flag flag) = 0;
-               virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
+               virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0;
                virtual ~aio_mgr() {}
 };
 
@@ -55,9 +54,9 @@ class PollMgr {
                static int useless;
 
        private:
-        std::mutex m_;
-        std::condition_variable changedone_c_;
-        std::thread th_;
+        mutex m_;
+        cond changedone_c_;
+        thread th_;
 
                aio_callback *callbacks_[MAX_POLL_FDS];
                aio_mgr *aio_;
@@ -73,7 +72,7 @@ class SelectAIO : public aio_mgr {
                void watch_fd(int fd, poll_flag flag);
                bool unwatch_fd(int fd, poll_flag flag);
                bool is_watched(int fd, poll_flag flag);
-               void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
+               void wait_ready(vector<int> *readable, vector<int> *writable);
 
        private:
 
@@ -82,7 +81,7 @@ class SelectAIO : public aio_mgr {
                int highfds_;
                int pipefd_[2];
 
-        std::mutex m_;
+        mutex m_;
 
 };
 
@@ -94,7 +93,7 @@ class EPollAIO : public aio_mgr {
                void watch_fd(int fd, poll_flag flag);
                bool unwatch_fd(int fd, poll_flag flag);
                bool is_watched(int fd, poll_flag flag);
-               void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
+               void wait_ready(vector<int> *readable, vector<int> *writable);
 
        private:
                int pollfd_;
index c6d93c8..ad3fcd9 100644 (file)
@@ -54,6 +54,7 @@
  x exited worker threads).
  */
 
+#include "types.h"
 #include "rpc.h"
 
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <netdb.h>
 #include <unistd.h>
-#include "lock.h"
 
 #include "jsl_log.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
-
-using std::stoi;
 
 const rpcc::TO rpcc::to_max = { 120000 };
 const rpcc::TO rpcc::to_min = { 1000 };
 
-rpcc::caller::caller(int xxid, unmarshall *xun)
-: xid(xxid), un(xun), done(false)
-{
-}
-
-rpcc::caller::~caller()
-{
-}
-
-inline
-void set_rand_seed()
-{
-    auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
+inline void set_rand_seed() {
+    auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
@@ -196,10 +181,8 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
     }
 
     TO curr_to;
-    std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
-        std::chrono::steady_clock::now() +
-        std::chrono::milliseconds(to.to),
-        nextdeadline;
+    auto finaldeadline = steady_clock::now() + milliseconds(to.to),
+        nextdeadline = finaldeadline;
 
     curr_to.to = to_min.to;
 
@@ -231,20 +214,20 @@ rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
             transmit = false; // only send once on a given channel
         }
 
-        if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
+        if(finaldeadline == time_point<steady_clock>::min())
             break;
 
-        nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
+        nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
-            finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
+            finaldeadline = time_point<steady_clock>::min();
         }
 
         {
             lock cal(ca.m);
             while (!ca.done){
                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
-                if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
+                if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
                     break;
                 }
@@ -421,7 +404,7 @@ rpcs::got_pdu(connection *c, char *b, size_t sz)
 
     djob_t *j = new djob_t(c, b, sz);
     c->incref();
-    bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
+    bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
     if(!succ || !reachable_){
         c->decref();
         delete j;
@@ -447,16 +430,15 @@ rpcs::updatestat(proc_t proc)
     if(curr_counts_ == 0){
         LOG("RPC STATS: ");
         for (auto i = counts_.begin(); i != counts_.end(); i++)
-            LOG(std::hex << i->first << ":" << std::dec << i->second);
+            LOG(hex << i->first << ":" << dec << i->second);
 
         lock rwl(reply_window_m_);
-        map<unsigned int,list<reply_t> >::iterator clt;
 
         size_t totalrep = 0, maxrep = 0;
-        for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-            totalrep += clt->second.size();
-            if(clt->second.size() > maxrep)
-                maxrep = clt->second.size();
+        for (auto clt : reply_window_) {
+            totalrep += clt.second.size();
+            if(clt.second.size() > maxrep)
+                maxrep = clt.second.size();
         }
         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
                         (int) reply_window_.size()-1, totalrep, maxrep);
@@ -639,8 +621,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 
     int past_xid_rep = l.begin()->xid;
 
-    list<reply_t>::iterator start = l.begin(), it;
-    it = ++start;
+    list<reply_t>::iterator start = l.begin(), it = ++start;
 
     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
         // scan for deletion candidates
@@ -700,24 +681,19 @@ rpcs::add_reply(unsigned int clt_nonce, int xid,
     }
 }
 
-void
-rpcs::free_reply_window(void)
-{
+void rpcs::free_reply_window(void) {
     lock rwl(reply_window_m_);
-    for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-        for (auto it = clt->second.begin(); it != clt->second.end(); it++){
-            if (it->cb_present)
-                free(it->buf);
+    for (auto clt : reply_window_) {
+        for (auto it : clt.second){
+            if (it.cb_present)
+                free(it.buf);
         }
-        clt->second.clear();
+        clt.second.clear();
     }
     reply_window_.clear();
 }
 
-// rpc handler
-int
-rpcs::rpcbind(unsigned int &r, int)
-{
+int rpcs::rpcbind(unsigned int &r, int) {
     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
     r = nonce_;
     return 0;
index d81a5dd..f01af09 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -1,20 +1,13 @@
 #ifndef rpc_h
 #define rpc_h
 
+#include "types.h"
 #include <sys/socket.h>
 #include <netinet/in.h>
-#include <list>
-#include <map>
-#include <stdio.h>
 
 #include "thr_pool.h"
 #include "marshall.h"
 #include "connection.h"
-#include "lock.h"
-
-using std::string;
-using std::map;
-using std::list;
 
 class rpc_const {
     public:
@@ -37,13 +30,12 @@ class rpcc : public chanmgr {
 
         //manages per rpc info
         struct caller {
-            caller(int xxid, unmarshall *un);
-            ~caller();
+            caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {}
 
             int xid;
             unmarshall *un;
             int intret;
-            bool done;
+            bool done = false;
             mutex m;
             cond c;
         };
@@ -126,9 +118,8 @@ rpcc::call_m(proc_t proc, marshall &req, R & r, TO to)
     if (intret < 0) return intret;
     u >> r;
     if (u.okdone() != true) {
-        fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
-                "You are probably calling RPC 0x%x with wrong return "
-                "type.\n", proc);
+        cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
+                "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
         VERIFY(0);
         return rpc_const::unmarshal_reply_failure;
     }
index bf8a56c..c69d317 100644 (file)
@@ -1,28 +1,18 @@
 // RPC test and pseudo-documentation.
 // generates print statements on failures, but eventually says "rpctest OK"
 
+#include "types.h"
 #include "rpc.h"
 #include <arpa/inet.h>
-#include <iostream>
-#include <vector>
-#include <thread>
-#include <stdlib.h>
 #include <getopt.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include "jsl_log.h"
-#include "lang/verify.h"
 
 #define NUM_CL 2
 
 char log_thread_prefix = 'r';
 
-using std::string;
-using std::cout;
-using std::endl;
-using std::vector;
-using std::thread;
-
 rpcs *server;  // server rpc object
 rpcc *clients[NUM_CL];  // client rpc object
 string dst; //server's ip address
index ff3557c..8b9691b 100644 (file)
@@ -1,7 +1,4 @@
 #include "thr_pool.h"
-#include <stdlib.h>
-#include <errno.h>
-#include "lang/verify.h"
 
 // if blocking, then addJob() blocks when queue is full
 // otherwise, addJob() simply returns false when queue is full
index 4ea1bd4..94ce237 100644 (file)
@@ -1,9 +1,7 @@
 #ifndef thr_pool_h
 #define thr_pool_h
 
-#include <vector>
-#include <thread>
-
+#include "types.h"
 #include "fifo.h"
 
 typedef std::function<void()> job_t;
diff --git a/rsm.cc b/rsm.cc
index 1321f7e..00cae81 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
-#include <fstream>
-#include <iostream>
-#include <algorithm>
 #include <sys/types.h>
 #include <unistd.h>
 
+#include "types.h"
 #include "handle.h"
 #include "rsm.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
 #include "rsm_client.h"
-#include "lock.h"
 
 rsm::rsm(std::string _first, std::string _me) :
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
@@ -191,7 +186,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
     insync = true;
     cfg->get_view(vid_insync, backups);
     backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
-    LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end()));
+    LOG("rsm::sync_with_backups " << backups);
     sync_cond.wait(rsm_mutex_lock);
     insync = false;
     return true;
@@ -259,7 +254,7 @@ bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
 bool rsm::join(std::string m, lock & rsm_mutex_lock) {
     handle h(m);
     int ret = 0;
-    rsm_protocol::joinres r;
+    string log;
 
     LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
     rpcc *cl;
@@ -267,7 +262,7 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
-            ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r,
+            ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), log,
                     cfg->myaddr(), last_myvs);
         }
         rsm_mutex_lock.lock();
@@ -277,8 +272,8 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) {
         LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
         return false;
     }
-    LOG("rsm::join: succeeded " << r.log);
-    cfg->restore(r.log);
+    LOG("rsm::join: succeeded " << log);
+    cfg->restore(log);
     return true;
 }
 
@@ -447,7 +442,7 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
 // a node that wants to join an RSM as a server sends a
 // joinreq to the RSM's current primary; this is the
 // handler for that RPC.
-rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) {
+rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
@@ -455,7 +450,7 @@ rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, views
             last_myvs.vid << "," << last_myvs.seqno << ")");
     if (cfg->ismember(m, vid_commit)) {
         LOG("joinreq: is still a member");
-        r.log = cfg->dump();
+        log = cfg->dump();
     } else if (cfg->myaddr() != primary) {
         LOG("joinreq: busy");
         ret = rsm_protocol::BUSY;
@@ -470,8 +465,8 @@ rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, views
             ml.lock();
         }
         if (cfg->ismember(m, cfg->view_id())) {
-            r.log = cfg->dump();
-            LOG("joinreq: ret " << ret << " log " << r.log);
+            log = cfg->dump();
+            LOG("joinreq: ret " << ret << " log " << log);
         } else {
             LOG("joinreq: failed; proposer couldn't add " << succ);
             ret = rsm_protocol::BUSY;
diff --git a/rsm.h b/rsm.h
index 73fa606..f2eb5bd 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -3,19 +3,24 @@
 #ifndef rsm_h
 #define rsm_h
 
-#include <string>
-#include <vector>
+#include "types.h"
 #include "rsm_protocol.h"
-#include "rsm_state_transfer.h"
 #include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include "config.h"
 
+class rsm_state_transfer {
+    public:
+        virtual string marshal_state() = 0;
+        virtual void unmarshal_state(string) = 0;
+        virtual ~rsm_state_transfer() {}
+};
+
 class rsm : public config_view_change {
     private:
         void reg1(int proc, handler *);
     protected:
-        std::map<int, handler *> procs;
+        map<int, handler *> procs;
         config *cfg;
         class rsm_state_transfer *stf;
         rpcs *rsmrpc;
@@ -23,12 +28,12 @@ class rsm : public config_view_change {
         // On primary: viewstamp for the next request from rsm_client
         viewstamp myvs;
         viewstamp last_myvs;   // Viewstamp of the last executed request
-        std::string primary;
+        string primary;
         bool insync;
         bool inviewchange;
         unsigned vid_commit;  // Latest view id that is known to rsm layer
         unsigned vid_insync;  // The view id that this node is synchronizing for
-        std::vector<std::string> backups;   // A list of unsynchronized backups
+        vector<string> backups;   // A list of unsynchronized backups
 
         // For testing purposes
         rpcs *testsvr;
@@ -37,30 +42,26 @@ class rsm : public config_view_change {
         bool break1;
         bool break2;
 
-
-        rsm_client_protocol::status client_members(std::vector<std::string> &r, int i);
-        rsm_protocol::status invoke(int &, int proc, viewstamp vs, std::string mreq);
-        rsm_protocol::status transferreq(rsm_protocol::transferres &r, std::string src,
+        rsm_client_protocol::status client_members(vector<string> &r, int i);
+        rsm_protocol::status invoke(int &, int proc, viewstamp vs, string mreq);
+        rsm_protocol::status transferreq(rsm_protocol::transferres &r, string src,
                 viewstamp last, unsigned vid);
-        rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid);
-        rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src,
+        rsm_protocol::status transferdonereq(int &, string m, unsigned vid);
+        rsm_protocol::status joinreq(string & log, string src,
                 viewstamp last);
         rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
         rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
 
-        std::mutex rsm_mutex;
-        std::mutex invoke_mutex;
-        std::condition_variable recovery_cond;
-        std::condition_variable sync_cond;
+        mutex rsm_mutex, invoke_mutex;
+        cond recovery_cond, sync_cond;
 
-        void execute(int procno, std::string req, std::string &r);
-        rsm_client_protocol::status client_invoke(std::string &r, int procno,
-                std::string req);
-        bool statetransfer(std::string m, lock & rsm_mutex_lock);
-        bool statetransferdone(std::string m, lock & rsm_mutex_lock);
-        bool join(std::string m, lock & rsm_mutex_lock);
+        void execute(int procno, string req, string &r);
+        rsm_client_protocol::status client_invoke(string &r, int procno, string req);
+        bool statetransfer(string m, lock & rsm_mutex_lock);
+        bool statetransferdone(string m, lock & rsm_mutex_lock);
+        bool join(string m, lock & rsm_mutex_lock);
         void set_primary(unsigned vid);
-        std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid);
+        string find_highest(viewstamp &vs, string &m, unsigned &vid);
         bool sync_with_backups(lock & rsm_mutex_lock);
         bool sync_with_primary(lock & rsm_mutex_lock);
         void net_repair(bool heal, lock & rsm_mutex_lock);
@@ -69,7 +70,7 @@ class rsm : public config_view_change {
         void partition1(lock & rsm_mutex_lock);
         void commit_change(unsigned vid, lock & rsm_mutex_lock);
     public:
-        rsm (std::string _first, std::string _me);
+        rsm (string _first, string _me);
         ~rsm() {}
 
         bool amiprimary();
index bff32c2..310d1ad 100644 (file)
@@ -1,14 +1,10 @@
+#include "types.h"
 #include "rsm_client.h"
-#include <vector>
 #include <arpa/inet.h>
-#include <stdio.h>
 #include <handle.h>
 #include <unistd.h>
-#include "lang/verify.h"
-#include "lock.h"
-#include "threaded_log.h"
 
-rsm_client::rsm_client(std::string dst) : primary(dst) {
+rsm_client::rsm_client(string dst) : primary(dst) {
     LOG("create rsm_client");
     lock ml(rsm_client_mutex);
     VERIFY (init_members(ml));
@@ -20,10 +16,10 @@ void rsm_client::primary_failure(lock &) {
     known_mems.pop_back();
 }
 
-rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
+rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) {
     lock ml(rsm_client_mutex);
     while (1) {
-        LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
+        LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary);
         handle h(primary);
 
         ml.unlock();
@@ -36,7 +32,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con
         if (!cl)
             goto prim_fail;
 
-        LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
+        LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
         if (ret == rsm_client_protocol::OK)
             return rsm_protocol::OK;
         if (ret == rsm_client_protocol::BUSY) {
@@ -50,7 +46,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, con
                 continue;
         }
 prim_fail:
-        LOG("primary " << primary << " failed ret " << std::dec << ret);
+        LOG("primary " << primary << " failed ret " << dec << ret);
         primary_failure(ml);
         LOG("rsm_client::invoke: retry new primary " << primary);
     }
index 814616f..bb21f24 100644 (file)
@@ -1,11 +1,8 @@
 #ifndef rsm_client_h
 #define rsm_client_h
 
-#include "rpc/rpc.h"
+#include "types.h"
 #include "rsm_protocol.h"
-#include <string>
-#include <vector>
-
 
 //
 // rsm client interface.
index 6b508d8..53908f3 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef rsm_protocol_h
 #define rsm_protocol_h
 
+#include "types.h"
 #include "rpc/rpc.h"
 
 class rsm_client_protocol {
@@ -23,33 +24,21 @@ class rsm_protocol {
     public:
         enum status : status_t { OK, ERR, BUSY};
         enum rpc_numbers : proc_t {
-            invoke = 0x10001,
+            invoke = 0xa001,
             transferreq,
             transferdonereq,
             joinreq,
         };
 
         struct transferres {
-            std::string state;
+            string state;
             viewstamp last;
         };
-
-        struct joinres {
-            std::string log;
-        };
 };
 
-inline bool operator==(viewstamp a, viewstamp b) {
-    return a.vid == b.vid && a.seqno == b.seqno;
-}
-
-inline bool operator>(viewstamp a, viewstamp b) {
-    return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
-}
-
-inline bool operator!=(viewstamp a, viewstamp b) {
-    return a.vid != b.vid || a.seqno != b.seqno;
-}
+inline bool operator==(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) == tie(b.vid, b.seqno); }
+inline bool operator>(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) > tie(b.vid, b.seqno); }
+inline bool operator!=(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) != tie(b.vid, b.seqno); }
 
 inline marshall& operator<<(marshall &m, viewstamp v) {
     return m << v.vid << v.seqno;
@@ -67,14 +56,6 @@ inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) {
     return u >> r.state >> r.last;
 }
 
-inline marshall & operator<<(marshall &m, rsm_protocol::joinres r) {
-    return m << r.log;
-}
-
-inline unmarshall & operator>>(unmarshall &u, rsm_protocol::joinres &r) {
-    return u >> r.log;
-}
-
 class rsm_test_protocol {
     public:
         enum status : status_t {OK, ERR};
diff --git a/rsm_state_transfer.h b/rsm_state_transfer.h
deleted file mode 100644 (file)
index 62a130c..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-#ifndef rsm_state_transfer_h
-#define rsm_state_transfer_h
-
-class rsm_state_transfer {
- public:
-  virtual std::string marshal_state() = 0;
-  virtual void unmarshal_state(std::string) = 0;
-  virtual ~rsm_state_transfer() {}
-};
-
-#endif
index 31b8c1a..1a8b833 100644 (file)
@@ -2,34 +2,27 @@
 // RSM test client
 //
 
+#include "types.h"
 #include "rsm_protocol.h"
 #include "rsmtest_client.h"
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <vector>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string>
 
 char log_thread_prefix = 't';
 
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
     if(argc != 4){
-        fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
-        exit(1);
+        cerr << "Usage: " << argv[0] << " [host:]port [partition] arg" << endl;
+        return 1;
     }
 
     rsmtest_client *lc = new rsmtest_client(argv[1]);
-    std::string command(argv[2]);
+    string command(argv[2]);
     if (command == "partition") {
-        printf("net_repair returned %d\n", lc->net_repair(atoi(argv[3])));
+        cout << "net_repair returned " << lc->net_repair(stoi(argv[3]));
     } else if (command == "breakpoint") {
-        int b = atoi(argv[3]);
-        printf("breakpoint %d returned %d\n", b, lc->breakpoint(b));
+        int b = stoi(argv[3]);
+        cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
     } else {
-        fprintf(stderr, "Unknown command %s\n", argv[2]);
+        cerr << "Unknown command " << argv[2] << endl;
     }
     return 0;
 }
index 0c56f8a..cb3ce8c 100644 (file)
@@ -1,16 +1,11 @@
 // RPC stubs for clients to talk to rsmtest_server
 
 #include "rsmtest_client.h"
-#include "rpc/rpc.h"
 #include <arpa/inet.h>
 
-#include <sstream>
-#include <iostream>
-#include <stdio.h>
-
 rsmtest_client::rsmtest_client(std::string dst) : cl(dst) {
     if (cl.bind() < 0)
-        printf("rsmtest_client: call bind\n");
+        cout << "rsmtest_client: call bind" << endl;
 }
 
 rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
index 51f8511..ad3d4c1 100644 (file)
@@ -3,9 +3,8 @@
 #ifndef rsmtest_client_h
 #define rsmtest_client_h
 
-#include <string>
+#include "types.h"
 #include "rsm_protocol.h"
-#include "rpc/rpc.h"
 
 // Client interface to the rsmtest server
 class rsmtest_client {
index 57ddc08..6a213b1 100644 (file)
@@ -1,9 +1,7 @@
-#include <sys/time.h>
-#include <stdint.h>
 #include "threaded_log.h"
 
-std::mutex cerr_mutex;
-std::map<std::thread::id, int> thread_name_map;
+mutex cerr_mutex;
+map<thread::id, int> thread_name_map;
 int next_thread_num = 0;
-std::map<void *, int> instance_name_map;
+map<void *, int> instance_name_map;
 int next_instance_num = 0;
index 6918220..ebb2222 100644 (file)
 #ifndef threaded_log_h
 #define threaded_log_h
 
-#include <iomanip>
-#include <iostream>
-#include <stdio.h>
-#include <map>
-#include "lock.h"
+#include "types.h"
 
 extern mutex cerr_mutex;
-extern std::map<std::thread::id, int> thread_name_map;
+extern map<thread::id, int> thread_name_map;
 extern int next_thread_num;
-extern std::map<void *, int> instance_name_map;
+extern map<void *, int> instance_name_map;
 extern int next_instance_num;
 extern char log_thread_prefix;
 
-template <class A>
-struct iterator_pair : public std::pair<A, A> {
-    explicit iterator_pair(const A & first, const A & second) : std::pair<A, A>(first, second) {}
-};
-
-template <class A>
-const struct iterator_pair<A> make_iterator_pair(const A & first, const A & second) {
-    return iterator_pair<A>(first, second);
-}
-
-template <class A, class B>
-std::ostream & operator<<(std::ostream &o, const std::pair<A,B> &d) {
-    o << "<" << d.first << "," << d.second << ">";
-    return o;
+namespace std {
+    // This... is an awful hack.  But sticking this in std:: makes it possible for
+    // ostream_iterator to use it.
+    template <class A, class B>
+    ostream & operator<<(ostream &o, const pair<A,B> &d) {
+        return o << "<" << d.first << "," << d.second << ">";
+    }
 }
 
 template <class A>
-std::ostream & operator<<(std::ostream &o, const iterator_pair<A> &d) {
+typename enable_if<is_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
+operator<<(ostream &o, const A &a) {
     o << "[";
-    for (auto i=d.first; i!=d.second; i++) {
-        o << *i;
-        auto j(i);
-        if (++j != d.second)
-            o << ", ";
-    }
+    auto oit = ostream_iterator<typename A::value_type>(o, ", ");
+    copy(a.begin(), a.end(), oit);
     o << "]";
     return o;
 }
 
 #define LOG_PREFIX { \
-    cerr_mutex.lock(); \
     auto _thread_ = std::this_thread::get_id(); \
     int _tid_ = thread_name_map[_thread_]; \
     if (_tid_==0) \
         _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
-    auto _utime_ = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
-    std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \
-    std::cerr << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << _tid_; \
-    std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \
+    auto _utime_ = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000; \
+    cerr << setfill('0') << dec << left << setw(9) << _utime_ << " "; \
+    cerr << setfill(' ') << log_thread_prefix << left << setw(2) << _tid_; \
+    cerr << " " << setw(20) << __FILE__ << " " << setw(18) << __func__; \
 }
 #define LOG_THIS_POINTER { \
     int _self_ = instance_name_map[this]; \
     if (_self_==0) \
         _self_ = instance_name_map[this] = ++next_instance_num; \
-    std::cerr << "#" << std::setw(2) << _self_; \
-}
-#define LOG_SUFFIX { \
-    cerr_mutex.unlock(); \
+    cerr << "#" << setw(2) << _self_; \
 }
 
 #define LOG_NONMEMBER(_x_) { \
+    lock _cel_(cerr_mutex); \
     LOG_PREFIX; \
-    std::cerr << _x_ << std::endl; \
-    LOG_SUFFIX; \
+    cerr << _x_ << endl; \
 }
 #define LOG(_x_) { \
+    lock _cel_(cerr_mutex); \
     LOG_PREFIX; \
     LOG_THIS_POINTER; \
-    std::cerr << _x_ << std::endl; \
-    LOG_SUFFIX; \
-}
-#define LOG_FUNC_ENTER { \
-    LOG_PREFIX; \
-    LOG_THIS_POINTER; \
-    std::cerr << "lid=" << lid; \
-    std::cerr << std::endl; \
-    LOG_SUFFIX; \
-}
-#define LOG_FUNC_ENTER_SERVER { \
-    LOG_PREFIX; \
-    LOG_THIS_POINTER; \
-    std::cerr << "lid=" << lid; \
-    std::cerr << " client=" << id << "," << xid; \
-    std::cerr << std::endl; \
-    LOG_SUFFIX; \
-}
-#define LOG_FUNC_EXIT { \
-    LOG_PREFIX; \
-    LOG_THIS_POINTER; \
-    std::cerr << "return" << lid; \
-    std::cerr << std::endl; \
-    LOG_SUFFIX; \
+    cerr << _x_ << endl; \
 }
 
 #endif
diff --git a/types.h b/types.h
new file mode 100644 (file)
index 0000000..9739a2a
--- /dev/null
+++ b/types.h
@@ -0,0 +1,113 @@
+#ifndef types_h
+#define types_h
+
+#include <algorithm>
+using std::copy;
+using std::move;
+using std::max;
+using std::min;
+using std::min_element;
+using std::find;
+using std::count_if;
+
+#include <chrono>
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::chrono::microseconds;
+using std::chrono::nanoseconds;
+using std::chrono::steady_clock;
+using std::chrono::system_clock;
+using std::chrono::duration_cast;
+using std::chrono::time_point_cast;
+using std::chrono::time_point;
+
+#include <exception>
+using std::exception;
+
+#include <fstream>
+using std::ofstream;
+using std::ifstream;
+
+#ifndef LIBT4_NO_FUNCTIONAL
+#include <functional>
+using std::function;
+using std::bind;
+using std::placeholders::_1;
+#endif
+
+#include <iomanip>
+#include <iostream>
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::dec;
+using std::hex;
+using std::left;
+using std::setw;
+using std::setfill;
+using std::setprecision;
+using std::ostream;
+using std::istream;
+using std::ostream_iterator;
+using std::istream_iterator;
+
+#include <limits>
+using std::numeric_limits;
+
+#include <list>
+using std::list;
+
+#include <map>
+using std::map;
+
+#include <mutex>
+using std::mutex;
+using lock = std::unique_lock<std::mutex>;
+using cond = std::condition_variable;
+using std::cv_status;
+
+#include <sstream>
+using std::ostringstream;
+using std::istringstream;
+
+#include <string>
+using std::string;
+using std::to_string;
+using std::stoi;
+
+#include <thread>
+using std::thread;
+
+#include <tuple>
+using std::tuple;
+using std::get;
+using std::tie;
+
+#include <type_traits>
+using std::decay;
+using std::true_type;
+using std::false_type;
+using std::is_enum;
+using std::is_member_function_pointer;
+using std::is_same;
+using std::underlying_type;
+using std::enable_if;
+
+#include <utility>
+using std::pair;
+using std::declval;
+
+#include <vector>
+using std::vector;
+
+
+template <class A, typename I=void> struct is_iterable : false_type {};
+
+template<class A> struct is_iterable<A,
+    decltype(declval<A&>().cbegin(), declval<A&>().cend(), void())
+> : true_type {};
+
+#include "lang/verify.h"
+#include "threaded_log.h"
+
+#endif