X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b..6623ac357055b95ce4fc0cbe9c5dc15524a9f20c:/rpc/connection.h diff --git a/rpc/connection.h b/rpc/connection.h index 3e19a93..97bacbb 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -2,10 +2,9 @@ #define connection_h #include "types.h" -#include #include #include -#include "pollmgr.h" +#include "poll_mgr.h" #include "file.h" constexpr size_t size_t_max = numeric_limits::max(); @@ -14,42 +13,40 @@ class thread_exit_exception : exception {}; class connection; -class chanmgr { +class connection_delegate { public: - virtual bool got_pdu(connection *c, const string & b) = 0; - virtual ~chanmgr() {} + virtual bool got_pdu(const shared_ptr & c, const string & b) = 0; + virtual ~connection_delegate() {} }; -class connection : public aio_callback { +class connection : public aio_callback, public enable_shared_from_this { public: struct charbuf { string buf; size_t solong = 0; // number of bytes written or read so far }; - connection(chanmgr *m1, int f1, int lossytest=0); + connection(connection_delegate *m1, socket_t && f1, int lossytest=0); ~connection(); int channo() { return fd_; } - bool isdead(); + bool isdead() { lock ml(m_); return dead_; } void closeconn(); bool send(const string & b); void write_cb(int s); void read_cb(int s); - void incref(); - void decref(); - int ref() { lock rl(ref_m_); return refno_; } + time_point create_time() const { return create_time_; } - int compare(connection *another); + static shared_ptr to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0); private: bool readpdu(); bool writepdu(); - chanmgr *mgr_; + connection_delegate *mgr_; const file_t fd_; bool dead_ = false; @@ -59,18 +56,16 @@ class connection : public aio_callback { time_point create_time_; int waiters_ = 0; - int refno_ = 1; int lossy_ = 0; mutex m_; - mutex ref_m_; cond send_complete_; cond send_wait_; }; class tcpsconn { public: - tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0); + tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0); ~tcpsconn(); inline in_port_t port() { return port_; } void accept_conn(); @@ -81,19 +76,10 @@ class tcpsconn { file_t pipe_[2]; socket_t tcp_; // listens for connections - chanmgr *mgr_; + connection_delegate *mgr_; int lossy_; - map conns_; + map> conns_; void process_accept(); }; - -struct bundle { - bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} - chanmgr *mgr; - int tcp; - int lossy; -}; - -connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); #endif