Fixed a race condition!
authorPeter Iannucci <iannucci@mit.edu>
Fri, 11 Oct 2013 03:58:57 +0000 (23:58 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Fri, 11 Oct 2013 03:58:57 +0000 (23:58 -0400)
config.cc
handle.cc
handle.h
lock_client.cc
lock_client.h
lock_smain.cc
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm.h

index 35654d8..5d04cd2 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -89,6 +89,7 @@ void config::paxos_commit(unsigned instance, const string &value) {
         if (!isamember(mem, newmem) && me != mem) {
             LOG("delete " << mem);
             invalidate_handle(mem);
+            //handle(mem).invalidate();
         }
     }
 
@@ -214,6 +215,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
         case rpc_const::atmostonce_failure:
         case rpc_const::oldsrv_failure:
             invalidate_handle(m);
+            //h.invalidate();
             break;
         default:
             LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
index 42d038f..79b3b4c 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -2,64 +2,69 @@
 
 class hinfo {
 public:
-    unique_ptr<rpcc> cl;
-    bool del = false;
-    string m;
+    unique_ptr<rpcc> client;
+    bool valid = true;
+    string destination;
     mutex client_mutex;
-    hinfo(const string & m_) : m(m_) {}
+    hinfo(const string & destination_) : destination(destination_) {}
 };
 
 static mutex mgr_mutex;
 static map<string, shared_ptr<hinfo>> hmap;
 
-static shared_ptr<hinfo> acquire_handle(string m) {
+handle::handle(const string & destination) {
     lock ml(mgr_mutex);
-    shared_ptr<hinfo> h = hmap[m];
-    if (!h || h->del)
-        return (hmap[m] = make_shared<hinfo>(m));
-    return h;
+    h = hmap[destination];
+    if (!h || !h->valid)
+        h = (hmap[destination] = make_shared<hinfo>(destination));
 }
 
-static void delete_handle(const string & m, lock &) {
-    if (hmap.find(m) == hmap.end()) {
-        LOG_NONMEMBER("cl " << m << " isn't in cl list");
-        return;
-    }
-
-    hmap[m]->del = true;
-    LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count());
-    hmap.erase(m);
-}
-
-void invalidate_handle(const string & m) {
-    lock ml(mgr_mutex);
-    delete_handle(m, ml);
-}
-
-handle::handle(const string & m) : h(acquire_handle(m)) {}
-
 rpcc * handle::safebind() {
     if (!h)
         return nullptr;
-    lock ml(h->client_mutex);
-    if (h->del)
+    lock cl(h->client_mutex);
+    if (!h->valid)
         return nullptr;
-    if (!h->cl) {
-        unique_ptr<rpcc> cl(new rpcc(h->m));
-        LOG("trying to bind..." << h->m);
+    if (!h->client) {
+        unique_ptr<rpcc> client(new rpcc(h->destination));
+        LOG("trying to bind..." << h->destination);
         // The test script assumes that the failure can be detected by paxos and
         // rsm layer within few seconds. We have to set the timeout with a small
         // value to support the assumption.
         // 
         // With RPC_LOSSY=5, tests may fail due to delays and time outs.
-        int ret = cl->bind(milliseconds(1000));
+        int ret = client->bind(milliseconds(1000));
         if (ret < 0) {
-            LOG("bind failure! " << h->m << " " << ret);
-            h->del = true;
+            LOG("bind failure! " << h->destination << " " << ret);
+            h->valid = false;
         } else {
-            LOG("bind succeeded " << h->m);
-            h->cl = move(cl);
+            LOG("bind succeeded " << h->destination);
+            h->client = move(client);
         }
     }
-    return h->cl.get();
+    return h->client.get();
+}
+
+void handle::invalidate() {
+    {
+        lock cl(h->client_mutex);
+        h->valid = false;
+
+        LOG_NONMEMBER("cl " << h->destination << " refcnt " << h.use_count());
+    }
+    lock ml(mgr_mutex);
+    hmap.erase(h->destination);
+    h = nullptr;
+}
+
+void invalidate_handle(const string & m) {
+    lock ml(mgr_mutex);
+    if (hmap.find(m) == hmap.end()) {
+        LOG_NONMEMBER("cl " << m << " isn't in cl list");
+        return;
+    }
+
+    hmap[m]->valid = false;
+    LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count());
+    hmap.erase(m);
 }
index dc2edb7..f4df61a 100644 (file)
--- a/handle.h
+++ b/handle.h
@@ -52,6 +52,8 @@ class handle {
          *   }
          */
         rpcc *safebind();
+
+        void invalidate();
 };
 
 void invalidate_handle(const string & m);
index 0b071f5..388de88 100644 (file)
@@ -30,7 +30,7 @@ lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
 }
 
 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
-    cl = new rpcc(xdst);
+    cl = unique_ptr<rpcc>(new rpcc(xdst));
     if (cl->bind() < 0)
         LOG("lock_client: call bind");
 
@@ -38,11 +38,12 @@ lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xi
     rlock_port = ((random()%32000) | (0x1 << 10));
     id = "127.0.0.1:" + to_string(rlock_port);
     last_port = rlock_port;
-    rpcs *rlsrpc = new rpcs(rlock_port);
+    rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
     rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
-    rsmc = new rsm_client(xdst);
+    rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
     releaser_thread = thread(&lock_client::releaser, this);
