void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
string value = paxos.value(instance);
- LOG("get_view(" << instance << "): returns " << value);
+ LOG << "get_view(" << instance << "): returns " << value;
m = explode(value);
}
my_view_id = paxos.instance();
if (my_view_id > 0) {
get_view(my_view_id, mems, cfg_mutex_lock);
- LOG("view " << my_view_id << " " << mems);
+ LOG << "view " << my_view_id << " " << mems;
}
}
lock cfg_mutex_lock(cfg_mutex);
vector<string> newmem = explode(value);
- LOG("instance " << instance << ": " << newmem);
+ LOG << "instance " << instance << ": " << newmem;
for (auto mem : mems) {
- LOG("is " << mem << " still a member?");
+ LOG << "is " << mem << " still a member?";
if (!isamember(mem, newmem) && me != mem) {
- LOG("delete " << mem);
+ LOG << "delete " << mem;
handle(mem).invalidate();
}
}
bool config::add(const string & new_m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
- LOG("adding " << new_m << " to " << vid);
+ LOG << "adding " << new_m << " to " << vid;
if (vid != my_view_id) {
- LOG("that's not my view id, " << my_view_id << "!");
+ LOG << "that's not my view id, " << my_view_id << "!";
return false;
}
- LOG("calling down to paxos layer");
+ LOG << "calling down to paxos layer";
vector<string> m(mems), cmems(mems);
m.push_back(new_m);
- LOG("old mems " << cmems << " " << implode(cmems));
- LOG("new mems " << m << " " << implode(m));
+ LOG << "old mems " << cmems << " " << implode(cmems);
+ LOG << "new mems " << m << " " << implode(m);
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, implode(m));
cfg_mutex_lock.lock();
- LOG("paxos proposer returned " << (r ? "success" : "failure"));
+ LOG << "paxos proposer returned " << (r ? "success" : "failure");
return r;
}
// caller should hold cfg_mutex
bool config::remove(const string & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
- LOG("my_view_id " << my_view_id << " remove? " << m);
+ LOG << "my_view_id " << my_view_id << " remove? " << m;
vector<string> n;
for (auto mem : mems) {
if (mem != m)
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, implode(n));
cfg_mutex_lock.lock();
- LOG("proposer returned " << (r ? "success" : "failure"));
+ LOG << "proposer returned " << (r ? "success" : "failure");
return r;
}
while (1) {
auto next_timeout = steady_clock::now() + milliseconds(300);
- LOG("go to sleep");
+ LOG << "go to sleep";
config_cond.wait_until(cfg_mutex_lock, next_timeout);
unsigned vid = my_view_id;
vector<string> cmems;
get_view(vid, cmems, cfg_mutex_lock);
- LOG("current membership " << cmems);
+ LOG << "current membership " << cmems;
if (!isamember(me, cmems)) {
- LOG("not member yet; skip hearbeat");
+ LOG << "not member yet; skip hearbeat";
continue;
}
paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
r = (int) my_view_id;
- LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+ LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
if (vid == my_view_id)
return paxos_protocol::OK;
else if (paxos.isrunning()) {
config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
unsigned vid = my_view_id;
- LOG("heartbeat to " << m << " (" << vid << ")");
+ LOG << "heartbeat to " << m << " (" << vid << ")";
handle h(m);
cfg_mutex_lock.unlock();
h.invalidate();
break;
default:
- LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+ LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
res = (ret < 0) ? FAILURE : VIEWERR;
}
- LOG("done " << res);
+ LOG << "done " << res;
return res;
}
return nullptr;
if (!h->client) {
unique_ptr<rpcc> client(new rpcc(h->destination));
- LOG("bind(\"" << h->destination << "\")");
+ LOG << "bind(\"" << h->destination << "\")";
int ret = client->bind(milliseconds(1000));
if (ret < 0) {
- LOG("bind failure! " << h->destination << " " << ret);
+ LOG << "bind failure! " << h->destination << " " << ret;
h->valid = false;
} else {
- LOG("bind succeeded " << h->destination);
+ LOG << "bind succeeded " << h->destination;
h->client = move(client);
}
}
lock ml(mgr_mutex);
if (hmap.find(destination_) != hmap.end()) {
hmap[destination_]->valid = false;
- LOG_NONMEMBER("cl " << destination_ << " refcnt " << hmap[destination_].use_count());
+ LOG << "cl " << destination_ << " refcnt " << hmap[destination_].use_count();
hmap.erase(destination_);
}
}
lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
cl = unique_ptr<rpcc>(new rpcc(xdst));
if (cl->bind() < 0)
- LOG("lock_client: call bind");
+ LOG << "lock_client: call bind";
srandom((uint32_t)time(NULL)^last_port);
rlock_port = ((random()%32000) | (0x1 << 10));
while (1) {
lock_protocol::lockid_t lid;
release_fifo.deq(&lid);
- LOG("Releaser: " << lid);
+ LOG << "Releaser: " << lid;
lock_state & st = get_lock_state(lid);
lock sl(st.m);
sl.lock();
}
st.state = lock_state::none;
- LOG("Lock " << lid << ": none");
+ LOG << "Lock " << lid << ": none";
st.signal();
}
}
while (1) {
if (st.state != lock_state::free)
- LOG("Lock " << lid << ": not free");
+ LOG << "Lock " << lid << ": not free";
if (st.state == lock_state::none || st.state == lock_state::retrying) {
if (st.state == lock_state::none) {
st.xid = next_xid++;
}
st.state = lock_state::acquiring;
- LOG("Lock " << lid << ": acquiring");
+ LOG << "Lock " << lid << ": acquiring";
lock_protocol::status result;
{
sl.unlock();
result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
sl.lock();
}
- LOG("acquire returned " << result);
+ LOG << "acquire returned " << result;
if (result == lock_protocol::OK) {
st.state = lock_state::free;
- LOG("Lock " << lid << ": free");
+ LOG << "Lock " << lid << ": free";
}
}
st.wanted_by.pop_front();
st.state = lock_state::locked;
st.held_by = releaser_thread.get_id();
- LOG("Queuing " << lid << " for release");
+ LOG << "Queuing " << lid << " for release";
release_fifo.enq(lid);
} else if (front == self) {
st.wanted_by.pop_front();
}
}
- LOG("waiting...");
+ LOG << "waiting...";
st.wait(sl);
- LOG("wait ended");
+ LOG << "wait ended";
}
- LOG("Lock " << lid << ": locked");
+ LOG << "Lock " << lid << ": locked";
return lock_protocol::OK;
}
auto self = this_thread::get_id();
VERIFY(st.state == lock_state::locked && st.held_by == self);
st.state = lock_state::free;
- LOG("Lock " << lid << ": free");
+ LOG << "Lock " << lid << ": free";
if (st.wanted_by.size()) {
auto front = st.wanted_by.front();
if (front == releaser_thread.get_id()) {
st.state = lock_state::locked;
st.held_by = releaser_thread.get_id();
st.wanted_by.pop_front();
- LOG("Queuing " << lid << " for release");
+ LOG << "Queuing " << lid << " for release";
release_fifo.enq(lid);
} else
st.signal(front);
}
- LOG("Finished signaling.");
+ LOG << "Finished signaling.";
return lock_protocol::OK;
}
rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
- LOG("Revoke handler " << lid << " " << xid);
+ LOG << "Revoke handler " << lid << " " << xid;
lock_state & st = get_lock_state(lid);
lock sl(st.m);
lock sl(st.m);
VERIFY(st.state == lock_state::acquiring);
st.state = lock_state::retrying;
- LOG("Lock " << lid << ": none");
+ LOG << "Lock " << lid << ": none";
st.signal(); // only one thread needs to wake up
return rlock_protocol::OK;
}
int main(int argc, char *argv[]) {
if(argc != 2) {
- LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port");
+ LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port";
return 1;
}
lock_client *lc = new lock_client(argv[1]);
- LOG_NONMEMBER("stat returned " << lc->stat("1"));
+ LOG_NONMEMBER << "stat returned " << lc->stat("1");
}
while (1) {
lock_protocol::lockid_t lid;
revoke_fifo.deq(&lid);
- LOG("Revoking " << lid);
+ LOG << "Revoking " << lid;
if (rsm_ && !rsm_->amiprimary())
continue;
if (proxy) {
int r;
auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
- LOG("Revoke returned " << ret);
+ LOG << "Revoke returned " << ret;
}
}
}
if (rsm_ && !rsm_->amiprimary())
continue;
- LOG("Sending retry for " << lid);
+ LOG << "Sending retry for " << lid;
lock_state & st = get_lock_state(lid);
holder_t front;
{
if (proxy) {
int r;
auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
- LOG("Retry returned " << ret);
+ LOG << "Retry returned " << ret;
}
}
}
lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
- LOG("lid=" << lid << " client=" << id << "," << xid);
+ LOG << "lid=" << lid << " client=" << id << "," << xid;
holder_t h = holder_t(id, xid);
lock_state & st = get_lock_state(lid);
lock sl(st.m);
return lock_protocol::RPCERR;
else if (old_xid == xid) {
if (st.held && st.held_by == h) {
- LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+ LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
return lock_protocol::OK;
}
}
st.held = true;
st.held_by = h;
- LOG("Lock " << lid << " held by " << h.first);
+ LOG << "Lock " << lid << " held by " << h.first;
if (st.wanted_by.size())
revoke_fifo.enq(lid);
return lock_protocol::OK;
if (p.first == id) {
// make sure client is obeying serialization
if (p.second != xid) {
- LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
+ LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
return lock_protocol::RPCERR;
}
found = true;
if (!found)
st.wanted_by.push_back(h);
- LOG("wanted_by=" << st.wanted_by);
+ LOG << "wanted_by=" << st.wanted_by;
// send revoke if we're first in line
if (st.wanted_by.front() == h)
}
lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
- LOG("lid=" << lid << " client=" << id << "," << xid);
+ LOG << "lid=" << lid << " client=" << id << "," << xid;
lock_state & st = get_lock_state(lid);
lock sl(st.m);
if (st.held && st.held_by == holder_t(id, xid)) {
st.held = false;
- LOG("Lock " << lid << " not held");
+ LOG << "Lock " << lid << " not held";
}
if (st.wanted_by.size())
retry_fifo.enq(lid);
}
lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
- LOG("stat request for " << lid);
+ LOG << "stat request for " << lid;
VERIFY(0);
r = nacquire;
return lock_protocol::OK;
srandom((uint32_t)getpid());
if(argc != 3){
- LOG_NONMEMBER("Usage: " << argv[0] << " [master:]port [me:]port");
+ LOG_NONMEMBER << "Usage: " << argv[0] << " [master:]port [me:]port";
exit(1);
}
lock ml(count_mutex);
int x = lid[0] & 0x0f;
if (ct[x] != 0) {
- LOG_NONMEMBER("error: server granted " << lid << " twice");
+ LOG_NONMEMBER << "error: server granted " << lid << " twice";
exit(1);
}
ct[x] += 1;
lock ml(count_mutex);
int x = lid[0] & 0x0f;
if (ct[x] != 1) {
- LOG_NONMEMBER("error: client released un-held lock " << lid);
+ LOG_NONMEMBER << "error: client released un-held lock " << lid;
exit(1);
}
ct[x] -= 1;
}
static void test1(void) {
- LOG_NONMEMBER("acquire a release a acquire a release a");
+ LOG_NONMEMBER << "acquire a release a acquire a release a";
lc[0]->acquire(a);
check_grant(a);
lc[0]->release(a);
lc[0]->release(a);
check_release(a);
- LOG_NONMEMBER("acquire a acquire b release b release a");
+ LOG_NONMEMBER << "acquire a acquire b release b release a";
lc[0]->acquire(a);
check_grant(a);
lc[0]->acquire(b);
}
static void test2(int i) {
- LOG_NONMEMBER("test2: client " << i << " acquire a release a");
+ LOG_NONMEMBER << "test2: client " << i << " acquire a release a";
lc[i]->acquire(a);
- LOG_NONMEMBER("test2: client " << i << " acquire done");
+ LOG_NONMEMBER << "test2: client " << i << " acquire done";
check_grant(a);
usleep(100000);
- LOG_NONMEMBER("test2: client " << i << " release");
+ LOG_NONMEMBER << "test2: client " << i << " release";
check_release(a);
lc[i]->release(a);
- LOG_NONMEMBER("test2: client " << i << " release done");
+ LOG_NONMEMBER << "test2: client " << i << " release done";
}
static void test3(int i) {
- LOG_NONMEMBER("test3: client " << i << " acquire a release a concurrent");
+ LOG_NONMEMBER << "test3: client " << i << " acquire a release a concurrent";
for (int j = 0; j < 10; j++) {
lc[i]->acquire(a);
check_grant(a);
- LOG_NONMEMBER("test3: client " << i << " got lock");
+ LOG_NONMEMBER << "test3: client " << i << " got lock";
check_release(a);
lc[i]->release(a);
}
}
static void test4(int i) {
- LOG_NONMEMBER("test4: thread " << i << " acquire a release a concurrent; same clnt");
+ LOG_NONMEMBER << "test4: thread " << i << " acquire a release a concurrent; same clnt";
for (int j = 0; j < 10; j++) {
lc[0]->acquire(a);
check_grant(a);
- LOG_NONMEMBER("test4: thread " << i << " on client 0 got lock");
+ LOG_NONMEMBER << "test4: thread " << i << " on client 0 got lock";
check_release(a);
lc[0]->release(a);
}
}
static void test5(int i) {
- LOG_NONMEMBER("test5: client " << i << " acquire a release a concurrent; same and diff clnt");
+ LOG_NONMEMBER << "test5: client " << i << " acquire a release a concurrent; same and diff clnt";
for (int j = 0; j < 10; j++) {
if (i < 5) lc[0]->acquire(a);
else lc[1]->acquire(a);
check_grant(a);
- LOG_NONMEMBER("test5: client " << i << " got lock");
+ LOG_NONMEMBER << "test5: client " << i << " got lock";
check_release(a);
if (i < 5) lc[0]->release(a);
else lc[1]->release(a);
srandom((uint32_t)getpid());
if (argc < 2) {
- LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [test]");
+ LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [test]";
exit(1);
}
if (argc > 2) {
test = atoi(argv[2]);
if (test < 1 || test > 5) {
- LOG_NONMEMBER("Test number must be between 1 and 5");
+ LOG_NONMEMBER << "Test number must be between 1 and 5";
exit(1);
}
}
- LOG_NONMEMBER("cache lock client");
+ LOG_NONMEMBER << "cache lock client";
for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst);
if (!test || test == 1) {
}
if (!test || test == 3) {
- LOG_NONMEMBER("test 3");
+ LOG_NONMEMBER << "test 3";
for (int i = 0; i < nt; i++)
th[i] = thread(test3, i);
}
if (!test || test == 4) {
- LOG_NONMEMBER("test 4");
+ LOG_NONMEMBER << "test 4";
for (int i = 0; i < 2; i++)
th[i] = thread(test4, i);
}
if (!test || test == 5) {
- LOG_NONMEMBER("test 5");
+ LOG_NONMEMBER << "test 5";
for (int i = 0; i < nt; i++)
th[i] = thread(test5, i);
th[i].join();
}
- LOG_NONMEMBER(argv[0] << ": passed all tests successfully");
+ LOG_NONMEMBER << argv[0] << ": passed all tests successfully";
}
string type;
unsigned instance;
- LOG("logread");
+ LOG << "logread";
while (from >> type) {
if (type == "done") {
string v;
getline(from, v);
pxs->values[instance] = v;
pxs->instance_h = instance;
- LOG("logread: instance: " << instance << " w. v = " <<
- pxs->values[instance]);
+ LOG << "logread: instance: " << instance << " w. v = "
+ << pxs->values[instance];
pxs->accepted_value.clear();
pxs->promise.n = 0;
pxs->accepted.n = 0;
} else if (type == "propseen") {
from >> pxs->promise.n >> pxs->promise.m;
- LOG("logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")");
+ LOG << "logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")";
} else if (type == "accepted") {
string v;
from >> pxs->accepted.n >> pxs->accepted.m;
from.get();
getline(from, v);
pxs->accepted_value = v;
- LOG("logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value);
+ LOG << "logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value;
} else {
- LOG("logread: unknown log record");
+ LOG << "logread: unknown log record";
VERIFY(0);
}
}
}
void log::restore(string s) {
- LOG("restore: " << s);
+ LOG << "restore: " << s;
ofstream f(name, ios::trunc);
f << s;
f.close();
bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
{
lock ml(proposer_mutex);
- LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
+ LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
if (!stable) { // already running proposer?
- LOG("paxos proposer already running");
+ LOG << "paxos proposer already running";
return false;
}
stable = false;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- LOG("received a majority of prepare responses");
+ LOG << "received a majority of prepare responses";
if (!v.size())
v = newv;
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- LOG("received a majority of accept responses");
+ LOG << "received a majority of accept responses";
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- LOG("no majority of accept responses");
+ LOG << "no majority of accept responses";
}
} else {
- LOG("no majority of prepare responses");
+ LOG << "no majority of prepare responses";
}
} else {
- LOG("prepare is rejected " << stable);
+ LOG << "prepare is rejected " << stable;
}
stable = true;
return r;
bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, value_t & v) {
- LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
+ LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
prepareres res;
prop_t highest_n_a{0, ""};
for (auto i : nodes) {
paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
- LOG("commiting old instance!");
+ LOG << "commiting old instance!";
commit(instance, res.v_a);
return false;
}
- LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
- "v_a=\"" << res.v_a << "\"");
+ LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
+ << "v_a=\"" << res.v_a << "\"";
if (res.accept) {
accepts.push_back(i);
if (res.n_a >= highest_n_a) {
- LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
+ LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
v = res.v_a;
highest_n_a = res.n_a;
}
paxos_protocol::status
proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
- LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
+ LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
lock ml(acceptor_mutex);
r.oldinstance = false;
r.accept = false;
r.n_a = accepted;
r.v_a = accepted_value;
if (instance <= instance_h) {
- LOG("old instance " << instance << " has value " << values[instance]);
+ LOG << "old instance " << instance << " has value " << values[instance];
r.oldinstance = true;
r.v_a = values[instance];
} else if (n > promise) {
- LOG("looks good to me");
+ LOG << "looks good to me";
promise = n;
l.logprop(promise);
r.accept = true;
} else {
- LOG("I totally rejected this request. Ha.");
+ LOG << "I totally rejected this request. Ha.";
}
- LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
- "v_a=\"" << r.v_a << "\"");
+ LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
+ << "v_a=\"" << r.v_a << "\"";
return paxos_protocol::OK;
}
paxos_protocol::status
proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
lock ml(acceptor_mutex);
- LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
+ LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
if (instance == instance_h + 1) {
VERIFY(accepted_value == v);
commit(instance, accepted_value, ml);
}
void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
- LOG("instance=" << instance << " has v=" << value);
+ LOG << "instance=" << instance << " has v=" << value;
if (instance > instance_h) {
- LOG("highestacceptedinstance = " << instance);
+ LOG << "highestacceptedinstance = " << instance;
values[instance] = value;
l.loginstance(instance, value);
instance_h = instance;
// For testing purposes
void proposer_acceptor::breakpoint1() {
if (break1) {
- LOG("Dying at breakpoint 1!");
+ LOG << "Dying at breakpoint 1!";
exit(1);
}
}
void proposer_acceptor::breakpoint2() {
if (break2) {
- LOG("Dying at breakpoint 2!");
+ LOG << "Dying at breakpoint 2!";
exit(1);
}
}
void proposer_acceptor::breakpoint(int b) {
if (b == 3) {
- LOG("breakpoint 1");
+ LOG << "breakpoint 1";
break1 = true;
} else if (b == 4) {
- LOG("breakpoint 2");
+ LOG << "breakpoint 2";
break2 = true;
}
}
socket_t s = socket(AF_INET, SOCK_STREAM, 0);
s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
- IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+ IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
close(s);
return nullptr;
}
- IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+ IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
return make_shared<connection>(delegate, move(s), lossy);
}
if (lossy_) {
if ((random()%100) < lossy_) {
- IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd);
+ IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
shutdown(fd,SHUT_RDWR);
}
}
if (dead_)
return;
- IF_LEVEL(5) LOG("got data on fd " << s);
+ IF_LEVEL(5) LOG << "got data on fd " << s;
if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
if (!readpdu()) {
- IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
+ IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
dead_ = true;
send_complete_.notify_one();
ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
if (n < 0) {
if (errno != EAGAIN) {
- IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno);
+ IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
wpdu_.solong = size_t_max;
wpdu_.buf.clear();
}
}
bool connection::readpdu() {
- IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
+ IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
if (!rpdu_.buf.size()) {
rpc_protocol::rpc_sz_t sz1;
ssize_t n = fd.read(sz1);
}
if (n > 0 && n != sizeof(sz1)) {
- IF_LEVEL(0) LOG("short read of sz");
+ IF_LEVEL(0) LOG << "short read of sz";
return false;
}
size_t sz = ntoh(sz1);
if (sz > rpc_protocol::MAX_PDU) {
- IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
+ IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << hex << sz1;
return false;
}
- IF_LEVEL(5) LOG("read size of datagram = " << sz);
+ IF_LEVEL(5) LOG << "read size of datagram = " << sz;
rpdu_.buf.assign(sz+sizeof(sz1), 0);
rpdu_.solong = sizeof(sz1);
ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
- IF_LEVEL(5) LOG("read " << n << " bytes");
+ IF_LEVEL(5) LOG << "read " << n << " bytes";
if (n <= 0) {
if (errno == EAGAIN)
VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
port_ = ntoh(sin.sin_port);
- IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
+ IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
}
throw runtime_error("connection listener failure");
}
- IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
+ IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
auto ch = make_shared<connection>(delegate_, s1, lossy_);
// garbage collect dead connections
return;
else if (ret < 0) {
perror("select:");
- IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+ IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
VERIFY(0);
}
if (loss_env)
lossytest_ = atoi(loss_env);
- IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
+ IF_LEVEL(2) LOG << "cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_;
}
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc() {
cancel();
- IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1));
+ IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1);
chan_.reset();
VERIFY(calls_.size() == 0);
}
bind_done_ = true;
srv_nonce_ = r;
} else {
- IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
+ IF_LEVEL(2) LOG << "bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret;
}
return ret;
}
void rpcc::cancel(void) {
lock ml(m_);
if (calls_.size()) {
- LOG("force callers to fail");
+ LOG << "force callers to fail";
for (auto & p : calls_) {
caller *ca = p.second;
- IF_LEVEL(2) LOG("force caller to fail");
+ IF_LEVEL(2) LOG << "force caller to fail";
lock cl(ca->m);
ca->done = true;
while (calls_.size () > 0)
destroy_wait_c_.wait(ml);
- LOG("done");
+ LOG << "done";
}
}
lock ml(m_);
if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
- IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
+ IF_LEVEL(1) LOG << "rpcc has not been bound to dst or binding twice";
return rpc_protocol::bind_failure;
}
ch->send(forgot.buf);
ch->send(req);
}
- else IF_LEVEL(1) LOG("not reachable");
- IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
- " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
+ else IF_LEVEL(1) LOG << "not reachable";
+ IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << hex << proc
+ << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_;
}
transmit = false; // only send once on a given channel
}
{
lock cal(ca.m);
while (!ca.done) {
- IF_LEVEL(2) LOG("wait");
+ IF_LEVEL(2) LOG << "wait";
if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
- IF_LEVEL(2) LOG("timeout");
+ IF_LEVEL(2) LOG << "timeout";
break;
}
}
if (ca.done) {
- IF_LEVEL(2) LOG("reply received");
+ IF_LEVEL(2) LOG << "reply received";
break;
}
}
lock cal(ca.m);
- IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
- " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
- ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
+ IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << hex << proc
+ << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":"
+ << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret;
// destruction of req automatically frees its buffer
return (ca.done? ca.intret : rpc_protocol::timeout_failure);
rep.unpack_header(h);
if (!rep.ok()) {
- IF_LEVEL(1) LOG("unmarshall header failed!!!");
+ IF_LEVEL(1) LOG << "unmarshall header failed!!!";
return true;
}
update_xid_rep(h.xid, ml);
if (calls_.find(h.xid) == calls_.end()) {
- IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
+ IF_LEVEL(2) LOG << "xid " << h.xid << " no pending request";
return true;
}
caller *ca = calls_[h.xid];
*ca->rep = b;
ca->intret = h.ret;
if (ca->intret < 0) {
- IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
+ IF_LEVEL(2) LOG << "RPC reply error for xid " << h.xid << " intret " << ca->intret;
}
ca->done = 1;
}
{
set_rand_seed();
nonce_ = (nonce_t)random();
- IF_LEVEL(2) LOG("created with nonce " << nonce_);
+ IF_LEVEL(2) LOG << "created with nonce " << nonce_;
reg(rpc_protocol::bind, &rpcs::rpcbind, this);
}
bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
if (!reachable_) {
- IF_LEVEL(1) LOG("not reachable");
+ IF_LEVEL(1) LOG << "not reachable";
return true;
}
proc_id_t proc = h.proc;
if (!req.ok()) {
- IF_LEVEL(1) LOG("unmarshall header failed");
+ IF_LEVEL(1) LOG << "unmarshall header failed";
return;
}
- IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
- dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
+ IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << hex << proc << ", last_rep "
+ << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce;
marshall rep;
rpc_protocol::reply_header rh{h.xid,0};
// is client sending to an old instance of server?
if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
- IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
- " (current " << nonce_ << ") proc " << hex << h.proc);
+ IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce
+ << " (current " << nonce_ << ") proc " << hex << h.proc;
rh.ret = rpc_protocol::oldsrv_failure;
rep.pack_header(rh);
c->send(rep);
{
lock pl(procs_m_);
if (procs_.count(proc) < 1) {
- LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
+ LOG << "unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_;
VERIFY(0);
return;
}
if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
- " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
+ IF_LEVEL(2) LOG << "new client " << h.clt_nonce << " xid " << h.xid
+ << " chan " << c->fd << ", total clients " << (reply_window_.size()-1);
}
}
case NEW: // new request
rh.ret = (*f)(forward<unmarshall>(req), rep);
if (rh.ret == rpc_protocol::unmarshall_args_failure) {
- LOG("failed to unmarshall the arguments. You are " <<
- "probably calling RPC 0x" << hex << proc << " with the wrong " <<
- "types of arguments.");
+ LOG << "failed to unmarshall the arguments. You are "
+ << "probably calling RPC 0x" << hex << proc << " with the wrong "
+ << "types of arguments.";
VERIFY(0);
}
VERIFY(rh.ret >= 0);
rep.pack_header(rh);
b1 = rep;
- IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
- h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
+ IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc "
+ << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce;
add_reply(h.clt_nonce, h.xid, b1);
c->send(b1);
break;
case FORGOTTEN: // very old request and we don't have the response anymore
- IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
+ IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce;
rh.ret = rpc_protocol::atmostonce_failure;
rep.pack_header(rh);
c->send(rep);
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
- LOG("Could not find reply struct in add_reply");
+ LOG << "Could not find reply struct in add_reply";
l.insert(it, reply_t(xid, b));
} else {
*it = reply_t(xid, b);
}
rpc_protocol::status rpcs::rpcbind(nonce_t & r) {
- IF_LEVEL(2) LOG("called return nonce " << nonce_);
+ IF_LEVEL(2) LOG << "called return nonce " << nonce_;
r = nonce_;
return 0;
}
struct hostent *hp = gethostbyname(host.c_str());
if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
- LOG_NONMEMBER("cannot find host name " << host);
+ LOG_NONMEMBER << "cannot find host name " << host;
exit(1);
}
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
if (intret < 0) return intret;
unmarshall u(rep, true, r);
if (u.okdone() != true) {
- LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " <<
- "calling RPC 0x" << hex << proc << " with the wrong return type.");
+ LOG << "rpcc::call_m: failed to unmarshall the reply. You are probably " <<
+ "calling RPC 0x" << hex << proc << " with the wrong return type.";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
}
if (debug_level > 0) {
DEBUG_LEVEL = debug_level;
- IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+ IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level;
}
testmarshall();
// XXX iannucci 2013/09/15 -- I don't understand whether accessing
// cfg->view_id in this manner involves a race. I suspect not.
if (join(primary, ml)) {
- LOG("joined");
+ LOG << "joined";
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
}
}
vid_insync = vid_commit;
- LOG("sync vid_insync " << vid_insync);
+ LOG << "sync vid_insync " << vid_insync;
if (primary == cfg->myaddr()) {
r = sync_with_backups(ml);
} else {
r = sync_with_primary(ml);
}
- LOG("sync done");
+ LOG << "sync done";
// If there was a commited viewchange during the synchronization, restart
// the recovery
myvs.seqno = 1;
inviewchange = false;
}
- LOG("go to sleep " << insync << " " << inviewchange);
+ LOG << "go to sleep " << insync << " " << inviewchange;
recovery_cond.wait(ml);
}
}
insync = true;
cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
- LOG("backups " << backups);
+ LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
insync = false;
return true;
rsm_protocol::transferres r;
handle h(m);
int ret = 0;
- LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
rpcc *cl;
{
rsm_mutex_lock.unlock();
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+ LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
return false;
}
if (stf && last_myvs != r.last) {
stf->unmarshal_state(r.state);
}
last_myvs = r.last;
- LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
return true;
}
int ret = 0;
string log;
- LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
rpcc *cl;
{
rsm_mutex_lock.unlock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+ LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
return false;
}
- LOG("succeeded " << log);
+ LOG << "succeeded " << log;
cfg->restore(log);
return true;
}
void rsm::commit_change(unsigned vid, lock &) {
if (vid <= vid_commit)
return;
- LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
- last_myvs.seqno << ") " << primary << " insync " << insync);
+ LOG << "new view (" << vid << ") last vs (" << last_myvs.vid << ","
+ << last_myvs.seqno << ") " << primary << " insync " << insync;
vid_commit = vid;
inviewchange = true;
set_primary(vid);
void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
- LOG("execute");
+ LOG << "execute";
handler *h = procs[procno];
VERIFY(h);
marshall rep;
// machine.
//
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
- LOG("invoke procno 0x" << hex << procno);
+ LOG << "invoke procno 0x" << hex << procno;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
viewstamp vs;
{
lock ml2(rsm_mutex);
- LOG("Checking for inviewchange");
+ LOG << "Checking for inviewchange";
if (inviewchange)
return rsm_client_protocol::BUSY;
- LOG("Checking for primacy");
+ LOG << "Checking for primacy";
myaddr = cfg->myaddr();
if (primary != myaddr)
return rsm_client_protocol::NOTPRIMARY;
- LOG("Assigning a viewstamp");
+ LOG << "Assigning a viewstamp";
cfg->get_view(vid_commit, m);
// assign the RPC the next viewstamp number
vs = myvs;
}
// send an invoke RPC to all slaves in the current view with a timeout of 1 second
- LOG("Invoking slaves");
+ LOG << "Invoking slaves";
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != myaddr) {
// if invoke on slave fails, return rsm_client_protocol::BUSY
handle h(m[i]);
- LOG("Sending invoke to " << m[i]);
+ LOG << "Sending invoke to " << m[i];
rpcc *cl = h.safebind();
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
- LOG("Invoke returned " << ret);
+ LOG << "Invoke returned " << ret;
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
breakpoint(1);
partition1(rsm_mutex_lock);
}
}
- LOG(setfill('0') << setw(2) << hex;
+ {
+ auto && log = LOG << setfill('0') << setw(2) << hex;
for (size_t i=0; i<req.size(); i++)
- cerr << (unsigned int)(unsigned char)req[i];
- cerr);
+ log << (unsigned int)(unsigned char)req[i];
+ }
execute(procno, req, r);
- LOG(setfill('0') << setw(2) << hex;
+ {
+ auto && log = LOG << setfill('0') << setw(2) << hex;
for (size_t i=0; i<r.size(); i++)
- cerr << (unsigned int)(unsigned char)r[i];
- cerr);
+ log << (unsigned int)(unsigned char)r[i];
+ }
last_myvs = vs;
return rsm_client_protocol::OK;
}
// according to requests' seqno
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
- LOG("invoke procno 0x" << hex << proc);
+ LOG << "invoke procno 0x" << hex << proc;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
{
lock ml2(rsm_mutex);
// check if !inviewchange
- LOG("Checking for view change");
+ LOG << "Checking for view change";
if (inviewchange)
return rsm_protocol::ERR;
// check if slave
- LOG("Checking for slave status");
+ LOG << "Checking for slave status";
myaddr = cfg->myaddr();
if (primary == myaddr)
return rsm_protocol::ERR;
if (find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
- LOG("Checking sequence number");
+ LOG << "Checking sequence number";
if (vs != myvs)
return rsm_protocol::ERR;
myvs++;
rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
- LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
- last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs ("
+ << last_myvs.vid << "," << last_myvs.seqno << ")";
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
if (stf && last != last_myvs)
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
- LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
- last_myvs.vid << "," << last_myvs.seqno << ")");
+ LOG << "join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=("
+ << last_myvs.vid << "," << last_myvs.seqno << ")";
if (cfg->ismember(m, vid_commit)) {
- LOG(m << " is still a member -- nothing to do");
+ LOG << m << " is still a member -- nothing to do";
log = cfg->dump();
} else if (cfg->myaddr() != primary) {
- LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
+ LOG << "but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!";
ret = rsm_protocol::BUSY;
} else {
// We cache vid_commit to avoid adding m to a view which already contains
// m due to race condition
- LOG("calling down to config layer");
+ LOG << "calling down to config layer";
unsigned vid_cache = vid_commit;
bool succ;
{
}
if (cfg->ismember(m, cfg->view_id())) {
log = cfg->dump();
- LOG("ret " << ret << " log " << log);
+ LOG << "ret " << ret << " log " << log;
} else {
- LOG("failed; proposer couldn't add " << succ);
+ LOG << "failed; proposer couldn't add " << succ;
ret = rsm_protocol::BUSY;
}
}
cfg->get_view(vid_commit, m);
m.push_back(primary);
r = m;
- LOG("return " << m << " m " << primary);
+ LOG << "return " << m << " m " << primary;
return rsm_client_protocol::OK;
}
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
- LOG("primary stays " << primary);
+ LOG << "primary stays " << primary;
return;
}
for (unsigned i = 0; i < p.size(); i++) {
if (isamember(p[i], c)) {
primary = p[i];
- LOG("primary is " << primary);
+ LOG << "primary is " << primary;
return;
}
}
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
handle h(m[i]);
- LOG("member " << m[i] << " " << heal);
+ LOG << "member " << m[i] << " " << heal;
if (h.safebind()) h.safebind()->set_reachable(heal);
}
}
rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
lock ml(rsm_mutex);
- LOG("heal " << heal << " (dopartition " <<
- dopartition << ", partitioned " << partitioned << ")");
+ LOG << "heal " << heal << " (dopartition "
+ << dopartition << ", partitioned " << partitioned << ")";
if (heal)
net_repair(heal, ml);
else
void rsm::breakpoint(int b) {
if (breakpoints[b-1]) {
- LOG("Dying at breakpoint " << b << " in rsm!");
+ LOG << "Dying at breakpoint " << b << " in rsm!";
exit(1);
}
}
rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
- LOG("breakpoint " << b);
+ LOG << "breakpoint " << b;
if (b == 1) breakpoints[1-1] = true;
else if (b == 2) breakpoints[2-1] = true;
else if (b == 3 || b == 4) cfg->breakpoint(b);
#include <unistd.h>
rsm_client::rsm_client(string dst) : primary(dst) {
- LOG("create rsm_client");
+ LOG << "create rsm_client";
lock ml(rsm_client_mutex);
VERIFY (init_members(ml));
- LOG("done");
+ LOG << "done";
}
void rsm_client::primary_failure(lock &) {
rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
lock ml(rsm_client_mutex);
while (1) {
- LOG("proc " << hex << proc << " primary " << primary);
+ LOG << "proc " << hex << proc << " primary " << primary;
handle h(primary);
ml.unlock();
if (!cl)
goto prim_fail;
- LOG("proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
+ LOG << "proc " << hex << proc << " primary " << primary << " ret " << dec << ret;
if (ret == rsm_client_protocol::OK)
return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
- LOG("rsm is busy " << primary);
+ LOG << "rsm is busy " << primary;
usleep(300000);
continue;
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
- LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
+ LOG << "primary " << primary << " isn't the primary--let's get a complete list of mems";
if (init_members(ml))
continue;
}
prim_fail:
- LOG("primary " << primary << " failed ret " << dec << ret);
+ LOG << "primary " << primary << " failed ret " << dec << ret;
primary_failure(ml);
- LOG("retry new primary " << primary);
+ LOG << "retry new primary " << primary;
}
}
bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
- LOG("get members!");
+ LOG << "get members!";
handle h(primary);
int ret = rsm_client_protocol::ERR;
rpcc *cl;
if (cl == 0 || ret != rsm_protocol::OK)
return false;
if (known_mems.size() < 1) {
- LOG("do not know any members!");
+ LOG << "do not know any members!";
VERIFY(0);
}
primary = known_mems.back();
known_mems.pop_back();
- LOG("primary " << primary);
+ LOG << "primary " << primary;
return true;
}
string res;
u >> res;
if (!u.okdone()) {
- LOG("failed to unmarshall the reply.");
- LOG("You probably forgot to set the reply string in " <<
- "rsm::client_invoke, or you may have called RPC " <<
- "0x" << hex << proc << " with the wrong return type");
- LOG("here's what I got: \"" << hexify(rep) << "\"");
+ LOG << "failed to unmarshall the reply.";
+ LOG << "You probably forgot to set the reply string in " <<
+ "rsm::client_invoke, or you may have called RPC " <<
+ "0x" << hex << proc << " with the wrong return type";
+ LOG << "here's what I got: \"" << hexify(rep) << "\"";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
}
if(!unmarshall(res, false, r).okdone()) {
- LOG("failed to unmarshall the reply.");
- LOG("You are probably calling RPC 0x" << hex << proc <<
- " with the wrong return type.");
- LOG("here's what I got: \"" << hexify(res) << "\"");
+ LOG << "failed to unmarshall the reply.";
+ LOG << "You are probably calling RPC 0x" << hex << proc <<
+ " with the wrong return type.";
+ LOG << "here's what I got: \"" << hexify(res) << "\"";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
}
int main(int argc, char *argv[]) {
if(argc != 4){
- LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [partition] arg");
+ LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [partition] arg";
return 1;
}
int b = stoi(argv[3]);
cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
} else {
- LOG_NONMEMBER("Unknown command " << argv[2]);
+ LOG_NONMEMBER << "Unknown command " << argv[2];
}
return 0;
}
mutex cerr_mutex;
map<thread::id, int> thread_name_map;
int next_thread_num = 0;
-map<void *, int> instance_name_map;
+map<const void *, int> instance_name_map;
int next_instance_num = 0;
int DEBUG_LEVEL = 0;
+
+locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) {
+ auto thread = this_thread::get_id();
+ int tid = thread_name_map[thread];
+ if (tid==0)
+ tid = thread_name_map[thread] = ++next_thread_num;
+ auto utime = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000;
+ f << setfill('0') << dec << left << setw(9) << utime << " ";
+ f << setfill(' ') << log_thread_prefix << left << setw(2) << tid;
+ f << " " << setw(20) << file << " " << setw(18) << func;
+ return move(f);
+}
+
+locked_ostream && _log_member(locked_ostream && f, const void *ptr) {
+ int id = instance_name_map[ptr];
+ if (id == 0)
+ id = instance_name_map[ptr] = ++next_instance_num;
+ f << "#" << left << setw(2) << id << " ";
+ return move(f);
+}
extern mutex cerr_mutex;
extern map<thread::id, int> thread_name_map;
extern int next_thread_num;
-extern map<void *, int> instance_name_map;
+extern map<const void *, int> instance_name_map;
extern int next_instance_num;
extern char log_thread_prefix;
-#define LOG_PREFIX { \
- auto _thread_ = this_thread::get_id(); \
- int _tid_ = thread_name_map[_thread_]; \
- if (_tid_==0) \
- _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
- auto _utime_ = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000; \
- cerr << setfill('0') << dec << left << setw(9) << _utime_ << " "; \
- cerr << setfill(' ') << log_thread_prefix << left << setw(2) << _tid_; \
- cerr << " " << setw(20) << __FILE__ << " " << setw(18) << __func__; \
-}
-#define LOG_THIS_POINTER { \
- int _self_ = instance_name_map[this]; \
- if (_self_==0) \
- _self_ = instance_name_map[this] = ++next_instance_num; \
- cerr << "#" << left << setw(2) << _self_ << " "; \
-}
-
-#define LOG_NONMEMBER(_x_) { \
- lock _cel_(cerr_mutex); \
- LOG_PREFIX; \
- cerr << _x_ << endl; \
-}
-#define LOG(_x_) { \
- lock _cel_(cerr_mutex); \
- LOG_PREFIX; \
- LOG_THIS_POINTER; \
- cerr << _x_ << endl; \
-}
+struct locked_ostream {
+ ostream & s;
+ lock l;
+ ~locked_ostream() { s << endl; }
+ template <typename U>
+ locked_ostream & operator<<(U && u) { s << u; return *this; }
+
+ typedef std::ostream& (*ostream_manipulator)(ostream&);
+ locked_ostream & operator<<(ostream_manipulator manip) { s << manip; return *this; }
+};
+
+locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func);
+locked_ostream && _log_member(locked_ostream && f, const void *ptr);
+#define _log_nonmember(f, ptr) f
+
+#define _LOG(_context_) _context_(_log_prefix(locked_ostream{cerr, lock(cerr_mutex)}, __FILE__, __func__), (const void *)this)
+
+#define LOG_NONMEMBER _LOG(_log_nonmember)
+#define LOG _LOG(_log_member)
extern int DEBUG_LEVEL;