More cleanups to marshalling logic.
[invirt/third/libt4.git] / rpc / rpc.cc
index d53776a..9d9f1e2 100644 (file)
@@ -128,7 +128,7 @@ int
 rpcc::bind(TO to)
 {
     int r;
 rpcc::bind(TO to)
 {
     int r;
-    int ret = call(rpc_const::bind, 0, r, to);
+    int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
         lock ml(m_);
         bind_done_ = true;
     if(ret == 0){
         lock ml(m_);
         bind_done_ = true;
@@ -189,10 +189,8 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
-                             xid_rep_window_.front());
-        req.pack_req_header(h);
-                xid_rep = xid_rep_window_.front();
+        req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        xid_rep = xid_rep_window_.front();
     }
 
     TO curr_to;
     }
 
     TO curr_to;
@@ -387,7 +385,6 @@ compress:
     }
 }
 
     }
 }
 
-
 rpcs::rpcs(unsigned int p1, int count)
   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
 {
 rpcs::rpcs(unsigned int p1, int count)
   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
 {
@@ -400,7 +397,7 @@ rpcs::rpcs(unsigned int p1, int count)
         lossytest_ = atoi(loss_env);
     }
 
         lossytest_ = atoi(loss_env);
     }
 
-    reg(rpc_const::bind, this, &rpcs::rpcbind);
+    reg(rpc_const::bind, &rpcs::rpcbind, this);
     dispatchpool_ = new ThrPool(6,false);
 
     listener_ = new tcpsconn(this, port_, lossytest_);
     dispatchpool_ = new ThrPool(6,false);
 
     listener_ = new tcpsconn(this, port_, lossytest_);
@@ -477,7 +474,7 @@ rpcs::dispatch(djob_t *j)
     unmarshall req(j->buf, j->sz);
     delete j;
 
     unmarshall req(j->buf, j->sz);
     delete j;
 
-    req_header h;
+    request_header h;
     req.unpack_req_header(&h);
     int proc = h.proc;
 
     req.unpack_req_header(&h);
     int proc = h.proc;
 
@@ -564,14 +561,14 @@ rpcs::dispatch(djob_t *j)
                 updatestat(proc);
             }
 
                 updatestat(proc);
             }
 
-            rh.ret = f->fn(req, rep);
-                        if (rh.ret == rpc_const::unmarshal_args_failure) {
-                                fprintf(stderr, "rpcs::dispatch: failed to"
-                                       " unmarshall the arguments. You are"
-                                       " probably calling RPC 0x%x with wrong"
-                                       " types of arguments.\n", proc);
-                                VERIFY(0);
-                        }
+            rh.ret = (*f)(req, rep);
+            if (rh.ret == rpc_const::unmarshal_args_failure) {
+                fprintf(stderr, "rpcs::dispatch: failed to"
+                        " unmarshall the arguments. You are"
+                        " probably calling RPC 0x%x with wrong"
+                        " types of arguments.\n", proc);
+                VERIFY(0);
+            }
             VERIFY(rh.ret >= 0);
 
             rep.pack_reply_header(rh);
             VERIFY(rh.ret >= 0);
 
             rep.pack_reply_header(rh);
@@ -725,116 +722,60 @@ rpcs::free_reply_window(void)
 
 // rpc handler
 int
 
 // rpc handler
 int
-rpcs::rpcbind(int a, int &r)
+rpcs::rpcbind(int &r, int a)
 {
     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
     r = nonce_;
     return 0;
 }
 
 {
     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
     r = nonce_;
     return 0;
 }
 
-void
-marshall::rawbyte(unsigned char x)
-{
-    if(_ind >= _capa){
-        _capa *= 2;
-        VERIFY (_buf != NULL);
-        _buf = (char *)realloc(_buf, _capa);
-        VERIFY(_buf);
-    }
-    _buf[_ind++] = x;
-}
-
-void
-marshall::rawbytes(const char *p, int n)
-{
-    if((_ind+n) > _capa){
-        _capa = _capa > n? 2*_capa:(_capa+n);
-        VERIFY (_buf != NULL);
-        _buf = (char *)realloc(_buf, _capa);
-        VERIFY(_buf);
-    }
-    memcpy(_buf+_ind, p, n);
-    _ind += n;
-}
-
-marshall &
-operator<<(marshall &m, bool x)
-{
-    m.rawbyte(x);
-    return m;
-}
-
 marshall &
 marshall &
-operator<<(marshall &m, unsigned char x)
-{
+operator<<(marshall &m, uint8_t x) {
     m.rawbyte(x);
     return m;
 }
 
 marshall &
     m.rawbyte(x);
     return m;
 }
 
 marshall &
-operator<<(marshall &m, char x)
-{
-    m << (unsigned char) x;
+operator<<(marshall &m, uint16_t x) {
+    x = htons(x);
+    m.rawbytes((char *)&x, 2);
     return m;
 }
 
     return m;
 }
 
-
 marshall &
 marshall &
