Got rid of most using directives. Ported tests to python.
authorPeter Iannucci <iannucci@mit.edu>
Wed, 27 Nov 2013 06:17:02 +0000 (01:17 -0500)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 27 Nov 2013 06:17:02 +0000 (01:17 -0500)
30 files changed:
config.cc
config.h
endian.h
handle.cc
lock_client.cc
lock_client.h
lock_server.h
lock_tester.cc
log.cc
paxos.h
paxos_protocol.h
rpc/connection.cc
rpc/connection.h
rpc/fifo.h
rpc/marshall.h
rpc/marshall_wrap.h
rpc/poll_mgr.cc
rpc/poll_mgr.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rpc/thr_pool.h
rsm.cc
rsm.h
rsm_client.h
rsm_protocol.h
rsm_tester.cc
rsm_tester.py [new file with mode: 0755]
threaded_log.cc
types.h

index e1b0963..4931a9f 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,6 +1,8 @@
 #include "config.h"
 #include "handle.h"
 
+using std::vector;
+
 // The config module maintains views. As a node joins or leaves a
 // view, the next view will be the same as previous view, except with
 // the new node added or removed. The first view contains only node
index 26a612d..d2b33ba 100644 (file)
--- a/config.h
+++ b/config.h
@@ -17,11 +17,11 @@ class config : public paxos_change {
         string me;
         config_view_change *vc;
         proposer_acceptor paxos;
-        vector<string> mems;
-        mutex cfg_mutex;
+        std::vector<string> mems;
+        std::mutex cfg_mutex;
         cond config_cond;
         paxos_protocol::status heartbeat(int & r, string m, unsigned instance);
-        void get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock);
+        void get_view(unsigned instance, std::vector<string> & m, lock & cfg_mutex_lock);
         bool remove(const string &, lock & cfg_mutex_lock);
         void reconstruct(lock & cfg_mutex_lock);
         typedef enum {
@@ -35,7 +35,7 @@ class config : public paxos_change {
         unsigned view_id() { return my_view_id; }
         const string & myaddr() const { return me; }
         string dump() { return paxos.dump(); }
-        void get_view(unsigned instance, vector<string> & m);
+        void get_view(unsigned instance, std::vector<string> & m);
         void restore(const string & s);
         bool add(const string &, unsigned view_id);
         bool ismember(const string & m, unsigned view_id);
index f34d371..feb3bbd 100644 (file)
--- a/endian.h
+++ b/endian.h
@@ -20,14 +20,18 @@ inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
 
 template <class T> inline T ntoh(T t) { return hton(t); }
 
-template <class... Args, size_t... Indices> inline tuple<typename remove_reference<Args>::type...>
+template <class... Args, size_t... Indices>
+inline tuple<typename std::remove_reference<Args>::type...>
 tuple_hton_imp(tuple<Args...> && t, tuple_indices<Indices...>) {
-    return tuple<typename remove_reference<Args>::type...>(hton(get<Indices>(t))...);
+    return tuple<
+        typename std::remove_reference<Args>::type...
+    >(hton(std::get<Indices>(t))...);
 }
 
-template <class... Args> inline tuple<typename remove_reference<Args>::type...>
+template <class... Args>
+inline tuple<typename std::remove_reference<Args>::type...>
 hton(tuple<Args...> && t) {
-    return tuple_hton_imp(forward<tuple<Args...>>(t), TUPLE_INDICES(Args));
+    return tuple_hton_imp(std::forward<tuple<Args...>>(t), TUPLE_INDICES(Args));
 }
 
 template <class T> inline typename
index 17d04af..926cb59 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -5,18 +5,18 @@ public:
     unique_ptr<rpcc> client;
     bool valid = true;
     string destination;
-    mutex client_mutex;
+    std::mutex client_mutex;
     hinfo(const string & destination_) : destination(destination_) {}
 };
 
-static mutex mgr_mutex;
-static map<string, shared_ptr<hinfo>> hmap;
+static std::mutex mgr_mutex;
+static std::map<string, shared_ptr<hinfo>> hmap;
 
 handle::handle(const string & destination) : destination_(destination) {
     lock ml(mgr_mutex);
     h = hmap[destination];
     if (!h || !h->valid)
-        h = (hmap[destination] = make_shared<hinfo>(destination));
+        h = (hmap[destination] = std::make_shared<hinfo>(destination));
 }
 
 rpcc * handle::safebind() {
index 7a44940..9949eac 100644 (file)
@@ -4,7 +4,7 @@
 #include <arpa/inet.h>
 
 void lock_state::wait(lock & mutex_lock) {
-    auto self = this_thread::get_id();
+    auto self = std::this_thread::get_id();
     c[self].wait(mutex_lock);
     c.erase(self);
 }
@@ -20,8 +20,6 @@ void lock_state::signal(thread::id who) {
         c[who].notify_one();
 }
 
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-
 in_port_t lock_client::last_port = 0;
 
 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
@@ -36,7 +34,7 @@ lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xi
 
     srandom((uint32_t)time(NULL)^last_port);
     rlock_port = ((random()%32000) | (0x1 << 10));
-    id = "127.0.0.1:" + to_string(rlock_port);
+    id = "127.0.0.1:" + std::to_string(rlock_port);
     last_port = rlock_port;
     rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
@@ -81,7 +79,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) {
 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
-    auto self = this_thread::get_id();
+    auto self = std::this_thread::get_id();
 
     // check for reentrancy
     VERIFY(st.state != lock_state::locked || st.held_by != self);
@@ -147,7 +145,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
-    auto self = this_thread::get_id();
+    auto self = std::this_thread::get_id();
     VERIFY(st.state == lock_state::locked && st.held_by == self);
     st.state = lock_state::free;
     LOG << "Lock " << lid << ": free";
index 31bf905..5cdb26e 100644 (file)
@@ -26,17 +26,17 @@ public:
         acquiring,
         releasing
     } state = none;
-    thread::id held_by;
-    list<thread::id> wanted_by;
-    mutex m;
-    map<thread::id, cond> c;
+    std::thread::id held_by;
+    std::list<thread::id> wanted_by;
+    std::mutex m;
+    std::map<thread::id, cond> c;
     lock_protocol::xid_t xid;
     void wait(lock & mutex_lock);
     void signal();
     void signal(thread::id who);
 };
 
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
 
 // Clients that caches locks.  The server can revoke locks using
 // lock_revoke_server.
