-void
-rpcs::dispatch(djob_t *j)
-{
- connection *c = j->conn;
- unmarshall req(j->buf, j->sz);
- delete j;
-
- req_header h;
- req.unpack_req_header(&h);
- int proc = h.proc;
-
- if(!req.ok()){
- jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
- c->decref();
- return;
- }
-
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
- h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
-
- marshall rep;
- reply_header rh(h.xid,0);
-
- // is client sending to an old instance of server?
- if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
- h.srv_nonce, nonce_, h.proc);
- rh.ret = rpc_const::oldsrv_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
- return;
- }
-
- handler *f;
- // is RPC proc a registered procedure?
- {
- ScopedLock pl(&procs_m_);
- if(procs_.count(proc) < 1){
- fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
- proc);
- c->decref();
- VERIFY(0);
- return;
- }
-
- f = procs_[proc];
- }
-
- rpcs::rpcstate_t stat;
- char *b1;
- int sz1;
-
- if(h.clt_nonce){
- // have i seen this client before?
- {
- ScopedLock rwl(&reply_window_m_);
- // if we don't know about this clt_nonce, create a cleanup object
- if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
- VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
- reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
- h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
- }
- }
-
- // save the latest good connection to the client
- {
- ScopedLock rwl(&conss_m_);
- if(conns_.find(h.clt_nonce) == conns_.end()){
- c->incref();
- conns_[h.clt_nonce] = c;
- } else if(conns_[h.clt_nonce]->compare(c) < 0){
- conns_[h.clt_nonce]->decref();
- c->incref();
- conns_[h.clt_nonce] = c;
- }
- }
-
- stat = checkduplicate_and_update(h.clt_nonce, h.xid,
- h.xid_rep, &b1, &sz1);
- } else {
- // this client does not require at most once logic
- stat = NEW;
- }
-
- switch (stat){
- case NEW: // new request
- if(counting_){
- updatestat(proc);
- }
-
- rh.ret = f->fn(req, rep);
- if (rh.ret == rpc_const::unmarshal_args_failure) {
- fprintf(stderr, "rpcs::dispatch: failed to"
- " unmarshall the arguments. You are"
- " probably calling RPC 0x%x with wrong"
- " types of arguments.\n", proc);
- VERIFY(0);
- }
- VERIFY(rh.ret >= 0);
-
- rep.pack_reply_header(rh);
- rep.take_buf(&b1,&sz1);
-
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
- sz1, h.xid, proc, rh.ret, h.clt_nonce);
-
- if(h.clt_nonce > 0){
- // only record replies for clients that require at-most-once logic
- add_reply(h.clt_nonce, h.xid, b1, sz1);
- }
-
- // get the latest connection to the client
- {
- ScopedLock rwl(&conss_m_);
- if(c->isdead() && c != conns_[h.clt_nonce]){
- c->decref();
- c = conns_[h.clt_nonce];
- c->incref();
- }
- }
-
- c->send(b1, sz1);
- if(h.clt_nonce == 0){
- // reply is not added to at-most-once window, free it
- free(b1);
- }
- break;
- case INPROGRESS: // server is working on this request
- break;
- case DONE: // duplicate and we still have the response
- c->send(b1, sz1);
- break;
- case FORGOTTEN: // very old request and we don't have the response anymore
- jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
- h.xid, h.clt_nonce);
- rh.ret = rpc_const::atmostonce_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
- break;
- }
- c->decref();
+ if (!req.ok()) {
+ IF_LEVEL(1) LOG("unmarshall header failed");
+ return;
+ }
+
+ IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+ dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
+
+ marshall rep;
+ rpc_protocol::reply_header rh{h.xid,0};
+
+ // is client sending to an old instance of server?
+ if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
+ IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
+ " (current " << nonce_ << ") proc " << hex << h.proc);
+ rh.ret = rpc_protocol::oldsrv_failure;
+ rep.pack_header(rh);
+ c->send(rep);
+ return;
+ }
+
+ handler *f;
+ // is RPC proc a registered procedure?
+ {
+ lock pl(procs_m_);
+ if (procs_.count(proc) < 1) {
+ LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
+ VERIFY(0);
+ return;
+ }
+
+ f = procs_[proc];
+ }
+
+ // have i seen this client before?
+ {
+ lock rwl(reply_window_m_);
+ // if we don't know about this clt_nonce, create a cleanup object
+ if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
+ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+ reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
+ IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
+ " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
+ }
+ }
+
+ // save the latest good connection to the client
+ {
+ lock rwl(conns_m_);
+ if (conns_.find(h.clt_nonce) == conns_.end())
+ conns_[h.clt_nonce] = c;
+ else if (conns_[h.clt_nonce]->create_time < c->create_time)
+ conns_[h.clt_nonce] = c;
+ }
+
+ string b1;
+
+ switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) {
+ case NEW: // new request
+ rh.ret = (*f)(forward<unmarshall>(req), rep);
+ if (rh.ret == rpc_protocol::unmarshall_args_failure) {
+ LOG("failed to unmarshall the arguments. You are " <<
+ "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+ "types of arguments.");
+ VERIFY(0);
+ }
+ VERIFY(rh.ret >= 0);
+
+ rep.pack_header(rh);
+ b1 = rep;
+
+ IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
+ h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
+
+ add_reply(h.clt_nonce, h.xid, b1);
+
+ // get the latest connection to the client
+ {
+ lock rwl(conns_m_);
+ if (c->isdead())
+ c = conns_[h.clt_nonce];
+ }
+
+ c->send(rep);
+ break;
+ case INPROGRESS: // server is working on this request
+ break;
+ case DONE: // duplicate and we still have the response
+ c->send(b1);
+ break;
+ case FORGOTTEN: // very old request and we don't have the response anymore
+ IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
+ rh.ret = rpc_protocol::atmostonce_failure;
+ rep.pack_header(rh);
+ c->send(rep);
+ break;
+ }