Rewrote threaded log code to be more idiomatic.
authorPeter Iannucci <iannucci@mit.edu>
Fri, 22 Nov 2013 20:35:30 +0000 (15:35 -0500)
committerPeter Iannucci <iannucci@mit.edu>
Fri, 22 Nov 2013 20:35:53 +0000 (15:35 -0500)
20 files changed:
config.cc
handle.cc
lock_client.cc
lock_demo.cc
lock_server.cc
lock_smain.cc
lock_tester.cc
log.cc
paxos.cc
rpc/connection.cc
rpc/poll_mgr.cc
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm_client.cc
rsm_client.h
rsm_tester.cc
threaded_log.cc
threaded_log.h

index cd06e1b..374757f 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -59,7 +59,7 @@ void config::get_view(unsigned instance, vector<string> & m) {
 void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
     string value = paxos.value(instance);
-    LOG("get_view(" << instance << "): returns " << value);
+    LOG << "get_view(" << instance << "): returns " << value;
     m = explode(value);
 }
 
@@ -68,7 +68,7 @@ void config::reconstruct(lock & cfg_mutex_lock) {
     my_view_id = paxos.instance();
     if (my_view_id > 0) {
         get_view(my_view_id, mems, cfg_mutex_lock);
-        LOG("view " << my_view_id << " " << mems);
+        LOG << "view " << my_view_id << " " << mems;
     }
 }
 
