Rolled handle infrastructure into rpcc.
authorPeter Iannucci <iannucci@mit.edu>
Wed, 18 Dec 2013 23:03:24 +0000 (15:03 -0800)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 18 Dec 2013 23:03:24 +0000 (15:03 -0800)
24 files changed:
.gitignore
Makefile
config.cc
handle.cc [deleted file]
handle.h [deleted file]
lock_demo.cc
lock_server.cc
lock_smain.cc
lock_tester.cc
paxos.cc
rpc/connection.cc
rpc/connection.h
rpc/poll_mgr.cc
rpc/poll_mgr.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm_client.cc
rsm_tester.cc
t4.cc [new file with mode: 0644]
t4.h [new file with mode: 0644]
threaded_log.cc
types.h

index 2dde284..378a62f 100644 (file)
@@ -5,8 +5,7 @@ rpc/rpctest
 lock_tester
 lock_demo
 lock_server
-*.swp
-*.swo
+*.sw*
 *.a
 *.log
 rsm_tester
index 0262c17..e41486f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -11,15 +11,15 @@ rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o
        ar cq $@ $^
        ranlib rpc/librpc.a
 
-rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
+rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a t4.o
 
-lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
+lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o
 
-lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
+lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o
 
-lock_server : lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a
+lock_server : lock_smain.o threaded_log.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o
 
-rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a
+rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a t4.o
 
 %.o: %.cc
        $(CXX) $(CXXFLAGS) -c $< -o $@
index 4931a9f..a177362 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,5 +1,4 @@
 #include "config.h"
-#include "handle.h"
 
 using std::vector;
 
@@ -85,7 +84,7 @@ void config::paxos_commit(unsigned instance, const string & value) {
         LOG << "is " << mem << " still a member?";
         if (!isamember(mem, newmem) && me != mem) {
             LOG << "delete " << mem;
-            handle(mem).invalidate();
+            rpcc::unbind_cached(mem);
         }
     }
 
@@ -198,11 +197,10 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock)
     VERIFY(cfg_mutex_lock);
     unsigned vid = my_view_id;
     LOG << "heartbeat to " << m << " (" << vid << ")";
-    handle h(m);
 
     cfg_mutex_lock.unlock();
     int r = 0, ret = rpc_protocol::bind_failure;
-    if (rpcc *cl = h.safebind())
+    if (auto cl = rpcc::bind_cached(m))
         ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
     cfg_mutex_lock.lock();
 
@@ -212,7 +210,7 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock)
             break;
         case rpc_protocol::atmostonce_failure:
         case rpc_protocol::oldsrv_failure:
-            h.invalidate();
+            rpcc::unbind_cached(m);
             break;
         default:
             LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
