More logging clean-ups. Static type-checking for RPC calls and
authorPeter Iannucci <iannucci@mit.edu>
Thu, 17 Oct 2013 06:56:52 +0000 (02:56 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Thu, 17 Oct 2013 06:56:52 +0000 (02:56 -0400)
handlers.

23 files changed:
config.cc
lock_client.cc
lock_demo.cc
lock_protocol.h
lock_server.cc
lock_server.h
lock_smain.cc
lock_tester.cc
paxos.cc
paxos.h
paxos_protocol.h
rpc/connection.cc
rpc/marshall.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpc_protocol.h
rpc/rpctest.cc
rsm.cc
rsm.h
rsm_client.cc
rsm_client.h
rsm_protocol.h
rsm_tester.cc

index 5d04cd2..abd2f9c 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -203,7 +203,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     handle h(m);
 
     cfg_mutex_lock.unlock();
     handle h(m);
 
     cfg_mutex_lock.unlock();
-    int r = 0, ret = rpc_const::bind_failure;
+    int r = 0, ret = rpc_protocol::bind_failure;
     if (rpcc *cl = h.safebind())
         ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
     cfg_mutex_lock.lock();
     if (rpcc *cl = h.safebind())
         ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
     cfg_mutex_lock.lock();
@@ -212,8 +212,8 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     switch (ret) {
         case paxos_protocol::OK:
             break;
     switch (ret) {
         case paxos_protocol::OK:
             break;
-        case rpc_const::atmostonce_failure:
-        case rpc_const::oldsrv_failure:
+        case rpc_protocol::atmostonce_failure:
+        case rpc_protocol::oldsrv_failure:
             invalidate_handle(m);
             //h.invalidate();
             break;
             invalidate_handle(m);
             //h.invalidate();
             break;
index 388de88..8864ce9 100644 (file)
@@ -73,7 +73,7 @@ void lock_client::releaser() [[noreturn]] {
 int lock_client::stat(lock_protocol::lockid_t lid) {
     VERIFY(0);
     int r;
 int lock_client::stat(lock_protocol::lockid_t lid) {
     VERIFY(0);
     int r;
-    auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
+    auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id);
     VERIFY (ret == lock_protocol::OK);
     return r;
 }
     VERIFY (ret == lock_protocol::OK);
     return r;
 }
index 714c4e3..97c2964 100644 (file)
@@ -4,7 +4,7 @@ char log_thread_prefix = 'd';
 
 int main(int argc, char *argv[]) {
     if(argc != 2) {
 
 int main(int argc, char *argv[]) {
     if(argc != 2) {
-        cerr << "Usage: " << argv[0] << " [host:]port" << endl;
+        LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port");
         return 1;
     }
 
         return 1;
     }
 
index 1e45ddc..5589c07 100644 (file)
@@ -6,24 +6,24 @@
 #include "types.h"
 #include "rpc/rpc.h"
 
 #include "types.h"
 #include "rpc/rpc.h"
 
-class lock_protocol {
-    public:
-        enum status : status_t { OK, RETRY, RPCERR, NOENT, IOERR };
-        using lockid_t = string;
-        using xid_t = uint64_t;
-        enum rpc_numbers : proc_t {
-            acquire = 0x7001,
-            release,
-            stat,
-        };
+typedef string callback_t;
+
+namespace lock_protocol {
+    enum status : rpc_protocol::status { OK, RETRY, RPCERR, NOENT, IOERR };
+    using lockid_t = string;
+    using xid_t = uint64_t;
+    REMOTE_PROCEDURE_BASE(0x7000);
+    REMOTE_PROCEDURE(1, acquire, (int &, lockid_t, callback_t, xid_t));
+    REMOTE_PROCEDURE(2, release, (int &, lockid_t, callback_t, xid_t));
+    REMOTE_PROCEDURE(3, stat, (int &, lockid_t, callback_t));
 };
 
 };
 
-class rlock_protocol {
-    public:
-        enum status : status_t { OK, RPCERR };
-        enum rpc_numbers : proc_t {
-            revoke = 0x8001,
-            retry,
-        };
+namespace rlock_protocol {
+    using lockid_t = lock_protocol::lockid_t;
+    using xid_t = lock_protocol::xid_t;
+    enum status : rpc_protocol::status { OK, RPCERR };
+    REMOTE_PROCEDURE_BASE(0x8000);
+    REMOTE_PROCEDURE(1, revoke, (int &, lockid_t, xid_t));
+    REMOTE_PROCEDURE(2, retry, (int &, lockid_t, xid_t));
 };
 #endif
 };
 #endif
index 40098d7..522a917 100644 (file)
@@ -92,7 +92,7 @@ void lock_server::retryer() [[noreturn]] {
     }
 }
 
     }
 }
 
-int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
     LOG("lid=" << lid << " client=" << id << "," << xid);
     holder_t h = holder_t(id, xid);
     lock_state &st = get_lock_state(lid);
     LOG("lid=" << lid << " client=" << id << "," << xid);
     holder_t h = holder_t(id, xid);
     lock_state &st = get_lock_state(lid);
@@ -150,7 +150,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t &
     return lock_protocol::RETRY;
 }
 
     return lock_protocol::RETRY;
 }
 
-int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
     LOG("lid=" << lid << " client=" << id << "," << xid);
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
     LOG("lid=" << lid << " client=" << id << "," << xid);
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
@@ -176,7 +176,7 @@ void lock_server::unmarshal_state(const string & state) {
     rep >> nacquire >> lock_table;
 }
 
     rep >> nacquire >> lock_table;
 }
 
-lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
+lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid, const callback_t &) {
     LOG("stat request for " << lid);
     VERIFY(0);
     r = nacquire;
     LOG("stat request for " << lid);
     VERIFY(0);
     r = nacquire;
index 69ac2b8..6ba4902 100644 (file)
@@ -6,7 +6,6 @@
 #include "rsm.h"
 #include "rpc/fifo.h"
 
 #include "rsm.h"
 #include "rpc/fifo.h"
 
-typedef string callback_t;
 typedef pair<callback_t, lock_protocol::xid_t> holder_t;
 
 class lock_state {
 typedef pair<callback_t, lock_protocol::xid_t> holder_t;
 
 class lock_state {
@@ -38,13 +37,13 @@ class lock_server : public rsm_state_transfer {
         rsm *rsm_;
     public:
         lock_server(rsm *r = 0);
         rsm *rsm_;
     public:
         lock_server(rsm *r = 0);
-        lock_protocol::status stat(int &, lock_protocol::lockid_t);
         void revoker();
         void retryer();
         string marshal_state();
         void unmarshal_state(const string & state);
         void revoker();
         void retryer();
         string marshal_state();
         void unmarshal_state(const string & state);
-        int acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
-        int release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+        lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+        lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+        lock_protocol::status stat(int &, lock_protocol::lockid_t, const callback_t & id);
 };
 
 #endif
 };
 
 #endif
index 90066e1..0a3c209 100644 (file)
@@ -13,7 +13,7 @@ int main(int argc, char *argv[]) {
     srandom((uint32_t)getpid());
 
     if(argc != 3){
     srandom((uint32_t)getpid());
 
     if(argc != 3){
-        cerr << "Usage: " << argv[0] << " [master:]port [me:]port" << endl;
+        LOG_NONMEMBER("Usage: " << argv[0] << " [master:]port [me:]port");
         exit(1);
     }
 
         exit(1);
     }
 
index b2df781..0204a71 100644 (file)
@@ -26,8 +26,7 @@ void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 0) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 0) {
-        cout << "error: server granted " << lid << " twice" << endl;
-        cerr << "error: server granted " << lid << " twice" << endl;
+        LOG_NONMEMBER("error: server granted " << lid << " twice");
         exit(1);
     }
     ct[x] += 1;
         exit(1);
     }
     ct[x] += 1;
