From: Peter Iannucci Date: Thu, 17 Oct 2013 06:56:52 +0000 (-0400) Subject: More logging clean-ups. Static type-checking for RPC calls and X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/6b5e09540e9392a7015fae1ad3b01b0973600ff2?ds=sidebyside More logging clean-ups. Static type-checking for RPC calls and handlers. --- diff --git a/config.cc b/config.cc index 5d04cd2..abd2f9c 100644 --- a/config.cc +++ b/config.cc @@ -203,7 +203,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { handle h(m); cfg_mutex_lock.unlock(); - int r = 0, ret = rpc_const::bind_failure; + int r = 0, ret = rpc_protocol::bind_failure; if (rpcc *cl = h.safebind()) ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); cfg_mutex_lock.lock(); @@ -212,8 +212,8 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { switch (ret) { case paxos_protocol::OK: break; - case rpc_const::atmostonce_failure: - case rpc_const::oldsrv_failure: + case rpc_protocol::atmostonce_failure: + case rpc_protocol::oldsrv_failure: invalidate_handle(m); //h.invalidate(); break; diff --git a/lock_client.cc b/lock_client.cc index 388de88..8864ce9 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -73,7 +73,7 @@ void lock_client::releaser() [[noreturn]] { int lock_client::stat(lock_protocol::lockid_t lid) { VERIFY(0); int r; - auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid); + auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id); VERIFY (ret == lock_protocol::OK); return r; } diff --git a/lock_demo.cc b/lock_demo.cc index 714c4e3..97c2964 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -4,7 +4,7 @@ char log_thread_prefix = 'd'; int main(int argc, char *argv[]) { if(argc != 2) { - cerr << "Usage: " << argv[0] << " [host:]port" << endl; + LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port"); return 1; } diff --git a/lock_protocol.h b/lock_protocol.h index 1e45ddc..5589c07 100644 --- a/lock_protocol.h +++ b/lock_protocol.h @@ -6,24 +6,24 @@ #include "types.h" #include "rpc/rpc.h" -class lock_protocol { - public: - enum status : status_t { OK, RETRY, RPCERR, NOENT, IOERR }; - using lockid_t = string; - using xid_t = uint64_t; - enum rpc_numbers : proc_t { - acquire = 0x7001, - release, - stat, - }; +typedef string callback_t; + +namespace lock_protocol { + enum status : rpc_protocol::status { OK, RETRY, RPCERR, NOENT, IOERR }; + using lockid_t = string; + using xid_t = uint64_t; + REMOTE_PROCEDURE_BASE(0x7000); + REMOTE_PROCEDURE(1, acquire, (int &, lockid_t, callback_t, xid_t)); + REMOTE_PROCEDURE(2, release, (int &, lockid_t, callback_t, xid_t)); + REMOTE_PROCEDURE(3, stat, (int &, lockid_t, callback_t)); }; -class rlock_protocol { - public: - enum status : status_t { OK, RPCERR }; - enum rpc_numbers : proc_t { - revoke = 0x8001, - retry, - }; +namespace rlock_protocol { + using lockid_t = lock_protocol::lockid_t; + using xid_t = lock_protocol::xid_t; + enum status : rpc_protocol::status { OK, RPCERR }; + REMOTE_PROCEDURE_BASE(0x8000); + REMOTE_PROCEDURE(1, revoke, (int &, lockid_t, xid_t)); + REMOTE_PROCEDURE(2, retry, (int &, lockid_t, xid_t)); }; #endif diff --git a/lock_server.cc b/lock_server.cc index 40098d7..522a917 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -92,7 +92,7 @@ void lock_server::retryer() [[noreturn]] { } } -int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { +lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { LOG("lid=" << lid << " client=" << id << "," << xid); holder_t h = holder_t(id, xid); lock_state &st = get_lock_state(lid); @@ -150,7 +150,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & return lock_protocol::RETRY; } -int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { +lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { LOG("lid=" << lid << " client=" << id << "," << xid); lock_state &st = get_lock_state(lid); lock sl(st.m); @@ -176,7 +176,7 @@ void lock_server::unmarshal_state(const string & state) { rep >> nacquire >> lock_table; } -lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { +lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid, const callback_t &) { LOG("stat request for " << lid); VERIFY(0); r = nacquire; diff --git a/lock_server.h b/lock_server.h index 69ac2b8..6ba4902 100644 --- a/lock_server.h +++ b/lock_server.h @@ -6,7 +6,6 @@ #include "rsm.h" #include "rpc/fifo.h" -typedef string callback_t; typedef pair holder_t; class lock_state { @@ -38,13 +37,13 @@ class lock_server : public rsm_state_transfer { rsm *rsm_; public: lock_server(rsm *r = 0); - lock_protocol::status stat(int &, lock_protocol::lockid_t); void revoker(); void retryer(); string marshal_state(); void unmarshal_state(const string & state); - int acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); - int release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); + lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); + lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); + lock_protocol::status stat(int &, lock_protocol::lockid_t, const callback_t & id); }; #endif diff --git a/lock_smain.cc b/lock_smain.cc index 90066e1..0a3c209 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -13,7 +13,7 @@ int main(int argc, char *argv[]) { srandom((uint32_t)getpid()); if(argc != 3){ - cerr << "Usage: " << argv[0] << " [master:]port [me:]port" << endl; + LOG_NONMEMBER("Usage: " << argv[0] << " [master:]port [me:]port"); exit(1); } diff --git a/lock_tester.cc b/lock_tester.cc index b2df781..0204a71 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -26,8 +26,7 @@ void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; if (ct[x] != 0) { - cout << "error: server granted " << lid << " twice" << endl; - cerr << "error: server granted " << lid << " twice" << endl; + LOG_NONMEMBER("error: server granted " << lid << " twice"); exit(1); } ct[x] += 1; @@ -37,7 +36,7 @@ void check_release(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; if (ct[x] != 1) { - cerr << "error: client released un-held lock " << lid << endl; + LOG_NONMEMBER("error: client released un-held lock " << lid); exit(1); } ct[x] -= 1; @@ -123,7 +122,7 @@ main(int argc, char *argv[]) srandom((uint32_t)getpid()); if (argc < 2) { - cerr << "Usage: " << argv[0] << " [host:]port [test]" << endl; + LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [test]"); exit(1); } diff --git a/paxos.cc b/paxos.cc index 8b00ad8..ab60302 100644 --- a/paxos.cc +++ b/paxos.cc @@ -240,10 +240,10 @@ void proposer_acceptor::breakpoint2() { void proposer_acceptor::breakpoint(int b) { if (b == 3) { - LOG("Proposer: breakpoint 1"); + LOG("breakpoint 1"); break1 = true; } else if (b == 4) { - LOG("Proposer: breakpoint 2"); + LOG("breakpoint 2"); break2 = true; } } diff --git a/paxos.h b/paxos.h index 642d3ff..8c9fc8f 100644 --- a/paxos.h +++ b/paxos.h @@ -7,10 +7,9 @@ #include "log.h" using prepareres = paxos_protocol::prepareres; - -using node_t = string; -using nodes_t = vector; -using value_t = string; +using node_t = paxos_protocol::node_t; +using nodes_t = paxos_protocol::nodes_t; +using value_t = paxos_protocol::value_t; class paxos_change { public: diff --git a/paxos_protocol.h b/paxos_protocol.h index 5e8afdd..1f5fd3e 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -14,24 +14,25 @@ struct prop_t { MARSHALLABLE(prop_t) -class paxos_protocol { - public: - enum status : status_t { OK, ERR }; - enum rpc_numbers : proc_t { - preparereq = 0x11001, - acceptreq, - decidereq, - heartbeat, - }; - - struct prepareres { - bool oldinstance; - bool accept; - prop_t n_a; - string v_a; - - MEMBERS(oldinstance, accept, n_a, v_a) - }; +namespace paxos_protocol { + enum status : rpc_protocol::status { OK, ERR }; + struct prepareres { + bool oldinstance; + bool accept; + prop_t n_a; + string v_a; + + MEMBERS(oldinstance, accept, n_a, v_a) + }; + using node_t = string; + using nodes_t = vector; + using value_t = string; + + REMOTE_PROCEDURE_BASE(0x11000); + REMOTE_PROCEDURE(1, preparereq, (prepareres &, node_t, unsigned, prop_t)); + REMOTE_PROCEDURE(2, acceptreq, (bool &, node_t, unsigned, prop_t, value_t)); + REMOTE_PROCEDURE(3, decidereq, (int &, node_t, unsigned, value_t)); + REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned)); }; MARSHALLABLE(paxos_protocol::prepareres) diff --git a/rpc/connection.cc b/rpc/connection.cc index 315a82e..e269a3d 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -160,7 +160,7 @@ bool connection::writepdu() { bool connection::readpdu() { IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); if (!rpdu_.buf.size()) { - rpc_sz_t sz1; + rpc_protocol::rpc_sz_t sz1; ssize_t n = fd_.read(sz1); if (n == 0) @@ -178,7 +178,7 @@ bool connection::readpdu() { size_t sz = ntoh(sz1); - if (sz > MAX_PDU) { + if (sz > rpc_protocol::MAX_PDU) { IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); return false; } diff --git a/rpc/marshall.h b/rpc/marshall.h index 69b10df..6412612 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -13,8 +13,8 @@ class unmarshall; class marshall { private: - string buf_ = string(DEFAULT_RPC_SZ, 0); // Raw bytes buffer - size_t index_ = RPC_HEADER_SZ; // Read/write head position + string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); // Raw bytes buffer + size_t index_ = rpc_protocol::RPC_HEADER_SZ; // Read/write head position public: template @@ -32,14 +32,14 @@ class marshall { // with header inline operator string() const { return buf_.substr(0,index_); } // without header - inline string content() const { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); } + inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); } // letting S be a defaulted template parameter forces the compiler to // delay looking up operator<<(marshall&, rpc_sz_t) until we define it // (i.e. we define an operator for marshalling uint32_t) - template inline void + template inline void pack_header(const T & h) { - VERIFY(sizeof(T)+sizeof(S) <= RPC_HEADER_SZ); + VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ); size_t saved_sz = index_; index_ = 0; *this << (S)(saved_sz - sizeof(S)) << (T)h; @@ -55,10 +55,10 @@ class unmarshall { public: unmarshall(const string &s, bool has_header) - : buf_(s),index_(RPC_HEADER_SZ) { + : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) { if (!has_header) - buf_.insert(0, RPC_HEADER_SZ, 0); - ok_ = (buf_.size() >= RPC_HEADER_SZ); + buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0); + ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ); } bool ok() const { return ok_; } @@ -74,11 +74,11 @@ class unmarshall { template inline void unpack_header(T & h) { - VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ); + VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ); // first 4 bytes hold length field - index_ = sizeof(rpc_sz_t); + index_ = sizeof(rpc_protocol::rpc_sz_t); *this >> h; - index_ = RPC_HEADER_SZ; + index_ = rpc_protocol::RPC_HEADER_SZ; } template inline T _grab() { T t; *this >> t; return t; } @@ -154,8 +154,8 @@ inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); } // our first two marshallable structs... -MARSHALLABLE(request_header) -MARSHALLABLE(reply_header) +MARSHALLABLE(rpc_protocol::request_header) +MARSHALLABLE(rpc_protocol::reply_header) // // Marshalling for STL containers diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 80e429e..964102f 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -102,7 +102,7 @@ rpcc::~rpcc() { int rpcc::bind(milliseconds to) { unsigned int r; - int ret = call_timeout(rpc_const::bind, to, r, 0); + int ret = call_timeout(rpc_protocol::bind, to, r, 0); if (ret == 0) { lock ml(m_); bind_done_ = true; @@ -125,7 +125,7 @@ void rpcc::cancel(void) { lock cl(ca->m); ca->done = true; - ca->intret = rpc_const::cancel_failure; + ca->intret = rpc_protocol::cancel_failure; ca->c.notify_one(); } @@ -137,25 +137,27 @@ void rpcc::cancel(void) { } } -int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { +int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { caller ca(0, &rep); int xid_rep; { lock ml(m_); - if ((proc != rpc_const::bind && !bind_done_) || (proc == rpc_const::bind && bind_done_)) { + if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) { IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice"); - return rpc_const::bind_failure; + return rpc_protocol::bind_failure; } if (destroy_wait_) - return rpc_const::cancel_failure; + return rpc_protocol::cancel_failure; ca.xid = xid_++; calls_[ca.xid] = &ca; - req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); + req.pack_header(rpc_protocol::request_header{ + ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front() + }); xid_rep = xid_rep_window_.front(); } @@ -248,7 +250,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); // destruction of req automatically frees its buffer - return (ca.done? ca.intret : rpc_const::timeout_failure); + return (ca.done? ca.intret : rpc_protocol::timeout_failure); } void rpcc::get_refconn(shared_ptr & ch) { @@ -269,7 +271,7 @@ bool rpcc::got_pdu(const shared_ptr &, const string & b) { unmarshall rep(b, true); - reply_header h; + rpc_protocol::reply_header h; rep.unpack_header(h); if (!rep.ok()) { @@ -330,7 +332,7 @@ rpcs::rpcs(in_port_t p1, size_t count) nonce_ = (unsigned int)random(); IF_LEVEL(2) LOG("created with nonce " << nonce_); - reg(rpc_const::bind, &rpcs::rpcbind, this); + reg(rpc_protocol::bind, &rpcs::rpcbind, this); dispatchpool_ = unique_ptr(new thread_pool(6, false)); } @@ -355,14 +357,14 @@ bool rpcs::got_pdu(const shared_ptr & c, const string & b) { return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b)); } -void rpcs::reg1(proc_t proc, handler *h) { +void rpcs::reg1(proc_id_t proc, handler *h) { lock pl(procs_m_); VERIFY(procs_.count(proc) == 0); procs_[proc] = h; VERIFY(procs_.count(proc) >= 1); } -void rpcs::updatestat(proc_t proc) { +void rpcs::updatestat(proc_id_t proc) { lock cl(count_m_); counts_[proc]++; curr_counts_--; @@ -388,9 +390,9 @@ void rpcs::updatestat(proc_t proc) { void rpcs::dispatch(shared_ptr c, const string & buf) { unmarshall req(buf, true); - request_header h; + rpc_protocol::request_header h; req.unpack_header(h); - proc_t proc = h.proc; + proc_id_t proc = h.proc; if (!req.ok()) { IF_LEVEL(1) LOG("unmarshall header failed"); @@ -401,13 +403,13 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce); marshall rep; - reply_header rh{h.xid,0}; + rpc_protocol::reply_header rh{h.xid,0}; // is client sending to an old instance of server? if (h.srv_nonce != 0 && h.srv_nonce != nonce_) { IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce << " (current " << nonce_ << ") proc " << hex << h.proc); - rh.ret = rpc_const::oldsrv_failure; + rh.ret = rpc_protocol::oldsrv_failure; rep.pack_header(rh); c->send(rep); return; @@ -463,10 +465,10 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { updatestat(proc); rh.ret = (*f)(req, rep); - if (rh.ret == rpc_const::unmarshal_args_failure) { - cerr << "failed to unmarshall the arguments. You are " << - "probably calling RPC 0x" << hex << proc << " with the wrong " << - "types of arguments." << endl; + if (rh.ret == rpc_protocol::unmarshal_args_failure) { + LOG("failed to unmarshall the arguments. You are " << + "probably calling RPC 0x" << hex << proc << " with the wrong " << + "types of arguments."); VERIFY(0); } VERIFY(rh.ret >= 0); @@ -498,7 +500,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { break; case FORGOTTEN: // very old request and we don't have the response anymore IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce); - rh.ret = rpc_const::atmostonce_failure; + rh.ret = rpc_protocol::atmostonce_failure; rep.pack_header(rh); c->send(rep); break; @@ -578,7 +580,7 @@ void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) { for (it++; it != l.end() && it->xid < xid; it++); // there should already be an entry, so whine if there isn't if (it == l.end() || it->xid != xid) { - cerr << "Could not find reply struct in add_reply" << endl; + LOG("Could not find reply struct in add_reply"); l.insert(it, reply_t(xid, b)); } else { *it = reply_t(xid, b); @@ -616,7 +618,7 @@ static sockaddr_in make_sockaddr(const string &hostandport) { struct hostent *hp = gethostbyname(host.c_str()); if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) { - cerr << "cannot find host name " << host << endl; + LOG_NONMEMBER("cannot find host name " << host); exit(1); } memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); diff --git a/rpc/rpc.h b/rpc/rpc.h index 84c12f3..7b65101 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -5,6 +5,7 @@ #include #include +#include "rpc_protocol.h" #include "thr_pool.h" #include "marshall.h" #include "marshall_wrap.h" @@ -15,23 +16,27 @@ namespace rpc { static constexpr milliseconds to_min{100}; } -class rpc_const { - public: - static const unsigned int bind = 1; // handler number reserved for bind - static const int timeout_failure = -1; - static const int unmarshal_args_failure = -2; - static const int unmarshal_reply_failure = -3; - static const int atmostonce_failure = -4; - static const int oldsrv_failure = -5; - static const int bind_failure = -6; - static const int cancel_failure = -7; -}; +template struct is_valid_call : false_type {}; + +template +struct is_valid_call : true_type {}; + +template struct is_valid_registration : false_type {}; + +template +struct is_valid_registration::type...), S(R &, Args...)> : true_type {}; + +template +struct is_valid_registration : is_valid_registration {}; // rpc client endpoint. // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, class rpcc : private connection_delegate { private: + using proc_id_t = rpc_protocol::proc_id_t; + template + using proc_t = rpc_protocol::proc_t; // manages per rpc info struct caller { @@ -78,20 +83,20 @@ class rpcc : private connection_delegate { request dup_req_; int xid_rep_done_; - int call1(proc_t proc, marshall &req, string &rep, milliseconds to); + int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to); template - int call_m(proc_t proc, marshall &req, R & r, milliseconds to) { + int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) { string rep; int intret = call1(proc, req, rep, to); unmarshall u(rep, true); if (intret < 0) return intret; u >> r; if (u.okdone() != true) { - cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " << - "calling RPC 0x" << hex << proc << " with the wrong return type." << endl; + LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " << + "calling RPC 0x" << hex << proc << " with the wrong return type."); VERIFY(0); - return rpc_const::unmarshal_reply_failure; + return rpc_protocol::unmarshal_reply_failure; } return intret; } @@ -111,21 +116,25 @@ class rpcc : private connection_delegate { void cancel(); - template - inline int call(proc_t proc, R & r, const Args&... args) { + template + inline int call(proc_t

proc, R & r, const Args&... args) { return call_timeout(proc, rpc::to_max, r, args...); } - template - inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args) { + template + inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args&... args) { + static_assert(is_valid_call::value, "RPC called with incorrect argument types"); marshall m{args...}; - return call_m(proc, m, r, to); + return call_m(proc.id, m, r, to); } }; // rpc server endpoint. class rpcs : private connection_delegate { private: + using proc_id_t = rpc_protocol::proc_id_t; + template + using proc_t = rpc_protocol::proc_t; typedef enum { NEW, // new RPC, not a duplicate @@ -160,7 +169,7 @@ class rpcs : private connection_delegate { rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, int xid, int rep_xid, string & b); - void updatestat(proc_t proc); + void updatestat(proc_id_t proc); // latest connection to the client map> conns_; @@ -168,12 +177,12 @@ class rpcs : private connection_delegate { // counting const size_t counting_; size_t curr_counts_; - map counts_; + map counts_; bool reachable_; // map proc # to function - map procs_; + map procs_; mutex procs_m_; // protect insert/delete to procs[] mutex count_m_; // protect modification of counts @@ -183,13 +192,13 @@ class rpcs : private connection_delegate { void dispatch(shared_ptr c, const string & buf); // internal handler registration - void reg1(proc_t proc, handler *); + void reg1(proc_id_t proc, handler *); unique_ptr dispatchpool_; unique_ptr listener_; // RPC handler for clients binding - int rpcbind(unsigned int &r, int a); + rpc_protocol::status rpcbind(unsigned int &r, int a); bool got_pdu(const shared_ptr & c, const string & b); @@ -200,13 +209,14 @@ class rpcs : private connection_delegate { void set_reachable(bool r) { reachable_ = r; } - template void reg(proc_t proc, F f, C *c=nullptr) { + template void reg(proc_t

