}
connection::~connection() {
+ closeconn();
VERIFY(dead_);
VERIFY(!wpdu_.buf.size());
}
-void connection::incref() {
- lock rl(ref_m_);
- refno_++;
-}
-
-bool connection::isdead() {
- lock ml(m_);
- return dead_;
-}
-
void connection::closeconn() {
{
lock ml(m_);
PollMgr::Instance().block_remove_fd(fd_);
}
-void connection::decref() {
- bool dead = false;
- {
- lock rl(ref_m_);
- refno_--;
- VERIFY(refno_>=0);
- if (refno_==0) {
- lock ml(m_);
- dead = dead_;
- }
- }
- if (dead)
- delete this;
-}
-
-int connection::compare(connection *another) {
- if (create_time_ > another->create_time_)
- return 1;
- if (create_time_ < another->create_time_)
- return -1;
- return 0;
-}
-
bool connection::send(const string & b) {
lock ml(m_);
send_complete_.notify_one();
}
-//fd_ is ready to be read
+// fd_ is ready to be read
void connection::read_cb(int s) {
lock ml(m_);
VERIFY(fd_ == s);
}
if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
- if (mgr_->got_pdu(this, rpdu_.buf)) {
- //chanmgr has successfully consumed the pdu
+ if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
+ // chanmgr has successfully consumed the pdu
rpdu_.buf.clear();
rpdu_.solong = 0;
}
pipe_[1].close();
th_.join();
- // close all the active connections
- map<int, connection *>::iterator i;
- for (i = conns_.begin(); i != conns_.end(); i++) {
- i->second->closeconn();
- i->second->decref();
- }
+ for (auto & i : conns_)
+ i.second->closeconn();
}
void tcpsconn::process_accept() {
}
IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
- connection *ch = new connection(mgr_, s1, lossy_);
+ auto ch = make_shared<connection>(mgr_, s1, lossy_);
- // garbage collect all dead connections with refcount of 1
+ // garbage collect dead connections
for (auto i = conns_.begin(); i != conns_.end();) {
- if (i->second->isdead() && i->second->ref() == 1) {
- IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
- i->second->decref();
- // Careful not to reuse i right after erase. (i++) will
- // be evaluated before the erase call because in C++,
- // there is a sequence point before a function call.
- // See http://en.wikipedia.org/wiki/Sequence_point.
+ if (i->second->isdead())
conns_.erase(i++);
- } else
+ else
++i;
}
}
}
-connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
+shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
int s = socket(AF_INET, SOCK_STREAM, 0);
int yes = 1;
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
close(s);
- return NULL;
+ return nullptr;
}
IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
- return new connection(mgr, s, lossy);
+ return make_shared<connection>(mgr, s, lossy);
}
class chanmgr {
public:
- virtual bool got_pdu(connection *c, const string & b) = 0;
+ virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
virtual ~chanmgr() {}
};
-class connection : public aio_callback {
+class connection : public aio_callback, public enable_shared_from_this<connection> {
public:
struct charbuf {
string buf;
~connection();
int channo() { return fd_; }
- bool isdead();
+ bool isdead() { lock ml(m_); return dead_; }
void closeconn();
bool send(const string & b);
void write_cb(int s);
void read_cb(int s);
- void incref();
- void decref();
- int ref() { lock rl(ref_m_); return refno_; }
-
- int compare(connection *another);
+ time_point<steady_clock> create_time() const { return create_time_; }
private:
time_point<steady_clock> create_time_;
int waiters_ = 0;
- int refno_ = 1;
int lossy_ = 0;
mutex m_;
- mutex ref_m_;
cond send_complete_;
cond send_wait_;
};
socket_t tcp_; // listens for connections
chanmgr *mgr_;
int lossy_;
- map<int, connection *> conns_;
+ map<int, shared_ptr<connection>> conns_;
void process_accept();
};
int lossy;
};
-connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
+shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
#endif
rpcc::rpcc(const string & d, bool retrans) :
dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+ retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
{
if(retrans){
set_rand_seed();
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc() {
+ cancel();
IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
- if(chan_){
+ if(chan_)
chan_->closeconn();
- chan_->decref();
- }
VERIFY(calls_.size() == 0);
}
// Cancel all outstanding calls
void rpcc::cancel(void) {
lock ml(m_);
- LOG("force callers to fail");
- for(auto &p : calls_){
- caller *ca = p.second;
+ if (calls_.size()) {
+ LOG("force callers to fail");
+ for(auto &p : calls_){
+ caller *ca = p.second;
- IF_LEVEL(2) LOG("force caller to fail");
- {
- lock cl(ca->m);
- ca->done = true;
- ca->intret = rpc_const::cancel_failure;
- ca->c.notify_one();
+ IF_LEVEL(2) LOG("force caller to fail");
+ {
+ lock cl(ca->m);
+ ca->done = true;
+ ca->intret = rpc_const::cancel_failure;
+ ca->c.notify_one();
+ }
}
- }
- while (calls_.size () > 0){
- destroy_wait_ = true;
- destroy_wait_c_.wait(ml);
+ while (calls_.size () > 0){
+ destroy_wait_ = true;
+ destroy_wait_c_.wait(ml);
+ }
+ LOG("done");
}
- LOG("done");
}
int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
bool transmit = true;
- connection *ch = NULL;
+ shared_ptr<connection> ch;
- while (1){
- if(transmit){
- get_refconn(&ch);
- if(ch){
- if(reachable_) {
+ while (1) {
+ if(transmit) {
+ get_refconn(ch);
+ if (ch) {
+ if (reachable_) {
request forgot;
{
lock ml(m_);
}
}
- if(retrans_ && (!ch || ch->isdead())){
+ if(retrans_ && (!ch || ch->isdead())) {
// since connection is dead, retransmit
// on the new connection
transmit = true;
" xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
- if(ch)
- ch->decref();
-
// destruction of req automatically frees its buffer
return (ca.done? ca.intret : rpc_const::timeout_failure);
}
void
-rpcc::get_refconn(connection **ch)
+rpcc::get_refconn(shared_ptr<connection> & ch)
{
lock ml(chan_m_);
- if(!chan_ || chan_->isdead()){
- if(chan_)
- chan_->decref();
+ if (!chan_ || chan_->isdead())
chan_ = connect_to_dst(dst_, this, lossytest_);
- }
- if(ch && chan_){
- if(*ch){
- (*ch)->decref();
- }
- *ch = chan_;
- (*ch)->incref();
- }
+
+ if (chan_)
+ ch = chan_;
}
// PollMgr's thread is being used to
//
// this function keeps no reference for connection *c
bool
-rpcc::got_pdu(connection *, const string & b)
+rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
{
unmarshall rep(b, true);
reply_header h;
}
bool
-rpcs::got_pdu(connection *c, const string & b)
+rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
{
if(!reachable_){
IF_LEVEL(1) LOG("not reachable");
return true;
}
- djob_t *j = new djob_t{c, b};
- c->incref();
- bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
- if(!succ || !reachable_){
- c->decref();
- delete j;
- }
- return succ;
+ return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
}
void
}
}
-void
-rpcs::dispatch(djob_t *j)
-{
- connection *c = j->conn;
- unmarshall req(j->buf, true);
- delete j;
+void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
+ unmarshall req(buf, true);
request_header h;
req.unpack_header(h);
proc_t proc = h.proc;
- if(!req.ok()){
- IF_LEVEL(1) LOG("unmarshall header failed!!!");
- c->decref();
+ if (!req.ok()) {
+ IF_LEVEL(1) LOG("unmarshall header failed");
return;
}
lock pl(procs_m_);
if(procs_.count(proc) < 1){
cerr << "unknown proc " << hex << proc << "." << endl;
- c->decref();
VERIFY(0);
return;
}
// save the latest good connection to the client
{
lock rwl(conns_m_);
- if(conns_.find(h.clt_nonce) == conns_.end()){
- c->incref();
+ if (conns_.find(h.clt_nonce) == conns_.end())
conns_[h.clt_nonce] = c;
- } else if(conns_[h.clt_nonce]->compare(c) < 0){
- conns_[h.clt_nonce]->decref();
- c->incref();
+ else if(conns_[h.clt_nonce]->create_time() < c->create_time())
conns_[h.clt_nonce] = c;
- }
}
stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
// get the latest connection to the client
{
lock rwl(conns_m_);
- if(c->isdead() && c != conns_[h.clt_nonce]){
- c->decref();
+ if (c->isdead())
c = conns_[h.clt_nonce];
- c->incref();
- }
}
c->send(rep);
c->send(rep);
break;
}
- c->decref();
}
// rpcs::dispatch calls this when an RPC request arrives.
cond c;
};
- void get_refconn(connection **ch);
+ void get_refconn(shared_ptr<connection> & ch);
void update_xid_rep(int xid);
bool retrans_;
bool reachable_;
- connection *chan_;
+ shared_ptr<connection> chan_;
mutex m_; // protect insert/delete to calls[]
mutex chan_m_;
void cancel();
- bool got_pdu(connection *c, const string & b);
+ bool got_pdu(const shared_ptr<connection> & c, const string & b);
template<class R, typename ...Args>
inline int call(proc_t proc, R & r, const Args&... args);
void updatestat(proc_t proc);
// latest connection to the client
- map<unsigned int, connection *> conns_;
+ map<unsigned int, shared_ptr<connection>> conns_;
// counting
const size_t counting_;
protected:
- struct djob_t {
- connection *conn;
- string buf;
- };
- void dispatch(djob_t *);
+ void dispatch(shared_ptr<connection> c, const string & buf);
// internal handler registration
void reg1(proc_t proc, handler *);
void set_reachable(bool r) { reachable_ = r; }
- bool got_pdu(connection *c, const string & b);
+ bool got_pdu(const shared_ptr<connection> & c, const string & b);
struct ReturnOnFailure {
static inline int unmarshall_args_failure() {
#include <map>
using std::map;
+#include <memory>
+using std::enable_shared_from_this;
+using std::make_shared;
+using std::shared_ptr;
+using std::unique_ptr;
+
#include <mutex>
using std::mutex;
using lock = std::unique_lock<std::mutex>;