@@ -37,7 +36,7 @@ void check_release(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 1) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 1) {
-        cerr << "error: client released un-held lock " << lid << endl;
+        LOG_NONMEMBER("error: client released un-held lock " << lid);
         exit(1);
     }
     ct[x] -= 1;
         exit(1);
     }
     ct[x] -= 1;
@@ -123,7 +122,7 @@ main(int argc, char *argv[])
     srandom((uint32_t)getpid());
 
     if (argc < 2) {
     srandom((uint32_t)getpid());
 
     if (argc < 2) {
-        cerr << "Usage: " << argv[0] << " [host:]port [test]" << endl;
+        LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [test]");
         exit(1);
     }
 
         exit(1);
     }
 
index 8b00ad8..ab60302 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -240,10 +240,10 @@ void proposer_acceptor::breakpoint2() {
 
 void proposer_acceptor::breakpoint(int b) {
     if (b == 3) {
 
 void proposer_acceptor::breakpoint(int b) {
     if (b == 3) {
-        LOG("Proposer: breakpoint 1");
+        LOG("breakpoint 1");
         break1 = true;
     } else if (b == 4) {
         break1 = true;
     } else if (b == 4) {
-        LOG("Proposer: breakpoint 2");
+        LOG("breakpoint 2");
         break2 = true;
     }
 }
         break2 = true;
     }
 }
diff --git a/paxos.h b/paxos.h
index 642d3ff..8c9fc8f 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -7,10 +7,9 @@
 #include "log.h"
 
 using prepareres = paxos_protocol::prepareres;
 #include "log.h"
 
 using prepareres = paxos_protocol::prepareres;
-
-using node_t = string;
-using nodes_t = vector<node_t>;
-using value_t = string;
+using node_t = paxos_protocol::node_t;
+using nodes_t = paxos_protocol::nodes_t;
+using value_t = paxos_protocol::value_t;
 
 class paxos_change {
     public:
 
 class paxos_change {
     public:
index 5e8afdd..1f5fd3e 100644 (file)
@@ -14,24 +14,25 @@ struct prop_t {
 
 MARSHALLABLE(prop_t)
 
 
 MARSHALLABLE(prop_t)
 
-class paxos_protocol {
-    public:
-        enum status : status_t { OK, ERR };
-        enum rpc_numbers : proc_t {
-            preparereq = 0x11001,
-            acceptreq,
-            decidereq,
-            heartbeat,
-        };
-
-        struct prepareres {
-            bool oldinstance;
-            bool accept;
-            prop_t n_a;
-            string v_a;
-
-            MEMBERS(oldinstance, accept, n_a, v_a)
-        };
+namespace paxos_protocol {
+    enum status : rpc_protocol::status { OK, ERR };
+    struct prepareres {
+        bool oldinstance;
+        bool accept;
+        prop_t n_a;
+        string v_a;
+
+        MEMBERS(oldinstance, accept, n_a, v_a)
+    };
+    using node_t = string;
+    using nodes_t = vector<node_t>;
+    using value_t = string;
+
+    REMOTE_PROCEDURE_BASE(0x11000);
+    REMOTE_PROCEDURE(1, preparereq, (prepareres &, node_t, unsigned, prop_t));
+    REMOTE_PROCEDURE(2, acceptreq, (bool &, node_t, unsigned, prop_t, value_t));
+    REMOTE_PROCEDURE(3, decidereq, (int &, node_t, unsigned, value_t));
+    REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
 };
 
 MARSHALLABLE(paxos_protocol::prepareres)
 };
 
 MARSHALLABLE(paxos_protocol::prepareres)
index 315a82e..e269a3d 100644 (file)
@@ -160,7 +160,7 @@ bool connection::writepdu() {
 bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
 bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
-        rpc_sz_t sz1;
+        rpc_protocol::rpc_sz_t sz1;
         ssize_t n = fd_.read(sz1);
 
         if (n == 0)
         ssize_t n = fd_.read(sz1);
 
         if (n == 0)
@@ -178,7 +178,7 @@ bool connection::readpdu() {
 
         size_t sz = ntoh(sz1);
 
 
         size_t sz = ntoh(sz1);
 
-        if (sz > MAX_PDU) {
+        if (sz > rpc_protocol::MAX_PDU) {
             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
             return false;
         }
             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
             return false;
         }
index 69b10df..6412612 100644 (file)
@@ -13,8 +13,8 @@ class unmarshall;
 
 class marshall {
     private:
 
 class marshall {
     private:
-        string buf_ = string(DEFAULT_RPC_SZ, 0); // Raw bytes buffer
-        size_t index_ = RPC_HEADER_SZ; // Read/write head position
+        string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); // Raw bytes buffer
+        size_t index_ = rpc_protocol::RPC_HEADER_SZ; // Read/write head position
 
     public:
         template <typename... Args>
 
     public:
         template <typename... Args>
@@ -32,14 +32,14 @@ class marshall {
         // with header
         inline operator string() const { return buf_.substr(0,index_); }
         // without header
         // with header
         inline operator string() const { return buf_.substr(0,index_); }
         // without header
-        inline string content() const { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); }
+        inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); }
 
         // letting S be a defaulted template parameter forces the compiler to
         // delay looking up operator<<(marshall&, rpc_sz_t) until we define it
         // (i.e. we define an operator for marshalling uint32_t)
 
         // letting S be a defaulted template parameter forces the compiler to
         // delay looking up operator<<(marshall&, rpc_sz_t) until we define it
         // (i.e. we define an operator for marshalling uint32_t)
-        template <class T, class S=rpc_sz_t> inline void
+        template <class T, class S=rpc_protocol::rpc_sz_t> inline void
         pack_header(const T & h) {
         pack_header(const T & h) {
-            VERIFY(sizeof(T)+sizeof(S) <= RPC_HEADER_SZ);
+            VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ);
             size_t saved_sz = index_;
             index_ = 0;
             *this << (S)(saved_sz - sizeof(S)) << (T)h;
             size_t saved_sz = index_;
             index_ = 0;
             *this << (S)(saved_sz - sizeof(S)) << (T)h;
@@ -55,10 +55,10 @@ class unmarshall {
 
     public:
         unmarshall(const string &s, bool has_header)
 
     public:
         unmarshall(const string &s, bool has_header)
-            : buf_(s),index_(RPC_HEADER_SZ) {
+            : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) {
             if (!has_header)
             if (!has_header)
-                buf_.insert(0, RPC_HEADER_SZ, 0);
-            ok_ = (buf_.size() >= RPC_HEADER_SZ);
+                buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
+            ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
         }
 
         bool ok() const { return ok_; }
         }
 
         bool ok() const { return ok_; }
@@ -74,11 +74,11 @@ class unmarshall {
 
         template <class T> inline void
         unpack_header(T & h) {
 
         template <class T> inline void
         unpack_header(T & h) {
-            VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ);
+            VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ);
             // first 4 bytes hold length field
             // first 4 bytes hold length field
-            index_ = sizeof(rpc_sz_t);
+            index_ = sizeof(rpc_protocol::rpc_sz_t);
             *this >> h;
             *this >> h;
-            index_ = RPC_HEADER_SZ;
+            index_ = rpc_protocol::RPC_HEADER_SZ;
         }
 
         template <class T> inline T _grab() { T t; *this >> t; return t; }
         }
 
         template <class T> inline T _grab() { T t; *this >> t; return t; }
@@ -154,8 +154,8 @@ inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_();
 inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
 
 // our first two marshallable structs...
 inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
 
 // our first two marshallable structs...
-MARSHALLABLE(request_header)
-MARSHALLABLE(reply_header)
+MARSHALLABLE(rpc_protocol::request_header)
+MARSHALLABLE(rpc_protocol::reply_header)
 
 //
 // Marshalling for STL containers
 
 //
 // Marshalling for STL containers
index 80e429e..964102f 100644 (file)
@@ -102,7 +102,7 @@ rpcc::~rpcc() {
 
 int rpcc::bind(milliseconds to) {
     unsigned int r;
 
 int rpcc::bind(milliseconds to) {
     unsigned int r;
-    int ret = call_timeout(rpc_const::bind, to, r, 0);
+    int ret = call_timeout(rpc_protocol::bind, to, r, 0);
     if (ret == 0) {
         lock ml(m_);
         bind_done_ = true;
     if (ret == 0) {
         lock ml(m_);
         bind_done_ = true;
@@ -125,7 +125,7 @@ void rpcc::cancel(void) {
 
             lock cl(ca->m);
             ca->done = true;
 
             lock cl(ca->m);
             ca->done = true;
-            ca->intret = rpc_const::cancel_failure;
+            ca->intret = rpc_protocol::cancel_failure;
             ca->c.notify_one();
         }
 
             ca->c.notify_one();
         }
 
@@ -137,25 +137,27 @@ void rpcc::cancel(void) {
     }
 }
 
     }
 }
 
-int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
+int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
     int xid_rep;
     {
         lock ml(m_);
 
 
     caller ca(0, &rep);
     int xid_rep;
     {
         lock ml(m_);
 
-        if ((proc != rpc_const::bind && !bind_done_) || (proc == rpc_const::bind && bind_done_)) {
+        if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
-            return rpc_const::bind_failure;
+            return rpc_protocol::bind_failure;
         }
 
         if (destroy_wait_)
         }
 
         if (destroy_wait_)
-            return rpc_const::cancel_failure;
+            return rpc_protocol::cancel_failure;
 
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
 
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        req.pack_header(rpc_protocol::request_header{
+                ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
+                });
         xid_rep = xid_rep_window_.front();
     }
 
         xid_rep = xid_rep_window_.front();
     }
 
@@ -248,7 +250,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
     // destruction of req automatically frees its buffer
                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
     // destruction of req automatically frees its buffer
-    return (ca.done? ca.intret : rpc_const::timeout_failure);
+    return (ca.done? ca.intret : rpc_protocol::timeout_failure);
 }
 
 void rpcc::get_refconn(shared_ptr<connection> & ch) {
 }
 
 void rpcc::get_refconn(shared_ptr<connection> & ch) {
@@ -269,7 +271,7 @@ bool
 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
 {
     unmarshall rep(b, true);
 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
 {
     unmarshall rep(b, true);
-    reply_header h;
+    rpc_protocol::reply_header h;
     rep.unpack_header(h);
 
     if (!rep.ok()) {
     rep.unpack_header(h);
 
     if (!rep.ok()) {
@@ -330,7 +332,7 @@ rpcs::rpcs(in_port_t p1, size_t count)
     nonce_ = (unsigned int)random();
     IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
     nonce_ = (unsigned int)random();
     IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
-    reg(rpc_const::bind, &rpcs::rpcbind, this);
+    reg(rpc_protocol::bind, &rpcs::rpcbind, this);
     dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
 }
 
     dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
 }
 
@@ -355,14 +357,14 @@ bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
     return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
 }
 
     return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
 }
 
-void rpcs::reg1(proc_t proc, handler *h) {
+void rpcs::reg1(proc_id_t proc, handler *h) {
     lock pl(procs_m_);
     VERIFY(procs_.count(proc) == 0);
     procs_[proc] = h;
     VERIFY(procs_.count(proc) >= 1);
 }
 
     lock pl(procs_m_);
     VERIFY(procs_.count(proc) == 0);
     procs_[proc] = h;
     VERIFY(procs_.count(proc) >= 1);
 }
 
-void rpcs::updatestat(proc_t proc) {
+void rpcs::updatestat(proc_id_t proc) {
     lock cl(count_m_);
     counts_[proc]++;
     curr_counts_--;
     lock cl(count_m_);
     counts_[proc]++;
     curr_counts_--;
@@ -388,9 +390,9 @@ void rpcs::updatestat(proc_t proc) {
 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     unmarshall req(buf, true);
 
 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     unmarshall req(buf, true);
 
-    request_header h;
+    rpc_protocol::request_header h;
     req.unpack_header(h);
     req.unpack_header(h);
-    proc_t proc = h.proc;
+    proc_id_t proc = h.proc;
 
     if (!req.ok()) {
         IF_LEVEL(1) LOG("unmarshall header failed");
 
     if (!req.ok()) {
         IF_LEVEL(1) LOG("unmarshall header failed");
@@ -401,13 +403,13 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
 
     marshall rep;
                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
 
     marshall rep;
-    reply_header rh{h.xid,0};
+    rpc_protocol::reply_header rh{h.xid,0};
 
     // is client sending to an old instance of server?
     if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
                         " (current " << nonce_ << ") proc " << hex << h.proc);
 
     // is client sending to an old instance of server?
     if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
                         " (current " << nonce_ << ") proc " << hex << h.proc);
-        rh.ret = rpc_const::oldsrv_failure;
+        rh.ret = rpc_protocol::oldsrv_failure;
         rep.pack_header(rh);
         c->send(rep);
         return;
         rep.pack_header(rh);
         c->send(rep);
         return;
@@ -463,10 +465,10 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
                 updatestat(proc);
 
             rh.ret = (*f)(req, rep);
                 updatestat(proc);
 
             rh.ret = (*f)(req, rep);
-            if (rh.ret == rpc_const::unmarshal_args_failure) {
-                cerr << "failed to unmarshall the arguments. You are " <<
-                        "probably calling RPC 0x" << hex << proc << " with the wrong " <<
-                        "types of arguments." << endl;
+            if (rh.ret == rpc_protocol::unmarshal_args_failure) {
+                LOG("failed to unmarshall the arguments. You are " <<
+                    "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+                    "types of arguments.");
                 VERIFY(0);
             }
             VERIFY(rh.ret >= 0);
                 VERIFY(0);
             }
             VERIFY(rh.ret >= 0);
@@ -498,7 +500,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
-            rh.ret = rpc_const::atmostonce_failure;
+            rh.ret = rpc_protocol::atmostonce_failure;
             rep.pack_header(rh);
             c->send(rep);
             break;
             rep.pack_header(rh);
             c->send(rep);
             break;
@@ -578,7 +580,7 @@ void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
-        cerr << "Could not find reply struct in add_reply" << endl;
+        LOG("Could not find reply struct in add_reply");
         l.insert(it, reply_t(xid, b));
     } else {
         *it = reply_t(xid, b);
         l.insert(it, reply_t(xid, b));
     } else {
         *it = reply_t(xid, b);
@@ -616,7 +618,7 @@ static sockaddr_in make_sockaddr(const string &hostandport) {
         struct hostent *hp = gethostbyname(host.c_str());
 
         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
         struct hostent *hp = gethostbyname(host.c_str());
 
         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
-            cerr << "cannot find host name " << host << endl;
+            LOG_NONMEMBER("cannot find host name " << host);
             exit(1);
         }
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
             exit(1);
         }
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
index 84c12f3..7b65101 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -5,6 +5,7 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 
 #include <sys/socket.h>
 #include <netinet/in.h>
 
+#include "rpc_protocol.h"
 #include "thr_pool.h"
 #include "marshall.h"
 #include "marshall_wrap.h"
 #include "thr_pool.h"
 #include "marshall.h"
 #include "marshall_wrap.h"
@@ -15,23 +16,27 @@ namespace rpc {
     static constexpr milliseconds to_min{100};
 }
 
     static constexpr milliseconds to_min{100};
 }
 
-class rpc_const {
-    public:
-        static const unsigned int bind = 1;   // handler number reserved for bind
-        static const int timeout_failure = -1;
-        static const int unmarshal_args_failure = -2;
-        static const int unmarshal_reply_failure = -3;
-        static const int atmostonce_failure = -4;
-        static const int oldsrv_failure = -5;
-        static const int bind_failure = -6;
-        static const int cancel_failure = -7;
-};
+template<class P, class R, class ...Args> struct is_valid_call : false_type {};
+
+template<class S, class R, class ...Args>
+struct is_valid_call<S(R &, Args...), R, Args...> : true_type {};
+
+template<class P, class F> struct is_valid_registration : false_type {};
+
+template<class S, class R, class ...Args>
+struct is_valid_registration<S(R &, typename decay<Args>::type...), S(R &, Args...)> : true_type {};
+
+template<class P, class C, class S, class R, class ...Args>
+struct is_valid_registration<P, S(C::*)(R &, Args...)> : is_valid_registration<P, S(R &, Args...)> {};
 
 // rpc client endpoint.
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
 class rpcc : private connection_delegate {
     private:
 
 // rpc client endpoint.
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
 class rpcc : private connection_delegate {
     private:
+        using proc_id_t = rpc_protocol::proc_id_t;
+        template <class S>
+        using proc_t = rpc_protocol::proc_t<S>;
 
         // manages per rpc info
         struct caller {
 
         // manages per rpc info
         struct caller {
@@ -78,20 +83,20 @@ class rpcc : private connection_delegate {
         request dup_req_;
         int xid_rep_done_;
 
         request dup_req_;
         int xid_rep_done_;
 
-        int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
+        int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to);
 
         template<class R>
 
         template<class R>
-        int call_m(proc_t proc, marshall &req, R & r, milliseconds to) {
+        int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) {
             string rep;
             int intret = call1(proc, req, rep, to);
             unmarshall u(rep, true);
             if (intret < 0) return intret;
             u >> r;
             if (u.okdone() != true) {
             string rep;
             int intret = call1(proc, req, rep, to);
             unmarshall u(rep, true);
             if (intret < 0) return intret;
             u >> r;
             if (u.okdone() != true) {
-                cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
-                    "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
+                LOG("rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
+                    "calling RPC 0x" << hex << proc << " with the wrong return type.");
                 VERIFY(0);
                 VERIFY(0);
-                return rpc_const::unmarshal_reply_failure;
+                return rpc_protocol::unmarshal_reply_failure;
             }
             return intret;
         }
             }
             return intret;
         }
@@ -111,21 +116,25 @@ class rpcc : private connection_delegate {
 
         void cancel();
 
 
         void cancel();
 
-        template<class R, typename ...Args>
-        inline int call(proc_t proc, R & r, const Args&... args) {
+        template<class P, class R, typename ...Args>
+        inline int call(proc_t<P> proc, R & r, const Args&... args) {
             return call_timeout(proc, rpc::to_max, r, args...);
         }
 
             return call_timeout(proc, rpc::to_max, r, args...);
         }
 
-        template<class R, typename ...Args>
-        inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args) {
+        template<class P, class R, typename ...Args>
+        inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args&... args) {
+            static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
             marshall m{args...};
             marshall m{args...};
-            return call_m(proc, m, r, to);
+            return call_m(proc.id, m, r, to);
         }
 };
 
 // rpc server endpoint.
 class rpcs : private connection_delegate {
     private:
         }
 };
 
 // rpc server endpoint.
 class rpcs : private connection_delegate {
     private:
+        using proc_id_t = rpc_protocol::proc_id_t;
+        template <class S>
+        using proc_t = rpc_protocol::proc_t<S>;
 
         typedef enum {
             NEW,  // new RPC, not a duplicate
 
         typedef enum {
             NEW,  // new RPC, not a duplicate
@@ -160,7 +169,7 @@ class rpcs : private connection_delegate {
         rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
                 int xid, int rep_xid, string & b);
 
         rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
                 int xid, int rep_xid, string & b);
 
-        void updatestat(proc_t proc);
+        void updatestat(proc_id_t proc);
 
         // latest connection to the client
         map<unsigned int, shared_ptr<connection>> conns_;
 
         // latest connection to the client
         map<unsigned int, shared_ptr<connection>> conns_;
@@ -168,12 +177,12 @@ class rpcs : private connection_delegate {
         // counting
         const size_t counting_;
         size_t curr_counts_;
         // counting
         const size_t counting_;
         size_t curr_counts_;
-        map<proc_t, size_t> counts_;
+        map<proc_id_t, size_t> counts_;
 
         bool reachable_;
 
         // map proc # to function
 
         bool reachable_;
 
         // map proc # to function
-        map<proc_t, handler *> procs_;
+        map<proc_id_t, handler *> procs_;
 
         mutex procs_m_; // protect insert/delete to procs[]
         mutex count_m_;  // protect modification of counts
 
         mutex procs_m_; // protect insert/delete to procs[]
         mutex count_m_;  // protect modification of counts
@@ -183,13 +192,13 @@ class rpcs : private connection_delegate {
         void dispatch(shared_ptr<connection> c, const string & buf);
 
         // internal handler registration
         void dispatch(shared_ptr<connection> c, const string & buf);
 
         // internal handler registration
-        void reg1(proc_t proc, handler *);
+        void reg1(proc_id_t proc, handler *);
 
         unique_ptr<thread_pool> dispatchpool_;
         unique_ptr<tcpsconn> listener_;
 
         // RPC handler for clients binding
 
         unique_ptr<thread_pool> dispatchpool_;
         unique_ptr<tcpsconn> listener_;
 
         // RPC handler for clients binding
-        int rpcbind(unsigned int &r, int a);
+        rpc_protocol::status rpcbind(unsigned int &r, int a);
 
         bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
 
         bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
@@ -200,13 +209,14 @@ class rpcs : private connection_delegate {
 
         void set_reachable(bool r) { reachable_ = r; }
 
 
         void set_reachable(bool r) { reachable_ = r; }
 
-        template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
+        template<class P, class F, class C=void> void reg(proc_t<P> proc, F f, C *c=nullptr) {
+            static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
             struct ReturnOnFailure {
                 static inline int unmarshall_args_failure() {
             struct ReturnOnFailure {
                 static inline int unmarshall_args_failure() {
-                    return rpc_const::unmarshal_args_failure;
+                    return rpc_protocol::unmarshal_args_failure;
                 }
             };
                 }
             };
-            reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
+            reg1(proc.id, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
         }
 
         void start();
         }
 
         void start();
index 8107f04..881de9b 100644 (file)
@@ -3,33 +3,57 @@
 
 #include "types.h"
 
 
 #include "types.h"
 
-using proc_t = uint32_t;
-using status_t = int32_t;
-using rpc_sz_t = uint32_t;
-
-struct request_header {
-    int xid;
-    proc_t proc;
-    unsigned int clt_nonce;
-    unsigned int srv_nonce;
-    int xid_rep;
-
-    MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
+namespace rpc_protocol {
+    using proc_id_t = uint32_t;
+
+    using status = int32_t;
+    using rpc_sz_t = uint32_t;
+
+    enum : status {
+        timeout_failure = -1,
+        unmarshal_args_failure = -2,
+        unmarshal_reply_failure = -3,
+        atmostonce_failure = -4,
+        oldsrv_failure = -5,
+        bind_failure = -6,
+        cancel_failure = -7
+    };
+
+    struct request_header {
+        int xid;
+        proc_id_t proc;
+        unsigned int clt_nonce;
+        unsigned int srv_nonce;
+        int xid_rep;
+
+        MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
+    };
+
+    struct reply_header {
+        int xid;
+        int ret;
+
+        MEMBERS(xid, ret)
+    };
+
+    template <typename Signature>
+    struct proc_t {
+        using signature = Signature;
+        proc_id_t id;
+    };
+
+    const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
+    const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
+    const size_t MAX_PDU = 10<<20; //maximum PDF is 10M
+
+#define REMOTE_PROCEDURE_BASE(_base_) enum proc_no : ::rpc_protocol::proc_id_t { base = _base_ };
+#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr ::rpc_protocol::proc_t<status _args_> _name_{base + _offset_};
+
+    REMOTE_PROCEDURE_BASE(0);
+    REMOTE_PROCEDURE(1, bind, (unsigned int &, int)); // handler number reserved for bind
 };
 
 };
 
-ENDIAN_SWAPPABLE(request_header)
-
-struct reply_header {
-    int xid;
-    int ret;
-
-    MEMBERS(xid, ret)
-};
-
-ENDIAN_SWAPPABLE(reply_header)
-
-const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
-const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
-const size_t MAX_PDU = 10<<20; //maximum PDF is 10M
+ENDIAN_SWAPPABLE(rpc_protocol::request_header)
+ENDIAN_SWAPPABLE(rpc_protocol::reply_header)
 
 #endif
 
 #endif
index 354b528..1963ada 100644 (file)
@@ -27,6 +27,15 @@ class srv {
         int handle_bigrep(string &r, const size_t a);
 };
 
         int handle_bigrep(string &r, const size_t a);
 };
 
+namespace srv_protocol {
+    using status = rpc_protocol::status;
+    REMOTE_PROCEDURE_BASE(0);
+    REMOTE_PROCEDURE(22, _22, (string &, string, string));
+    REMOTE_PROCEDURE(23, fast, (int &, int));
+    REMOTE_PROCEDURE(24, slow, (int &, int));
+    REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
+};
+
 // a handler. a and b are arguments, r is the result.
 // there can be multiple arguments but only one result.
 // the caller also gets to see the int return value
 // a handler. a and b are arguments, r is the result.
 // there can be multiple arguments but only one result.
 // the caller also gets to see the int return value
@@ -34,54 +43,43 @@ class srv {
 // rpcs::reg() decides how to unmarshall by looking
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
 // rpcs::reg() decides how to unmarshall by looking
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
-int
-srv::handle_22(string &r, const string a, string b)
-{
+int srv::handle_22(string &r, const string a, string b) {
     r = a + b;
     return 0;
 }
 
     r = a + b;
     return 0;
 }
 
-int
-srv::handle_fast(int &r, const int a)
-{
+int srv::handle_fast(int &r, const int a) {
     r = a + 1;
     return 0;
 }
 
     r = a + 1;
     return 0;
 }
 
-int
-srv::handle_slow(int &r, const int a)
-{
+int srv::handle_slow(int &r, const int a) {
     usleep(random() % 500);
     r = a + 2;
     return 0;
 }
 
     usleep(random() % 500);
     r = a + 2;
     return 0;
 }
 
-int
-srv::handle_bigrep(string &r, const size_t len)
-{
+int srv::handle_bigrep(string &r, const size_t len) {
     r = string((size_t)len, 'x');
     return 0;
 }
 
 srv service;
 
     r = string((size_t)len, 'x');
     return 0;
 }
 
 srv service;
 
-void startserver()
-{
+void startserver() {
     server = new rpcs(port);
     server = new rpcs(port);
-    server->reg(22, &srv::handle_22, &service);
-    server->reg(23, &srv::handle_fast, &service);
-    server->reg(24, &srv::handle_slow, &service);
-    server->reg(25, &srv::handle_bigrep, &service);
+    server->reg(srv_protocol::_22, &srv::handle_22, &service);
+    server->reg(srv_protocol::fast, &srv::handle_fast, &service);
+    server->reg(srv_protocol::slow, &srv::handle_slow, &service);
+    server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
     server->start();
 }
 
     server->start();
 }
 
-void
-testmarshall()
-{
+void testmarshall() {
     marshall m;
     marshall m;
-    request_header rh{1,2,3,4,5};
+    rpc_protocol::request_header rh{1,2,3,4,5};
     m.pack_header(rh);
     m.pack_header(rh);
-    VERIFY(((string)m).size()==RPC_HEADER_SZ);
+    VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
     int i = 12345;
     unsigned long long l = 1223344455L;
     string s = "hallo....";
     int i = 12345;
     unsigned long long l = 1223344455L;
     string s = "hallo....";
@@ -90,10 +88,10 @@ testmarshall()
     m << s;
 
     string b = m;
     m << s;
 
     string b = m;
-    VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+    VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
 
     unmarshall un(b, true);
 
     unmarshall un(b, true);
-    request_header rh1;
+    rpc_protocol::request_header rh1;
     un.unpack_header(rh1);
     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
     int i1;
     un.unpack_header(rh1);
     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
     int i1;
@@ -106,20 +104,18 @@ testmarshall()
     VERIFY(i1==i && l1==l && s1==s);
 }
 
     VERIFY(i1==i && l1==l && s1==s);
 }
 
-void
-client1(size_t cl)
-{
+void client1(size_t cl) {
     // test concurrency.
     size_t which_cl = cl % NUM_CL;
 
     for(int i = 0; i < 100; i++){
     // test concurrency.
     size_t which_cl = cl % NUM_CL;
 
     for(int i = 0; i < 100; i++){
-        int arg = (random() % 2000);
+        unsigned long arg = (random() % 2000);
         string rep;
         string rep;
-        int ret = clients[which_cl]->call(25, rep, arg);
+        int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
         VERIFY(ret == 0);
         VERIFY(ret == 0);
-        if ((int)rep.size()!=arg)
+        if ((unsigned long)rep.size()!=arg)
             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
-        VERIFY((int)rep.size() == arg);
+        VERIFY((unsigned long)rep.size() == arg);
     }
 
     // test rpc replies coming back not in the order of
     }
 
     // test rpc replies coming back not in the order of
@@ -131,7 +127,7 @@ client1(size_t cl)
 
         auto start = steady_clock::now();
 
 
         auto start = steady_clock::now();
 
-        int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+        int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
         auto end = steady_clock::now();
         auto diff = duration_cast<milliseconds>(end - start).count();
         if (ret != 0)
         auto end = steady_clock::now();
         auto diff = duration_cast<milliseconds>(end - start).count();
         if (ret != 0)
@@ -141,60 +137,53 @@ client1(size_t cl)
     }
 }
 
     }
 }
 
-void
-client2(size_t cl)
-{
+void client2(size_t cl) {
     size_t which_cl = cl % NUM_CL;
 
     time_t t1;
     time(&t1);
 
     while(time(0) - t1 < 10){
     size_t which_cl = cl % NUM_CL;
 
     time_t t1;
     time(&t1);
 
     while(time(0) - t1 < 10){
-        int arg = (random() % 2000);
+        unsigned long arg = (random() % 2000);
         string rep;
         string rep;
-        int ret = clients[which_cl]->call(25, rep, arg);
-        if ((int)rep.size()!=arg)
+        int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
+        if ((unsigned long)rep.size()!=arg)
             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
-        VERIFY((int)rep.size() == arg);
+        VERIFY((unsigned long)rep.size() == arg);
     }
 }
 
     }
 }
 
-void
-client3(void *xx)
-{
+void client3(void *xx) {
     rpcc *c = (rpcc *) xx;
 
     for(int i = 0; i < 4; i++){
         int rep = 0;
     rpcc *c = (rpcc *) xx;
 
     for(int i = 0; i < 4; i++){
         int rep = 0;
-        int ret = c->call_timeout(24, milliseconds(300), rep, i);
-        VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+        int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
+        VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
     }
 }
 
     }
 }
 
-
-void
-simple_tests(rpcc *c)
-{
+void simple_tests(rpcc *c) {
     cout << "simple_tests" << endl;
     // an RPC call to procedure #22.
     // rpcc::call() looks at the argument types to decide how
     // to marshall the RPC call packet, and how to unmarshall
     // the reply packet.
     string rep;
     cout << "simple_tests" << endl;
     // an RPC call to procedure #22.
     // rpcc::call() looks at the argument types to decide how
     // to marshall the RPC call packet, and how to unmarshall
     // the reply packet.
     string rep;
-    int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+    int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
     VERIFY(intret == 0); // this is what handle_22 returns
     VERIFY(rep == "hello goodbye");
     cout << "   -- string concat RPC .. ok" << endl;
 
     // small request, big reply (perhaps req via UDP, reply via TCP)
     VERIFY(intret == 0); // this is what handle_22 returns
     VERIFY(rep == "hello goodbye");
     cout << "   -- string concat RPC .. ok" << endl;
 
     // small request, big reply (perhaps req via UDP, reply via TCP)
-    intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
+    intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
     VERIFY(intret == 0);
     VERIFY(rep.size() == 70000);
     cout << "   -- small request, big reply .. ok" << endl;
 
     // specify a timeout value to an RPC that should succeed (udp)
     int xx = 0;
     VERIFY(intret == 0);
     VERIFY(rep.size() == 70000);
     cout << "   -- small request, big reply .. ok" << endl;
 
     // specify a timeout value to an RPC that should succeed (udp)
     int xx = 0;
-    intret = c->call_timeout(23, milliseconds(300), xx, 77);
+    intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
     VERIFY(intret == 0 && xx == 78);
     cout << "   -- no spurious timeout .. ok" << endl;
 
     VERIFY(intret == 0 && xx == 78);
     cout << "   -- no spurious timeout .. ok" << endl;
 
@@ -202,14 +191,14 @@ simple_tests(rpcc *c)
     {
         string arg(1000, 'x');
         string rep2;
     {
         string arg(1000, 'x');
         string rep2;
-        c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
+        c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
         VERIFY(rep2.size() == 1001);
         cout << "   -- no spurious timeout .. ok" << endl;
     }
 
     // huge RPC
     string big(1000000, 'x');
         VERIFY(rep2.size() == 1001);
         cout << "   -- no spurious timeout .. ok" << endl;
     }
 
     // huge RPC
     string big(1000000, 'x');
-    intret = c->call(22, rep, big, (string)"z");
+    intret = c->call(srv_protocol::_22, rep, big, (string)"z");
     VERIFY(rep.size() == 1000001);
     cout << "   -- huge 1M rpc request .. ok" << endl;
 
     VERIFY(rep.size() == 1000001);
     cout << "   -- huge 1M rpc request .. ok" << endl;
 
@@ -224,9 +213,7 @@ simple_tests(rpcc *c)
     cout << "simple_tests OK" << endl;
 }
 
     cout << "simple_tests OK" << endl;
 }
 
-void 
-concurrent_test(size_t nt)
-{
+void concurrent_test(size_t nt) {
     // create threads that make lots of calls in parallel,
     // to test thread synchronization for concurrent calls
     // and dispatches.
     // create threads that make lots of calls in parallel,
     // to test thread synchronization for concurrent calls
     // and dispatches.
@@ -243,9 +230,7 @@ concurrent_test(size_t nt)
     cout << " OK" << endl;
 }
 
     cout << " OK" << endl;
 }
 
-void 
-lossy_test()
-{
+void lossy_test() {
     cout << "start lossy_test ...";
     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
 
     cout << "start lossy_test ...";
     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
 
@@ -274,9 +259,7 @@ lossy_test()
     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
 }
 
     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
 }
 
-void 
-failure_test()
-{
+void failure_test() {
     rpcc *client1;
     rpcc *client = clients[0];
 
     rpcc *client1;
     rpcc *client = clients[0];
 
@@ -293,8 +276,8 @@ failure_test()
     startserver();
 
     string rep;
     startserver();
 
     string rep;
-    int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
-    VERIFY(intret == rpc_const::oldsrv_failure);
+    int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == rpc_protocol::oldsrv_failure);
     cout << "   -- call recovered server with old client .. failed ok" << endl;
 
     delete client;
     cout << "   -- call recovered server with old client .. failed ok" << endl;
 
     delete client;
@@ -303,7 +286,7 @@ failure_test()
     VERIFY (client->bind() >= 0);
     VERIFY (client->bind() < 0);
 
     VERIFY (client->bind() >= 0);
     VERIFY (client->bind() < 0);
 
-    intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+    intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
     VERIFY(intret == 0);
     VERIFY(rep == "hello goodbye");
 
     VERIFY(intret == 0);
     VERIFY(rep == "hello goodbye");
 
@@ -344,9 +327,7 @@ failure_test()
     cout << "failure_test OK" << endl;
 }
 
     cout << "failure_test OK" << endl;
 }
 
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
 
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
 
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
@@ -393,7 +374,7 @@ main(int argc, char *argv[])
     testmarshall();
 
     if (isserver) {
     testmarshall();
 
     if (isserver) {
-        cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+        cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
         startserver();
     }
 
         startserver();
     }
 
@@ -424,7 +405,6 @@ main(int argc, char *argv[])
         exit(0);
     }
 
         exit(0);
     }
 
-    while (1) {
+    while (1)
         usleep(100000);
         usleep(100000);
-    }
 }
 }
diff --git a/rsm.cc b/rsm.cc
index 4e73f9f..7e90b03 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -115,7 +115,7 @@ void rsm::start() {
     thread(&rsm::recovery, this).detach();
 }
 
     thread(&rsm::recovery, this).detach();
 }
 
-void rsm::reg1(int proc, handler *h) {
+void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) {
     lock ml(rsm_mutex);
     procs[proc] = h;
 }
     lock ml(rsm_mutex);
     procs[proc] = h;
 }
@@ -130,7 +130,7 @@ void rsm::recovery() [[noreturn]] {
             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
             // cfg->view_id in this manner involves a race.  I suspect not.
             if (join(primary, ml)) {
             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
             // cfg->view_id in this manner involves a race.  I suspect not.
             if (join(primary, ml)) {
-                LOG("recovery: joined");
+                LOG("joined");
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
@@ -139,13 +139,13 @@ void rsm::recovery() [[noreturn]] {
             }
         }
         vid_insync = vid_commit;
             }
         }
         vid_insync = vid_commit;
-        LOG("recovery: sync vid_insync " << vid_insync);
+        LOG("sync vid_insync " << vid_insync);
         if (primary == cfg->myaddr()) {
             r = sync_with_backups(ml);
         } else {
             r = sync_with_primary(ml);
         }
         if (primary == cfg->myaddr()) {
             r = sync_with_backups(ml);
         } else {
             r = sync_with_primary(ml);
         }
-        LOG("recovery: sync done");
+        LOG("sync done");
 
         // If there was a commited viewchange during the synchronization, restart
         // the recovery
 
         // If there was a commited viewchange during the synchronization, restart
         // the recovery
@@ -157,7 +157,7 @@ void rsm::recovery() [[noreturn]] {
             myvs.seqno = 1;
             inviewchange = false;
         }
             myvs.seqno = 1;
             inviewchange = false;
         }
-        LOG("recovery: go to sleep " << insync << " " << inviewchange);
+        LOG("go to sleep " << insync << " " << inviewchange);
         recovery_cond.wait(ml);
     }
 }
         recovery_cond.wait(ml);
     }
 }