proc, F f, C *c=nullptr) { + static_assert(is_valid_registration::value, "RPC handler registered with incorrect argument types"); struct ReturnOnFailure { static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; + return rpc_protocol::unmarshal_args_failure; } }; - reg1(proc, marshalled_func::wrap(f, c)); + reg1(proc.id, marshalled_func::wrap(f, c)); } void start(); diff --git a/rpc/rpc_protocol.h b/rpc/rpc_protocol.h index 8107f04..881de9b 100644 --- a/rpc/rpc_protocol.h +++ b/rpc/rpc_protocol.h @@ -3,33 +3,57 @@ #include "types.h" -using proc_t = uint32_t; -using status_t = int32_t; -using rpc_sz_t = uint32_t; - -struct request_header { - int xid; - proc_t proc; - unsigned int clt_nonce; - unsigned int srv_nonce; - int xid_rep; - - MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) +namespace rpc_protocol { + using proc_id_t = uint32_t; + + using status = int32_t; + using rpc_sz_t = uint32_t; + + enum : status { + timeout_failure = -1, + unmarshal_args_failure = -2, + unmarshal_reply_failure = -3, + atmostonce_failure = -4, + oldsrv_failure = -5, + bind_failure = -6, + cancel_failure = -7 + }; + + struct request_header { + int xid; + proc_id_t proc; + unsigned int clt_nonce; + unsigned int srv_nonce; + int xid_rep; + + MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) + }; + + struct reply_header { + int xid; + int ret; + + MEMBERS(xid, ret) + }; + + template + struct proc_t { + using signature = Signature; + proc_id_t id; + }; + + const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); + const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation + const size_t MAX_PDU = 10<<20; //maximum PDF is 10M + +#define REMOTE_PROCEDURE_BASE(_base_) enum proc_no : ::rpc_protocol::proc_id_t { base = _base_ }; +#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr ::rpc_protocol::proc_t _name_{base + _offset_}; + + REMOTE_PROCEDURE_BASE(0); + REMOTE_PROCEDURE(1, bind, (unsigned int &, int)); // handler number reserved for bind }; -ENDIAN_SWAPPABLE(request_header) - -struct reply_header { - int xid; - int ret; - - MEMBERS(xid, ret) -}; - -ENDIAN_SWAPPABLE(reply_header) - -const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); -const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation -const size_t MAX_PDU = 10<<20; //maximum PDF is 10M +ENDIAN_SWAPPABLE(rpc_protocol::request_header) +ENDIAN_SWAPPABLE(rpc_protocol::reply_header) #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 354b528..1963ada 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -27,6 +27,15 @@ class srv { int handle_bigrep(string &r, const size_t a); }; +namespace srv_protocol { + using status = rpc_protocol::status; + REMOTE_PROCEDURE_BASE(0); + REMOTE_PROCEDURE(22, _22, (string &, string, string)); + REMOTE_PROCEDURE(23, fast, (int &, int)); + REMOTE_PROCEDURE(24, slow, (int &, int)); + REMOTE_PROCEDURE(25, bigrep, (string &, size_t)); +}; + // a handler. a and b are arguments, r is the result. // there can be multiple arguments but only one result. // the caller also gets to see the int return value @@ -34,54 +43,43 @@ class srv { // rpcs::reg() decides how to unmarshall by looking // at these argument types, so this function definition // does what a .x file does in SunRPC. -int -srv::handle_22(string &r, const string a, string b) -{ +int srv::handle_22(string &r, const string a, string b) { r = a + b; return 0; } -int -srv::handle_fast(int &r, const int a) -{ +int srv::handle_fast(int &r, const int a) { r = a + 1; return 0; } -int -srv::handle_slow(int &r, const int a) -{ +int srv::handle_slow(int &r, const int a) { usleep(random() % 500); r = a + 2; return 0; } -int -srv::handle_bigrep(string &r, const size_t len) -{ +int srv::handle_bigrep(string &r, const size_t len) { r = string((size_t)len, 'x'); return 0; } srv service; -void startserver() -{ +void startserver() { server = new rpcs(port); - server->reg(22, &srv::handle_22, &service); - server->reg(23, &srv::handle_fast, &service); - server->reg(24, &srv::handle_slow, &service); - server->reg(25, &srv::handle_bigrep, &service); + server->reg(srv_protocol::_22, &srv::handle_22, &service); + server->reg(srv_protocol::fast, &srv::handle_fast, &service); + server->reg(srv_protocol::slow, &srv::handle_slow, &service); + server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service); server->start(); } -void -testmarshall() -{ +void testmarshall() { marshall m; - request_header rh{1,2,3,4,5}; + rpc_protocol::request_header rh{1,2,3,4,5}; m.pack_header(rh); - VERIFY(((string)m).size()==RPC_HEADER_SZ); + VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; string s = "hallo...."; @@ -90,10 +88,10 @@ testmarshall() m << s; string b = m; - VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); + VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); unmarshall un(b, true); - request_header rh1; + rpc_protocol::request_header rh1; un.unpack_header(rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; @@ -106,20 +104,18 @@ testmarshall() VERIFY(i1==i && l1==l && s1==s); } -void -client1(size_t cl) -{ +void client1(size_t cl) { // test concurrency. size_t which_cl = cl % NUM_CL; for(int i = 0; i < 100; i++){ - int arg = (random() % 2000); + unsigned long arg = (random() % 2000); string rep; - int ret = clients[which_cl]->call(25, rep, arg); + int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg); VERIFY(ret == 0); - if ((int)rep.size()!=arg) + if ((unsigned long)rep.size()!=arg) cout << "repsize wrong " << rep.size() << "!=" << arg << endl; - VERIFY((int)rep.size() == arg); + VERIFY((unsigned long)rep.size() == arg); } // test rpc replies coming back not in the order of @@ -131,7 +127,7 @@ client1(size_t cl) auto start = steady_clock::now(); - int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); + int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg); auto end = steady_clock::now(); auto diff = duration_cast(end - start).count(); if (ret != 0) @@ -141,60 +137,53 @@ client1(size_t cl) } } -void -client2(size_t cl) -{ +void client2(size_t cl) { size_t which_cl = cl % NUM_CL; time_t t1; time(&t1); while(time(0) - t1 < 10){ - int arg = (random() % 2000); + unsigned long arg = (random() % 2000); string rep; - int ret = clients[which_cl]->call(25, rep, arg); - if ((int)rep.size()!=arg) + int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg); + if ((unsigned long)rep.size()!=arg) cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl; - VERIFY((int)rep.size() == arg); + VERIFY((unsigned long)rep.size() == arg); } } -void -client3(void *xx) -{ +void client3(void *xx) { rpcc *c = (rpcc *) xx; for(int i = 0; i < 4; i++){ int rep = 0; - int ret = c->call_timeout(24, milliseconds(300), rep, i); - VERIFY(ret == rpc_const::timeout_failure || rep == i+2); + int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i); + VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2); } } - -void -simple_tests(rpcc *c) -{ +void simple_tests(rpcc *c) { cout << "simple_tests" << endl; // an RPC call to procedure #22. // rpcc::call() looks at the argument types to decide how // to marshall the RPC call packet, and how to unmarshall // the reply packet. string rep; - int intret = c->call(22, rep, (string)"hello", (string)" goodbye"); + int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == 0); // this is what handle_22 returns VERIFY(rep == "hello goodbye"); cout << " -- string concat RPC .. ok" << endl; // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call_timeout(25, milliseconds(20000), rep, 70000); + intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul); VERIFY(intret == 0); VERIFY(rep.size() == 70000); cout << " -- small request, big reply .. ok" << endl; // specify a timeout value to an RPC that should succeed (udp) int xx = 0; - intret = c->call_timeout(23, milliseconds(300), xx, 77); + intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77); VERIFY(intret == 0 && xx == 78); cout << " -- no spurious timeout .. ok" << endl; @@ -202,14 +191,14 @@ simple_tests(rpcc *c) { string arg(1000, 'x'); string rep2; - c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x"); + c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x"); VERIFY(rep2.size() == 1001); cout << " -- no spurious timeout .. ok" << endl; } // huge RPC string big(1000000, 'x'); - intret = c->call(22, rep, big, (string)"z"); + intret = c->call(srv_protocol::_22, rep, big, (string)"z"); VERIFY(rep.size() == 1000001); cout << " -- huge 1M rpc request .. ok" << endl; @@ -224,9 +213,7 @@ simple_tests(rpcc *c) cout << "simple_tests OK" << endl; } -void -concurrent_test(size_t nt) -{ +void concurrent_test(size_t nt) { // create threads that make lots of calls in parallel, // to test thread synchronization for concurrent calls // and dispatches. @@ -243,9 +230,7 @@ concurrent_test(size_t nt) cout << " OK" << endl; } -void -lossy_test() -{ +void lossy_test() { cout << "start lossy_test ..."; VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); @@ -274,9 +259,7 @@ lossy_test() VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); } -void -failure_test() -{ +void failure_test() { rpcc *client1; rpcc *client = clients[0]; @@ -293,8 +276,8 @@ failure_test() startserver(); string rep; - int intret = client->call(22, rep, (string)"hello", (string)" goodbye"); - VERIFY(intret == rpc_const::oldsrv_failure); + int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == rpc_protocol::oldsrv_failure); cout << " -- call recovered server with old client .. failed ok" << endl; delete client; @@ -303,7 +286,7 @@ failure_test() VERIFY (client->bind() >= 0); VERIFY (client->bind() < 0); - intret = client->call(22, rep, (string)"hello", (string)" goodbye"); + intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == 0); VERIFY(rep == "hello goodbye"); @@ -344,9 +327,7 @@ failure_test() cout << "failure_test OK" << endl; } -int -main(int argc, char *argv[]) -{ +int main(int argc, char *argv[]) { setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); @@ -393,7 +374,7 @@ main(int argc, char *argv[]) testmarshall(); if (isserver) { - cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl; + cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl; startserver(); } @@ -424,7 +405,6 @@ main(int argc, char *argv[]) exit(0); } - while (1) { + while (1) usleep(100000); - } } diff --git a/rsm.cc b/rsm.cc index 4e73f9f..7e90b03 100644 --- a/rsm.cc +++ b/rsm.cc @@ -115,7 +115,7 @@ void rsm::start() { thread(&rsm::recovery, this).detach(); } -void rsm::reg1(int proc, handler *h) { +void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) { lock ml(rsm_mutex); procs[proc] = h; } @@ -130,7 +130,7 @@ void rsm::recovery() [[noreturn]] { // XXX iannucci 2013/09/15 -- I don't understand whether accessing // cfg->view_id in this manner involves a race. I suspect not. if (join(primary, ml)) { - LOG("recovery: joined"); + LOG("joined"); commit_change(cfg->view_id(), ml); } else { ml.unlock(); @@ -139,13 +139,13 @@ void rsm::recovery() [[noreturn]] { } } vid_insync = vid_commit; - LOG("recovery: sync vid_insync " << vid_insync); + LOG("sync vid_insync " << vid_insync); if (primary == cfg->myaddr()) { r = sync_with_backups(ml); } else { r = sync_with_primary(ml); } - LOG("recovery: sync done"); + LOG("sync done"); // If there was a commited viewchange during the synchronization, restart // the recovery @@ -157,7 +157,7 @@ void rsm::recovery() [[noreturn]] { myvs.seqno = 1; inviewchange = false; } - LOG("recovery: go to sleep " << insync << " " << inviewchange); + LOG("go to sleep " << insync << " " << inviewchange); recovery_cond.wait(ml); } } @@ -285,7 +285,7 @@ void rsm::commit_change(unsigned vid) { void rsm::commit_change(unsigned vid, lock &) { if (vid <= vid_commit) return; - LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," << + LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," << last_myvs.seqno << ") " << primary << " insync " << insync); vid_commit = vid; inviewchange = true; @@ -297,7 +297,7 @@ void rsm::commit_change(unsigned vid, lock &) { } -void rsm::execute(int procno, const string & req, string & r) { +void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) { LOG("execute"); handler *h = procs[procno]; VERIFY(h); @@ -313,7 +313,7 @@ void rsm::execute(int procno, const string & req, string & r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) { +rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) { LOG("invoke procno 0x" << hex << procno); lock ml(invoke_mutex); vector m; @@ -367,7 +367,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str // the replica must execute requests in order (with no gaps) // according to requests' seqno -rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & req) { +rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) { LOG("invoke procno 0x" << hex << proc); lock ml(invoke_mutex); vector m; @@ -491,7 +491,7 @@ void rsm::set_primary(unsigned vid) { VERIFY (c.size() > 0); if (isamember(primary,c)) { - LOG("set_primary: primary stays " << primary); + LOG("primary stays " << primary); return; } @@ -499,7 +499,7 @@ void rsm::set_primary(unsigned vid) { for (unsigned i = 0; i < p.size(); i++) { if (isamember(p[i], c)) { primary = p[i]; - LOG("set_primary: primary is " << primary); + LOG("primary is " << primary); return; } } diff --git a/rsm.h b/rsm.h index 8fdf2d5..b402bab 100644 --- a/rsm.h +++ b/rsm.h @@ -18,9 +18,9 @@ class rsm_state_transfer { class rsm : public config_view_change { private: - void reg1(int proc, handler *); + void reg1(rpc_protocol::proc_id_t proc, handler *); protected: - map procs; + map procs; unique_ptr cfg; rsm_state_transfer *stf = nullptr; rpcs *rsmrpc; @@ -43,7 +43,7 @@ class rsm : public config_view_change { bool break2; rsm_client_protocol::status client_members(vector &r, int i); - rsm_protocol::status invoke(int &, int proc, viewstamp vs, const string & mreq); + rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq); rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src, viewstamp last, unsigned vid); rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid); @@ -54,8 +54,8 @@ class rsm : public config_view_change { mutex rsm_mutex, invoke_mutex; cond recovery_cond, sync_cond; - void execute(int procno, const string & req, string & r); - rsm_client_protocol::status client_invoke(string & r, int procno, const string & req); + void execute(rpc_protocol::proc_id_t procno, const string & req, string & r); + rsm_client_protocol::status client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req); bool statetransfer(const string & m, lock & rsm_mutex_lock); bool statetransferdone(const string & m, lock & rsm_mutex_lock); bool join(const string & m, lock & rsm_mutex_lock); @@ -75,13 +75,12 @@ class rsm : public config_view_change { void recovery(); void commit_change(unsigned vid); - template void reg(int proc, F f, C *c=nullptr); + template void reg(rpc_protocol::proc_t

