From: Peter Iannucci
Date: Wed, 27 Nov 2013 06:17:02 +0000 (-0500)
Subject: Got rid of most using directives. Ported tests to python.
X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/03b35a9a1bd1f583e32b27d260b223a0989d6c75

Got rid of most using directives. Ported tests to python.
---

diff --git a/config.cc b/config.cc
index e1b0963..4931a9f 100644
--- a/config.cc
+++ b/config.cc
@@ -1,6 +1,8 @@
 #include "config.h"
 #include "handle.h"
 
+using std::vector;
+
 // The config module maintains views. As a node joins or leaves a
 // view, the next view will be the same as previous view, except with
 // the new node added or removed. The first view contains only node
diff --git a/config.h b/config.h
index 26a612d..d2b33ba 100644
--- a/config.h
+++ b/config.h
@@ -17,11 +17,11 @@ class config : public paxos_change {
     string me;
     config_view_change *vc;
     proposer_acceptor paxos;
-    vector<string> mems;
-    mutex cfg_mutex;
+    std::vector<string> mems;
+    std::mutex cfg_mutex;
     cond config_cond;
     paxos_protocol::status heartbeat(int & r, string m, unsigned instance);
-    void get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock);
+    void get_view(unsigned instance, std::vector<string> & m, lock & cfg_mutex_lock);
     bool remove(const string &, lock & cfg_mutex_lock);
     void reconstruct(lock & cfg_mutex_lock);
     typedef enum {
@@ -35,7 +35,7 @@ class config : public paxos_change {
     unsigned view_id() { return my_view_id; }
     const string & myaddr() const { return me; }
     string dump() { return paxos.dump(); }
-    void get_view(unsigned instance, vector<string> & m);
+    void get_view(unsigned instance, std::vector<string> & m);
     void restore(const string & s);
     bool add(const string &, unsigned view_id);
     bool ismember(const string & m, unsigned view_id);
diff --git a/endian.h b/endian.h
index f34d371..feb3bbd 100644
--- a/endian.h
+++ b/endian.h
@@ -20,14 +20,18 @@
 inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
 template <class T> inline T ntoh(T t) { return hton(t); }
 
-template <class... Args, size_t... Indices> inline tuple<typename remove_reference<Args>::type...>
+template <class... Args, size_t... Indices>
+inline tuple<typename std::remove_reference<Args>::type...>
 tuple_hton_imp(tuple<Args...> && t, tuple_indices<Indices...>) {
-    return tuple<typename remove_reference<Args>::type...>(hton(get<Indices>(t))...);
+    return tuple<
+        typename std::remove_reference<Args>::type...
+    >(hton(std::get<Indices>(t))...);
 }
 
-template <class... Args> inline tuple<typename remove_reference<Args>::type...>
+template <class... Args>
+inline tuple<typename std::remove_reference<Args>::type...>
 hton(tuple<Args...> && t) {
-    return tuple_hton_imp(forward<tuple<Args...>>(t), TUPLE_INDICES(Args));
+    return tuple_hton_imp(std::forward<tuple<Args...>>(t), TUPLE_INDICES(Args));
 }
 
 template <class T> inline typename
diff --git a/handle.cc b/handle.cc
index 17d04af..926cb59 100644
--- a/handle.cc
+++ b/handle.cc
@@ -5,18 +5,18 @@
 public:
     unique_ptr<rpcc> client;
     bool valid = true;
     string destination;
-    mutex client_mutex;
+    std::mutex client_mutex;
     hinfo(const string & destination_) : destination(destination_) {}
 };
 
-static mutex mgr_mutex;
-static map<string, shared_ptr<hinfo>> hmap;
+static std::mutex mgr_mutex;
+static std::map<string, shared_ptr<hinfo>> hmap;
 
 handle::handle(const string & destination) : destination_(destination) {
     lock ml(mgr_mutex);
     h = hmap[destination];
     if (!h || !h->valid)
-        h = (hmap[destination] = make_shared<hinfo>(destination));
+        h = (hmap[destination] = std::make_shared<hinfo>(destination));
 }
 
 rpcc * handle::safebind() {
diff --git a/lock_client.cc b/lock_client.cc
index 7a44940..9949eac 100644
--- a/lock_client.cc
+++ b/lock_client.cc
@@ -4,7 +4,7 @@
 #include
 
 void lock_state::wait(lock & mutex_lock) {
-    auto self = this_thread::get_id();
+    auto self = std::this_thread::get_id();
     c[self].wait(mutex_lock);
     c.erase(self);
 }
@@ -20,8 +20,6 @@ void lock_state::signal(thread::id who) {
     c[who].notify_one();
 }
 
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-
 in_port_t lock_client::last_port = 0;
 
 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
@@ -36,7 +34,7 @@ lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xi
     srandom((uint32_t)time(NULL)^last_port);
     rlock_port = ((random()%32000) | (0x1 << 10));
-    id = "127.0.0.1:" + to_string(rlock_port);
+    id = "127.0.0.1:" + std::to_string(rlock_port);
     last_port = rlock_port;
     rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
@@ -81,7 +79,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) {
 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
-    auto self = this_thread::get_id();
+    auto self = std::this_thread::get_id();
 
     // check for reentrancy
     VERIFY(st.state != lock_state::locked || st.held_by != self);
@@ -147,7 +145,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
-    auto self = this_thread::get_id();
+    auto self = std::this_thread::get_id();
     VERIFY(st.state == lock_state::locked && st.held_by == self);
     st.state = lock_state::free;
     LOG << "Lock " << lid << ": free";
diff --git a/lock_client.h b/lock_client.h
index 31bf905..5cdb26e 100644
--- a/lock_client.h
+++ b/lock_client.h
@@ -26,17 +26,17 @@ public:
         acquiring,
         releasing
     } state = none;
-    thread::id held_by;
-    list<thread::id> wanted_by;
-    mutex m;
-    map<thread::id, cond> c;
+    std::thread::id held_by;
+    std::list<std::thread::id> wanted_by;
+    std::mutex m;
+    std::map<std::thread::id, cond> c;
     lock_protocol::xid_t xid;
     void wait(lock & mutex_lock);
     void signal();
     void signal(thread::id who);
 };
 
-typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
 
 // Clients that caches locks. The server can revoke locks using
 // lock_revoke_server.
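The map<thread::id, cond> member above gives every waiting thread its own condition variable, so signal(thread::id) can wake exactly one chosen waiter instead of notifying all of them. A minimal standalone sketch of that pattern, with hypothetical names, not code from the patch; lock_state::wait()/signal() in lock_client.cc follow the same shape:

    #include <condition_variable>
    #include <map>
    #include <mutex>
    #include <thread>

    // Sketch only: targeted wakeups via one condition variable per thread.
    struct waiter_table {
        std::map<std::thread::id, std::condition_variable> c;

        // Caller holds the unique_lock guarding the surrounding state.
        void wait(std::unique_lock<std::mutex> & l) {
            auto self = std::this_thread::get_id();
            c[self].wait(l);   // operator[] creates this thread's condvar on demand
            c.erase(self);     // one-shot: drop the entry once woken
        }

        // Wake one specific thread; a no-op if it is not currently waiting.
        void signal(std::thread::id who) {
            auto it = c.find(who);
            if (it != c.end())
                it->second.notify_one();
        }
    };

Erasing the entry after wake-up keeps the table from accumulating dead thread ids, at the cost of reinserting an entry on every wait.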
@@ -50,10 +50,10 @@ class lock_client { in_port_t rlock_port; string hostname; string id; - mutex xid_mutex; + std::mutex xid_mutex; lock_protocol::xid_t next_xid; fifo release_fifo; - mutex lock_table_lock; + std::mutex lock_table_lock; lock_map lock_table; lock_state & get_lock_state(lock_protocol::lockid_t lid); public: diff --git a/lock_server.h b/lock_server.h index da8b788..88b9e11 100644 --- a/lock_server.h +++ b/lock_server.h @@ -6,7 +6,7 @@ #include "rsm.h" #include "rpc/fifo.h" -typedef pair holder_t; +typedef std::pair holder_t; class lock_state { public: @@ -14,20 +14,20 @@ public: lock_state(const lock_state & other); bool held; holder_t held_by; - list wanted_by; - map old_requests; - mutex m; + std::list wanted_by; + std::map old_requests; + std::mutex m; lock_state & operator=(const lock_state &); MEMBERS(held, held_by, wanted_by) }; -typedef map lock_map; +typedef std::map lock_map; class lock_server : private rsm_state_transfer { private: int nacquire; - mutex lock_table_lock; + std::mutex lock_table_lock; lock_map lock_table; lock_state & get_lock_state(lock_protocol::lockid_t lid); fifo retry_fifo; diff --git a/lock_tester.cc b/lock_tester.cc index e9ec0a8..7143401 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -20,7 +20,7 @@ static lock_protocol::lockid_t c = "3"; // doesn't grant the same lock to both clients. // it assumes that lock names are distinct in the first byte. static int ct[256]; -static mutex count_mutex; +static std::mutex count_mutex; static void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); diff --git a/log.cc b/log.cc index 2f1d679..decb827 100644 --- a/log.cc +++ b/log.cc @@ -12,7 +12,7 @@ log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) { } void log::logread(void) { - ifstream from(name); + std::ifstream from(name); string type; unsigned instance; diff --git a/paxos.h b/paxos.h index 2ddf583..79924a3 100644 --- a/paxos.h +++ b/paxos.h @@ -22,13 +22,12 @@ extern bool majority(const nodes_t & l1, const nodes_t & l2); class proposer_acceptor { private: - mutex proposer_mutex; - mutex acceptor_mutex; + std::mutex proposer_mutex, acceptor_mutex; paxos_change *delegate; node_t me; - rpcs pxs{(in_port_t)stoi(me)}; + rpcs pxs{(in_port_t)std::stoi(me)}; bool break1 = false; bool break2 = false; @@ -42,7 +41,7 @@ class proposer_acceptor { prop_t accepted = {0, me}; // number of highest proposal accepted value_t accepted_value; // value of highest proposal accepted unsigned instance_h = 0; // number of the highest instance we have decided - map values; // vals of each instance + std::map values; // vals of each instance friend class log; class log l = {this, me}; diff --git a/paxos_protocol.h b/paxos_protocol.h index 1ce3be7..8f8f816 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -23,7 +23,7 @@ namespace paxos_protocol { MEMBERS(oldinstance, accept, n_a, v_a) }; using node_t = string; - using nodes_t = vector; + using nodes_t = std::vector; using value_t = string; REMOTE_PROCEDURE_BASE(0x11000); diff --git a/rpc/connection.cc b/rpc/connection.cc index d118539..c7e8f95 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -30,7 +30,7 @@ connection::~connection() { // will be active poll_mgr::shared_mgr.block_remove_fd(fd); VERIFY(dead_); - VERIFY(!wpdu_.buf.size()); + VERIFY(wpdu_.status == unused); } shared_ptr connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) { @@ -42,28 +42,23 @@ shared_ptr connection::to_dst(const sockaddr_in & dst, connection_de return nullptr; } IF_LEVEL(2) 
LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port); - return make_shared(delegate, std::move(s), lossy); + return std::make_shared(delegate, std::move(s), lossy); } bool connection::send(const string & b) { lock ml(m_); - waiters_++; - while (!dead_ && wpdu_.buf.size()) + while (!dead_ && wpdu_.status != unused) send_wait_.wait(ml); - waiters_--; if (dead_) return false; - wpdu_.buf = b; - wpdu_.solong = 0; + wpdu_ = {inflight, b, 0}; - if (lossy_) { - if ((random()%100) < lossy_) { - IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd; - shutdown(fd,SHUT_RDWR); - } + if (lossy_ && (random()%100) < lossy_) { + IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd; + shutdown(fd,SHUT_RDWR); } if (!writepdu()) { @@ -71,17 +66,15 @@ bool connection::send(const string & b) { ml.unlock(); poll_mgr::shared_mgr.block_remove_fd(fd); ml.lock(); - } else if (wpdu_.solong != wpdu_.buf.size()) { + } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) { // should be rare to need to explicitly add write callback poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this); - while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) + while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size()) send_complete_.wait(ml); } - bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size()); - wpdu_.solong = 0; - wpdu_.buf.clear(); - if (waiters_ > 0) - send_wait_.notify_all(); + bool ret = (!dead_ && wpdu_.status == inflight && wpdu_.cursor == b.size()); + wpdu_ = {unused, "", 0}; + send_wait_.notify_all(); return ret; } @@ -90,7 +83,7 @@ void connection::write_cb(int s) { lock ml(m_); VERIFY(!dead_); VERIFY(fd == s); - if (wpdu_.buf.size() == 0) { + if (wpdu_.status != inflight) { poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY); return; } @@ -98,14 +91,30 @@ void connection::write_cb(int s) { poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; } else { - VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong < wpdu_.buf.size()) { + VERIFY(wpdu_.status != error); + if (wpdu_.cursor < wpdu_.buf.size()) return; - } } send_complete_.notify_one(); } +bool connection::writepdu() { + VERIFY(wpdu_.status == inflight); + if (wpdu_.cursor == wpdu_.buf.size()) + return true; + + ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor)); + if (n < 0) { + if (errno != EAGAIN) { + IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno; + wpdu_ = {error, "", 0}; + } + return (errno == EAGAIN); + } + wpdu_.cursor += (size_t)n; + return true; +} + // fd is ready to be read void connection::read_cb(int s) { lock ml(m_); @@ -115,7 +124,7 @@ void connection::read_cb(int s) { IF_LEVEL(5) LOG << "got data on fd " << s; - if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { + if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) { if (!readpdu()) { IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying"; poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); @@ -124,36 +133,17 @@ void connection::read_cb(int s) { } } - if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { + if (rpdu_.status == inflight && rpdu_.buf.size() == rpdu_.cursor) { if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) { // connection_delegate has successfully consumed the pdu - rpdu_.buf.clear(); - rpdu_.solong = 0; + rpdu_ = {unused, "", 0}; } } } -bool connection::writepdu() { - VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong == wpdu_.buf.size()) - return true; - - ssize_t n = 
write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); - if (n < 0) { - if (errno != EAGAIN) { - IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno; - wpdu_.solong = size_t_max; - wpdu_.buf.clear(); - } - return (errno == EAGAIN); - } - wpdu_.solong += (size_t)n; - return true; -} - bool connection::readpdu() { IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size(); - if (!rpdu_.buf.size()) { + if (rpdu_.status == unused) { rpc_protocol::rpc_sz_t sz1; ssize_t n = fd.read(sz1); @@ -179,22 +169,20 @@ bool connection::readpdu() { IF_LEVEL(5) LOG << "read size of datagram = " << sz; - rpdu_.buf.assign(sz+sizeof(sz1), 0); - rpdu_.solong = sizeof(sz1); + rpdu_ = {inflight, string(sz+sizeof(sz1), 0), sizeof(sz1)}; } - ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); + ssize_t n = fd.read(&rpdu_.buf[rpdu_.cursor], rpdu_.buf.size() - rpdu_.cursor); IF_LEVEL(5) LOG << "read " << n << " bytes"; if (n <= 0) { if (errno == EAGAIN) return true; - rpdu_.buf.clear(); - rpdu_.solong = 0; + rpdu_ = {unused, "", 0}; return false; } - rpdu_.solong += (size_t)n; + rpdu_.cursor += (size_t)n; return true; } @@ -239,11 +227,10 @@ void connection_listener::read_cb(int) { int s1 = accept(tcp_, (sockaddr *)&sin, &slen); if (s1 < 0) { perror("connection_listener::accept_conn error"); - throw runtime_error("connection listener failure"); + throw std::runtime_error("connection listener failure"); } IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port); - auto ch = make_shared(delegate_, s1, lossy_); // garbage collect dead connections for (auto i = conns_.begin(); i != conns_.end();) { @@ -253,5 +240,5 @@ void connection_listener::read_cb(int) { ++i; } - conns_[s1] = ch; + conns_[s1] = std::make_shared(delegate_, s1, lossy_); } diff --git a/rpc/connection.h b/rpc/connection.h index 1bcb7b6..68bd902 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -7,8 +7,6 @@ #include "poll_mgr.h" #include "file.h" -constexpr size_t size_t_max = numeric_limits::max(); - class connection; class connection_delegate { @@ -17,7 +15,10 @@ class connection_delegate { virtual ~connection_delegate(); }; -class connection : private aio_callback, public enable_shared_from_this { +using std::chrono::steady_clock; +using time_point = std::chrono::time_point; + +class connection : private aio_callback, public std::enable_shared_from_this { public: connection(connection_delegate * delegate, socket_t && f1, int lossytest=0); ~connection(); @@ -28,7 +29,7 @@ class connection : private aio_callback, public enable_shared_from_this to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0); - const time_point create_time = steady_clock::now(); + const time_point create_time = steady_clock::now(); const file_t fd; private: @@ -41,18 +42,20 @@ class connection : private aio_callback, public enable_shared_from_this> conns_; + std::map> conns_; }; #endif diff --git a/rpc/fifo.h b/rpc/fifo.h index f8a7224..dfb4d05 100644 --- a/rpc/fifo.h +++ b/rpc/fifo.h @@ -37,8 +37,8 @@ class fifo { } private: - list q_; - mutex m_; + std::list q_; + std::mutex m_; cond non_empty_c_; // q went non-empty cond has_space_c_; // q is not longer overfull size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit diff --git a/rpc/marshall.h b/rpc/marshall.h index 8592e4b..08b6aaa 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -122,7 +122,7 @@ tuple_marshall_imp(marshall & m, tuple & 
t, tuple_indices) // to be evaluated in order. Order matters because the elements must be // serialized consistently! The empty struct resulting from construction // is discarded. - (void)pass{(m << get(t))...}; + (void)pass{(m << std::get(t))...}; return m; } @@ -133,7 +133,7 @@ operator<<(marshall & m, tuple && t) { template inline unmarshall & tuple_unmarshall_imp(unmarshall & u, tuple t, tuple_indices) { - (void)pass{(u >> get(t))...}; + (void)pass{(u >> std::get(t))...}; return u; } @@ -182,22 +182,22 @@ operator>>(unmarshall & u, A & x) { // std::pair template inline marshall & -operator<<(marshall & m, const pair & d) { +operator<<(marshall & m, const std::pair & d) { return m << d.first << d.second; } template inline unmarshall & -operator>>(unmarshall & u, pair & d) { +operator>>(unmarshall & u, std::pair & d) { return u >> d.first >> d.second; } // std::map template inline unmarshall & -operator>>(unmarshall & u, map & x) { +operator>>(unmarshall & u, std::map & x) { uint32_t n = u._grab(); x.clear(); while (n--) - x.emplace(u._grab>()); + x.emplace(u._grab>()); return u; } @@ -221,12 +221,12 @@ inline unmarshall & operator>>(unmarshall & u, string & s) { // Marshalling for strongly-typed enums // -template typename enable_if::value, marshall>::type & +template typename enable_if::value, marshall>::type & operator<<(marshall & m, E e) { return m << from_enum(e); } -template typename enable_if::value, unmarshall>::type & +template typename enable_if::value, unmarshall>::type & operator>>(unmarshall & u, E & e) { e = to_enum(u._grab>()); return u; diff --git a/rpc/marshall_wrap.h b/rpc/marshall_wrap.h index b895696..6754acc 100644 --- a/rpc/marshall_wrap.h +++ b/rpc/marshall_wrap.h @@ -43,17 +43,17 @@ struct VerifyOnFailure { // One for function pointers... template -typename enable_if::value, RV>::type inline +typename enable_if::value, RV>::type inline invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { - return f(r, get(t)...); + return f(r, std::get(t)...); } // And one for pointers to member functions... template -typename enable_if::value, RV>::type inline +typename enable_if::value, RV>::type inline invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices) { - return (c->*f)(r, get(t)...); + return (c->*f)(r, std::get(t)...); } // The class marshalled_func_imp uses partial template specialization to @@ -76,13 +76,13 @@ struct marshalled_func_imp { // This type definition represents storage for f's unmarshalled // arguments. decay is (most notably) stripping off const // qualifiers. - using ArgsStorage = tuple::type...>; + using ArgsStorage = tuple::type...>; // Allocate a handler (i.e. function) to hold the lambda // which will unmarshall RPCs and call f. return new handler([=](unmarshall && u, marshall & m) -> RV { // Unmarshall each argument with the correct type and store the // result in a tuple. - ArgsStorage t{u._grab::type>()...}; + ArgsStorage t{u._grab::type>()...}; // Verify successful unmarshalling of the entire input stream. 
if (!u.okdone()) return (RV)ErrorHandler::unmarshall_args_failure(); diff --git a/rpc/poll_mgr.cc b/rpc/poll_mgr.cc index d29abd2..83289a8 100644 --- a/rpc/poll_mgr.cc +++ b/rpc/poll_mgr.cc @@ -7,6 +7,8 @@ #include #endif +using std::vector; + aio_callback::~aio_callback() {} poll_mgr poll_mgr::shared_mgr; @@ -33,7 +35,7 @@ class SelectAIO : public wait_manager { fd_set rfds_, wfds_; int highfds_; file_t pipe_[2]; - mutex m_; + std::mutex m_; }; #ifdef __linux__ diff --git a/rpc/poll_mgr.h b/rpc/poll_mgr.h index 44ac9f8..d8cfd20 100644 --- a/rpc/poll_mgr.h +++ b/rpc/poll_mgr.h @@ -33,10 +33,10 @@ class poll_mgr { void wait_loop(); private: - mutex m_; + std::mutex m_; cond changedone_c_; - map callbacks_; + std::map callbacks_; unique_ptr aio_; bool pending_change_=false, shutdown_=false; diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 00f6d2e..de33675 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -60,6 +60,9 @@ #include #include +using std::list; +using namespace std::chrono; + inline void set_rand_seed() { auto now = time_point_cast(steady_clock::now()); srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); @@ -186,7 +189,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { lock cal(ca.m); while (!ca.done) { IF_LEVEL(2) LOG << "wait"; - if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) { + if (ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout) { IF_LEVEL(2) LOG << "timeout"; break; } @@ -404,7 +407,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) { case NEW: // new request - rh.ret = (*f)(forward(req), rep); + rh.ret = (*f)(std::forward(req), rep); if (rh.ret == rpc_protocol::unmarshall_args_failure) { LOG << "failed to unmarshall the arguments. You are " << "probably calling RPC 0x" << std::hex << proc << " with the wrong " @@ -555,6 +558,6 @@ static sockaddr_in make_sockaddr(const string & hostandport) { memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); dst.sin_addr.s_addr = a.s_addr; } - dst.sin_port = hton((in_port_t)stoi(port)); + dst.sin_port = hton((in_port_t)std::stoi(port)); return dst; } diff --git a/rpc/rpc.h b/rpc/rpc.h index b44c057..58e9381 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -11,23 +11,31 @@ #include "marshall_wrap.h" #include "connection.h" +using std::chrono::milliseconds; + namespace rpc { static constexpr milliseconds to_max{12000}; static constexpr milliseconds to_min{100}; } -template struct is_valid_call : false_type {}; +template +struct is_valid_call : false_type {}; template struct is_valid_call : true_type {}; -template struct is_valid_registration : false_type {}; +template +struct is_valid_registration : false_type {}; template -struct is_valid_registration::type...), S(R &, Args...)> : true_type {}; +struct is_valid_registration< + S(R &, typename std::decay::type...), + S(R &, Args...)> : true_type {}; template -struct is_valid_registration : is_valid_registration {}; +struct is_valid_registration< + P, + S(C::*)(R &, Args...)> : is_valid_registration {}; // rpc client endpoint. 
// manages a xid space per destination socket @@ -48,7 +56,7 @@ class rpcc : private connection_delegate { string *rep; int intret; bool done = false; - mutex m; + std::mutex m; cond c; }; @@ -65,17 +73,17 @@ class rpcc : private connection_delegate { shared_ptr chan_; - mutex m_; // protect insert/delete to calls[] - mutex chan_m_; + std::mutex m_; // protect insert/delete to calls[] + std::mutex chan_m_; bool destroy_wait_ = false; cond destroy_wait_c_; - map calls_; + std::map calls_; // xid starts with 1 and latest received reply starts with 0 xid_t xid_ = 1; - list xid_rep_window_ = {0}; + std::list xid_rep_window_ = {0}; struct request { void clear() { buf.clear(); xid = -1; } @@ -126,7 +134,7 @@ class rpcc : private connection_delegate { template inline int call_timeout(proc_t
proc, milliseconds to, R & r, const Args & ... args) { static_assert(is_valid_call::value, "RPC called with incorrect argument types"); - return call_m(proc.id, to, r, forward(marshall(args...))); + return call_m(proc.id, to, r, std::forward(marshall(args...))); } }; @@ -164,7 +172,7 @@ class rpcs : private connection_delegate { // provide at most once semantics by maintaining a window of replies // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - map> reply_window_; + std::map> reply_window_; void add_reply(nonce_t clt_nonce, xid_t xid, const string & b); @@ -172,16 +180,16 @@ class rpcs : private connection_delegate { xid_t rep_xid, string & b); // latest connection to the client - map> conns_; + std::map> conns_; bool reachable_ = true; // map proc # to function - map procs_; + std::map procs_; - mutex procs_m_; // protect insert/delete to procs[] - mutex reply_window_m_; // protect reply window et al - mutex conns_m_; // protect conns_ + std::mutex procs_m_; // protect insert/delete to procs[] + std::mutex reply_window_m_; // protect reply window et al + std::mutex conns_m_; // protect conns_ void dispatch(shared_ptr c, const string & buf); diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index bf840d2..fb170e7 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -19,6 +19,8 @@ static in_port_t port; using std::cout; using std::endl; +using namespace std::chrono; +using std::vector; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers @@ -393,7 +395,7 @@ int main(int argc, char *argv[]) { if (isclient) { // server's address. - dst = "127.0.0.1:" + to_string(port); + dst = "127.0.0.1:" + std::to_string(port); // start the client. bind it to the server. diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h index a864784..9525032 100644 --- a/rpc/thr_pool.h +++ b/rpc/thr_pool.h @@ -18,7 +18,7 @@ class thread_pool { bool blockadd_; fifo jobq_; - vector th_; + std::vector th_; void do_worker(); }; diff --git a/rsm.cc b/rsm.cc index 5812b33..cb986fe 100644 --- a/rsm.cc +++ b/rsm.cc @@ -83,6 +83,8 @@ #include "rsm_client.h" #include +using std::vector; + rsm_state_transfer::~rsm_state_transfer() {} rsm::rsm(const string & _first, const string & _me) : primary(_first) @@ -103,7 +105,7 @@ rsm::rsm(const string & _first, const string & _me) : primary(_first) rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1)); + testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1)); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); } @@ -129,7 +131,7 @@ void rsm::recovery() { commit_change(cfg->view_id(), ml); } else { ml.unlock(); - this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary? + std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary? 
ml.lock(); } } diff --git a/rsm.h b/rsm.h index bf2f221..dfbb25c 100644 --- a/rsm.h +++ b/rsm.h @@ -18,7 +18,7 @@ class rsm_state_transfer { class rsm : public config_view_change { protected: - map procs; + std::map procs; unique_ptr cfg; rsm_state_transfer *stf = nullptr; rpcs *rsmrpc; @@ -31,7 +31,7 @@ class rsm : public config_view_change { bool inviewchange = true; unsigned vid_commit = 0; // Latest view id that is known to rsm layer unsigned vid_insync; // The view id that this node is synchronizing for - vector backups; // A list of unsynchronized backups + std::vector backups; // A list of unsynchronized backups // For testing purposes unique_ptr testsvr; @@ -39,7 +39,7 @@ class rsm : public config_view_change { bool dopartition = false; bool breakpoints[2] = {}; - rsm_client_protocol::status client_members(vector & r, int i); + rsm_client_protocol::status client_members(std::vector & r, int i); rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq); rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid); @@ -48,7 +48,7 @@ class rsm : public config_view_change { rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal); rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b); - mutex rsm_mutex, invoke_mutex; + std::mutex rsm_mutex, invoke_mutex; cond recovery_cond, sync_cond; void execute(rpc_protocol::proc_id_t procno, const string & req, string & r); diff --git a/rsm_client.h b/rsm_client.h index 303d038..90b5b06 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -15,8 +15,8 @@ class rsm_client { protected: string primary; - vector known_mems; - mutex rsm_client_mutex; + std::vector known_mems; + std::mutex rsm_client_mutex; void primary_failure(lock & rsm_client_mutex_lock); bool init_members(lock & rsm_client_mutex_lock); rsm_protocol::status invoke(unsigned int proc, string & rep, const string & req); diff --git a/rsm_protocol.h b/rsm_protocol.h index 28773c1..5a5b7dd 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -8,7 +8,7 @@ namespace rsm_client_protocol { enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY}; REMOTE_PROCEDURE_BASE(0x9000); REMOTE_PROCEDURE(1, invoke, (string &, rpc_protocol::proc_id_t, string)); - REMOTE_PROCEDURE(2, members, (vector &, int)); + REMOTE_PROCEDURE(2, members, (std::vector &, int)); } struct viewstamp { diff --git a/rsm_tester.cc b/rsm_tester.cc index e821c99..d5d27fd 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -17,9 +17,9 @@ int main(int argc, char *argv[]) { rsmtest_client *lc = new rsmtest_client(argv[1]); string command(argv[2]); if (command == "partition") { - LOG_NONMEMBER << "net_repair returned " << lc->net_repair(stoi(argv[3])); + LOG_NONMEMBER << "net_repair returned " << lc->net_repair(std::stoi(argv[3])); } else if (command == "breakpoint") { - int b = stoi(argv[3]); + int b = std::stoi(argv[3]); LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b); } else { LOG_NONMEMBER << "Unknown command " << argv[2]; diff --git a/rsm_tester.py b/rsm_tester.py new file mode 100755 index 0000000..32f0ad2 --- /dev/null +++ b/rsm_tester.py @@ -0,0 +1,628 @@ +#!/usr/bin/env python + +import subprocess as sp +import signal +import os +import sys +import time +import getopt +import random + +pid = [] +logs = [] +views = [] # expected views +in_views = {} # the number of views a node is expected to be present +p = [] +t = None +always_kill = 0 +quit = False + +def 
killprocess(num=None, frame=None):
+    # num/frame default to None so this also works when called outside a signal handler
+    print "killprocess: forcestop all spawned processes...%s" % (str(pid),)
+    global quit
+    quit = True
+    for p in pid:
+        os.kill(p, signal.SIGKILL)
+
+for sig in ['HUP', 'INT', 'ABRT', 'QUIT', 'TERM']:
+    num = getattr(signal, 'SIG'+sig)
+    signal.signal(num, killprocess)
+
+def paxos_log(port):
+    return "paxos-%d.log" % port
+
+def die(*s):
+    print >>sys.stderr, ''.join(s)
+    exit(1)
+
+def mydie(*s):
+    if always_kill:
+        killprocess()
+    die(*s)
+
+def usleep(us):
+    time.sleep(us/1e6)
+
+def cleanup():
+    for p in pid:
+        os.kill(p, signal.SIGKILL)
+    for l in logs:
+        try:
+            os.unlink(l)
+        except OSError:
+            pass
+    usleep(200000)
+
+def spawn(p, *a):
+    sa = map(str, a)
+    aa = '-'.join(sa)
+    try:
+        pid = os.fork()
+    except OSError, e:
+        mydie("Cannot fork: %s" % (repr(e),))
+    if pid:
+        # parent
+        logs.append("%s-%s.log" % (p, aa))
+        if 'lock_server' in p:
+            logs.append(paxos_log(a[1]))
+        return pid
+    else:
+        # child
+        os.close(1)
+        sys.stdout = open("%s-%s.log" % (p, aa), 'w')
+        os.close(2)
+        os.dup(1)
+        sys.stderr = sys.stdout
+        print "%s %s" % (p, ' '.join(sa))
+        try:
+            os.execv(p, [p] + sa)
+        except OSError, e:
+            mydie("Cannot start new %s %s %s" % (p, repr(sa), repr(e)))
+
+def randports(num):
+    return sorted([random.randint(0, 54000/2)*2+10000 for i in xrange(num)])
+
+def print_config(ports):
+    try:
+        config = open("config", 'w')
+    except IOError:
+        mydie("Couldn't open config for writing")
+    for p in ports:
+        print >>config, "%05d" % (p,)
+    config.close()
+
+def spawn_ls(master, port):
+    return spawn("./lock_server", master, port)
+
+def check_views(l, vs, last_v=None):
+    try:
+        f = open(l, 'r')
+        log = f.readlines()
+        f.close()
+    except IOError:
+        mydie("Failed: couldn't read %s" % (l,))
+    i = 0
+    last_view = None
+    for line in log:
+        if not line.startswith('done'):
+            continue
+        words = line.split(' ')
+        num = int(words[1])
+        view = map(int, words[2:])
+        last_view = view
+        if i >= len(vs):
+            # let there be extra views
+            continue
+        expected = vs[i]
+        if tuple(expected) != tuple(view):
+            mydie("Failed: In log %s at view %s is (%s), but expected %s (%s)" %
+                  (l, str(num), repr(view), str(i), repr(expected)))
+        i += 1
+    if i < len(vs):
+        mydie("Failed: In log %s, not enough views seen!" % (l,))
+    if last_v is not None and tuple(last_v) != tuple(last_view):
+        mydie("Failed: In log %s last view didn't match, got view %s, but expected %s" %
+              (l, repr(last_view), repr(last_v)))
+
+def get_num_views(log, including):
+    try:
+        f = open(log, 'r')
+    except IOError:
+        return 0
+    log = f.readlines()
+    f.close()
+    return len([x for x in log if 'done ' in x and str(including) in x])
+
+def wait_for_view_change(log, num_views, including, timeout):
+    start = time.time()
+    while get_num_views(log, including) < num_views and (start + timeout > time.time()) and not quit:
+        try:
+            f = open(log, 'r')
+            loglines = f.readlines()
+            f.close()
+            lastv = [x for x in loglines if 'done' in x][-1].strip()
+            print " Waiting for %s to be present in >=%s views in %s (Last view: %s)" % \
+                  (including, str(num_views), log, lastv)
+            usleep(100000)
+        except (IOError, IndexError):
+            # the log may not exist yet, or may not contain any views yet
+            continue
+
+    if get_num_views(log, including) < num_views:
+        mydie("Failed: Timed out waiting for %s to be in >=%s in log %s" %
+              (including, str(num_views), log))
+    else:
+        print " Done: %s is in >=%s views in %s" % (including, str(num_views), log)
+
+def waitpid_to(pid, to):
+    start = time.time()
+    done_pid = -1
+    while done_pid <= 0 and (time.time() - start) < to:
+        usleep(100000)
+        # os.waitpid returns (pid, status); a pid of 0 means still running
+        done_pid = os.waitpid(pid, os.WNOHANG)[0]
+
+    if done_pid <= 0:
+        os.kill(pid, signal.SIGKILL)
+        mydie("Failed: Timed out waiting for process %s" % (str(pid),))
+    else:
+        return 1
+
+def wait_and_check_expected_view(v):
+    views.append(v)
+    for vv in v:
+        in_views[vv] += 1
+    for port in v:
+        wait_for_view_change(paxos_log(port), in_views[port], port, 20)
+    for port in v:
+        log = paxos_log(port)
+        check_views(log, views)
+
+def start_nodes(n, command):
+    global pid, logs, views
+    pid = []
+    logs = []
+    views = []
+    for pp in p:
+        in_views[pp] = 0
+
+    for i in xrange(n):
+        if command == "ls":
+            pid.append(spawn_ls(p[0], p[i]))
+            print "Start lock_server on %s" % (str(p[i]),)
+        usleep(100000)
+
+        wait_and_check_expected_view(p[:i+1])
+
+options, arguments = getopt.getopt(sys.argv[1:], "s:k")
+options = dict(options)
+
+# getopt keys keep their leading dash
+if '-s' in options:
+    random.seed(options['-s'])
+
+if '-k' in options:
+    always_kill = 1
+
+# get a sorted list of random ports
+p = randports(5)
+print_config(p)
+
+NUM_TESTS = 17
+do_run = [0] * NUM_TESTS
+
+# see which tests are set
+if len(arguments):
+    for t in arguments:
+        t = int(t)
+        if t < NUM_TESTS and t >= 0:
+            do_run[t] = 1
+else:
+    # turn on all tests
+    for i in xrange(NUM_TESTS):
+        do_run[i] = 1
+
+if do_run[0]:
+    print "test0: start 3-process lock server"
+    start_nodes(3, "ls")
+    cleanup()
+    usleep(200000)
+
+if do_run[1]:
+    print "test1: start 3-process lock server, kill third server"
+    start_nodes(3, "ls")
+    print "Kill third server (PID: %s) on port %s" % (str(pid[2]), str(p[2]))
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    # it should go through 4 views
+    v4 = [p[0], p[1]]
+    wait_and_check_expected_view(v4)
+    cleanup()
+    usleep(200000)
+
+if do_run[2]:
+    print "test2: start 3-process lock server, kill first server"
+    start_nodes(3, "ls")
+    print "Kill first (PID: %s) on port %s" % (str(pid[0]), str(p[0]))
+    os.kill(pid[0], signal.SIGTERM)
+    usleep(500000)
+    # it should go through 4 views
+    v4 = [p[1], p[2]]
+    wait_and_check_expected_view(v4)
+    cleanup()
+    usleep(200000)
+
+if do_run[3]:
+    print "test3: start 3-process lock_server, kill a server, restart a server"
+    start_nodes(3, "ls")
+    print "Kill server (PID: %s) on port %s" % (str(pid[2]), str(p[2]))
+    os.kill(pid[2], signal.SIGTERM)
+    usleep(500000)
+    v4 = (p[0], p[1])
+    wait_and_check_expected_view(v4)
+    print 
"Restart killed server on port $p[2]" + pid[2] = spawn_ls (p[0], p[2]) + usleep(500000) + v5 = (p[0], p[1], p[2]) + wait_and_check_expected_view(v5) + cleanup() + usleep(200000) + +if do_run[4]: + print "test4: 3-process lock_server, kill third server, kill second server, restart third server, kill third server again, restart second server, re-restart third server, check logs" + start_nodes(3,"ls") + print "Kill server (PID: $pid[2]) on port $p[2]" + os.kill(pid[2], signal.SIGTERM) + usleep(500000) + v4 = (p[0], p[1]) + wait_and_check_expected_view(v4) + print "Kill server (PID: $pid[1]) on port $p[1]" + os.kill(pid[1], signal.SIGTERM) + usleep(500000) + #no view change can happen because of a lack of majority + print "Restarting server on port $p[2]" + pid[2] = spawn_ls(p[0], p[2]) + usleep(500000) + #no view change can happen because of a lack of majority + for port in p[0:1+2]: + num_v = get_num_views(paxos_log(port), port) + if num_v != in_views[port]: + die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority") + # kill node 3 again, + print "Kill server (PID: $pid[2]) on port $p[2]" + os.kill(pid[2], signal.SIGTERM) + usleep(500000) + print "Restarting server on port $p[1]" + pid[1] = spawn_ls(p[0], p[1]) + usleep(700000) + for port in p[0:1+1]: + in_views[port] = get_num_views(paxos_log(port), port) + print " Node $port is present in ", in_views[port], " views in ", paxos_log(port), "" + print "Restarting server on port $p[2]" + pid[2] = spawn_ls(p[0], p[2]) + lastv = (p[0],p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + # now check the paxos logs and make sure the logs go through the right + # views + for port in lastv: + check_views(paxos_log(port), views, lastv) + cleanup() + +if do_run[5]: + print "test5: 3-process lock_server, send signal 1 to first server, kill third server, restart third server, check logs" + start_nodes(3,"ls") + print "Sending paxos breakpoint 1 to first server on port $p[0]" + spawn("./rsm_tester", p[0]+1, "breakpoint", 3) + usleep(100000) + print "Kill third server (PID: $pid[2]) on port $p[2]" + os.kill(pid[2], signal.SIGTERM) + usleep(500000) + for port in p[0:1+2]: + num_v = get_num_views(paxos_log(port), port) + if num_v != in_views[port]: + die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority") + print "Restarting third server on port $p[2]" + pid[2]= spawn_ls(p[0], p[2]) + lastv = (p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + usleep(1000000) + # now check the paxos logs and make sure the logs go through the right + # views + for port in lastv: + check_views(paxos_log(port), views, lastv) + cleanup() + +if do_run[6]: + print "test6: 4-process lock_server, send signal 2 to first server, kill fourth server, restart fourth server, check logs" + start_nodes(4,"ls") + print "Sending paxos breakpoint 2 to first server on port $p[0]" + spawn("./rsm_tester", p[0]+1, "breakpoint", 4) + usleep(100000) + print "Kill fourth server (PID: $pid[3]) on port $p[3]" + os.kill(pid[3], signal.SIGTERM) + usleep(500000) + for port in (p[1],p[2]): + num_v = get_num_views(paxos_log(port), port) + if num_v != in_views[port]: + die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority") + usleep(500000) + print "Restarting fourth server on port $p[3]" + pid[3] = spawn_ls(p[1], p[3]) + usleep(500000) + v5 = (p[0],p[1],p[2]) + for 
port in v5: + in_views[port]+=1 + views.append(v5) + usleep(1000000) + # the 6th view will be (2,3) or (1,2,3,4) + v6 = (p[1],p[2]) + for port in v6: + in_views[port]+=1 + for port in v6: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 30) + # final will be (2,3,4) + lastv = (p[1],p[2],p[3]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + for port in lastv: + check_views(paxos_log(port), views, lastv) + cleanup() + +if do_run[7]: + print "test7: 4-process lock_server, send signal 2 to first server, kill fourth server, kill other servers, restart other servers, restart fourth server, check logs" + start_nodes(4,"ls") + print "Sending paxos breakpoint 2 to first server on port $p[0]" + spawn("./rsm_tester", p[0]+1, "breakpoint", 4) + usleep(300000) + print "Kill fourth server (PID: $pid[3]) on port $p[3]" + os.kill(pid[3], signal.SIGTERM) + usleep(500000) + print "Kill third server (PID: $pid[2]) on port $p[2]" + os.kill(pid[2], signal.SIGTERM) + print "Kill second server (PID: $pid[1]) on port $p[1]" + os.kill(pid[1], signal.SIGTERM) + usleep(500000) + print "Restarting second server on port $p[1]" + pid[1] = spawn_ls(p[0], p[1]) + usleep(500000) + print "Restarting third server on port $p[2]" + pid[2] = spawn_ls(p[0], p[2]) + usleep(500000) + #no view change is possible by now because there is no majority + for port in (p[1],p[2]): + num_v = get_num_views(paxos_log(port), port) + if num_v != in_views[port]: + die("$num_v views in ", paxos_log(port), " : no new views should be formed due to the lack of majority") + print "Restarting fourth server on port $p[3]" + pid[3] = spawn_ls(p[1], p[3]) + usleep(500000) + v5 = (p[0], p[1], p[2]) + views.append(v5) + for port in v5: + in_views[port]+=1 + usleep(1500000) + lastv = (p[1],p[2],p[3]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + for port in lastv: + check_views(paxos_log(port), views, lastv) + cleanup() + +if do_run[8]: + print "test8: start 3-process lock service" + start_nodes(3,"ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 8") + cleanup() + usleep(200000) + +if do_run[9]: + print "test9: start 3-process rsm, kill second slave while lock_tester is running" + start_nodes(3,"ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(random.randint(1,1000000)) + print "Kill slave (PID: $pid[2]) on port $p[2]" + os.kill(pid[2], signal.SIGTERM) + usleep(300000) + # it should go through 4 views + v4 = (p[0], p[1]) + wait_and_check_expected_view(v4) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 9") + cleanup() + usleep(200000) + +if do_run[10]: + print "test10: start 3-process rsm, kill second slave and restarts it later while lock_tester is running" + start_nodes(3,"ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(random.randint(1,1000000)) + print "Kill slave (PID: $pid[2]) on port $p[2]" + os.kill(pid[2], signal.SIGTERM) + usleep(300000) + # it should go through 4 views + v4 = (p[0], p[1]) + wait_and_check_expected_view(v4) + usleep(300000) + print "Restarting killed lock_server on port $p[2]" + pid[2] = spawn_ls(p[0], 
p[2]) + v5 = (p[0],p[1],p[2]) + wait_and_check_expected_view(v5) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 10") + cleanup() + usleep(200000) + +if do_run[11]: + print "test11: start 3-process rsm, kill primary while lock_tester is running" + start_nodes(3,"ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(random.randint(1,1000000)) + print "Kill primary (PID: $pid[0]) on port $p[0]" + os.kill(pid[0], signal.SIGTERM) + usleep(300000) + # it should go through 4 views + v4 = (p[1], p[2]) + wait_and_check_expected_view(v4) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 11") + cleanup() + usleep(200000) + +if do_run[12]: + print "test12: start 3-process rsm, kill master at break1 and restart it while lock_tester is running" + start_nodes(3, "ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(100000) + print "Kill master (PID: $pid[0]) on port $p[0] at breakpoint 1" + spawn("./rsm_tester", p[0]+1, "breakpoint", 1) + usleep(100000) + # it should go through 5 views + v4 = (p[1], p[2]) + wait_and_check_expected_view(v4) + print "Restarting killed lock_server on port $p[0]" + pid[0] = spawn_ls(p[1], p[0]) + usleep(300000) + # the last view should include all nodes + lastv = (p[0],p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + for port in lastv: + check_views(paxos_log(port), views, lastv) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 12") + cleanup() + usleep(200000) + +if do_run[13]: + print "test13: start 3-process rsm, kill slave at break1 and restart it while lock_tester is running" + start_nodes(3, "ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(100000) + print "Kill slave (PID: $pid[2]) on port $p[2] at breakpoint 1" + spawn("./rsm_tester", p[2]+1, "breakpoint", 1) + usleep(100000) + # it should go through 4 views + v4 = (p[0], p[1]) + wait_and_check_expected_view(v4) + print "Restarting killed lock_server on port $p[2]" + pid[2] = spawn_ls(p[0], p[2]) + usleep(300000) + # the last view should include all nodes + lastv = (p[0],p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + for port in lastv: + check_views(paxos_log(port), views, lastv) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 13") + cleanup() + usleep(200000) + +if do_run[14]: + print "test14: start 5-process rsm, kill slave break1, kill slave break2" + start_nodes(5, "ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(100000) + print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1" + spawn("./rsm_tester", p[4]+1, "breakpoint", 1) + print "Kill slave (PID: $pid[3]) on port $p[3] at breakpoint 2" + spawn("./rsm_tester", p[3]+1, "breakpoint", 2) + usleep(100000) + # two view changes: + print "first view change wait" + lastv = (p[0],p[1],p[2],p[3]) + for port in lastv: + wait_for_view_change(paxos_log(port), 
in_views[port]+1, port, 20) + print "second view change wait" + lastv = (p[0],p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 14") + cleanup() + usleep(200000) + +if do_run[15]: + print "test15: start 5-process rsm, kill slave break1, kill primary break2" + start_nodes(5, "ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(100000) + print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1" + spawn("./rsm_tester", p[4]+1, "breakpoint", 1) + print "Kill primary (PID: $pid[0]) on port $p[0] at breakpoint 2" + spawn("./rsm_tester", p[0]+1, "breakpoint", 2) + usleep(100000) + # two view changes: + print "first view change wait" + lastv = (p[0],p[1],p[2],p[3]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + print "second view change wait" + lastv = (p[1],p[2],p[3]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 15") + cleanup() + usleep(200000) + +if do_run[16]: + print "test16: start 3-process rsm, partition primary, heal it" + start_nodes(3, "ls") + print "Start lock_tester $p[0]" + t = spawn("./lock_tester", p[0]) + usleep(100000) + print "Partition primary (PID: $pid[0]) on port $p[0] at breakpoint" + spawn("./rsm_tester", p[0]+1, "partition", 0) + usleep(300000) + print "first view change wait" + lastv = (p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + usleep(100000) + print "Heal partition primary (PID: $pid[0]) on port $p[0] at breakpoint" + spawn("./rsm_tester", p[0]+1, "partition", 1) + usleep(100000) + # xxx it should test that this is the 5th view! 
+ print "second view change wait" + lastv = (p[0], p[1],p[2]) + for port in lastv: + wait_for_view_change(paxos_log(port), in_views[port]+1, port, 20) + print " Wait for lock_tester to finish (waitpid $t)" + waitpid_to(t, 600) + if os.system("grep \"passed all tests successfully\" lock_tester-$p[0].log"): + mydie("Failed lock tester for test 16") + cleanup() + usleep(200000) + +print "tests done OK" + +try: + os.unlink("config") +except OSError: + pass diff --git a/threaded_log.cc b/threaded_log.cc index 96728f6..98cc6f6 100644 --- a/threaded_log.cc +++ b/threaded_log.cc @@ -1,18 +1,21 @@ #include "threaded_log.h" -static mutex log_mutex; -static map thread_name_map; +static std::mutex log_mutex; +static std::map thread_name_map; static int next_thread_num = 0; -static map instance_name_map; +static std::map instance_name_map; static int next_instance_num = 0; int DEBUG_LEVEL = 0; +using namespace std::chrono; + locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) { - auto thread = this_thread::get_id(); + auto thread = std::this_thread::get_id(); int tid = thread_name_map[thread]; if (tid==0) tid = thread_name_map[thread] = ++next_thread_num; - auto utime = duration_cast(system_clock::now().time_since_epoch()).count() % 1000000000; + auto utime = duration_cast( + system_clock::now().time_since_epoch()).count() % 1000000000; f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " "; f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid; f << " " << std::setw(20) << file << " " << std::setw(18) << func; diff --git a/types.h b/types.h index 7de35e9..7ab04cc 100644 --- a/types.h +++ b/types.h @@ -2,114 +2,60 @@ #define types_h #include - #include - #include -using cond = std::condition_variable; -using std::cv_status; - #include -using std::chrono::duration_cast; -using std::chrono::microseconds; -using std::chrono::milliseconds; -using std::chrono::nanoseconds; -using std::chrono::seconds; -using std::chrono::steady_clock; -using std::chrono::system_clock; -using std::chrono::time_point; -using std::chrono::time_point_cast; - #include - #include -using std::ifstream; -using std::ofstream; - #include - #include #include - #include -using std::numeric_limits; - #include -using std::list; - #include -using std::map; - #include -using std::enable_shared_from_this; -using std::make_shared; -using std::shared_ptr; -using std::unique_ptr; -using std::weak_ptr; - #include -using std::mutex; -using lock = std::unique_lock; - #include -using std::runtime_error; - #include - #include +#include +#include +#include +#include +#include + using std::string; -using std::to_string; -using std::stoi; -#include +using cond = std::condition_variable; +using lock = std::unique_lock; using std::thread; -using std::call_once; -using std::once_flag; -namespace this_thread { - using namespace std::this_thread; -} -#include +using std::shared_ptr; +using std::unique_ptr; + using std::tuple; -using std::get; -using std::tie; -#include -using std::decay; -using std::true_type; -using std::false_type; -using std::is_enum; -using std::is_member_function_pointer; -using std::is_same; -using std::underlying_type; using std::enable_if; -using std::remove_reference; -using std::add_const; - -#include -using std::pair; -using std::declval; -using std::forward; - -#include -using std::vector; +using std::false_type; +using std::true_type; // type traits and manipulators template struct is_const_iterable : false_type {}; template struct 
is_const_iterable().cbegin(), declval().cend(), void()) + decltype(std::declval().cbegin(), std::declval().cend(), void()) > : true_type {}; template struct supports_emplace_back : false_type {}; template struct supports_emplace_back().emplace_back(declval()), void()) + decltype(std::declval().emplace_back(std::declval()), void()) > : true_type {}; -template -using enum_type_t = typename enable_if::value, typename underlying_type::type>::type; +template using enum_type_t = typename enable_if< + std::is_enum::value, typename std::underlying_type::type>::type; + template constexpr inline enum_type_t from_enum(E e) noexcept { return (enum_type_t)e; } template constexpr inline E to_enum(enum_type_t value) noexcept { return (E)value; } @@ -117,13 +63,13 @@ template constexpr inline E to_enum(enum_type_t value) noexcept { template struct is_tuple_convertible : false_type {}; template struct is_tuple_convertible()._tuple_(), void()) + decltype(std::declval()._tuple_(), void()) > : true_type {}; // string manipulation template -std::ostream & operator<<(std::ostream & o, const pair & d) { +std::ostream & operator<<(std::ostream & o, const std::pair & d) { return o << "<" << d.first << "," << d.second << ">"; } @@ -140,8 +86,8 @@ implode(const C & v, string delim=" ") { return oss.str(); } -inline vector explode(const string & s, string delim=" ") { - vector out; +inline std::vector explode(const string & s, string delim=" ") { + std::vector out; size_t start = 0, end = 0; while ((end = s.find(delim, start)) != string::npos) { out.push_back(s.substr(start, end - start)); @@ -152,7 +98,9 @@ inline vector explode(const string & s, string delim=" ") { } template -typename enable_if::value && !is_same::value, std::ostream>::type & +typename enable_if< + is_const_iterable::value && + !std::is_same::value, std::ostream>::type & operator<<(std::ostream & o, const A & a) { return o << "[" << implode(a, ", ") << "]"; } @@ -168,8 +116,8 @@ operator<<(std::ostream & o, const A & a) { // }; #define MEMBERS(...) \ -inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \ -inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } +inline auto _tuple_() -> decltype(std::tie(__VA_ARGS__)) { return std::tie(__VA_ARGS__); } \ +inline auto _tuple_() const -> decltype(std::tie(__VA_ARGS__)) { return std::tie(__VA_ARGS__); } // struct ordering and comparison operations; requires the use of MEMBERS. // usage: @@ -184,8 +132,7 @@ LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \ LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \ LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=) -// crucial tool for tuple indexing in variadic templates -// +// Tuple indexing in variadic templates. // This implementation of tuple_indices is redistributed under the MIT // License as an insubstantial portion of the LLVM compiler infrastructure.
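Two idioms touched by this patch reward a closer look. First, the reply_window_ map in rpc/rpc.h above backs the server's at-most-once guarantee: rpcs::dispatch() asks check_duplicate_and_update() whether an incoming xid is new, a retransmission of a request still executing, or a retransmission of one already answered. A compressed sketch of that bookkeeping, under assumed simplified types; the NEW state and the function names appear in the patch, while the other states and the exact window logic are assumptions:

    #include <map>
    #include <string>

    // Sketch only; types simplified, not the real rpcs implementation.
    enum rpcstate_t { NEW, INPROGRESS, DONE, FORGOTTEN };

    struct reply_t {
        std::string buf;   // stored reply, meaningful once done is set
        bool done = false;
    };

    // One window per client, indexed by client nonce.
    std::map<unsigned, std::map<long long, reply_t>> reply_window;

    rpcstate_t check_duplicate_and_update(unsigned clt_nonce, long long xid,
                                          long long xid_rep, std::string * b) {
        std::map<long long, reply_t> & win = reply_window[clt_nonce];
        // The client asserts it has received every reply up to xid_rep,
        // so those saved replies can be dropped.
        win.erase(win.begin(), win.upper_bound(xid_rep));
        if (xid <= xid_rep)
            return FORGOTTEN;          // duplicate of an acknowledged request
        auto it = win.find(xid);
        if (it == win.end()) {
            win.emplace(xid, reply_t());  // mark the request as started
            return NEW;
        }
        if (!it->second.done)
            return INPROGRESS;         // still executing; drop the duplicate
        *b = it->second.buf;           // finished: hand back the saved reply
        return DONE;
    }

When the handler returns, rpcs::add_reply() (declared in the same hunk) fills in the stored reply so later retransmissions can be answered without re-executing the procedure.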
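Second, the tuple_indices machinery that types.h ends with (fed through TUPLE_INDICES to tuple_hton_imp in endian.h and tuple_marshall_imp in rpc/marshall.h) is the standard index-sequence idiom: materialize a compile-time pack 0..N-1, then expand get<I>(t)... over it. Since C++14 the standard library spells it std::index_sequence; a self-contained illustration of the idiom, not code taken from the patch:

    #include <cstddef>
    #include <iostream>
    #include <tuple>
    #include <utility>

    // Expand a tuple into a function's argument list by unpacking
    // std::get<I>(t)... over the compile-time index pack I... .
    template <typename F, typename... Args, std::size_t... I>
    auto apply_imp(F f, std::tuple<Args...> && t, std::index_sequence<I...>) {
        return f(std::get<I>(std::move(t))...);
    }

    template <typename F, typename... Args>
    auto my_apply(F f, std::tuple<Args...> && t) {
        return apply_imp(f, std::move(t), std::index_sequence_for<Args...>{});
    }

    int main() {
        auto sum = [](int a, int b) { return a + b; };
        std::cout << my_apply(sum, std::make_tuple(2, 3)) << "\n";  // prints 5
        return 0;
    }

marshalled_func_imp relies on the same expansion to unmarshall each RPC argument into a tuple and pass the elements to the registered handler in order.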