From: Peter Iannucci Date: Fri, 11 Oct 2013 03:58:57 +0000 (-0400) Subject: Fixed a race condition! X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/b2609562b3d4fc548afcc0a3dfe4ff5fd4ae3d36 Fixed a race condition! --- diff --git a/config.cc b/config.cc index 35654d8..5d04cd2 100644 --- a/config.cc +++ b/config.cc @@ -89,6 +89,7 @@ void config::paxos_commit(unsigned instance, const string &value) { if (!isamember(mem, newmem) && me != mem) { LOG("delete " << mem); invalidate_handle(mem); + //handle(mem).invalidate(); } } @@ -214,6 +215,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { case rpc_const::atmostonce_failure: case rpc_const::oldsrv_failure: invalidate_handle(m); + //h.invalidate(); break; default: LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); diff --git a/handle.cc b/handle.cc index 42d038f..79b3b4c 100644 --- a/handle.cc +++ b/handle.cc @@ -2,64 +2,69 @@ class hinfo { public: - unique_ptr cl; - bool del = false; - string m; + unique_ptr client; + bool valid = true; + string destination; mutex client_mutex; - hinfo(const string & m_) : m(m_) {} + hinfo(const string & destination_) : destination(destination_) {} }; static mutex mgr_mutex; static map> hmap; -static shared_ptr acquire_handle(string m) { +handle::handle(const string & destination) { lock ml(mgr_mutex); - shared_ptr h = hmap[m]; - if (!h || h->del) - return (hmap[m] = make_shared(m)); - return h; + h = hmap[destination]; + if (!h || !h->valid) + h = (hmap[destination] = make_shared(destination)); } -static void delete_handle(const string & m, lock &) { - if (hmap.find(m) == hmap.end()) { - LOG_NONMEMBER("cl " << m << " isn't in cl list"); - return; - } - - hmap[m]->del = true; - LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count()); - hmap.erase(m); -} - -void invalidate_handle(const string & m) { - lock ml(mgr_mutex); - delete_handle(m, ml); -} - -handle::handle(const string & m) : h(acquire_handle(m)) {} - rpcc * handle::safebind() { if (!h) return nullptr; - lock ml(h->client_mutex); - if (h->del) + lock cl(h->client_mutex); + if (!h->valid) return nullptr; - if (!h->cl) { - unique_ptr cl(new rpcc(h->m)); - LOG("trying to bind..." << h->m); + if (!h->client) { + unique_ptr client(new rpcc(h->destination)); + LOG("trying to bind..." << h->destination); // The test script assumes that the failure can be detected by paxos and // rsm layer within few seconds. We have to set the timeout with a small // value to support the assumption. // // With RPC_LOSSY=5, tests may fail due to delays and time outs. - int ret = cl->bind(milliseconds(1000)); + int ret = client->bind(milliseconds(1000)); if (ret < 0) { - LOG("bind failure! " << h->m << " " << ret); - h->del = true; + LOG("bind failure! " << h->destination << " " << ret); + h->valid = false; } else { - LOG("bind succeeded " << h->m); - h->cl = move(cl); + LOG("bind succeeded " << h->destination); + h->client = move(client); } } - return h->cl.get(); + return h->client.get(); +} + +void handle::invalidate() { + { + lock cl(h->client_mutex); + h->valid = false; + + LOG_NONMEMBER("cl " << h->destination << " refcnt " << h.use_count()); + } + lock ml(mgr_mutex); + hmap.erase(h->destination); + h = nullptr; +} + +void invalidate_handle(const string & m) { + lock ml(mgr_mutex); + if (hmap.find(m) == hmap.end()) { + LOG_NONMEMBER("cl " << m << " isn't in cl list"); + return; + } + + hmap[m]->valid = false; + LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count()); + hmap.erase(m); } diff --git a/handle.h b/handle.h index dc2edb7..f4df61a 100644 --- a/handle.h +++ b/handle.h @@ -52,6 +52,8 @@ class handle { * } */ rpcc *safebind(); + + void invalidate(); }; void invalidate_handle(const string & m); diff --git a/lock_client.cc b/lock_client.cc index 0b071f5..388de88 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -30,7 +30,7 @@ lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { } lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) { - cl = new rpcc(xdst); + cl = unique_ptr(new rpcc(xdst)); if (cl->bind() < 0) LOG("lock_client: call bind"); @@ -38,11 +38,12 @@ lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xi rlock_port = ((random()%32000) | (0x1 << 10)); id = "127.0.0.1:" + to_string(rlock_port); last_port = rlock_port; - rpcs *rlsrpc = new rpcs(rlock_port); + rlsrpc = unique_ptr(new rpcs(rlock_port)); rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this); rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this); - rsmc = new rsm_client(xdst); + rsmc = unique_ptr(new rsm_client(xdst)); releaser_thread = thread(&lock_client::releaser, this); + rlsrpc->start(); } void lock_client::releaser() [[noreturn]] { diff --git a/lock_client.h b/lock_client.h index 5db2cbf..728fbf7 100644 --- a/lock_client.h +++ b/lock_client.h @@ -42,9 +42,10 @@ typedef map lock_map; // lock_revoke_server. class lock_client { private: - rpcc *cl; + unique_ptr cl; + unique_ptr rlsrpc; thread releaser_thread; - rsm_client *rsmc; + unique_ptr rsmc; lock_release_user *lu; in_port_t rlock_port; string hostname; diff --git a/lock_smain.cc b/lock_smain.cc index 3bd7376..90066e1 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -25,6 +25,8 @@ int main(int argc, char *argv[]) { rsm.reg(lock_protocol::release, &lock_server::release, &ls); rsm.reg(lock_protocol::stat, &lock_server::stat, &ls); + rsm.start(); + while(1) sleep(1000); } diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 2c1f1a5..c417f40 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -341,20 +341,22 @@ rpcs::rpcs(in_port_t p1, size_t count) { set_rand_seed(); nonce_ = (unsigned int)random(); - IF_LEVEL(2) LOG("created with nonce " << nonce_); + IF_LEVEL(0) LOG("created with nonce " << nonce_); reg(rpc_const::bind, &rpcs::rpcbind, this); - dispatchpool_ = new ThrPool(6, false); + dispatchpool_ = unique_ptr(new ThrPool(6, false)); +} +void rpcs::start() { char *loss_env = getenv("RPC_LOSSY"); - listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0); + listener_ = unique_ptr(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0)); } rpcs::~rpcs() { // must delete listener before dispatchpool - delete listener_; - delete dispatchpool_; + listener_ = nullptr; + dispatchpool_ = nullptr; free_reply_window(); } @@ -436,7 +438,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { { lock pl(procs_m_); if(procs_.count(proc) < 1){ - cerr << "unknown proc " << hex << proc << "." << endl; + LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_); VERIFY(0); return; } @@ -610,7 +612,7 @@ void rpcs::free_reply_window(void) { } int rpcs::rpcbind(unsigned int &r, int) { - IF_LEVEL(2) LOG("called return nonce " << nonce_); + IF_LEVEL(0) LOG("called return nonce " << nonce_); r = nonce_; return 0; } diff --git a/rpc/rpc.h b/rpc/rpc.h index 9ec2fd8..02c7c62 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -200,8 +200,8 @@ class rpcs : public chanmgr { // internal handler registration void reg1(proc_t proc, handler *); - ThrPool* dispatchpool_; - tcpsconn *listener_; + unique_ptr dispatchpool_; + unique_ptr listener_; public: rpcs(in_port_t port, size_t counts=0); @@ -223,6 +223,8 @@ class rpcs : public chanmgr { template void reg(proc_t proc, F f, C *c=nullptr) { reg1(proc, marshalled_func::wrap(f, c)); } + + void start(); }; #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 723df82..b5b975a 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -73,6 +73,7 @@ void startserver() server->reg(23, &srv::handle_fast, &service); server->reg(24, &srv::handle_slow, &service); server->reg(25, &srv::handle_bigrep, &service); + server->start(); } void diff --git a/rsm.cc b/rsm.cc index 54713cb..a44c24b 100644 --- a/rsm.cc +++ b/rsm.cc @@ -90,7 +90,7 @@ rsm::rsm(const string & _first, const string & _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - cfg = new config(_first, _me, this); + cfg = unique_ptr(new config(_first, _me, this)); if (_first == _me) { // Commit the first view here. We can not have acceptor::acceptor @@ -106,14 +106,16 @@ rsm::rsm(const string & _first, const string & _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = new rpcs((in_port_t)stoi(_me) + 1); + testsvr = unique_ptr(new rpcs((in_port_t)stoi(_me) + 1)); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); +} - { - lock ml(rsm_mutex); - thread(&rsm::recovery, this).detach(); - } +void rsm::start() { + lock ml(rsm_mutex); + rsmrpc->start(); + testsvr->start(); + thread(&rsm::recovery, this).detach(); } void rsm::reg1(int proc, handler *h) { diff --git a/rsm.h b/rsm.h index 1e1726d..8fdf2d5 100644 --- a/rsm.h +++ b/rsm.h @@ -21,7 +21,7 @@ class rsm : public config_view_change { void reg1(int proc, handler *); protected: map procs; - config *cfg; + unique_ptr cfg; rsm_state_transfer *stf = nullptr; rpcs *rsmrpc; // On slave: expected viewstamp of next invoke request @@ -36,7 +36,7 @@ class rsm : public config_view_change { vector backups; // A list of unsynchronized backups // For testing purposes - rpcs *testsvr; + unique_ptr testsvr; bool partitioned; bool dopartition; bool break1; @@ -76,6 +76,8 @@ class rsm : public config_view_change { void commit_change(unsigned vid); template void reg(int proc, F f, C *c=nullptr); + + void start(); }; template void rsm::reg(int proc, F f, C *c) {