if (h->cl)
return h->cl;
rpcc *cl = new rpcc(h->m);
- LOG("handler_mgr::acquire_handle trying to bind..." << h->m);
+ LOG("trying to bind..." << h->m);
// The test script assumes that the failure can be detected by paxos and
// rsm layer within few seconds. We have to set the timeout with a small
// value to support the assumption.
// With RPC_LOSSY=5, tests may fail due to delays and time outs.
int ret = cl->bind(rpcc::to(1000));
if (ret < 0) {
- LOG("handle_mgr::acquire_handle bind failure! " << h->m << " " << ret);
+ LOG("bind failure! " << h->m << " " << ret);
delete cl;
h->del = true;
} else {
- LOG("handle_mgr::acquire_handle bind succeeded " << h->m);
+ LOG("bind succeeded " << h->m);
h->cl = cl;
}
return h->cl;
void handle_mgr::delete_handle(const string & m, lock &) {
if (hmap.find(m) == hmap.end()) {
- LOG("handle_mgr::delete_handle: cl " << m << " isn't in cl list");
+ LOG("cl " << m << " isn't in cl list");
return;
}
- LOG("handle_mgr::delete_handle: cl " << m << " refcnt " << hmap[m]->refcnt);
+ LOG("cl " << m << " refcnt " << hmap[m]->refcnt);
hinfo *h = hmap[m];
if (h->refcnt == 0) {
if (h->cl) {
}
}
-int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
+int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
LOG("lid=" << lid << " client=" << id << "," << xid);
holder_t h = holder_t(id, xid);
lock_state &st = get_lock_state(lid);
return lock_protocol::RETRY;
}
-int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
+int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
LOG("lid=" << lid << " client=" << id << "," << xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
return rep.content();
}
-void lock_server::unmarshal_state(string state) {
+void lock_server::unmarshal_state(const string & state) {
lock sl(lock_table_lock);
unmarshall rep(state, false);
rep >> nacquire >> lock_table;
void revoker();
void retryer();
string marshal_state();
- void unmarshal_state(string state);
- int acquire(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t);
- int release(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t);
+ void unmarshal_state(const string & state);
+ int acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+ int release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
};
#endif
// with header
inline operator string() const { return buf_.substr(0,index_); }
// without header
- inline string content() { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); }
+ inline string content() const { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); }
// letting S be a defaulted template parameter forces the compiler to
// delay looking up operator<<(marshall&, rpc_sz_t) until we define it
return u;
}
+//
+// Recursive marshalling
+//
+
+// Embed n's payload (its content(), i.e. the buffer without the RPC header)
+// into m as a single string field. n is taken by const reference so that
+// const and temporary marshall objects can be nested; content() is const.
+inline marshall & operator<<(marshall &m, const marshall &n) {
+    return m << n.content();
+}
+
+// Inverse of the operator above: pull one string field out of u and make v
+// an unmarshall over that string.
+// NOTE(review): the `false` argument presumably means "buffer carries no RPC
+// header" -- confirm against unmarshall's constructor.
+inline unmarshall & operator>>(unmarshall &u, unmarshall &v) {
+    v = unmarshall(u._grab<string>(), false);
+    return u;
+}
+
#endif
#include "rsm.h"
#include "rsm_client.h"
-rsm::rsm(string _first, string _me) :
+rsm::rsm(const string & _first, const string & _me) :
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
insync = true;
cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
- LOG("rsm::sync_with_backups " << backups);
+ LOG("backups " << backups);
sync_cond.wait(rsm_mutex_lock);
insync = false;
return true;
}
-/**
- * Call to transfer state from m to the local node.
- * Assumes that rsm_mutex is already held.
- */
-bool rsm::statetransfer(string m, lock & rsm_mutex_lock)
+//
+// Call to transfer state from m to the local node.
+// Assumes that rsm_mutex is already held via rsm_mutex_lock; the lock is
+// temporarily released and re-acquired inside this call.
+//
+bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
{
rsm_protocol::transferres r;
handle h(m);
int ret = 0;
- LOG("rsm::statetransfer: contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
rpcc *cl;
{
rsm_mutex_lock.unlock();
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+ LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
return false;
}
if (stf && last_myvs != r.last) {
stf->unmarshal_state(r.state);
}
last_myvs = r.last;
- LOG("rsm::statetransfer transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
return true;
}
-bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) {
+bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
rsm_mutex_lock.unlock();
handle h(m);
rpcc *cl = h.safebind();
}
-bool rsm::join(string m, lock & rsm_mutex_lock) {
+bool rsm::join(const string & m, lock & rsm_mutex_lock) {
handle h(m);
int ret = 0;
string log;
- LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
rpcc *cl;
{
rsm_mutex_lock.unlock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+ LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
return false;
}
- LOG("rsm::join: succeeded " << log);
+ LOG("succeeded " << log);
cfg->restore(log);
return true;
}
-/*
- * Config informs rsm whenever it has successfully
- * completed a view change
- */
+//
+// Config informs rsm whenever it has successfully
+// completed a view change
+//
void rsm::commit_change(unsigned vid) {
lock ml(rsm_mutex);
commit_change(vid, ml);
}
-void rsm::execute(int procno, string req, string &r) {
+void rsm::execute(int procno, const string & req, string & r) {
LOG("execute");
handler *h = procs[procno];
VERIFY(h);
unmarshall args(req, false);
marshall rep;
- string reps;
auto ret = (rsm_protocol::status)(*h)(args, rep);
- marshall rep1;
- rep1 << ret;
- rep1 << rep.content();
- r = rep1.content();
+ r = marshall{ret, rep.content()}.content();
}
//
// number, and invokes it on all members of the replicated state
// machine.
//
-rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) {
- LOG("rsm::client_invoke: procno 0x" << hex << procno);
+rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) {
+ LOG("invoke procno 0x" << hex << procno);
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
// the replica must execute requests in order (with no gaps)
// according to requests' seqno
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) {
- LOG("rsm::invoke: procno 0x" << hex << proc);
+rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & req) {
+ LOG("invoke procno 0x" << hex << proc);
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
return rsm_protocol::OK;
}
-/**
- * RPC handler: Send back the local node's state to the caller
- */
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src,
+//
+// RPC handler: Send back the local node's state to the caller
+//
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
return rsm_protocol::OK;
}
-/**
- * RPC handler: Inform the local node (the primary) that node m has synchronized
- * for view vid
- */
-rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) {
+//
+// RPC handler: Inform the local node (the primary) that node m has synchronized
+// for view vid
+//
+rsm_protocol::status rsm::transferdonereq(int &, const string & m, unsigned vid) {
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
// a node that wants to join an RSM as a server sends a
// joinreq to the RSM's current primary; this is the
// handler for that RPC.
-rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) {
+rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last) {
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
return ret;
}
-/*
- * RPC handler: Send back all the nodes this local knows about to client
- * so the client can switch to a different primary
- * when it existing primary fails
- */
+//
+// RPC handler: Send back all the nodes this local node knows about to the
+// client, so the client can switch to a different primary
+// when its existing primary fails
+//
rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
vector<string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
m.push_back(primary);
r = m;
- LOG("rsm::client_members return " << m << " m " << primary);
+ LOG("return " << m << " m " << primary);
return rsm_client_protocol::OK;
}
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
handle h(m[i]);
- LOG("rsm::net_repair: " << m[i] << " " << heal);
+ LOG("member " << m[i] << " " << heal);
if (h.safebind()) h.safebind()->set_reachable(heal);
}
}
rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
lock ml(rsm_mutex);
- LOG("rsm::test_net_repairreq: " << heal << " (dopartition " <<
+ LOG("heal " << heal << " (dopartition " <<
dopartition << ", partitioned " << partitioned << ")");
if (heal) {
net_repair(heal, ml);
rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
- LOG("rsm::breakpointreq: " << b);
+ LOG("breakpoint " << b);
if (b == 1) break1 = true;
else if (b == 2) break2 = true;
else if (b == 3 || b == 4) cfg->breakpoint(b);
class rsm_state_transfer {
public:
virtual string marshal_state() = 0;
- virtual void unmarshal_state(string) = 0;
+ virtual void unmarshal_state(const string &) = 0;
virtual ~rsm_state_transfer() {}
};
bool break2;
rsm_client_protocol::status client_members(vector<string> &r, int i);
- rsm_protocol::status invoke(int &, int proc, viewstamp vs, string mreq);
- rsm_protocol::status transferreq(rsm_protocol::transferres &r, string src,
+ rsm_protocol::status invoke(int &, int proc, viewstamp vs, const string & mreq);
+ rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src,
viewstamp last, unsigned vid);
- rsm_protocol::status transferdonereq(int &, string m, unsigned vid);
- rsm_protocol::status joinreq(string & log, string src,
- viewstamp last);
+ rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
+ rsm_protocol::status joinreq(string & log, const string & src, viewstamp last);
rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
mutex rsm_mutex, invoke_mutex;
cond recovery_cond, sync_cond;
- void execute(int procno, string req, string &r);
- rsm_client_protocol::status client_invoke(string &r, int procno, string req);
- bool statetransfer(string m, lock & rsm_mutex_lock);
- bool statetransferdone(string m, lock & rsm_mutex_lock);
- bool join(string m, lock & rsm_mutex_lock);
+ void execute(int procno, const string & req, string & r);
+ rsm_client_protocol::status client_invoke(string & r, int procno, const string & req);
+ bool statetransfer(const string & m, lock & rsm_mutex_lock);
+ bool statetransferdone(const string & m, lock & rsm_mutex_lock);
+ bool join(const string & m, lock & rsm_mutex_lock);
void set_primary(unsigned vid);
bool sync_with_backups(lock & rsm_mutex_lock);
bool sync_with_primary(lock & rsm_mutex_lock);
void partition1(lock & rsm_mutex_lock);
void commit_change(unsigned vid, lock & rsm_mutex_lock);
public:
- rsm (string _first, string _me);
- ~rsm() {}
+ rsm (const string & _first, const string & _me);
bool amiprimary();
void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) {
lock ml(rsm_client_mutex);
while (1) {
- LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary);
+ LOG("proc " << hex << proc << " primary " << primary);
handle h(primary);
ml.unlock();
if (!cl)
goto prim_fail;
- LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
+ LOG("proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
if (ret == rsm_client_protocol::OK)
return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
prim_fail:
LOG("primary " << primary << " failed ret " << dec << ret);
primary_failure(ml);
- LOG("rsm_client::invoke: retry new primary " << primary);
+ LOG("retry new primary " << primary);
}
}
bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
- LOG("rsm_client::init_members get members!");
+ LOG("get members!");
handle h(primary);
int ret = rsm_client_protocol::ERR;
rpcc *cl;
if (cl == 0 || ret != rsm_protocol::OK)
return false;
if (known_mems.size() < 1) {
- LOG("rsm_client::init_members do not know any members!");
+ LOG("do not know any members!");
VERIFY(0);
}
primary = known_mems.back();
known_mems.pop_back();
- LOG("rsm_client::init_members: primary " << primary);
+ LOG("primary " << primary);
return true;
}
template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
};
+// Render every byte of s as two lowercase hexadecimal digits (high nibble
+// first), e.g. "AB" -> "4142". Used to dump undecodable replies in logs.
+inline string hexify(const string & s) {
+    static const char digits[] = "0123456789abcdef";
+    string bytes;
+    bytes.reserve(2 * s.size());    // exact output size: two digits per byte
+    for (char ch : s) {
+        // go through uint8_t so bytes >= 0x80 index the table correctly
+        const auto b = static_cast<uint8_t>(ch);
+        bytes.push_back(digits[b >> 4]);
+        bytes.push_back(digits[b & 15]);
+    }
+    return bytes;
+}
+
template<class R>
int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
string rep;
string res;
- int intret = invoke(proc, rep, req);
+ int intret = invoke(proc, rep, req.content());
VERIFY( intret == rsm_client_protocol::OK );
unmarshall u(rep, false);
u >> intret;
if (intret < 0) return intret;
u >> res;
if (!u.okdone()) {
- cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
- cerr << "You probably forgot to set the reply string in "
- "rsm::client_invoke, or you may have called RPC 0x" << hex <<
- proc << " with the wrong return type" << endl;
+ LOG("failed to unmarshall the reply.");
+ LOG("You probably forgot to set the reply string in " <<
+ "rsm::client_invoke, or you may have called RPC " <<
+ "0x" << hex << proc << " with the wrong return type");
+ LOG("here's what I got: \"" << hexify(rep) << "\"");
VERIFY(0);
return rpc_const::unmarshal_reply_failure;
}
unmarshall u1(res, false);
u1 >> r;
if(!u1.okdone()) {
- cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
- cerr << "You are probably calling RPC 0x" << hex << proc <<
- " with the wrong return type." << endl;
+ LOG("failed to unmarshall the reply.");
+ LOG("You are probably calling RPC 0x" << hex << proc <<
+ " with the wrong return type.");
+ LOG("here's what I got: \"" << hexify(res) << "\"");
VERIFY(0);
return rpc_const::unmarshal_reply_failure;
}