diff --git a/handle.cc b/handle.cc
deleted file mode 100644 (file)
index 20438f9..0000000
--- a/handle.cc
+++ /dev/null
@@ -1,61 +0,0 @@
-#include "handle.h"
-
-class hinfo {
-public:
-    unique_ptr<rpcc> client;
-    bool valid = true;
-    string destination;
-    std::mutex client_mutex;
-    hinfo(const string & destination_) : destination(destination_) {}
-};
-
-static std::mutex mgr_mutex;
-static std::map<string, shared_ptr<hinfo>> hmap;
-
-void handle::shutdown() {
-    lock ml(mgr_mutex);
-    LOG_NONMEMBER << "Shutting down handle manager";
-    for (auto p : hmap) {
-        p.second->valid = false;
-        LOG_NONMEMBER << "cl " << p.first << " refcnt " << p.second.use_count();
-    }
-    hmap.clear();
-}
-
-handle::handle(const string & destination) : destination_(destination) {
-    lock ml(mgr_mutex);
-    h = hmap[destination];
-    if (!h || !h->valid)
-        h = (hmap[destination] = std::make_shared<hinfo>(destination));
-}
-
-rpcc * handle::safebind() {
-    if (!h)
-        return nullptr;
-    lock cl(h->client_mutex);
-    if (!h->valid)
-        return nullptr;
-    if (!h->client) {
-        unique_ptr<rpcc> client(new rpcc(h->destination));
-        LOG << "bind(\"" << h->destination << "\")";
-        int ret = client->bind(milliseconds(1000));
-        if (ret < 0) {
-            LOG << "bind failure! " << h->destination << " " << ret;
-            h->valid = false;
-        } else {
-            LOG << "bind succeeded " << h->destination;
-            h->client = std::move(client);
-        }
-    }
-    return h->client.get();
-}
-
-void handle::invalidate() {
-    h.reset();
-    lock ml(mgr_mutex);
-    if (hmap.find(destination_) != hmap.end()) {
-        hmap[destination_]->valid = false;
-        LOG << "cl " << destination_ << " refcnt " << hmap[destination_].use_count();
-        hmap.erase(destination_);
-    }
-}
diff --git a/handle.h b/handle.h
deleted file mode 100644 (file)
index e569002..0000000
--- a/handle.h
+++ /dev/null
@@ -1,32 +0,0 @@
-#ifndef handle_h
-#define handle_h
-
-#include "types.h"
-#include "rpc/rpc.h"
-
-// Manage a cache of RPC connections.  Typical usage:
-//     handle h(dst);
-//     rpc_protocol::status ret = rpc_protocol::bind_failure;
-//     if (rpcc *cl = h.safebind())
-//         ret = cl->call(...);
-// assuming dst is a string holding the host:port of the RPC server you want to
-// talk to.
-//
-// If the calling program has not contacted dst before, safebind() will create
-// a new connection, call bind(), and return an rpcc*, or 0 if bind() failed.
-// if the program has previously contacted dst, safebind() just returns the
-// previously created rpcc*.  Because safebind() may block, callers should
-// probably not hold mutexes.
-
-class handle {
-    private:
-        shared_ptr<class hinfo> h;
-        const string destination_;
-    public:
-        handle(const string & destination);
-        rpcc *safebind();
-        void invalidate();
-        static void shutdown();
-};
-
-#endif
index 6cdf346..88383ab 100644 (file)
@@ -1,8 +1,7 @@
 #include "lock_client.h"
 
