X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/ab6c1548ac2b1907bca92c8ce43e919c1a649a6f..ab9eee5d7f1fbe7a3fe6229d4a78136efb14371b:/rpc/rpc.cc diff --git a/rpc/rpc.cc b/rpc/rpc.cc index e6ec410..e33b25e 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -52,7 +52,7 @@ // x exited worker threads). // -#include "rpc.h" +#include "include/rpc/rpc.h" #include #include @@ -106,7 +106,7 @@ shared_ptr rpcc::bind_cached(const string & destination) { lock cl = lock(client->bind_m_); if (!client->bind_done_) { LOG_NONMEMBER << "bind(\"" << destination << "\")"; - int ret = client->bind(milliseconds(1000)); + int ret = client->bind(1000ms); if (ret < 0) { LOG_NONMEMBER << "bind failure! " << destination << " " << ret; client.reset(); @@ -145,14 +145,15 @@ void rpcc::cancel(lock & m_lock) { } } -int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { +int rpcc::call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, string & rep, const marshall & req) { caller ca(0, &rep); xid_t xid_rep; + string datagram; { lock ml(m_); - if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) { + if ((proc.id != rpc_protocol::bind.id && !bind_done_) || (proc.id == rpc_protocol::bind.id && bind_done_)) { IF_LEVEL(1) LOG << "rpcc has not been bound to dst or binding twice"; return rpc_protocol::bind_failure; } @@ -163,9 +164,9 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { ca.xid = xid_++; calls_[ca.xid] = &ca; - req.write_header(rpc_protocol::request_header{ - ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front() - }); + datagram = marshall::datagram(rpc_protocol::request_header{ + ca.xid, proc.id, clt_nonce_, srv_nonce_, xid_rep_window_.front() + }, req); xid_rep = xid_rep_window_.front(); } @@ -190,10 +191,10 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { } if (forgot.isvalid()) ch->send(forgot.buf); - ch->send(req); + ch->send(datagram); } else IF_LEVEL(1) LOG << "not reachable"; - IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc + IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc.id << " xid " << std::dec << ca.xid << " clt_nonce " << clt_nonce_; } transmit = false; // only send once on a given channel @@ -242,7 +243,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { { lock ml(m_); if (!dup_req_.isvalid()) { - dup_req_.buf = req; + dup_req_.buf = datagram; dup_req_.xid = ca.xid; } if (xid_rep > xid_rep_done_) @@ -251,7 +252,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { lock cal(ca.m); - IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc + IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc.id << " xid " << std::dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret; @@ -271,11 +272,9 @@ void rpcc::get_latest_connection(shared_ptr & ch) { // Runs in poll_mgr's thread as an upcall from the connection object to the // rpcc. Does not call blocking RPC handlers. bool rpcc::got_pdu(const shared_ptr &, const string & b) { - unmarshall rep(b, true); rpc_protocol::reply_header h; - rep.read_header(h); - if (!rep.ok()) { + if (!unmarshall::datagram(b, h)) { IF_LEVEL(1) LOG << "unmarshall header failed!!!"; return true; } @@ -353,30 +352,28 @@ bool rpcs::got_pdu(const shared_ptr & c, const string & b) { } void rpcs::dispatch(shared_ptr c, const string & buf) { - unmarshall req(buf, true); - rpc_protocol::request_header h; - req.read_header(h); - proc_id_t proc = h.proc; - if (!req.ok()) { + auto req = unmarshall::datagram(buf, h); + + if (!req) { IF_LEVEL(1) LOG << "unmarshall header failed"; return; } + proc_id_t proc = h.proc; + IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep " << std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; - marshall rep; rpc_protocol::reply_header rh{h.xid,0}; // is client sending to an old instance of server? if (h.srv_nonce != 0 && h.srv_nonce != nonce_) { IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce - << " (current " << nonce_ << ") proc " << std::hex << h.proc; + << " (current " << nonce_ << ") proc " << std::hex << proc; rh.ret = rpc_protocol::oldsrv_failure; - rep.write_header(rh); - c->send(rep); + c->send(marshall::datagram(rh)); return; } @@ -387,7 +384,6 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { if (procs_.count(proc) < 1) { LOG << "unknown proc 0x" << std::hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_; VERIFY(0); - return; } f = procs_[proc]; @@ -414,27 +410,29 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { conns_[h.clt_nonce] = c; } - string b1; + string stored_reply; - switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) { + switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, stored_reply)) { case NEW: // new request - rh.ret = (*f)(std::forward(req), rep); - if (rh.ret == rpc_protocol::unmarshall_args_failure) { - LOG << "failed to unmarshall the arguments. You are " - << "probably calling RPC 0x" << std::hex << proc << " with the wrong " - << "types of arguments."; - VERIFY(0); - } - VERIFY(rh.ret >= 0); + { + marshall rep; + rh.ret = (*f)(std::forward(req), rep); + if (rh.ret == rpc_protocol::unmarshall_args_failure) { + LOG << "failed to unmarshall the arguments. You are " + << "probably calling RPC 0x" << std::hex << proc << " with the wrong " + << "types of arguments."; + VERIFY(0); + } + VERIFY(rh.ret >= 0); - rep.write_header(rh); - b1 = rep; + stored_reply = marshall::datagram(rh, rep); + } - IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc " + IF_LEVEL(2) LOG << "sending and saving reply of size " << stored_reply.size() << " for rpc " << h.xid << ", proc " << std::hex << proc << " ret " << std::dec << rh.ret << ", clt " << h.clt_nonce; - add_reply(h.clt_nonce, h.xid, b1); + add_reply(h.clt_nonce, h.xid, stored_reply); // get the latest connection to the client { @@ -443,18 +441,17 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { c = conns_[h.clt_nonce]; } - c->send(rep); + c->send(stored_reply); break; case INPROGRESS: // server is working on this request break; case DONE: // duplicate and we still have the response - c->send(b1); + c->send(stored_reply); break; case FORGOTTEN: // very old request and we don't have the response anymore IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce; rh.ret = rpc_protocol::atmostonce_failure; - rep.write_header(rh); - c->send(rep); + c->send(marshall::datagram(rh)); break; } }