From eb3d5c6416c0f0d1cad35e52af3231de7866fea8 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Fri, 22 Nov 2013 15:35:30 -0500 Subject: [PATCH] Rewrote threaded log code to be more idiomatic. --- config.cc | 40 +++++++++++----------- handle.cc | 8 ++--- lock_client.cc | 32 ++++++++--------- lock_demo.cc | 4 +-- lock_server.cc | 24 ++++++------- lock_smain.cc | 2 +- lock_tester.cc | 42 +++++++++++------------ log.cc | 14 ++++---- paxos.cc | 50 +++++++++++++-------------- rpc/connection.cc | 26 +++++++------- rpc/poll_mgr.cc | 2 +- rpc/rpc.cc | 76 ++++++++++++++++++++--------------------- rpc/rpc.h | 4 +-- rpc/rpctest.cc | 2 +- rsm.cc | 98 +++++++++++++++++++++++++++-------------------------- rsm_client.cc | 22 ++++++------ rsm_client.h | 18 +++++----- rsm_tester.cc | 4 +-- threaded_log.cc | 22 +++++++++++- threaded_log.h | 49 +++++++++++---------------- 20 files changed, 276 insertions(+), 263 deletions(-) diff --git a/config.cc b/config.cc index cd06e1b..374757f 100644 --- a/config.cc +++ b/config.cc @@ -59,7 +59,7 @@ void config::get_view(unsigned instance, vector & m) { void config::get_view(unsigned instance, vector & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); string value = paxos.value(instance); - LOG("get_view(" << instance << "): returns " << value); + LOG << "get_view(" << instance << "): returns " << value; m = explode(value); } @@ -68,7 +68,7 @@ void config::reconstruct(lock & cfg_mutex_lock) { my_view_id = paxos.instance(); if (my_view_id > 0) { get_view(my_view_id, mems, cfg_mutex_lock); - LOG("view " << my_view_id << " " << mems); + LOG << "view " << my_view_id << " " << mems; } } @@ -77,12 +77,12 @@ void config::paxos_commit(unsigned instance, const string & value) { lock cfg_mutex_lock(cfg_mutex); vector newmem = explode(value); - LOG("instance " << instance << ": " << newmem); + LOG << "instance " << instance << ": " << newmem; for (auto mem : mems) { - LOG("is " << mem << " still a member?"); + LOG << "is " << mem << " still a member?"; if (!isamember(mem, newmem) && me != mem) { - LOG("delete " << mem); + LOG << "delete " << mem; handle(mem).invalidate(); } } @@ -105,28 +105,28 @@ bool config::ismember(const string & m, unsigned vid) { bool config::add(const string & new_m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); - LOG("adding " << new_m << " to " << vid); + LOG << "adding " << new_m << " to " << vid; if (vid != my_view_id) { - LOG("that's not my view id, " << my_view_id << "!"); + LOG << "that's not my view id, " << my_view_id << "!"; return false; } - LOG("calling down to paxos layer"); + LOG << "calling down to paxos layer"; vector m(mems), cmems(mems); m.push_back(new_m); - LOG("old mems " << cmems << " " << implode(cmems)); - LOG("new mems " << m << " " << implode(m)); + LOG << "old mems " << cmems << " " << implode(cmems); + LOG << "new mems " << m << " " << implode(m); unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, implode(m)); cfg_mutex_lock.lock(); - LOG("paxos proposer returned " << (r ? "success" : "failure")); + LOG << "paxos proposer returned " << (r ? "success" : "failure"); return r; } // caller should hold cfg_mutex bool config::remove(const string & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); - LOG("my_view_id " << my_view_id << " remove? " << m); + LOG << "my_view_id " << my_view_id << " remove? " << m; vector n; for (auto mem : mems) { if (mem != m) @@ -137,7 +137,7 @@ bool config::remove(const string & m, lock & cfg_mutex_lock) { cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, implode(n)); cfg_mutex_lock.lock(); - LOG("proposer returned " << (r ? "success" : "failure")); + LOG << "proposer returned " << (r ? "success" : "failure"); return r; } @@ -146,16 +146,16 @@ void config::heartbeater() { while (1) { auto next_timeout = steady_clock::now() + milliseconds(300); - LOG("go to sleep"); + LOG << "go to sleep"; config_cond.wait_until(cfg_mutex_lock, next_timeout); unsigned vid = my_view_id; vector cmems; get_view(vid, cmems, cfg_mutex_lock); - LOG("current membership " << cmems); + LOG << "current membership " << cmems; if (!isamember(me, cmems)) { - LOG("not member yet; skip hearbeat"); + LOG << "not member yet; skip hearbeat"; continue; } @@ -182,7 +182,7 @@ void config::heartbeater() { paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); r = (int) my_view_id; - LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); + LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id; if (vid == my_view_id) return paxos_protocol::OK; else if (paxos.isrunning()) { @@ -195,7 +195,7 @@ paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); unsigned vid = my_view_id; - LOG("heartbeat to " << m << " (" << vid << ")"); + LOG << "heartbeat to " << m << " (" << vid << ")"; handle h(m); cfg_mutex_lock.unlock(); @@ -213,9 +213,9 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) h.invalidate(); break; default: - LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); + LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r; res = (ret < 0) ? FAILURE : VIEWERR; } - LOG("done " << res); + LOG << "done " << res; return res; } diff --git a/handle.cc b/handle.cc index 792ce40..fa7495c 100644 --- a/handle.cc +++ b/handle.cc @@ -27,13 +27,13 @@ rpcc * handle::safebind() { return nullptr; if (!h->client) { unique_ptr client(new rpcc(h->destination)); - LOG("bind(\"" << h->destination << "\")"); + LOG << "bind(\"" << h->destination << "\")"; int ret = client->bind(milliseconds(1000)); if (ret < 0) { - LOG("bind failure! " << h->destination << " " << ret); + LOG << "bind failure! " << h->destination << " " << ret; h->valid = false; } else { - LOG("bind succeeded " << h->destination); + LOG << "bind succeeded " << h->destination; h->client = move(client); } } @@ -45,7 +45,7 @@ void handle::invalidate() { lock ml(mgr_mutex); if (hmap.find(destination_) != hmap.end()) { hmap[destination_]->valid = false; - LOG_NONMEMBER("cl " << destination_ << " refcnt " << hmap[destination_].use_count()); + LOG << "cl " << destination_ << " refcnt " << hmap[destination_].use_count(); hmap.erase(destination_); } } diff --git a/lock_client.cc b/lock_client.cc index beca1cc..81c102e 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -32,7 +32,7 @@ lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) { cl = unique_ptr(new rpcc(xdst)); if (cl->bind() < 0) - LOG("lock_client: call bind"); + LOG << "lock_client: call bind"; srandom((uint32_t)time(NULL)^last_port); rlock_port = ((random()%32000) | (0x1 << 10)); @@ -50,7 +50,7 @@ void lock_client::releaser() { while (1) { lock_protocol::lockid_t lid; release_fifo.deq(&lid); - LOG("Releaser: " << lid); + LOG << "Releaser: " << lid; lock_state & st = get_lock_state(lid); lock sl(st.m); @@ -65,7 +65,7 @@ void lock_client::releaser() { sl.lock(); } st.state = lock_state::none; - LOG("Lock " << lid << ": none"); + LOG << "Lock " << lid << ": none"; st.signal(); } } @@ -91,7 +91,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { while (1) { if (st.state != lock_state::free) - LOG("Lock " << lid << ": not free"); + LOG << "Lock " << lid << ": not free"; if (st.state == lock_state::none || st.state == lock_state::retrying) { if (st.state == lock_state::none) { @@ -99,7 +99,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { st.xid = next_xid++; } st.state = lock_state::acquiring; - LOG("Lock " << lid << ": acquiring"); + LOG << "Lock " << lid << ": acquiring"; lock_protocol::status result; { sl.unlock(); @@ -107,10 +107,10 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); sl.lock(); } - LOG("acquire returned " << result); + LOG << "acquire returned " << result; if (result == lock_protocol::OK) { st.state = lock_state::free; - LOG("Lock " << lid << ": free"); + LOG << "Lock " << lid << ": free"; } } @@ -122,7 +122,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { st.wanted_by.pop_front(); st.state = lock_state::locked; st.held_by = releaser_thread.get_id(); - LOG("Queuing " << lid << " for release"); + LOG << "Queuing " << lid << " for release"; release_fifo.enq(lid); } else if (front == self) { st.wanted_by.pop_front(); @@ -134,12 +134,12 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { } } - LOG("waiting..."); + LOG << "waiting..."; st.wait(sl); - LOG("wait ended"); + LOG << "wait ended"; } - LOG("Lock " << lid << ": locked"); + LOG << "Lock " << lid << ": locked"; return lock_protocol::OK; } @@ -149,24 +149,24 @@ lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { auto self = this_thread::get_id(); VERIFY(st.state == lock_state::locked && st.held_by == self); st.state = lock_state::free; - LOG("Lock " << lid << ": free"); + LOG << "Lock " << lid << ": free"; if (st.wanted_by.size()) { auto front = st.wanted_by.front(); if (front == releaser_thread.get_id()) { st.state = lock_state::locked; st.held_by = releaser_thread.get_id(); st.wanted_by.pop_front(); - LOG("Queuing " << lid << " for release"); + LOG << "Queuing " << lid << " for release"; release_fifo.enq(lid); } else st.signal(front); } - LOG("Finished signaling."); + LOG << "Finished signaling."; return lock_protocol::OK; } rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { - LOG("Revoke handler " << lid << " " << xid); + LOG << "Revoke handler " << lid << " " << xid; lock_state & st = get_lock_state(lid); lock sl(st.m); @@ -193,7 +193,7 @@ rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lock sl(st.m); VERIFY(st.state == lock_state::acquiring); st.state = lock_state::retrying; - LOG("Lock " << lid << ": none"); + LOG << "Lock " << lid << ": none"; st.signal(); // only one thread needs to wake up return rlock_protocol::OK; } diff --git a/lock_demo.cc b/lock_demo.cc index 97c2964..6cdf346 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -4,10 +4,10 @@ char log_thread_prefix = 'd'; int main(int argc, char *argv[]) { if(argc != 2) { - LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port"); + LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port"; return 1; } lock_client *lc = new lock_client(argv[1]); - LOG_NONMEMBER("stat returned " << lc->stat("1")); + LOG_NONMEMBER << "stat returned " << lc->stat("1"); } diff --git a/lock_server.cc b/lock_server.cc index 90ad5b2..a4d5881 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -42,7 +42,7 @@ void lock_server::revoker () { while (1) { lock_protocol::lockid_t lid; revoke_fifo.deq(&lid); - LOG("Revoking " << lid); + LOG << "Revoking " << lid; if (rsm_ && !rsm_->amiprimary()) continue; @@ -61,7 +61,7 @@ void lock_server::revoker () { if (proxy) { int r; auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second); - LOG("Revoke returned " << ret); + LOG << "Revoke returned " << ret; } } } @@ -73,7 +73,7 @@ void lock_server::retryer() { if (rsm_ && !rsm_->amiprimary()) continue; - LOG("Sending retry for " << lid); + LOG << "Sending retry for " << lid; lock_state & st = get_lock_state(lid); holder_t front; { @@ -91,13 +91,13 @@ void lock_server::retryer() { if (proxy) { int r; auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second); - LOG("Retry returned " << ret); + LOG << "Retry returned " << ret; } } } lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { - LOG("lid=" << lid << " client=" << id << "," << xid); + LOG << "lid=" << lid << " client=" << id << "," << xid; holder_t h = holder_t(id, xid); lock_state & st = get_lock_state(lid); lock sl(st.m); @@ -109,7 +109,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c return lock_protocol::RPCERR; else if (old_xid == xid) { if (st.held && st.held_by == h) { - LOG("Client " << id << " sent duplicate acquire xid=" << xid); + LOG << "Client " << id << " sent duplicate acquire xid=" << xid; return lock_protocol::OK; } } @@ -123,7 +123,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c st.held = true; st.held_by = h; - LOG("Lock " << lid << " held by " << h.first); + LOG << "Lock " << lid << " held by " << h.first; if (st.wanted_by.size()) revoke_fifo.enq(lid); return lock_protocol::OK; @@ -135,7 +135,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c if (p.first == id) { // make sure client is obeying serialization if (p.second != xid) { - LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second); + LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second; return lock_protocol::RPCERR; } found = true; @@ -145,7 +145,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c if (!found) st.wanted_by.push_back(h); - LOG("wanted_by=" << st.wanted_by); + LOG << "wanted_by=" << st.wanted_by; // send revoke if we're first in line if (st.wanted_by.front() == h) @@ -155,12 +155,12 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c } lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { - LOG("lid=" << lid << " client=" << id << "," << xid); + LOG << "lid=" << lid << " client=" << id << "," << xid; lock_state & st = get_lock_state(lid); lock sl(st.m); if (st.held && st.held_by == holder_t(id, xid)) { st.held = false; - LOG("Lock " << lid << " not held"); + LOG << "Lock " << lid << " not held"; } if (st.wanted_by.size()) retry_fifo.enq(lid); @@ -178,7 +178,7 @@ void lock_server::unmarshal_state(const string & state) { } lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) { - LOG("stat request for " << lid); + LOG << "stat request for " << lid; VERIFY(0); r = nacquire; return lock_protocol::OK; diff --git a/lock_smain.cc b/lock_smain.cc index 2c9828b..fecd7f8 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -13,7 +13,7 @@ int main(int argc, char *argv[]) { srandom((uint32_t)getpid()); if(argc != 3){ - LOG_NONMEMBER("Usage: " << argv[0] << " [master:]port [me:]port"); + LOG_NONMEMBER << "Usage: " << argv[0] << " [master:]port [me:]port"; exit(1); } diff --git a/lock_tester.cc b/lock_tester.cc index 5e615ca..e9ec0a8 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -26,7 +26,7 @@ static void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; if (ct[x] != 0) { - LOG_NONMEMBER("error: server granted " << lid << " twice"); + LOG_NONMEMBER << "error: server granted " << lid << " twice"; exit(1); } ct[x] += 1; @@ -36,14 +36,14 @@ static void check_release(lock_protocol::lockid_t lid) { lock ml(count_mutex); int x = lid[0] & 0x0f; if (ct[x] != 1) { - LOG_NONMEMBER("error: client released un-held lock " << lid); + LOG_NONMEMBER << "error: client released un-held lock " << lid; exit(1); } ct[x] -= 1; } static void test1(void) { - LOG_NONMEMBER("acquire a release a acquire a release a"); + LOG_NONMEMBER << "acquire a release a acquire a release a"; lc[0]->acquire(a); check_grant(a); lc[0]->release(a); @@ -53,7 +53,7 @@ static void test1(void) { lc[0]->release(a); check_release(a); - LOG_NONMEMBER("acquire a acquire b release b release a"); + LOG_NONMEMBER << "acquire a acquire b release b release a"; lc[0]->acquire(a); check_grant(a); lc[0]->acquire(b); @@ -65,46 +65,46 @@ static void test1(void) { } static void test2(int i) { - LOG_NONMEMBER("test2: client " << i << " acquire a release a"); + LOG_NONMEMBER << "test2: client " << i << " acquire a release a"; lc[i]->acquire(a); - LOG_NONMEMBER("test2: client " << i << " acquire done"); + LOG_NONMEMBER << "test2: client " << i << " acquire done"; check_grant(a); usleep(100000); - LOG_NONMEMBER("test2: client " << i << " release"); + LOG_NONMEMBER << "test2: client " << i << " release"; check_release(a); lc[i]->release(a); - LOG_NONMEMBER("test2: client " << i << " release done"); + LOG_NONMEMBER << "test2: client " << i << " release done"; } static void test3(int i) { - LOG_NONMEMBER("test3: client " << i << " acquire a release a concurrent"); + LOG_NONMEMBER << "test3: client " << i << " acquire a release a concurrent"; for (int j = 0; j < 10; j++) { lc[i]->acquire(a); check_grant(a); - LOG_NONMEMBER("test3: client " << i << " got lock"); + LOG_NONMEMBER << "test3: client " << i << " got lock"; check_release(a); lc[i]->release(a); } } static void test4(int i) { - LOG_NONMEMBER("test4: thread " << i << " acquire a release a concurrent; same clnt"); + LOG_NONMEMBER << "test4: thread " << i << " acquire a release a concurrent; same clnt"; for (int j = 0; j < 10; j++) { lc[0]->acquire(a); check_grant(a); - LOG_NONMEMBER("test4: thread " << i << " on client 0 got lock"); + LOG_NONMEMBER << "test4: thread " << i << " on client 0 got lock"; check_release(a); lc[0]->release(a); } } static void test5(int i) { - LOG_NONMEMBER("test5: client " << i << " acquire a release a concurrent; same and diff clnt"); + LOG_NONMEMBER << "test5: client " << i << " acquire a release a concurrent; same and diff clnt"; for (int j = 0; j < 10; j++) { if (i < 5) lc[0]->acquire(a); else lc[1]->acquire(a); check_grant(a); - LOG_NONMEMBER("test5: client " << i << " got lock"); + LOG_NONMEMBER << "test5: client " << i << " got lock"; check_release(a); if (i < 5) lc[0]->release(a); else lc[1]->release(a); @@ -122,7 +122,7 @@ main(int argc, char *argv[]) srandom((uint32_t)getpid()); if (argc < 2) { - LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [test]"); + LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [test]"; exit(1); } @@ -131,12 +131,12 @@ main(int argc, char *argv[]) if (argc > 2) { test = atoi(argv[2]); if (test < 1 || test > 5) { - LOG_NONMEMBER("Test number must be between 1 and 5"); + LOG_NONMEMBER << "Test number must be between 1 and 5"; exit(1); } } - LOG_NONMEMBER("cache lock client"); + LOG_NONMEMBER << "cache lock client"; for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst); if (!test || test == 1) { @@ -152,7 +152,7 @@ main(int argc, char *argv[]) } if (!test || test == 3) { - LOG_NONMEMBER("test 3"); + LOG_NONMEMBER << "test 3"; for (int i = 0; i < nt; i++) th[i] = thread(test3, i); @@ -161,7 +161,7 @@ main(int argc, char *argv[]) } if (!test || test == 4) { - LOG_NONMEMBER("test 4"); + LOG_NONMEMBER << "test 4"; for (int i = 0; i < 2; i++) th[i] = thread(test4, i); @@ -170,7 +170,7 @@ main(int argc, char *argv[]) } if (!test || test == 5) { - LOG_NONMEMBER("test 5"); + LOG_NONMEMBER << "test 5"; for (int i = 0; i < nt; i++) th[i] = thread(test5, i); @@ -178,6 +178,6 @@ main(int argc, char *argv[]) th[i].join(); } - LOG_NONMEMBER(argv[0] << ": passed all tests successfully"); + LOG_NONMEMBER << argv[0] << ": passed all tests successfully"; } diff --git a/log.cc b/log.cc index 3b881fa..c9af175 100644 --- a/log.cc +++ b/log.cc @@ -16,7 +16,7 @@ void log::logread(void) { string type; unsigned instance; - LOG("logread"); + LOG << "logread"; while (from >> type) { if (type == "done") { string v; @@ -25,23 +25,23 @@ void log::logread(void) { getline(from, v); pxs->values[instance] = v; pxs->instance_h = instance; - LOG("logread: instance: " << instance << " w. v = " << - pxs->values[instance]); + LOG << "logread: instance: " << instance << " w. v = " + << pxs->values[instance]; pxs->accepted_value.clear(); pxs->promise.n = 0; pxs->accepted.n = 0; } else if (type == "propseen") { from >> pxs->promise.n >> pxs->promise.m; - LOG("logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")"); + LOG << "logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")"; } else if (type == "accepted") { string v; from >> pxs->accepted.n >> pxs->accepted.m; from.get(); getline(from, v); pxs->accepted_value = v; - LOG("logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value); + LOG << "logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value; } else { - LOG("logread: unknown log record"); + LOG << "logread: unknown log record"; VERIFY(0); } } @@ -59,7 +59,7 @@ string log::dump() { } void log::restore(string s) { - LOG("restore: " << s); + LOG << "restore: " << s; ofstream f(name, ios::trunc); f << s; f.close(); diff --git a/paxos.cc b/paxos.cc index 4bba25d..cb32e36 100644 --- a/paxos.cc +++ b/paxos.cc @@ -41,9 +41,9 @@ proposer_acceptor::proposer_acceptor(paxos_change *_delegate, bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { lock ml(proposer_mutex); - LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable); + LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable; if (!stable) { // already running proposer? - LOG("paxos proposer already running"); + LOG << "paxos proposer already running"; return false; } stable = false; @@ -54,7 +54,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - LOG("received a majority of prepare responses"); + LOG << "received a majority of prepare responses"; if (!v.size()) v = newv; @@ -66,20 +66,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - LOG("received a majority of accept responses"); + LOG << "received a majority of accept responses"; breakpoint2(); decide(instance, accepts, v); r = true; } else { - LOG("no majority of accept responses"); + LOG << "no majority of accept responses"; } } else { - LOG("no majority of prepare responses"); + LOG << "no majority of prepare responses"; } } else { - LOG("prepare is rejected " << stable); + LOG << "prepare is rejected " << stable; } stable = true; return r; @@ -87,7 +87,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v) { - LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"); + LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"; prepareres res; prop_t highest_n_a{0, ""}; for (auto i : nodes) { @@ -99,16 +99,16 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal); if (status == paxos_protocol::OK) { if (res.oldinstance) { - LOG("commiting old instance!"); + LOG << "commiting old instance!"; commit(instance, res.v_a); return false; } - LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " << - "v_a=\"" << res.v_a << "\""); + LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " + << "v_a=\"" << res.v_a << "\""; if (res.accept) { accepts.push_back(i); if (res.n_a >= highest_n_a) { - LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"); + LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"; v = res.v_a; highest_n_a = res.n_a; } @@ -146,26 +146,26 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const paxos_protocol::status proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { - LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")"); + LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")"; lock ml(acceptor_mutex); r.oldinstance = false; r.accept = false; r.n_a = accepted; r.v_a = accepted_value; if (instance <= instance_h) { - LOG("old instance " << instance << " has value " << values[instance]); + LOG << "old instance " << instance << " has value " << values[instance]; r.oldinstance = true; r.v_a = values[instance]; } else if (n > promise) { - LOG("looks good to me"); + LOG << "looks good to me"; promise = n; l.logprop(promise); r.accept = true; } else { - LOG("I totally rejected this request. Ha."); + LOG << "I totally rejected this request. Ha."; } - LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << - "v_a=\"" << r.v_a << "\""); + LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " + << "v_a=\"" << r.v_a << "\""; return paxos_protocol::OK; } @@ -189,7 +189,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t paxos_protocol::status proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { lock ml(acceptor_mutex); - LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value); + LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value; if (instance == instance_h + 1) { VERIFY(accepted_value == v); commit(instance, accepted_value, ml); @@ -208,9 +208,9 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value) { } void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) { - LOG("instance=" << instance << " has v=" << value); + LOG << "instance=" << instance << " has v=" << value; if (instance > instance_h) { - LOG("highestacceptedinstance = " << instance); + LOG << "highestacceptedinstance = " << instance; values[instance] = value; l.loginstance(instance, value); instance_h = instance; @@ -228,24 +228,24 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & // For testing purposes void proposer_acceptor::breakpoint1() { if (break1) { - LOG("Dying at breakpoint 1!"); + LOG << "Dying at breakpoint 1!"; exit(1); } } void proposer_acceptor::breakpoint2() { if (break2) { - LOG("Dying at breakpoint 2!"); + LOG << "Dying at breakpoint 2!"; exit(1); } } void proposer_acceptor::breakpoint(int b) { if (b == 3) { - LOG("breakpoint 1"); + LOG << "breakpoint 1"; break1 = true; } else if (b == 4) { - LOG("breakpoint 2"); + LOG << "breakpoint 2"; break2 = true; } } diff --git a/rpc/connection.cc b/rpc/connection.cc index c4edbf6..c2635ef 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -37,11 +37,11 @@ shared_ptr connection::to_dst(const sockaddr_in & dst, connection_de socket_t s = socket(AF_INET, SOCK_STREAM, 0); s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port); close(s); return nullptr; } - IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port); return make_shared(delegate, move(s), lossy); } @@ -61,7 +61,7 @@ bool connection::send(const string & b) { if (lossy_) { if ((random()%100) < lossy_) { - IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd); + IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd; shutdown(fd,SHUT_RDWR); } } @@ -113,11 +113,11 @@ void connection::read_cb(int s) { if (dead_) return; - IF_LEVEL(5) LOG("got data on fd " << s); + IF_LEVEL(5) LOG << "got data on fd " << s; if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { if (!readpdu()) { - IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); + IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying"; poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; send_complete_.notify_one(); @@ -141,7 +141,7 @@ bool connection::writepdu() { ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { - IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno); + IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno; wpdu_.solong = size_t_max; wpdu_.buf.clear(); } @@ -152,7 +152,7 @@ bool connection::writepdu() { } bool connection::readpdu() { - IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); + IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size(); if (!rpdu_.buf.size()) { rpc_protocol::rpc_sz_t sz1; ssize_t n = fd.read(sz1); @@ -166,18 +166,18 @@ bool connection::readpdu() { } if (n > 0 && n != sizeof(sz1)) { - IF_LEVEL(0) LOG("short read of sz"); + IF_LEVEL(0) LOG << "short read of sz"; return false; } size_t sz = ntoh(sz1); if (sz > rpc_protocol::MAX_PDU) { - IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); + IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << hex << sz1; return false; } - IF_LEVEL(5) LOG("read size of datagram = " << sz); + IF_LEVEL(5) LOG << "read size of datagram = " << sz; rpdu_.buf.assign(sz+sizeof(sz1), 0); rpdu_.solong = sizeof(sz1); @@ -185,7 +185,7 @@ bool connection::readpdu() { ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); - IF_LEVEL(5) LOG("read " << n << " bytes"); + IF_LEVEL(5) LOG << "read " << n << " bytes"; if (n <= 0) { if (errno == EAGAIN) @@ -224,7 +224,7 @@ connection_listener::connection_listener(connection_delegate * delegate, in_port VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); port_ = ntoh(sin.sin_port); - IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); + IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port; poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this); } @@ -242,7 +242,7 @@ void connection_listener::read_cb(int) { throw runtime_error("connection listener failure"); } - IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); + IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port); auto ch = make_shared(delegate_, s1, lossy_); // garbage collect dead connections diff --git a/rpc/poll_mgr.cc b/rpc/poll_mgr.cc index 2598249..d29abd2 100644 --- a/rpc/poll_mgr.cc +++ b/rpc/poll_mgr.cc @@ -196,7 +196,7 @@ void SelectAIO::wait_ready(vector & readable, vector & writable) { return; else if (ret < 0) { perror("select:"); - IF_LEVEL(0) LOG("select_loop failure errno " << errno); + IF_LEVEL(0) LOG << "select_loop failure errno " << errno; VERIFY(0); } diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 6985498..a451c9f 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -76,14 +76,14 @@ rpcc::rpcc(const string & d) : dst_(make_sockaddr(d)) if (loss_env) lossytest_ = atoi(loss_env); - IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_); + IF_LEVEL(2) LOG << "cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_; } // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { cancel(); - IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1)); + IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1); chan_.reset(); VERIFY(calls_.size() == 0); } @@ -96,7 +96,7 @@ int rpcc::bind(milliseconds to) { bind_done_ = true; srv_nonce_ = r; } else { - IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret); + IF_LEVEL(2) LOG << "bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret; } return ret; } @@ -105,11 +105,11 @@ int rpcc::bind(milliseconds to) { void rpcc::cancel(void) { lock ml(m_); if (calls_.size()) { - LOG("force callers to fail"); + LOG << "force callers to fail"; for (auto & p : calls_) { caller *ca = p.second; - IF_LEVEL(2) LOG("force caller to fail"); + IF_LEVEL(2) LOG << "force caller to fail"; lock cl(ca->m); ca->done = true; @@ -121,7 +121,7 @@ void rpcc::cancel(void) { while (calls_.size () > 0) destroy_wait_c_.wait(ml); - LOG("done"); + LOG << "done"; } } @@ -133,7 +133,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { lock ml(m_); if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) { - IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice"); + IF_LEVEL(1) LOG << "rpcc has not been bound to dst or binding twice"; return rpc_protocol::bind_failure; } @@ -172,9 +172,9 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { ch->send(forgot.buf); ch->send(req); } - else IF_LEVEL(1) LOG("not reachable"); - IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc << - " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_); + else IF_LEVEL(1) LOG << "not reachable"; + IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << hex << proc + << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_; } transmit = false; // only send once on a given channel } @@ -185,14 +185,14 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { { lock cal(ca.m); while (!ca.done) { - IF_LEVEL(2) LOG("wait"); + IF_LEVEL(2) LOG << "wait"; if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) { - IF_LEVEL(2) LOG("timeout"); + IF_LEVEL(2) LOG << "timeout"; break; } } if (ca.done) { - IF_LEVEL(2) LOG("reply received"); + IF_LEVEL(2) LOG << "reply received"; break; } } @@ -231,9 +231,9 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { lock cal(ca.m); - IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc << - " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << - ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); + IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << hex << proc + << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" + << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret; // destruction of req automatically frees its buffer return (ca.done? ca.intret : rpc_protocol::timeout_failure); @@ -261,7 +261,7 @@ rpcc::got_pdu(const shared_ptr &, const string & b) rep.unpack_header(h); if (!rep.ok()) { - IF_LEVEL(1) LOG("unmarshall header failed!!!"); + IF_LEVEL(1) LOG << "unmarshall header failed!!!"; return true; } @@ -270,7 +270,7 @@ rpcc::got_pdu(const shared_ptr &, const string & b) update_xid_rep(h.xid, ml); if (calls_.find(h.xid) == calls_.end()) { - IF_LEVEL(2) LOG("xid " << h.xid << " no pending request"); + IF_LEVEL(2) LOG << "xid " << h.xid << " no pending request"; return true; } caller *ca = calls_[h.xid]; @@ -280,7 +280,7 @@ rpcc::got_pdu(const shared_ptr &, const string & b) *ca->rep = b; ca->intret = h.ret; if (ca->intret < 0) { - IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret); + IF_LEVEL(2) LOG << "RPC reply error for xid " << h.xid << " intret " << ca->intret; } ca->done = 1; } @@ -313,7 +313,7 @@ rpcs::rpcs(in_port_t p1) : port_(p1) { set_rand_seed(); nonce_ = (nonce_t)random(); - IF_LEVEL(2) LOG("created with nonce " << nonce_); + IF_LEVEL(2) LOG << "created with nonce " << nonce_; reg(rpc_protocol::bind, &rpcs::rpcbind, this); } @@ -331,7 +331,7 @@ rpcs::~rpcs() { bool rpcs::got_pdu(const shared_ptr & c, const string & b) { if (!reachable_) { - IF_LEVEL(1) LOG("not reachable"); + IF_LEVEL(1) LOG << "not reachable"; return true; } @@ -346,20 +346,20 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { proc_id_t proc = h.proc; if (!req.ok()) { - IF_LEVEL(1) LOG("unmarshall header failed"); + IF_LEVEL(1) LOG << "unmarshall header failed"; return; } - IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " << - dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce); + IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << hex << proc << ", last_rep " + << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; marshall rep; rpc_protocol::reply_header rh{h.xid,0}; // is client sending to an old instance of server? if (h.srv_nonce != 0 && h.srv_nonce != nonce_) { - IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce << - " (current " << nonce_ << ") proc " << hex << h.proc); + IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce + << " (current " << nonce_ << ") proc " << hex << h.proc; rh.ret = rpc_protocol::oldsrv_failure; rep.pack_header(rh); c->send(rep); @@ -371,7 +371,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { { lock pl(procs_m_); if (procs_.count(proc) < 1) { - LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_); + LOG << "unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_; VERIFY(0); return; } @@ -386,8 +386,8 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { if (reply_window_.find(h.clt_nonce) == reply_window_.end()) { VERIFY (reply_window_[h.clt_nonce].size() == 0); // create reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid - IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid << - " chan " << c->fd << ", total clients " << (reply_window_.size()-1)); + IF_LEVEL(2) LOG << "new client " << h.clt_nonce << " xid " << h.xid + << " chan " << c->fd << ", total clients " << (reply_window_.size()-1); } } @@ -406,9 +406,9 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { case NEW: // new request rh.ret = (*f)(forward(req), rep); if (rh.ret == rpc_protocol::unmarshall_args_failure) { - LOG("failed to unmarshall the arguments. You are " << - "probably calling RPC 0x" << hex << proc << " with the wrong " << - "types of arguments."); + LOG << "failed to unmarshall the arguments. You are " + << "probably calling RPC 0x" << hex << proc << " with the wrong " + << "types of arguments."; VERIFY(0); } VERIFY(rh.ret >= 0); @@ -416,8 +416,8 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { rep.pack_header(rh); b1 = rep; - IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " << - h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce); + IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc " + << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce; add_reply(h.clt_nonce, h.xid, b1); @@ -436,7 +436,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { c->send(b1); break; case FORGOTTEN: // very old request and we don't have the response anymore - IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce); + IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce; rh.ret = rpc_protocol::atmostonce_failure; rep.pack_header(rh); c->send(rep); @@ -515,7 +515,7 @@ void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) { for (it++; it != l.end() && it->xid < xid; it++); // there should already be an entry, so whine if there isn't if (it == l.end() || it->xid != xid) { - LOG("Could not find reply struct in add_reply"); + LOG << "Could not find reply struct in add_reply"; l.insert(it, reply_t(xid, b)); } else { *it = reply_t(xid, b); @@ -523,7 +523,7 @@ void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) { } rpc_protocol::status rpcs::rpcbind(nonce_t & r) { - IF_LEVEL(2) LOG("called return nonce " << nonce_); + IF_LEVEL(2) LOG << "called return nonce " << nonce_; r = nonce_; return 0; } @@ -548,7 +548,7 @@ static sockaddr_in make_sockaddr(const string & hostandport) { struct hostent *hp = gethostbyname(host.c_str()); if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) { - LOG_NONMEMBER("cannot find host name " << host); + LOG_NONMEMBER << "cannot find host name " << host; exit(1); } memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); diff --git a/rpc/rpc.h b/rpc/rpc.h index 211c717..53a1746 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -95,8 +95,8 @@ class rpcc : private connection_delegate { if (intret < 0) return intret; unmarshall u(rep, true, r); if (u.okdone() != true) { - LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " << - "calling RPC 0x" << hex << proc << " with the wrong return type."); + LOG << "rpcc::call_m: failed to unmarshall the reply. You are probably " << + "calling RPC 0x" << hex << proc << " with the wrong return type."; VERIFY(0); return rpc_protocol::unmarshall_reply_failure; } diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index f7df5fe..f2e8c7d 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -378,7 +378,7 @@ int main(int argc, char *argv[]) { if (debug_level > 0) { DEBUG_LEVEL = debug_level; - IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level); + IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level; } testmarshall(); diff --git a/rsm.cc b/rsm.cc index de37b5d..672243c 100644 --- a/rsm.cc +++ b/rsm.cc @@ -125,7 +125,7 @@ void rsm::recovery() { // XXX iannucci 2013/09/15 -- I don't understand whether accessing // cfg->view_id in this manner involves a race. I suspect not. if (join(primary, ml)) { - LOG("joined"); + LOG << "joined"; commit_change(cfg->view_id(), ml); } else { ml.unlock(); @@ -134,13 +134,13 @@ void rsm::recovery() { } } vid_insync = vid_commit; - LOG("sync vid_insync " << vid_insync); + LOG << "sync vid_insync " << vid_insync; if (primary == cfg->myaddr()) { r = sync_with_backups(ml); } else { r = sync_with_primary(ml); } - LOG("sync done"); + LOG << "sync done"; // If there was a commited viewchange during the synchronization, restart // the recovery @@ -152,7 +152,7 @@ void rsm::recovery() { myvs.seqno = 1; inviewchange = false; } - LOG("go to sleep " << insync << " " << inviewchange); + LOG << "go to sleep " << insync << " " << inviewchange; recovery_cond.wait(ml); } } @@ -175,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { insync = true; cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); - LOG("backups " << backups); + LOG << "backups " << backups; sync_cond.wait(rsm_mutex_lock); insync = false; return true; @@ -202,7 +202,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_protocol::transferres r; handle h(m); int ret = 0; - LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; rpcc *cl; { rsm_mutex_lock.unlock(); @@ -214,14 +214,14 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret); + LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret; return false; } if (stf && last_myvs != r.last) { stf->unmarshal_state(r.state); } last_myvs = r.last; - LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; return true; } @@ -245,7 +245,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { int ret = 0; string log; - LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"; rpcc *cl; { rsm_mutex_lock.unlock(); @@ -258,10 +258,10 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret); + LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret; return false; } - LOG("succeeded " << log); + LOG << "succeeded " << log; cfg->restore(log); return true; } @@ -280,8 +280,8 @@ void rsm::commit_change(unsigned vid) { void rsm::commit_change(unsigned vid, lock &) { if (vid <= vid_commit) return; - LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," << - last_myvs.seqno << ") " << primary << " insync " << insync); + LOG << "new view (" << vid << ") last vs (" << last_myvs.vid << "," + << last_myvs.seqno << ") " << primary << " insync " << insync; vid_commit = vid; inviewchange = true; set_primary(vid); @@ -293,7 +293,7 @@ void rsm::commit_change(unsigned vid, lock &) { void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) { - LOG("execute"); + LOG << "execute"; handler *h = procs[procno]; VERIFY(h); marshall rep; @@ -308,21 +308,21 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r // machine. // rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) { - LOG("invoke procno 0x" << hex << procno); + LOG << "invoke procno 0x" << hex << procno; lock ml(invoke_mutex); vector m; string myaddr; viewstamp vs; { lock ml2(rsm_mutex); - LOG("Checking for inviewchange"); + LOG << "Checking for inviewchange"; if (inviewchange) return rsm_client_protocol::BUSY; - LOG("Checking for primacy"); + LOG << "Checking for primacy"; myaddr = cfg->myaddr(); if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; - LOG("Assigning a viewstamp"); + LOG << "Assigning a viewstamp"; cfg->get_view(vid_commit, m); // assign the RPC the next viewstamp number vs = myvs; @@ -330,18 +330,18 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id } // send an invoke RPC to all slaves in the current view with a timeout of 1 second - LOG("Invoking slaves"); + LOG << "Invoking slaves"; for (unsigned i = 0; i < m.size(); i++) { if (m[i] != myaddr) { // if invoke on slave fails, return rsm_client_protocol::BUSY handle h(m[i]); - LOG("Sending invoke to " << m[i]); + LOG << "Sending invoke to " << m[i]; rpcc *cl = h.safebind(); if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req); - LOG("Invoke returned " << ret); + LOG << "Invoke returned " << ret; if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; breakpoint(1); @@ -349,15 +349,17 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id partition1(rsm_mutex_lock); } } - LOG(setfill('0') << setw(2) << hex; + { + auto && log = LOG << setfill('0') << setw(2) << hex; for (size_t i=0; i m; string myaddr; { lock ml2(rsm_mutex); // check if !inviewchange - LOG("Checking for view change"); + LOG << "Checking for view change"; if (inviewchange) return rsm_protocol::ERR; // check if slave - LOG("Checking for slave status"); + LOG << "Checking for slave status"; myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; @@ -389,7 +391,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp if (find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number - LOG("Checking sequence number"); + LOG << "Checking sequence number"; if (vs != myvs) return rsm_protocol::ERR; myvs++; @@ -407,8 +409,8 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); - LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << - last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" + << last_myvs.vid << "," << last_myvs.seqno << ")"; if (!insync || vid != vid_insync) return rsm_protocol::BUSY; if (stf && last != last_myvs) @@ -438,18 +440,18 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last auto ret = rsm_protocol::OK; lock ml(rsm_mutex); - LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" << - last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" + << last_myvs.vid << "," << last_myvs.seqno << ")"; if (cfg->ismember(m, vid_commit)) { - LOG(m << " is still a member -- nothing to do"); + LOG << m << " is still a member -- nothing to do"; log = cfg->dump(); } else if (cfg->myaddr() != primary) { - LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!"); + LOG << "but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!"; ret = rsm_protocol::BUSY; } else { // We cache vid_commit to avoid adding m to a view which already contains // m due to race condition - LOG("calling down to config layer"); + LOG << "calling down to config layer"; unsigned vid_cache = vid_commit; bool succ; { @@ -459,9 +461,9 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last } if (cfg->ismember(m, cfg->view_id())) { log = cfg->dump(); - LOG("ret " << ret << " log " << log); + LOG << "ret " << ret << " log " << log; } else { - LOG("failed; proposer couldn't add " << succ); + LOG << "failed; proposer couldn't add " << succ; ret = rsm_protocol::BUSY; } } @@ -478,7 +480,7 @@ rsm_client_protocol::status rsm::client_members(vector & r, int) { cfg->get_view(vid_commit, m); m.push_back(primary); r = m; - LOG("return " << m << " m " << primary); + LOG << "return " << m << " m " << primary; return rsm_client_protocol::OK; } @@ -492,7 +494,7 @@ void rsm::set_primary(unsigned vid) { VERIFY (c.size() > 0); if (isamember(primary,c)) { - LOG("primary stays " << primary); + LOG << "primary stays " << primary; return; } @@ -500,7 +502,7 @@ void rsm::set_primary(unsigned vid) { for (unsigned i = 0; i < p.size(); i++) { if (isamember(p[i], c)) { primary = p[i]; - LOG("primary is " << primary); + LOG << "primary is " << primary; return; } } @@ -522,7 +524,7 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); - LOG("member " << m[i] << " " << heal); + LOG << "member " << m[i] << " " << heal; if (h.safebind()) h.safebind()->set_reachable(heal); } } @@ -531,8 +533,8 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) { lock ml(rsm_mutex); - LOG("heal " << heal << " (dopartition " << - dopartition << ", partitioned " << partitioned << ")"); + LOG << "heal " << heal << " (dopartition " + << dopartition << ", partitioned " << partitioned << ")"; if (heal) net_repair(heal, ml); else @@ -545,7 +547,7 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, void rsm::breakpoint(int b) { if (breakpoints[b-1]) { - LOG("Dying at breakpoint " << b << " in rsm!"); + LOG << "Dying at breakpoint " << b << " in rsm!"; exit(1); } } @@ -561,7 +563,7 @@ void rsm::partition1(lock & rsm_mutex_lock) { rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); - LOG("breakpoint " << b); + LOG << "breakpoint " << b; if (b == 1) breakpoints[1-1] = true; else if (b == 2) breakpoints[2-1] = true; else if (b == 3 || b == 4) cfg->breakpoint(b); diff --git a/rsm_client.cc b/rsm_client.cc index 598a1ed..d047f8b 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -4,10 +4,10 @@ #include rsm_client::rsm_client(string dst) : primary(dst) { - LOG("create rsm_client"); + LOG << "create rsm_client"; lock ml(rsm_client_mutex); VERIFY (init_members(ml)); - LOG("done"); + LOG << "done"; } void rsm_client::primary_failure(lock &) { @@ -18,7 +18,7 @@ void rsm_client::primary_failure(lock &) { rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) { lock ml(rsm_client_mutex); while (1) { - LOG("proc " << hex << proc << " primary " << primary); + LOG << "proc " << hex << proc << " primary " << primary; handle h(primary); ml.unlock(); @@ -31,28 +31,28 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s if (!cl) goto prim_fail; - LOG("proc " << hex << proc << " primary " << primary << " ret " << dec << ret); + LOG << "proc " << hex << proc << " primary " << primary << " ret " << dec << ret; if (ret == rsm_client_protocol::OK) return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { - LOG("rsm is busy " << primary); + LOG << "rsm is busy " << primary; usleep(300000); continue; } if (ret == rsm_client_protocol::NOTPRIMARY) { - LOG("primary " << primary << " isn't the primary--let's get a complete list of mems"); + LOG << "primary " << primary << " isn't the primary--let's get a complete list of mems"; if (init_members(ml)) continue; } prim_fail: - LOG("primary " << primary << " failed ret " << dec << ret); + LOG << "primary " << primary << " failed ret " << dec << ret; primary_failure(ml); - LOG("retry new primary " << primary); + LOG << "retry new primary " << primary; } } bool rsm_client::init_members(lock & rsm_client_mutex_lock) { - LOG("get members!"); + LOG << "get members!"; handle h(primary); int ret = rsm_client_protocol::ERR; rpcc *cl; @@ -66,14 +66,14 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) { if (cl == 0 || ret != rsm_protocol::OK) return false; if (known_mems.size() < 1) { - LOG("do not know any members!"); + LOG << "do not know any members!"; VERIFY(0); } primary = known_mems.back(); known_mems.pop_back(); - LOG("primary " << primary); + LOG << "primary " << primary; return true; } diff --git a/rsm_client.h b/rsm_client.h index 583ecbb..2bdf440 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -50,19 +50,19 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { string res; u >> res; if (!u.okdone()) { - LOG("failed to unmarshall the reply."); - LOG("You probably forgot to set the reply string in " << - "rsm::client_invoke, or you may have called RPC " << - "0x" << hex << proc << " with the wrong return type"); - LOG("here's what I got: \"" << hexify(rep) << "\""); + LOG << "failed to unmarshall the reply."; + LOG << "You probably forgot to set the reply string in " << + "rsm::client_invoke, or you may have called RPC " << + "0x" << hex << proc << " with the wrong return type"; + LOG << "here's what I got: \"" << hexify(rep) << "\""; VERIFY(0); return rpc_protocol::unmarshall_reply_failure; } if(!unmarshall(res, false, r).okdone()) { - LOG("failed to unmarshall the reply."); - LOG("You are probably calling RPC 0x" << hex << proc << - " with the wrong return type."); - LOG("here's what I got: \"" << hexify(res) << "\""); + LOG << "failed to unmarshall the reply."; + LOG << "You are probably calling RPC 0x" << hex << proc << + " with the wrong return type."; + LOG << "here's what I got: \"" << hexify(res) << "\""; VERIFY(0); return rpc_protocol::unmarshall_reply_failure; } diff --git a/rsm_tester.cc b/rsm_tester.cc index 469aea2..09b2bf9 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -10,7 +10,7 @@ char log_thread_prefix = 't'; int main(int argc, char *argv[]) { if(argc != 4){ - LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [partition] arg"); + LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [partition] arg"; return 1; } @@ -22,7 +22,7 @@ int main(int argc, char *argv[]) { int b = stoi(argv[3]); cout << "breakpoint " << b << " returned " << lc->breakpoint(b); } else { - LOG_NONMEMBER("Unknown command " << argv[2]); + LOG_NONMEMBER << "Unknown command " << argv[2]; } return 0; } diff --git a/threaded_log.cc b/threaded_log.cc index c44266e..b450f52 100644 --- a/threaded_log.cc +++ b/threaded_log.cc @@ -3,6 +3,26 @@ mutex cerr_mutex; map thread_name_map; int next_thread_num = 0; -map instance_name_map; +map instance_name_map; int next_instance_num = 0; int DEBUG_LEVEL = 0; + +locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) { + auto thread = this_thread::get_id(); + int tid = thread_name_map[thread]; + if (tid==0) + tid = thread_name_map[thread] = ++next_thread_num; + auto utime = duration_cast(system_clock::now().time_since_epoch()).count() % 1000000000; + f << setfill('0') << dec << left << setw(9) << utime << " "; + f << setfill(' ') << log_thread_prefix << left << setw(2) << tid; + f << " " << setw(20) << file << " " << setw(18) << func; + return move(f); +} + +locked_ostream && _log_member(locked_ostream && f, const void *ptr) { + int id = instance_name_map[ptr]; + if (id == 0) + id = instance_name_map[ptr] = ++next_instance_num; + f << "#" << left << setw(2) << id << " "; + return move(f); +} diff --git a/threaded_log.h b/threaded_log.h index bccd813..c02531e 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -6,38 +6,29 @@ extern mutex cerr_mutex; extern map thread_name_map; extern int next_thread_num; -extern map instance_name_map; +extern map instance_name_map; extern int next_instance_num; extern char log_thread_prefix; -#define LOG_PREFIX { \ - auto _thread_ = this_thread::get_id(); \ - int _tid_ = thread_name_map[_thread_]; \ - if (_tid_==0) \ - _tid_ = thread_name_map[_thread_] = ++next_thread_num; \ - auto _utime_ = duration_cast(system_clock::now().time_since_epoch()).count() % 1000000000; \ - cerr << setfill('0') << dec << left << setw(9) << _utime_ << " "; \ - cerr << setfill(' ') << log_thread_prefix << left << setw(2) << _tid_; \ - cerr << " " << setw(20) << __FILE__ << " " << setw(18) << __func__; \ -} -#define LOG_THIS_POINTER { \ - int _self_ = instance_name_map[this]; \ - if (_self_==0) \ - _self_ = instance_name_map[this] = ++next_instance_num; \ - cerr << "#" << left << setw(2) << _self_ << " "; \ -} - -#define LOG_NONMEMBER(_x_) { \ - lock _cel_(cerr_mutex); \ - LOG_PREFIX; \ - cerr << _x_ << endl; \ -} -#define LOG(_x_) { \ - lock _cel_(cerr_mutex); \ - LOG_PREFIX; \ - LOG_THIS_POINTER; \ - cerr << _x_ << endl; \ -} +struct locked_ostream { + ostream & s; + lock l; + ~locked_ostream() { s << endl; } + template + locked_ostream & operator<<(U && u) { s << u; return *this; } + + typedef std::ostream& (*ostream_manipulator)(ostream&); + locked_ostream & operator<<(ostream_manipulator manip) { s << manip; return *this; } +}; + +locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func); +locked_ostream && _log_member(locked_ostream && f, const void *ptr); +#define _log_nonmember(f, ptr) f + +#define _LOG(_context_) _context_(_log_prefix(locked_ostream{cerr, lock(cerr_mutex)}, __FILE__, __func__), (const void *)this) + +#define LOG_NONMEMBER _LOG(_log_nonmember) +#define LOG _LOG(_log_member) extern int DEBUG_LEVEL; -- 1.7.9.5