@@ -285,7 +285,7 @@ void rsm::commit_change(unsigned vid) {
 void rsm::commit_change(unsigned vid, lock &) {
     if (vid <= vid_commit)
         return;
 void rsm::commit_change(unsigned vid, lock &) {
     if (vid <= vid_commit)
         return;
-    LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
+    LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
             last_myvs.seqno << ") " << primary << " insync " << insync);
     vid_commit = vid;
     inviewchange = true;
             last_myvs.seqno << ") " << primary << " insync " << insync);
     vid_commit = vid;
     inviewchange = true;
@@ -297,7 +297,7 @@ void rsm::commit_change(unsigned vid, lock &) {
 }
 
 
 }
 
 
-void rsm::execute(int procno, const string & req, string & r) {
+void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
@@ -313,7 +313,7 @@ void rsm::execute(int procno, const string & req, string & r) {
 // number, and invokes it on all members of the replicated state
 // machine.
 //
 // number, and invokes it on all members of the replicated state
 // machine.
 //
-rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) {
+rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
     LOG("invoke procno 0x" << hex << procno);
     lock ml(invoke_mutex);
     vector<string> m;
     LOG("invoke procno 0x" << hex << procno);
     lock ml(invoke_mutex);
     vector<string> m;
@@ -367,7 +367,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & req) {
+rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
     LOG("invoke procno 0x" << hex << proc);
     lock ml(invoke_mutex);
     vector<string> m;
     LOG("invoke procno 0x" << hex << proc);
     lock ml(invoke_mutex);
     vector<string> m;
@@ -491,7 +491,7 @@ void rsm::set_primary(unsigned vid) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
-        LOG("set_primary: primary stays " << primary);
+        LOG("primary stays " << primary);
         return;
     }
 
         return;
     }
 