+    rlsrpc->start();
 }
 
 void lock_client::releaser() [[noreturn]] {
index 5db2cbf..728fbf7 100644 (file)
@@ -42,9 +42,10 @@ typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 // lock_revoke_server.
 class lock_client {
     private:
-        rpcc *cl;
+        unique_ptr<rpcc> cl;
+        unique_ptr<rpcs> rlsrpc;
         thread releaser_thread;
-        rsm_client *rsmc;
+        unique_ptr<rsm_client> rsmc;
         lock_release_user *lu;
         in_port_t rlock_port;
         string hostname;
index 3bd7376..90066e1 100644 (file)
@@ -25,6 +25,8 @@ int main(int argc, char *argv[]) {
     rsm.reg(lock_protocol::release, &lock_server::release, &ls);
     rsm.reg(lock_protocol::stat, &lock_server::stat, &ls);
 
+    rsm.start();
+
     while(1)
         sleep(1000);
 }
index 2c1f1a5..c417f40 100644 (file)
@@ -341,20 +341,22 @@ rpcs::rpcs(in_port_t p1, size_t count)
 {
     set_rand_seed();
     nonce_ = (unsigned int)random();
-    IF_LEVEL(2) LOG("created with nonce " << nonce_);
+    IF_LEVEL(0) LOG("created with nonce " << nonce_);
 
     reg(rpc_const::bind, &rpcs::rpcbind, this);
-    dispatchpool_ = new ThrPool(6, false);
+    dispatchpool_ = unique_ptr<ThrPool>(new ThrPool(6, false));
+}
 
+void rpcs::start() {
     char *loss_env = getenv("RPC_LOSSY");
-    listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
+    listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
 }
 
 rpcs::~rpcs()
 {
     // must delete listener before dispatchpool
-    delete listener_;
-    delete dispatchpool_;
+    listener_ = nullptr;
+    dispatchpool_ = nullptr;
     free_reply_window();
 }
 
@@ -436,7 +438,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     {
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
-            cerr << "unknown proc " << hex << proc << "." << endl;
+            LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
             VERIFY(0);
             return;
         }
@@ -610,7 +612,7 @@ void rpcs::free_reply_window(void) {
 }
 
 int rpcs::rpcbind(unsigned int &r, int) {
-    IF_LEVEL(2) LOG("called return nonce " << nonce_);
+    IF_LEVEL(0) LOG("called return nonce " << nonce_);
     r = nonce_;
     return 0;
 }
index 9ec2fd8..02c7c62 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -200,8 +200,8 @@ class rpcs : public chanmgr {
     // internal handler registration
     void reg1(proc_t proc, handler *);
 
-    ThrPool* dispatchpool_;
-    tcpsconn *listener_;
+    unique_ptr<ThrPool> dispatchpool_;
+    unique_ptr<tcpsconn> listener_;
 
     public:
     rpcs(in_port_t port, size_t counts=0);
@@ -223,6 +223,8 @@ class rpcs : public chanmgr {
     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
         reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
     }
+
+    void start();
 };
 
 #endif
index 723df82..b5b975a 100644 (file)
@@ -73,6 +73,7 @@ void startserver()
     server->reg(23, &srv::handle_fast, &service);
     server->reg(24, &srv::handle_slow, &service);
     server->reg(25, &srv::handle_bigrep, &service);
+    server->start();
 }
 
 void
diff --git a/rsm.cc b/rsm.cc
index 54713cb..a44c24b 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -90,7 +90,7 @@ rsm::rsm(const string & _first, const string & _me) :
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
     partitioned (false), dopartition(false), break1(false), break2(false)
 {
-    cfg = new config(_first, _me, this);
+    cfg = unique_ptr<config>(new config(_first, _me, this));
 
     if (_first == _me) {
         // Commit the first view here. We can not have acceptor::acceptor
@@ -106,14 +106,16 @@ rsm::rsm(const string & _first, const string & _me) :
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr = new rpcs((in_port_t)stoi(_me) + 1);
+    testsvr = unique_ptr<rpcs>(new rpcs((in_port_t)stoi(_me) + 1));
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
+}
 
-    {
-        lock ml(rsm_mutex);
-        thread(&rsm::recovery, this).detach();
-    }
+void rsm::start() {
+    lock ml(rsm_mutex);
+    rsmrpc->start();
+    testsvr->start();
+    thread(&rsm::recovery, this).detach();
 }
 
 void rsm::reg1(int proc, handler *h) {
diff --git a/rsm.h b/rsm.h
index 1e1726d..8fdf2d5 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -21,7 +21,7 @@ class rsm : public config_view_change {
         void reg1(int proc, handler *);
     protected:
         map<int, handler *> procs;
-        config *cfg;
+        unique_ptr<config> cfg;
         rsm_state_transfer *stf = nullptr;
         rpcs *rsmrpc;
         // On slave: expected viewstamp of next invoke request
@@ -36,7 +36,7 @@ class rsm : public config_view_change {
         vector<string> backups;   // A list of unsynchronized backups
 
         // For testing purposes
-        rpcs *testsvr;
+        unique_ptr<rpcs> testsvr;
         bool partitioned;
         bool dopartition;
         bool break1;
@@ -76,6 +76,8 @@ class rsm : public config_view_change {
         void commit_change(unsigned vid);
 
         template<class F, class C=void> void reg(int proc, F f, C *c=nullptr);
+
+        void start();
 };
 
 template<class F, class C> void rsm::reg(int proc, F f, C *c) {