X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/eb3d5c6416c0f0d1cad35e52af3231de7866fea8..c06ef44e7af1571710fd31dd0ab068dd77b1eb2d:/rpc/rpc.cc diff --git a/rpc/rpc.cc b/rpc/rpc.cc index a451c9f..5d28f8f 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -60,17 +60,14 @@ #include #include -inline void set_rand_seed() { - auto now = time_point_cast(steady_clock::now()); - srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); -} +using std::list; +using namespace std::chrono; static sockaddr_in make_sockaddr(const string & hostandport); rpcc::rpcc(const string & d) : dst_(make_sockaddr(d)) { - set_rand_seed(); - clt_nonce_ = (nonce_t)random(); + clt_nonce_ = (nonce_t)global->random_generator(); char *loss_env = getenv("RPC_LOSSY"); if (loss_env) @@ -82,14 +79,15 @@ rpcc::rpcc(const string & d) : dst_(make_sockaddr(d)) // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { - cancel(); + lock ml(m_); + cancel(ml); IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1); chan_.reset(); VERIFY(calls_.size() == 0); } int rpcc::bind(milliseconds to) { - nonce_t r; + nonce_t r = 0; rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r); if (ret == 0) { lock ml(m_); @@ -101,9 +99,29 @@ int rpcc::bind(milliseconds to) { return ret; } +shared_ptr rpcc::bind_cached(const string & destination) { + auto client = global->get_handle(destination); + lock cl = lock(client->bind_m_); + if (!client->bind_done_) { + LOG_NONMEMBER << "bind(\"" << destination << "\")"; + int ret = client->bind(milliseconds(1000)); + if (ret < 0) { + LOG_NONMEMBER << "bind failure! " << destination << " " << ret; + client.reset(); + } else { + LOG_NONMEMBER << "bind succeeded " << destination; + } + } + return client; +} + +void rpcc::unbind_cached(const string & destination) { + global->erase_handle(destination); +} + // Cancel all outstanding calls -void rpcc::cancel(void) { - lock ml(m_); +void rpcc::cancel(lock & m_lock) { + VERIFY(m_lock); if (calls_.size()) { LOG << "force callers to fail"; for (auto & p : calls_) { @@ -119,7 +137,7 @@ void rpcc::cancel(void) { destroy_wait_ = true; while (calls_.size () > 0) - destroy_wait_c_.wait(ml); + destroy_wait_c_.wait(m_lock); LOG << "done"; } @@ -143,7 +161,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { ca.xid = xid_++; calls_[ca.xid] = &ca; - req.pack_header(rpc_protocol::request_header{ + req.write_header(rpc_protocol::request_header{ ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front() }); xid_rep = xid_rep_window_.front(); @@ -173,20 +191,20 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { ch->send(req); } else IF_LEVEL(1) LOG << "not reachable"; - IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << hex << proc - << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_; + IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc + << " xid " << std::dec << ca.xid << " clt_nonce " << clt_nonce_; } transmit = false; // only send once on a given channel } - auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline); + auto nextdeadline = std::min(steady_clock::now() + curr_to, finaldeadline); curr_to *= 2; { lock cal(ca.m); while (!ca.done) { IF_LEVEL(2) LOG << "wait"; - if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) { + if (ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout) { IF_LEVEL(2) LOG << "timeout"; break; } @@ -231,8 +249,8 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { lock cal(ca.m); - IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << hex << proc - << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" + IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc + << " xid " << std::dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret; // destruction of req automatically frees its buffer @@ -258,7 +276,7 @@ rpcc::got_pdu(const shared_ptr &, const string & b) { unmarshall rep(b, true); rpc_protocol::reply_header h; - rep.unpack_header(h); + rep.read_header(h); if (!rep.ok()) { IF_LEVEL(1) LOG << "unmarshall header failed!!!"; @@ -311,8 +329,7 @@ compress: rpcs::rpcs(in_port_t p1) : port_(p1) { - set_rand_seed(); - nonce_ = (nonce_t)random(); + nonce_ = (nonce_t)global->random_generator(); IF_LEVEL(2) LOG << "created with nonce " << nonce_; reg(rpc_protocol::bind, &rpcs::rpcbind, this); @@ -342,7 +359,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { unmarshall req(buf, true); rpc_protocol::request_header h; - req.unpack_header(h); + req.read_header(h); proc_id_t proc = h.proc; if (!req.ok()) { @@ -350,8 +367,8 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { return; } - IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << hex << proc << ", last_rep " - << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; + IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep " + << std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; marshall rep; rpc_protocol::reply_header rh{h.xid,0}; @@ -359,9 +376,9 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { // is client sending to an old instance of server? if (h.srv_nonce != 0 && h.srv_nonce != nonce_) { IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce - << " (current " << nonce_ << ") proc " << hex << h.proc; + << " (current " << nonce_ << ") proc " << std::hex << h.proc; rh.ret = rpc_protocol::oldsrv_failure; - rep.pack_header(rh); + rep.write_header(rh); c->send(rep); return; } @@ -371,7 +388,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { { lock pl(procs_m_); if (procs_.count(proc) < 1) { - LOG << "unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_; + LOG << "unknown proc 0x" << std::hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_; VERIFY(0); return; } @@ -404,20 +421,21 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) { case NEW: // new request - rh.ret = (*f)(forward(req), rep); + rh.ret = (*f)(std::forward(req), rep); if (rh.ret == rpc_protocol::unmarshall_args_failure) { LOG << "failed to unmarshall the arguments. You are " - << "probably calling RPC 0x" << hex << proc << " with the wrong " + << "probably calling RPC 0x" << std::hex << proc << " with the wrong " << "types of arguments."; VERIFY(0); } VERIFY(rh.ret >= 0); - rep.pack_header(rh); + rep.write_header(rh); b1 = rep; IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc " - << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce; + << h.xid << ", proc " << std::hex << proc << " ret " << std::dec + << rh.ret << ", clt " << h.clt_nonce; add_reply(h.clt_nonce, h.xid, b1); @@ -438,7 +456,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { case FORGOTTEN: // very old request and we don't have the response anymore IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce; rh.ret = rpc_protocol::atmostonce_failure; - rep.pack_header(rh); + rep.write_header(rh); c->send(rep); break; } @@ -554,6 +572,6 @@ static sockaddr_in make_sockaddr(const string & hostandport) { memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); dst.sin_addr.s_addr = a.s_addr; } - dst.sin_port = hton((in_port_t)stoi(port)); + dst.sin_port = hton((in_port_t)std::stoi(port)); return dst; }