-char log_thread_prefix = 'd';
-
 int main(int argc, char *argv[]) {
+    global = new t4_state('d');
     if(argc != 2) {
         LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port";
         return 1;
index a4d5881..141a598 100644 (file)
@@ -3,7 +3,6 @@
 #include "lock_server.h"
 #include <unistd.h>
 #include <arpa/inet.h>
-#include "handle.h"
 
 lock_state::lock_state():
     held(false)
@@ -47,20 +46,13 @@ void lock_server::revoker () {
             continue;
 
         lock_state & st = get_lock_state(lid);
-        holder_t held_by;
-        {
-            lock sl(st.m);
-            held_by = st.held_by;
-        }
+        lock sl(st.m);
+        holder_t held_by = st.held_by;
+        sl.unlock();
 
-        rpcc *proxy = NULL;
-        // try a few times?
-        //int t=5;
-        //while (t-- && !proxy)
-        proxy = handle(held_by.first).safebind();
-        if (proxy) {
+        if (auto cl = rpcc::bind_cached(held_by.first)) {
             int r;
-            auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+            auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
             LOG << "Revoke returned " << ret;
         }
     }
@@ -83,14 +75,9 @@ void lock_server::retryer() {
             front = st.wanted_by.front();
         }
 
-        rpcc *proxy = NULL;
-        // try a few times?
-        //int t=5;
-        //while (t-- && !proxy)
-        proxy = handle(front.first).safebind();
-        if (proxy) {
+        if (auto cl = rpcc::bind_cached(front.first)) {
             int r;
-            auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
+            auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
             LOG << "Retry returned " << ret;
         }
     }
index fecd7f8..c6f3356 100644 (file)
@@ -4,9 +4,8 @@
 
 // Main loop of lock_server
 
-char log_thread_prefix = 's';
-
 int main(int argc, char *argv[]) {
+    global = new t4_state('s');
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
 
index 58e691f..a546e09 100644 (file)
@@ -5,9 +5,6 @@
 #include "lock_client.h"
 #include <arpa/inet.h>
 #include <unistd.h>
-#include "handle.h"
-
-char log_thread_prefix = 'c';
 
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
@@ -115,6 +112,7 @@ static void test5(int i) {
 int
 main(int argc, char *argv[])
 {
+    global = new t4_state('c');
     thread th[nt];
     int test = 0;
 
@@ -183,9 +181,4 @@ main(int argc, char *argv[])
 
     for (int i = 0; i < nt; i++)
         delete lc[i];
-
-    handle::shutdown();
-    poll_mgr::shared_mgr.shutdown();
-
-    LOG_NONMEMBER << argv[0] << ": clean-up complete";
 }
index a055c36..2c7e79a 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,5 +1,4 @@
 #include "paxos.h"
-#include "handle.h"
 
 using namespace std::placeholders;
 
@@ -94,8 +93,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
     prepareres res;
     prop_t highest_n_a{0, ""};
     for (auto i : nodes) {
-        handle h(i);
-        rpcc *r = h.safebind();
+        auto r = rpcc::bind_cached(i);
         if (!r)
             continue;
         auto status = (paxos_protocol::status)r->call_timeout(
@@ -124,8 +122,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
         const nodes_t & nodes, const value_t & v) {
     for (auto i : nodes) {
-        handle h(i);
-        rpcc *r = h.safebind();
+        auto r = rpcc::bind_cached(i);
         if (!r)
             continue;
         bool accept = false;
@@ -138,8 +135,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
 
 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
     for (auto i : accepts) {
-        handle h(i);
-        rpcc *r = h.safebind();
+        auto r = rpcc::bind_cached(i);
         if (!r)
             continue;
         int res = 0;
index c7e8f95..31ec5fa 100644 (file)
@@ -15,7 +15,7 @@ connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
 
     signal(SIGPIPE, SIG_IGN);
 
-    poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
+    global->shared_mgr.add_callback(fd, CB_RDONLY, this);
 }
 
 connection::~connection() {
@@ -28,7 +28,7 @@ connection::~connection() {
     }
     // after block_remove_fd, select will never wait on fd and no callbacks
     // will be active
-    poll_mgr::shared_mgr.block_remove_fd(fd);
+    global->shared_mgr.block_remove_fd(fd);
     VERIFY(dead_);
     VERIFY(wpdu_.status == unused);
 }
@@ -64,11 +64,11 @@ bool connection::send(const string & b) {
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
-        poll_mgr::shared_mgr.block_remove_fd(fd);
+        global->shared_mgr.block_remove_fd(fd);
         ml.lock();
     } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) {
         // should be rare to need to explicitly add write callback
-        poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
+        global->shared_mgr.add_callback(fd, CB_WRONLY, this);
         while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size())
             send_complete_.wait(ml);
     }
@@ -84,11 +84,11 @@ void connection::write_cb(int s) {
     VERIFY(!dead_);
     VERIFY(fd == s);
     if (wpdu_.status != inflight) {
-        poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
+        global->shared_mgr.del_callback(fd, CB_WRONLY);
         return;
     }
     if (!writepdu()) {
-        poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
+        global->shared_mgr.del_callback(fd, CB_RDWR);
         dead_ = true;
     } else {
         VERIFY(wpdu_.status != error);
@@ -127,7 +127,7 @@ void connection::read_cb(int s) {
     if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
         if (!readpdu()) {
             IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
-            poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
+            global->shared_mgr.del_callback(fd, CB_RDWR);
             dead_ = true;
             send_complete_.notify_one();
         }
@@ -214,11 +214,11 @@ connection_listener::connection_listener(connection_delegate * delegate, in_port
 
     IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
 
-    poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
+    global->shared_mgr.add_callback(tcp_, CB_RDONLY, this);
 }
 
 connection_listener::~connection_listener() {
-    poll_mgr::shared_mgr.block_remove_fd(tcp_);
+    global->shared_mgr.block_remove_fd(tcp_);
 }
 
 void connection_listener::read_cb(int) {
index 68bd902..03e92da 100644 (file)
@@ -4,8 +4,10 @@
 #include "types.h"
 #include <arpa/inet.h>
 #include <netinet/in.h>
+#include "t4.h"
 #include "poll_mgr.h"
 #include "file.h"
+#include "threaded_log.h"
 
 class connection;
 
index ebd61e1..102b8dc 100644 (file)
@@ -2,6 +2,7 @@
 #include <errno.h>
 #include <sys/select.h>
 #include "file.h"
+#include "threaded_log.h"
 
 #ifdef __linux__
 #include <sys/epoll.h>
@@ -11,8 +12,6 @@ using std::vector;
 
 aio_callback::~aio_callback() {}
 
-poll_mgr poll_mgr::shared_mgr;
-
 class wait_manager {
     public:
         virtual void watch_fd(int fd, poll_flag flag) = 0;
index babf908..6fe66c0 100644 (file)
@@ -25,8 +25,6 @@ class poll_mgr {
         poll_mgr();
         ~poll_mgr();
 
-        static poll_mgr shared_mgr;
-
         void add_callback(int fd, poll_flag flag, aio_callback *ch);
         void del_callback(int fd, poll_flag flag);
         void block_remove_fd(int fd);
index de33675..2889db9 100644 (file)
@@ -85,7 +85,8 @@ rpcc::rpcc(const string & d) : dst_(make_sockaddr(d))
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
-    cancel();
+    lock ml(m_);
+    cancel(ml);
     IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1);
     chan_.reset();
     VERIFY(calls_.size() == 0);
@@ -104,9 +105,29 @@ int rpcc::bind(milliseconds to) {
     return ret;
 }
 
+shared_ptr<rpcc> rpcc::bind_cached(const string & destination) {
+    auto client = global->get_handle(destination);
+    lock cl = lock(client->bind_m_);
+    if (!client->bind_done_) {
+        LOG_NONMEMBER << "bind(\"" << destination << "\")";
+        int ret = client->bind(milliseconds(1000));
+        if (ret < 0) {
+            LOG_NONMEMBER << "bind failure! " << destination << " " << ret;
+            client.reset();
+        } else {
+            LOG_NONMEMBER << "bind succeeded " << destination;
+        }
+    }
+    return client;
+}
+
+void rpcc::unbind_cached(const string & destination) {
+    global->erase_handle(destination);
+}
+
 // Cancel all outstanding calls
-void rpcc::cancel(void) {
-    lock ml(m_);
+void rpcc::cancel(lock & m_lock) {
+    VERIFY(m_lock);
     if (calls_.size()) {
         LOG << "force callers to fail";
         for (auto & p : calls_) {
@@ -122,7 +143,7 @@ void rpcc::cancel(void) {
 
         destroy_wait_ = true;
         while (calls_.size () > 0)
-            destroy_wait_c_.wait(ml);
+            destroy_wait_c_.wait(m_lock);
 
         LOG << "done";
     }
index 58e9381..f1eb3bc 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -10,6 +10,7 @@
 #include "marshall.h"
 #include "marshall_wrap.h"
 #include "connection.h"
+#include "threaded_log.h"
 
 using std::chrono::milliseconds;
 
@@ -75,6 +76,7 @@ class rpcc : private connection_delegate {
 
         std::mutex m_; // protect insert/delete to calls[]
         std::mutex chan_m_;
+        std::mutex bind_m_; // protect bind operations
 
         bool destroy_wait_ = false;
         cond destroy_wait_c_;
@@ -122,9 +124,17 @@ class rpcc : private connection_delegate {
 
         int bind(milliseconds to = rpc::to_max);
 
+        // Manages a cache of RPC connections.  Usage:
+        //     if (auto cl = rpcc::bind_cached(dst))
+        //         ret = cl->call(...);
+        // where the string dst has the form "host:port".  Because bind_cached()
+        // may block, callers should probably not hold mutexes.
+        static shared_ptr<rpcc> bind_cached(const string & destination);
+        static void unbind_cached(const string & destination);
+
         void set_reachable(bool r) { reachable_ = r; }
 
-        void cancel();
+        void cancel(lock & m_lock);
 
         template<class P, class R, typename ...Args>
         inline int call(proc_t<P> proc, R & r, const Args & ... args) {
index fb170e7..f8395e9 100644 (file)
@@ -7,11 +7,10 @@
 #include <getopt.h>
 #include <unistd.h>
 #include <string.h>
+#include "threaded_log.h"
 
 #define NUM_CL 2
 
-char log_thread_prefix = 'r';
-
 static rpcs *server;  // server rpc object
 static rpcc *clients[NUM_CL];  // client rpc object
 static string dst; //server's ip address
@@ -343,6 +342,7 @@ static void failure_test() {
 }
 
 int main(int argc, char *argv[]) {
+    global = new t4_state('r');
 
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
diff --git a/rsm.cc b/rsm.cc
index cb986fe..711f0ad 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -79,7 +79,6 @@
 // upcalls, but can keep its locks when calling down.
 
 #include "rsm.h"
-#include "handle.h"
 #include "rsm_client.h"
 #include <unistd.h>
 
@@ -202,13 +201,12 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
 bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
 {
     rsm_protocol::transferres r;
-    handle h(m);
     int ret = 0;
     LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
-    rpcc *cl;
+    shared_ptr<rpcc> cl;
     {
         rsm_mutex_lock.unlock();
-        cl = h.safebind();
+        cl = rpcc::bind_cached(m);
         if (cl) {
             ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
                     r, cfg->myaddr(), last_myvs, vid_insync);
@@ -229,10 +227,8 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
 
 bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
     rsm_mutex_lock.unlock();
-    handle h(m);
-    rpcc *cl = h.safebind();
     bool done = false;
-    if (cl) {
+    if (auto cl = rpcc::bind_cached(m)) {
         int r;
         auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
         done = (ret == rsm_protocol::OK);
@@ -243,21 +239,16 @@ bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
 
 
 bool rsm::join(const string & m, lock & rsm_mutex_lock) {
-    handle h(m);
     int ret = 0;
     string log;
 
     LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
-    rpcc *cl;
-    {
-        rsm_mutex_lock.unlock();
-        cl = h.safebind();
-        if (cl != 0) {
-            ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
-                    cfg->myaddr(), last_myvs);
-        }
-        rsm_mutex_lock.lock();
-    }
+
+    rsm_mutex_lock.unlock();
+    auto cl = rpcc::bind_cached(m);
+    if (cl)
+        ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+    rsm_mutex_lock.lock();
 
     if (cl == 0 || ret != rsm_protocol::OK) {
         LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
@@ -342,9 +333,8 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
-            handle h(m[i]);
             LOG << "Sending invoke to " << m[i];
-            rpcc *cl = h.safebind();
+            auto cl = rpcc::bind_cached(m[i]);
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
@@ -523,9 +513,9 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
-            handle h(m[i]);
             LOG << "member " << m[i] << " " << heal;
-            if (h.safebind()) h.safebind()->set_reachable(heal);
+            if (auto cl = rpcc::bind_cached(m[i]))
+                cl->set_reachable(heal);
         }
     }
     rsmrpc->set_reachable(heal);
index 161ddb5..9906d39 100644 (file)
@@ -1,6 +1,5 @@
 #include "rsm_client.h"
 #include <arpa/inet.h>
-#include <handle.h>
 #include <unistd.h>
 
 rsm_client::rsm_client(string dst) : primary(dst) {
@@ -19,10 +18,10 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s
     lock ml(rsm_client_mutex);
     while (1) {
         LOG << "proc " << std::hex << proc << " primary " << primary;
-        handle h(primary);
+        string prim = primary;
 
         ml.unlock();
-        rpcc *cl = h.safebind();
+        auto cl = rpcc::bind_cached(prim);
         auto ret = rsm_client_protocol::OK;
         if (cl)
             ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
@@ -31,34 +30,34 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s
         if (!cl)
             goto prim_fail;
 
-        LOG << "proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret;
+        LOG << "proc " << std::hex << proc << " primary " << prim << " ret " << std::dec << ret;
         if (ret == rsm_client_protocol::OK)
             return rsm_protocol::OK;
         if (ret == rsm_client_protocol::BUSY) {
-            LOG << "rsm is busy " << primary;
+            LOG << "rsm is busy " << prim;
             usleep(300000);
             continue;
         }
         if (ret == rsm_client_protocol::NOTPRIMARY) {
-            LOG << "primary " << primary << " isn't the primary--let's get a complete list of mems";
+            LOG << "primary " << prim << " isn't the primary--let's get a complete list of mems";
             if (init_members(ml))
                 continue;
         }
 prim_fail:
-        LOG << "primary " << primary << " failed ret " << std::dec << ret;
+        LOG << "primary " << prim << " failed ret " << std::dec << ret;
         primary_failure(ml);
-        LOG << "retry new primary " << primary;
+        LOG << "retry new primary " << prim;
     }
 }
 
 bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
     LOG << "get members!";
-    handle h(primary);
+    string prim = primary;
     int ret = rsm_client_protocol::ERR;
-    rpcc *cl;
+    shared_ptr<rpcc> cl;
     {
         rsm_client_mutex_lock.unlock();
-        cl = h.safebind();
+        cl = rpcc::bind_cached(prim);
         if (cl)
             ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
         rsm_client_mutex_lock.lock();
index d5d27fd..5507f80 100644 (file)
@@ -6,9 +6,8 @@
 #include "rsm_protocol.h"
 #include "rsmtest_client.h"
 
-char log_thread_prefix = 't';
-
 int main(int argc, char *argv[]) {
+    global = new t4_state('t');
     if(argc != 4){
         LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [partition] arg";
         return 1;
diff --git a/t4.cc b/t4.cc
new file mode 100644 (file)
index 0000000..834136b
--- /dev/null
+++ b/t4.cc
@@ -0,0 +1,39 @@
+#include "t4.h"
+#include <unistd.h>
+#include "rpc/rpc.h"
+
+using namespace std::chrono;
+
+t4_state *global;
+
+t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) {
+    uint32_t seed = std::random_device()();
+    auto time = system_clock::now().time_since_epoch();
+    auto ticks = duration_cast<nanoseconds>(time).count();
+    seed ^= (uint32_t)ticks;
+    auto pid = getpid();
+    seed ^= (uint32_t)pid;
+    auto tid = std::hash<std::thread::id>()(std::this_thread::get_id());
+    seed ^= (uint32_t)tid;
+    random_generator.seed(seed);
+    // make sure the clock will read differently next time!
+    std::this_thread::sleep_for(microseconds(1));
+}
+
+t4_state::~t4_state() {
+    lock ml(handle_cache_mutex);
+    handle_cache.clear();
+    shared_mgr.shutdown();
+}
+
+shared_ptr<rpcc> t4_state::get_handle(const string & destination) {
+    lock ml(handle_cache_mutex);
+    if (!handle_cache[destination])
+        handle_cache[destination] = std::make_shared<rpcc>(destination);
+    return handle_cache[destination];
+}
+
+void t4_state::erase_handle(const string & destination) {
+    lock ml(handle_cache_mutex);
+    handle_cache.erase(destination);
+}
diff --git a/t4.h b/t4.h
new file mode 100644 (file)
index 0000000..19eaddc
--- /dev/null
+++ b/t4.h
@@ -0,0 +1,33 @@
+#ifndef t4_h
+#define t4_h
+
+#include "types.h"
+#include "rpc/poll_mgr.h"
+
+struct t4_state {
+    std::mutex log_mutex;
+    std::map<thread::id, int> thread_name_map;
+    std::map<const void *, int> instance_name_map;
+
+    poll_mgr shared_mgr;
+
+    int next_thread_num = 0;
+    int next_instance_num = 0;
+    int DEBUG_LEVEL = 0;
+    char log_thread_prefix;
+
+    std::mutex handle_cache_mutex;
+    std::map<string, shared_ptr<class rpcc>> handle_cache;
+
+    shared_ptr<class rpcc> get_handle(const string & destination);
+    void erase_handle(const string & destination);
+
+    std::mt19937_64 random_generator;
+
+    t4_state(char log_prefix = ' ');
+    ~t4_state();
+};
+
+extern t4_state *global;
+
+#endif
index 98cc6f6..0160f33 100644 (file)
@@ -1,4 +1,5 @@
 #include "threaded_log.h"
+#include "t4.h"
 
 static std::mutex log_mutex;
 static std::map<thread::id, int> thread_name_map;
@@ -17,7 +18,7 @@ locked_ostream && _log_prefix(locked_ostream && f, const string & file, const st
     auto utime = duration_cast<microseconds>(
             system_clock::now().time_since_epoch()).count() % 1000000000;
     f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " ";
-    f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid;
+    f << std::setfill(' ') << global->log_thread_prefix << std::left << std::setw(2) << tid;
     f << " " << std::setw(20) << file << " " << std::setw(18) << func;
     return std::move(f);
 }
diff --git a/types.h b/types.h
index 7ab04cc..ce52a15 100644 (file)
--- a/types.h
+++ b/types.h
@@ -15,6 +15,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <random>
 #include <stdexcept>
 #include <sstream>
 #include <string>
@@ -106,7 +107,6 @@ operator<<(std::ostream & o, const A & a) {
 }
 
 #include "verify.h"
-#include "threaded_log.h"
 
 // struct tuple adapter, useful for marshalling and endian swapping.  usage:
 //