X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..dfe8486473094c0769fd1922329c3f0dfd8f43c0:/rpc/connection.h?ds=sidebyside diff --git a/rpc/connection.h b/rpc/connection.h index da48cf4..16b0398 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -1,101 +1,104 @@ #ifndef connection_h -#define connection_h 1 +#define connection_h #include #include #include #include #include +#include #include #include "pollmgr.h" +class thread_exit_exception : std::exception { +}; + class connection; class chanmgr { - public: - virtual bool got_pdu(connection *c, char *b, int sz) = 0; - virtual ~chanmgr() {} + public: + virtual bool got_pdu(connection *c, char *b, int sz) = 0; + virtual ~chanmgr() {} }; class connection : public aio_callback { - public: - struct charbuf { - charbuf(): buf(NULL), sz(0), solong(0) {} - charbuf (char *b, int s) : buf(b), sz(s), solong(0){} - char *buf; - int sz; - int solong; //amount of bytes written or read so far - }; - - connection(chanmgr *m1, int f1, int lossytest=0); - ~connection(); - - int channo() { return fd_; } - bool isdead(); - void closeconn(); - - bool send(char *b, int sz); - void write_cb(int s); - void read_cb(int s); - - void incref(); - void decref(); - int ref(); - - int compare(connection *another); - private: - - bool readpdu(); - bool writepdu(); - - chanmgr *mgr_; - const int fd_; - bool dead_; - - charbuf wpdu_; - charbuf rpdu_; - - struct timeval create_time_; - - int waiters_; - int refno_; - const int lossy_; - - pthread_mutex_t m_; - pthread_mutex_t ref_m_; - pthread_cond_t send_complete_; - pthread_cond_t send_wait_; + public: + struct charbuf { + charbuf(): buf(NULL), sz(0), solong(0) {} + charbuf (char *b, int s) : buf(b), sz(s), solong(0){} + char *buf; + int sz; + int solong; //amount of bytes written or read so far + }; + + connection(chanmgr *m1, int f1, int lossytest=0); + ~connection(); + + int channo() { return fd_; } + bool isdead(); + void closeconn(); + + bool send(char *b, int sz); + void write_cb(int s); + void read_cb(int s); + + void incref(); + void decref(); + int ref(); + + int compare(connection *another); + private: + + bool readpdu(); + bool writepdu(); + + chanmgr *mgr_; + const int fd_; + bool dead_; + + charbuf wpdu_; + charbuf rpdu_; + + std::chrono::time_point create_time_; + + int waiters_; + int refno_; + const int lossy_; + + std::mutex m_; + std::mutex ref_m_; + std::condition_variable send_complete_; + std::condition_variable send_wait_; }; class tcpsconn { - public: - tcpsconn(chanmgr *m1, int port, int lossytest=0); - ~tcpsconn(); - inline int port() { return port_; } - void accept_conn(); - private: - int port_; - pthread_mutex_t m_; - pthread_t th_; - int pipe_[2]; - - int tcp_; //file desciptor for accepting connection - chanmgr *mgr_; - int lossy_; - std::map conns_; - - void process_accept(); + public: + tcpsconn(chanmgr *m1, int port, int lossytest=0); + ~tcpsconn(); + inline int port() { return port_; } + void accept_conn(); + private: + int port_; + std::mutex m_; + std::thread th_; + int pipe_[2]; + + int tcp_; //file desciptor for accepting connection + chanmgr *mgr_; + int lossy_; + std::map conns_; + + void process_accept(); }; struct bundle { - bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} - chanmgr *mgr; - int tcp; - int lossy; + bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} + chanmgr *mgr; + int tcp; + int lossy; }; -void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0); connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); #endif