// upcalls, but can keep its locks when calling down.
#include "rsm.h"
-#include "handle.h"
#include "rsm_client.h"
#include <unistd.h>
+using std::vector;
+
rsm_state_transfer::~rsm_state_transfer() {}
rsm::rsm(const string & _first, const string & _me) : primary(_first)
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
+ testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1));
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
- this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary?
+ std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
ml.lock();
}
}
// Start accepting synchronization request (statetransferreq) now!
insync = true;
cfg->get_view(vid_insync, backups);
- backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
+ backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
insync = false;
bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
{
rsm_protocol::transferres r;
- handle h(m);
int ret = 0;
LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
- rpcc *cl;
+ shared_ptr<rpcc> cl;
{
rsm_mutex_lock.unlock();
- cl = h.safebind();
+ cl = rpcc::bind_cached(m);
if (cl) {
ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
r, cfg->myaddr(), last_myvs, vid_insync);
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
+ LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
return false;
}
if (stf && last_myvs != r.last) {
bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
rsm_mutex_lock.unlock();
- handle h(m);
- rpcc *cl = h.safebind();
bool done = false;
- if (cl) {
+ if (auto cl = rpcc::bind_cached(m)) {
int r;
auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
done = (ret == rsm_protocol::OK);
bool rsm::join(const string & m, lock & rsm_mutex_lock) {
- handle h(m);
int ret = 0;
string log;
LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
- rpcc *cl;
- {
- rsm_mutex_lock.unlock();
- cl = h.safebind();
- if (cl != 0) {
- ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
- cfg->myaddr(), last_myvs);
- }
- rsm_mutex_lock.lock();
- }
+
+ rsm_mutex_lock.unlock();
+ auto cl = rpcc::bind_cached(m);
+ if (cl)
+ ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+ rsm_mutex_lock.lock();
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
+ LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
return false;
}
LOG << "succeeded " << log;
r = marshall(ret, rep.content()).content();
}
+static void logHexString(locked_ostream && log, const string & s) {
+ log << std::setfill('0') << std::setw(2) << std::hex;
+ for (size_t i=0; i<s.size(); i++)
+ log << (unsigned int)(unsigned char)s[i];
+}
+
//
// Clients call client_invoke to invoke a procedure on the replicated state
// machine: the primary receives the request, assigns it a sequence
// machine.
//
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
- LOG << "invoke procno 0x" << hex << procno;
+ LOG << "invoke procno 0x" << std::hex << procno;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != myaddr) {
// if invoke on slave fails, return rsm_client_protocol::BUSY
- handle h(m[i]);
LOG << "Sending invoke to " << m[i];
- rpcc *cl = h.safebind();
+ auto cl = rpcc::bind_cached(m[i]);
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
partition1(rsm_mutex_lock);
}
}
- {
- auto && log = LOG << setfill('0') << setw(2) << hex;
- for (size_t i=0; i<req.size(); i++)
- log << (unsigned int)(unsigned char)req[i];
- }
+ logHexString(LOG, req);
execute(procno, req, r);
- {
- auto && log = LOG << setfill('0') << setw(2) << hex;
- for (size_t i=0; i<r.size(); i++)
- log << (unsigned int)(unsigned char)r[i];
- }
+ logHexString(LOG, r);
last_myvs = vs;
return rsm_client_protocol::OK;
}
// according to requests' seqno
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
- LOG << "invoke procno 0x" << hex << proc;
+ LOG << "invoke procno 0x" << std::hex << proc;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
if (primary == myaddr)
return rsm_protocol::ERR;
cfg->get_view(vid_commit, m);
- if (find(m.begin(), m.end(), myaddr) == m.end())
+ if (std::find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
LOG << "Checking sequence number";
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
- backups.erase(find(backups.begin(), backups.end(), m));
+ backups.erase(std::find(backups.begin(), backups.end(), m));
if (backups.empty())
sync_cond.notify_one();
return rsm_protocol::OK;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
- handle h(m[i]);
LOG << "member " << m[i] << " " << heal;
- if (h.safebind()) h.safebind()->set_reachable(heal);
+ if (auto cl = rpcc::bind_cached(m[i]))
+ cl->set_reachable(heal);
}
}
rsmrpc->set_reachable(heal);