projects
/
invirt/third/libt4.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Global destructor clean-ups and python test fixes
[invirt/third/libt4.git]
/
rsm.cc
diff --git
a/rsm.cc
b/rsm.cc
index
672243c
..
cb986fe
100644
(file)
--- a/
rsm.cc
+++ b/
rsm.cc
@@
-83,6
+83,8
@@
#include "rsm_client.h"
#include <unistd.h>
#include "rsm_client.h"
#include <unistd.h>
+using std::vector;
+
rsm_state_transfer::~rsm_state_transfer() {}
rsm::rsm(const string & _first, const string & _me) : primary(_first)
rsm_state_transfer::~rsm_state_transfer() {}
rsm::rsm(const string & _first, const string & _me) : primary(_first)
@@
-103,7
+105,7
@@
rsm::rsm(const string & _first, const string & _me) : primary(_first)
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
+ testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1));
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
@@
-129,7
+131,7
@@
void rsm::recovery() {
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
- this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary?
+ std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
ml.lock();
}
}
ml.lock();
}
}
@@
-174,7
+176,7
@@
bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
// Start accepting synchronization request (statetransferreq) now!
insync = true;
cfg->get_view(vid_insync, backups);
// Start accepting synchronization request (statetransferreq) now!
insync = true;
cfg->get_view(vid_insync, backups);
- backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
+ backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
insync = false;
LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
insync = false;
@@
-214,7
+216,7
@@
bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
+ LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
return false;
}
if (stf && last_myvs != r.last) {
return false;
}
if (stf && last_myvs != r.last) {
@@
-258,7
+260,7
@@
bool rsm::join(const string & m, lock & rsm_mutex_lock) {
}
if (cl == 0 || ret != rsm_protocol::OK) {
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
+ LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
return false;
}
LOG << "succeeded " << log;
return false;
}
LOG << "succeeded " << log;
@@
-301,6
+303,12
@@
void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
r = marshall(ret, rep.content()).content();
}
r = marshall(ret, rep.content()).content();
}
+// Writes the bytes of s to log as hexadecimal, two zero-padded digits per
+// byte, with no separator between bytes.
+//
+// log: destination stream wrapper, taken by rvalue reference.
+// s:   byte string to dump; an empty string writes no digits.
+//
+// NOTE(review): assumes locked_ostream forwards manipulators to a standard
+// ostream, so fill and base persist while the width does not -- confirm.
+static void logHexString(locked_ostream && log, const string & s) {
+    // Fill character and numeric base stay in effect across insertions.
+    log << std::setfill('0') << std::hex;
+    for (size_t i=0; i<s.size(); i++) {
+        // The field width is reset after every formatted insertion, so it is
+        // re-applied per byte; set once before the loop it would pad only the
+        // first byte and later bytes below 0x10 would print as one digit.
+        log << std::setw(2) << (unsigned int)(unsigned char)s[i];
+    }
+}
+
//
// Clients call client_invoke to invoke a procedure on the replicated state
// machine: the primary receives the request, assigns it a sequence
//
// Clients call client_invoke to invoke a procedure on the replicated state
// machine: the primary receives the request, assigns it a sequence
@@
-308,7
+316,7
@@
void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
// machine.
//
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
// machine.
//
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
- LOG << "invoke procno 0x" << hex << procno;
+ LOG << "invoke procno 0x" << std::hex << procno;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
@@
-349,17
+357,9
@@
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
partition1(rsm_mutex_lock);
}
}
partition1(rsm_mutex_lock);
}
}
- {
- auto && log = LOG << setfill('0') << setw(2) << hex;
- for (size_t i=0; i<req.size(); i++)
- log << (unsigned int)(unsigned char)req[i];
- }
+ logHexString(LOG, req);
execute(procno, req, r);
execute(procno, req, r);
- {
- auto && log = LOG << setfill('0') << setw(2) << hex;
- for (size_t i=0; i<r.size(); i++)
- log << (unsigned int)(unsigned char)r[i];
- }
+ logHexString(LOG, r);
last_myvs = vs;
return rsm_client_protocol::OK;
}
last_myvs = vs;
return rsm_client_protocol::OK;
}
@@
-372,7
+372,7
@@
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
// according to requests' seqno
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
// according to requests' seqno
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
- LOG << "invoke procno 0x" << hex << proc;
+ LOG << "invoke procno 0x" << std::hex << proc;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
@@
-388,7
+388,7
@@
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
if (primary == myaddr)
return rsm_protocol::ERR;
cfg->get_view(vid_commit, m);
if (primary == myaddr)
return rsm_protocol::ERR;
cfg->get_view(vid_commit, m);
- if (find(m.begin(), m.end(), myaddr) == m.end())
+ if (std::find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
LOG << "Checking sequence number";
return rsm_protocol::ERR;
// check sequence number
LOG << "Checking sequence number";
@@
-427,7
+427,7
@@
rsm_protocol::status rsm::transferdonereq(int &, const string & m, unsigned vid)
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
- backups.erase(find(backups.begin(), backups.end(), m));
+ backups.erase(std::find(backups.begin(), backups.end(), m));
if (backups.empty())
sync_cond.notify_one();
return rsm_protocol::OK;
if (backups.empty())
sync_cond.notify_one();
return rsm_protocol::OK;