@@ -499,7 +499,7 @@ void rsm::set_primary(unsigned vid) {
     for (unsigned i = 0; i < p.size(); i++) {
         if (isamember(p[i], c)) {
             primary = p[i];
     for (unsigned i = 0; i < p.size(); i++) {
         if (isamember(p[i], c)) {
             primary = p[i];
-            LOG("set_primary: primary is " << primary);
+            LOG("primary is " << primary);
             return;
         }
     }
             return;
         }
     }
diff --git a/rsm.h b/rsm.h
index 8fdf2d5..b402bab 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -18,9 +18,9 @@ class rsm_state_transfer {
 
 class rsm : public config_view_change {
     private:
 
 class rsm : public config_view_change {
     private:
-        void reg1(int proc, handler *);
+        void reg1(rpc_protocol::proc_id_t proc, handler *);
     protected:
     protected:
-        map<int, handler *> procs;
+        map<rpc_protocol::proc_id_t, handler *> procs;
         unique_ptr<config> cfg;
         rsm_state_transfer *stf = nullptr;
         rpcs *rsmrpc;
         unique_ptr<config> cfg;
         rsm_state_transfer *stf = nullptr;
         rpcs *rsmrpc;
@@ -43,7 +43,7 @@ class rsm : public config_view_change {
         bool break2;
 
         rsm_client_protocol::status client_members(vector<string> &r, int i);
         bool break2;
 
         rsm_client_protocol::status client_members(vector<string> &r, int i);
-        rsm_protocol::status invoke(int &, int proc, viewstamp vs, const string & mreq);
+        rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq);
         rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src,
                 viewstamp last, unsigned vid);
         rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
         rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src,
                 viewstamp last, unsigned vid);
         rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
@@ -54,8 +54,8 @@ class rsm : public config_view_change {
         mutex rsm_mutex, invoke_mutex;
         cond recovery_cond, sync_cond;
 
         mutex rsm_mutex, invoke_mutex;
         cond recovery_cond, sync_cond;
 
-        void execute(int procno, const string & req, string & r);
-        rsm_client_protocol::status client_invoke(string & r, int procno, const string & req);
+        void execute(rpc_protocol::proc_id_t procno, const string & req, string & r);
+        rsm_client_protocol::status client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req);
         bool statetransfer(const string & m, lock & rsm_mutex_lock);
         bool statetransferdone(const string & m, lock & rsm_mutex_lock);
         bool join(const string & m, lock & rsm_mutex_lock);
         bool statetransfer(const string & m, lock & rsm_mutex_lock);
         bool statetransferdone(const string & m, lock & rsm_mutex_lock);
         bool join(const string & m, lock & rsm_mutex_lock);
@@ -75,13 +75,12 @@ class rsm : public config_view_change {
         void recovery();
         void commit_change(unsigned vid);
 
         void recovery();
         void commit_change(unsigned vid);
 
-        template<class F, class C=void> void reg(int proc, F f, C *c=nullptr);
+        template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
+            static_assert(is_valid_registration<P, F>::value, "RSM handler registered with incorrect argument types");
+            reg1(proc.id, marshalled_func<F>::wrap(f, c));
+        }
 
         void start();
 };
 
 
         void start();
 };
 
