-Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
-Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
-Wno-exit-time-destructors
-CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY)
-LDFLAGS = -stdlib=libc++
+#OPTFLAGS = -ftrapv -O4
+OPTFLAGS =
+CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) $(OPTFLAGS)
+LDFLAGS = -stdlib=libc++ $(OPTFLAGS)
CXX = clang++
CC = clang++
EXTRA_TARGETS = signatures
LOG("is " << mem << " still a member?");
if (!isamember(mem, newmem) && me != mem) {
LOG("delete " << mem);
- invalidate_handle(mem);
- //handle(mem).invalidate();
+ handle(mem).invalidate();
}
}
break;
case rpc_protocol::atmostonce_failure:
case rpc_protocol::oldsrv_failure:
- invalidate_handle(m);
- //h.invalidate();
+ h.invalidate();
break;
default:
LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
static mutex mgr_mutex;
static map<string, shared_ptr<hinfo>> hmap;
-handle::handle(const string & destination) {
+handle::handle(const string & destination) : destination_(destination) {
lock ml(mgr_mutex);
h = hmap[destination];
if (!h || !h->valid)
return nullptr;
if (!h->client) {
unique_ptr<rpcc> client(new rpcc(h->destination));
- LOG("trying to bind..." << h->destination);
- // The test script assumes that the failure can be detected by paxos and
- // rsm layer within few seconds. We have to set the timeout with a small
- // value to support the assumption.
- //
- // With RPC_LOSSY=5, tests may fail due to delays and time outs.
+ LOG("bind(\"" << h->destination << "\")");
int ret = client->bind(milliseconds(1000));
if (ret < 0) {
LOG("bind failure! " << h->destination << " " << ret);
}
void handle::invalidate() {
- {
- lock cl(h->client_mutex);
- h->valid = false;
-
- LOG_NONMEMBER("cl " << h->destination << " refcnt " << h.use_count());
- }
- lock ml(mgr_mutex);
- hmap.erase(h->destination);
- h = nullptr;
-}
-
-void invalidate_handle(const string & m) {
+ h.reset();
lock ml(mgr_mutex);
- if (hmap.find(m) == hmap.end()) {
- LOG_NONMEMBER("cl " << m << " isn't in cl list");
- return;
+ if (hmap.find(destination_) != hmap.end()) {
+ hmap[destination_]->valid = false;
+ LOG_NONMEMBER("cl " << destination_ << " refcnt " << hmap[destination_].use_count());
+ hmap.erase(destination_);
}
-
- hmap[m]->valid = false;
- LOG_NONMEMBER("cl " << m << " refcnt " << hmap[m].use_count());
- hmap.erase(m);
}
-// manage a cache of RPC connections.
-// assuming cid is a string holding the
-// host:port of the RPC server you want
-// to talk to:
-//
-// handle h(cid);
-// rpcc *cl = h.safebind();
-// if(cl){
-// ret = cl->call(...);
-// } else {
-// bind() failed
-// }
-//
-// if the calling program has not contacted
-// cid before, safebind() will create a new
-// connection, call bind(), and return
-// an rpcc*, or 0 if bind() failed. if the
-// program has previously contacted cid,
-// safebind() just returns the previously
-// created rpcc*. best not to hold any
-// mutexes while calling safebind().
-
#ifndef handle_h
#define handle_h
#include "types.h"
#include "rpc/rpc.h"
-class hinfo;
+// Manage a cache of RPC connections. Typical usage:
+// handle h(dst);
+// rpc_protocol::status ret = rpc_protocol::bind_failure;
+// if (rpcc *cl = h.safebind())
+// ret = cl->call(...);
+// Here dst is a string holding the host:port of the RPC server you want to
+// talk to.
+//
+// If the calling program has not contacted dst before, safebind() will create
+// a new connection, call bind(), and return an rpcc*, or nullptr if bind()
+// failed. If the program has previously contacted dst, safebind() just returns
+// the previously created rpcc*. Because safebind() may block, callers should
+// avoid holding mutexes while calling it.
class handle {
private:
- shared_ptr<hinfo> h;
+ shared_ptr<class hinfo> h;
+ const string destination_;
public:
- handle(const string & m);
- /* safebind will try to bind with the rpc server on the first call.
- * Since bind may block, the caller probably should not hold a mutex
- * when calling safebind.
- *
- * return:
- * if the first safebind succeeded, all later calls would return
- * a rpcc object; otherwise, all later calls would return NULL.
- *
- * Example:
- * handle h(dst);
- * XXX_protocol::status ret;
- * if (h.safebind()) {
- * ret = h.safebind()->call(...);
- * }
- * if (!h.safebind() || ret != XXX_protocol::OK) {
- * // handle failure
- * }
- */
+ handle(const string & destination);
rpcc *safebind();
-
void invalidate();
};
-void invalidate_handle(const string & m);
-
#endif
tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
-
- if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0)
- perror("accept_loop setsockopt");
-
- if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0)
- perror("accept_loop setsockopt");
+ tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
+ tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
// careful to exactly match type signature of bind arguments so we don't
// get std::bind instead
IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
- file_t::pipe(pipe_);
-
- pipe_[0].flags() |= O_NONBLOCK;
-
- th_ = thread(&tcpsconn::accept_conn, this);
+ poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
}
tcpsconn::~tcpsconn()
{
- pipe_[1].close();
- th_.join();
+ poll_mgr::shared_mgr.block_remove_fd(tcp_);
for (auto & i : conns_)
i.second->closeconn();
}
-void tcpsconn::process_accept() {
+void tcpsconn::read_cb(int) {
sockaddr_in sin;
socklen_t slen = sizeof(sin);
int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
conns_[ch->channo()] = ch;
}
-
-void tcpsconn::accept_conn() {
- fd_set rfds;
- int max_fd = max((int)pipe_[0], (int)tcp_);
-
- while (1) {
- FD_ZERO(&rfds);
- FD_SET(pipe_[0], &rfds);
- FD_SET(tcp_, &rfds);
-
- int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
- if (ret < 0 && errno == EINTR)
- continue;
- else if (ret < 0) {
- perror("accept_conn select:");
- IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
- VERIFY(0);
- }
-
- if (FD_ISSET(pipe_[0], &rfds))
- return;
-
- VERIFY(FD_ISSET(tcp_, &rfds));
-
- try {
- process_accept();
- } catch (thread_exit_exception e) {
- break;
- }
- }
-}
-
virtual ~connection_delegate() {}
};
-class connection : public aio_callback, public enable_shared_from_this<connection> {
+class connection : private aio_callback, public enable_shared_from_this<connection> {
public:
struct charbuf {
string buf;
void closeconn();
bool send(const string & b);
- void write_cb(int s);
- void read_cb(int s);
time_point<steady_clock> create_time() const { return create_time_; }
static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
private:
+ void write_cb(int s);
+ void read_cb(int s);
bool readpdu();
bool writepdu();
cond send_wait_;
};
-class tcpsconn {
+class tcpsconn : private aio_callback {
public:
tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0);
~tcpsconn();
inline in_port_t port() { return port_; }
- void accept_conn();
private:
+ void write_cb(int) {}
+ void read_cb(int s);
+
in_port_t port_;
mutex m_;
- thread th_;
- file_t pipe_[2];
socket_t tcp_; // listens for connections
connection_delegate *mgr_;
int lossy_;
map<int, shared_ptr<connection>> conns_;
-
- void process_accept();
};
#endif
using std::find;
using std::count_if;
+#include <condition_variable>
+using cond = std::condition_variable;
+using std::cv_status;
+
#include <chrono>
using std::chrono::seconds;
using std::chrono::milliseconds;
#include <mutex>
using std::mutex;
using lock = std::unique_lock<std::mutex>;
-using cond = std::condition_variable;
-using std::cv_status;
#include <sstream>
using std::ostringstream;
return out;
}
-#include "lang/verify.h"
+#include "verify.h"
#include "threaded_log.h"
// struct tuple adapter, useful for marshalling