From d54215aea2a7321ab0f2dc7b0042fea2b7ff5df5 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Thu, 17 Oct 2013 15:34:32 -0400 Subject: [PATCH] More clean-ups --- Makefile.osx | 6 +++-- config.cc | 6 ++--- handle.cc | 33 +++++------------------- handle.h | 62 ++++++++++++--------------------------------- rpc/connection.cc | 52 ++++--------------------------------- rpc/connection.h | 16 +++++------- types.h | 8 +++--- lang/verify.h => verify.h | 0 8 files changed, 46 insertions(+), 137 deletions(-) rename lang/verify.h => verify.h (100%) diff --git a/Makefile.osx b/Makefile.osx index 7b2b74a..f69851c 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -2,8 +2,10 @@ PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \ -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \ -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \ -Wno-exit-time-destructors -CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) -LDFLAGS = -stdlib=libc++ +#OPTFLAGS = -ftrapv -O4 +OPTFLAGS = +CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) $(OPTFLAGS) +LDFLAGS = -stdlib=libc++ $(OPTFLAGS) CXX = clang++ CC = clang++ EXTRA_TARGETS = signatures diff --git a/config.cc b/config.cc index abd2f9c..38c4c05 100644 --- a/config.cc +++ b/config.cc @@ -88,8 +88,7 @@ void config::paxos_commit(unsigned instance, const string &value) { LOG("is " << mem << " still a member?"); if (!isamember(mem, newmem) && me != mem) { LOG("delete " << mem); - invalidate_handle(mem); - //handle(mem).invalidate(); + handle(mem).invalidate(); } } @@ -214,8 +213,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { break; case rpc_protocol::atmostonce_failure: case rpc_protocol::oldsrv_failure: - invalidate_handle(m); - //h.invalidate(); + h.invalidate(); break; default: LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); diff --git a/handle.cc b/handle.cc index 79b3b4c..792ce40 100644 --- a/handle.cc +++ b/handle.cc @@ -12,7 +12,7 @@ public: static mutex mgr_mutex; static map> hmap; -handle::handle(const string & destination) { +handle::handle(const string & destination) : destination_(destination) { lock ml(mgr_mutex); h = hmap[destination]; if (!h || !h->valid) @@ -27,12 +27,7 @@ rpcc * handle::safebind() { return nullptr; if (!h->client) { unique_ptr client(new rpcc(h->destination)); - LOG("trying to bind..." << h->destination); - // The test script assumes that the failure can be detected by paxos and - // rsm layer within few seconds. We have to set the timeout with a small - // value to support the assumption. - // - // With RPC_LOSSY=5, tests may fail due to delays and time outs. + LOG("bind(\"" << h->destination << "\")"); int ret = client->bind(milliseconds(1000)); if (ret < 0) { LOG("bind failure! " << h->destination << " " << ret); @@ -46,25 +41,11 @@ rpcc * handle::safebind() { } void handle::invalidate() { - { - lock cl(h->client_mutex); - h->valid = false; - - LOG_NONMEMBER("cl " << h->destination << " refcnt " << h.use_count()); - } - lock ml(mgr_mutex); - hmap.erase(h->destination); - h = nullptr; -} - -void invalidate_handle(const string & m) { + h.reset(); lock ml(mgr_mutex); - if (hmap.find(m) == hmap.end()) { - LOG_NONMEMBER("cl " << m << " isn't in cl list"); - return; + if (hmap.find(destination_) != hmap.end()) { + hmap[destination_]->valid = false; + LOG_NONMEMBER("cl " << destination_ << " refcnt " << hmap[destination_].use_count()); + hmap.erase(destination_); } - - hmap[m]->valid = false; - LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count()); - hmap.erase(m); } diff --git a/handle.h b/handle.h index f4df61a..92110bd 100644 --- a/handle.h +++ b/handle.h @@ -1,61 +1,31 @@ -// manage a cache of RPC connections. -// assuming cid is a string holding the -// host:port of the RPC server you want -// to talk to: -// -// handle h(cid); -// rpcc *cl = h.safebind(); -// if(cl){ -// ret = cl->call(...); -// } else { -// bind() failed -// } -// -// if the calling program has not contacted -// cid before, safebind() will create a new -// connection, call bind(), and return -// an rpcc*, or 0 if bind() failed. if the -// program has previously contacted cid, -// safebind() just returns the previously -// created rpcc*. best not to hold any -// mutexes while calling safebind(). - #ifndef handle_h #define handle_h #include "types.h" #include "rpc/rpc.h" -class hinfo; +// Manage a cache of RPC connections. Typical usage: +// handle h(dst); +// rpc_protocol::status ret = rpc_protocol::bind_failure; +// if (rpcc *cl = h.safebind()) +// ret = cl->call(...); +// assuming dst is a string holding the host:port of the RPC server you want to +// talk to. +// +// If the calling program has not contacted dst before, safebind() will create +// a new connection, call bind(), and return an rpcc*, or 0 if bind() failed. +// if the program has previously contacted dst, safebind() just returns the +// previously created rpcc*. Because safebind() may block, callers should +// probably not hold mutexes. class handle { private: - shared_ptr h; + shared_ptr h; + const string destination_; public: - handle(const string & m); - /* safebind will try to bind with the rpc server on the first call. - * Since bind may block, the caller probably should not hold a mutex - * when calling safebind. - * - * return: - * if the first safebind succeeded, all later calls would return - * a rpcc object; otherwise, all later calls would return NULL. - * - * Example: - * handle h(dst); - * XXX_protocol::status ret; - * if (h.safebind()) { - * ret = h.safebind()->call(...); - * } - * if (!h.safebind() || ret != XXX_protocol::OK) { - * // handle failure - * } - */ + handle(const string & destination); rpcc *safebind(); - void invalidate(); }; -void invalidate_handle(const string & m); - #endif diff --git a/rpc/connection.cc b/rpc/connection.cc index e269a3d..358a2af 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -214,12 +214,8 @@ tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); - - if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0) - perror("accept_loop setsockopt"); - - if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0) - perror("accept_loop setsockopt"); + tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}); + tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}); // careful to exactly match type signature of bind arguments so we don't // get std::bind instead @@ -239,23 +235,18 @@ tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); - file_t::pipe(pipe_); - - pipe_[0].flags() |= O_NONBLOCK; - - th_ = thread(&tcpsconn::accept_conn, this); + poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this); } tcpsconn::~tcpsconn() { - pipe_[1].close(); - th_.join(); + poll_mgr::shared_mgr.block_remove_fd(tcp_); for (auto & i : conns_) i.second->closeconn(); } -void tcpsconn::process_accept() { +void tcpsconn::read_cb(int) { sockaddr_in sin; socklen_t slen = sizeof(sin); int s1 = accept(tcp_, (sockaddr *)&sin, &slen); @@ -277,36 +268,3 @@ void tcpsconn::process_accept() { conns_[ch->channo()] = ch; } - -void tcpsconn::accept_conn() { - fd_set rfds; - int max_fd = max((int)pipe_[0], (int)tcp_); - - while (1) { - FD_ZERO(&rfds); - FD_SET(pipe_[0], &rfds); - FD_SET(tcp_, &rfds); - - int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); - - if (ret < 0 && errno == EINTR) - continue; - else if (ret < 0) { - perror("accept_conn select:"); - IF_LEVEL(0) LOG("accept_conn failure errno " << errno); - VERIFY(0); - } - - if (FD_ISSET(pipe_[0], &rfds)) - return; - - VERIFY(FD_ISSET(tcp_, &rfds)); - - try { - process_accept(); - } catch (thread_exit_exception e) { - break; - } - } -} - diff --git a/rpc/connection.h b/rpc/connection.h index 97bacbb..87d17e4 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -19,7 +19,7 @@ class connection_delegate { virtual ~connection_delegate() {} }; -class connection : public aio_callback, public enable_shared_from_this { +class connection : private aio_callback, public enable_shared_from_this { public: struct charbuf { string buf; @@ -34,14 +34,14 @@ class connection : public aio_callback, public enable_shared_from_this create_time() const { return create_time_; } static shared_ptr to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0); private: + void write_cb(int s); + void read_cb(int s); bool readpdu(); bool writepdu(); @@ -63,23 +63,21 @@ class connection : public aio_callback, public enable_shared_from_this> conns_; - - void process_accept(); }; #endif diff --git a/types.h b/types.h index 3ad73fb..6e6f0f6 100644 --- a/types.h +++ b/types.h @@ -12,6 +12,10 @@ using std::min_element; using std::find; using std::count_if; +#include +using cond = std::condition_variable; +using std::cv_status; + #include using std::chrono::seconds; using std::chrono::milliseconds; @@ -71,8 +75,6 @@ using std::weak_ptr; #include using std::mutex; using lock = std::unique_lock; -using cond = std::condition_variable; -using std::cv_status; #include using std::ostringstream; @@ -161,7 +163,7 @@ inline vector explode(const string &s, string delim=" ") { return out; } -#include "lang/verify.h" +#include "verify.h" #include "threaded_log.h" // struct tuple adapter, useful for marshalling diff --git a/lang/verify.h b/verify.h similarity index 100% rename from lang/verify.h rename to verify.h -- 1.7.9.5