From a5f10a497bebfc680bf418193f1fd9f1ad7cc417 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Tue, 1 Oct 2013 21:52:29 -0400 Subject: [PATCH] Clean-ups to logging and type signatures --- handle.cc | 10 +++--- lock_server.cc | 6 ++-- lock_server.h | 6 ++-- rpc/marshall.h | 15 ++++++++- rsm.cc | 92 +++++++++++++++++++++++++++----------------------------- rsm.h | 24 +++++++-------- rsm_client.cc | 12 ++++---- rsm_client.h | 27 ++++++++++++----- 8 files changed, 105 insertions(+), 87 deletions(-) diff --git a/handle.cc b/handle.cc index d32c895..5287a35 100644 --- a/handle.cc +++ b/handle.cc @@ -34,7 +34,7 @@ rpcc * handle::safebind() { if (h->cl) return h->cl; rpcc *cl = new rpcc(h->m); - LOG("handler_mgr::acquire_handle trying to bind..." << h->m); + LOG("trying to bind..." << h->m); // The test script assumes that the failure can be detected by paxos and // rsm layer within few seconds. We have to set the timeout with a small // value to support the assumption. @@ -42,11 +42,11 @@ rpcc * handle::safebind() { // With RPC_LOSSY=5, tests may fail due to delays and time outs. int ret = cl->bind(rpcc::to(1000)); if (ret < 0) { - LOG("handle_mgr::acquire_handle bind failure! " << h->m << " " << ret); + LOG("bind failure! " << h->m << " " << ret); delete cl; h->del = true; } else { - LOG("handle_mgr::acquire_handle bind succeeded " << h->m); + LOG("bind succeeded " << h->m); h->cl = cl; } return h->cl; @@ -83,10 +83,10 @@ void handle_mgr::delete_handle(const string & m) { void handle_mgr::delete_handle(const string & m, lock &) { if (hmap.find(m) == hmap.end()) { - LOG("handle_mgr::delete_handle: cl " << m << " isn't in cl list"); + LOG("cl " << m << " isn't in cl list"); return; } - LOG("handle_mgr::delete_handle: cl " << m << " refcnt " << hmap[m]->refcnt); + LOG("cl " << m << " refcnt " << hmap[m]->refcnt); hinfo *h = hmap[m]; if (h->refcnt == 0) { if (h->cl) { diff --git a/lock_server.cc b/lock_server.cc index 81cd805..d7367bd 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -93,7 +93,7 @@ void lock_server::retryer() [[noreturn]] { } } -int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { +int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { LOG("lid=" << lid << " client=" << id << "," << xid); holder_t h = holder_t(id, xid); lock_state &st = get_lock_state(lid); @@ -151,7 +151,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_pro return lock_protocol::RETRY; } -int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) { +int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { LOG("lid=" << lid << " client=" << id << "," << xid); lock_state &st = get_lock_state(lid); lock sl(st.m); @@ -171,7 +171,7 @@ string lock_server::marshal_state() { return rep.content(); } -void lock_server::unmarshal_state(string state) { +void lock_server::unmarshal_state(const string & state) { lock sl(lock_table_lock); unmarshall rep(state, false); rep >> nacquire >> lock_table; diff --git a/lock_server.h b/lock_server.h index 560167f..69ac2b8 100644 --- a/lock_server.h +++ b/lock_server.h @@ -42,9 +42,9 @@ class lock_server : public rsm_state_transfer { void revoker(); void retryer(); string marshal_state(); - void unmarshal_state(string state); - int acquire(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t); - int release(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t); + void unmarshal_state(const string & state); + int acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); + int release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); }; #endif diff --git a/rpc/marshall.h b/rpc/marshall.h index 597c0fc..69b10df 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -32,7 +32,7 @@ class marshall { // with header inline operator string() const { return buf_.substr(0,index_); } // without header - inline string content() { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); } + inline string content() const { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); } // letting S be a defaulted template parameter forces the compiler to // delay looking up operator<<(marshall&, rpc_sz_t) until we define it @@ -234,4 +234,17 @@ operator>>(unmarshall &u, E &e) { return u; } +// +// Recursive marshalling +// + +inline marshall & operator<<(marshall &m, marshall &n) { + return m << n.content(); +} + +inline unmarshall & operator>>(unmarshall &u, unmarshall &v) { + v = unmarshall(u._grab(), false); + return u; +} + #endif diff --git a/rsm.cc b/rsm.cc index 0c2e5bf..255d281 100644 --- a/rsm.cc +++ b/rsm.cc @@ -86,7 +86,7 @@ #include "rsm.h" #include "rsm_client.h" -rsm::rsm(string _first, string _me) : +rsm::rsm(const string & _first, const string & _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { @@ -181,7 +181,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { insync = true; cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); - LOG("rsm::sync_with_backups " << backups); + LOG("backups " << backups); sync_cond.wait(rsm_mutex_lock); insync = false; return true; @@ -199,16 +199,16 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) { } -/** - * Call to transfer state from m to the local node. - * Assumes that rsm_mutex is already held. - */ -bool rsm::statetransfer(string m, lock & rsm_mutex_lock) +// +// Call to transfer state from m to the local node. +// Assumes that rsm_mutex is already held. +// +bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) { rsm_protocol::transferres r; handle h(m); int ret = 0; - LOG("rsm::statetransfer: contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); rpcc *cl; { rsm_mutex_lock.unlock(); @@ -220,18 +220,18 @@ bool rsm::statetransfer(string m, lock & rsm_mutex_lock) rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret); + LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret); return false; } if (stf && last_myvs != r.last) { stf->unmarshal_state(r.state); } last_myvs = r.last; - LOG("rsm::statetransfer transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); return true; } -bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) { +bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); handle h(m); rpcc *cl = h.safebind(); @@ -246,12 +246,12 @@ bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) { } -bool rsm::join(string m, lock & rsm_mutex_lock) { +bool rsm::join(const string & m, lock & rsm_mutex_lock) { handle h(m); int ret = 0; string log; - LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"); rpcc *cl; { rsm_mutex_lock.unlock(); @@ -264,18 +264,18 @@ bool rsm::join(string m, lock & rsm_mutex_lock) { } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret); + LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret); return false; } - LOG("rsm::join: succeeded " << log); + LOG("succeeded " << log); cfg->restore(log); return true; } -/* - * Config informs rsm whenever it has successfully - * completed a view change - */ +// +// Config informs rsm whenever it has successfully +// completed a view change +// void rsm::commit_change(unsigned vid) { lock ml(rsm_mutex); commit_change(vid, ml); @@ -298,18 +298,14 @@ void rsm::commit_change(unsigned vid, lock &) { } -void rsm::execute(int procno, string req, string &r) { +void rsm::execute(int procno, const string & req, string & r) { LOG("execute"); handler *h = procs[procno]; VERIFY(h); unmarshall args(req, false); marshall rep; - string reps; auto ret = (rsm_protocol::status)(*h)(args, rep); - marshall rep1; - rep1 << ret; - rep1 << rep.content(); - r = rep1.content(); + r = marshall{ret, rep.content()}.content(); } // @@ -318,8 +314,8 @@ void rsm::execute(int procno, string req, string &r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) { - LOG("rsm::client_invoke: procno 0x" << hex << procno); +rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) { + LOG("invoke procno 0x" << hex << procno); lock ml(invoke_mutex); vector m; string myaddr; @@ -372,8 +368,8 @@ rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req // the replica must execute requests in order (with no gaps) // according to requests' seqno -rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) { - LOG("rsm::invoke: procno 0x" << hex << proc); +rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & req) { + LOG("invoke procno 0x" << hex << proc); lock ml(invoke_mutex); vector m; string myaddr; @@ -404,10 +400,10 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) { return rsm_protocol::OK; } -/** - * RPC handler: Send back the local node's state to the caller - */ -rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src, +// +// RPC handler: Send back the local node's state to the caller +// +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << @@ -420,11 +416,11 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src, return rsm_protocol::OK; } -/** - * RPC handler: Inform the local node (the primary) that node m has synchronized - * for view vid - */ -rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) { +// +// RPC handler: Inform the local node (the primary) that node m has synchronized +// for view vid +// +rsm_protocol::status rsm::transferdonereq(int &, const string & m, unsigned vid) { lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; @@ -437,7 +433,7 @@ rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) { // a node that wants to join an RSM as a server sends a // joinreq to the RSM's current primary; this is the // handler for that RPC. -rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) { +rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last) { auto ret = rsm_protocol::OK; lock ml(rsm_mutex); @@ -471,18 +467,18 @@ rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) { return ret; } -/* - * RPC handler: Send back all the nodes this local knows about to client - * so the client can switch to a different primary - * when it existing primary fails - */ +// +// RPC handler: Send back all the nodes this local knows about to client +// so the client can switch to a different primary +// when it existing primary fails +// rsm_client_protocol::status rsm::client_members(vector &r, int) { vector m; lock ml(rsm_mutex); cfg->get_view(vid_commit, m); m.push_back(primary); r = m; - LOG("rsm::client_members return " << m << " m " << primary); + LOG("return " << m << " m " << primary); return rsm_client_protocol::OK; } @@ -528,7 +524,7 @@ void rsm::net_repair(bool heal, lock &) { for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); - LOG("rsm::net_repair: " << m[i] << " " << heal); + LOG("member " << m[i] << " " << heal); if (h.safebind()) h.safebind()->set_reachable(heal); } } @@ -537,7 +533,7 @@ void rsm::net_repair(bool heal, lock &) { rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) { lock ml(rsm_mutex); - LOG("rsm::test_net_repairreq: " << heal << " (dopartition " << + LOG("heal " << heal << " (dopartition " << dopartition << ", partitioned " << partitioned << ")"); if (heal) { net_repair(heal, ml); @@ -577,7 +573,7 @@ void rsm::partition1(lock & rsm_mutex_lock) { rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); - LOG("rsm::breakpointreq: " << b); + LOG("breakpoint " << b); if (b == 1) break1 = true; else if (b == 2) break2 = true; else if (b == 3 || b == 4) cfg->breakpoint(b); diff --git a/rsm.h b/rsm.h index ef919ff..1e1726d 100644 --- a/rsm.h +++ b/rsm.h @@ -12,7 +12,7 @@ class rsm_state_transfer { public: virtual string marshal_state() = 0; - virtual void unmarshal_state(string) = 0; + virtual void unmarshal_state(const string &) = 0; virtual ~rsm_state_transfer() {} }; @@ -43,23 +43,22 @@ class rsm : public config_view_change { bool break2; rsm_client_protocol::status client_members(vector &r, int i); - rsm_protocol::status invoke(int &, int proc, viewstamp vs, string mreq); - rsm_protocol::status transferreq(rsm_protocol::transferres &r, string src, + rsm_protocol::status invoke(int &, int proc, viewstamp vs, const string & mreq); + rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src, viewstamp last, unsigned vid); - rsm_protocol::status transferdonereq(int &, string m, unsigned vid); - rsm_protocol::status joinreq(string & log, string src, - viewstamp last); + rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid); + rsm_protocol::status joinreq(string & log, const string & src, viewstamp last); rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal); rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b); mutex rsm_mutex, invoke_mutex; cond recovery_cond, sync_cond; - void execute(int procno, string req, string &r); - rsm_client_protocol::status client_invoke(string &r, int procno, string req); - bool statetransfer(string m, lock & rsm_mutex_lock); - bool statetransferdone(string m, lock & rsm_mutex_lock); - bool join(string m, lock & rsm_mutex_lock); + void execute(int procno, const string & req, string & r); + rsm_client_protocol::status client_invoke(string & r, int procno, const string & req); + bool statetransfer(const string & m, lock & rsm_mutex_lock); + bool statetransferdone(const string & m, lock & rsm_mutex_lock); + bool join(const string & m, lock & rsm_mutex_lock); void set_primary(unsigned vid); bool sync_with_backups(lock & rsm_mutex_lock); bool sync_with_primary(lock & rsm_mutex_lock); @@ -69,8 +68,7 @@ class rsm : public config_view_change { void partition1(lock & rsm_mutex_lock); void commit_change(unsigned vid, lock & rsm_mutex_lock); public: - rsm (string _first, string _me); - ~rsm() {} + rsm (const string & _first, const string & _me); bool amiprimary(); void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; } diff --git a/rsm_client.cc b/rsm_client.cc index 310d1ad..e5e4c2b 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -19,7 +19,7 @@ void rsm_client::primary_failure(lock &) { rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) { lock ml(rsm_client_mutex); while (1) { - LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary); + LOG("proc " << hex << proc << " primary " << primary); handle h(primary); ml.unlock(); @@ -32,7 +32,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const st if (!cl) goto prim_fail; - LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary << " ret " << dec << ret); + LOG("proc " << hex << proc << " primary " << primary << " ret " << dec << ret); if (ret == rsm_client_protocol::OK) return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { @@ -48,12 +48,12 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const st prim_fail: LOG("primary " << primary << " failed ret " << dec << ret); primary_failure(ml); - LOG("rsm_client::invoke: retry new primary " << primary); + LOG("retry new primary " << primary); } } bool rsm_client::init_members(lock & rsm_client_mutex_lock) { - LOG("rsm_client::init_members get members!"); + LOG("get members!"); handle h(primary); int ret = rsm_client_protocol::ERR; rpcc *cl; @@ -67,14 +67,14 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) { if (cl == 0 || ret != rsm_protocol::OK) return false; if (known_mems.size() < 1) { - LOG("rsm_client::init_members do not know any members!"); + LOG("do not know any members!"); VERIFY(0); } primary = known_mems.back(); known_mems.pop_back(); - LOG("rsm_client::init_members: primary " << primary); + LOG("primary " << primary); return true; } diff --git a/rsm_client.h b/rsm_client.h index 0b1dc88..2a9c8c4 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -29,30 +29,41 @@ class rsm_client { template int call_m(unsigned int proc, R & r, const marshall & req); }; +inline string hexify(const string & s) { + string bytes; + for (char ch : s) { + bytes.push_back("0123456789abcdef"[(uint8_t)ch >> 4]); + bytes.push_back("0123456789abcdef"[(uint8_t)ch & 15]); + } + return bytes; +} + template int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { string rep; string res; - int intret = invoke(proc, rep, req); + int intret = invoke(proc, rep, req.content()); VERIFY( intret == rsm_client_protocol::OK ); unmarshall u(rep, false); u >> intret; if (intret < 0) return intret; u >> res; if (!u.okdone()) { - cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl; - cerr << "You probably forgot to set the reply string in " - "rsm::client_invoke, or you may have called RPC 0x" << hex << - proc << " with the wrong return type" << endl; + LOG("failed to unmarshall the reply."); + LOG("You probably forgot to set the reply string in " << + "rsm::client_invoke, or you may have called RPC " << + "0x" << hex << proc << " with the wrong return type"); + LOG("here's what I got: \"" << hexify(rep) << "\""); VERIFY(0); return rpc_const::unmarshal_reply_failure; } unmarshall u1(res, false); u1 >> r; if(!u1.okdone()) { - cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl; - cerr << "You are probably calling RPC 0x" << hex << proc << - " with the wrong return type." << endl; + LOG("failed to unmarshall the reply."); + LOG("You are probably calling RPC 0x" << hex << proc << + " with the wrong return type."); + LOG("here's what I got: \"" << hexify(res) << "\""); VERIFY(0); return rpc_const::unmarshal_reply_failure; } -- 1.7.9.5