X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/46fb2b4bbe3a0a8516ab04cfafa895a882c70f86..5d99dbf06a14904944f5593c63705934bdfdcfb7:/rsm.cc diff --git a/rsm.cc b/rsm.cc index f12f9db..843418a 100644 --- a/rsm.cc +++ b/rsm.cc @@ -86,7 +86,7 @@ #include "rsm.h" #include "rsm_client.h" -rsm::rsm(std::string _first, std::string _me) : +rsm::rsm(string _first, string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { @@ -111,13 +111,13 @@ rsm::rsm(std::string _first, std::string _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = new rpcs((uint32_t)std::stoi(_me) + 1); + testsvr = new rpcs((uint32_t)stoi(_me) + 1); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); { lock ml(rsm_mutex); - std::thread(&rsm::recovery, this).detach(); + thread(&rsm::recovery, this).detach(); } } @@ -140,7 +140,7 @@ void rsm::recovery() [[noreturn]] { commit_change(cfg->view_id(), ml); } else { ml.unlock(); - std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary? + this_thread::sleep_for(seconds(30)); // XXX make another node in cfg primary? ml.lock(); } } @@ -195,7 +195,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { bool rsm::sync_with_primary(lock & rsm_mutex_lock) { // Remember the primary of vid_insync - std::string m = primary; + string m = primary; while (vid_insync == vid_commit) { if (statetransfer(m, rsm_mutex_lock)) break; @@ -208,7 +208,7 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) { * Call to transfer state from m to the local node. * Assumes that rsm_mutex is already held. */ -bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) +bool rsm::statetransfer(string m, lock & rsm_mutex_lock) { rsm_protocol::transferres r; handle h(m); @@ -225,7 +225,7 @@ bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret); + LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret); return false; } if (stf && last_myvs != r.last) { @@ -236,7 +236,7 @@ bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) return true; } -bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) { +bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); handle h(m); rpcc *cl = h.safebind(); @@ -251,7 +251,7 @@ bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) { } -bool rsm::join(std::string m, lock & rsm_mutex_lock) { +bool rsm::join(string m, lock & rsm_mutex_lock) { handle h(m); int ret = 0; string log; @@ -269,7 +269,7 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) { } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret); + LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret); return false; } LOG("rsm::join: succeeded " << log); @@ -303,18 +303,18 @@ void rsm::commit_change(unsigned vid, lock &) { } -void rsm::execute(int procno, std::string req, std::string &r) { +void rsm::execute(int procno, string req, string &r) { LOG("execute"); handler *h = procs[procno]; VERIFY(h); - unmarshall args(req); + unmarshall args(req, false); marshall rep; - std::string reps; + string reps; auto ret = (rsm_protocol::status)(*h)(args, rep); marshall rep1; rep1 << ret; - rep1 << rep.str(); - r = rep1.str(); + rep1 << rep.content(); + r = rep1.content(); } // @@ -323,11 +323,11 @@ void rsm::execute(int procno, std::string req, std::string &r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) { - LOG("rsm::client_invoke: procno 0x" << std::hex << procno); +rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) { + LOG("rsm::client_invoke: procno 0x" << hex << procno); lock ml(invoke_mutex); - std::vector m; - std::string myaddr; + vector m; + string myaddr; viewstamp vs; { lock ml2(rsm_mutex); @@ -377,11 +377,11 @@ rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std:: // the replica must execute requests in order (with no gaps) // according to requests' seqno -rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) { - LOG("rsm::invoke: procno 0x" << std::hex << proc); +rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) { + LOG("rsm::invoke: procno 0x" << hex << proc); lock ml(invoke_mutex); - std::vector m; - std::string myaddr; + vector m; + string myaddr; { lock ml2(rsm_mutex); // check if !inviewchange @@ -402,7 +402,7 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) return rsm_protocol::ERR; myvs++; } - std::string r; + string r; execute(proc, req, r); last_myvs = vs; breakpoint1(); @@ -412,7 +412,7 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) /** * RPC handler: Send back the local node's state to the caller */ -rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src, +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << @@ -429,7 +429,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string * RPC handler: Inform the local node (the primary) that node m has synchronized * for view vid */ -rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { +rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) { lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; @@ -442,7 +442,7 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { // a node that wants to join an RSM as a server sends a // joinreq to the RSM's current primary; this is the // handler for that RPC. -rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { +rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) { auto ret = rsm_protocol::OK; lock ml(rsm_mutex); @@ -481,8 +481,8 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { * so the client can switch to a different primary * when it existing primary fails */ -rsm_client_protocol::status rsm::client_members(std::vector &r, int) { - std::vector m; +rsm_client_protocol::status rsm::client_members(vector &r, int) { + vector m; lock ml(rsm_mutex); cfg->get_view(vid_commit, m); m.push_back(primary); @@ -495,7 +495,7 @@ rsm_client_protocol::status rsm::client_members(std::vector &r, int // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - std::vector c, p; + vector c, p; cfg->get_view(vid, c); cfg->get_view(vid - 1, p); VERIFY (c.size() > 0); @@ -528,7 +528,7 @@ bool rsm::amiprimary() { // assumes caller holds rsm_mutex void rsm::net_repair(bool heal, lock &) { - std::vector m; + vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) {