-template<class F, class C> void rsm::reg(int proc, F f, C *c) {
-    reg1(proc, marshalled_func<F>::wrap(f, c));
-}
-
 #endif /* rsm_h */
 #endif /* rsm_h */
index fa04e26..9e86915 100644 (file)
@@ -7,7 +7,7 @@ rsm_client::rsm_client(string dst) : primary(dst) {
     LOG("create rsm_client");
     lock ml(rsm_client_mutex);
     VERIFY (init_members(ml));
     LOG("create rsm_client");
     lock ml(rsm_client_mutex);
     VERIFY (init_members(ml));
-    LOG("rsm_client: done");
+    LOG("done");
 }
 
 void rsm_client::primary_failure(lock &) {
 }
 
 void rsm_client::primary_failure(lock &) {
index 2a9c8c4..be66fec 100644 (file)
@@ -19,14 +19,16 @@ class rsm_client {
         mutex rsm_client_mutex;
         void primary_failure(lock & rsm_client_mutex_lock);
         bool init_members(lock & rsm_client_mutex_lock);
         mutex rsm_client_mutex;
         void primary_failure(lock & rsm_client_mutex_lock);
         bool init_members(lock & rsm_client_mutex_lock);
+        rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
+        template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
     public:
         rsm_client(string dst);
     public:
         rsm_client(string dst);
-        rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
 
 
-        template<class R, class ...Args>
-            int call(unsigned int proc, R & r, const Args & ...a1);
-    private:
-        template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
+        template<class P, class R, class ...Args>
+        int call(rpc_protocol::proc_t<P> proc, R & r, const Args & ...a1) {
+            static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
+            return call_m(proc.id, r, marshall{a1...});
+        }
 };
 
 inline string hexify(const string & s) {
 };
 
 inline string hexify(const string & s) {
@@ -55,7 +57,7 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
             "0x" << hex << proc << " with the wrong return type");
         LOG("here's what I got: \"" << hexify(rep) << "\"");
         VERIFY(0);
             "0x" << hex << proc << " with the wrong return type");
         LOG("here's what I got: \"" << hexify(rep) << "\"");
         VERIFY(0);
