if (!isamember(mem, newmem) && me != mem) {
LOG("delete " << mem);
invalidate_handle(mem);
+ //handle(mem).invalidate();
}
}
case rpc_const::atmostonce_failure:
case rpc_const::oldsrv_failure:
invalidate_handle(m);
+ //h.invalidate();
break;
default:
LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
class hinfo {
public:
- unique_ptr<rpcc> cl;
- bool del = false;
- string m;
+ unique_ptr<rpcc> client;
+ bool valid = true;
+ string destination;
mutex client_mutex;
- hinfo(const string & m_) : m(m_) {}
+ hinfo(const string & destination_) : destination(destination_) {}
};
static mutex mgr_mutex;
static map<string, shared_ptr<hinfo>> hmap;
-static shared_ptr<hinfo> acquire_handle(string m) {
+handle::handle(const string & destination) {
lock ml(mgr_mutex);
- shared_ptr<hinfo> h = hmap[m];
- if (!h || h->del)
- return (hmap[m] = make_shared<hinfo>(m));
- return h;
+ h = hmap[destination];
+ if (!h || !h->valid)
+ h = (hmap[destination] = make_shared<hinfo>(destination));
}
-static void delete_handle(const string & m, lock &) {
- if (hmap.find(m) == hmap.end()) {
- LOG_NONMEMBER("cl " << m << " isn't in cl list");
- return;
- }
-
- hmap[m]->del = true;
- LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count());
- hmap.erase(m);
-}
-
-void invalidate_handle(const string & m) {
- lock ml(mgr_mutex);
- delete_handle(m, ml);
-}
-
-handle::handle(const string & m) : h(acquire_handle(m)) {}
-
rpcc * handle::safebind() {
if (!h)
return nullptr;
- lock ml(h->client_mutex);
- if (h->del)
+ lock cl(h->client_mutex);
+ if (!h->valid)
return nullptr;
- if (!h->cl) {
- unique_ptr<rpcc> cl(new rpcc(h->m));
- LOG("trying to bind..." << h->m);
+ if (!h->client) {
+ unique_ptr<rpcc> client(new rpcc(h->destination));
+ LOG("trying to bind..." << h->destination);
// The test script assumes that the failure can be detected by paxos and
// rsm layer within few seconds. We have to set the timeout with a small
// value to support the assumption.
//
// With RPC_LOSSY=5, tests may fail due to delays and time outs.
- int ret = cl->bind(milliseconds(1000));
+ int ret = client->bind(milliseconds(1000));
if (ret < 0) {
- LOG("bind failure! " << h->m << " " << ret);
- h->del = true;
+ LOG("bind failure! " << h->destination << " " << ret);
+ h->valid = false;
} else {
- LOG("bind succeeded " << h->m);
- h->cl = move(cl);
+ LOG("bind succeeded " << h->destination);
+ h->client = move(client);
}
}
- return h->cl.get();
+ return h->client.get();
+}
+
+void handle::invalidate() {
+ {
+ lock cl(h->client_mutex);
+ h->valid = false;
+
+ LOG_NONMEMBER("cl " << h->destination << " refcnt " << h.use_count());
+ }
+ lock ml(mgr_mutex);
+ hmap.erase(h->destination);
+ h = nullptr;
+}
+
+void invalidate_handle(const string & m) {
+ lock ml(mgr_mutex);
+ if (hmap.find(m) == hmap.end()) {
+ LOG_NONMEMBER("cl " << m << " isn't in cl list");
+ return;
+ }
+
+ hmap[m]->valid = false;
+ LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count());
+ hmap.erase(m);
}
* }
*/
rpcc *safebind();
+
+ void invalidate();
};
void invalidate_handle(const string & m);
}
lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
- cl = new rpcc(xdst);
+ cl = unique_ptr<rpcc>(new rpcc(xdst));
if (cl->bind() < 0)
LOG("lock_client: call bind");
rlock_port = ((random()%32000) | (0x1 << 10));
id = "127.0.0.1:" + to_string(rlock_port);
last_port = rlock_port;
- rpcs *rlsrpc = new rpcs(rlock_port);
+ rlsrpc = unique_ptr<rpcs>(new rpcs(rlock_port));
rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
- rsmc = new rsm_client(xdst);
+ rsmc = unique_ptr<rsm_client>(new rsm_client(xdst));
releaser_thread = thread(&lock_client::releaser, this);
+ rlsrpc->start();
}
void lock_client::releaser() [[noreturn]] {
// lock_revoke_server.
class lock_client {
private:
- rpcc *cl;
+ unique_ptr<rpcc> cl;
+ unique_ptr<rpcs> rlsrpc;
thread releaser_thread;
- rsm_client *rsmc;
+ unique_ptr<rsm_client> rsmc;
lock_release_user *lu;
in_port_t rlock_port;
string hostname;
rsm.reg(lock_protocol::release, &lock_server::release, &ls);
rsm.reg(lock_protocol::stat, &lock_server::stat, &ls);
+ rsm.start();
+
while(1)
sleep(1000);
}
{
set_rand_seed();
nonce_ = (unsigned int)random();
- IF_LEVEL(2) LOG("created with nonce " << nonce_);
+ IF_LEVEL(0) LOG("created with nonce " << nonce_);
reg(rpc_const::bind, &rpcs::rpcbind, this);
- dispatchpool_ = new ThrPool(6, false);
+ dispatchpool_ = unique_ptr<ThrPool>(new ThrPool(6, false));
+}
+void rpcs::start() {
char *loss_env = getenv("RPC_LOSSY");
- listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
+ listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
}
rpcs::~rpcs()
{
// must delete listener before dispatchpool
- delete listener_;
- delete dispatchpool_;
+ listener_ = nullptr;
+ dispatchpool_ = nullptr;
free_reply_window();
}
{
lock pl(procs_m_);
if(procs_.count(proc) < 1){
- cerr << "unknown proc " << hex << proc << "." << endl;
+ LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
VERIFY(0);
return;
}
}
int rpcs::rpcbind(unsigned int &r, int) {
- IF_LEVEL(2) LOG("called return nonce " << nonce_);
+ IF_LEVEL(0) LOG("called return nonce " << nonce_);
r = nonce_;
return 0;
}
// internal handler registration
void reg1(proc_t proc, handler *);
- ThrPool* dispatchpool_;
- tcpsconn *listener_;
+ unique_ptr<ThrPool> dispatchpool_;
+ unique_ptr<tcpsconn> listener_;
public:
rpcs(in_port_t port, size_t counts=0);
template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
}
+
+ void start();
};
#endif
server->reg(23, &srv::handle_fast, &service);
server->reg(24, &srv::handle_slow, &service);
server->reg(25, &srv::handle_bigrep, &service);
+ server->start();
}
void
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
- cfg = new config(_first, _me, this);
+ cfg = unique_ptr<config>(new config(_first, _me, this));
if (_first == _me) {
// Commit the first view here. We can not have acceptor::acceptor
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = new rpcs((in_port_t)stoi(_me) + 1);
+ testsvr = unique_ptr<rpcs>(new rpcs((in_port_t)stoi(_me) + 1));
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
+}
- {
- lock ml(rsm_mutex);
- thread(&rsm::recovery, this).detach();
- }
+void rsm::start() {
+ lock ml(rsm_mutex);
+ rsmrpc->start();
+ testsvr->start();
+ thread(&rsm::recovery, this).detach();
}
void rsm::reg1(int proc, handler *h) {
void reg1(int proc, handler *);
protected:
map<int, handler *> procs;
- config *cfg;
+ unique_ptr<config> cfg;
rsm_state_transfer *stf = nullptr;
rpcs *rsmrpc;
// On slave: expected viewstamp of next invoke request
vector<string> backups; // A list of unsynchronized backups
// For testing purposes
- rpcs *testsvr;
+ unique_ptr<rpcs> testsvr;
bool partitioned;
bool dopartition;
bool break1;
void commit_change(unsigned vid);
template<class F, class C=void> void reg(int proc, F f, C *c=nullptr);
+
+ void start();
};
template<class F, class C> void rsm::reg(int proc, F f, C *c) {