// The rule is that a module releases its internal locks before it
// upcalls, but can keep its locks when calling down.
-#include "rsm.h"
-#include "rsm_client.h"
+#include "include/rsm.h"
+#include "include/rsm_client.h"
#include <unistd.h>
using std::vector;
+using namespace std::chrono;
rsm_state_transfer::~rsm_state_transfer() {}
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
- std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
+ std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary?
ml.lock();
}
}
rsm_mutex_lock.lock();
// Start accepting synchronization request (statetransferreq) now!
insync = true;
- cfg->get_view(vid_insync, backups);
+ backups = cfg->get_view(vid_insync);
backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
rsm_mutex_lock.unlock();
cl = rpcc::bind_cached(m);
if (cl) {
- ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
+ ret = cl->call_timeout(rsm_protocol::transferreq, 100ms,
r, cfg->myaddr(), last_myvs, vid_insync);
}
rsm_mutex_lock.lock();
return false;
}
if (stf && last_myvs != r.last) {
- stf->unmarshal_state(r.state);
+ stf->unmarshall_state(r.state);
}
last_myvs = r.last;
LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
rsm_mutex_lock.unlock();
auto cl = rpcc::bind_cached(m);
if (cl)
- ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+ ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs);
rsm_mutex_lock.lock();
if (cl == 0 || ret != rsm_protocol::OK) {
handler *h = procs[procno];
VERIFY(h);
marshall rep;
- auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
- r = marshall(ret, rep.content()).content();
+ auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep);
+ r = marshall(ret, rep);
}
static void logHexString(locked_ostream && log, const string & s) {
if (primary != myaddr)
return rsm_client_protocol::NOTPRIMARY;
LOG << "Assigning a viewstamp";
- cfg->get_view(vid_commit, m);
+ m = cfg->get_view(vid_commit);
// assign the RPC the next viewstamp number
vs = myvs;
myvs++;
}
- // send an invoke RPC to all slaves in the current view with a timeout of 1 second
+ // send an invoke RPC to all slaves in the current view with a timeout
LOG << "Invoking slaves";
- for (unsigned i = 0; i < m.size(); i++) {
- if (m[i] != myaddr) {
+ for (auto & mm : m) {
+ if (mm != myaddr) {
// if invoke on slave fails, return rsm_client_protocol::BUSY
- LOG << "Sending invoke to " << m[i];
- auto cl = rpcc::bind_cached(m[i]);
+ LOG << "Sending invoke to " << mm;
+ auto cl = rpcc::bind_cached(mm);
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
- auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
+ auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req);
LOG << "Invoke returned " << ret;
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
LOG << "invoke procno 0x" << std::hex << proc;
lock ml(invoke_mutex);
- vector<string> m;
string myaddr;
{
lock ml2(rsm_mutex);
myaddr = cfg->myaddr();
if (primary == myaddr)
return rsm_protocol::ERR;
- cfg->get_view(vid_commit, m);
+ vector<string> m = cfg->get_view(vid_commit);
if (std::find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
if (stf && last != last_myvs)
- r.state = stf->marshal_state();
+ r.state = stf->marshall_state();
r.last = last_myvs;
return rsm_protocol::OK;
}
// primary failure
//
rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
- vector<string> m;
lock ml(rsm_mutex);
- cfg->get_view(vid_commit, m);
+ vector<string> m = cfg->get_view(vid_commit);
m.push_back(primary);
r = m;
LOG << "return " << m << " m " << primary;
// otherwise, the lowest-numbered node of the previous view.
// caller should hold rsm_mutex
void rsm::set_primary(unsigned vid) {
- vector<string> c, p;
- cfg->get_view(vid, c);
- cfg->get_view(vid - 1, p);
+ vector<string> c = cfg->get_view(vid), p = cfg->get_view(vid - 1);
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
}
VERIFY(p.size() > 0);
- for (unsigned i = 0; i < p.size(); i++) {
- if (isamember(p[i], c)) {
- primary = p[i];
+ for (auto & pp : p) {
+ if (isamember(pp, c)) {
+ primary = pp;
LOG << "primary is " << primary;
return;
}
void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
VERIFY(rsm_mutex_lock);
- vector<string> m;
- cfg->get_view(vid_commit, m);
- for (unsigned i = 0; i < m.size(); i++) {
- if (m[i] != cfg->myaddr()) {
- LOG << "member " << m[i] << " " << heal;
- if (auto cl = rpcc::bind_cached(m[i]))
+ for (auto & mm : cfg->get_view(vid_commit)) {
+ if (mm != cfg->myaddr()) {
+ LOG << "member " << mm << " " << heal;
+ if (auto cl = rpcc::bind_cached(mm))
cl->set_reachable(heal);
}
}