-        return rpc_const::unmarshal_reply_failure;
+        return rpc_protocol::unmarshal_reply_failure;
     }
     unmarshall u1(res, false);
     u1 >> r;
     }
     unmarshall u1(res, false);
     u1 >> r;
@@ -65,14 +67,9 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
             " with the wrong return type.");
         LOG("here's what I got: \"" << hexify(res) << "\"");
         VERIFY(0);
             " with the wrong return type.");
         LOG("here's what I got: \"" << hexify(res) << "\"");
         VERIFY(0);
-        return rpc_const::unmarshal_reply_failure;
+        return rpc_protocol::unmarshal_reply_failure;
     }
     return intret;
 }
 
     }
     return intret;
 }
 
-template<class R, class ...Args>
-int rsm_client::call(unsigned int proc, R & r, const Args & ...a1) {
-    return call_m(proc, r, marshall{a1...});
-}
-
 #endif
 #endif
index a2d13c2..d64c0af 100644 (file)
@@ -4,13 +4,11 @@
 #include "types.h"
 #include "rpc/rpc.h"
 
 #include "types.h"
 #include "rpc/rpc.h"
 
-class rsm_client_protocol {
-    public:
-        enum status : status_t {OK, ERR, NOTPRIMARY, BUSY};
-        enum rpc_numbers : proc_t {
-            invoke = 0x9001,
-            members,
-        };
+namespace rsm_client_protocol {
+    enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY};
+    REMOTE_PROCEDURE_BASE(0x9000);
+    REMOTE_PROCEDURE(1, invoke, (string &, rpc_protocol::proc_id_t, string));
+    REMOTE_PROCEDURE(2, members, (vector<string> &, int));
 };
 
 struct viewstamp {
 };
 
 struct viewstamp {
@@ -24,33 +22,30 @@ struct viewstamp {
 
 MARSHALLABLE(viewstamp)
 
 
 MARSHALLABLE(viewstamp)
 
-class rsm_protocol {
-    public:
-        enum status : status_t { OK, ERR, BUSY};
-        enum rpc_numbers : proc_t {
-            invoke = 0xa001,
-            transferreq,
-            transferdonereq,
-            joinreq,
-        };
-
-        struct transferres {
-            string state;
-            viewstamp last;
-
-            MEMBERS(state, last)
-        };
+namespace rsm_protocol {
+    enum status : rpc_protocol::status { OK, ERR, BUSY};
+
+    struct transferres {
+        string state;
+        viewstamp last;
+
+        MEMBERS(state, last)
+    };
+
+    REMOTE_PROCEDURE_BASE(0xa000);
+    REMOTE_PROCEDURE(1, invoke, (int &, rpc_protocol::proc_id_t, viewstamp, string));
+    REMOTE_PROCEDURE(2, transferreq, (transferres &, string, viewstamp, unsigned));
+    REMOTE_PROCEDURE(3, transferdonereq, (int &, string, unsigned));
+    REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp));
 };
 
 MARSHALLABLE(rsm_protocol::transferres)
 
 };
 
 MARSHALLABLE(rsm_protocol::transferres)
 
