rpcc::rpcc(const string & d, bool retrans) :
dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+ retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
{
if(retrans){
set_rand_seed();
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
// Destructor. With the added line below it first calls cancel(), then logs
// the client nonce and the channel number (-1 when chan_ is empty), closes
// the connection if one exists, and finally asserts that calls_ is empty.
// The removed lines show that the explicit chan_->decref() is no longer made
// here.
rpcc::~rpcc() {
+ cancel();
IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
- if(chan_){
+ if(chan_)
chan_->closeconn();
- chan_->decref();
- }
// No call may still be registered once the object is torn down.
VERIFY(calls_.size() == 0);
}
// Cancel all outstanding calls.
//
// Takes m_ on entry. Each caller registered in calls_ is, under its own
// mutex ca->m, marked done with intret = rpc_const::cancel_failure and has
// its condition variable ca->c notified. The function then sets
// destroy_wait_ and waits on destroy_wait_c_ (passing the m_ lock) for as
// long as calls_ is non-empty.
// In the added ('+') version all of this, including both LOG lines, only
// happens when calls_ is non-empty on entry.
// NOTE(review): the code that removes entries from calls_ and signals
// destroy_wait_c_ is not visible here -- presumably the woken callers; confirm.
void rpcc::cancel(void) {
lock ml(m_);
- LOG("force callers to fail");
- for(auto &p : calls_){
- caller *ca = p.second;
+ if (calls_.size()) {
+ LOG("force callers to fail");
+ for(auto &p : calls_){
+ caller *ca = p.second;
- IF_LEVEL(2) LOG("force caller to fail");
- {
- lock cl(ca->m);
- ca->done = true;
- ca->intret = rpc_const::cancel_failure;
- ca->c.notify_one();
+ IF_LEVEL(2) LOG("force caller to fail");
+ {
+ lock cl(ca->m);
+ ca->done = true;
+ ca->intret = rpc_const::cancel_failure;
+ ca->c.notify_one();
+ }
}
- }
- while (calls_.size () > 0){
- destroy_wait_ = true;
- destroy_wait_c_.wait(ml);
+ while (calls_.size () > 0){
+ destroy_wait_ = true;
+ destroy_wait_c_.wait(ml);
+ }
+ LOG("done");
}
- LOG("done");
}
int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
bool transmit = true;
- connection *ch = NULL;
+ shared_ptr<connection> ch;
- while (1){
- if(transmit){
- get_refconn(&ch);
- if(ch){
- if(reachable_) {
+ while (1) {
+ if(transmit) {
+ get_refconn(ch);
+ if (ch) {
+ if (reachable_) {
request forgot;
{
lock ml(m_);
}
}
- if(retrans_ && (!ch || ch->isdead())){
+ if(retrans_ && (!ch || ch->isdead())) {
// since connection is dead, retransmit
// on the new connection
transmit = true;
" xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
- if(ch)
- ch->decref();
-
// destruction of req automatically frees its buffer
return (ca.done? ca.intret : rpc_const::timeout_failure);
}
// Gives the caller a reference to the current channel, reconnecting first
// when necessary.
//
// Under chan_m_: if chan_ is empty or reports isdead(), chan_ is replaced by
// the result of connect_to_dst(dst_, this, lossytest_). If chan_ is non-empty
// afterwards it is assigned to ch; otherwise ch keeps whatever value the
// caller passed in.
// The removed ('-') lines did the incref()/decref() bookkeeping by hand on a
// connection** out-parameter; the added ('+') lines take a
// shared_ptr<connection>& and use plain assignment instead.
void
-rpcc::get_refconn(connection **ch)
+rpcc::get_refconn(shared_ptr<connection> & ch)
{
lock ml(chan_m_);
- if(!chan_ || chan_->isdead()){
- if(chan_)
- chan_->decref();
+ if (!chan_ || chan_->isdead())
chan_ = connect_to_dst(dst_, this, lossytest_);
- }
- if(ch && chan_){
- if(*ch){
- (*ch)->decref();
- }
- *ch = chan_;
- (*ch)->incref();
- }
+
+ if (chan_)
+ ch = chan_;
}
// PollMgr's thread is being used to
//
// this function keeps no reference for connection *c
bool
-rpcc::got_pdu(connection *, const string & b)
+rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
{
unmarshall rep(b, true);
reply_header h;
}
// rpcs handler for a PDU `b` associated with connection `c`.
//
// When reachable_ is false the PDU is not dispatched: a level-1 log line is
// emitted and true is returned. Otherwise a job invoking rpcs::dispatch is
// submitted to dispatchpool_ and addJob()'s result is returned.
// The removed ('-') lines heap-allocated a djob_t and adjusted the
// connection's reference count by hand around addJob(); the added ('+') line
// binds c and b directly into the job.
// NOTE(review): assuming `bind` is std::bind, c and b are copied into the
// bound object, so the job holds its own shared_ptr and string -- confirm.
bool
-rpcs::got_pdu(connection *c, const string & b)
+rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
{
if(!reachable_){
IF_LEVEL(1) LOG("not reachable");
return true;
}
- djob_t *j = new djob_t{c, b};
- c->incref();
- bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
- if(!succ || !reachable_){
- c->decref();
- delete j;
- }
- return succ;
+ return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
}
void
}
}
-void
-rpcs::dispatch(djob_t *j)
-{
- connection *c = j->conn;
- unmarshall req(j->buf, true);
- delete j;
+void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
+ unmarshall req(buf, true);
request_header h;
req.unpack_header(h);
proc_t proc = h.proc;
- if(!req.ok()){
- IF_LEVEL(1) LOG("unmarshall header failed!!!");
- c->decref();
+ if (!req.ok()) {
+ IF_LEVEL(1) LOG("unmarshall header failed");
return;
}
lock pl(procs_m_);
if(procs_.count(proc) < 1){
cerr << "unknown proc " << hex << proc << "." << endl;
- c->decref();
VERIFY(0);
return;
}
// save the latest good connection to the client
{
lock rwl(conns_m_);
- if(conns_.find(h.clt_nonce) == conns_.end()){
- c->incref();
+ if (conns_.find(h.clt_nonce) == conns_.end())
conns_[h.clt_nonce] = c;
- } else if(conns_[h.clt_nonce]->compare(c) < 0){
- conns_[h.clt_nonce]->decref();
- c->incref();
+ else if(conns_[h.clt_nonce]->create_time() < c->create_time())
conns_[h.clt_nonce] = c;
- }
}
stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
// get the latest connection to the client
{
lock rwl(conns_m_);
- if(c->isdead() && c != conns_[h.clt_nonce]){
- c->decref();
+ if (c->isdead())
c = conns_[h.clt_nonce];
- c->incref();
- }
}
c->send(rep);
c->send(rep);
break;
}
- c->decref();
}
// rpcs::dispatch calls this when an RPC request arrives.