+#include "include/rpc/marshall.h"
+
+// Out-of-line destructor for the delegate interface. There is nothing to
+// release here; the body is intentionally empty.
+connection_delegate::~connection_delegate() {
+}
+
+// Takes ownership of an already-connected socket and registers this
+// connection with the shared poll manager for read-readiness callbacks.
+//   delegate - receiver passed to this connection (stored in delegate_)
+//   f1       - connected socket; moved into fd
+//   l1       - loss-injection level; send() shuts the socket down with
+//              probability l1 percent
+connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
+    : fd(std::move(f1)), delegate_(delegate), lossy_(l1)
+{
+    // Writing to a peer that has gone away should fail the write() call
+    // instead of delivering SIGPIPE and terminating the process.
+    signal(SIGPIPE, SIG_IGN);
+
+    // All socket I/O is driven by poll-manager callbacks, so it must not block.
+    fd.flags() |= O_NONBLOCK;
+
+    // Only reads are watched from the start; send() registers a write
+    // callback on demand when a PDU cannot be written in one go.
+    global->shared_mgr.add_callback(fd, CB_RDONLY, this);
+}
+
+// Tears the connection down: marks it dead, shuts the socket down so any
+// thread blocked on it wakes up, then detaches it from the poll manager.
+connection::~connection() {
+    bool first_to_close = false;
+    {
+        lock ml(m_);
+        if (!dead_) {
+            dead_ = true;
+            fd.shutdown(SHUT_RDWR);
+            first_to_close = true;
+        }
+    }
+    // NOTE(review): if an earlier I/O failure already set dead_ (send,
+    // write_cb or read_cb), we return without calling block_remove_fd; the
+    // read/write callback paths only issued del_callback. Confirm that is
+    // sufficient before this object is freed.
+    if (!first_to_close)
+        return;
+
+    // Once block_remove_fd returns, the poll manager no longer waits on fd
+    // and no callback on this connection is still active.
+    global->shared_mgr.block_remove_fd(fd);
+    VERIFY(dead_ && wpdu_.status == unused);
+}
+
+// Opens a TCP connection to dst and wraps it in a new connection object.
+//   dst      - IPv4 address/port to connect to
+//   delegate - passed through to the connection constructor
+//   lossy    - loss-injection level, passed through to the constructor
+// Returns nullptr (after logging) if the connect fails. If socket() itself
+// fails, connect() on the invalid descriptor fails as well, so that case
+// takes the same path.
+shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
+    socket_t s = socket(AF_INET, SOCK_STREAM, 0);
+    // Disable Nagle's algorithm so small RPC messages are sent immediately.
+    s.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1});
+    // The socket has not been made non-blocking yet (the connection
+    // constructor does that), so this connect() waits for the handshake.
+    if (connect(s, reinterpret_cast<const sockaddr *>(&dst), sizeof(dst)) < 0) {
+        IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
+        // Do not close(s) by hand: s owns the descriptor and releases it when
+        // it goes out of scope. An explicit close() would close the same fd
+        // twice, and the second close could hit an unrelated descriptor that
+        // another thread opened in between.
+        return nullptr;
+    }
+    IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
+    return std::make_shared<connection>(delegate, std::move(s), lossy);
+}
+
+// Sends b as a single PDU and blocks until it has been completely handed to
+// the socket or the connection has died. Only one PDU is in flight at a time;
+// other callers wait on send_wait_ until the current one is finished.
+// Returns true iff every byte of b was written and the connection is alive.
+bool connection::send(const string & b) {
+    lock ml(m_);
+
+    // Wait for our turn: no other PDU in flight (or the connection is dead).
+    send_wait_.wait(ml, [this] { return dead_ || wpdu_.status == unused; });
+    if (dead_)
+        return false;
+
+    wpdu_ = {inflight, b, 0};
+
+    // Fault injection: with probability lossy_ percent, shut the socket down
+    // so that the write below is expected to fail.
+    std::bernoulli_distribution inject_failure(lossy_ * .01);
+    if (inject_failure(global->random_generator)) {
+        IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
+        fd.shutdown(SHUT_RDWR);
+    }
+
+    auto still_writing = [&] {
+        return wpdu_.status == inflight && wpdu_.cursor < b.size();
+    };
+
+    if (writepdu()) {
+        if (still_writing()) {
+            // The socket did not accept the whole PDU at once (should be
+            // rare). write_cb pushes the remainder as fd becomes writable and
+            // signals send_complete_.
+            global->shared_mgr.add_callback(fd, CB_WRONLY, this);
+            send_complete_.wait(ml, [&] { return dead_ || !still_writing(); });
+        }
+    } else {
+        // Hard write error: mark the connection dead and detach it from the
+        // poll manager. m_ is released around block_remove_fd, presumably
+        // because that call waits for in-progress callbacks, which take m_.
+        dead_ = true;
+        ml.unlock();
+        global->shared_mgr.block_remove_fd(fd);
+        ml.lock();
+    }
+
+    const bool delivered = !dead_ && wpdu_.status == inflight && wpdu_.cursor == b.size();
+    wpdu_ = {unused, "", 0};
+    send_wait_.notify_all();
+    return delivered;
+}
+
+// Poll-manager callback: fd is writable. Pushes more of the in-flight PDU
+// and wakes send() once the PDU is fully written or the write has failed.
+void connection::write_cb(int s) {
+    lock ml(m_);
+    VERIFY(!dead_);
+    VERIFY(fd == s);
+
+    // Nothing in flight (e.g. send() already finished and reset wpdu_):
+    // stop watching for writability.
+    if (wpdu_.status != inflight) {
+        global->shared_mgr.del_callback(fd, CB_WRONLY);
+        return;
+    }
+
+    const bool ok = writepdu();
+    if (ok) {
+        VERIFY(wpdu_.status != error);
+        // Still more to write: wait for the next writable event.
+        if (wpdu_.cursor < wpdu_.buf.size())
+            return;
+    } else {
+        // Hard error: stop all callbacks for this fd and mark it dead.
+        global->shared_mgr.del_callback(fd, CB_RDWR);
+        dead_ = true;
+    }
+    send_complete_.notify_one();
+}
+
+// Attempts one write() of the unsent tail of the in-flight PDU (wpdu_) and
+// advances wpdu_.cursor by however many bytes the socket accepted. Both
+// callers (send and write_cb) invoke it with m_ held and wpdu_ inflight.
+// Returns true if the PDU is complete, progress was made, or the socket is
+// merely not writable right now (EAGAIN/EWOULDBLOCK) or the call was
+// interrupted (EINTR). Returns false on a hard error, after logging it and
+// setting wpdu_ to the error state with an empty buffer.
+bool connection::writepdu() {
+    VERIFY(wpdu_.status == inflight);
+    if (wpdu_.cursor == wpdu_.buf.size())
+        return true;
+
+    ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size() - wpdu_.cursor));
+    if (n < 0) {
+        // Capture errno immediately: the log statement below performs I/O
+        // that may overwrite errno before we inspect it.
+        const int err = errno;
+        // Transient conditions: nothing was written; the caller retries on
+        // the next writable callback.
+        if (err == EAGAIN || err == EWOULDBLOCK || err == EINTR)
+            return true;
+        IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << err;
+        wpdu_ = {error, "", 0};
+        return false;
+    }
+    wpdu_.cursor += static_cast<size_t>(n);
+    return true;
+}
+
+// fd is ready to be read
+void connection::read_cb(int s) {
+ lock ml(m_);
+ VERIFY(fd == s);
+ if (dead_)
+ return;
+
+ IF_LEVEL(5) LOG << "got data on fd " << s;
+
+ if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
+ if (!readpdu()) {
+ IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
+ global->shared_mgr.del_callback(fd, CB_RDWR);
+ dead_ = true;
+ send_complete_.notify_one();
+ }
+ }