@@ -77,12 +77,12 @@ void config::paxos_commit(unsigned instance, const string & value) {
     lock cfg_mutex_lock(cfg_mutex);
 
     vector<string> newmem = explode(value);
-    LOG("instance " << instance << ": " << newmem);
+    LOG << "instance " << instance << ": " << newmem;
 
     for (auto mem : mems) {
-        LOG("is " << mem << " still a member?");
+        LOG << "is " << mem << " still a member?";
         if (!isamember(mem, newmem) && me != mem) {
-            LOG("delete " << mem);
+            LOG << "delete " << mem;
             handle(mem).invalidate();
         }
     }
@@ -105,28 +105,28 @@ bool config::ismember(const string & m, unsigned vid) {
 
 bool config::add(const string & new_m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
-    LOG("adding " << new_m << " to " << vid);
+    LOG << "adding " << new_m << " to " << vid;
     if (vid != my_view_id) {
-        LOG("that's not my view id, " << my_view_id << "!");
+        LOG << "that's not my view id, " << my_view_id << "!";
         return false;
     }
-    LOG("calling down to paxos layer");
+    LOG << "calling down to paxos layer";
     vector<string> m(mems), cmems(mems);
     m.push_back(new_m);
-    LOG("old mems " << cmems << " " << implode(cmems));
-    LOG("new mems " << m << " " << implode(m));
+    LOG << "old mems " << cmems << " " << implode(cmems);
+    LOG << "new mems " << m << " " << implode(m);
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
     bool r = paxos.run(nextvid, cmems, implode(m));
     cfg_mutex_lock.lock();
-    LOG("paxos proposer returned " << (r ? "success" : "failure"));
+    LOG << "paxos proposer returned " << (r ? "success" : "failure");
     return r;
 }
 
 // caller should hold cfg_mutex
 bool config::remove(const string & m, lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
-    LOG("my_view_id " << my_view_id << " remove? " << m);
+    LOG << "my_view_id " << my_view_id << " remove? " << m;
     vector<string> n;
     for (auto mem : mems) {
         if (mem != m)
@@ -137,7 +137,7 @@ bool config::remove(const string & m, lock & cfg_mutex_lock) {
     cfg_mutex_lock.unlock();
     bool r = paxos.run(nextvid, cmems, implode(n));
     cfg_mutex_lock.lock();
-    LOG("proposer returned " << (r ? "success" : "failure"));
+    LOG << "proposer returned " << (r ? "success" : "failure");
     return r;
 }
 
@@ -146,16 +146,16 @@ void config::heartbeater() {
 
     while (1) {
         auto next_timeout = steady_clock::now() + milliseconds(300);
-        LOG("go to sleep");
+        LOG << "go to sleep";
         config_cond.wait_until(cfg_mutex_lock, next_timeout);
 
         unsigned vid = my_view_id;
         vector<string> cmems;
         get_view(vid, cmems, cfg_mutex_lock);
-        LOG("current membership " << cmems);
+        LOG << "current membership " << cmems;
 
         if (!isamember(me, cmems)) {
-            LOG("not member yet; skip hearbeat");
+            LOG << "not member yet; skip hearbeat";
             continue;
         }
 
@@ -182,7 +182,7 @@ void config::heartbeater() {
 paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
     r = (int) my_view_id;
-    LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+    LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
     if (vid == my_view_id)
         return paxos_protocol::OK;
     else if (paxos.isrunning()) {
@@ -195,7 +195,7 @@ paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
 config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
     unsigned vid = my_view_id;
-    LOG("heartbeat to " << m << " (" << vid << ")");
+    LOG << "heartbeat to " << m << " (" << vid << ")";
     handle h(m);
 
     cfg_mutex_lock.unlock();
@@ -213,9 +213,9 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock)
             h.invalidate();
             break;
         default:
-            LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+            LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
             res = (ret < 0) ? FAILURE : VIEWERR;
     }
-    LOG("done " << res);
+    LOG << "done " << res;
     return res;
 }
index 792ce40..fa7495c 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -27,13 +27,13 @@ rpcc * handle::safebind() {
         return nullptr;
     if (!h->client) {
         unique_ptr<rpcc> client(new rpcc(h->destination));
-        LOG("bind(\"" << h->destination << "\")");
+        LOG << "bind(\"" << h->destination << "\")";
         int ret = client->bind(milliseconds(1000));
         if (ret < 0) {
-            LOG("bind failure! " << h->destination << " " << ret);
+            LOG << "bind failure! " << h->destination << " " << ret;
             h->valid = false;
         } else {
-            LOG("bind succeeded " << h->destination);
+            LOG << "bind succeeded " << h->destination;
             h->client = move(client);
         }
     }
@@ -45,7 +45,7 @@ void handle::invalidate() {
     lock ml(mgr_mutex);
     if (hmap.find(destination_) != hmap.end()) {
         hmap[destination_]->valid = false;
-        LOG_NONMEMBER("cl " << destination_ << " refcnt " << hmap[destination_].use_count());
+        LOG << "cl " << destination_ << " refcnt " << hmap[destination_].use_count();
         hmap.erase(destination_);
     }
 }
index beca1cc..81c102e 100644 (file)
@@ -32,7 +32,7 @@ lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
 lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
     cl = unique_ptr<rpcc>(new rpcc(xdst));
     if (cl->bind() < 0)
-        LOG("lock_client: call bind");
+        LOG << "lock_client: call bind";
 
     srandom((uint32_t)time(NULL)^last_port);
     rlock_port = ((random()%32000) | (0x1 << 10));
@@ -50,7 +50,7 @@ void lock_client::releaser() {
     while (1) {
         lock_protocol::lockid_t lid;
         release_fifo.deq(&lid);
-        LOG("Releaser: " << lid);
+        LOG << "Releaser: " << lid;
 
         lock_state & st = get_lock_state(lid);
         lock sl(st.m);
@@ -65,7 +65,7 @@ void lock_client::releaser() {
             sl.lock();
         }
         st.state = lock_state::none;
-        LOG("Lock " << lid << ": none");
+        LOG << "Lock " << lid << ": none";
         st.signal();
     }
 }
@@ -91,7 +91,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
 
     while (1) {
         if (st.state != lock_state::free)
-            LOG("Lock " << lid << ": not free");
+            LOG << "Lock " << lid << ": not free";
 
         if (st.state == lock_state::none || st.state == lock_state::retrying) {
             if (st.state == lock_state::none) {
@@ -99,7 +99,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
                 st.xid = next_xid++;
             }
             st.state = lock_state::acquiring;
-            LOG("Lock " << lid << ": acquiring");
+            LOG << "Lock " << lid << ": acquiring";
             lock_protocol::status result;
             {
                 sl.unlock();
@@ -107,10 +107,10 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
                 result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
                 sl.lock();
             }
-            LOG("acquire returned " << result);
+            LOG << "acquire returned " << result;
             if (result == lock_protocol::OK) {
                 st.state = lock_state::free;
-                LOG("Lock " << lid << ": free");
+                LOG << "Lock " << lid << ": free";
             }
         }
 
@@ -122,7 +122,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
                 st.wanted_by.pop_front();
                 st.state = lock_state::locked;
                 st.held_by = releaser_thread.get_id();
-                LOG("Queuing " << lid << " for release");
+                LOG << "Queuing " << lid << " for release";
                 release_fifo.enq(lid);
             } else if (front == self) {
                 st.wanted_by.pop_front();
@@ -134,12 +134,12 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
             }
         }
 
-        LOG("waiting...");
+        LOG << "waiting...";
         st.wait(sl);
-        LOG("wait ended");
+        LOG << "wait ended";
     }
 
-    LOG("Lock " << lid << ": locked");
+    LOG << "Lock " << lid << ": locked";
     return lock_protocol::OK;
 }
 
@@ -149,24 +149,24 @@ lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
     auto self = this_thread::get_id();
     VERIFY(st.state == lock_state::locked && st.held_by == self);
     st.state = lock_state::free;
-    LOG("Lock " << lid << ": free");
+    LOG << "Lock " << lid << ": free";
     if (st.wanted_by.size()) {
         auto front = st.wanted_by.front();
         if (front == releaser_thread.get_id()) {
             st.state = lock_state::locked;
             st.held_by = releaser_thread.get_id();
             st.wanted_by.pop_front();
-            LOG("Queuing " << lid << " for release");
+            LOG << "Queuing " << lid << " for release";
             release_fifo.enq(lid);
         } else
             st.signal(front);
     }
-    LOG("Finished signaling.");
+    LOG << "Finished signaling.";
     return lock_protocol::OK;
 }
 
 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
-    LOG("Revoke handler " << lid << " " << xid);
+    LOG << "Revoke handler " << lid << " " << xid;
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
 
@@ -193,7 +193,7 @@ rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t
     lock sl(st.m);
     VERIFY(st.state == lock_state::acquiring);
     st.state = lock_state::retrying;
-    LOG("Lock " << lid << ": none");
+    LOG << "Lock " << lid << ": none";
     st.signal(); // only one thread needs to wake up
     return rlock_protocol::OK;
 }
index 97c2964..6cdf346 100644 (file)
@@ -4,10 +4,10 @@ char log_thread_prefix = 'd';
 
 int main(int argc, char *argv[]) {
     if(argc != 2) {
-        LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port");
+        LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port";
         return 1;
     }
 
     lock_client *lc = new lock_client(argv[1]);
-    LOG_NONMEMBER("stat returned " << lc->stat("1"));
+    LOG_NONMEMBER << "stat returned " << lc->stat("1");
 }
index 90ad5b2..a4d5881 100644 (file)
@@ -42,7 +42,7 @@ void lock_server::revoker () {
     while (1) {
         lock_protocol::lockid_t lid;
         revoke_fifo.deq(&lid);
-        LOG("Revoking " << lid);
+        LOG << "Revoking " << lid;
         if (rsm_ && !rsm_->amiprimary())
             continue;
 
@@ -61,7 +61,7 @@ void lock_server::revoker () {
         if (proxy) {
             int r;
             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
-            LOG("Revoke returned " << ret);
+            LOG << "Revoke returned " << ret;
         }
     }
 }
@@ -73,7 +73,7 @@ void lock_server::retryer() {
         if (rsm_ && !rsm_->amiprimary())
             continue;
 
-        LOG("Sending retry for " << lid);
+        LOG << "Sending retry for " << lid;
         lock_state & st = get_lock_state(lid);
         holder_t front;
         {
@@ -91,13 +91,13 @@ void lock_server::retryer() {
         if (proxy) {
             int r;
             auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
-            LOG("Retry returned " << ret);
+            LOG << "Retry returned " << ret;
         }
     }
 }
 
 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
-    LOG("lid=" << lid << " client=" << id << "," << xid);
+    LOG << "lid=" << lid << " client=" << id << "," << xid;
     holder_t h = holder_t(id, xid);
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
@@ -109,7 +109,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c
             return lock_protocol::RPCERR;
         else if (old_xid == xid) {
             if (st.held && st.held_by == h) {
-                LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+                LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
                 return lock_protocol::OK;
             }
         }
@@ -123,7 +123,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c
 
         st.held = true;
         st.held_by = h;
-        LOG("Lock " << lid << " held by " << h.first);
+        LOG << "Lock " << lid << " held by " << h.first;
         if (st.wanted_by.size())
             revoke_fifo.enq(lid);
         return lock_protocol::OK;
@@ -135,7 +135,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c
         if (p.first == id) {
             // make sure client is obeying serialization
             if (p.second != xid) {
-                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
+                LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
                 return lock_protocol::RPCERR;
             }
             found = true;
@@ -145,7 +145,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c
     if (!found)
         st.wanted_by.push_back(h);
 
-    LOG("wanted_by=" << st.wanted_by);
+    LOG << "wanted_by=" << st.wanted_by;
 
     // send revoke if we're first in line
     if (st.wanted_by.front() == h)
@@ -155,12 +155,12 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c
 }
 
 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
-    LOG("lid=" << lid << " client=" << id << "," << xid);
+    LOG << "lid=" << lid << " client=" << id << "," << xid;
     lock_state & st = get_lock_state(lid);
     lock sl(st.m);
     if (st.held && st.held_by == holder_t(id, xid)) {
         st.held = false;
-        LOG("Lock " << lid << " not held");
+        LOG << "Lock " << lid << " not held";
     }
     if (st.wanted_by.size())
         retry_fifo.enq(lid);
@@ -178,7 +178,7 @@ void lock_server::unmarshal_state(const string & state) {
 }
 
 lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
-    LOG("stat request for " << lid);
+    LOG << "stat request for " << lid;
     VERIFY(0);
     r = nacquire;
     return lock_protocol::OK;
index 2c9828b..fecd7f8 100644 (file)
@@ -13,7 +13,7 @@ int main(int argc, char *argv[]) {
     srandom((uint32_t)getpid());
 
     if(argc != 3){
-        LOG_NONMEMBER("Usage: " << argv[0] << " [master:]port [me:]port");
+        LOG_NONMEMBER << "Usage: " << argv[0] << " [master:]port [me:]port";
         exit(1);
     }
 
index 5e615ca..e9ec0a8 100644 (file)
@@ -26,7 +26,7 @@ static void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 0) {
-        LOG_NONMEMBER("error: server granted " << lid << " twice");
+        LOG_NONMEMBER << "error: server granted " << lid << " twice";
         exit(1);
     }
     ct[x] += 1;
@@ -36,14 +36,14 @@ static void check_release(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
     int x = lid[0] & 0x0f;
     if (ct[x] != 1) {
-        LOG_NONMEMBER("error: client released un-held lock " << lid);
+        LOG_NONMEMBER << "error: client released un-held lock " << lid;
         exit(1);
     }
     ct[x] -= 1;
 }
 
 static void test1(void) {
-    LOG_NONMEMBER("acquire a release a acquire a release a");
+    LOG_NONMEMBER << "acquire a release a acquire a release a";
     lc[0]->acquire(a);
     check_grant(a);
     lc[0]->release(a);
@@ -53,7 +53,7 @@ static void test1(void) {
     lc[0]->release(a);
     check_release(a);
 
-    LOG_NONMEMBER("acquire a acquire b release b release a");
+    LOG_NONMEMBER << "acquire a acquire b release b release a";
     lc[0]->acquire(a);
     check_grant(a);
     lc[0]->acquire(b);
@@ -65,46 +65,46 @@ static void test1(void) {
 }
 
 static void test2(int i) {
-    LOG_NONMEMBER("test2: client " << i << " acquire a release a");
+    LOG_NONMEMBER << "test2: client " << i << " acquire a release a";
     lc[i]->acquire(a);
-    LOG_NONMEMBER("test2: client " << i << " acquire done");
+    LOG_NONMEMBER << "test2: client " << i << " acquire done";
     check_grant(a);
     usleep(100000);
-    LOG_NONMEMBER("test2: client " << i << " release");
+    LOG_NONMEMBER << "test2: client " << i << " release";
     check_release(a);
     lc[i]->release(a);
-    LOG_NONMEMBER("test2: client " << i << " release done");
+    LOG_NONMEMBER << "test2: client " << i << " release done";
 }
 
 static void test3(int i) {
-    LOG_NONMEMBER("test3: client " << i << " acquire a release a concurrent");
+    LOG_NONMEMBER << "test3: client " << i << " acquire a release a concurrent";
     for (int j = 0; j < 10; j++) {
         lc[i]->acquire(a);
         check_grant(a);
-        LOG_NONMEMBER("test3: client " << i << " got lock");
+        LOG_NONMEMBER << "test3: client " << i << " got lock";
         check_release(a);
         lc[i]->release(a);
     }
 }
 
 static void test4(int i) {
-    LOG_NONMEMBER("test4: thread " << i << " acquire a release a concurrent; same clnt");
+    LOG_NONMEMBER << "test4: thread " << i << " acquire a release a concurrent; same clnt";
     for (int j = 0; j < 10; j++) {
         lc[0]->acquire(a);
         check_grant(a);
-        LOG_NONMEMBER("test4: thread " << i << " on client 0 got lock");
+        LOG_NONMEMBER << "test4: thread " << i << " on client 0 got lock";
         check_release(a);
         lc[0]->release(a);
     }
 }
 
 static void test5(int i) {
-    LOG_NONMEMBER("test5: client " << i << " acquire a release a concurrent; same and diff clnt");
+    LOG_NONMEMBER << "test5: client " << i << " acquire a release a concurrent; same and diff clnt";
     for (int j = 0; j < 10; j++) {
         if (i < 5)  lc[0]->acquire(a);
         else  lc[1]->acquire(a);
         check_grant(a);
-        LOG_NONMEMBER("test5: client " << i << " got lock");
+        LOG_NONMEMBER << "test5: client " << i << " got lock";
         check_release(a);
         if (i < 5) lc[0]->release(a);
         else lc[1]->release(a);
@@ -122,7 +122,7 @@ main(int argc, char *argv[])
     srandom((uint32_t)getpid());
 
     if (argc < 2) {
-        LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [test]");
+        LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [test]";
         exit(1);
     }
 
@@ -131,12 +131,12 @@ main(int argc, char *argv[])
     if (argc > 2) {
         test = atoi(argv[2]);
         if (test < 1 || test > 5) {
-            LOG_NONMEMBER("Test number must be between 1 and 5");
+            LOG_NONMEMBER << "Test number must be between 1 and 5";
             exit(1);
         }
     }
 
-    LOG_NONMEMBER("cache lock client");
+    LOG_NONMEMBER << "cache lock client";
     for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst);
 
     if (!test || test == 1) {
@@ -152,7 +152,7 @@ main(int argc, char *argv[])
     }
 
     if (!test || test == 3) {
-        LOG_NONMEMBER("test 3");
+        LOG_NONMEMBER << "test 3";
 
         for (int i = 0; i < nt; i++)
             th[i] = thread(test3, i);
@@ -161,7 +161,7 @@ main(int argc, char *argv[])
     }
 
     if (!test || test == 4) {
-        LOG_NONMEMBER("test 4");
+        LOG_NONMEMBER << "test 4";
 
         for (int i = 0; i < 2; i++)
             th[i] = thread(test4, i);
@@ -170,7 +170,7 @@ main(int argc, char *argv[])
     }
 
     if (!test || test == 5) {
-        LOG_NONMEMBER("test 5");
+        LOG_NONMEMBER << "test 5";
 
         for (int i = 0; i < nt; i++)
             th[i] = thread(test5, i);
@@ -178,6 +178,6 @@ main(int argc, char *argv[])
             th[i].join();
     }
 
-    LOG_NONMEMBER(argv[0] << ": passed all tests successfully");
+    LOG_NONMEMBER << argv[0] << ": passed all tests successfully";
 
 }
diff --git a/log.cc b/log.cc
index 3b881fa..c9af175 100644 (file)
--- a/log.cc
+++ b/log.cc
@@ -16,7 +16,7 @@ void log::logread(void) {
     string type;
     unsigned instance;
 
-    LOG("logread");
+    LOG << "logread";
     while (from >> type) {
         if (type == "done") {
             string v;
@@ -25,23 +25,23 @@ void log::logread(void) {
             getline(from, v);
             pxs->values[instance] = v;
             pxs->instance_h = instance;
-            LOG("logread: instance: " << instance << " w. v = " <<
-                    pxs->values[instance]);
+            LOG << "logread: instance: " << instance << " w. v = "
+                << pxs->values[instance];
             pxs->accepted_value.clear();
             pxs->promise.n = 0;
             pxs->accepted.n = 0;
         } else if (type == "propseen") {
             from >> pxs->promise.n >> pxs->promise.m;
-            LOG("logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")");
+            LOG << "logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")";
         } else if (type == "accepted") {
             string v;
             from >> pxs->accepted.n >> pxs->accepted.m;
             from.get();
             getline(from, v);
             pxs->accepted_value = v;
-            LOG("logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value);
+            LOG << "logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value;
         } else {
-            LOG("logread: unknown log record");
+            LOG << "logread: unknown log record";
             VERIFY(0);
         }
     } 
@@ -59,7 +59,7 @@ string log::dump() {
 }
 
 void log::restore(string s) {
-    LOG("restore: " << s);
+    LOG << "restore: " << s;
     ofstream f(name, ios::trunc);
     f << s;
     f.close();
index 4bba25d..cb32e36 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -41,9 +41,9 @@ proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
 {
     lock ml(proposer_mutex);
-    LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
+    LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
     if (!stable) {  // already running proposer?
-        LOG("paxos proposer already running");
+        LOG << "paxos proposer already running";
         return false;
     }
     stable = false;
@@ -54,7 +54,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
     if (prepare(instance, accepts, cur_nodes, v)) {
 
         if (majority(cur_nodes, accepts)) {
-            LOG("received a majority of prepare responses");
+            LOG << "received a majority of prepare responses";
 
             if (!v.size())
                 v = newv;
@@ -66,20 +66,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
             accept(instance, accepts, nodes, v);
 
             if (majority(cur_nodes, accepts)) {
-                LOG("received a majority of accept responses");
+                LOG << "received a majority of accept responses";
 
                 breakpoint2();
 
                 decide(instance, accepts, v);
                 r = true;
             } else {
-                LOG("no majority of accept responses");
+                LOG << "no majority of accept responses";
             }
         } else {
-            LOG("no majority of prepare responses");
+            LOG << "no majority of prepare responses";
         }
     } else {
-        LOG("prepare is rejected " << stable);
+        LOG << "prepare is rejected " << stable;
     }
     stable = true;
     return r;
@@ -87,7 +87,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
 
 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         const nodes_t & nodes, value_t & v) {
-    LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
+    LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
     prepareres res;
     prop_t highest_n_a{0, ""};
     for (auto i : nodes) {
@@ -99,16 +99,16 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
                 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
-                LOG("commiting old instance!");
+                LOG << "commiting old instance!";
                 commit(instance, res.v_a);
                 return false;
             }
-            LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
-                    "v_a=\"" << res.v_a << "\"");
+            LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
+                << "v_a=\"" << res.v_a << "\"";
             if (res.accept) {
                 accepts.push_back(i);
                 if (res.n_a >= highest_n_a) {
-                    LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
+                    LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
                     v = res.v_a;
                     highest_n_a = res.n_a;
                 }
@@ -146,26 +146,26 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
 
 paxos_protocol::status
 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
-    LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
+    LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
     lock ml(acceptor_mutex);
     r.oldinstance = false;
     r.accept = false;
     r.n_a = accepted;
     r.v_a = accepted_value;
     if (instance <= instance_h) {
-        LOG("old instance " << instance << " has value " << values[instance]);
+        LOG << "old instance " << instance << " has value " << values[instance];
         r.oldinstance = true;
         r.v_a = values[instance];
     } else if (n > promise) {
-        LOG("looks good to me");
+        LOG << "looks good to me";
         promise = n;
         l.logprop(promise);
         r.accept = true;
     } else {
-        LOG("I totally rejected this request.  Ha.");
+        LOG << "I totally rejected this request.  Ha.";
     }
-    LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
-        "v_a=\"" << r.v_a << "\"");
+    LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
+        << "v_a=\"" << r.v_a << "\"";
     return paxos_protocol::OK;
 }
 
@@ -189,7 +189,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t
 paxos_protocol::status
 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
     lock ml(acceptor_mutex);
-    LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
+    LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
     if (instance == instance_h + 1) {
         VERIFY(accepted_value == v);
         commit(instance, accepted_value, ml);
@@ -208,9 +208,9 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value) {
 }
 
 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
-    LOG("instance=" << instance << " has v=" << value);
+    LOG << "instance=" << instance << " has v=" << value;
     if (instance > instance_h) {
-        LOG("highestacceptedinstance = " << instance);
+        LOG << "highestacceptedinstance = " << instance;
         values[instance] = value;
         l.loginstance(instance, value);
         instance_h = instance;
@@ -228,24 +228,24 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock &
 // For testing purposes
 void proposer_acceptor::breakpoint1() {
     if (break1) {
-        LOG("Dying at breakpoint 1!");
+        LOG << "Dying at breakpoint 1!";
         exit(1);
     }
 }
 
 void proposer_acceptor::breakpoint2() {
     if (break2) {
-        LOG("Dying at breakpoint 2!");
+        LOG << "Dying at breakpoint 2!";
         exit(1);
     }
 }
 
 void proposer_acceptor::breakpoint(int b) {
     if (b == 3) {
-        LOG("breakpoint 1");
+        LOG << "breakpoint 1";
         break1 = true;
     } else if (b == 4) {
-        LOG("breakpoint 2");
+        LOG << "breakpoint 2";
         break2 = true;
     }
 }
index c4edbf6..c2635ef 100644 (file)
@@ -37,11 +37,11 @@ shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_de
     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-        IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+        IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
         close(s);
         return nullptr;
     }
-    IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+    IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
     return make_shared<connection>(delegate, move(s), lossy);
 }
 
@@ -61,7 +61,7 @@ bool connection::send(const string & b) {
 
     if (lossy_) {
         if ((random()%100) < lossy_) {
-            IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd);
+            IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
             shutdown(fd,SHUT_RDWR);
         }
     }
@@ -113,11 +113,11 @@ void connection::read_cb(int s) {
     if (dead_)
         return;
 
-    IF_LEVEL(5) LOG("got data on fd " << s);
+    IF_LEVEL(5) LOG << "got data on fd " << s;
 
     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
         if (!readpdu()) {
-            IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
+            IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
             poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
             dead_ = true;
             send_complete_.notify_one();
@@ -141,7 +141,7 @@ bool connection::writepdu() {
     ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
     if (n < 0) {
         if (errno != EAGAIN) {
-            IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno);
+            IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
             wpdu_.solong = size_t_max;
             wpdu_.buf.clear();
         }
@@ -152,7 +152,7 @@ bool connection::writepdu() {
 }
 
 bool connection::readpdu() {
-    IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
+    IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
     if (!rpdu_.buf.size()) {
         rpc_protocol::rpc_sz_t sz1;
         ssize_t n = fd.read(sz1);
@@ -166,18 +166,18 @@ bool connection::readpdu() {
         }
 
         if (n > 0 && n != sizeof(sz1)) {
-            IF_LEVEL(0) LOG("short read of sz");
+            IF_LEVEL(0) LOG << "short read of sz";
             return false;
         }
 
         size_t sz = ntoh(sz1);
 
         if (sz > rpc_protocol::MAX_PDU) {
-            IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
+            IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << hex << sz1;
             return false;
         }
 
-        IF_LEVEL(5) LOG("read size of datagram = " << sz);
+        IF_LEVEL(5) LOG << "read size of datagram = " << sz;
 
         rpdu_.buf.assign(sz+sizeof(sz1), 0);
         rpdu_.solong = sizeof(sz1);
@@ -185,7 +185,7 @@ bool connection::readpdu() {
 
     ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
 
-    IF_LEVEL(5) LOG("read " << n << " bytes");
+    IF_LEVEL(5) LOG << "read " << n << " bytes";
 
     if (n <= 0) {
         if (errno == EAGAIN)
@@ -224,7 +224,7 @@ connection_listener::connection_listener(connection_delegate * delegate, in_port
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
     port_ = ntoh(sin.sin_port);
 
-    IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
+    IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
 
     poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
 }
@@ -242,7 +242,7 @@ void connection_listener::read_cb(int) {
         throw runtime_error("connection listener failure");
     }
 
-    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
+    IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
     auto ch = make_shared<connection>(delegate_, s1, lossy_);
 
     // garbage collect dead connections
index 2598249..d29abd2 100644 (file)
@@ -196,7 +196,7 @@ void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
         return;
     else if (ret < 0) {
         perror("select:");
-        IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+        IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
         VERIFY(0);
     }
 
index 6985498..a451c9f 100644 (file)
@@ -76,14 +76,14 @@ rpcc::rpcc(const string & d) : dst_(make_sockaddr(d))
     if (loss_env)
         lossytest_ = atoi(loss_env);
 
-    IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
+    IF_LEVEL(2) LOG << "cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_;
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
     cancel();
-    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1));
+    IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1);
     chan_.reset();
     VERIFY(calls_.size() == 0);
 }
@@ -96,7 +96,7 @@ int rpcc::bind(milliseconds to) {
         bind_done_ = true;
         srv_nonce_ = r;
     } else {
-        IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
+        IF_LEVEL(2) LOG << "bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret;
     }
     return ret;
 }
@@ -105,11 +105,11 @@ int rpcc::bind(milliseconds to) {
 void rpcc::cancel(void) {
     lock ml(m_);
     if (calls_.size()) {
-        LOG("force callers to fail");
+        LOG << "force callers to fail";
         for (auto & p : calls_) {
             caller *ca = p.second;
 
-            IF_LEVEL(2) LOG("force caller to fail");
+            IF_LEVEL(2) LOG << "force caller to fail";
 
             lock cl(ca->m);
             ca->done = true;
@@ -121,7 +121,7 @@ void rpcc::cancel(void) {
         while (calls_.size () > 0)
             destroy_wait_c_.wait(ml);
 
-        LOG("done");
+        LOG << "done";
     }
 }
 
@@ -133,7 +133,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
         lock ml(m_);
 
         if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
-            IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
+            IF_LEVEL(1) LOG << "rpcc has not been bound to dst or binding twice";
             return rpc_protocol::bind_failure;
         }
 
@@ -172,9 +172,9 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
                         ch->send(forgot.buf);
                     ch->send(req);
                 }
-                else IF_LEVEL(1) LOG("not reachable");
-                IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
-                                " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
+                else IF_LEVEL(1) LOG << "not reachable";
+                IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << hex << proc
+                                << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_;
             }
             transmit = false; // only send once on a given channel
         }
@@ -185,14 +185,14 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
         {
             lock cal(ca.m);
             while (!ca.done) {
-                IF_LEVEL(2) LOG("wait");
+                IF_LEVEL(2) LOG << "wait";
                 if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
-                    IF_LEVEL(2) LOG("timeout");
+                    IF_LEVEL(2) LOG << "timeout";
                     break;
                 }
             }
             if (ca.done) {
-                IF_LEVEL(2) LOG("reply received");
+                IF_LEVEL(2) LOG << "reply received";
                 break;
             }
         }
@@ -231,9 +231,9 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
 
     lock cal(ca.m);
 
-    IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
-                    " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
-                    ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
+    IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << hex << proc
+                    << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":"
+                    << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret;
 
     // destruction of req automatically frees its buffer
     return (ca.done? ca.intret : rpc_protocol::timeout_failure);
@@ -261,7 +261,7 @@ rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
     rep.unpack_header(h);
 
     if (!rep.ok()) {
-        IF_LEVEL(1) LOG("unmarshall header failed!!!");
+        IF_LEVEL(1) LOG << "unmarshall header failed!!!";
         return true;
     }
 
@@ -270,7 +270,7 @@ rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
     update_xid_rep(h.xid, ml);
 
     if (calls_.find(h.xid) == calls_.end()) {
-        IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
+        IF_LEVEL(2) LOG << "xid " << h.xid << " no pending request";
         return true;
     }
     caller *ca = calls_[h.xid];
@@ -280,7 +280,7 @@ rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
         *ca->rep = b;
         ca->intret = h.ret;
         if (ca->intret < 0) {
-            IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
+            IF_LEVEL(2) LOG << "RPC reply error for xid " << h.xid << " intret " << ca->intret;
         }
         ca->done = 1;
     }
@@ -313,7 +313,7 @@ rpcs::rpcs(in_port_t p1) : port_(p1)
 {
     set_rand_seed();
     nonce_ = (nonce_t)random();
-    IF_LEVEL(2) LOG("created with nonce " << nonce_);
+    IF_LEVEL(2) LOG << "created with nonce " << nonce_;
 
     reg(rpc_protocol::bind, &rpcs::rpcbind, this);
 }
@@ -331,7 +331,7 @@ rpcs::~rpcs() {
 
 bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
     if (!reachable_) {
-        IF_LEVEL(1) LOG("not reachable");
+        IF_LEVEL(1) LOG << "not reachable";
         return true;
     }
 
@@ -346,20 +346,20 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     proc_id_t proc = h.proc;
 
     if (!req.ok()) {
-        IF_LEVEL(1) LOG("unmarshall header failed");
+        IF_LEVEL(1) LOG << "unmarshall header failed";
         return;
     }
 
-    IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
-                    dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
+    IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << hex << proc << ", last_rep "
+                    << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce;
 
     marshall rep;
     rpc_protocol::reply_header rh{h.xid,0};
 
     // is client sending to an old instance of server?
     if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
-        IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
-                        " (current " << nonce_ << ") proc " << hex << h.proc);
+        IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce
+                        << " (current " << nonce_ << ") proc " << hex << h.proc;
         rh.ret = rpc_protocol::oldsrv_failure;
         rep.pack_header(rh);
         c->send(rep);
@@ -371,7 +371,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     {
         lock pl(procs_m_);
         if (procs_.count(proc) < 1) {
-            LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
+            LOG << "unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_;
             VERIFY(0);
             return;
         }
@@ -386,8 +386,8 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
         if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
             VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
             reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
-            IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
-                            " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
+            IF_LEVEL(2) LOG << "new client " << h.clt_nonce << " xid " << h.xid
+                            << " chan " << c->fd << ", total clients " << (reply_window_.size()-1);
         }
     }
 
@@ -406,9 +406,9 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
         case NEW: // new request
             rh.ret = (*f)(forward<unmarshall>(req), rep);
             if (rh.ret == rpc_protocol::unmarshall_args_failure) {
-                LOG("failed to unmarshall the arguments. You are " <<
-                    "probably calling RPC 0x" << hex << proc << " with the wrong " <<
-                    "types of arguments.");
+                LOG << "failed to unmarshall the arguments. You are "
+                    << "probably calling RPC 0x" << hex << proc << " with the wrong "
+                    << "types of arguments.";
                 VERIFY(0);
             }
             VERIFY(rh.ret >= 0);
@@ -416,8 +416,8 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
             rep.pack_header(rh);
             b1 = rep;
 
-            IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
-                            h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
+            IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc "
+                            << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce;
 
             add_reply(h.clt_nonce, h.xid, b1);
 
@@ -436,7 +436,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
             c->send(b1);
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
-            IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
+            IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce;
             rh.ret = rpc_protocol::atmostonce_failure;
             rep.pack_header(rh);
             c->send(rep);
@@ -515,7 +515,7 @@ void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
-        LOG("Could not find reply struct in add_reply");
+        LOG << "Could not find reply struct in add_reply";
         l.insert(it, reply_t(xid, b));
     } else {
         *it = reply_t(xid, b);
@@ -523,7 +523,7 @@ void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
 }
 
 rpc_protocol::status rpcs::rpcbind(nonce_t & r) {
-    IF_LEVEL(2) LOG("called return nonce " << nonce_);
+    IF_LEVEL(2) LOG << "called return nonce " << nonce_;
     r = nonce_;
     return 0;
 }
@@ -548,7 +548,7 @@ static sockaddr_in make_sockaddr(const string & hostandport) {
         struct hostent *hp = gethostbyname(host.c_str());
 
         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
-            LOG_NONMEMBER("cannot find host name " << host);
+            LOG_NONMEMBER << "cannot find host name " << host;
             exit(1);
         }
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
index 211c717..53a1746 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -95,8 +95,8 @@ class rpcc : private connection_delegate {
             if (intret < 0) return intret;
             unmarshall u(rep, true, r);
             if (u.okdone() != true) {
-                LOG("rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
-                    "calling RPC 0x" << hex << proc << " with the wrong return type.");
+                LOG << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
+                       "calling RPC 0x" << hex << proc << " with the wrong return type.";
                 VERIFY(0);
                 return rpc_protocol::unmarshall_reply_failure;
             }
index f7df5fe..f2e8c7d 100644 (file)
@@ -378,7 +378,7 @@ int main(int argc, char *argv[]) {
 
     if (debug_level > 0) {
         DEBUG_LEVEL = debug_level;
-        IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+        IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level;
     }
 
     testmarshall();
diff --git a/rsm.cc b/rsm.cc
index de37b5d..672243c 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -125,7 +125,7 @@ void rsm::recovery() {
             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
             // cfg->view_id in this manner involves a race.  I suspect not.
             if (join(primary, ml)) {
-                LOG("joined");
+                LOG << "joined";
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
@@ -134,13 +134,13 @@ void rsm::recovery() {
             }
         }
         vid_insync = vid_commit;
-        LOG("sync vid_insync " << vid_insync);
+        LOG << "sync vid_insync " << vid_insync;
         if (primary == cfg->myaddr()) {
             r = sync_with_backups(ml);
         } else {
             r = sync_with_primary(ml);
         }
-        LOG("sync done");
+        LOG << "sync done";
 
         // If there was a commited viewchange during the synchronization, restart
         // the recovery
@@ -152,7 +152,7 @@ void rsm::recovery() {
             myvs.seqno = 1;
             inviewchange = false;
         }
-        LOG("go to sleep " << insync << " " << inviewchange);
+        LOG << "go to sleep " << insync << " " << inviewchange;
         recovery_cond.wait(ml);
     }
 }
@@ -175,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
     insync = true;
     cfg->get_view(vid_insync, backups);
     backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
-    LOG("backups " << backups);
+    LOG << "backups " << backups;
     sync_cond.wait(rsm_mutex_lock);
     insync = false;
     return true;
@@ -202,7 +202,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
     rsm_protocol::transferres r;
     handle h(m);
     int ret = 0;
-    LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
     rpcc *cl;
     {
         rsm_mutex_lock.unlock();
@@ -214,14 +214,14 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         rsm_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
-        LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+        LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
         return false;
     }
     if (stf && last_myvs != r.last) {
         stf->unmarshal_state(r.state);
     }
     last_myvs = r.last;
-    LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
     return true;
 }
 
@@ -245,7 +245,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
     int ret = 0;
     string log;
 
-    LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
     rpcc *cl;
     {
         rsm_mutex_lock.unlock();
@@ -258,10 +258,10 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
-        LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+        LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
         return false;
     }
-    LOG("succeeded " << log);
+    LOG << "succeeded " << log;
     cfg->restore(log);
     return true;
 }
@@ -280,8 +280,8 @@ void rsm::commit_change(unsigned vid) {
 void rsm::commit_change(unsigned vid, lock &) {
     if (vid <= vid_commit)
         return;
-    LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
-            last_myvs.seqno << ") " << primary << " insync " << insync);
+    LOG << "new view (" << vid << ") last vs (" << last_myvs.vid << ","
+        << last_myvs.seqno << ") " << primary << " insync " << insync;
     vid_commit = vid;
     inviewchange = true;
     set_primary(vid);
@@ -293,7 +293,7 @@ void rsm::commit_change(unsigned vid, lock &) {
 
 
 void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
-    LOG("execute");
+    LOG << "execute";
     handler *h = procs[procno];
     VERIFY(h);
     marshall rep;
@@ -308,21 +308,21 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
 // machine.
 //
 rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
-    LOG("invoke procno 0x" << hex << procno);
+    LOG << "invoke procno 0x" << hex << procno;
     lock ml(invoke_mutex);
     vector<string> m;
     string myaddr;
     viewstamp vs;
     {
         lock ml2(rsm_mutex);
-        LOG("Checking for inviewchange");
+        LOG << "Checking for inviewchange";
         if (inviewchange)
             return rsm_client_protocol::BUSY;
-        LOG("Checking for primacy");
+        LOG << "Checking for primacy";
         myaddr = cfg->myaddr();
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
-        LOG("Assigning a viewstamp");
+        LOG << "Assigning a viewstamp";
         cfg->get_view(vid_commit, m);
         // assign the RPC the next viewstamp number
         vs = myvs;
@@ -330,18 +330,18 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
     }
 
     // send an invoke RPC to all slaves in the current view with a timeout of 1 second
-    LOG("Invoking slaves");
+    LOG << "Invoking slaves";
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
             handle h(m[i]);
-            LOG("Sending invoke to " << m[i]);
+            LOG << "Sending invoke to " << m[i];
             rpcc *cl = h.safebind();
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
             auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
-            LOG("Invoke returned " << ret);
+            LOG << "Invoke returned " << ret;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
             breakpoint(1);
@@ -349,15 +349,17 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
             partition1(rsm_mutex_lock);
         }
     }
-    LOG(setfill('0') << setw(2) << hex;
+    {
+        auto && log = LOG << setfill('0') << setw(2) << hex;
         for (size_t i=0; i<req.size(); i++)
-            cerr << (unsigned int)(unsigned char)req[i];
-        cerr);
+            log << (unsigned int)(unsigned char)req[i];
+    }
     execute(procno, req, r);
-    LOG(setfill('0') << setw(2) << hex;
+    {
+        auto && log = LOG << setfill('0') << setw(2) << hex;
         for (size_t i=0; i<r.size(); i++)
-            cerr << (unsigned int)(unsigned char)r[i];
-        cerr);
+            log << (unsigned int)(unsigned char)r[i];
+    }
     last_myvs = vs;
     return rsm_client_protocol::OK;
 }
@@ -370,18 +372,18 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
 // according to requests' seqno
 
 rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
-    LOG("invoke procno 0x" << hex << proc);
+    LOG << "invoke procno 0x" << hex << proc;
     lock ml(invoke_mutex);
     vector<string> m;
     string myaddr;
     {
         lock ml2(rsm_mutex);
         // check if !inviewchange
-        LOG("Checking for view change");
+        LOG << "Checking for view change";
         if (inviewchange)
             return rsm_protocol::ERR;
         // check if slave
-        LOG("Checking for slave status");
+        LOG << "Checking for slave status";
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
@@ -389,7 +391,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
         if (find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
-        LOG("Checking sequence number");
+        LOG << "Checking sequence number";
         if (vs != myvs)
             return rsm_protocol::ERR;
         myvs++;
@@ -407,8 +409,8 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
 rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
-    LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
-            last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs ("
+        << last_myvs.vid << "," << last_myvs.seqno << ")";
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
@@ -438,18 +440,18 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
-    LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
-            last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=("
+        << last_myvs.vid << "," << last_myvs.seqno << ")";
     if (cfg->ismember(m, vid_commit)) {
-        LOG(m << " is still a member -- nothing to do");
+        LOG << m << " is still a member -- nothing to do";
         log = cfg->dump();
     } else if (cfg->myaddr() != primary) {
-        LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
+        LOG << "but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!";
         ret = rsm_protocol::BUSY;
     } else {
         // We cache vid_commit to avoid adding m to a view which already contains
         // m due to race condition
-        LOG("calling down to config layer");
+        LOG << "calling down to config layer";
         unsigned vid_cache = vid_commit;
         bool succ;
         {
@@ -459,9 +461,9 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
         }
         if (cfg->ismember(m, cfg->view_id())) {
             log = cfg->dump();
-            LOG("ret " << ret << " log " << log);
+            LOG << "ret " << ret << " log " << log;
         } else {
-            LOG("failed; proposer couldn't add " << succ);
+            LOG << "failed; proposer couldn't add " << succ;
             ret = rsm_protocol::BUSY;
         }
     }
@@ -478,7 +480,7 @@ rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
     r = m;
-    LOG("return " << m << " m " << primary);
+    LOG << "return " << m << " m " << primary;
     return rsm_client_protocol::OK;
 }
 
@@ -492,7 +494,7 @@ void rsm::set_primary(unsigned vid) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
-        LOG("primary stays " << primary);
+        LOG << "primary stays " << primary;
         return;
     }
 
@@ -500,7 +502,7 @@ void rsm::set_primary(unsigned vid) {
     for (unsigned i = 0; i < p.size(); i++) {
         if (isamember(p[i], c)) {
             primary = p[i];
-            LOG("primary is " << primary);
+            LOG << "primary is " << primary;
             return;
         }
     }
@@ -522,7 +524,7 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
             handle h(m[i]);
-            LOG("member " << m[i] << " " << heal);
+            LOG << "member " << m[i] << " " << heal;
             if (h.safebind()) h.safebind()->set_reachable(heal);
         }
     }
@@ -531,8 +533,8 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
 
 rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
     lock ml(rsm_mutex);
-    LOG("heal " << heal << " (dopartition " <<
-            dopartition << ", partitioned " << partitioned << ")");
+    LOG << "heal " << heal << " (dopartition "
+        << dopartition << ", partitioned " << partitioned << ")";
     if (heal)
         net_repair(heal, ml);
     else
@@ -545,7 +547,7 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r,
 
 void rsm::breakpoint(int b) {
     if (breakpoints[b-1]) {
-        LOG("Dying at breakpoint " << b << " in rsm!");
+        LOG << "Dying at breakpoint " << b << " in rsm!";
         exit(1);
     }
 }
@@ -561,7 +563,7 @@ void rsm::partition1(lock & rsm_mutex_lock) {
 rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
-    LOG("breakpoint " << b);
+    LOG << "breakpoint " << b;
     if (b == 1) breakpoints[1-1] = true;
     else if (b == 2) breakpoints[2-1] = true;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
index 598a1ed..d047f8b 100644 (file)
@@ -4,10 +4,10 @@
 #include <unistd.h>
 
 rsm_client::rsm_client(string dst) : primary(dst) {
-    LOG("create rsm_client");
+    LOG << "create rsm_client";
     lock ml(rsm_client_mutex);
     VERIFY (init_members(ml));
-    LOG("done");
+    LOG << "done";
 }
 
 void rsm_client::primary_failure(lock &) {
@@ -18,7 +18,7 @@ void rsm_client::primary_failure(lock &) {
 rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
     lock ml(rsm_client_mutex);
     while (1) {
-        LOG("proc " << hex << proc << " primary " << primary);
+        LOG << "proc " << hex << proc << " primary " << primary;
         handle h(primary);
 
         ml.unlock();
@@ -31,28 +31,28 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s
         if (!cl)
             goto prim_fail;
 
-        LOG("proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
+        LOG << "proc " << hex << proc << " primary " << primary << " ret " << dec << ret;
         if (ret == rsm_client_protocol::OK)
             return rsm_protocol::OK;
         if (ret == rsm_client_protocol::BUSY) {
-            LOG("rsm is busy " << primary);
+            LOG << "rsm is busy " << primary;
             usleep(300000);
             continue;
         }
         if (ret == rsm_client_protocol::NOTPRIMARY) {
-            LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
+            LOG << "primary " << primary << " isn't the primary--let's get a complete list of mems";
             if (init_members(ml))
                 continue;
         }
 prim_fail:
-        LOG("primary " << primary << " failed ret " << dec << ret);
+        LOG << "primary " << primary << " failed ret " << dec << ret;
         primary_failure(ml);
-        LOG("retry new primary " << primary);
+        LOG << "retry new primary " << primary;
     }
 }
 
 bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
-    LOG("get members!");
+    LOG << "get members!";
     handle h(primary);
     int ret = rsm_client_protocol::ERR;
     rpcc *cl;
@@ -66,14 +66,14 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
     if (cl == 0 || ret != rsm_protocol::OK)
         return false;
     if (known_mems.size() < 1) {
-        LOG("do not know any members!");
+        LOG << "do not know any members!";
         VERIFY(0);
     }
 
     primary = known_mems.back();
     known_mems.pop_back();
 
-    LOG("primary " << primary);
+    LOG << "primary " << primary;
 
     return true;
 }
index 583ecbb..2bdf440 100644 (file)
@@ -50,19 +50,19 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
     string res;
     u >> res;
     if (!u.okdone()) {
-        LOG("failed to unmarshall the reply.");
-        LOG("You probably forgot to set the reply string in " <<
-            "rsm::client_invoke, or you may have called RPC " <<
-            "0x" << hex << proc << " with the wrong return type");
-        LOG("here's what I got: \"" << hexify(rep) << "\"");
+        LOG << "failed to unmarshall the reply.";
+        LOG << "You probably forgot to set the reply string in " <<
+               "rsm::client_invoke, or you may have called RPC " <<
+               "0x" << hex << proc << " with the wrong return type";
+        LOG << "here's what I got: \"" << hexify(rep) << "\"";
         VERIFY(0);
         return rpc_protocol::unmarshall_reply_failure;
     }
     if(!unmarshall(res, false, r).okdone()) {
-        LOG("failed to unmarshall the reply.");
-        LOG("You are probably calling RPC 0x" << hex << proc <<
-            " with the wrong return type.");
-        LOG("here's what I got: \"" << hexify(res) << "\"");
+        LOG << "failed to unmarshall the reply.";
+        LOG << "You are probably calling RPC 0x" << hex << proc <<
+               " with the wrong return type.";
+        LOG << "here's what I got: \"" << hexify(res) << "\"";
         VERIFY(0);
         return rpc_protocol::unmarshall_reply_failure;
     }
index 469aea2..09b2bf9 100644 (file)
@@ -10,7 +10,7 @@ char log_thread_prefix = 't';
 
 int main(int argc, char *argv[]) {
     if(argc != 4){
-        LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [partition] arg");
+        LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [partition] arg";
         return 1;
     }
 
@@ -22,7 +22,7 @@ int main(int argc, char *argv[]) {
         int b = stoi(argv[3]);
         cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
     } else {
-        LOG_NONMEMBER("Unknown command " << argv[2]);
+        LOG_NONMEMBER << "Unknown command " << argv[2];
     }
     return 0;
 }
index c44266e..b450f52 100644 (file)
@@ -3,6 +3,26 @@
 mutex cerr_mutex;
 map<thread::id, int> thread_name_map;
 int next_thread_num = 0;
-map<void *, int> instance_name_map;
+map<const void *, int> instance_name_map;
 int next_instance_num = 0;
 int DEBUG_LEVEL = 0;
+
+locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) {
+    auto thread = this_thread::get_id();
+    int tid = thread_name_map[thread];
+    if (tid==0)
+        tid = thread_name_map[thread] = ++next_thread_num;
+    auto utime = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000;
+    f << setfill('0') << dec << left << setw(9) << utime << " ";
+    f << setfill(' ') << log_thread_prefix << left << setw(2) << tid;
+    f << " " << setw(20) << file << " " << setw(18) << func;
+    return move(f);
+}
+
+locked_ostream && _log_member(locked_ostream && f, const void *ptr) {
+    int id = instance_name_map[ptr];
+    if (id == 0)
+        id = instance_name_map[ptr] = ++next_instance_num;
+    f << "#" << left << setw(2) << id << " ";
+    return move(f);
+}
index bccd813..c02531e 100644 (file)
@@ -6,38 +6,29 @@
 extern mutex cerr_mutex;
 extern map<thread::id, int> thread_name_map;
 extern int next_thread_num;
-extern map<void *, int> instance_name_map;
+extern map<const void *, int> instance_name_map;
 extern int next_instance_num;
 extern char log_thread_prefix;
 
-#define LOG_PREFIX { \
-    auto _thread_ = this_thread::get_id(); \
-    int _tid_ = thread_name_map[_thread_]; \
-    if (_tid_==0) \
-        _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
-    auto _utime_ = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000; \
-    cerr << setfill('0') << dec << left << setw(9) << _utime_ << " "; \
-    cerr << setfill(' ') << log_thread_prefix << left << setw(2) << _tid_; \
-    cerr << " " << setw(20) << __FILE__ << " " << setw(18) << __func__; \
-}
-#define LOG_THIS_POINTER { \
-    int _self_ = instance_name_map[this]; \
-    if (_self_==0) \
-        _self_ = instance_name_map[this] = ++next_instance_num; \
-    cerr << "#" << left << setw(2) << _self_ << " "; \
-}
-
-#define LOG_NONMEMBER(_x_) { \
-    lock _cel_(cerr_mutex); \
-    LOG_PREFIX; \
-    cerr << _x_ << endl; \
-}
-#define LOG(_x_) { \
-    lock _cel_(cerr_mutex); \
-    LOG_PREFIX; \
-    LOG_THIS_POINTER; \
-    cerr << _x_ << endl; \
-}
+struct locked_ostream {
+    ostream & s;
+    lock l;
+    ~locked_ostream() { s << endl; }
+    template <typename U>
+    locked_ostream & operator<<(U && u) { s << u; return *this; }
+
+    typedef std::ostream& (*ostream_manipulator)(ostream&);
+    locked_ostream & operator<<(ostream_manipulator manip) { s << manip; return *this; }
+};
+
+locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func);
+locked_ostream && _log_member(locked_ostream && f, const void *ptr);
+#define _log_nonmember(f, ptr) f
+
+#define _LOG(_context_) _context_(_log_prefix(locked_ostream{cerr, lock(cerr_mutex)}, __FILE__, __func__), (const void *)this)
+
+#define LOG_NONMEMBER _LOG(_log_nonmember)
+#define LOG           _LOG(_log_member)
 
 extern int DEBUG_LEVEL;