-class rsm_test_protocol {
-    public:
-        enum status : status_t {OK, ERR};
-        enum rpc_numbers : proc_t {
-            net_repair = 0x12001,
-            breakpoint = 0x12002,
-        };
+namespace rsm_test_protocol {
+    enum status : rpc_protocol::status {OK, ERR};
+    REMOTE_PROCEDURE_BASE(0x12000);
+    REMOTE_PROCEDURE(1, net_repair, (status &, int));
+    REMOTE_PROCEDURE(2, breakpoint, (status &, int));
 };
 
 #endif 
 };
 
 #endif 
index 1a8b833..469aea2 100644 (file)
@@ -10,7 +10,7 @@ char log_thread_prefix = 't';
 
 int main(int argc, char *argv[]) {
     if(argc != 4){
 
 int main(int argc, char *argv[]) {
     if(argc != 4){
-        cerr << "Usage: " << argv[0] << " [host:]port [partition] arg" << endl;
+        LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [partition] arg");
         return 1;
     }
 
         return 1;
     }
 
@@ -22,7 +22,7 @@ int main(int argc, char *argv[]) {
         int b = stoi(argv[3]);
         cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
     } else {
         int b = stoi(argv[3]);
         cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
     } else {
-        cerr << "Unknown command " << argv[2] << endl;
+        LOG_NONMEMBER("Unknown command " << argv[2]);
     }
     return 0;
 }
     }
     return 0;
 }