More clean-ups
authorPeter Iannucci <iannucci@mit.edu>
Thu, 17 Oct 2013 19:34:32 +0000 (15:34 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Thu, 17 Oct 2013 22:29:57 +0000 (18:29 -0400)
Makefile.osx
config.cc
handle.cc
handle.h
rpc/connection.cc
rpc/connection.h
types.h
verify.h [moved from lang/verify.h with 100% similarity]

index 7b2b74a..f69851c 100644 (file)
@@ -2,8 +2,10 @@ PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \
                   -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
                   -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
                   -Wno-exit-time-destructors
-CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY)
-LDFLAGS = -stdlib=libc++
+#OPTFLAGS = -ftrapv -O4
+OPTFLAGS =
+CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) $(OPTFLAGS)
+LDFLAGS = -stdlib=libc++ $(OPTFLAGS)
 CXX = clang++
 CC = clang++
 EXTRA_TARGETS = signatures
index abd2f9c..38c4c05 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -88,8 +88,7 @@ void config::paxos_commit(unsigned instance, const string &value) {
         LOG("is " << mem << " still a member?");
         if (!isamember(mem, newmem) && me != mem) {
             LOG("delete " << mem);
-            invalidate_handle(mem);
-            //handle(mem).invalidate();
+            handle(mem).invalidate();
         }
     }
 
@@ -214,8 +213,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
             break;
         case rpc_protocol::atmostonce_failure:
         case rpc_protocol::oldsrv_failure:
-            invalidate_handle(m);
-            //h.invalidate();
+            h.invalidate();
             break;
         default:
             LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
index 79b3b4c..792ce40 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -12,7 +12,7 @@ public:
 static mutex mgr_mutex;
 static map<string, shared_ptr<hinfo>> hmap;
 
