X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/dfe8486473094c0769fd1922329c3f0dfd8f43c0..c06ef44e7af1571710fd31dd0ab068dd77b1eb2d:/rpc/connection.h diff --git a/rpc/connection.h b/rpc/connection.h index 16b0398..03e92da 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -1,104 +1,82 @@ #ifndef connection_h #define connection_h -#include -#include +#include "types.h" #include #include -#include -#include - -#include - -#include "pollmgr.h" - -class thread_exit_exception : std::exception { -}; +#include "t4.h" +#include "poll_mgr.h" +#include "file.h" +#include "threaded_log.h" class connection; -class chanmgr { +class connection_delegate { public: - virtual bool got_pdu(connection *c, char *b, int sz) = 0; - virtual ~chanmgr() {} + virtual bool got_pdu(const shared_ptr & c, const string & b) = 0; + virtual ~connection_delegate(); }; -class connection : public aio_callback { - public: - struct charbuf { - charbuf(): buf(NULL), sz(0), solong(0) {} - charbuf (char *b, int s) : buf(b), sz(s), solong(0){} - char *buf; - int sz; - int solong; //amount of bytes written or read so far - }; +using std::chrono::steady_clock; +using time_point = std::chrono::time_point; - connection(chanmgr *m1, int f1, int lossytest=0); +class connection : private aio_callback, public std::enable_shared_from_this { + public: + connection(connection_delegate * delegate, socket_t && f1, int lossytest=0); ~connection(); - int channo() { return fd_; } - bool isdead(); - void closeconn(); + bool isdead() { return dead_; } - bool send(char *b, int sz); - void write_cb(int s); - void read_cb(int s); + bool send(const string & b); + + static shared_ptr to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0); - void incref(); - void decref(); - int ref(); + const time_point create_time = steady_clock::now(); + const file_t fd; - int compare(connection *another); private: + void write_cb(int s); + void read_cb(int s); bool readpdu(); bool writepdu(); - chanmgr *mgr_; - const int fd_; - bool dead_; + connection_delegate * delegate_; + bool dead_ = false; + + enum charbuf_status_t { unused, inflight, error }; - charbuf wpdu_; - charbuf rpdu_; + struct charbuf { + charbuf_status_t status; + string buf; + size_t cursor; // number of bytes written or read so far + }; - std::chrono::time_point create_time_; + charbuf wpdu_ = {unused, "", 0}; + charbuf rpdu_ = {unused, "", 0}; - int waiters_; - int refno_; - const int lossy_; + int lossy_ = 0; std::mutex m_; - std::mutex ref_m_; - std::condition_variable send_complete_; - std::condition_variable send_wait_; + cond send_complete_; + cond send_wait_; }; -class tcpsconn { +class connection_listener : private aio_callback { public: - tcpsconn(chanmgr *m1, int port, int lossytest=0); - ~tcpsconn(); - inline int port() { return port_; } - void accept_conn(); + connection_listener(connection_delegate * delegate, in_port_t port, int lossytest=0); + ~connection_listener(); + inline in_port_t port() { return port_; } private: - int port_; + void write_cb(int) {} + void read_cb(int s); + + in_port_t port_; std::mutex m_; - std::thread th_; - int pipe_[2]; - int tcp_; //file desciptor for accepting connection - chanmgr *mgr_; + socket_t tcp_; // listens for connections + connection_delegate * delegate_; int lossy_; - std::map conns_; - - void process_accept(); + std::map> conns_; }; - -struct bundle { - bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} - chanmgr *mgr; - int tcp; - int lossy; -}; - -connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); #endif