object. A connection object is deleted only when the underlying connection is
dead and the reference count reaches zero.
- The previous version of the RPC library uses pthread_cancel* routines
- to implement the deletion of rpcc and rpcs objects. The idea is to cancel
- all active threads that might be holding a reference to an object before
- deleting that object. However, pthread_cancel is not robust and there are
- always bugs where outstanding references to deleted objects persist.
- This version of the RPC library does not do pthread_cancel, but explicitly
- joins exited threads to make sure no outstanding references exist before
- deleting objects.
+ This version of the RPC library explicitly joins exited threads to make sure
+ no outstanding references exist before deleting objects.
To delete a rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
3. delete the dispatch thread pool which involves waiting for current active
RPC handlers to finish. It is interesting how a thread pool can be deleted
without using thread cancellation. The trick is to inject x "poison pills" for
- a thread pool of x threads. Upon getting a poison pill instead of a normal
+ a thread pool of x threads. Upon getting a poison pill instead of a normal
task, a worker thread will exit (and thread pool destructor waits to join all
x exited worker threads).
*/
+#include "types.h"
#include "rpc.h"
-#include "method_thread.h"
-#include "slock.h"
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
-#include <time.h>
#include <netdb.h>
#include <unistd.h>
-#include "jsl_log.h"
-#include "gettime.h"
-#include "lang/verify.h"
-
const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
-rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
-: xid(xxid), un(xun), done(false)
-{
- VERIFY(pthread_mutex_init(&m,0) == 0);
- VERIFY(pthread_cond_init(&c, 0) == 0);
+// Seed random() by mixing the current steady_clock reading (in nanoseconds)
+// with the process id, so that different processes are likely to get
+// different seeds (and hence different nonces).
+inline void set_rand_seed() {
+ auto now = time_point_cast<nanoseconds>(steady_clock::now());
+ srandom(static_cast<uint32_t>(now.time_since_epoch().count()) ^ static_cast<uint32_t>(getpid()));
}
-rpcc::caller::~caller()
-{
- VERIFY(pthread_mutex_destroy(&m) == 0);
- VERIFY(pthread_cond_destroy(&c) == 0);
-}
+// Converts the destination string given to rpcc::rpcc into a socket address
+// (defined later in this translation unit); presumably expects "host:port".
+static sockaddr_in make_sockaddr(const string &hostandport);
-inline
-void set_rand_seed()
+// Create a client for destination d. With retrans, a random client nonce is
+// chosen so the server applies at-most-once logic and call1() may retransmit
+// on a fresh connection; without it the nonce is 0.
+rpcc::rpcc(const string & d, bool retrans) :
+ dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+ retrans_(retrans), reachable_(true), chan_(nullptr), destroy_wait_ (false), xid_rep_done_(-1)
{
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- srandom((int)ts.tv_nsec^((int)getpid()));
-}
+ if(retrans){
+ set_rand_seed();
+ clt_nonce_ = static_cast<unsigned int>(random());
+ } else {
+ // special client nonce 0 means this client does not
+ // require at-most-once logic from the server
+ // because it uses tcp and never retries a failed connection
+ clt_nonce_ = 0;
+ }
-rpcc::rpcc(sockaddr_in d, bool retrans) :
- dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
-{
- VERIFY(pthread_mutex_init(&m_, 0) == 0);
- VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
- VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
-
- if(retrans){
- set_rand_seed();
- clt_nonce_ = random();
- } else {
- // special client nonce 0 means this client does not
- // require at-most-once logic from the server
- // because it uses tcp and never retries a failed connection
- clt_nonce_ = 0;
- }
-
- char *loss_env = getenv("RPC_LOSSY");
- if(loss_env != NULL){
- lossytest_ = atoi(loss_env);
- }
-
- // xid starts with 1 and latest received reply starts with 0
- xid_rep_window_.push_back(0);
-
- jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
- clt_nonce_, lossytest_);
+ // RPC_LOSSY, when set, is parsed as the integer loss-test level.
+ const char *loss_env = getenv("RPC_LOSSY");
+ if(loss_env)
+ lossytest_ = atoi(loss_env);
+
+ // xid starts with 1 and latest received reply starts with 0
+ xid_rep_window_.push_back(0);
+
+ IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
}
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
-rpcc::~rpcc()
-{
- jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
- clt_nonce_, chan_?chan_->channo():-1);
- if(chan_){
- chan_->closeconn();
- chan_->decref();
- }
- VERIFY(calls_.size() == 0);
- VERIFY(pthread_mutex_destroy(&m_) == 0);
- VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
+rpcc::~rpcc() {
+ IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
+ // Close the cached connection and release this object's reference to it.
+ if(chan_){
+ chan_->closeconn();
+ chan_->decref();
+ }
+ // Every call must already have finished and deregistered itself (see cancel()).
+ VERIFY(calls_.size() == 0);
}
-int
-rpcc::bind(TO to)
-{
- int r;
- int ret = call(rpc_const::bind, 0, r, to);
- if(ret == 0){
- ScopedLock ml(&m_);
- bind_done_ = true;
- srv_nonce_ = r;
- } else {
- jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
- inet_ntoa(dst_.sin_addr), ret);
- }
- return ret;
+// Bind handshake: call rpc_const::bind with timeout `to`. On success (0),
+// record the nonce the server returned and mark this client as bound, which
+// call1() requires before any other procedure. Returns the call's status.
+int rpcc::bind(TO to) {
+ unsigned int r;
+ int ret = call_timeout(rpc_const::bind, to, r, 0);
+ if(ret == 0){
+ lock ml(m_);
+ bind_done_ = true;
+ srv_nonce_ = r;
+ } else {
+ IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
+ }
+ return ret;
};
// Cancel all outstanding calls
-void
-rpcc::cancel(void)
-{
- ScopedLock ml(&m_);
- printf("rpcc::cancel: force callers to fail\n");
- std::map<int,caller*>::iterator iter;
- for(iter = calls_.begin(); iter != calls_.end(); iter++){
- caller *ca = iter->second;
+void rpcc::cancel(void) {
+ lock ml(m_);
+ LOG("force callers to fail");
+ // Mark every registered call finished with cancel_failure and wake its waiter.
+ for(auto &p : calls_){
+ caller *ca = p.second;
- jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
- {
- ScopedLock cl(&ca->m);
- ca->done = true;
- ca->intret = rpc_const::cancel_failure;
- VERIFY(pthread_cond_signal(&ca->c) == 0);
+ IF_LEVEL(2) LOG("force caller to fail");
+ {
+ lock cl(ca->m);
+ ca->done = true;
+ ca->intret = rpc_const::cancel_failure;
+ ca->c.notify_one();
+ }
}
- }
- while (calls_.size () > 0){
- destroy_wait_ = true;
- VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
- }
- printf("rpcc::cancel: done\n");
+ // Wait until each call1() has deregistered itself. Setting destroy_wait_
+ // also makes any new call1() return cancel_failure immediately.
+ while (calls_.size () > 0){
+ destroy_wait_ = true;
+ destroy_wait_c_.wait(ml);
+ }
+ LOG("done");
}
-int
-rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
- TO to)
-{
+// Send request `req` for procedure `proc` and wait up to `to` milliseconds
+// for the reply; got_pdu() stores the raw reply PDU in `rep`. Returns the
+// reply's status code, rpc_const::bind_failure / cancel_failure if the call
+// is rejected before sending, or rpc_const::timeout_failure if no reply
+// arrived before the deadline.
+int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+
+ caller ca(0, &rep);
+ int xid_rep;
+ {
+ lock ml(m_);
+
+ if((proc != rpc_const::bind && !bind_done_) ||
+ (proc == rpc_const::bind && bind_done_)){
+ IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
+ return rpc_const::bind_failure;
+ }
+
+ if(destroy_wait_){
+ return rpc_const::cancel_failure;
+ }
+
+ // Register under a fresh xid so got_pdu() and cancel() can find this call.
+ ca.xid = xid_++;
+ calls_[ca.xid] = &ca;
+
+ req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ xid_rep = xid_rep_window_.front();
+ }
+
+ // Each wait starts at to_min and doubles after every timeout; waits are
+ // clamped to the overall deadline derived from `to`.
+ TO curr_to;
+ auto finaldeadline = steady_clock::now() + milliseconds(to.to),
+ nextdeadline = finaldeadline;
+
+ curr_to.to = to_min.to;
+
+ bool transmit = true;
+ connection *ch = nullptr;
+
+ while (true){
+ if(transmit){
+ get_refconn(&ch);
+ if(ch){
+ if(reachable_) {
+ request forgot;
+ {
+ lock ml(m_);
+ if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
+ forgot = dup_req_;
+ dup_req_.clear();
+ }
+ }
+ if (forgot.isvalid())
+ ch->send(forgot.buf);
+ ch->send(req);
+ }
+ else IF_LEVEL(1) LOG("not reachable");
+ IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
+ " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
+ }
+ transmit = false; // only send once on a given channel
+ }
+
+ // min() marks that the previous wait already ran to the final deadline.
+ if(finaldeadline == time_point<steady_clock>::min())
+ break;
+
+ nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
+ if(nextdeadline > finaldeadline) {
+ nextdeadline = finaldeadline;
+ finaldeadline = time_point<steady_clock>::min();
+ }
- caller ca(0, &rep);
- int xid_rep;
- {
- ScopedLock ml(&m_);
-
- if((proc != rpc_const::bind && !bind_done_) ||
- (proc == rpc_const::bind && bind_done_)){
- jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
- return rpc_const::bind_failure;
- }
-
- if(destroy_wait_){
- return rpc_const::cancel_failure;
- }
-
- ca.xid = xid_++;
- calls_[ca.xid] = &ca;
-
- req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
- xid_rep_window_.front());
- req.pack_req_header(h);
- xid_rep = xid_rep_window_.front();
- }
-
- TO curr_to;
- struct timespec now, nextdeadline, finaldeadline;
-
- clock_gettime(CLOCK_REALTIME, &now);
- add_timespec(now, to.to, &finaldeadline);
- curr_to.to = to_min.to;
-
- bool transmit = true;
- connection *ch = NULL;
-
- while (1){
- if(transmit){
- get_refconn(&ch);
- if(ch){
- if(reachable_) {
- request forgot;
- {
- ScopedLock ml(&m_);
- if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
- forgot = dup_req_;
- dup_req_.clear();
- }
- }
- if (forgot.isvalid())
- ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
- ch->send(req.cstr(), req.size());
- }
- else jsl_log(JSL_DBG_1, "not reachable\n");
- jsl_log(JSL_DBG_2,
- "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
- clt_nonce_, proc, ca.xid, clt_nonce_);
- }
- transmit = false; // only send once on a given channel
- }
-
- if(!finaldeadline.tv_sec)
- break;
-
- clock_gettime(CLOCK_REALTIME, &now);
- add_timespec(now, curr_to.to, &nextdeadline);
- if(cmp_timespec(nextdeadline,finaldeadline) > 0){
- nextdeadline = finaldeadline;
- finaldeadline.tv_sec = 0;
- }
-
- {
- ScopedLock cal(&ca.m);
- while (!ca.done){
- jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
- if(pthread_cond_timedwait(&ca.c, &ca.m,
- &nextdeadline) == ETIMEDOUT){
- jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
- break;
- }
- }
- if(ca.done){
- jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
- break;
- }
- }
-
- if(retrans_ && (!ch || ch->isdead())){
- // since connection is dead, retransmit
- // on the new connection
- transmit = true;
- }
- curr_to.to <<= 1;
- }
-
- {
- // no locking of ca.m since only this thread changes ca.xid
- ScopedLock ml(&m_);
- calls_.erase(ca.xid);
- // may need to update the xid again here, in case the
- // packet times out before it's even sent by the channel.
- // I don't think there's any harm in maybe doing it twice
- update_xid_rep(ca.xid);
-
- if(destroy_wait_){
- VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
- }
- }
-
- if (ca.done && lossytest_)
{
- ScopedLock ml(&m_);
- if (!dup_req_.isvalid()) {
- dup_req_.buf.assign(req.cstr(), req.size());
- dup_req_.xid = ca.xid;
+ lock cal(ca.m);
+ while (!ca.done){
+ IF_LEVEL(2) LOG("wait");
+ if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
+ IF_LEVEL(2) LOG("timeout");
+ break;
}
- if (xid_rep > xid_rep_done_)
- xid_rep_done_ = xid_rep;
+ }
+ if(ca.done){
+ IF_LEVEL(2) LOG("reply received");
+ break;
+ }
+ }
+
+ if(retrans_ && (!ch || ch->isdead())){
+ // since connection is dead, retransmit
+ // on the new connection
+ transmit = true;
}
+ curr_to.to <<= 1;
+ }
- ScopedLock cal(&ca.m);
+ {
+ // no locking of ca.m since only this thread changes ca.xid
+ lock ml(m_);
+ calls_.erase(ca.xid);
+ // may need to update the xid again here, in case the
+ // packet times out before it's even sent by the channel.
+ // I don't think there's any harm in maybe doing it twice
+ update_xid_rep(ca.xid);
+
+ if(destroy_wait_){
+ destroy_wait_c_.notify_one();
+ }
+ }
- jsl_log(JSL_DBG_2,
- "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
- clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
- ntohs(dst_.sin_port), ca.done, ca.intret);
+ // Lossy-test mode: remember this request. The send path re-sends it as a
+ // stale duplicate once xid_rep_done_ exceeds its xid (presumably to
+ // exercise the server's at-most-once handling).
+ if (ca.done && lossytest_)
+ {
+ lock ml(m_);
+ if (!dup_req_.isvalid()) {
+ dup_req_.buf = req;
+ dup_req_.xid = ca.xid;
+ }
+ if (xid_rep > xid_rep_done_)
+ xid_rep_done_ = xid_rep;
+ }
+
+ lock cal(ca.m);
+
+ IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
+ " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
+ ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
- if(ch)
- ch->decref();
+ if(ch)
+ ch->decref();
- // destruction of req automatically frees its buffer
- return (ca.done? ca.intret : rpc_const::timeout_failure);
+ // destruction of req automatically frees its buffer
+ return (ca.done? ca.intret : rpc_const::timeout_failure);
}
+// Store in *ch a referenced pointer to the current connection, first
+// (re)connecting if none is cached or the cached one is dead; chan_m_
+// guards chan_. Any reference already held in *ch is released. If chan_ is
+// still null afterwards (presumably connect_to_dst failed), *ch is unchanged.
void
rpcc::get_refconn(connection **ch)
{
- ScopedLock ml(&chan_m_);
- if(!chan_ || chan_->isdead()){
- if(chan_)
- chan_->decref();
- chan_ = connect_to_dst(dst_, this, lossytest_);
- }
- if(ch && chan_){
- if(*ch){
- (*ch)->decref();
- }
- *ch = chan_;
- (*ch)->incref();
- }
+ lock ml(chan_m_);
+ if(!chan_ || chan_->isdead()){
+ if(chan_)
+ chan_->decref();
+ chan_ = connect_to_dst(dst_, this, lossytest_);
+ }
+ if(ch && chan_){
+ if(*ch){
+ (*ch)->decref();
+ }
+ *ch = chan_;
+ (*ch)->incref();
+ }
}
-// PollMgr's thread is being used to
-// make this upcall from connection object to rpcc.
+// PollMgr's thread is being used to
+// make this upcall from connection object to rpcc.
-// this funtion must not block.
+// this function must not block.
//
-// this function keeps no reference for connection *c
+// this function keeps no reference for connection *c
+// Hands the reply PDU b to the call1() waiting on its xid (if any) and wakes
+// it. Always returns true.
bool
-rpcc::got_pdu(connection *c, char *b, int sz)
+rpcc::got_pdu(connection *, const string & b)
{
- unmarshall rep(b, sz);
- reply_header h;
- rep.unpack_reply_header(&h);
-
- if(!rep.ok()){
- jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
- return true;
- }
-
- ScopedLock ml(&m_);
-
- update_xid_rep(h.xid);
-
- if(calls_.find(h.xid) == calls_.end()){
- jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
- return true;
- }
- caller *ca = calls_[h.xid];
-
- ScopedLock cl(&ca->m);
- if(!ca->done){
- ca->un->take_in(rep);
- ca->intret = h.ret;
- if(ca->intret < 0){
- jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
- h.xid, ca->intret);
- }
- ca->done = 1;
- }
- VERIFY(pthread_cond_broadcast(&ca->c) == 0);
- return true;
+ unmarshall rep(b, true);
+ reply_header h;
+ rep.unpack_header(h);
+
+ if(!rep.ok()){
+ IF_LEVEL(1) LOG("unmarshall header failed!!!");
+ return true;
+ }
+
+ lock ml(m_);
+
+ update_xid_rep(h.xid);
+
+ // One lookup for the waiting caller (no second search via operator[]).
+ auto pending = calls_.find(h.xid);
+ if(pending == calls_.end()){
+ IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
+ return true;
+ }
+ caller *ca = pending->second;
+
+ lock cl(ca->m);
+ if(!ca->done){
+ *ca->rep = b;
+ ca->intret = h.ret;
+ if(ca->intret < 0){
+ IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
+ }
+ ca->done = true;
+ }
+ ca->c.notify_all();
+ return true;
}
// assumes thread holds mutex m
-void
-rpcc::update_xid_rep(unsigned int xid)
+// Record that the reply for xid has been seen (callers hold m_).
+// xid_rep_window_ stays sorted. Its front is the highest xid of the
+// contiguous run recorded so far; later entries are xids recorded out of order.
+void
+rpcc::update_xid_rep(int xid)
{
- std::list<unsigned int>::iterator it;
-
- if(xid <= xid_rep_window_.front()){
- return;
- }
- for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
- if(*it > xid){
- xid_rep_window_.insert(it, xid);
- goto compress;
- }
- }
- xid_rep_window_.push_back(xid);
-compress:
- it = xid_rep_window_.begin();
- for (it++; it != xid_rep_window_.end(); it++){
- while (xid_rep_window_.front() + 1 == *it)
- xid_rep_window_.pop_front();
- }
+ // Already covered by the contiguous prefix: nothing to record.
+ if(xid <= xid_rep_window_.front()){
+ return;
+ }
+
+ // Insert before the first larger entry (at the end if none) to stay sorted.
+ auto pos = xid_rep_window_.begin();
+ while (pos != xid_rep_window_.end() && *pos <= xid)
+ ++pos;
+ xid_rep_window_.insert(pos, xid);
+
+ // Compress: drop the front while the next entry directly follows it.
+ auto it = xid_rep_window_.begin();
+ for (it++; it != xid_rep_window_.end(); it++){
+ while (xid_rep_window_.front() + 1 == *it)
+ xid_rep_window_.pop_front();
+ }
}
-
-rpcs::rpcs(unsigned int p1, int count)
- : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
+// Create a server for port p1. A nonzero count enables per-procedure call
+// statistics, logged by updatestat() every `count` new (non-duplicate) requests.
+rpcs::rpcs(in_port_t p1, size_t count)
+ : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
{
- VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
- VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
- VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
- VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
-
- set_rand_seed();
- nonce_ = random();
- jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
-
- char *loss_env = getenv("RPC_LOSSY");
- if(loss_env != NULL){
- lossytest_ = atoi(loss_env);
- }
+ // A fresh random nonce identifies this server instance. Requests carrying a
+ // different nonzero server nonce are rejected with oldsrv_failure.
+ set_rand_seed();
+ nonce_ = static_cast<unsigned int>(random());
+ IF_LEVEL(2) LOG("created with nonce " << nonce_);
- reg(rpc_const::bind, this, &rpcs::rpcbind);
- dispatchpool_ = new ThrPool(6,false);
+ reg(rpc_const::bind, &rpcs::rpcbind, this);
+ dispatchpool_ = new ThrPool(6, false);
- listener_ = new tcpsconn(this, port_, lossytest_);
+ // RPC_LOSSY, when set, is parsed as the integer loss-test level for the listener.
+ const char *loss_env = getenv("RPC_LOSSY");
+ listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
}
rpcs::~rpcs()
{
- // must delete listener before dispatchpool
- delete listener_;
- delete dispatchpool_;
- free_reply_window();
+ // must delete listener before dispatchpool
+ // (presumably so the listener cannot queue jobs into a pool being torn down)
+ delete listener_;
+ delete dispatchpool_;
+ // Finally discard every reply remembered for at-most-once clients.
+ free_reply_window();
}
+// Called with each request PDU b received on connection c. Queues the
+// request for rpcs::dispatch on the dispatch pool and returns whether it was
+// queued. Returns true without queuing while the server is marked unreachable.
bool
-rpcs::got_pdu(connection *c, char *b, int sz)
+rpcs::got_pdu(connection *c, const string & b)
{
- if(!reachable_){
- jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
- return true;
- }
+ if(!reachable_){
+ IF_LEVEL(1) LOG("not reachable");
+ return true;
+ }
- djob_t *j = new djob_t(c, b, sz);
- c->incref();
- bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
- if(!succ || !reachable_){
- c->decref();
- delete j;
- }
- return succ;
+ // The queued job owns j and the reference taken on c: dispatch() deletes j
+ // and is responsible for releasing that reference.
+ djob_t *j = new djob_t{c, b};
+ c->incref();
+ bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
+ if(!succ){
+ // Undo only when the job was never queued. Once queued, dispatch() will
+ // run and consume j, so tearing it down here (even if reachable_ has
+ // since flipped) would be a use-after-free and a double decref.
+ c->decref();
+ delete j;
+ }
+ return succ;
}
+// Register handler h for procedure number proc. Registering the same number
+// twice fails the first VERIFY.
void
-rpcs::reg1(unsigned int proc, handler *h)
+rpcs::reg1(proc_t proc, handler *h)
{
- ScopedLock pl(&procs_m_);
- VERIFY(procs_.count(proc) == 0);
- procs_[proc] = h;
- VERIFY(procs_.count(proc) >= 1);
+ lock pl(procs_m_);
+ VERIFY(procs_.count(proc) == 0);
+ procs_[proc] = h;
+ VERIFY(procs_.count(proc) >= 1);
}
+// Count one invocation of proc. When the countdown curr_counts_ reaches zero
+// (every counting_ calls), log per-procedure totals and reply-window
+// occupancy, then restart the countdown.
void
-rpcs::updatestat(unsigned int proc)
-{
- ScopedLock cl(&count_m_);
- counts_[proc]++;
- curr_counts_--;
- if(curr_counts_ == 0){
- std::map<int, int>::iterator i;
- printf("RPC STATS: ");
- for (i = counts_.begin(); i != counts_.end(); i++){
- printf("%x:%d ", i->first, i->second);
- }
- printf("\n");
-
- ScopedLock rwl(&reply_window_m_);
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
-
- unsigned int totalrep = 0, maxrep = 0;
- for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- totalrep += clt->second.size();
- if(clt->second.size() > maxrep)
- maxrep = clt->second.size();
- }
- jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
- (int) reply_window_.size()-1, totalrep, maxrep);
- curr_counts_ = counting_;
- }
+rpcs::updatestat(proc_t proc)
+{
+ lock cl(count_m_);
+ counts_[proc]++;
+ curr_counts_--;
+ if(curr_counts_ == 0){
+ LOG("RPC STATS: ");
+ for (const auto &entry : counts_)
+ LOG(hex << entry.first << ":" << dec << entry.second);
+
+ lock rwl(reply_window_m_);
+
+ // Bind by reference: copying each entry would duplicate every remembered
+ // reply while reply_window_m_ is held.
+ size_t totalrep = 0, maxrep = 0;
+ for (const auto &clt : reply_window_) {
+ totalrep += clt.second.size();
+ if(clt.second.size() > maxrep)
+ maxrep = clt.second.size();
+ }
+ // Signed arithmetic: an empty window logs -1 instead of wrapping to SIZE_MAX.
+ IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (static_cast<long>(reply_window_.size()) - 1) << " total reply " <<
+ totalrep << " max per client " << maxrep);
+ curr_counts_ = counting_;
+ }
}
+// Runs on a dispatch-pool thread for each request queued by got_pdu().
+// Takes ownership of j (deleted right after reading it) and of the connection
+// reference got_pdu() took, which every exit path below releases.
void
rpcs::dispatch(djob_t *j)
{
- connection *c = j->conn;
- unmarshall req(j->buf, j->sz);
- delete j;
-
- req_header h;
- req.unpack_req_header(&h);
- int proc = h.proc;
-
- if(!req.ok()){
- jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
- c->decref();
- return;
- }
-
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
- h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
-
- marshall rep;
- reply_header rh(h.xid,0);
-
- // is client sending to an old instance of server?
- if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
- h.srv_nonce, nonce_, h.proc);
- rh.ret = rpc_const::oldsrv_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
- return;
- }
-
- handler *f;
- // is RPC proc a registered procedure?
- {
- ScopedLock pl(&procs_m_);
- if(procs_.count(proc) < 1){
- fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
- proc);
- c->decref();
- VERIFY(0);
- return;
- }
-
- f = procs_[proc];
- }
-
- rpcs::rpcstate_t stat;
- char *b1;
- int sz1;
-
- if(h.clt_nonce){
- // have i seen this client before?
- {
- ScopedLock rwl(&reply_window_m_);
- // if we don't know about this clt_nonce, create a cleanup object
- if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
- VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+ connection *c = j->conn;
+ unmarshall req(j->buf, true);
+ delete j;
+
+ request_header h;
+ req.unpack_header(h);
+ proc_t proc = h.proc;
+
+ if(!req.ok()){
+ IF_LEVEL(1) LOG("unmarshall header failed!!!");
+ c->decref();
+ return;
+ }
+
+ IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+ dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
+
+ marshall rep;
+ reply_header rh{h.xid,0};
+
+ // is client sending to an old instance of server?
+ if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
+ IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
+ " (current " << nonce_ << ") proc " << hex << h.proc);
+ rh.ret = rpc_const::oldsrv_failure;
+ rep.pack_header(rh);
+ c->send(rep);
+ // Release the reference taken in got_pdu(), as every other exit does.
+ c->decref();
+ return;
+ }
+
+ handler *f;
+ // is RPC proc a registered procedure?
+ {
+ lock pl(procs_m_);
+ if(procs_.count(proc) < 1){
+ cerr << "unknown proc " << hex << proc << "." << endl;
+ c->decref();
+ VERIFY(0);
+ return;
+ }
+
+ f = procs_[proc];
+ }
+
+ rpcs::rpcstate_t stat;
+ string b1;
+
+ if(h.clt_nonce){
+ // have i seen this client before?
+ {
+ lock rwl(reply_window_m_);
+ // if we don't know about this clt_nonce, create a cleanup object
+ if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
+ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
- h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
- }
- }
-
- // save the latest good connection to the client
- {
- ScopedLock rwl(&conss_m_);
- if(conns_.find(h.clt_nonce) == conns_.end()){
- c->incref();
- conns_[h.clt_nonce] = c;
- } else if(conns_[h.clt_nonce]->compare(c) < 0){
- conns_[h.clt_nonce]->decref();
- c->incref();
- conns_[h.clt_nonce] = c;
- }
- }
-
- stat = checkduplicate_and_update(h.clt_nonce, h.xid,
- h.xid_rep, &b1, &sz1);
- } else {
- // this client does not require at most once logic
- stat = NEW;
- }
-
- switch (stat){
- case NEW: // new request
- if(counting_){
- updatestat(proc);
- }
-
- rh.ret = f->fn(req, rep);
- if (rh.ret == rpc_const::unmarshal_args_failure) {
- fprintf(stderr, "rpcs::dispatch: failed to"
- " unmarshall the arguments. You are"
- " probably calling RPC 0x%x with wrong"
- " types of arguments.\n", proc);
- VERIFY(0);
- }
- VERIFY(rh.ret >= 0);
-
- rep.pack_reply_header(rh);
- rep.take_buf(&b1,&sz1);
-
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
- sz1, h.xid, proc, rh.ret, h.clt_nonce);
-
- if(h.clt_nonce > 0){
- // only record replies for clients that require at-most-once logic
- add_reply(h.clt_nonce, h.xid, b1, sz1);
- }
-
- // get the latest connection to the client
- {
- ScopedLock rwl(&conss_m_);
- if(c->isdead() && c != conns_[h.clt_nonce]){
- c->decref();
- c = conns_[h.clt_nonce];
- c->incref();
- }
- }
-
- c->send(b1, sz1);
- if(h.clt_nonce == 0){
- // reply is not added to at-most-once window, free it
- free(b1);
- }
- break;
- case INPROGRESS: // server is working on this request
- break;
- case DONE: // duplicate and we still have the response
- c->send(b1, sz1);
- break;
- case FORGOTTEN: // very old request and we don't have the response anymore
- jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
- h.xid, h.clt_nonce);
- rh.ret = rpc_const::atmostonce_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
- break;
- }
- c->decref();
+ IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
+ " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
+ }
+ }
+
+ // save the latest good connection to the client
+ {
+ lock rwl(conss_m_);
+ if(conns_.find(h.clt_nonce) == conns_.end()){
+ c->incref();
+ conns_[h.clt_nonce] = c;
+ } else if(conns_[h.clt_nonce]->compare(c) < 0){
+ conns_[h.clt_nonce]->decref();
+ c->incref();
+ conns_[h.clt_nonce] = c;
+ }
+ }
+
+ stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
+ } else {
+ // this client does not require at most once logic
+ stat = NEW;
+ }
+
+ switch (stat) {
+ case NEW: // new request
+ if (counting_){
+ updatestat(proc);
+ }
+
+ rh.ret = (*f)(req, rep);
+ if (rh.ret == rpc_const::unmarshal_args_failure) {
+ cerr << "failed to unmarshall the arguments. You are " <<
+ "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+ "types of arguments." << endl;
+ VERIFY(0);
+ }
+ VERIFY(rh.ret >= 0);
+
+ rep.pack_header(rh);
+ b1 = rep;
+
+ IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
+ h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
+
+ if (h.clt_nonce > 0) {
+ // only record replies for clients that require at-most-once logic
+ add_reply(h.clt_nonce, h.xid, b1);
+ }
+
+ // get the latest connection to the client if this one has died.
+ // Use find() rather than operator[]: clients with nonce 0 are never
+ // recorded in conns_, and operator[] would insert (and then
+ // dereference) a null connection for them.
+ {
+ lock rwl(conss_m_);
+ if(c->isdead()){
+ auto latest = conns_.find(h.clt_nonce);
+ if(latest != conns_.end() && latest->second != c){
+ c->decref();
+ c = latest->second;
+ c->incref();
+ }
+ }
+ }
+
+ c->send(rep);
+ break;
+ case INPROGRESS: // server is working on this request
+ break;
+ case DONE: // duplicate and we still have the response
+ c->send(b1);
+ break;
+ case FORGOTTEN: // very old request and we don't have the response anymore
+ IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
+ rh.ret = rpc_const::atmostonce_failure;
+ rep.pack_header(rh);
+ c->send(rep);
+ break;
+ }
+ c->decref();
}
// rpcs::dispatch calls this when an RPC request arrives.
// returns one of:
// NEW: never seen this xid before.
// INPROGRESS: seen this xid, and still processing it.
-// DONE: seen this xid, previous reply returned in *b and *sz.
+// DONE: seen this xid, previous reply returned in b.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
-rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
- unsigned int xid_rep, char **b, int *sz)
+rpcs::rpcstate_t
+rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
+ int xid_rep, string & b)
{
- ScopedLock rwl(&reply_window_m_);
+ lock rwl(reply_window_m_);
- std::list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> &l = reply_window_[clt_nonce];
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
- unsigned int past_xid_rep = l.begin()->xid;
+ int past_xid_rep = l.begin()->xid;
- std::list<reply_t>::iterator start = l.begin(), it;
- it = ++start;
+ list<reply_t>::iterator start = l.begin(), it = ++start;
- if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
+ if (past_xid_rep < xid_rep || past_xid_rep == -1) {
// scan for deletion candidates
- for (; it != l.end() && it->xid < xid_rep; it++) {
- if (it->cb_present)
- free(it->buf);
- }
+ while (it != l.end() && it->xid < xid_rep)
+ it++;
l.erase(start, it);
l.begin()->xid = xid_rep;
}
- if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
+ if (xid < past_xid_rep && past_xid_rep != -1)
return FORGOTTEN;
// skip non-deletion candidates
if (it != l.end() && it->xid == xid) {
if (it->cb_present) {
// return information about the remembered reply
- *b = it->buf;
- *sz = it->sz;
+ b = it->buf;
return DONE;
- } else {
- return INPROGRESS;
}
+ return INPROGRESS;
} else {
// remember that a new request has arrived
l.insert(it, reply_t(xid));
}
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
-// and passes the return value in b and sz.
-// add_reply() should remember b and sz.
-// free_reply_window() and checkduplicate_and_update is responsible for
-// calling free(b).
-void
-rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
- char *b, int sz)
-{
- ScopedLock rwl(&reply_window_m_);
+// and passes the return value in b.
+// add_reply() should remember b.
+// free_reply_window() and checkduplicate_and_update are responsible for
+// cleaning up the remembered values.
+// add_reply() acquires reply_window_m_ itself. It assumes clt_nonce's list
+// already starts with the bookkeeping entry dispatch() creates for a new
+// client, because the scan below begins one element past begin().
+void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
+ lock rwl(reply_window_m_);
// remember the RPC reply value
- std::list<reply_t> &l = reply_window_[clt_nonce];
- std::list<reply_t>::iterator it = l.begin();
+ list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t>::iterator it = l.begin();
// skip to our place in the list
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
- fprintf(stderr, "Could not find reply struct in add_reply");
- l.insert(it, reply_t(xid, b, sz));
+ cerr << "Could not find reply struct in add_reply" << endl;
+ l.insert(it, reply_t(xid, b));
} else {
- *it = reply_t(xid, b, sz);
+ *it = reply_t(xid, b);
}
}
-void
-rpcs::free_reply_window(void)
-{
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
- std::list<reply_t>::iterator it;
-
- ScopedLock rwl(&reply_window_m_);
- for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- for (it = clt->second.begin(); it != clt->second.end(); it++){
- if (it->cb_present)
- free(it->buf);
- }
- clt->second.clear();
- }
- reply_window_.clear();
-}
-
-// rpc handler
-int
-rpcs::rpcbind(int a, int &r)
-{
- jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
- r = nonce_;
- return 0;
-}
-
-void
-marshall::rawbyte(unsigned char x)
-{
- if(_ind >= _capa){
- _capa *= 2;
- VERIFY (_buf != NULL);
- _buf = (char *)realloc(_buf, _capa);
- VERIFY(_buf);
- }
- _buf[_ind++] = x;
-}
-
-void
-marshall::rawbytes(const char *p, int n)
-{
- if((_ind+n) > _capa){
- _capa = _capa > n? 2*_capa:(_capa+n);
- VERIFY (_buf != NULL);
- _buf = (char *)realloc(_buf, _capa);
- VERIFY(_buf);
- }
- memcpy(_buf+_ind, p, n);
- _ind += n;
-}
-
-marshall &
-operator<<(marshall &m, bool x)
-{
- m.rawbyte(x);
- return m;
-}
-
-marshall &
-operator<<(marshall &m, unsigned char x)
-{
- m.rawbyte(x);
- return m;
-}
-
-marshall &
-operator<<(marshall &m, char x)
-{
- m << (unsigned char) x;
- return m;
-}
-
-
-marshall &
-operator<<(marshall &m, unsigned short x)
-{
- m.rawbyte((x >> 8) & 0xff);
- m.rawbyte(x & 0xff);
- return m;
+void rpcs::free_reply_window(void) {
+ lock rwl(reply_window_m_);
+ reply_window_.clear();
}
-marshall &
-operator<<(marshall &m, short x)
-{
- m << (unsigned short) x;
- return m;
-}
-
-marshall &
-operator<<(marshall &m, unsigned int x)
-{
- // network order is big-endian
- m.rawbyte((x >> 24) & 0xff);
- m.rawbyte((x >> 16) & 0xff);
- m.rawbyte((x >> 8) & 0xff);
- m.rawbyte(x & 0xff);
- return m;
-}
-
-marshall &
-operator<<(marshall &m, int x)
-{
- m << (unsigned int) x;
- return m;
-}
-
-marshall &
-operator<<(marshall &m, const std::string &s)
-{
- m << (unsigned int) s.size();
- m.rawbytes(s.data(), s.size());
- return m;
-}
-
-marshall &
-operator<<(marshall &m, unsigned long long x)
-{
- m << (unsigned int) (x >> 32);
- m << (unsigned int) x;
- return m;
+int rpcs::rpcbind(unsigned int &r, int) {
+ IF_LEVEL(2) LOG("called return nonce " << nonce_);
+ r = nonce_;
+ return 0;
}
-void
-marshall::pack(int x)
-{
- rawbyte((x >> 24) & 0xff);
- rawbyte((x >> 16) & 0xff);
- rawbyte((x >> 8) & 0xff);
- rawbyte(x & 0xff);
-}
+static sockaddr_in make_sockaddr(const string &host, const string &port);
-void
-unmarshall::unpack(int *x)
-{
- (*x) = (rawbyte() & 0xff) << 24;
- (*x) |= (rawbyte() & 0xff) << 16;
- (*x) |= (rawbyte() & 0xff) << 8;
- (*x) |= rawbyte() & 0xff;
+static sockaddr_in make_sockaddr(const string &hostandport) {
+ auto colon = hostandport.find(':');
+ if (colon == string::npos)
+ return make_sockaddr("127.0.0.1", hostandport);
+ else
+ return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
}
-// take the contents from another unmarshall object
-void
-unmarshall::take_in(unmarshall &another)
-{
- if(_buf)
- free(_buf);
- another.take_buf(&_buf, &_sz);
- _ind = RPC_HEADER_SZ;
- _ok = _sz >= RPC_HEADER_SZ?true:false;
-}
+static sockaddr_in make_sockaddr(const string &host, const string &port) {
+ sockaddr_in dst;
+ bzero(&dst, sizeof(dst));
+ dst.sin_family = AF_INET;
-bool
-unmarshall::okdone()
-{
- if(ok() && _ind == _sz){
- return true;
- } else {
- return false;
- }
-}
+ struct in_addr a{inet_addr(host.c_str())};
-unsigned int
-unmarshall::rawbyte()
-{
- char c = 0;
- if(_ind >= _sz)
- _ok = false;
- else
- c = _buf[_ind++];
- return c;
-}
+ if(a.s_addr != INADDR_NONE)
+ dst.sin_addr.s_addr = a.s_addr;
+ else {
+ struct hostent *hp = gethostbyname(host.c_str());
-unmarshall &
-operator>>(unmarshall &u, bool &x)
-{
- x = (bool) u.rawbyte() ;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned char &x)
-{
- x = (unsigned char) u.rawbyte() ;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, char &x)
-{
- x = (char) u.rawbyte();
- return u;
-}
-
-
-unmarshall &
-operator>>(unmarshall &u, unsigned short &x)
-{
- x = (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, short &x)
-{
- x = (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned int &x)
-{
- x = (u.rawbyte() & 0xff) << 24;
- x |= (u.rawbyte() & 0xff) << 16;
- x |= (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, int &x)
-{
- x = (u.rawbyte() & 0xff) << 24;
- x |= (u.rawbyte() & 0xff) << 16;
- x |= (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned long long &x)
-{
- unsigned int h, l;
- u >> h;
- u >> l;
- x = l | ((unsigned long long) h << 32);
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, std::string &s)
-{
- unsigned sz;
- u >> sz;
- if(u.ok())
- u.rawbytes(s, sz);
- return u;
-}
-
-void
-unmarshall::rawbytes(std::string &ss, unsigned int n)
-{
- if((_ind+n) > (unsigned)_sz){
- _ok = false;
- } else {
- std::string tmps = std::string(_buf+_ind, n);
- swap(ss, tmps);
- VERIFY(ss.size() == n);
- _ind += n;
- }
-}
-
-bool operator<(const sockaddr_in &a, const sockaddr_in &b){
- return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
- ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
- ((a.sin_port < b.sin_port))));
-}
-
-/*---------------auxilary function--------------*/
-void
-make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
-
- char host[200];
- const char *localhost = "127.0.0.1";
- const char *port = index(hostandport, ':');
- if(port == NULL){
- memcpy(host, localhost, strlen(localhost)+1);
- port = hostandport;
- } else {
- memcpy(host, hostandport, port-hostandport);
- host[port-hostandport] = '\0';
- port++;
- }
-
- make_sockaddr(host, port, dst);
-
-}
-
-void
-make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
-
- in_addr_t a;
-
- bzero(dst, sizeof(*dst));
- dst->sin_family = AF_INET;
-
- a = inet_addr(host);
- if(a != INADDR_NONE){
- dst->sin_addr.s_addr = a;
- } else {
- struct hostent *hp = gethostbyname(host);
- if(hp == 0 || hp->h_length != 4){
- fprintf(stderr, "cannot find host name %s\n", host);
- exit(1);
- }
- dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
- }
- dst->sin_port = htons(atoi(port));
-}
-
-int
-cmp_timespec(const struct timespec &a, const struct timespec &b)
-{
- if(a.tv_sec > b.tv_sec)
- return 1;
- else if(a.tv_sec < b.tv_sec)
- return -1;
- else {
- if(a.tv_nsec > b.tv_nsec)
- return 1;
- else if(a.tv_nsec < b.tv_nsec)
- return -1;
- else
- return 0;
- }
-}
-
-void
-add_timespec(const struct timespec &a, int b, struct timespec *result)
-{
- // convert to millisec, add timeout, convert back
- result->tv_sec = a.tv_sec + b/1000;
- result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000;
- VERIFY(result->tv_nsec >= 0);
- while (result->tv_nsec > 1000000000){
- result->tv_sec++;
- result->tv_nsec-=1000000000;
- }
-}
-
-int
-diff_timespec(const struct timespec &end, const struct timespec &start)
-{
- int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
- VERIFY(diff || end.tv_sec == start.tv_sec);
- if(end.tv_nsec > start.tv_nsec){
- diff += (end.tv_nsec-start.tv_nsec)/1000000;
- } else {
- diff -= (start.tv_nsec-end.tv_nsec)/1000000;
- }
- return diff;
+ if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
+ cerr << "cannot find host name " << host << endl;
+ exit(1);
+ }
+ memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
+ dst.sin_addr.s_addr = a.s_addr;
+ }
+ dst.sin_port = hton((in_port_t)stoi(port));
+ return dst;
}