@@ -50,10 +50,10 @@ class lock_client {
         in_port_t rlock_port;
         string hostname;
         string id;
-        mutex xid_mutex;
+        std::mutex xid_mutex;
         lock_protocol::xid_t next_xid;
         fifo<lock_protocol::lockid_t> release_fifo;
-        mutex lock_table_lock;
+        std::mutex lock_table_lock;
         lock_map lock_table;
         lock_state & get_lock_state(lock_protocol::lockid_t lid);
     public:
index da8b788..88b9e11 100644 (file)
@@ -6,7 +6,7 @@
 #include "rsm.h"
 #include "rpc/fifo.h"
 
-typedef pair<callback_t, lock_protocol::xid_t> holder_t;
+typedef std::pair<callback_t, lock_protocol::xid_t> holder_t;
 
 class lock_state {
 public:
@@ -14,20 +14,20 @@ public:
     lock_state(const lock_state & other);
     bool held;
     holder_t held_by;
-    list<holder_t> wanted_by;
-    map<callback_t, lock_protocol::xid_t> old_requests;
-    mutex m;
+    std::list<holder_t> wanted_by;
+    std::map<callback_t, lock_protocol::xid_t> old_requests;
+    std::mutex m;
     lock_state & operator=(const lock_state &);
 
     MEMBERS(held, held_by, wanted_by)
 };
 
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
 
 class lock_server : private rsm_state_transfer {
     private:
         int nacquire;
-        mutex lock_table_lock;
+        std::mutex lock_table_lock;
         lock_map lock_table;
         lock_state & get_lock_state(lock_protocol::lockid_t lid);
         fifo<lock_protocol::lockid_t> retry_fifo;
index e9ec0a8..7143401 100644 (file)
@@ -20,7 +20,7 @@ static lock_protocol::lockid_t c = "3";
 // doesn't grant the same lock to both clients.
 // it assumes that lock names are distinct in the first byte.
 static int ct[256];
-static mutex count_mutex;
+static std::mutex count_mutex;
 
 static void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
diff --git a/log.cc b/log.cc
index 2f1d679..decb827 100644 (file)
--- a/log.cc
+++ b/log.cc
@@ -12,7 +12,7 @@ log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) {
 }
 
 void log::logread(void) {
-    ifstream from(name);
+    std::ifstream from(name);
     string type;
     unsigned instance;
 
diff --git a/paxos.h b/paxos.h
index 2ddf583..79924a3 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -22,13 +22,12 @@ extern bool majority(const nodes_t & l1, const nodes_t & l2);
 
 class proposer_acceptor {
     private:
-        mutex proposer_mutex;
-        mutex acceptor_mutex;
+        std::mutex proposer_mutex, acceptor_mutex;
 
         paxos_change *delegate;
         node_t me;
 
-        rpcs pxs{(in_port_t)stoi(me)};
+        rpcs pxs{(in_port_t)std::stoi(me)};
 
         bool break1 = false;
         bool break2 = false;
@@ -42,7 +41,7 @@ class proposer_acceptor {
         prop_t accepted = {0, me};  // number of highest proposal accepted
         value_t accepted_value;     // value of highest proposal accepted
         unsigned instance_h = 0;    // number of the highest instance we have decided
-        map<unsigned,value_t> values;   // vals of each instance
+        std::map<unsigned,value_t> values;   // vals of each instance
 
         friend class log;
         class log l = {this, me};
index 1ce3be7..8f8f816 100644 (file)
@@ -23,7 +23,7 @@ namespace paxos_protocol {
         MEMBERS(oldinstance, accept, n_a, v_a)
     };
     using node_t = string;
-    using nodes_t = vector<node_t>;
+    using nodes_t = std::vector<node_t>;
     using value_t = string;
 
     REMOTE_PROCEDURE_BASE(0x11000);
index d118539..c7e8f95 100644 (file)
@@ -30,7 +30,7 @@ connection::~connection() {
     // will be active
     poll_mgr::shared_mgr.block_remove_fd(fd);
     VERIFY(dead_);
-    VERIFY(!wpdu_.buf.size());
+    VERIFY(wpdu_.status == unused);
 }
 
 shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
@@ -42,28 +42,23 @@ shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_de
         return nullptr;
     }
     IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
-    return make_shared<connection>(delegate, std::move(s), lossy);
+    return std::make_shared<connection>(delegate, std::move(s), lossy);
 }
 
 bool connection::send(const string & b) {
     lock ml(m_);
 
-    waiters_++;
-    while (!dead_ && wpdu_.buf.size())
+    while (!dead_ && wpdu_.status != unused)
         send_wait_.wait(ml);
-    waiters_--;
 
     if (dead_)
         return false;
 
-    wpdu_.buf = b;
-    wpdu_.solong = 0;
+    wpdu_ = {inflight, b, 0};
 
-    if (lossy_) {
-        if ((random()%100) < lossy_) {
-            IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
-            shutdown(fd,SHUT_RDWR);
-        }
+    if (lossy_ && (random()%100) < lossy_) {
+        IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
+        shutdown(fd,SHUT_RDWR);
     }
 
     if (!writepdu()) {
@@ -71,17 +66,15 @@ bool connection::send(const string & b) {
         ml.unlock();
         poll_mgr::shared_mgr.block_remove_fd(fd);
         ml.lock();
-    } else if (wpdu_.solong != wpdu_.buf.size()) {
+    } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) {
         // should be rare to need to explicitly add write callback
         poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
-        while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
+        while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size())
             send_complete_.wait(ml);
     }
-    bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
-    wpdu_.solong = 0;
-    wpdu_.buf.clear();
-    if (waiters_ > 0)
-        send_wait_.notify_all();
+    bool ret = (!dead_ && wpdu_.status == inflight && wpdu_.cursor == b.size());
+    wpdu_ = {unused, "", 0};
+    send_wait_.notify_all();
     return ret;
 }
 
@@ -90,7 +83,7 @@ void connection::write_cb(int s) {
     lock ml(m_);
     VERIFY(!dead_);
     VERIFY(fd == s);
-    if (wpdu_.buf.size() == 0) {
+    if (wpdu_.status != inflight) {
         poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
         return;
     }
@@ -98,14 +91,30 @@ void connection::write_cb(int s) {
         poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
         dead_ = true;
     } else {
-        VERIFY(wpdu_.solong != size_t_max);
-        if (wpdu_.solong < wpdu_.buf.size()) {
+        VERIFY(wpdu_.status != error);
+        if (wpdu_.cursor < wpdu_.buf.size())
             return;
-        }
     }
     send_complete_.notify_one();
 }
 
+bool connection::writepdu() {
+    VERIFY(wpdu_.status == inflight);
+    if (wpdu_.cursor == wpdu_.buf.size())
+        return true;
+
+    ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
+    if (n < 0) {
+        if (errno != EAGAIN) {
+            IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
+            wpdu_ = {error, "", 0};
+        }
+        return (errno == EAGAIN);
+    }
+    wpdu_.cursor += (size_t)n;
+    return true;
+}
+
 // fd is ready to be read
 void connection::read_cb(int s) {
     lock ml(m_);
@@ -115,7 +124,7 @@ void connection::read_cb(int s) {
 
     IF_LEVEL(5) LOG << "got data on fd " << s;
 
-    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
+    if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
         if (!readpdu()) {
             IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
             poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
@@ -124,36 +133,17 @@ void connection::read_cb(int s) {
         }
     }
 
-    if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
+    if (rpdu_.status == inflight && rpdu_.buf.size() == rpdu_.cursor) {
         if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
             // connection_delegate has successfully consumed the pdu
-            rpdu_.buf.clear();
-            rpdu_.solong = 0;
+            rpdu_ = {unused, "", 0};
         }
     }
 }
 
-bool connection::writepdu() {
-    VERIFY(wpdu_.solong != size_t_max);
-    if (wpdu_.solong == wpdu_.buf.size())
-        return true;
-
-    ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
-    if (n < 0) {
-        if (errno != EAGAIN) {
-            IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
-            wpdu_.solong = size_t_max;
-            wpdu_.buf.clear();
-        }
-        return (errno == EAGAIN);
-    }
-    wpdu_.solong += (size_t)n;
-    return true;
-}
-
 bool connection::readpdu() {
     IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
-    if (!rpdu_.buf.size()) {
+    if (rpdu_.status == unused) {
         rpc_protocol::rpc_sz_t sz1;
         ssize_t n = fd.read(sz1);
 
@@ -179,22 +169,20 @@ bool connection::readpdu() {
 
         IF_LEVEL(5) LOG << "read size of datagram = " << sz;
 
-        rpdu_.buf.assign(sz+sizeof(sz1), 0);
-        rpdu_.solong = sizeof(sz1);
+        rpdu_ = {inflight, string(sz+sizeof(sz1), 0), sizeof(sz1)};
     }
 
-    ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+    ssize_t n = fd.read(&rpdu_.buf[rpdu_.cursor], rpdu_.buf.size() - rpdu_.cursor);
 
     IF_LEVEL(5) LOG << "read " << n << " bytes";
 
     if (n <= 0) {
         if (errno == EAGAIN)
             return true;
-        rpdu_.buf.clear();
-        rpdu_.solong = 0;
+        rpdu_ = {unused, "", 0};
         return false;
     }
-    rpdu_.solong += (size_t)n;
+    rpdu_.cursor += (size_t)n;
     return true;
 }
 
@@ -239,11 +227,10 @@ void connection_listener::read_cb(int) {
     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
     if (s1 < 0) {
         perror("connection_listener::accept_conn error");
-        throw runtime_error("connection listener failure");
+        throw std::runtime_error("connection listener failure");
     }
 
     IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
-    auto ch = make_shared<connection>(delegate_, s1, lossy_);
 
     // garbage collect dead connections
     for (auto i = conns_.begin(); i != conns_.end();) {
@@ -253,5 +240,5 @@ void connection_listener::read_cb(int) {
             ++i;
     }
 
-    conns_[s1] = ch;
+    conns_[s1] = std::make_shared<connection>(delegate_, s1, lossy_);
 }
index 1bcb7b6..68bd902 100644 (file)
@@ -7,8 +7,6 @@
 #include "poll_mgr.h"
 #include "file.h"
 
-constexpr size_t size_t_max = numeric_limits<size_t>::max();
-
 class connection;
 
 class connection_delegate {
@@ -17,7 +15,10 @@ class connection_delegate {
         virtual ~connection_delegate();
 };
 
-class connection : private aio_callback, public enable_shared_from_this<connection> {
+using std::chrono::steady_clock;
+using time_point = std::chrono::time_point<steady_clock>;
+
+class connection : private aio_callback, public std::enable_shared_from_this<connection> {
     public:
         connection(connection_delegate * delegate, socket_t && f1, int lossytest=0);
         ~connection();
@@ -28,7 +29,7 @@ class connection : private aio_callback, public enable_shared_from_this<connecti
 
         static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0);
 
-        const time_point<steady_clock> create_time = steady_clock::now();
+        const time_point create_time = steady_clock::now();
         const file_t fd;
 
     private:
@@ -41,18 +42,20 @@ class connection : private aio_callback, public enable_shared_from_this<connecti
         connection_delegate * delegate_;
         bool dead_ = false;
 
+        enum charbuf_status_t { unused, inflight, error };
+
         struct charbuf {
+            charbuf_status_t status;
             string buf;
-            size_t solong = 0; // number of bytes written or read so far
+            size_t cursor; // number of bytes written or read so far
         };
 
-        charbuf wpdu_;
-        charbuf rpdu_;
+        charbuf wpdu_ = {unused, "", 0};
+        charbuf rpdu_ = {unused, "", 0};
 
-        int waiters_ = 0;
         int lossy_ = 0;
 
-        mutex m_;
+        std::mutex m_;
         cond send_complete_;
         cond send_wait_;
 };
@@ -67,11 +70,11 @@ class connection_listener : private aio_callback {
         void read_cb(int s);
 
         in_port_t port_;
-        mutex m_;
+        std::mutex m_;
 
         socket_t tcp_; // listens for connections
         connection_delegate * delegate_;
         int lossy_;
-        map<int, shared_ptr<connection>> conns_;
+        std::map<int, shared_ptr<connection>> conns_;
 };
 #endif
index f8a7224..dfb4d05 100644 (file)
@@ -37,8 +37,8 @@ class fifo {
         }
 
     private:
-        list<T> q_;
-        mutex m_;
+        std::list<T> q_;
+        std::mutex m_;
         cond non_empty_c_; // q went non-empty
         cond has_space_c_; // q is not longer overfull
         size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
index 8592e4b..08b6aaa 100644 (file)
@@ -122,7 +122,7 @@ tuple_marshall_imp(marshall & m, tuple<Args...> & t, tuple_indices<Indices...>)
     // to be evaluated in order.  Order matters because the elements must be
     // serialized consistently!  The empty struct resulting from construction
     // is discarded.
-    (void)pass{(m << get<Indices>(t))...};
+    (void)pass{(m << std::get<Indices>(t))...};
     return m;
 }
 
@@ -133,7 +133,7 @@ operator<<(marshall & m, tuple<Args...> && t) {
 
 template <class... Args, size_t... Indices> inline unmarshall &
 tuple_unmarshall_imp(unmarshall & u, tuple<Args & ...> t, tuple_indices<Indices...>) {
-    (void)pass{(u >> get<Indices>(t))...};
+    (void)pass{(u >> std::get<Indices>(t))...};
     return u;
 }
 
@@ -182,22 +182,22 @@ operator>>(unmarshall & u, A & x) {
 
 // std::pair<A, B>
 template <class A, class B> inline marshall &
-operator<<(marshall & m, const pair<A,B> & d) {
+operator<<(marshall & m, const std::pair<A,B> & d) {
     return m << d.first << d.second;
 }
 
 template <class A, class B> inline unmarshall &
-operator>>(unmarshall & u, pair<A,B> & d) {
+operator>>(unmarshall & u, std::pair<A,B> & d) {
     return u >> d.first >> d.second;
 }
 
 // std::map<A, B>
 template <class A, class B> inline unmarshall &
-operator>>(unmarshall & u, map<A,B> & x) {
+operator>>(unmarshall & u, std::map<A,B> & x) {
     uint32_t n = u._grab<uint32_t>();
     x.clear();
     while (n--)
-        x.emplace(u._grab<pair<A,B>>());
+        x.emplace(u._grab<std::pair<A,B>>());
     return u;
 }
 
@@ -221,12 +221,12 @@ inline unmarshall & operator>>(unmarshall & u, string & s) {
 // Marshalling for strongly-typed enums
 //
 
-template <class E> typename enable_if<is_enum<E>::value, marshall>::type &
+template <class E> typename enable_if<std::is_enum<E>::value, marshall>::type &
 operator<<(marshall & m, E e) {
     return m << from_enum(e);
 }
 
-template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
+template <class E> typename enable_if<std::is_enum<E>::value, unmarshall>::type &
 operator>>(unmarshall & u, E & e) {
     e = to_enum<E>(u._grab<enum_type_t<E>>());
     return u;
index b895696..6754acc 100644 (file)
@@ -43,17 +43,17 @@ struct VerifyOnFailure {
 // One for function pointers...
 
 template <class F, class R, class RV, class args_type, size_t... Indices>
-typename enable_if<!is_member_function_pointer<F>::value, RV>::type inline 
+typename enable_if<!std::is_member_function_pointer<F>::value, RV>::type inline 
 invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
-    return f(r, get<Indices>(t)...);
+    return f(r, std::get<Indices>(t)...);
 }
 
 // And one for pointers to member functions...
 
 template <class F, class C, class RV, class R, class args_type, size_t... Indices>
-typename enable_if<is_member_function_pointer<F>::value, RV>::type inline 
+typename enable_if<std::is_member_function_pointer<F>::value, RV>::type inline 
 invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
-    return (c->*f)(r, get<Indices>(t)...);
+    return (c->*f)(r, std::get<Indices>(t)...);
 }
 
 // The class marshalled_func_imp uses partial template specialization to
@@ -76,13 +76,13 @@ struct marshalled_func_imp<F, C, RV(R &, Args...), ErrorHandler> {
         // This type definition represents storage for f's unmarshalled
         // arguments.  decay is (most notably) stripping off const
         // qualifiers.
-        using ArgsStorage = tuple<typename decay<Args>::type...>;
+        using ArgsStorage = tuple<typename std::decay<Args>::type...>;
         // Allocate a handler (i.e. function) to hold the lambda
         // which will unmarshall RPCs and call f.
         return new handler([=](unmarshall && u, marshall & m) -> RV {
             // Unmarshall each argument with the correct type and store the
             // result in a tuple.
-            ArgsStorage t{u._grab<typename decay<Args>::type>()...};
+            ArgsStorage t{u._grab<typename std::decay<Args>::type>()...};
             // Verify successful unmarshalling of the entire input stream.
             if (!u.okdone())
                 return (RV)ErrorHandler::unmarshall_args_failure();
index d29abd2..83289a8 100644 (file)
@@ -7,6 +7,8 @@
 #include <sys/epoll.h>
 #endif
 
+using std::vector;
+
 aio_callback::~aio_callback() {}
 
 poll_mgr poll_mgr::shared_mgr;
@@ -33,7 +35,7 @@ class SelectAIO : public wait_manager {
         fd_set rfds_, wfds_;
         int highfds_;
         file_t pipe_[2];
-        mutex m_;
+        std::mutex m_;
 };
 
 #ifdef __linux__ 
index 44ac9f8..d8cfd20 100644 (file)
@@ -33,10 +33,10 @@ class poll_mgr {
         void wait_loop();
 
     private:
-        mutex m_;
+        std::mutex m_;
         cond changedone_c_;
 
-        map<int, aio_callback *> callbacks_;
+        std::map<int, aio_callback *> callbacks_;
         unique_ptr<class wait_manager> aio_;
         bool pending_change_=false, shutdown_=false;
 
index 00f6d2e..de33675 100644 (file)
@@ -60,6 +60,9 @@
 #include <unistd.h>
 #include <string.h>
 
+using std::list;
+using namespace std::chrono;
+
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
@@ -186,7 +189,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
             lock cal(ca.m);
             while (!ca.done) {
                 IF_LEVEL(2) LOG << "wait";
-                if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
+                if (ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout) {
                     IF_LEVEL(2) LOG << "timeout";
                     break;
                 }
@@ -404,7 +407,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
 
     switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) {
         case NEW: // new request
-            rh.ret = (*f)(forward<unmarshall>(req), rep);
+            rh.ret = (*f)(std::forward<unmarshall>(req), rep);
             if (rh.ret == rpc_protocol::unmarshall_args_failure) {
                 LOG << "failed to unmarshall the arguments. You are "
                     << "probably calling RPC 0x" << std::hex << proc << " with the wrong "
@@ -555,6 +558,6 @@ static sockaddr_in make_sockaddr(const string & hostandport) {
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
         dst.sin_addr.s_addr = a.s_addr;
     }
-    dst.sin_port = hton((in_port_t)stoi(port));
+    dst.sin_port = hton((in_port_t)std::stoi(port));
     return dst;
 }
index b44c057..58e9381 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
 #include "marshall_wrap.h"
 #include "connection.h"
 
+using std::chrono::milliseconds;
+
 namespace rpc {
     static constexpr milliseconds to_max{12000};
     static constexpr milliseconds to_min{100};
 }
 
-template<class P, class R, class ...Args> struct is_valid_call : false_type {};
+template<class P, class R, class ...Args>
+struct is_valid_call : false_type {};
 
 template<class S, class R, class ...Args>
 struct is_valid_call<S(R &, Args...), R, Args...> : true_type {};
 
-template<class P, class F> struct is_valid_registration : false_type {};
+template<class P, class F>
+struct is_valid_registration : false_type {};
 
 template<class S, class R, class ...Args>
-struct is_valid_registration<S(R &, typename decay<Args>::type...), S(R &, Args...)> : true_type {};
+struct is_valid_registration<
+    S(R &, typename std::decay<Args>::type...),
+    S(R &, Args...)> : true_type {};
 
 template<class P, class C, class S, class R, class ...Args>
-struct is_valid_registration<P, S(C::*)(R &, Args...)> : is_valid_registration<P, S(R &, Args...)> {};
+struct is_valid_registration<
+    P,
+    S(C::*)(R &, Args...)> : is_valid_registration<P, S(R &, Args...)> {};
 
 // rpc client endpoint.
 // manages a xid space per destination socket
@@ -48,7 +56,7 @@ class rpcc : private connection_delegate {
             string *rep;
             int intret;
             bool done = false;
-            mutex m;
+            std::mutex m;
             cond c;
         };
 
@@ -65,17 +73,17 @@ class rpcc : private connection_delegate {
 
         shared_ptr<connection> chan_;
 
-        mutex m_; // protect insert/delete to calls[]
-        mutex chan_m_;
+        std::mutex m_; // protect insert/delete to calls[]
+        std::mutex chan_m_;
 
         bool destroy_wait_ = false;
         cond destroy_wait_c_;
 
-        map<int, caller *> calls_;
+        std::map<int, caller *> calls_;
 
         // xid starts with 1 and latest received reply starts with 0
         xid_t xid_ = 1;
-        list<xid_t> xid_rep_window_ = {0};
+        std::list<xid_t> xid_rep_window_ = {0};
 
         struct request {
             void clear() { buf.clear(); xid = -1; }
@@ -126,7 +134,7 @@ class rpcc : private connection_delegate {
         template<class P, class R, typename ...Args>
         inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args & ... args) {
             static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
-            return call_m(proc.id, to, r, forward<marshall>(marshall(args...)));
+            return call_m(proc.id, to, r, std::forward<marshall>(marshall(args...)));
         }
 };
 
@@ -164,7 +172,7 @@ class rpcs : private connection_delegate {
         // provide at most once semantics by maintaining a window of replies
         // per client that that client hasn't acknowledged receiving yet.
         // indexed by client nonce.
-        map<nonce_t, list<reply_t>> reply_window_;
+        std::map<nonce_t, std::list<reply_t>> reply_window_;
 
         void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
 
@@ -172,16 +180,16 @@ class rpcs : private connection_delegate {
                 xid_t rep_xid, string & b);
 
         // latest connection to the client
-        map<nonce_t, shared_ptr<connection>> conns_;
+        std::map<nonce_t, shared_ptr<connection>> conns_;
 
         bool reachable_ = true;
 
         // map proc # to function
-        map<proc_id_t, handler *> procs_;
+        std::map<proc_id_t, handler *> procs_;
 
-        mutex procs_m_; // protect insert/delete to procs[]
-        mutex reply_window_m_; // protect reply window et al
-        mutex conns_m_; // protect conns_
+        std::mutex procs_m_; // protect insert/delete to procs[]
+        std::mutex reply_window_m_; // protect reply window et al
+        std::mutex conns_m_; // protect conns_
 
         void dispatch(shared_ptr<connection> c, const string & buf);
 
index bf840d2..fb170e7 100644 (file)
@@ -19,6 +19,8 @@ static in_port_t port;
 
 using std::cout;
 using std::endl;
+using namespace std::chrono;
+using std::vector;
 
 // server-side handlers. they must be methods of some class
 // to simplify rpcs::reg(). a server process can have handlers
@@ -393,7 +395,7 @@ int main(int argc, char *argv[]) {
 
     if (isclient) {
         // server's address.
-        dst = "127.0.0.1:" + to_string(port);
+        dst = "127.0.0.1:" + std::to_string(port);
 
 
         // start the client.  bind it to the server.
index a864784..9525032 100644 (file)
@@ -18,7 +18,7 @@ class thread_pool {
         bool blockadd_;
 
         fifo<job_t> jobq_;
-        vector<thread> th_;
+        std::vector<thread> th_;
 
         void do_worker();
 };
diff --git a/rsm.cc b/rsm.cc
index 5812b33..cb986fe 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -83,6 +83,8 @@
 #include "rsm_client.h"
 #include <unistd.h>
 
+using std::vector;
+
 rsm_state_transfer::~rsm_state_transfer() {}
 
 rsm::rsm(const string & _first, const string & _me) : primary(_first)
@@ -103,7 +105,7 @@ rsm::rsm(const string & _first, const string & _me) : primary(_first)
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
+    testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1));
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 }
@@ -129,7 +131,7 @@ void rsm::recovery() {
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
-                this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary?
+                std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
                 ml.lock();
             }
         }
diff --git a/rsm.h b/rsm.h
index bf2f221..dfbb25c 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -18,7 +18,7 @@ class rsm_state_transfer {
 
 class rsm : public config_view_change {
     protected:
-        map<rpc_protocol::proc_id_t, handler *> procs;
+        std::map<rpc_protocol::proc_id_t, handler *> procs;
         unique_ptr<config> cfg;
         rsm_state_transfer *stf = nullptr;
         rpcs *rsmrpc;
@@ -31,7 +31,7 @@ class rsm : public config_view_change {
         bool inviewchange = true;
         unsigned vid_commit = 0;  // Latest view id that is known to rsm layer
         unsigned vid_insync;  // The view id that this node is synchronizing for
-        vector<string> backups;   // A list of unsynchronized backups
+        std::vector<string> backups;   // A list of unsynchronized backups
 
         // For testing purposes
         unique_ptr<rpcs> testsvr;
@@ -39,7 +39,7 @@ class rsm : public config_view_change {
         bool dopartition = false;
         bool breakpoints[2] = {};
 
-        rsm_client_protocol::status client_members(vector<string> & r, int i);
+        rsm_client_protocol::status client_members(std::vector<string> & r, int i);
         rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq);
         rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src,
                 viewstamp last, unsigned vid);
@@ -48,7 +48,7 @@ class rsm : public config_view_change {
         rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal);
         rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b);
 
-        mutex rsm_mutex, invoke_mutex;
+        std::mutex rsm_mutex, invoke_mutex;
         cond recovery_cond, sync_cond;
 
         void execute(rpc_protocol::proc_id_t procno, const string & req, string & r);
index 303d038..90b5b06 100644 (file)
@@ -15,8 +15,8 @@
 class rsm_client {
     protected:
         string primary;
-        vector<string> known_mems;
-        mutex rsm_client_mutex;
+        std::vector<string> known_mems;
+        std::mutex rsm_client_mutex;
         void primary_failure(lock & rsm_client_mutex_lock);
         bool init_members(lock & rsm_client_mutex_lock);
         rsm_protocol::status invoke(unsigned int proc, string & rep, const string & req);
index 28773c1..5a5b7dd 100644 (file)
@@ -8,7 +8,7 @@ namespace rsm_client_protocol {
     enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY};
     REMOTE_PROCEDURE_BASE(0x9000);
     REMOTE_PROCEDURE(1, invoke, (string &, rpc_protocol::proc_id_t, string));
-    REMOTE_PROCEDURE(2, members, (vector<string> &, int));
+    REMOTE_PROCEDURE(2, members, (std::vector<string> &, int));
 }
 
 struct viewstamp {
index e821c99..d5d27fd 100644 (file)
@@ -17,9 +17,9 @@ int main(int argc, char *argv[]) {
     rsmtest_client *lc = new rsmtest_client(argv[1]);
     string command(argv[2]);
     if (command == "partition") {
-        LOG_NONMEMBER << "net_repair returned " << lc->net_repair(stoi(argv[3]));
+        LOG_NONMEMBER << "net_repair returned " << lc->net_repair(std::stoi(argv[3]));
     } else if (command == "breakpoint") {
-        int b = stoi(argv[3]);
+        int b = std::stoi(argv[3]);
         LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b);
     } else {
         LOG_NONMEMBER << "Unknown command " << argv[2];
diff --git a/rsm_tester.py b/rsm_tester.py
new file mode 100755 (executable)
index 0000000..32f0ad2
--- /dev/null
@@ -0,0 +1,628 @@
+#!/usr/bin/env python
+
+import subprocess as sp
+import signal
+import os
+import sys
+import time
+import getopt
+import random
+
+pid = []
+logs = []
+views = [] # expected views
+in_views = {} # the number of views a node is expected to be present
+p = []
+t = None
+always_kill = 0
+quit = False
+
+def killprocess(num, frame):
+    print "killprocess: forcestop all spawned processes...%s" % (str(pid),)
+    global quit
+    quit = True
+    for p in pid:
+        os.kill(p, signal.SIGKILL)
+
+for sig in ['HUP', 'INT', 'ABRT', 'QUIT', 'TERM']:
+    num = getattr(signal, 'SIG'+sig)
+    signal.signal(num, killprocess)
+
+def paxos_log(port):
+  return "paxos-%d.log" % port
+
+def die(*s):
+    print >>sys.stderr, ''.join(s)
+    exit(1)
+
+def mydie(*s):
+    if always_kill:
+        killprocess()
+    die(*s)
+
+def usleep(us):
+    time.sleep(us/1e6)
+
+def cleanup():
+    for p in pid:
+        os.kill(p, signal.SIGKILL)
+    for l in logs:
+        try:
+            os.unlink(l)
+        except OSError:
+            pass
+    usleep(200000)
+
+def spawn(p, *a):
+    sa = map(str, a)
+    aa = '-'.join(sa)
+    try:
+        pid = os.fork()
+    except OSError, e:
+        mydie("Cannot fork: %s" % (repr(e),))
+    if pid:
+        # parent
+        logs.append("%s-%s.log" % (p, aa))
+        if 'lock_server' in p:
+            logs.append(paxos_log(a[1]))
+        return pid
+    else:
+        # child
+        os.close(1)
+        sys.stdout = open("%s-%s.log" % (p, aa), 'w')
+        os.close(2)
+        os.dup(1)
+        sys.stderr = sys.stdout
+        print "%s %s" % (p, ' '.join(sa))
+        try:
+            os.execv(p, [p] + sa)
+        except OSError, e:
+            mydie("Cannot start new %s %s %s", (p, repr(sa), repr(e)))
+
+def randports(num):
+    return sorted([random.randint(0, 54000/2)*2+10000 for i in xrange(num)])
+
+def print_config(ports):
+    try:
+        config = open("config", 'w')
+    except IOError:
+        mydie("Couldn't open config for writing")
+    for p in ports:
+        print >>config, "%05d" % (p,)
+    config.close()
+
+def spawn_ls(master, port):
+    return spawn("./lock_server", master, port)
+
+def check_views(l, vs, last_v=None):
+    try:
+        f = open(l, 'r')
+        log = f.readlines()
+        f.close()
+    except IOError:
+        mydie("Failed: couldn't read %s" % (l,))
+    i = 0
+    last_view = None
+    for line in log:
+        if not line.startswith('done'):
+            continue
+        words = line.split(' ')
+        num = int(words[1])
+        view = map(int, words[2:])
+        last_view = view
+        if i >= len(vs):
+            # let there be extra views
+            continue
+        expected = vs[i]
+        if tuple(expected) != tuple(view):
+            mydie("Failed: In log %s at view %s is (%s), but expected %s (%s)" %
+                  (l, str(num), repr(view), str(i), repr(expected)))
+        i+=1
+    if i < len(vs):
+        mydie("Failed: In log %s, not enough views seen!" % (l,))
+    if last_v is not None and tuple(last_v) != tuple(last_view):
+        mydie("Failed: In log %s last view didn't match, got view %s, but expected %s" %
+              (l, repr(last_view), repr(last_v)))
+
+def get_num_views(log, including):
+    try:
+        f = open(log, 'r')
+    except IOError:
+        return 0
+    log = f.readlines()
+    f.close()
+    return len([x for x in log if 'done ' in x and str(including) in x])
+
+def wait_for_view_change(log, num_views, including, timeout):
+    start = time.time()
+    while get_num_views(log, including) < num_views and (start + timeout > time.time()) and not quit:
+        try:
+            f = open(log, 'r')
+            loglines = f.readlines()
+            f.close()
+            lastv = [x for x in loglines if 'done' in x][-1].strip()
+            print "   Waiting for %s to be present in >=%s views in %s (Last view: %s)" % \
+                  (including, str(num_views), log, lastv)
+            usleep(100000)
+        except IOError:
+            continue
+
+    if get_num_views(log, including) < num_views:
+        mydie("Failed: Timed out waiting for %s to be in >=%s in log %s" %
+              (including, str(num_views), log))
+    else:
+        print "   Done: %s is in >=%s views in %s" % (including, str(num_views), log)
+
+def waitpid_to(pid, to):
+    start = time.time()
+    done_pid = -1
+    while done_pid <= 0 and (time.time() - start) < to:
+      usleep(100000)
+      done_pid = os.waitpid(pid, os.WNOHANG)
+
+    if done_pid <= 0:
+        os.kill(pid, signal.SIGKILL)
+        mydie("Failed: Timed out waiting for process %s" % (str(pid),))
+    else:
+        return 1
+
+def wait_and_check_expected_view(v):
+    views.append(v)
+    for vv in v:
+        in_views[vv] += 1
+    for port in v:
+        wait_for_view_change(paxos_log(port), in_views[port], port, 20)
+    for port in v:
+        log = paxos_log(port)
+        check_views(log, views)
+
+def start_nodes(n, command):
+    global pid, logs, views
+    pid = []
+    logs = []
+    views = []
+    for pp in p:
+        in_views[pp] = 0
+
+    for i in xrange(n):
+        if command == "ls":
+            pid.append(spawn_ls(p[0],p[i]))
+            print "Start lock_server on %s" % (str(p[i]),)
+        usleep(100000)
+
+        wait_and_check_expected_view(p[:i+1])
+
+options, arguments = getopt.getopt(sys.argv[1:], "s:k")
+options = dict(options)
+
+if 's' in options:
+    random.seed(options[s])
+
+if 'k' in options:
+    always_kill = 1
+
+# get a sorted list of random ports
+p = randports(5)
+print_config(p)
+
+NUM_TESTS = 17
+do_run = [0] * NUM_TESTS
+
+# see which tests are set
+if len(arguments):
+    for t in arguments:
+        t = int(t)
+        if t < NUM_TESTS and t >= 0:
+            do_run[t] = 1
+else:
+    # turn on all tests
+    for i in xrange(NUM_TESTS):
+        do_run[i] = 1
+
+if do_run[0]:
+    print "test0: start 3-process lock server"
+    start_nodes(3,"ls")
+    cleanup()
+    usleep(200000)
+
+if do_run[1]:
+    print "test1: start 3-process lock server, kill third server"
+    start_nodes(3,"ls")
+    print "Kill third server (PID: %s) on port %s" % (str(pid[2]), str(p[2]))
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    # it should go through 4 views
+    v4 = [p[0], p[1]]
+    wait_and_check_expected_view(v4)
+    cleanup()
+    usleep(200000)
+
+if do_run[2]:
+    print "test2: start 3-process lock server, kill first server"
+    start_nodes(3,"ls")
+    print "Kill first (PID: $pid[0]) on port $p[0]"
+    os.kill(pid[0], signal.SIGTERM)
+    usleep(500000)
+    # it should go through 4 views
+    v4 = [p[1], p[2]]
+    wait_and_check_expected_view(v4)
+    cleanup()
+    usleep(200000)
+
+if do_run[3]:
+    print "test3: start 3-process lock_server, kill a server, restart a server"
+    start_nodes(3,"ls")
+    print "Kill server (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    v4 = (p[0], p[1])
+    wait_and_check_expected_view(v4)
+    print "Restart killed server on port $p[2]"
+    pid[2] = spawn_ls (p[0], p[2])
+    usleep(500000)
+    v5 = (p[0], p[1], p[2])
+    wait_and_check_expected_view(v5)
+    cleanup()
+    usleep(200000)
+
+if do_run[4]:
+    print "test4: 3-process lock_server, kill third server, kill second server, restart third server, kill third server again, restart second server, re-restart third server, check logs"
+    start_nodes(3,"ls")
+    print "Kill server (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    v4 = (p[0], p[1])
+    wait_and_check_expected_view(v4)
+    print "Kill server (PID: $pid[1]) on port $p[1]"
+    os.kill(pid[1], signal.SIGTERM)
+    usleep(500000)
+    #no view change can happen because of a lack of majority
+    print "Restarting server on port $p[2]"
+    pid[2] = spawn_ls(p[0], p[2])
+    usleep(500000)
+    #no view change can happen because of a lack of majority
+    for port in p[0:1+2]:
+        num_v = get_num_views(paxos_log(port), port)
+        if num_v != in_views[port]:
+            die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority")
+    # kill node 3 again,
+    print "Kill server (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    print "Restarting server on port $p[1]"
+    pid[1] = spawn_ls(p[0], p[1])
+    usleep(700000)
+    for port in p[0:1+1]:
+        in_views[port] = get_num_views(paxos_log(port), port)
+        print "   Node $port is present in ", in_views[port], " views in ", paxos_log(port), ""
+    print "Restarting server on port $p[2]"
+    pid[2] = spawn_ls(p[0], p[2])
+    lastv = (p[0],p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    # now check the paxos logs and make sure the logs go through the right
+    # views
+    for port in lastv:
+        check_views(paxos_log(port), views, lastv)
+    cleanup()
+
+if do_run[5]:
+    print "test5: 3-process lock_server, send signal 1 to first server, kill third server, restart third server, check logs"
+    start_nodes(3,"ls")
+    print "Sending paxos breakpoint 1 to first server on port $p[0]"
+    spawn("./rsm_tester", p[0]+1, "breakpoint", 3)
+    usleep(100000)
+    print "Kill third server (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    for port in p[0:1+2]:
+        num_v = get_num_views(paxos_log(port), port)
+        if num_v != in_views[port]:
+            die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority")
+    print "Restarting third server on port $p[2]"
+    pid[2]= spawn_ls(p[0], p[2])
+    lastv = (p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    usleep(1000000)
+    # now check the paxos logs and make sure the logs go through the right
+    # views
+    for port in lastv:
+        check_views(paxos_log(port), views, lastv)
+    cleanup()
+
+if do_run[6]:
+    print "test6: 4-process lock_server, send signal 2 to first server, kill fourth server, restart fourth server, check logs"
+    start_nodes(4,"ls")
+    print "Sending paxos breakpoint 2 to first server on port $p[0]"
+    spawn("./rsm_tester", p[0]+1, "breakpoint", 4)
+    usleep(100000)
+    print "Kill fourth server (PID: $pid[3]) on port $p[3]"
+    os.kill(pid[3], signal.SIGTERM)
+    usleep(500000)
+    for port in (p[1],p[2]):
+        num_v = get_num_views(paxos_log(port), port)
+        if num_v != in_views[port]:
+            die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority")
+    usleep(500000)
+    print "Restarting fourth server on port $p[3]"
+    pid[3] = spawn_ls(p[1], p[3])
+    usleep(500000)
+    v5 = (p[0],p[1],p[2])
+    for port in v5:
+        in_views[port]+=1
+    views.append(v5)
+    usleep(1000000)
+    # the 6th view will be (2,3)  or (1,2,3,4)
+    v6 = (p[1],p[2])
+    for port in v6:
+        in_views[port]+=1
+    for port in v6:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 30)
+    # final will be (2,3,4)
+    lastv = (p[1],p[2],p[3])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    for port in lastv:
+        check_views(paxos_log(port), views, lastv)
+    cleanup()
+
+if do_run[7]:
+    print "test7: 4-process lock_server, send signal 2 to first server, kill fourth server, kill other servers, restart other servers, restart fourth server, check logs"
+    start_nodes(4,"ls")
+    print "Sending paxos breakpoint 2 to first server on port $p[0]"
+    spawn("./rsm_tester", p[0]+1, "breakpoint", 4)
+    usleep(300000)
+    print "Kill fourth server (PID: $pid[3]) on port $p[3]"
+    os.kill(pid[3], signal.SIGTERM)
+    usleep(500000)
+    print "Kill third server (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    print "Kill second server (PID: $pid[1]) on port $p[1]"
+    os.kill(pid[1], signal.SIGTERM)
+    usleep(500000)
+    print "Restarting second server on port $p[1]"
+    pid[1] = spawn_ls(p[0], p[1])
+    usleep(500000)
+    print "Restarting third server on port $p[2]"
+    pid[2] = spawn_ls(p[0], p[2])
+    usleep(500000)
+    #no view change is possible by now because there is no majority
+    for port in (p[1],p[2]):
+        num_v = get_num_views(paxos_log(port), port)
+        if num_v != in_views[port]:
+            die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority")
+    print "Restarting fourth server on port $p[3]"
+    pid[3] = spawn_ls(p[1], p[3])
+    usleep(500000)
+    v5 = (p[0], p[1], p[2])
+    views.append(v5)
+    for port in v5:
+        in_views[port]+=1
+    usleep(1500000)
+    lastv = (p[1],p[2],p[3])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    for port in lastv:
+        check_views(paxos_log(port), views, lastv)
+    cleanup()
+
+if do_run[8]:
+    print "test8: start 3-process lock service"
+    start_nodes(3,"ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 8")
+    cleanup()
+    usleep(200000)
+
+if do_run[9]:
+    print "test9: start 3-process rsm, kill second slave while lock_tester is running"
+    start_nodes(3,"ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(random.randint(1,1000000))
+    print "Kill slave (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(300000)
+    # it should go through 4 views
+    v4 = (p[0], p[1])
+    wait_and_check_expected_view(v4)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 9")
+    cleanup()
+    usleep(200000)
+
+if do_run[10]:
+    print "test10: start 3-process rsm, kill second slave and restarts it later while lock_tester is running"
+    start_nodes(3,"ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(random.randint(1,1000000))
+    print "Kill slave (PID: $pid[2]) on port $p[2]"
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(300000)
+    # it should go through 4 views
+    v4 = (p[0], p[1])
+    wait_and_check_expected_view(v4)
+    usleep(300000)
+    print "Restarting killed lock_server on port $p[2]"
+    pid[2] = spawn_ls(p[0], p[2])
+    v5 = (p[0],p[1],p[2])
+    wait_and_check_expected_view(v5)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 10")
+    cleanup()
+    usleep(200000)
+
+if do_run[11]:
+    print "test11: start 3-process rsm, kill primary while lock_tester is running"
+    start_nodes(3,"ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(random.randint(1,1000000))
+    print "Kill primary (PID: $pid[0]) on port $p[0]"
+    os.kill(pid[0], signal.SIGTERM)
+    usleep(300000)
+    # it should go through 4 views
+    v4 = (p[1], p[2])
+    wait_and_check_expected_view(v4)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 11")
+    cleanup()
+    usleep(200000)
+
+if do_run[12]:
+    print "test12: start 3-process rsm, kill master at break1 and restart it while lock_tester is running"
+    start_nodes(3, "ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(100000)
+    print "Kill master (PID: $pid[0]) on port $p[0] at breakpoint 1"
+    spawn("./rsm_tester", p[0]+1, "breakpoint", 1)
+    usleep(100000)
+    # it should go through 5 views
+    v4 = (p[1], p[2])
+    wait_and_check_expected_view(v4)
+    print "Restarting killed lock_server on port $p[0]"
+    pid[0] = spawn_ls(p[1], p[0])
+    usleep(300000)
+    # the last view should include all nodes
+    lastv = (p[0],p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    for port in lastv:
+        check_views(paxos_log(port), views, lastv)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 12")
+    cleanup()
+    usleep(200000)
+
+if do_run[13]:
+    print "test13: start 3-process rsm, kill slave at break1 and restart it while lock_tester is running"
+    start_nodes(3, "ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(100000)
+    print "Kill slave (PID: $pid[2]) on port $p[2] at breakpoint 1"
+    spawn("./rsm_tester", p[2]+1, "breakpoint", 1)
+    usleep(100000)
+    # it should go through 4 views
+    v4 = (p[0], p[1])
+    wait_and_check_expected_view(v4)
+    print "Restarting killed lock_server on port $p[2]"
+    pid[2] = spawn_ls(p[0], p[2])
+    usleep(300000)
+    # the last view should include all nodes
+    lastv = (p[0],p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    for port in lastv:
+        check_views(paxos_log(port), views, lastv)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 13")
+    cleanup()
+    usleep(200000)
+
+if do_run[14]:
+    print "test14: start 5-process rsm, kill slave break1, kill slave break2"
+    start_nodes(5, "ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(100000)
+    print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1"
+    spawn("./rsm_tester", p[4]+1, "breakpoint", 1)
+    print "Kill slave (PID: $pid[3]) on port $p[3] at breakpoint 2"
+    spawn("./rsm_tester", p[3]+1, "breakpoint", 2)
+    usleep(100000)
+    # two view changes:
+    print "first view change wait"
+    lastv = (p[0],p[1],p[2],p[3])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    print "second view change wait"
+    lastv = (p[0],p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 14")
+    cleanup()
+    usleep(200000)
+
+if do_run[15]:
+    print "test15: start 5-process rsm, kill slave break1, kill primary break2"
+    start_nodes(5, "ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(100000)
+    print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1"
+    spawn("./rsm_tester", p[4]+1, "breakpoint", 1)
+    print "Kill primary (PID: $pid[0]) on port $p[0] at breakpoint 2"
+    spawn("./rsm_tester", p[0]+1, "breakpoint", 2)
+    usleep(100000)
+    # two view changes:
+    print "first view change wait"
+    lastv = (p[0],p[1],p[2],p[3])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    print "second view change wait"
+    lastv = (p[1],p[2],p[3])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 15")
+    cleanup()
+    usleep(200000)
+
+if do_run[16]:
+    print "test16: start 3-process rsm, partition primary, heal it"
+    start_nodes(3, "ls")
+    print "Start lock_tester $p[0]"
+    t = spawn("./lock_tester", p[0])
+    usleep(100000)
+    print "Partition primary (PID: $pid[0]) on port $p[0] at breakpoint"
+    spawn("./rsm_tester", p[0]+1, "partition", 0)
+    usleep(300000)
+    print "first view change wait"
+    lastv = (p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    usleep(100000)
+    print "Heal partition primary (PID: $pid[0]) on port $p[0] at breakpoint"
+    spawn("./rsm_tester", p[0]+1, "partition", 1)
+    usleep(100000)
+    # xxx it should test that this is the 5th view!
+    print "second view change wait"
+    lastv = (p[0], p[1],p[2])
+    for port in lastv:
+        wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20)
+    print "   Wait for lock_tester to finish (waitpid $t)"
+    waitpid_to(t, 600)
+    if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"):
+        mydie("Failed lock tester for test 16")
+    cleanup()
+    usleep(200000)
+
+print "tests done OK"
+
+try:
+    os.unlink("config")
+except OSError:
+    pass
index 96728f6..98cc6f6 100644 (file)
@@ -1,18 +1,21 @@
 #include "threaded_log.h"
 
-static mutex log_mutex;
-static map<thread::id, int> thread_name_map;
+static std::mutex log_mutex;
+static std::map<thread::id, int> thread_name_map;
 static int next_thread_num = 0;
-static map<const void *, int> instance_name_map;
+static std::map<const void *, int> instance_name_map;
 static int next_instance_num = 0;
 int DEBUG_LEVEL = 0;
 
+using namespace std::chrono;
+
 locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) {
-    auto thread = this_thread::get_id();
+    auto thread = std::this_thread::get_id();
     int tid = thread_name_map[thread];
     if (tid==0)
         tid = thread_name_map[thread] = ++next_thread_num;
-    auto utime = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000;
+    auto utime = duration_cast<microseconds>(
+            system_clock::now().time_since_epoch()).count() % 1000000000;
     f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " ";
     f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid;
     f << " " << std::setw(20) << file << " " << std::setw(18) << func;
diff --git a/types.h b/types.h
index 7de35e9..7ab04cc 100644 (file)
--- a/types.h
+++ b/types.h
 #define types_h
 
 #include <sys/types.h>
-
 #include <algorithm>
-
 #include <condition_variable>
-using cond = std::condition_variable;
-using std::cv_status;
-
 #include <chrono>
-using std::chrono::duration_cast;
-using std::chrono::microseconds;
-using std::chrono::milliseconds;
-using std::chrono::nanoseconds;
-using std::chrono::seconds;
-using std::chrono::steady_clock;
-using std::chrono::system_clock;
-using std::chrono::time_point;
-using std::chrono::time_point_cast;
-
 #include <exception>
-
 #include <fstream>
-using std::ifstream;
-using std::ofstream;
-
 #include <functional>
-
 #include <iomanip>
 #include <iostream>
-
 #include <limits>
-using std::numeric_limits;
-
 #include <list>
-using std::list;
-
 #include <map>
-using std::map;
-
 #include <memory>
-using std::enable_shared_from_this;
-using std::make_shared;
-using std::shared_ptr;
-using std::unique_ptr;
-using std::weak_ptr;
-
 #include <mutex>
-using std::mutex;
-using lock = std::unique_lock<std::mutex>;
-
 #include <stdexcept>
-using std::runtime_error;
-
 #include <sstream>
-
 #include <string>
+#include <thread>
+#include <tuple>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
 using std::string;
-using std::to_string;
-using std::stoi;
 
-#include <thread>
+using cond = std::condition_variable;
+using lock = std::unique_lock<std::mutex>;
 using std::thread;
-using std::call_once;
-using std::once_flag;
-namespace this_thread {
-    using namespace std::this_thread;
-}
 
-#include <tuple>
+using std::shared_ptr;
+using std::unique_ptr;
+
 using std::tuple;
-using std::get;
-using std::tie;
 
-#include <type_traits>
-using std::decay;
-using std::true_type;
-using std::false_type;
-using std::is_enum;
-using std::is_member_function_pointer;
-using std::is_same;
-using std::underlying_type;
 using std::enable_if;
-using std::remove_reference;
-using std::add_const;
-
-#include <utility>
-using std::pair;
-using std::declval;
-using std::forward;
-
-#include <vector>
-using std::vector;
+using std::false_type;
+using std::true_type;
 
 // type traits and manipulators
 
 template <class A, typename I=void> struct is_const_iterable : false_type {};
 
 template<class A> struct is_const_iterable<A,
-    decltype(declval<A &>().cbegin(), declval<A &>().cend(), void())
+    decltype(std::declval<A &>().cbegin(), std::declval<A &>().cend(), void())
 > : true_type {};
 
 template <class A, typename I=void> struct supports_emplace_back : false_type {};
 
 template<class A> struct supports_emplace_back<A,
-    decltype(declval<A &>().emplace_back(declval<typename A::value_type>()), void())
+    decltype(std::declval<A &>().emplace_back(std::declval<typename A::value_type>()), void())
 > : true_type {};
 
-template<typename E>
-using enum_type_t = typename enable_if<is_enum<E>::value, typename underlying_type<E>::type>::type;
+template<typename E> using enum_type_t = typename enable_if<
+    std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
+
 template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
 template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
 
@@ -117,13 +63,13 @@ template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept {
 template <class A, typename I=void> struct is_tuple_convertible : false_type {};
 
 template<class A> struct is_tuple_convertible<A,
-    decltype(declval<A &>()._tuple_(), void())
+    decltype(std::declval<A &>()._tuple_(), void())
 > : true_type {};
 
 // string manipulation
 
 template <class A, class B>
-std::ostream & operator<<(std::ostream & o, const pair<A,B> & d) {
+std::ostream & operator<<(std::ostream & o, const std::pair<A,B> & d) {
     return o << "<" << d.first << "," << d.second << ">";
 }
 
@@ -140,8 +86,8 @@ implode(const C & v, string delim=" ") {
     return oss.str();
 }
 
-inline vector<string> explode(const string & s, string delim=" ") {
-    vector<string> out;
+inline std::vector<string> explode(const string & s, string delim=" ") {
+    std::vector<string> out;
     size_t start = 0, end = 0;
     while ((end = s.find(delim, start)) != string::npos) {
         out.push_back(s.substr(start, end - start));
@@ -152,7 +98,9 @@ inline vector<string> explode(const string & s, string delim=" ") {
 }
 
 template <class A>
-typename enable_if<is_const_iterable<A>::value && !is_same<A,string>::value, std::ostream>::type &
+typename enable_if<
+    is_const_iterable<A>::value &&
+    !std::is_same<A,string>::value, std::ostream>::type &
 operator<<(std::ostream & o, const A & a) {
     return o << "[" << implode(a, ", ") << "]";
 }
@@ -168,8 +116,8 @@ operator<<(std::ostream & o, const A & a) {
 // };
 
 #define MEMBERS(...) \
-inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \
-inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); }
+inline auto _tuple_() -> decltype(std::tie(__VA_ARGS__)) { return std::tie(__VA_ARGS__); } \
+inline auto _tuple_() const -> decltype(std::tie(__VA_ARGS__)) { return std::tie(__VA_ARGS__); }
 
 // struct ordering and comparison operations; requires the use of MEMBERS.
 // usage:
@@ -184,8 +132,7 @@ LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \
 LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \
 LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
 
-// crucial tool for tuple indexing in variadic templates
-//
+// Tuple indexing in variadic templates.
 // This implementation of tuple_indices is redistributed under the MIT
 // License as an insubstantial portion of the LLVM compiler infrastructure.