// XXX iannucci 2013/09/15 -- I don't understand whether accessing
// cfg->view_id in this manner involves a race. I suspect not.
if (join(primary, ml)) {
- LOG("joined");
+ LOG << "joined";
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
}
}
vid_insync = vid_commit;
- LOG("sync vid_insync " << vid_insync);
+ LOG << "sync vid_insync " << vid_insync;
if (primary == cfg->myaddr()) {
r = sync_with_backups(ml);
} else {
r = sync_with_primary(ml);
}
- LOG("sync done");
+ LOG << "sync done";
// If there was a commited viewchange during the synchronization, restart
// the recovery
myvs.seqno = 1;
inviewchange = false;
}
- LOG("go to sleep " << insync << " " << inviewchange);
+ LOG << "go to sleep " << insync << " " << inviewchange;
recovery_cond.wait(ml);
}
}
insync = true;
cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
- LOG("backups " << backups);
+ LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
insync = false;
return true;
rsm_protocol::transferres r;
handle h(m);
int ret = 0;
- LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
rpcc *cl;
{
rsm_mutex_lock.unlock();
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+ LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
return false;
}
if (stf && last_myvs != r.last) {
stf->unmarshal_state(r.state);
}
last_myvs = r.last;
- LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
return true;
}
int ret = 0;
string log;
- LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
rpcc *cl;
{
rsm_mutex_lock.unlock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+ LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
return false;
}
- LOG("succeeded " << log);
+ LOG << "succeeded " << log;
cfg->restore(log);
return true;
}
void rsm::commit_change(unsigned vid, lock &) {
if (vid <= vid_commit)
return;
- LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
- last_myvs.seqno << ") " << primary << " insync " << insync);
+ LOG << "new view (" << vid << ") last vs (" << last_myvs.vid << ","
+ << last_myvs.seqno << ") " << primary << " insync " << insync;
vid_commit = vid;
inviewchange = true;
set_primary(vid);
void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
- LOG("execute");
+ LOG << "execute";
handler *h = procs[procno];
VERIFY(h);
marshall rep;
// machine.
//
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
- LOG("invoke procno 0x" << hex << procno);
+ LOG << "invoke procno 0x" << hex << procno;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
viewstamp vs;
{
lock ml2(rsm_mutex);
- LOG("Checking for inviewchange");
+ LOG << "Checking for inviewchange";
if (inviewchange)
return rsm_client_protocol::BUSY;
- LOG("Checking for primacy");
+ LOG << "Checking for primacy";
myaddr = cfg->myaddr();
if (primary != myaddr)
return rsm_client_protocol::NOTPRIMARY;
- LOG("Assigning a viewstamp");
+ LOG << "Assigning a viewstamp";
cfg->get_view(vid_commit, m);
// assign the RPC the next viewstamp number
vs = myvs;
}
// send an invoke RPC to all slaves in the current view with a timeout of 1 second
- LOG("Invoking slaves");
+ LOG << "Invoking slaves";
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != myaddr) {
// if invoke on slave fails, return rsm_client_protocol::BUSY
handle h(m[i]);
- LOG("Sending invoke to " << m[i]);
+ LOG << "Sending invoke to " << m[i];
rpcc *cl = h.safebind();
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
- LOG("Invoke returned " << ret);
+ LOG << "Invoke returned " << ret;
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
breakpoint(1);
partition1(rsm_mutex_lock);
}
}
- LOG(setfill('0') << setw(2) << hex;
+ {
+ // Hex-dump the request, two digits per byte. setfill/hex are sticky, but the
+ // field width resets after every insertion, so setw(2) is applied per byte.
+ // NOTE(review): confirm the object produced by the LOG expression outlives
+ // this scope; if operator<< returns a reference to a temporary, `log` dangles.
+ auto && log = LOG << setfill('0') << hex;
for (size_t i=0; i<req.size(); i++)
- cerr << (unsigned int)(unsigned char)req[i];
- cerr);
+ log << setw(2) << (unsigned int)(unsigned char)req[i];
+ }
execute(procno, req, r);
- LOG(setfill('0') << setw(2) << hex;
+ {
+ // Hex-dump the reply, two digits per byte. setfill/hex are sticky, but the
+ // field width resets after every insertion, so setw(2) is applied per byte.
+ // NOTE(review): confirm the object produced by the LOG expression outlives
+ // this scope; if operator<< returns a reference to a temporary, `log` dangles.
+ auto && log = LOG << setfill('0') << hex;
for (size_t i=0; i<r.size(); i++)
- cerr << (unsigned int)(unsigned char)r[i];
- cerr);
+ log << setw(2) << (unsigned int)(unsigned char)r[i];
+ }
last_myvs = vs;
return rsm_client_protocol::OK;
}
// according to requests' seqno
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
- LOG("invoke procno 0x" << hex << proc);
+ LOG << "invoke procno 0x" << hex << proc;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
{
lock ml2(rsm_mutex);
// check if !inviewchange
- LOG("Checking for view change");
+ LOG << "Checking for view change";
if (inviewchange)
return rsm_protocol::ERR;
// check if slave
- LOG("Checking for slave status");
+ LOG << "Checking for slave status";
myaddr = cfg->myaddr();
if (primary == myaddr)
return rsm_protocol::ERR;
if (find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
- LOG("Checking sequence number");
+ LOG << "Checking sequence number";
if (vs != myvs)
return rsm_protocol::ERR;
myvs++;
rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
- LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
- last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs ("
+ << last_myvs.vid << "," << last_myvs.seqno << ")";
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
if (stf && last != last_myvs)
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
- LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
- last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=("
+ << last_myvs.vid << "," << last_myvs.seqno << ")";
if (cfg->ismember(m, vid_commit)) {
- LOG(m << " is still a member -- nothing to do");
+ LOG << m << " is still a member -- nothing to do";
log = cfg->dump();
} else if (cfg->myaddr() != primary) {
- LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
+ LOG << "but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!";
ret = rsm_protocol::BUSY;
} else {
// We cache vid_commit to avoid adding m to a view which already contains
// m due to race condition
- LOG("calling down to config layer");
+ LOG << "calling down to config layer";
unsigned vid_cache = vid_commit;
bool succ;
{
}
if (cfg->ismember(m, cfg->view_id())) {
log = cfg->dump();
- LOG("ret " << ret << " log " << log);
+ LOG << "ret " << ret << " log " << log;
} else {
- LOG("failed; proposer couldn't add " << succ);
+ LOG << "failed; proposer couldn't add " << succ;
ret = rsm_protocol::BUSY;
}
}
cfg->get_view(vid_commit, m);
m.push_back(primary);
r = m;
- LOG("return " << m << " m " << primary);
+ LOG << "return " << m << " m " << primary;
return rsm_client_protocol::OK;
}
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
- LOG("primary stays " << primary);
+ LOG << "primary stays " << primary;
return;
}
for (unsigned i = 0; i < p.size(); i++) {
if (isamember(p[i], c)) {
primary = p[i];
- LOG("primary is " << primary);
+ LOG << "primary is " << primary;
return;
}
}
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
handle h(m[i]);
- LOG("member " << m[i] << " " << heal);
+ LOG << "member " << m[i] << " " << heal;
if (h.safebind()) h.safebind()->set_reachable(heal);
}
}
rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
lock ml(rsm_mutex);
- LOG("heal " << heal << " (dopartition " <<
- dopartition << ", partitioned " << partitioned << ")");
+ LOG << "heal " << heal << " (dopartition "
+ << dopartition << ", partitioned " << partitioned << ")";
if (heal)
net_repair(heal, ml);
else
void rsm::breakpoint(int b) {
if (breakpoints[b-1]) {
- LOG("Dying at breakpoint " << b << " in rsm!");
+ LOG << "Dying at breakpoint " << b << " in rsm!";
exit(1);
}
}
rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
- LOG("breakpoint " << b);
+ LOG << "breakpoint " << b;
if (b == 1) breakpoints[1-1] = true;
else if (b == 2) breakpoints[2-1] = true;
else if (b == 3 || b == 4) cfg->breakpoint(b);