Cosmetic changes
[invirt/third/libt4.git] / rpc / rpc.h
index 58e9381..ddf15f9 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -2,14 +2,13 @@
 #define rpc_h
 
 #include "types.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
 
 #include "rpc_protocol.h"
-#include "thr_pool.h"
+#include "thread_pool.h"
 #include "marshall.h"
 #include "marshall_wrap.h"
 #include "connection.h"
+#include "threaded_log.h"
 
 using std::chrono::milliseconds;
 
@@ -75,6 +74,7 @@ class rpcc : private connection_delegate {
 
         std::mutex m_; // protect insert/delete to calls[]
         std::mutex chan_m_;
+        std::mutex bind_m_; // protect bind operations
 
         bool destroy_wait_ = false;
         cond destroy_wait_c_;
@@ -94,20 +94,14 @@ class rpcc : private connection_delegate {
         request dup_req_;
         int xid_rep_done_ = -1;
 
-        int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req);
+        int call_marshalled(proc_id_t proc, milliseconds to, string & rep, marshall & req);
 
         template<class R>
         inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) {
             string rep;
-            int intret = call1(proc, to, rep, req);
-            if (intret < 0) return intret;
-            unmarshall u(rep, true, r);
-            if (u.okdone() != true) {
-                LOG << "rpcc::call_m: failed to unmarshall the reply.  You are probably "
-                    << "calling RPC 0x" << std::hex << proc << " with the wrong return type.";
-                VERIFY(0);
-                return rpc_protocol::unmarshall_reply_failure;
-            }
+            int intret = call_marshalled(proc, to, rep, req);
+            if (intret >= 0)
+                VERIFY(unmarshall(rep, true, r).okdone()); // guaranteed by static type checking
             return intret;
         }
 
@@ -118,13 +112,19 @@ class rpcc : private connection_delegate {
         rpcc(const string & d);
         ~rpcc();
 
-        nonce_t id() { return clt_nonce_; }
-
         int bind(milliseconds to = rpc::to_max);
 
+        // Manages a cache of RPC connections.  Usage:
+        //     if (auto cl = rpcc::bind_cached(dst))
+        //         ret = cl->call(...);
+        // where the string dst has the form "host:port".  Because bind_cached()
+        // may block, callers should probably not hold mutexes.
+        static shared_ptr<rpcc> bind_cached(const string & destination);
+        static void unbind_cached(const string & destination);
+
         void set_reachable(bool r) { reachable_ = r; }
 
-        void cancel();
+        void cancel(lock & m_lock);
 
         template<class P, class R, typename ...Args>
         inline int call(proc_t<P> proc, R & r, const Args & ... args) {
@@ -193,7 +193,7 @@ class rpcs : private connection_delegate {
 
         void dispatch(shared_ptr<connection> c, const string & buf);
 
-        unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
+        unique_ptr<thread_pool> dispatchpool_{new thread_pool(6)};
         unique_ptr<connection_listener> listener_;
 
         // RPC handler for clients binding