proc, F f, C *c=nullptr) { + static_assert(is_valid_registration::value, "RSM handler registered with incorrect argument types"); + reg1(proc.id, marshalled_func::wrap(f, c)); + } void start(); }; -template void rsm::reg(int proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); -} - #endif /* rsm_h */ diff --git a/rsm_client.cc b/rsm_client.cc index fa04e26..9e86915 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -7,7 +7,7 @@ rsm_client::rsm_client(string dst) : primary(dst) { LOG("create rsm_client"); lock ml(rsm_client_mutex); VERIFY (init_members(ml)); - LOG("rsm_client: done"); + LOG("done"); } void rsm_client::primary_failure(lock &) { diff --git a/rsm_client.h b/rsm_client.h index 2a9c8c4..be66fec 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -19,14 +19,16 @@ class rsm_client { mutex rsm_client_mutex; void primary_failure(lock & rsm_client_mutex_lock); bool init_members(lock & rsm_client_mutex_lock); + rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req); + template int call_m(unsigned int proc, R & r, const marshall & req); public: rsm_client(string dst); - rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req); - template - int call(unsigned int proc, R & r, const Args & ...a1); - private: - template int call_m(unsigned int proc, R & r, const marshall & req); + template + int call(rpc_protocol::proc_t

proc, R & r, const Args & ...a1) { + static_assert(is_valid_call::value, "RSM method invoked with incorrect argument types"); + return call_m(proc.id, r, marshall{a1...}); + } }; inline string hexify(const string & s) { @@ -55,7 +57,7 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { "0x" << hex << proc << " with the wrong return type"); LOG("here's what I got: \"" << hexify(rep) << "\""); VERIFY(0); - return rpc_const::unmarshal_reply_failure; + return rpc_protocol::unmarshal_reply_failure; } unmarshall u1(res, false); u1 >> r; @@ -65,14 +67,9 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { " with the wrong return type."); LOG("here's what I got: \"" << hexify(res) << "\""); VERIFY(0); - return rpc_const::unmarshal_reply_failure; + return rpc_protocol::unmarshal_reply_failure; } return intret; } -template -int rsm_client::call(unsigned int proc, R & r, const Args & ...a1) { - return call_m(proc, r, marshall{a1...}); -} - #endif diff --git a/rsm_protocol.h b/rsm_protocol.h index a2d13c2..d64c0af 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -4,13 +4,11 @@ #include "types.h" #include "rpc/rpc.h" -class rsm_client_protocol { - public: - enum status : status_t {OK, ERR, NOTPRIMARY, BUSY}; - enum rpc_numbers : proc_t { - invoke = 0x9001, - members, - }; +namespace rsm_client_protocol { + enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY}; + REMOTE_PROCEDURE_BASE(0x9000); + REMOTE_PROCEDURE(1, invoke, (string &, rpc_protocol::proc_id_t, string)); + REMOTE_PROCEDURE(2, members, (vector &, int)); }; struct viewstamp { @@ -24,33 +22,30 @@ struct viewstamp { MARSHALLABLE(viewstamp) -class rsm_protocol { - public: - enum status : status_t { OK, ERR, BUSY}; - enum rpc_numbers : proc_t { - invoke = 0xa001, - transferreq, - transferdonereq, - joinreq, - }; - - struct transferres { - string state; - viewstamp last; - - MEMBERS(state, last) - }; +namespace rsm_protocol { + enum status : rpc_protocol::status { OK, ERR, BUSY}; + + struct transferres { + string state; + viewstamp last; + + MEMBERS(state, last) + }; + + REMOTE_PROCEDURE_BASE(0xa000); + REMOTE_PROCEDURE(1, invoke, (int &, rpc_protocol::proc_id_t, viewstamp, string)); + REMOTE_PROCEDURE(2, transferreq, (transferres &, string, viewstamp, unsigned)); + REMOTE_PROCEDURE(3, transferdonereq, (int &, string, unsigned)); + REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp)); }; MARSHALLABLE(rsm_protocol::transferres) -class rsm_test_protocol { - public: - enum status : status_t {OK, ERR}; - enum rpc_numbers : proc_t { - net_repair = 0x12001, - breakpoint = 0x12002, - }; +namespace rsm_test_protocol { + enum status : rpc_protocol::status {OK, ERR}; + REMOTE_PROCEDURE_BASE(0x12000); + REMOTE_PROCEDURE(1, net_repair, (status &, int)); + REMOTE_PROCEDURE(2, breakpoint, (status &, int)); }; #endif diff --git a/rsm_tester.cc b/rsm_tester.cc index 1a8b833..469aea2 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -10,7 +10,7 @@ char log_thread_prefix = 't'; int main(int argc, char *argv[]) { if(argc != 4){ - cerr << "Usage: " << argv[0] << " [host:]port [partition] arg" << endl; + LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [partition] arg"); return 1; } @@ -22,7 +22,7 @@ int main(int argc, char *argv[]) { int b = stoi(argv[3]); cout << "breakpoint " << b << " returned " << lc->breakpoint(b); } else { - cerr << "Unknown command " << argv[2] << endl; + LOG_NONMEMBER("Unknown command " << argv[2]); } return 0; }