X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/6623ac357055b95ce4fc0cbe9c5dc15524a9f20c..03b35a9a1bd1f583e32b27d260b223a0989d6c75:/rpc/connection.h?ds=inline diff --git a/rpc/connection.h b/rpc/connection.h index 97bacbb..68bd902 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -7,79 +7,74 @@ #include "poll_mgr.h" #include "file.h" -constexpr size_t size_t_max = numeric_limits::max(); - -class thread_exit_exception : exception {}; - class connection; class connection_delegate { public: virtual bool got_pdu(const shared_ptr & c, const string & b) = 0; - virtual ~connection_delegate() {} + virtual ~connection_delegate(); }; -class connection : public aio_callback, public enable_shared_from_this { - public: - struct charbuf { - string buf; - size_t solong = 0; // number of bytes written or read so far - }; +using std::chrono::steady_clock; +using time_point = std::chrono::time_point; - connection(connection_delegate *m1, socket_t && f1, int lossytest=0); +class connection : private aio_callback, public std::enable_shared_from_this { + public: + connection(connection_delegate * delegate, socket_t && f1, int lossytest=0); ~connection(); - int channo() { return fd_; } - bool isdead() { lock ml(m_); return dead_; } - void closeconn(); + bool isdead() { return dead_; } bool send(const string & b); - void write_cb(int s); - void read_cb(int s); - time_point create_time() const { return create_time_; } + static shared_ptr to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0); - static shared_ptr to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0); + const time_point create_time = steady_clock::now(); + const file_t fd; private: + void write_cb(int s); + void read_cb(int s); bool readpdu(); bool writepdu(); - connection_delegate *mgr_; - const file_t fd_; + connection_delegate * delegate_; bool dead_ = false; - charbuf wpdu_; - charbuf rpdu_; + enum charbuf_status_t { unused, inflight, error }; - time_point create_time_; + struct charbuf { + charbuf_status_t status; + string buf; + size_t cursor; // number of bytes written or read so far + }; + + charbuf wpdu_ = {unused, "", 0}; + charbuf rpdu_ = {unused, "", 0}; - int waiters_ = 0; int lossy_ = 0; - mutex m_; + std::mutex m_; cond send_complete_; cond send_wait_; }; -class tcpsconn { +class connection_listener : private aio_callback { public: - tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0); - ~tcpsconn(); + connection_listener(connection_delegate * delegate, in_port_t port, int lossytest=0); + ~connection_listener(); inline in_port_t port() { return port_; } - void accept_conn(); private: + void write_cb(int) {} + void read_cb(int s); + in_port_t port_; - mutex m_; - thread th_; - file_t pipe_[2]; + std::mutex m_; socket_t tcp_; // listens for connections - connection_delegate *mgr_; + connection_delegate * delegate_; int lossy_; - map> conns_; - - void process_accept(); + std::map> conns_; }; #endif