X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/03b35a9a1bd1f583e32b27d260b223a0989d6c75..refs/heads/iannucci:/rpc/connection.cc diff --git a/rpc/connection.cc b/rpc/connection.cc index c7e8f95..7a6371a 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,10 +1,10 @@ -#include "connection.h" -#include "rpc_protocol.h" +#include "include/rpc/connection.h" +#include "include/rpc/rpc_protocol.h" #include #include #include #include -#include "marshall.h" +#include "include/rpc/marshall.h" connection_delegate::~connection_delegate() {} @@ -15,7 +15,7 @@ connection::connection(connection_delegate * delegate, socket_t && f1, int l1) signal(SIGPIPE, SIG_IGN); - poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this); + global->shared_mgr.add_callback(fd, CB_RDONLY, this); } connection::~connection() { @@ -24,13 +24,12 @@ connection::~connection() { if (dead_) return; dead_ = true; - shutdown(fd,SHUT_RDWR); + fd.shutdown(SHUT_RDWR); } // after block_remove_fd, select will never wait on fd and no callbacks // will be active - poll_mgr::shared_mgr.block_remove_fd(fd); - VERIFY(dead_); - VERIFY(wpdu_.status == unused); + global->shared_mgr.block_remove_fd(fd); + VERIFY(dead_ && wpdu_.status == unused); } shared_ptr connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) { @@ -56,19 +55,19 @@ bool connection::send(const string & b) { wpdu_ = {inflight, b, 0}; - if (lossy_ && (random()%100) < lossy_) { + if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) { IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd; - shutdown(fd,SHUT_RDWR); + fd.shutdown(SHUT_RDWR); } if (!writepdu()) { dead_ = true; ml.unlock(); - poll_mgr::shared_mgr.block_remove_fd(fd); + global->shared_mgr.block_remove_fd(fd); ml.lock(); } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) { // should be rare to need to explicitly add write callback - poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this); + global->shared_mgr.add_callback(fd, CB_WRONLY, this); while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size()) send_complete_.wait(ml); } @@ -84,11 +83,11 @@ void connection::write_cb(int s) { VERIFY(!dead_); VERIFY(fd == s); if (wpdu_.status != inflight) { - poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY); + global->shared_mgr.del_callback(fd, CB_WRONLY); return; } if (!writepdu()) { - poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); + global->shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.status != error); @@ -103,7 +102,7 @@ bool connection::writepdu() { if (wpdu_.cursor == wpdu_.buf.size()) return true; - ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor)); + ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor)); if (n < 0) { if (errno != EAGAIN) { IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno; @@ -127,7 +126,7 @@ void connection::read_cb(int s) { if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) { if (!readpdu()) { IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying"; - poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); + global->shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; send_complete_.notify_one(); } @@ -189,8 +188,8 @@ bool connection::readpdu() { connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest) : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest) { - tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); - tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); + tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, int{1}); + tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1}); tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}); tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}); @@ -214,11 +213,11 @@ connection_listener::connection_listener(connection_delegate * delegate, in_port IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port; - poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this); + global->shared_mgr.add_callback(tcp_, CB_RDONLY, this); } connection_listener::~connection_listener() { - poll_mgr::shared_mgr.block_remove_fd(tcp_); + global->shared_mgr.block_remove_fd(tcp_); } void connection_listener::read_cb(int) {