projects
/
invirt/third/libt4.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Cosmetic improvements.
[invirt/third/libt4.git]
/
rsm.cc
diff --git
a/rsm.cc
b/rsm.cc
index
7e90b03
..
c766145
100644
(file)
--- a/
rsm.cc
+++ b/
rsm.cc
@@
-83,9
+83,7
@@
#include "rsm_client.h"
#include <unistd.h>
#include "rsm_client.h"
#include <unistd.h>
-rsm::rsm(const string & _first, const string & _me) :
- stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
- partitioned (false), dopartition(false), break1(false), break2(false)
+rsm::rsm(const string & _first, const string & _me) : primary(_first)
{
cfg = unique_ptr<config>(new config(_first, _me, this));
{
cfg = unique_ptr<config>(new config(_first, _me, this));
@@
-103,7
+101,7
@@
rsm::rsm(const string & _first, const string & _me) :
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = unique_ptr<rpcs>(new rpcs((in_port_t)stoi(_me) + 1));
+ testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
@@
-115,13
+113,8
@@
void rsm::start() {
thread(&rsm::recovery, this).detach();
}
thread(&rsm::recovery, this).detach();
}
-void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) {
- lock ml(rsm_mutex);
- procs[proc] = h;
-}
-
// The recovery thread runs this function
// The recovery thread runs this function
-void rsm::recovery() [[noreturn]] {
+void rsm::recovery() {
bool r = true;
lock ml(rsm_mutex);
bool r = true;
lock ml(rsm_mutex);
@@
-279,7
+272,7
@@
void rsm::commit_change(unsigned vid) {
lock ml(rsm_mutex);
commit_change(vid, ml);
if (cfg->ismember(cfg->myaddr(), vid_commit))
lock ml(rsm_mutex);
commit_change(vid, ml);
if (cfg->ismember(cfg->myaddr(), vid_commit))
- breakpoint2();
+ breakpoint(2);
}
void rsm::commit_change(unsigned vid, lock &) {
}
void rsm::commit_change(unsigned vid, lock &) {
@@
-293,7
+286,7
@@
void rsm::commit_change(unsigned vid, lock &) {
recovery_cond.notify_one();
sync_cond.notify_one();
if (cfg->ismember(cfg->myaddr(), vid_commit))
recovery_cond.notify_one();
sync_cond.notify_one();
if (cfg->ismember(cfg->myaddr(), vid_commit))
- breakpoint2();
+ breakpoint(2);
}
}
@@
-301,10
+294,9
@@
void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
LOG("execute");
handler *h = procs[procno];
VERIFY(h);
LOG("execute");
handler *h = procs[procno];
VERIFY(h);
- unmarshall args(req, false);
marshall rep;
marshall rep;
- auto ret = (rsm_protocol::status)(*h)(args, rep);
- r = marshall{ret, rep.content()}.content();
+ auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
+ r = marshall(ret, rep.content()).content();
}
//
}
//
@@
-350,12
+342,15
@@
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
LOG("Invoke returned " << ret);
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
LOG("Invoke returned " << ret);
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
- breakpoint1();
+ breakpoint(1);
lock rsm_mutex_lock(rsm_mutex);
partition1(rsm_mutex_lock);
}
}
execute(procno, req, r);
lock rsm_mutex_lock(rsm_mutex);
partition1(rsm_mutex_lock);
}
}
execute(procno, req, r);
+ for (size_t i=0; i<r.size(); i++) {
+ LOG(hex << setfill('0') << setw(2) << (unsigned int)(unsigned char)r[i]);
+ }
last_myvs = vs;
return rsm_client_protocol::OK;
}
last_myvs = vs;
return rsm_client_protocol::OK;
}
@@
-395,14
+390,14
@@
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
string r;
execute(proc, req, r);
last_myvs = vs;
string r;
execute(proc, req, r);
last_myvs = vs;
- breakpoint1();
+ breakpoint(1);
return rsm_protocol::OK;
}
//
// RPC handler: Send back the local node's state to the caller
//
return rsm_protocol::OK;
}
//
// RPC handler: Send back the local node's state to the caller
//
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
@@
-467,11
+462,10
@@
rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
}
//
}
//
-// RPC handler: Send back all the nodes this local knows about to client
-// so the client can switch to a different primary
-// when it existing primary fails
+// RPC handler: Responds with the list of known nodes for fall-back on a
+// primary failure
//
//
-rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
vector<string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
vector<string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
@@
-512,12
+506,10
@@
bool rsm::amiprimary() {
}
}
-// Testing server
+// Test RPCs -- simulate partitions and failures
-// Simulate partitions
-
-// assumes caller holds rsm_mutex
-void rsm::net_repair(bool heal, lock &) {
+void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
+ VERIFY(rsm_mutex_lock);
vector<string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
vector<string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
@@
-530,33
+522,23
@@
void rsm::net_repair(bool heal, lock &) {
rsmrpc->set_reachable(heal);
}
rsmrpc->set_reachable(heal);
}
-rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
lock ml(rsm_mutex);
LOG("heal " << heal << " (dopartition " <<
dopartition << ", partitioned " << partitioned << ")");
lock ml(rsm_mutex);
LOG("heal " << heal << " (dopartition " <<
dopartition << ", partitioned " << partitioned << ")");
- if (heal) {
+ if (heal)
net_repair(heal, ml);
net_repair(heal, ml);
- partitioned = false;
- } else {
+ else
dopartition = true;
dopartition = true;
- partitioned = false;
- }
- r = rsm_test_protocol::OK;
- return r;
+ partitioned = false;
+ return r = rsm_test_protocol::OK;
}
// simulate failure at breakpoint 1 and 2
}
// simulate failure at breakpoint 1 and 2
-void rsm::breakpoint1() {
- if (break1) {
- LOG("Dying at breakpoint 1 in rsm!");
- exit(1);
- }
-}
-
-void rsm::breakpoint2() {
- if (break2) {
- LOG("Dying at breakpoint 2 in rsm!");
+void rsm::breakpoint(int b) {
+ if (breakpoints[b-1]) {
+ LOG("Dying at breakpoint " << b << " in rsm!");
exit(1);
}
}
exit(1);
}
}
@@
-569,12
+551,12
@@
void rsm::partition1(lock & rsm_mutex_lock) {
}
}
}
}
-rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
LOG("breakpoint " << b);
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
LOG("breakpoint " << b);
- if (b == 1) break1 = true;
- else if (b == 2) break2 = true;
+ if (b == 1) breakpoints[1-1] = true;
+ else if (b == 2) breakpoints[2-1] = true;
else if (b == 3 || b == 4) cfg->breakpoint(b);
else r = rsm_test_protocol::ERR;
return r;
else if (b == 3 || b == 4) cfg->breakpoint(b);
else r = rsm_test_protocol::ERR;
return r;