-handle::handle(const string & destination) {
+handle::handle(const string & destination) : destination_(destination) {
     lock ml(mgr_mutex);
     h = hmap[destination];
     if (!h || !h->valid)
@@ -27,12 +27,7 @@ rpcc * handle::safebind() {
         return nullptr;
     if (!h->client) {
         unique_ptr<rpcc> client(new rpcc(h->destination));
-        LOG("trying to bind..." << h->destination);
-        // The test script assumes that the failure can be detected by paxos and
-        // rsm layer within few seconds. We have to set the timeout with a small
-        // value to support the assumption.
-        // 
-        // With RPC_LOSSY=5, tests may fail due to delays and time outs.
+        LOG("bind(\"" << h->destination << "\")");
         int ret = client->bind(milliseconds(1000));
         if (ret < 0) {
             LOG("bind failure! " << h->destination << " " << ret);
@@ -46,25 +41,11 @@ rpcc * handle::safebind() {
 }
 
 void handle::invalidate() {
-    {
-        lock cl(h->client_mutex);
-        h->valid = false;
-
-        LOG_NONMEMBER("cl " << h->destination << " refcnt " << h.use_count());
-    }
-    lock ml(mgr_mutex);
-    hmap.erase(h->destination);
-    h = nullptr;
-}
-
-void invalidate_handle(const string & m) {
+    h.reset();
     lock ml(mgr_mutex);
-    if (hmap.find(m) == hmap.end()) {
-        LOG_NONMEMBER("cl " << m << " isn't in cl list");
-        return;
+    if (hmap.find(destination_) != hmap.end()) {
+        hmap[destination_]->valid = false;
+        LOG_NONMEMBER("cl " << destination_ << " refcnt " << hmap[destination_].use_count());
+        hmap.erase(destination_);
     }
-
-    hmap[m]->valid = false;
-    LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count());
-    hmap.erase(m);
 }
index f4df61a..92110bd 100644 (file)
--- a/handle.h
+++ b/handle.h
@@ -1,61 +1,31 @@
-// manage a cache of RPC connections.
-// assuming cid is a string holding the
-// host:port of the RPC server you want
-// to talk to:
-//
-// handle h(cid);
-// rpcc *cl = h.safebind();
-// if(cl){
-//   ret = cl->call(...);
-// } else {
-//   bind() failed
-// }
-//
-// if the calling program has not contacted
-// cid before, safebind() will create a new
-// connection, call bind(), and return
-// an rpcc*, or 0 if bind() failed. if the
-// program has previously contacted cid,
-// safebind() just returns the previously
-// created rpcc*. best not to hold any
-// mutexes while calling safebind().
-
 #ifndef handle_h
 #define handle_h
 
 #include "types.h"
 #include "rpc/rpc.h"
 
-class hinfo;
+// Manage a cache of RPC connections.  Typical usage:
+//     handle h(dst);
+//     rpc_protocol::status ret = rpc_protocol::bind_failure;
+//     if (rpcc *cl = h.safebind())
+//         ret = cl->call(...);
+// assuming dst is a string holding the host:port of the RPC server you want to
+// talk to.
+//
+// If the calling program has not contacted dst before, safebind() will create
+// a new connection, call bind(), and return an rpcc*, or 0 if bind() failed.
+// if the program has previously contacted dst, safebind() just returns the
+// previously created rpcc*.  Because safebind() may block, callers should
+// probably not hold mutexes.
 
 class handle {
     private:
-        shared_ptr<hinfo> h;
+        shared_ptr<class hinfo> h;
+        const string destination_;
     public:
-        handle(const string & m);
-        /* safebind will try to bind with the rpc server on the first call.
-         * Since bind may block, the caller probably should not hold a mutex
-         * when calling safebind.
-         *
-         * return: 
-         *   if the first safebind succeeded, all later calls would return
-         *   a rpcc object; otherwise, all later calls would return NULL.
-         *
-         * Example:
-         *   handle h(dst);
-         *   XXX_protocol::status ret;
-         *   if (h.safebind()) {
-         *     ret = h.safebind()->call(...);
-         *   }
-         *   if (!h.safebind() || ret != XXX_protocol::OK) {
-         *     // handle failure
-         *   }
-         */
+        handle(const string & destination);
         rpcc *safebind();
-
         void invalidate();
 };
 
-void invalidate_handle(const string & m);
-
 #endif
index e269a3d..358a2af 100644 (file)
@@ -214,12 +214,8 @@ tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
 
     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
-
-    if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0)
-        perror("accept_loop setsockopt");
-
-    if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0)
-        perror("accept_loop setsockopt");
+    tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
+    tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
 
     // careful to exactly match type signature of bind arguments so we don't
     // get std::bind instead
@@ -239,23 +235,18 @@ tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
-    file_t::pipe(pipe_);
-
-    pipe_[0].flags() |= O_NONBLOCK;
-
-    th_ = thread(&tcpsconn::accept_conn, this);
+    poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
 }
 
 tcpsconn::~tcpsconn()
 {
-    pipe_[1].close();
-    th_.join();
+    poll_mgr::shared_mgr.block_remove_fd(tcp_);
 
     for (auto & i : conns_)
         i.second->closeconn();
 }
 
-void tcpsconn::process_accept() {
+void tcpsconn::read_cb(int) {
     sockaddr_in sin;
     socklen_t slen = sizeof(sin);
     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
@@ -277,36 +268,3 @@ void tcpsconn::process_accept() {
 
     conns_[ch->channo()] = ch;
 }
-
-void tcpsconn::accept_conn() {
-    fd_set rfds;
-    int max_fd = max((int)pipe_[0], (int)tcp_);
-
-    while (1) {
-        FD_ZERO(&rfds);
-        FD_SET(pipe_[0], &rfds);
-        FD_SET(tcp_, &rfds);
-
-        int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
-        if (ret < 0 && errno == EINTR)
-            continue;
-        else if (ret < 0) {
-            perror("accept_conn select:");
-            IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
-            VERIFY(0);
-        }
-
-        if (FD_ISSET(pipe_[0], &rfds))
-            return;
-
-        VERIFY(FD_ISSET(tcp_, &rfds));
-
-        try {
-            process_accept();
-        } catch (thread_exit_exception e) {
-            break;
-        }
-    }
-}
-
index 97bacbb..87d17e4 100644 (file)
@@ -19,7 +19,7 @@ class connection_delegate {
         virtual ~connection_delegate() {}
 };
 
-class connection : public aio_callback, public enable_shared_from_this<connection> {
+class connection : private aio_callback, public enable_shared_from_this<connection> {
     public:
         struct charbuf {
             string buf;
@@ -34,14 +34,14 @@ class connection : public aio_callback, public enable_shared_from_this<connectio
         void closeconn();
 
         bool send(const string & b);
-        void write_cb(int s);
-        void read_cb(int s);
 
         time_point<steady_clock> create_time() const { return create_time_; }
 
         static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
 
     private:
+        void write_cb(int s);
+        void read_cb(int s);
 
         bool readpdu();
         bool writepdu();
@@ -63,23 +63,21 @@ class connection : public aio_callback, public enable_shared_from_this<connectio
         cond send_wait_;
 };
 
-class tcpsconn {
+class tcpsconn : private aio_callback {
     public:
         tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0);
         ~tcpsconn();
         inline in_port_t port() { return port_; }
-        void accept_conn();
     private:
+        void write_cb(int) {}
+        void read_cb(int s);
+
         in_port_t port_;
         mutex m_;
-        thread th_;
-        file_t pipe_[2];
 
         socket_t tcp_; // listens for connections
         connection_delegate *mgr_;
         int lossy_;
         map<int, shared_ptr<connection>> conns_;
-
-        void process_accept();
 };
 #endif
diff --git a/types.h b/types.h
index 3ad73fb..6e6f0f6 100644 (file)
--- a/types.h
+++ b/types.h
@@ -12,6 +12,10 @@ using std::min_element;
 using std::find;
 using std::count_if;
 
+#include <condition_variable>
+using cond = std::condition_variable;
+using std::cv_status;
+
 #include <chrono>
 using std::chrono::seconds;
 using std::chrono::milliseconds;
@@ -71,8 +75,6 @@ using std::weak_ptr;
 #include <mutex>
 using std::mutex;
 using lock = std::unique_lock<std::mutex>;
-using cond = std::condition_variable;
-using std::cv_status;
 
 #include <sstream>
 using std::ostringstream;
@@ -161,7 +163,7 @@ inline vector<string> explode(const string &s, string delim=" ") {
     return out;
 }
 
-#include "lang/verify.h"
+#include "verify.h"
 #include "threaded_log.h"
 
 // struct tuple adapter, useful for marshalling
similarity index 100%
rename from lang/verify.h
rename to verify.h