-operator<<(marshall &m, unsigned short x)
-{
-    m.rawbyte((x >> 8) & 0xff);
-    m.rawbyte(x & 0xff);
+operator<<(marshall &m, uint32_t x) {
+    x = htonl(x);
+    m.rawbytes((char *)&x, 4);
     return m;
 }
 
     return m;
 }
 
-marshall &
-operator<<(marshall &m, short x)
-{
-    m << (unsigned short) x;
-    return m;
-}
+marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; }
+marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; }
+marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
+marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; }
+marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
 
 marshall &
 
 marshall &
-operator<<(marshall &m, unsigned int x)
-{
-    // network order is big-endian
-    m.rawbyte((x >> 24) & 0xff);
-    m.rawbyte((x >> 16) & 0xff);
-    m.rawbyte((x >> 8) & 0xff);
-    m.rawbyte(x & 0xff);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, int x)
-{
-    m << (unsigned int) x;
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, const std::string &s)
-{
+operator<<(marshall &m, const std::string &s) {
     m << (unsigned int) s.size();
     m.rawbytes(s.data(), s.size());
     return m;
 }
 
     m << (unsigned int) s.size();
     m.rawbytes(s.data(), s.size());
     return m;
 }
 
-marshall &
-operator<<(marshall &m, unsigned long long x)
-{
-    m << (unsigned int) (x >> 32);
-    m << (unsigned int) x;
-    return m;
+void marshall::pack_req_header(const request_header &h) {
+    int saved_sz = index_;
+    //leave the first 4-byte empty for channel to fill size of pdu
+    index_ = sizeof(rpc_sz_t);
+    *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
+    index_ = saved_sz;
 }
 
 }
 
-void
-marshall::pack(int x)
-{
-    rawbyte((x >> 24) & 0xff);
-    rawbyte((x >> 16) & 0xff);
-    rawbyte((x >> 8) & 0xff);
-    rawbyte(x & 0xff);
+void marshall::pack_reply_header(const reply_header &h) {
+    int saved_sz = index_;
+    //leave the first 4-byte empty for channel to fill size of pdu
+    index_ = sizeof(rpc_sz_t);
+    *this << h.xid << h.ret;
+    index_ = saved_sz;
 }
 
 void
 }
 
 void
@@ -850,56 +791,57 @@ unmarshall::unpack(int *x)
 void
 unmarshall::take_in(unmarshall &another)
 {
 void
 unmarshall::take_in(unmarshall &another)
 {
-    if(_buf)
-        free(_buf);
-    another.take_buf(&_buf, &_sz);
-    _ind = RPC_HEADER_SZ;
-    _ok = _sz >= RPC_HEADER_SZ?true:false;
+    if(buf_)
+        free(buf_);
+    another.take_buf(&buf_, &sz_);
+    index_ = RPC_HEADER_SZ;
+    ok_ = sz_ >= RPC_HEADER_SZ?true:false;
 }
 
 }
 
-bool
-unmarshall::okdone()
-{
-    if(ok() && _ind == _sz){
-        return true;
-    } else {
-        return false;
-    }
+inline bool
+unmarshall::ensure(size_t n) {
+    if (index_+n > sz_)
+        ok_ = false;
+    return ok_;
 }
 
 unsigned int
 unmarshall::rawbyte()
 {
 }
 
 unsigned int
 unmarshall::rawbyte()
 {
-    char c = 0;
-    if(_ind >= _sz)
-        _ok = false;
-    else
-        c = _buf[_ind++];
-    return c;
+    if (!ensure(1))
+        return 0;
+    return buf_[index_++];
+}
+
+void
+unmarshall::rawbytes(std::string &ss, size_t n)
+{
+    VERIFY(ensure(n));
+    ss.assign(buf_+index_, n);
+    index_ += n;
 }
 
 unmarshall &
 operator>>(unmarshall &u, bool &x)
 {
 }
 
 unmarshall &
 operator>>(unmarshall &u, bool &x)
 {
-    x = (bool) u.rawbyte() ;
+    x = (bool)u.rawbyte();
     return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, unsigned char &x)
 {
     return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, unsigned char &x)
 {
-    x = (unsigned char) u.rawbyte() ;
+    x = (unsigned char)u.rawbyte();
     return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, char &x)
 {
     return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, char &x)
 {
-    x = (char) u.rawbyte();
+    x = (char)u.rawbyte();
     return u;
 }
 
     return u;
 }
 
-
 unmarshall &
 operator>>(unmarshall &u, unsigned short &x)
 {
 unmarshall &
 operator>>(unmarshall &u, unsigned short &x)
 {
@@ -956,19 +898,6 @@ operator>>(unmarshall &u, std::string &s)
     return u;
 }
 
     return u;
 }
 
-void
-unmarshall::rawbytes(std::string &ss, unsigned int n)
-{
-    if((_ind+n) > (unsigned)_sz){
-        _ok = false;
-    } else {
-        std::string tmps = std::string(_buf+_ind, n);
-        swap(ss, tmps);
-        VERIFY(ss.size() == n);
-        _ind += n;
-    }
-}
-
 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&