From: Peter Iannucci Date: Tue, 17 Sep 2013 01:53:49 +0000 (-0400) Subject: Lots of clean-ups and simplifications X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/dfe8486473094c0769fd1922329c3f0dfd8f43c0 Lots of clean-ups and simplifications --- diff --git a/Makefile b/Makefile index 4a239bc..b160065 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,10 @@ -CXXFLAGS = -g -MMD -Werror -I. -std=c++11 -LDFLAGS = -CXX = g++ -CC = g++ +CXXFLAGS ?= -g -MMD -Werror -I. -std=c++11 +LDFLAGS ?= +CXX ?= g++ +CC ?= g++ +EXTRA_TARGETS ?= -all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest +all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS) rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o rm -f $@ @@ -31,6 +32,6 @@ rsm_tester: $(rsm_tester) rpc/librpc.a -include rpc/*.d clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester -.PHONY: clean +.PHONY: clean $(EXTRA_TARGETS) clean: rm -rf $(clean_files) diff --git a/Makefile.osx b/Makefile.osx new file mode 100644 index 0000000..75f2e2b --- /dev/null +++ b/Makefile.osx @@ -0,0 +1,12 @@ +#PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations +#PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors +CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) +LDFLAGS = -stdlib=libc++ +CXX = clang++ +CC = clang++ +EXTRA_TARGETS = signatures + +socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw +signatures : lock_server lock_tester rpc/rpctest + echo $^ | sudo xargs -n 1 $(socketfilterfw) -s || true + echo $^ | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true diff --git a/handle.cc b/handle.cc index e998b3c..ff38a56 100644 --- a/handle.cc +++ b/handle.cc @@ -25,13 +25,11 @@ handle::safebind() rpcc *cl = new rpcc(dstsock); tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); int ret; - // Starting with lab 6, our test script assumes that the failure - // can be detected by paxos and rsm layer within few seconds. We have - // to set the timeout with a small value to support the assumption. + // The test script assumes that the failure can be detected by paxos and + // rsm layer within few seconds. We have to set the timeout with a small + // value to support the assumption. // - // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of - // lab 6 and lab 7 because the rpc layer may delay your RPC request, - // and cause a time out failure. Please make sure RPC_LOSSY is set to 0. + // With RPC_LOSSY=5, tests may fail due to delays and time outs. ret = cl->bind(rpcc::to(1000)); if (ret < 0) { tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); diff --git a/lock.h b/lock.h index 00d4374..b23f6cf 100644 --- a/lock.h +++ b/lock.h @@ -6,6 +6,7 @@ using std::mutex; using lock = std::unique_lock; +using cond = std::condition_variable; class adopt_lock : public lock { public: diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc index c3f75e8..0e43ec5 100644 --- a/lock_server_cache_rsm.cc +++ b/lock_server_cache_rsm.cc @@ -20,6 +20,10 @@ lock_state::lock_state(): { } +lock_state::lock_state(const lock_state &other) { + *this = other; +} + lock_state& lock_state::operator=(const lock_state& o) { held = o.held; held_by = o.held_by; @@ -34,50 +38,14 @@ ostringstream & operator<<(ostringstream &o, const pair &d) { return o; } -template -marshall & operator<<(marshall &m, const list &d) { - m << vector(d.begin(), d.end()); - return m; -} - -template -unmarshall & operator>>(unmarshall &u, list &d) { - vector v; - u >> v; - d.assign(v.begin(), v.end()); - return u; -} - - -template -marshall & operator<<(marshall &m, const pair &d) { - m << d.first; - m << d.second; - return m; -} - -template -unmarshall & operator>>(unmarshall &u, pair &d) { - u >> d.first; - u >> d.second; - return u; -} - marshall & operator<<(marshall &m, const lock_state &d) { - m << d.held; - m << d.held_by; - m << d.wanted_by; - return m; + return m << d.held << d.held_by << d.wanted_by; } unmarshall & operator>>(unmarshall &u, lock_state &d) { - u >> d.held; - u >> d.held_by; - u >> d.wanted_by; - return u; + return u >> d.held >> d.held_by >> d.wanted_by; } - lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); // by the semantics of map, this will create diff --git a/lock_server_cache_rsm.h b/lock_server_cache_rsm.h index 4a33361..a765e52 100644 --- a/lock_server_cache_rsm.h +++ b/lock_server_cache_rsm.h @@ -23,6 +23,7 @@ typedef pair holder; class lock_state { public: lock_state(); + lock_state(const lock_state &other); bool held; holder held_by; list wanted_by; diff --git a/lock_smain.cc b/lock_smain.cc index c256cbe..69cc433 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -14,8 +14,6 @@ char tprintf_thread_prefix = 's'; int main(int argc, char *argv[]) { - int count = 0; - setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); @@ -26,11 +24,6 @@ main(int argc, char *argv[]) exit(1); } - char *count_env = getenv("RPC_COUNT"); - if(count_env != NULL){ - count = atoi(count_env); - } - rsm rsm(argv[1], argv[2]); lock_server_cache_rsm ls(&rsm); rsm.set_state_transfer((rsm_state_transfer *)&ls); diff --git a/rpc/connection.cc b/rpc/connection.cc index fec7a4c..6e865e8 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -25,7 +25,7 @@ connection::connection(chanmgr *m1, int f1, int l1) signal(SIGPIPE, SIG_IGN); - VERIFY(gettimeofday(&create_time_, NULL) == 0); + create_time_ = std::chrono::steady_clock::now(); PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } @@ -98,13 +98,9 @@ connection::ref() int connection::compare(connection *another) { - if (create_time_.tv_sec > another->create_time_.tv_sec) + if (create_time_ > another->create_time_) return 1; - if (create_time_.tv_sec < another->create_time_.tv_sec) - return -1; - if (create_time_.tv_usec > another->create_time_.tv_usec) - return 1; - if (create_time_.tv_usec < another->create_time_.tv_usec) + if (create_time_ < another->create_time_) return -1; return 0; } @@ -137,9 +133,9 @@ connection::send(char *b, int sz) ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); ml.lock(); - }else{ + } else { if (wpdu_.solong == wpdu_.sz) { - }else{ + } else { //should be rare to need to explicitly add write callback PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { @@ -293,7 +289,7 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) sin.sin_port = htons(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); - if(tcp_ < 0){ + if (tcp_ < 0) { perror("tcpsconn::tcpsconn accept_loop socket:"); VERIFY(0); } @@ -302,12 +298,12 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){ + if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { perror("accept_loop tcp bind:"); VERIFY(0); } - if(listen(tcp_, 1000) < 0) { + if (listen(tcp_, 1000) < 0) { perror("tcpsconn::tcpsconn listen:"); VERIFY(0); } @@ -359,21 +355,21 @@ tcpsconn::process_accept() s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); - // garbage collect all dead connections with refcount of 1 - std::map::iterator i; - for (i = conns_.begin(); i != conns_.end();) { - if (i->second->isdead() && i->second->ref() == 1) { - jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", - i->second->channo()); - i->second->decref(); - // Careful not to reuse i right after erase. (i++) will - // be evaluated before the erase call because in C++, - // there is a sequence point before a function call. - // See http://en.wikipedia.org/wiki/Sequence_point. - conns_.erase(i++); - } else - ++i; - } + // garbage collect all dead connections with refcount of 1 + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end();) { + if (i->second->isdead() && i->second->ref() == 1) { + jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", + i->second->channo()); + i->second->decref(); + // Careful not to reuse i right after erase. (i++) will + // be evaluated before the erase call because in C++, + // there is a sequence point before a function call. + // See http://en.wikipedia.org/wiki/Sequence_point. + conns_.erase(i++); + } else + ++i; + } conns_[ch->channo()] = ch; } @@ -385,7 +381,6 @@ tcpsconn::accept_conn() int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; try { - while (1) { FD_ZERO(&rfds); FD_SET(pipe_[0], &rfds); @@ -417,17 +412,16 @@ tcpsconn::accept_conn() } catch (thread_exit_exception e) { - return; } } connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { - int s= socket(AF_INET, SOCK_STREAM, 0); + int s = socket(AF_INET, SOCK_STREAM, 0); int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); close(s); diff --git a/rpc/connection.h b/rpc/connection.h index 1ef4470..16b0398 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -61,7 +61,7 @@ class connection : public aio_callback { charbuf wpdu_; charbuf rpdu_; - struct timeval create_time_; + std::chrono::time_point create_time_; int waiters_; int refno_; diff --git a/rpc/fifo.h b/rpc/fifo.h index d190c26..93a79cf 100644 --- a/rpc/fifo.h +++ b/rpc/fifo.h @@ -1,60 +1,39 @@ #ifndef fifo_h #define fifo_h -// fifo template -// blocks enq() and deq() when queue is FULL or EMPTY - -#include #include -#include -#include -#include -#include "lang/verify.h" #include "lock.h" +// blocks enq() and deq() when queue is FULL or EMPTY template class fifo { public: - fifo(int m=0); + fifo(int limit=0) : max_(limit) {}; bool enq(T, bool blocking=true); void deq(T *); - bool size(); + bool size() { + lock ml(m_); + return q_.size(); + }; private: std::list q_; mutex m_; - std::condition_variable non_empty_c_; // q went non-empty - std::condition_variable has_space_c_; // q is not longer overfull - unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit + cond non_empty_c_; // q went non-empty + cond has_space_c_; // q is not longer overfull + unsigned int max_; // maximum capacity of the queue, block enq threads if exceeds this limit }; -template -fifo::fifo(int limit) : max_(limit) -{ -} - -template bool -fifo::size() -{ - lock ml(m_); - return q_.size(); -} - template bool fifo::enq(T e, bool blocking) { lock ml(m_); - while (1) { - if (!max_ || q_.size() < max_) { - q_.push_back(e); - break; - } - if (blocking) { - has_space_c_.wait(ml); - } - else + while (max_ && q_.size() >= max_) { + if (!blocking) return false; + has_space_c_.wait(ml); } + q_.push_back(e); non_empty_c_.notify_one(); return true; } @@ -63,20 +42,12 @@ template void fifo::deq(T *e) { lock ml(m_); - - while(1) { - if(q_.empty()){ - non_empty_c_.wait(ml); - } else { - *e = q_.front(); - q_.pop_front(); - if (max_ && q_.size() < max_) { - has_space_c_.notify_one(); - } - break; - } - } - return; + while(q_.empty()) + non_empty_c_.wait(ml); + *e = q_.front(); + q_.pop_front(); + if (max_ && q_.size() < max_) + has_space_c_.notify_one(); } #endif diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc index de02fc2..9399b09 100644 --- a/rpc/jsl_log.cc +++ b/rpc/jsl_log.cc @@ -1,9 +1 @@ -#include "jsl_log.h" - int JSL_DEBUG_LEVEL = 0; -void -jsl_set_debug(int level) { - JSL_DEBUG_LEVEL = level; -} - - diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h index c6ea812..66a2dd3 100644 --- a/rpc/jsl_log.h +++ b/rpc/jsl_log.h @@ -13,6 +13,4 @@ extern int JSL_DEBUG_LEVEL; #define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);} -void jsl_set_debug(int level); - #endif diff --git a/rpc/marshall.h b/rpc/marshall.h index 644a220..fcb5bab 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -28,18 +28,11 @@ struct reply_header { int ret; }; -typedef uint64_t rpc_checksum_t; typedef int rpc_sz_t; //size of initial buffer allocation #define DEFAULT_RPC_SZ 1024 -#define RPC_HEADER_SZ_NO_CHECKSUM (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) -#if RPC_CHECKSUMMING -//size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum -#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM + sizeof(rpc_checksum_t)) -#else -#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM) -#endif +#define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) class marshall { private: @@ -82,9 +75,6 @@ class marshall { int saved_sz = _ind; //leave the first 4-byte empty for channel to fill size of pdu _ind = sizeof(rpc_sz_t); -#if RPC_CHECKSUMMING - _ind += sizeof(rpc_checksum_t); -#endif pack(h.xid); pack(h.proc); pack((int)h.clt_nonce); @@ -97,9 +87,6 @@ class marshall { int saved_sz = _ind; //leave the first 4-byte empty for channel to fill size of pdu _ind = sizeof(rpc_sz_t); -#if RPC_CHECKSUMMING - _ind += sizeof(rpc_checksum_t); -#endif pack(h.xid); pack(h.ret); _ind = saved_sz; @@ -113,6 +100,7 @@ class marshall { return; } }; + marshall& operator<<(marshall &, bool); marshall& operator<<(marshall &, unsigned int); marshall& operator<<(marshall &, int); @@ -123,6 +111,40 @@ marshall& operator<<(marshall &, short); marshall& operator<<(marshall &, unsigned long long); marshall& operator<<(marshall &, const std::string &); +template marshall & +operator<<(marshall &m, std::vector v) +{ + m << (unsigned int) v.size(); + for(unsigned i = 0; i < v.size(); i++) + m << v[i]; + return m; +} + +template marshall & +operator<<(marshall &m, const std::map &d) { + typename std::map::const_iterator i; + + m << (unsigned int) d.size(); + + for (i = d.begin(); i != d.end(); i++) { + m << i->first << i->second; + } + return m; +} + +template marshall & +operator<<(marshall &m, const std::list &d) { + m << std::vector(d.begin(), d.end()); + return m; +} + +template marshall & +operator<<(marshall &m, const std::pair &d) { + m << d.first; + m << d.second; + return m; +} + class unmarshall { private: char *_buf; @@ -173,9 +195,6 @@ class unmarshall { void unpack_req_header(req_header *h) { //the first 4-byte is for channel to fill size of pdu _ind = sizeof(rpc_sz_t); -#if RPC_CHECKSUMMING - _ind += sizeof(rpc_checksum_t); -#endif unpack(&h->xid); unpack(&h->proc); unpack((int *)&h->clt_nonce); @@ -187,9 +206,6 @@ class unmarshall { void unpack_reply_header(reply_header *h) { //the first 4-byte is for channel to fill size of pdu _ind = sizeof(rpc_sz_t); -#if RPC_CHECKSUMMING - _ind += sizeof(rpc_checksum_t); -#endif unpack(&h->xid); unpack(&h->ret); _ind = RPC_HEADER_SZ; @@ -206,54 +222,50 @@ unmarshall& operator>>(unmarshall &, int &); unmarshall& operator>>(unmarshall &, unsigned long long &); unmarshall& operator>>(unmarshall &, std::string &); -template marshall & -operator<<(marshall &m, std::vector v) -{ - m << (unsigned int) v.size(); - for(unsigned i = 0; i < v.size(); i++) - m << v[i]; - return m; -} - template unmarshall & operator>>(unmarshall &u, std::vector &v) { unsigned n; u >> n; - for(unsigned i = 0; i < n; i++){ - C z; - u >> z; - v.push_back(z); - } + v.clear(); + while (n--) { + C c; + u >> c; + v.push_back(c); + } return u; } -template marshall & -operator<<(marshall &m, const std::map &d) { - typename std::map::const_iterator i; - - m << (unsigned int) d.size(); - - for (i = d.begin(); i != d.end(); i++) { - m << i->first << i->second; - } - return m; -} - template unmarshall & operator>>(unmarshall &u, std::map &d) { - unsigned int n; + unsigned n; u >> n; - d.clear(); - - for (unsigned int lcv = 0; lcv < n; lcv++) { - A a; - B b; - u >> a >> b; - d[a] = b; - } + while (n--) { + A a; + B b; + u >> a >> b; + d[a] = b; + } return u; } +template unmarshall & +operator>>(unmarshall &u, std::list &l) { + unsigned n; + u >> n; + l.clear(); + while (n--) { + C c; + u >> c; + l.push_back(c); + } + return u; +} + +template unmarshall & +operator>>(unmarshall &u, std::pair &d) { + return u >> d.first >> d.second; +} + #endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 5de3984..d53776a 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -424,7 +424,7 @@ rpcs::got_pdu(connection *c, char *b, int sz) djob_t *j = new djob_t(c, b, sz); c->incref(); - bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j); + bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j)); if(!succ || !reachable_){ c->decref(); delete j; diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index d90e494..115f484 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -199,24 +199,6 @@ simple_tests(rpcc *c) VERIFY(rep.size() == 70000); printf(" -- small request, big reply .. ok\n"); -#if 0 - // too few arguments - intret = c->call(22, (std::string)"just one", rep); - VERIFY(intret < 0); - printf(" -- too few arguments .. failed ok\n"); - - // too many arguments; proc #23 expects just one. - intret = c->call(23, 1001, 1002, rep); - VERIFY(intret < 0); - printf(" -- too many arguments .. failed ok\n"); - - // wrong return value size - int wrongrep; - intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep); - VERIFY(intret < 0); - printf(" -- wrong ret value size .. failed ok\n"); -#endif - // specify a timeout value to an RPC that should succeed (udp) int xx = 0; intret = c->call(23, 77, xx, rpcc::to(3000)); @@ -412,8 +394,7 @@ main(int argc, char *argv[]) } if (debug_level > 0) { - //__loginit.initNow(); - jsl_set_debug(debug_level); + JSL_DEBUG_LEVEL = debug_level; jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level); } diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index 26226cd..146764f 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -3,58 +3,40 @@ #include #include "lang/verify.h" -static void -do_worker(void *arg) -{ - ThrPool *tp = (ThrPool *)arg; - while (1) { - ThrPool::job_t j; - if (!tp->takeJob(&j)) - break; //die - - (void)(j.f)(j.a); - } -} - -//if blocking, then addJob() blocks when queue is full -//otherwise, addJob() simply returns false when queue is full +// if blocking, then addJob() blocks when queue is full +// otherwise, addJob() simply returns false when queue is full ThrPool::ThrPool(int sz, bool blocking) : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) { - for (int i = 0; i < sz; i++) { - th_.push_back(std::thread(do_worker, this)); - } + for (int i=0; if!=NULL); + job_t j; + while (1) { + jobq_.deq(&j); + if (!j) + break; + j(); + } } - diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h index 4427aee..2a1a749 100644 --- a/rpc/thr_pool.h +++ b/rpc/thr_pool.h @@ -6,57 +6,23 @@ #include "fifo.h" +typedef std::function job_t; + class ThrPool { public: - struct job_t { - void (*f)(void *); //function point - void *a; //function arguments - }; - ThrPool(int sz, bool blocking=true); ~ThrPool(); - template bool addObjJob(C *o, void (C::*m)(A), A a); - void waitDone(); - bool takeJob(job_t *j); + bool addJob(const job_t &j); private: int nthreads_; bool blockadd_; - fifo jobq_; std::vector th_; - bool addJob(void (*f)(void *), void *a); + void do_worker(); }; -template bool -ThrPool::addObjJob(C *o, void (C::*m)(A), A a) -{ - - class objfunc_wrapper { - public: - C *o; - void (C::*m)(A a); - A a; - static void func(void *vvv) { - objfunc_wrapper *x = (objfunc_wrapper*)vvv; - C *o = x->o; - void (C::*m)(A ) = x->m; - A a = x->a; - (o->*m)(a); - delete x; - } - }; - - objfunc_wrapper *x = new objfunc_wrapper; - x->o = o; - x->m = m; - x->a = a; - return addJob(&objfunc_wrapper::func, (void *)x); -} - - #endif - diff --git a/rsm.cc b/rsm.cc index 478e0e0..b93c701 100644 --- a/rsm.cc +++ b/rsm.cc @@ -95,8 +95,6 @@ rsm::rsm(std::string _first, std::string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - std::thread th; - last_myvs.vid = 0; last_myvs.seqno = 0; myvs = last_myvs; @@ -124,7 +122,7 @@ rsm::rsm(std::string _first, std::string _me) : { lock ml(rsm_mutex); - th = std::thread(&rsm::recovery, this); + std::thread(&rsm::recovery, this).detach(); } } @@ -232,7 +230,7 @@ bool rsm::statetransfer(std::string m) { rsm_protocol::transferres r; handle h(m); - int ret; + int ret = 0; tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n", m.c_str(), last_myvs.vid, last_myvs.seqno); rpcc *cl; @@ -278,7 +276,7 @@ bool rsm::statetransferdone(std::string m) { bool rsm::join(std::string m) { handle h(m); - int ret; + int ret = 0; rsm_protocol::joinres r; tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid, diff --git a/rsm_client.cc b/rsm_client.cc index 9beb0b3..eed356f 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -29,7 +29,7 @@ void rsm_client::primary_failure() { } rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) { - int ret; + int ret = 0; lock ml(rsm_client_mutex); while (1) { printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str()); diff --git a/rsm_tester.pl b/rsm_tester.pl index 7018164..7f73505 100755 --- a/rsm_tester.pl +++ b/rsm_tester.pl @@ -43,9 +43,6 @@ sub spawn { if (my $pid = fork) { # parent push( @logs, "$p-$aa.log" ); - if( $p =~ /config_server/ ) { - push( @logs, paxos_log($a[1]) ); - } if( $p =~ /lock_server/ ) { push( @logs, paxos_log($a[1]) ); } @@ -91,12 +88,6 @@ sub spawn_ls { return spawn( "./lock_server", $master, $port ); } -sub spawn_config { - my $master = shift; - my $port = shift; - return spawn( "./config_server", $master, $port ); -} - sub check_views { my $l = shift; @@ -232,9 +223,6 @@ sub start_nodes ($$){ if ($command eq "ls") { @pid = (@pid, spawn_ls($p[0],$p[$i])); print "Start lock_server on $p[$i]\n"; - }elsif ($command eq "config_server"){ - @pid = (@pid, spawn_config($p[0],$p[$i])); - print "Start config on $p[$i]\n"; } sleep 1;