#include "rsm.h"
#include "rsm_client.h"
// Constructor for an rsm node.
// _first initializes `primary`; _me is parsed with stoi() and the result + 1
// is passed to the rpcs test server (presumably a port number, per the
// "different port" remark below -- TODO confirm).
// NOTE(review): this listing is a diff fragment ('-' = removed line,
// '+' = its replacement). The visible '-'/'+' pairs in this block differ only
// in dropping the std:: qualifier. Statements between the opening brace and
// the joinreq registration may be elided here -- confirm against the full file.
-rsm::rsm(std::string _first, std::string _me) :
+rsm::rsm(string _first, string _me) :
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
// Register the handler for joinreq RPCs on the main RPC server.
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = new rpcs((uint32_t)std::stoi(_me) + 1);
+ testsvr = new rpcs((uint32_t)stoi(_me) + 1);
// Test-only RPCs (net_repair and breakpoint) are registered on the separate
// test server rather than on rsmrpc.
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
{
// The recovery thread is created and detached inside the scope of a lock
// constructed on rsm_mutex.
lock ml(rsm_mutex);
- std::thread(&rsm::recovery, this).detach();
+ thread(&rsm::recovery, this).detach();
}
}
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
- std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
+ this_thread::sleep_for(seconds(30)); // XXX make another node in cfg primary?
ml.lock();
}
}
bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
// Remember the primary of vid_insync
- std::string m = primary;
+ string m = primary;
while (vid_insync == vid_commit) {
if (statetransfer(m, rsm_mutex_lock))
break;
* Call to transfer state from m to the local node.
* Assumes that rsm_mutex is already held.
*/
-bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
+bool rsm::statetransfer(string m, lock & rsm_mutex_lock)
{
rsm_protocol::transferres r;
handle h(m);
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
+ LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
return false;
}
if (stf && last_myvs != r.last) {
return true;
}
-bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
+bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) {
rsm_mutex_lock.unlock();
handle h(m);
rpcc *cl = h.safebind();
}
-bool rsm::join(std::string m, lock & rsm_mutex_lock) {
+bool rsm::join(string m, lock & rsm_mutex_lock) {
handle h(m);
int ret = 0;
string log;
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
+ LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
return false;
}
LOG("rsm::join: succeeded " << log);
}
// Runs the handler stored in procs[procno] on the marshalled request `req`
// and writes the marshalled result into `r`.
// `r` is built from a fresh marshall holding the handler's return status
// followed by the contents of the handler's reply marshall.
// NOTE(review): this listing is a diff fragment ('-' = removed line,
// '+' = its replacement). Besides dropping std::, the change passes an extra
// `false` argument to unmarshall and replaces str() with content() on the
// marshall objects; the meaning of that `false` is not visible here.
-void rsm::execute(int procno, std::string req, std::string &r) {
+void rsm::execute(int procno, string req, string &r) {
LOG("execute");
handler *h = procs[procno];
// A handler must exist for procno (VERIFY presumably aborts otherwise --
// TODO confirm).
VERIFY(h);
- unmarshall args(req);
+ unmarshall args(req, false);
marshall rep;
// NOTE(review): reps is declared but not used in the visible body.
- std::string reps;
+ string reps;
// Invoke the handler; its return value is cast to rsm_protocol::status.
auto ret = (rsm_protocol::status)(*h)(args, rep);
// Status first, then the reply contents.
marshall rep1;
rep1 << ret;
- rep1 << rep.str();
- r = rep1.str();
+ rep1 << rep.content();
+ r = rep1.content();
}
//
// number, and invokes it on all members of the replicated state
// machine.
//
-rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) {
- LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
+rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) {
+ LOG("rsm::client_invoke: procno 0x" << hex << procno);
lock ml(invoke_mutex);
- std::vector<std::string> m;
- std::string myaddr;
+ vector<string> m;
+ string myaddr;
viewstamp vs;
{
lock ml2(rsm_mutex);
// the replica must execute requests in order (with no gaps)
// according to requests' seqno
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) {
- LOG("rsm::invoke: procno 0x" << std::hex << proc);
+rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) {
+ LOG("rsm::invoke: procno 0x" << hex << proc);
lock ml(invoke_mutex);
- std::vector<std::string> m;
- std::string myaddr;
+ vector<string> m;
+ string myaddr;
{
lock ml2(rsm_mutex);
// check if !inviewchange
return rsm_protocol::ERR;
myvs++;
}
- std::string r;
+ string r;
execute(proc, req, r);
last_myvs = vs;
breakpoint1();
/**
* RPC handler: Send back the local node's state to the caller
*/
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
* RPC handler: Inform the local node (the primary) that node m has synchronized
* for view vid
*/
-rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
+rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) {
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
// a node that wants to join an RSM as a server sends a
// joinreq to the RSM's current primary; this is the
// handler for that RPC.
-rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
+rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) {
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
* so the client can switch to a different primary
* when its existing primary fails
*/
-rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int) {
- std::vector<std::string> m;
+rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+ vector<string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
m.push_back(primary);
// otherwise, the lowest number node of the previous view.
// caller should hold rsm_mutex
void rsm::set_primary(unsigned vid) {
- std::vector<std::string> c, p;
+ vector<string> c, p;
cfg->get_view(vid, c);
cfg->get_view(vid - 1, p);
VERIFY (c.size() > 0);
// assumes caller holds rsm_mutex
void rsm::net_repair(bool heal, lock &) {
- std::vector<std::string> m;
+ vector<string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {