Lots of clean-ups and simplifications
authorPeter Iannucci <iannucci@mit.edu>
Tue, 17 Sep 2013 01:53:49 +0000 (21:53 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 25 Sep 2013 01:45:10 +0000 (21:45 -0400)
20 files changed:
Makefile
Makefile.osx [new file with mode: 0644]
handle.cc
lock.h
lock_server_cache_rsm.cc
lock_server_cache_rsm.h
lock_smain.cc
rpc/connection.cc
rpc/connection.h
rpc/fifo.h
rpc/jsl_log.cc
rpc/jsl_log.h
rpc/marshall.h
rpc/rpc.cc
rpc/rpctest.cc
rpc/thr_pool.cc
rpc/thr_pool.h
rsm.cc
rsm_client.cc
rsm_tester.pl

index 4a239bc..b160065 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,9 +1,10 @@
-CXXFLAGS = -g -MMD -Werror -I. -std=c++11
-LDFLAGS = 
-CXX = g++
-CC = g++
+CXXFLAGS ?= -g -MMD -Werror -I. -std=c++11
+LDFLAGS ?= 
+CXX ?= g++
+CC ?= g++
+EXTRA_TARGETS ?=
 
-all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest
+all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
 
 rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o
        rm -f $@
@@ -31,6 +32,6 @@ rsm_tester: $(rsm_tester) rpc/librpc.a
 -include rpc/*.d
 
 clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester
-.PHONY: clean
+.PHONY: clean $(EXTRA_TARGETS)
 clean: 
        rm -rf $(clean_files)
diff --git a/Makefile.osx b/Makefile.osx
new file mode 100644 (file)
index 0000000..75f2e2b
--- /dev/null
@@ -0,0 +1,12 @@
+#PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations
+#PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors
+CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY)
+LDFLAGS = -stdlib=libc++
+CXX = clang++
+CC = clang++
+EXTRA_TARGETS = signatures
+
+socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw
+signatures : lock_server lock_tester rpc/rpctest
+       echo $^ | sudo xargs -n 1 $(socketfilterfw) -s || true
+       echo $^ | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true
index e998b3c..ff38a56 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -25,13 +25,11 @@ handle::safebind()
     rpcc *cl = new rpcc(dstsock);
     tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
     int ret;
-    // Starting with lab 6, our test script assumes that the failure
-    // can be detected by paxos and rsm layer within few seconds. We have
-    // to set the timeout with a small value to support the assumption.
+    // The test script assumes that the failure can be detected by paxos and
+    // rsm layer within few seconds. We have to set the timeout with a small
+    // value to support the assumption.
     // 
-    // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of
-    // lab 6 and lab 7 because the rpc layer may delay your RPC request, 
-    // and cause a time out failure. Please make sure RPC_LOSSY is set to 0.
+    // With RPC_LOSSY=5, tests may fail due to delays and time outs.
     ret = cl->bind(rpcc::to(1000));
     if (ret < 0) {
         tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
diff --git a/lock.h b/lock.h
index 00d4374..b23f6cf 100644 (file)
--- a/lock.h
+++ b/lock.h
@@ -6,6 +6,7 @@
 
 using std::mutex;
 using lock = std::unique_lock<std::mutex>;
+using cond = std::condition_variable;
 
 class adopt_lock : public lock {
 public:
index c3f75e8..0e43ec5 100644 (file)
@@ -20,6 +20,10 @@ lock_state::lock_state():
 {
 }
 
+lock_state::lock_state(const lock_state &other) {
+    *this = other;
+}
+
 lock_state& lock_state::operator=(const lock_state& o) {
     held = o.held;
     held_by = o.held_by;
@@ -34,50 +38,14 @@ ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
     return o;
 }
 
-template <class A>
-marshall & operator<<(marshall &m, const list<A> &d) {
-    m << vector<A>(d.begin(), d.end());
-    return m;
-}
-
-template <class A>
-unmarshall & operator>>(unmarshall &u, list<A> &d) {
-    vector<A> v;
-    u >> v;
-    d.assign(v.begin(), v.end());
-    return u;
-}
-
-
-template <class A, class B>
-marshall & operator<<(marshall &m, const pair<A,B> &d) {
-    m << d.first;
-    m << d.second;
-    return m;
-}
-
-template <class A, class B>
-unmarshall & operator>>(unmarshall &u, pair<A,B> &d) {
-    u >> d.first;
-    u >> d.second;
-    return u;
-}
-
 marshall & operator<<(marshall &m, const lock_state &d) {
-    m << d.held;
-    m << d.held_by;
-    m << d.wanted_by;
-       return m;
+       return m << d.held << d.held_by << d.wanted_by;
 }
 
 unmarshall & operator>>(unmarshall &u, lock_state &d) {
-    u >> d.held;
-    u >> d.held_by;
-    u >> d.wanted_by;
-       return u;
+       return u >> d.held >> d.held_by >> d.wanted_by;
 }
 
-
 lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
     // by the semantics of map, this will create
index 4a33361..a765e52 100644 (file)
@@ -23,6 +23,7 @@ typedef pair<callback, lock_protocol::xid_t> holder;
 class lock_state {
 public:
     lock_state();
+    lock_state(const lock_state &other);
     bool held;
     holder held_by;
     list<holder> wanted_by;
index c256cbe..69cc433 100644 (file)
@@ -14,8 +14,6 @@ char tprintf_thread_prefix = 's';
 int
 main(int argc, char *argv[])
 {
-    int count = 0;
-
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
 
@@ -26,11 +24,6 @@ main(int argc, char *argv[])
         exit(1);
     }
 
-    char *count_env = getenv("RPC_COUNT");
-    if(count_env != NULL){
-        count = atoi(count_env);
-    }
-
     rsm rsm(argv[1], argv[2]);
     lock_server_cache_rsm ls(&rsm);
     rsm.set_state_transfer((rsm_state_transfer *)&ls);
index fec7a4c..6e865e8 100644 (file)
@@ -25,7 +25,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
 
     signal(SIGPIPE, SIG_IGN);
 
-    VERIFY(gettimeofday(&create_time_, NULL) == 0);
+    create_time_ = std::chrono::steady_clock::now();
 
     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
 }
@@ -98,13 +98,9 @@ connection::ref()
 int
 connection::compare(connection *another)
 {
-    if (create_time_.tv_sec > another->create_time_.tv_sec)
+    if (create_time_ > another->create_time_)
         return 1;
-    if (create_time_.tv_sec < another->create_time_.tv_sec)
-        return -1;
-    if (create_time_.tv_usec > another->create_time_.tv_usec)
-        return 1;
-    if (create_time_.tv_usec < another->create_time_.tv_usec)
+    if (create_time_ < another->create_time_)
         return -1;
     return 0;
 }
@@ -137,9 +133,9 @@ connection::send(char *b, int sz)
         ml.unlock();
                PollMgr::Instance()->block_remove_fd(fd_);
         ml.lock();
-       }else{
+       } else {
                if (wpdu_.solong == wpdu_.sz) {
-               }else{
+               } else {
                        //should be rare to need to explicitly add write callback
                        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
                        while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
@@ -293,7 +289,7 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
        sin.sin_port = htons(port);
 
        tcp_ = socket(AF_INET, SOCK_STREAM, 0);
-       if(tcp_ < 0){
+       if (tcp_ < 0) {
                perror("tcpsconn::tcpsconn accept_loop socket:");
                VERIFY(0);
        }
@@ -302,12 +298,12 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
        setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
        setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
 
-       if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
+       if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
                perror("accept_loop tcp bind:");
                VERIFY(0);
        }
 
-       if(listen(tcp_, 1000) < 0) {
+       if (listen(tcp_, 1000) < 0) {
                perror("tcpsconn::tcpsconn listen:");
                VERIFY(0);
        }
@@ -359,21 +355,21 @@ tcpsconn::process_accept()
                        s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
        connection *ch = new connection(mgr_, s1, lossy_);
 
-        // garbage collect all dead connections with refcount of 1
-        std::map<int, connection *>::iterator i;
-        for (i = conns_.begin(); i != conns_.end();) {
-                if (i->second->isdead() && i->second->ref() == 1) {
-                       jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
-                                       i->second->channo());
-                        i->second->decref();
-                        // Careful not to reuse i right after erase. (i++) will
-                        // be evaluated before the erase call because in C++,
-                        // there is a sequence point before a function call.
-                        // See http://en.wikipedia.org/wiki/Sequence_point.
-                        conns_.erase(i++);
-                } else
-                        ++i;
-        }
+    // garbage collect all dead connections with refcount of 1
+    std::map<int, connection *>::iterator i;
+    for (i = conns_.begin(); i != conns_.end();) {
+        if (i->second->isdead() && i->second->ref() == 1) {
+            jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
+                    i->second->channo());
+            i->second->decref();
+            // Careful not to reuse i right after erase. (i++) will
+            // be evaluated before the erase call because in C++,
+            // there is a sequence point before a function call.
+            // See http://en.wikipedia.org/wiki/Sequence_point.
+            conns_.erase(i++);
+        } else
+            ++i;
+    }
 
        conns_[ch->channo()] = ch;
 }
@@ -385,7 +381,6 @@ tcpsconn::accept_conn()
        int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
 
     try {
-
         while (1) {
             FD_ZERO(&rfds);
             FD_SET(pipe_[0], &rfds);
@@ -417,17 +412,16 @@ tcpsconn::accept_conn()
     }
     catch (thread_exit_exception e)
     {
-        return;
     }
 }
 
 connection *
 connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
 {
-       int s= socket(AF_INET, SOCK_STREAM, 0);
+       int s = socket(AF_INET, SOCK_STREAM, 0);
        int yes = 1;
        setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
-       if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
+       if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
                jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
                                inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
                close(s);
index 1ef4470..16b0398 100644 (file)
@@ -61,7 +61,7 @@ class connection : public aio_callback {
         charbuf wpdu_;
         charbuf rpdu_;
 
-        struct timeval create_time_;
+        std::chrono::time_point<std::chrono::steady_clock> create_time_;
 
         int waiters_;
         int refno_;
index d190c26..93a79cf 100644 (file)
@@ -1,60 +1,39 @@
 #ifndef fifo_h
 #define fifo_h
 
-// fifo template
-// blocks enq() and deq() when queue is FULL or EMPTY
-
-#include <errno.h>
 #include <list>
-#include <sys/time.h>
-#include <time.h>
-#include <errno.h>
-#include "lang/verify.h"
 #include "lock.h"
 
+// blocks enq() and deq() when queue is FULL or EMPTY
 template<class T>
 class fifo {
        public:
-               fifo(int m=0);
+               fifo(int limit=0) : max_(limit) {};
                bool enq(T, bool blocking=true);
                void deq(T *);
-               bool size();
+               bool size() {
+            lock ml(m_);
+            return q_.size();
+        };
 
        private:
                std::list<T> q_;
         mutex m_;
-        std::condition_variable non_empty_c_; // q went non-empty
-               std::condition_variable has_space_c_; // q is not longer overfull
-               unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit
+        cond non_empty_c_; // q went non-empty
+               cond has_space_c_; // q is not longer overfull
+               unsigned int max_; // maximum capacity of the queue, block enq threads if exceeds this limit
 };
 
-template<class T>
-fifo<T>::fifo(int limit) : max_(limit)
-{
-}
-
-template<class T> bool
-fifo<T>::size()
-{
-    lock ml(m_);
-       return q_.size();
-}
-
 template<class T> bool
 fifo<T>::enq(T e, bool blocking)
 {
     lock ml(m_);
-       while (1) {
-               if (!max_ || q_.size() < max_) {
-                       q_.push_back(e);
-                       break;
-               }
-               if (blocking) {
-            has_space_c_.wait(ml);
-        }
-               else
+       while (max_ && q_.size() >= max_) {
+               if (!blocking)
                        return false;
+        has_space_c_.wait(ml);
        }
+    q_.push_back(e);
     non_empty_c_.notify_one();
        return true;
 }
@@ -63,20 +42,12 @@ template<class T> void
 fifo<T>::deq(T *e)
 {
        lock ml(m_);
-
-       while(1) {
-               if(q_.empty()){
-            non_empty_c_.wait(ml);
-               } else {
-                       *e = q_.front();
-                       q_.pop_front();
-                       if (max_ && q_.size() < max_) {
-                has_space_c_.notify_one();
-                       }
-                       break;
-               }
-       }
-       return;
+       while(q_.empty())
+        non_empty_c_.wait(ml);
+    *e = q_.front();
+    q_.pop_front();
+    if (max_ && q_.size() < max_)
+        has_space_c_.notify_one();
 }
 
 #endif
index de02fc2..9399b09 100644 (file)
@@ -1,9 +1 @@
-#include "jsl_log.h"
-
 int JSL_DEBUG_LEVEL = 0;
-void
-jsl_set_debug(int level) {
-    JSL_DEBUG_LEVEL = level;
-}
-
-
index c6ea812..66a2dd3 100644 (file)
@@ -13,6 +13,4 @@ extern int JSL_DEBUG_LEVEL;
 
 #define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);}
 
-void jsl_set_debug(int level);
-
 #endif
index 644a220..fcb5bab 100644 (file)
@@ -28,18 +28,11 @@ struct reply_header {
        int ret;
 };
 
-typedef uint64_t rpc_checksum_t;
 typedef int rpc_sz_t;
 
 //size of initial buffer allocation 
 #define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ_NO_CHECKSUM (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
-#if RPC_CHECKSUMMING
-//size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
-#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM + sizeof(rpc_checksum_t))
-#else
-#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM)
-#endif
+#define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
 
 class marshall {
        private:
@@ -82,9 +75,6 @@ class marshall {
                        int saved_sz = _ind;
                        //leave the first 4-byte empty for channel to fill size of pdu
                        _ind = sizeof(rpc_sz_t); 
-#if RPC_CHECKSUMMING
-                       _ind += sizeof(rpc_checksum_t);
-#endif
                        pack(h.xid);
                        pack(h.proc);
                        pack((int)h.clt_nonce);
@@ -97,9 +87,6 @@ class marshall {
                        int saved_sz = _ind;
                        //leave the first 4-byte empty for channel to fill size of pdu
                        _ind = sizeof(rpc_sz_t); 
-#if RPC_CHECKSUMMING
-                       _ind += sizeof(rpc_checksum_t);
-#endif
                        pack(h.xid);
                        pack(h.ret);
                        _ind = saved_sz;
@@ -113,6 +100,7 @@ class marshall {
                        return;
                }
 };
+
 marshall& operator<<(marshall &, bool);
 marshall& operator<<(marshall &, unsigned int);
 marshall& operator<<(marshall &, int);
@@ -123,6 +111,40 @@ marshall& operator<<(marshall &, short);
 marshall& operator<<(marshall &, unsigned long long);
 marshall& operator<<(marshall &, const std::string &);
 
+template <class C> marshall &
+operator<<(marshall &m, std::vector<C> v)
+{
+       m << (unsigned int) v.size();
+       for(unsigned i = 0; i < v.size(); i++)
+               m << v[i];
+       return m;
+}
+
+template <class A, class B> marshall &
+operator<<(marshall &m, const std::map<A,B> &d) {
+       typename std::map<A,B>::const_iterator i;
+
+       m << (unsigned int) d.size();
+
+       for (i = d.begin(); i != d.end(); i++) {
+               m << i->first << i->second;
+       }
+       return m;
+}
+
+template <class A> marshall &
+operator<<(marshall &m, const std::list<A> &d) {
+    m << std::vector<A>(d.begin(), d.end());
+    return m;
+}
+
+template <class A, class B> marshall &
+operator<<(marshall &m, const std::pair<A,B> &d) {
+    m << d.first;
+    m << d.second;
+    return m;
+}
+
 class unmarshall {
        private:
                char *_buf;
@@ -173,9 +195,6 @@ class unmarshall {
                void unpack_req_header(req_header *h) {
                        //the first 4-byte is for channel to fill size of pdu
                        _ind = sizeof(rpc_sz_t); 
-#if RPC_CHECKSUMMING
-                       _ind += sizeof(rpc_checksum_t);
-#endif
                        unpack(&h->xid);
                        unpack(&h->proc);
                        unpack((int *)&h->clt_nonce);
@@ -187,9 +206,6 @@ class unmarshall {
                void unpack_reply_header(reply_header *h) {
                        //the first 4-byte is for channel to fill size of pdu
                        _ind = sizeof(rpc_sz_t); 
-#if RPC_CHECKSUMMING
-                       _ind += sizeof(rpc_checksum_t);
-#endif
                        unpack(&h->xid);
                        unpack(&h->ret);
                        _ind = RPC_HEADER_SZ;
@@ -206,54 +222,50 @@ unmarshall& operator>>(unmarshall &, int &);
 unmarshall& operator>>(unmarshall &, unsigned long long &);
 unmarshall& operator>>(unmarshall &, std::string &);
 
-template <class C> marshall &
-operator<<(marshall &m, std::vector<C> v)
-{
-       m << (unsigned int) v.size();
-       for(unsigned i = 0; i < v.size(); i++)
-               m << v[i];
-       return m;
-}
-
 template <class C> unmarshall &
 operator>>(unmarshall &u, std::vector<C> &v)
 {
        unsigned n;
        u >> n;
-       for(unsigned i = 0; i < n; i++){
-               C z;
-               u >> z;
-               v.push_back(z);
-       }
+    v.clear();
+    while (n--) {
+        C c;
+        u >> c;
+        v.push_back(c);
+    }
        return u;
 }
 
-template <class A, class B> marshall &
-operator<<(marshall &m, const std::map<A,B> &d) {
-       typename std::map<A,B>::const_iterator i;
-
-       m << (unsigned int) d.size();
-
-       for (i = d.begin(); i != d.end(); i++) {
-               m << i->first << i->second;
-       }
-       return m;
-}
-
 template <class A, class B> unmarshall &
 operator>>(unmarshall &u, std::map<A,B> &d) {
-       unsigned int n;
+       unsigned n;
        u >> n;
-
        d.clear();
-
-       for (unsigned int lcv = 0; lcv < n; lcv++) {
-               A a;
-               B b;
-               u >> a >> b;
-               d[a] = b;
-       }
+    while (n--) {
+        A a;
+        B b;
+        u >> a >> b;
+        d[a] = b;
+    }
        return u;
 }
 
+template <class C> unmarshall &
+operator>>(unmarshall &u, std::list<C> &l) {
+    unsigned n;
+    u >> n;
+    l.clear();
+    while (n--) {
+        C c;
+        u >> c;
+        l.push_back(c);
+    }
+    return u;
+}
+
+template <class A, class B> unmarshall &
+operator>>(unmarshall &u, std::pair<A,B> &d) {
+    return u >> d.first >> d.second;
+}
+
 #endif
index 5de3984..d53776a 100644 (file)
@@ -424,7 +424,7 @@ rpcs::got_pdu(connection *c, char *b, int sz)
 
     djob_t *j = new djob_t(c, b, sz);
     c->incref();
-    bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
+    bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
     if(!succ || !reachable_){
         c->decref();
         delete j;
index d90e494..115f484 100644 (file)
@@ -199,24 +199,6 @@ simple_tests(rpcc *c)
        VERIFY(rep.size() == 70000);
        printf("   -- small request, big reply .. ok\n");
 
-#if 0
-       // too few arguments
-       intret = c->call(22, (std::string)"just one", rep);
-       VERIFY(intret < 0);
-       printf("   -- too few arguments .. failed ok\n");
-
-       // too many arguments; proc #23 expects just one.
-       intret = c->call(23, 1001, 1002, rep);
-       VERIFY(intret < 0);
-       printf("   -- too many arguments .. failed ok\n");
-
-       // wrong return value size
-       int wrongrep;
-       intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
-       VERIFY(intret < 0);
-       printf("   -- wrong ret value size .. failed ok\n");
-#endif
-
        // specify a timeout value to an RPC that should succeed (udp)
        int xx = 0;
        intret = c->call(23, 77, xx, rpcc::to(3000));
@@ -412,8 +394,7 @@ main(int argc, char *argv[])
        }
 
        if (debug_level > 0) {
-               //__loginit.initNow();
-               jsl_set_debug(debug_level);
+               JSL_DEBUG_LEVEL = debug_level;
                jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
        }
 
index 26226cd..146764f 100644 (file)
@@ -3,58 +3,40 @@
 #include <errno.h>
 #include "lang/verify.h"
 
-static void
-do_worker(void *arg)
-{
-       ThrPool *tp = (ThrPool *)arg;
-       while (1) {
-               ThrPool::job_t j;
-               if (!tp->takeJob(&j))
-                       break; //die
-
-               (void)(j.f)(j.a);
-       }
-}
-
-//if blocking, then addJob() blocks when queue is full
-//otherwise, addJob() simply returns false when queue is full
+// if blocking, then addJob() blocks when queue is full
+// otherwise, addJob() simply returns false when queue is full
 ThrPool::ThrPool(int sz, bool blocking)
 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
 {
-       for (int i = 0; i < sz; i++) {
-        th_.push_back(std::thread(do_worker, this));
-       }
+       for (int i=0; i<nthreads_; i++)
+        th_.push_back(std::thread(&ThrPool::do_worker, this));
 }
 
-//IMPORTANT: this function can be called only when no external thread 
-//will ever use this thread pool again or is currently blocking on it
+// IMPORTANT: this function can be called only when no external thread 
+// will ever use this thread pool again or is currently blocking on it
 ThrPool::~ThrPool()
 {
-       for (int i = 0; i < nthreads_; i++) {
-               job_t j;
-               j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit
-               jobq_.enq(j);
-       }
+       for (int i=0; i<nthreads_; i++)
+               jobq_.enq(job_t());
 
-       for (int i = 0; i < nthreads_; i++) {
+       for (int i=0; i<nthreads_; i++)
         th_[i].join();
-       }
 }
 
 bool 
-ThrPool::addJob(void (*f)(void *), void *a)
+ThrPool::addJob(const job_t &j)
 {
-       job_t j;
-       j.f = f;
-       j.a = a;
-
        return jobq_.enq(j,blockadd_);
 }
 
-bool 
-ThrPool::takeJob(job_t *j)
+void
+ThrPool::do_worker()
 {
-       jobq_.deq(j);
-       return (j->f!=NULL);
+    job_t j;
+       while (1) {
+        jobq_.deq(&j);
+               if (!j)
+                       break;
+               j();
+       }
 }
-
index 4427aee..2a1a749 100644 (file)
@@ -6,57 +6,23 @@
 
 #include "fifo.h"
 
+typedef std::function<void()> job_t;
+
 class ThrPool {
        public:
-               struct job_t {
-                       void (*f)(void *); //function point
-                       void *a; //function arguments
-               };
-
                ThrPool(int sz, bool blocking=true);
                ~ThrPool();
-               template<class C, class A> bool addObjJob(C *o, void (C::*m)(A), A a);
-               void waitDone();
 
-               bool takeJob(job_t *j);
+               bool addJob(const job_t &j);
 
        private:
                int nthreads_;
                bool blockadd_;
 
-
                fifo<job_t> jobq_;
                std::vector<std::thread> th_;
 
-               bool addJob(void (*f)(void *), void *a);
+        void do_worker();
 };
 
-template <class C, class A> bool 
-ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
-{
-
-       class objfunc_wrapper {
-               public:
-                       C *o;
-                       void (C::*m)(A a);
-                       A a;
-                       static void func(void *vvv) {
-                               objfunc_wrapper *x = (objfunc_wrapper*)vvv;
-                               C *o = x->o;
-                               void (C::*m)(A ) = x->m;
-                               A a = x->a;
-                               (o->*m)(a);
-                               delete x;
-                       }
-       };
-
-       objfunc_wrapper *x = new objfunc_wrapper;
-       x->o = o;
-       x->m = m;
-       x->a = a;
-       return addJob(&objfunc_wrapper::func, (void *)x);
-}
-
-
 #endif
-
diff --git a/rsm.cc b/rsm.cc
index 478e0e0..b93c701 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -95,8 +95,6 @@ rsm::rsm(std::string _first, std::string _me) :
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
     partitioned (false), dopartition(false), break1(false), break2(false)
 {
-    std::thread th;
-
     last_myvs.vid = 0;
     last_myvs.seqno = 0;
     myvs = last_myvs;
@@ -124,7 +122,7 @@ rsm::rsm(std::string _first, std::string _me) :
 
     {
         lock ml(rsm_mutex);
-        th = std::thread(&rsm::recovery, this);
+        std::thread(&rsm::recovery, this).detach();
     }
 }
 
@@ -232,7 +230,7 @@ bool rsm::statetransfer(std::string m)
 {
     rsm_protocol::transferres r;
     handle h(m);
-    int ret;
+    int ret = 0;
     tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n",
             m.c_str(), last_myvs.vid, last_myvs.seqno);
     rpcc *cl;
@@ -278,7 +276,7 @@ bool rsm::statetransferdone(std::string m) {
 
 bool rsm::join(std::string m) {
     handle h(m);
-    int ret;
+    int ret = 0;
     rsm_protocol::joinres r;
 
     tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid,
index 9beb0b3..eed356f 100644 (file)
@@ -29,7 +29,7 @@ void rsm_client::primary_failure() {
 }
 
 rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
-    int ret;
+    int ret = 0;
     lock ml(rsm_client_mutex);
     while (1) {
         printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
index 7018164..7f73505 100755 (executable)
@@ -43,9 +43,6 @@ sub spawn {
   if (my $pid = fork) {
 # parent
     push( @logs, "$p-$aa.log" );
-    if( $p =~ /config_server/ ) {
-      push( @logs, paxos_log($a[1]) );
-    }
     if( $p =~ /lock_server/ ) {
       push( @logs, paxos_log($a[1]) );
     }
@@ -91,12 +88,6 @@ sub spawn_ls {
   return spawn( "./lock_server", $master, $port );
 }
 
-sub spawn_config {
-  my $master = shift;
-  my $port = shift;
-  return spawn( "./config_server", $master, $port );
-}
-
 sub check_views {
 
   my $l = shift;
@@ -232,9 +223,6 @@ sub start_nodes ($$){
                if ($command eq "ls") {
                        @pid = (@pid, spawn_ls($p[0],$p[$i]));
                        print "Start lock_server on $p[$i]\n";
-               }elsif ($command eq "config_server"){
-                       @pid = (@pid, spawn_config($p[0],$p[$i]));
-                       print "Start config on $p[$i]\n";
                }
     sleep 1;