srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
-static sockaddr_in make_sockaddr(const string &hostandport);
+static sockaddr_in make_sockaddr(const string & hostandport);
-rpcc::rpcc(const string & d, bool retrans) :
- dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
+rpcc::rpcc(const string & d) : dst_(make_sockaddr(d))
{
- if (retrans) {
- set_rand_seed();
- clt_nonce_ = (nonce_t)random();
- } else {
- // special client nonce 0 means this client does not
- // require at-most-once logic from the server
- // because it uses tcp and never retries a failed connection
- clt_nonce_ = 0;
- }
+ set_rand_seed();
+ clt_nonce_ = (nonce_t)random();
char *loss_env = getenv("RPC_LOSSY");
if (loss_env)
lossytest_ = atoi(loss_env);
- // xid starts with 1 and latest received reply starts with 0
- xid_rep_window_.push_back(0);
-
IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
}
int rpcc::bind(milliseconds to) {
nonce_t r;
- int ret = call_timeout(rpc_protocol::bind, to, r);
+ rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r);
if (ret == 0) {
lock ml(m_);
bind_done_ = true;
lock ml(m_);
if (calls_.size()) {
LOG("force callers to fail");
- for (auto &p : calls_) {
+ for (auto & p : calls_) {
caller *ca = p.second;
IF_LEVEL(2) LOG("force caller to fail");
}
}
-int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
+int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
caller ca(0, &rep);
xid_t xid_rep;
while (1) {
if (transmit) {
- get_refconn(ch);
+ get_latest_connection(ch);
if (ch) {
if (reachable_) {
request forgot;
if (nextdeadline >= finaldeadline)
break;
- if (retrans_ && (!ch || ch->isdead())) {
- // since connection is dead, retransmit
- // on the new connection
+ // retransmit on new connection if connection is dead
+ if (!ch || ch->isdead())
transmit = true;
- }
}
{
// may need to update the xid again here, in case the
// packet times out before it's even sent by the channel.
// I don't think there's any harm in maybe doing it twice
- update_xid_rep(ca.xid);
+ update_xid_rep(ca.xid, ml);
if (destroy_wait_)
destroy_wait_c_.notify_one();
return (ca.done? ca.intret : rpc_protocol::timeout_failure);
}
-void rpcc::get_refconn(shared_ptr<connection> & ch) {
+void rpcc::get_latest_connection(shared_ptr<connection> & ch) {
lock ml(chan_m_);
if (!chan_ || chan_->isdead())
chan_ = connection::to_dst(dst_, this, lossytest_);
lock ml(m_);
- update_xid_rep(h.xid);
+ update_xid_rep(h.xid, ml);
if (calls_.find(h.xid) == calls_.end()) {
IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
return true;
}
-// assumes thread holds mutex m
-void
-rpcc::update_xid_rep(int xid)
-{
+void rpcc::update_xid_rep(xid_t xid, lock & m_lock) {
+ VERIFY(m_lock);
if (xid <= xid_rep_window_.front())
return;
}
}
-rpcs::rpcs(in_port_t p1)
- : port_(p1), reachable_ (true)
+rpcs::rpcs(in_port_t p1) : port_(p1)
{
set_rand_seed();
nonce_ = (nonce_t)random();
IF_LEVEL(2) LOG("created with nonce " << nonce_);
reg(rpc_protocol::bind, &rpcs::rpcbind, this);
- dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
}
void rpcs::start() {
// must delete listener before dispatchpool
listener_ = nullptr;
dispatchpool_ = nullptr;
- free_reply_window();
}
bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b));
}
-void rpcs::reg1(proc_id_t proc, handler *h) {
- lock pl(procs_m_);
- VERIFY(procs_.count(proc) == 0);
- procs_[proc] = h;
- VERIFY(procs_.count(proc) >= 1);
-}
-
void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
unmarshall req(buf, true);
f = procs_[proc];
}
- rpcs::rpcstate_t stat;
- string b1;
-
- if (h.clt_nonce) {
- // have i seen this client before?
- {
- lock rwl(reply_window_m_);
- // if we don't know about this clt_nonce, create a cleanup object
- if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
- VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
- reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
- " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
- }
- }
-
- // save the latest good connection to the client
- {
- lock rwl(conns_m_);
- if (conns_.find(h.clt_nonce) == conns_.end())
- conns_[h.clt_nonce] = c;
- else if (conns_[h.clt_nonce]->create_time < c->create_time)
- conns_[h.clt_nonce] = c;
+ // have i seen this client before?
+ {
+ lock rwl(reply_window_m_);
+ // if we don't know about this clt_nonce, create a cleanup object
+ if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
+ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+ reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
+ IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
+ " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
}
+ }
- stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
- } else {
- // this client does not require at most once logic
- stat = NEW;
+ // save the latest good connection to the client
+ {
+ lock rwl(conns_m_);
+ if (conns_.find(h.clt_nonce) == conns_.end())
+ conns_[h.clt_nonce] = c;
+ else if (conns_[h.clt_nonce]->create_time < c->create_time)
+ conns_[h.clt_nonce] = c;
}
- switch (stat) {
+ string b1;
+
+ switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) {
case NEW: // new request
- rh.ret = (*f)(req, rep);
- if (rh.ret == rpc_protocol::unmarshal_args_failure) {
+ rh.ret = (*f)(forward<unmarshall>(req), rep);
+ if (rh.ret == rpc_protocol::unmarshall_args_failure) {
LOG("failed to unmarshall the arguments. You are " <<
"probably calling RPC 0x" << hex << proc << " with the wrong " <<
"types of arguments.");
IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
- if (h.clt_nonce > 0) {
- // only record replies for clients that require at-most-once logic
- add_reply(h.clt_nonce, h.xid, b1);
- }
+ add_reply(h.clt_nonce, h.xid, b1);
// get the latest connection to the client
{
// DONE: seen this xid, previous reply returned in b.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+rpcs::check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
xid_t xid_rep, string & b)
{
lock rwl(reply_window_m_);
- list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> & l = reply_window_[clt_nonce];
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b.
// add_reply() should remember b.
-// free_reply_window() and checkduplicate_and_update are responsible for
-// cleaning up the remembered values.
void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
lock rwl(reply_window_m_);
// remember the RPC reply value
- list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> & l = reply_window_[clt_nonce];
list<reply_t>::iterator it = l.begin();
// skip to our place in the list
for (it++; it != l.end() && it->xid < xid; it++);
}
}
-void rpcs::free_reply_window(void) {
- lock rwl(reply_window_m_);
- reply_window_.clear();
-}
-
-int rpcs::rpcbind(nonce_t &r) {
+rpc_protocol::status rpcs::rpcbind(nonce_t & r) {
IF_LEVEL(2) LOG("called return nonce " << nonce_);
r = nonce_;
return 0;
}
-static sockaddr_in make_sockaddr(const string &hostandport) {
+static sockaddr_in make_sockaddr(const string & hostandport) {
string host = "127.0.0.1";
string port = hostandport;
auto colon = hostandport.find(':');