/*
- The rpcc class handles client-side RPC. Each rpcc is bound to a
- single RPC server. The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
+ server. The jobs of rpcc include maintaining a connection to the server,
+ sending RPC requests and waiting for responses, retransmissions, at-most-once
+ delivery, etc.
The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns). When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns). When a
request/reply is received, connection makes a callback into the corresponding
rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
number of threads needed to manage these connections; without async IO, at
least one thread is needed per connection to read data without blocking other
activities.) Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests. The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests. The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object. A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object. A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
This version of the RPC library explicitly joins exited threads to make sure
no outstanding references exist before deleting objects.
there are no outstanding calls on the rpcc object.
To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3. delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish. It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections. 3.
+ delete the dispatch thread pool, which involves waiting for currently active
+ RPC handlers to finish. It is interesting how a thread pool can be deleted
without using thread cancellation. The trick is to inject x "poison pills" for
a thread pool of x threads. Upon getting a poison pill instead of a normal
task, a worker thread will exit (and thread pool destructor waits to join all
x exited worker threads).
*/
-#include "types.h"
#include "rpc.h"
-#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
inline void set_rand_seed() {
auto now = time_point_cast<nanoseconds>(steady_clock::now());
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
rpcc::rpcc(const string & d, bool retrans) :
dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+ retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
{
- if(retrans){
+ if (retrans) {
set_rand_seed();
clt_nonce_ = (unsigned int)random();
} else {
}
char *loss_env = getenv("RPC_LOSSY");
- if(loss_env)
+ if (loss_env)
lossytest_ = atoi(loss_env);
// xid starts with 1 and latest received reply starts with 0
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc() {
+ cancel();
IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
- if(chan_){
+ if (chan_)
chan_->closeconn();
- chan_->decref();
- }
VERIFY(calls_.size() == 0);
}
-int rpcc::bind(TO to) {
+int rpcc::bind(milliseconds to) {
unsigned int r;
- int ret = call_timeout(rpc_const::bind, to, r, 0);
- if(ret == 0){
+ int ret = call_timeout(rpc_protocol::bind, to, r, 0);
+ if (ret == 0) {
lock ml(m_);
bind_done_ = true;
srv_nonce_ = r;
// Cancel all outstanding calls
void rpcc::cancel(void) {
lock ml(m_);
- LOG("force callers to fail");
- for(auto &p : calls_){
- caller *ca = p.second;
+ if (calls_.size()) {
+ LOG("force callers to fail");
+ for (auto &p : calls_) {
+ caller *ca = p.second;
+
+ IF_LEVEL(2) LOG("force caller to fail");
- IF_LEVEL(2) LOG("force caller to fail");
- {
lock cl(ca->m);
ca->done = true;
- ca->intret = rpc_const::cancel_failure;
+ ca->intret = rpc_protocol::cancel_failure;
ca->c.notify_one();
}
- }
- while (calls_.size () > 0){
destroy_wait_ = true;
- destroy_wait_c_.wait(ml);
+ while (calls_.size () > 0)
+ destroy_wait_c_.wait(ml);
+
+ LOG("done");
}
- LOG("done");
}
-int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
caller ca(0, &rep);
int xid_rep;
{
lock ml(m_);
- if((proc != rpc_const::bind && !bind_done_) ||
- (proc == rpc_const::bind && bind_done_)){
+ if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
- return rpc_const::bind_failure;
+ return rpc_protocol::bind_failure;
}
- if(destroy_wait_){
- return rpc_const::cancel_failure;
- }
+ if (destroy_wait_)
+ return rpc_protocol::cancel_failure;
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ req.pack_header(rpc_protocol::request_header{
+ ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
+ });
xid_rep = xid_rep_window_.front();
}
- TO curr_to;
- auto finaldeadline = steady_clock::now() + milliseconds(to.to),
- nextdeadline = finaldeadline;
-
- curr_to.to = to_min.to;
+ milliseconds curr_to = rpc::to_min;
+ auto finaldeadline = steady_clock::now() + to;
bool transmit = true;
- connection *ch = NULL;
+ shared_ptr<connection> ch;
- while (1){
- if(transmit){
- get_refconn(&ch);
- if(ch){
- if(reachable_) {
+ while (1) {
+ if (transmit) {
+ get_refconn(ch);
+ if (ch) {
+ if (reachable_) {
request forgot;
{
lock ml(m_);
transmit = false; // only send once on a given channel
}
- if(finaldeadline == time_point<steady_clock>::min())
- break;
-
- nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
- if(nextdeadline > finaldeadline) {
- nextdeadline = finaldeadline;
- finaldeadline = time_point<steady_clock>::min();
- }
+ auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline);
+ curr_to *= 2;
{
lock cal(ca.m);
- while (!ca.done){
+ while (!ca.done) {
IF_LEVEL(2) LOG("wait");
- if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
+ if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
IF_LEVEL(2) LOG("timeout");
break;
}
}
- if(ca.done){
+ if (ca.done) {
IF_LEVEL(2) LOG("reply received");
break;
}
}
- if(retrans_ && (!ch || ch->isdead())){
+ if (nextdeadline >= finaldeadline)
+ break;
+
+ if (retrans_ && (!ch || ch->isdead())) {
// since connection is dead, retransmit
// on the new connection
transmit = true;
}
- curr_to.to <<= 1;
}
{
// I don't think there's any harm in maybe doing it twice
update_xid_rep(ca.xid);
- if(destroy_wait_){
- destroy_wait_c_.notify_one();
- }
+ if (destroy_wait_)
+ destroy_wait_c_.notify_one();
}
if (ca.done && lossytest_)
IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
" xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
- ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
-
- if(ch)
- ch->decref();
+ ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
// destruction of req automatically frees its buffer
- return (ca.done? ca.intret : rpc_const::timeout_failure);
+ return (ca.done? ca.intret : rpc_protocol::timeout_failure);
}
-void
-rpcc::get_refconn(connection **ch)
-{
+void rpcc::get_refconn(shared_ptr<connection> & ch) {
lock ml(chan_m_);
- if(!chan_ || chan_->isdead()){
- if(chan_)
- chan_->decref();
- chan_ = connect_to_dst(dst_, this, lossytest_);
- }
- if(ch && chan_){
- if(*ch){
- (*ch)->decref();
- }
- *ch = chan_;
- (*ch)->incref();
- }
+ if (!chan_ || chan_->isdead())
+ chan_ = connection::to_dst(dst_, this, lossytest_);
+
+ if (chan_)
+ ch = chan_;
}
// PollMgr's thread is being used to
//
// this function keeps no reference for connection *c
bool
-rpcc::got_pdu(connection *, const string & b)
+rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
{
unmarshall rep(b, true);
- reply_header h;
+ rpc_protocol::reply_header h;
rep.unpack_header(h);
- if(!rep.ok()){
+ if (!rep.ok()) {
IF_LEVEL(1) LOG("unmarshall header failed!!!");
return true;
}
update_xid_rep(h.xid);
- if(calls_.find(h.xid) == calls_.end()){
+ if (calls_.find(h.xid) == calls_.end()) {
IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
return true;
}
caller *ca = calls_[h.xid];
lock cl(ca->m);
- if(!ca->done){
+ if (!ca->done) {
*ca->rep = b;
ca->intret = h.ret;
- if(ca->intret < 0){
+ if (ca->intret < 0) {
IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
}
ca->done = 1;
void
rpcc::update_xid_rep(int xid)
{
- if(xid <= xid_rep_window_.front()){
+ if (xid <= xid_rep_window_.front())
return;
- }
- for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
- if(*it > xid){
+ for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++) {
+ if (*it > xid) {
xid_rep_window_.insert(it, xid);
goto compress;
}
compress:
auto it = xid_rep_window_.begin();
- for (it++; it != xid_rep_window_.end(); it++){
+ for (it++; it != xid_rep_window_.end(); it++) {
while (xid_rep_window_.front() + 1 == *it)
xid_rep_window_.pop_front();
}
}
-rpcs::rpcs(unsigned int p1, size_t count)
+rpcs::rpcs(in_port_t p1, size_t count)
: port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
{
set_rand_seed();
nonce_ = (unsigned int)random();
IF_LEVEL(2) LOG("created with nonce " << nonce_);
- reg(rpc_const::bind, &rpcs::rpcbind, this);
- dispatchpool_ = new ThrPool(6, false);
+ reg(rpc_protocol::bind, &rpcs::rpcbind, this);
+ dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
+}
+void rpcs::start() {
char *loss_env = getenv("RPC_LOSSY");
- listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
+ listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
}
-rpcs::~rpcs()
-{
+rpcs::~rpcs() {
// must delete listener before dispatchpool
- delete listener_;
- delete dispatchpool_;
+ listener_ = nullptr;
+ dispatchpool_ = nullptr;
free_reply_window();
}
-bool
-rpcs::got_pdu(connection *c, const string & b)
-{
- if(!reachable_){
+bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
+ if (!reachable_) {
IF_LEVEL(1) LOG("not reachable");
return true;
}
- djob_t *j = new djob_t{c, b};
- c->incref();
- bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
- if(!succ || !reachable_){
- c->decref();
- delete j;
- }
- return succ;
+ return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
}
-void
-rpcs::reg1(proc_t proc, handler *h)
-{
+void rpcs::reg1(proc_id_t proc, handler *h) {
lock pl(procs_m_);
VERIFY(procs_.count(proc) == 0);
procs_[proc] = h;
VERIFY(procs_.count(proc) >= 1);
}
-void
-rpcs::updatestat(proc_t proc)
-{
+void rpcs::updatestat(proc_id_t proc) {
lock cl(count_m_);
counts_[proc]++;
curr_counts_--;
- if(curr_counts_ == 0){
+ if (curr_counts_ == 0) {
LOG("RPC STATS: ");
for (auto i = counts_.begin(); i != counts_.end(); i++)
LOG(hex << i->first << ":" << dec << i->second);
size_t totalrep = 0, maxrep = 0;
for (auto clt : reply_window_) {
totalrep += clt.second.size();
- if(clt.second.size() > maxrep)
+ if (clt.second.size() > maxrep)
maxrep = clt.second.size();
}
IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
}
}
-void
-rpcs::dispatch(djob_t *j)
-{
- connection *c = j->conn;
- unmarshall req(j->buf, true);
- delete j;
+void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
+ unmarshall req(buf, true);
- request_header h;
+ rpc_protocol::request_header h;
req.unpack_header(h);
- proc_t proc = h.proc;
+ proc_id_t proc = h.proc;
- if(!req.ok()){
- IF_LEVEL(1) LOG("unmarshall header failed!!!");
- c->decref();
+ if (!req.ok()) {
+ IF_LEVEL(1) LOG("unmarshall header failed");
return;
}
dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
marshall rep;
- reply_header rh{h.xid,0};
+ rpc_protocol::reply_header rh{h.xid,0};
// is client sending to an old instance of server?
- if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
+ if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
" (current " << nonce_ << ") proc " << hex << h.proc);
- rh.ret = rpc_const::oldsrv_failure;
+ rh.ret = rpc_protocol::oldsrv_failure;
rep.pack_header(rh);
c->send(rep);
return;
// is RPC proc a registered procedure?
{
lock pl(procs_m_);
- if(procs_.count(proc) < 1){
- cerr << "unknown proc " << hex << proc << "." << endl;
- c->decref();
+ if (procs_.count(proc) < 1) {
+ LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
VERIFY(0);
return;
}
rpcs::rpcstate_t stat;
string b1;
- if(h.clt_nonce){
+ if (h.clt_nonce) {
// have i seen this client before?
{
lock rwl(reply_window_m_);
// if we don't know about this clt_nonce, create a cleanup object
- if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
+ if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
// save the latest good connection to the client
{
- lock rwl(conss_m_);
- if(conns_.find(h.clt_nonce) == conns_.end()){
- c->incref();
+ lock rwl(conns_m_);
+ if (conns_.find(h.clt_nonce) == conns_.end())
conns_[h.clt_nonce] = c;
- } else if(conns_[h.clt_nonce]->compare(c) < 0){
- conns_[h.clt_nonce]->decref();
- c->incref();
+ else if (conns_[h.clt_nonce]->create_time() < c->create_time())
conns_[h.clt_nonce] = c;
- }
}
stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
switch (stat) {
case NEW: // new request
- if (counting_){
+ if (counting_)
updatestat(proc);
- }
rh.ret = (*f)(req, rep);
- if (rh.ret == rpc_const::unmarshal_args_failure) {
- cerr << "failed to unmarshall the arguments. You are " <<
- "probably calling RPC 0x" << hex << proc << " with the wrong " <<
- "types of arguments." << endl;
+ if (rh.ret == rpc_protocol::unmarshal_args_failure) {
+ LOG("failed to unmarshall the arguments. You are " <<
+ "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+ "types of arguments.");
VERIFY(0);
}
VERIFY(rh.ret >= 0);
// get the latest connection to the client
{
- lock rwl(conss_m_);
- if(c->isdead() && c != conns_[h.clt_nonce]){
- c->decref();
+ lock rwl(conns_m_);
+ if (c->isdead())
c = conns_[h.clt_nonce];
- c->incref();
- }
}
c->send(rep);
break;
case FORGOTTEN: // very old request and we don't have the response anymore
IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
- rh.ret = rpc_const::atmostonce_failure;
+ rh.ret = rpc_protocol::atmostonce_failure;
rep.pack_header(rh);
c->send(rep);
break;
}
- c->decref();
}
// rpcs::dispatch calls this when an RPC request arrives.
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
- cerr << "Could not find reply struct in add_reply" << endl;
+ LOG("Could not find reply struct in add_reply");
l.insert(it, reply_t(xid, b));
} else {
*it = reply_t(xid, b);
return 0;
}
-static sockaddr_in make_sockaddr(const string &host, const string &port);
-
static sockaddr_in make_sockaddr(const string &hostandport) {
+ string host = "127.0.0.1";
+ string port = hostandport;
auto colon = hostandport.find(':');
- if (colon == string::npos)
- return make_sockaddr("127.0.0.1", hostandport);
- else
- return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
-}
+ if (colon != string::npos) {
+ host = hostandport.substr(0, colon);
+ port = hostandport.substr(colon+1);
+ }
-static sockaddr_in make_sockaddr(const string &host, const string &port) {
- sockaddr_in dst;
- bzero(&dst, sizeof(dst));
+ sockaddr_in dst{}; // zero initialize
dst.sin_family = AF_INET;
struct in_addr a{inet_addr(host.c_str())};
- if(a.s_addr != INADDR_NONE)
+ if (a.s_addr != INADDR_NONE)
dst.sin_addr.s_addr = a.s_addr;
else {
struct hostent *hp = gethostbyname(host.c_str());
if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
- cerr << "cannot find host name " << host << endl;
+ LOG_NONMEMBER("cannot find host name " << host);
exit(1);
}
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
dst.sin_addr.s_addr = a.s_addr;
}
- dst.sin_port = hton((uint16_t)stoi(port));
+ dst.sin_port = hton((in_port_t)stoi(port));
return dst;
}