X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/dfe8486473094c0769fd1922329c3f0dfd8f43c0..e478ac59e66e89cbc174e781ac715c8644539947:/rpc/connection.h diff --git a/rpc/connection.h b/rpc/connection.h index 16b0398..1eb625b 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -1,36 +1,29 @@ #ifndef connection_h #define connection_h +#include "types.h" #include -#include #include #include -#include -#include - -#include - #include "pollmgr.h" -class thread_exit_exception : std::exception { -}; +constexpr size_t size_t_max = numeric_limits::max(); + +class thread_exit_exception : exception {}; class connection; class chanmgr { public: - virtual bool got_pdu(connection *c, char *b, int sz) = 0; + virtual bool got_pdu(connection *c, const string & b) = 0; virtual ~chanmgr() {} }; class connection : public aio_callback { public: struct charbuf { - charbuf(): buf(NULL), sz(0), solong(0) {} - charbuf (char *b, int s) : buf(b), sz(s), solong(0){} - char *buf; - int sz; - int solong; //amount of bytes written or read so far + string buf; + size_t solong = 0; // number of bytes written or read so far }; connection(chanmgr *m1, int f1, int lossytest=0); @@ -40,15 +33,16 @@ class connection : public aio_callback { bool isdead(); void closeconn(); - bool send(char *b, int sz); + bool send(const string & b); void write_cb(int s); void read_cb(int s); void incref(); void decref(); - int ref(); + int ref() { lock rl(ref_m_); return refno_; } int compare(connection *another); + private: bool readpdu(); @@ -56,39 +50,39 @@ class connection : public aio_callback { chanmgr *mgr_; const int fd_; - bool dead_; + bool dead_ = false; charbuf wpdu_; charbuf rpdu_; - std::chrono::time_point create_time_; + time_point create_time_; - int waiters_; - int refno_; - const int lossy_; + int waiters_ = 0; + int refno_ = 1; + int lossy_ = 0; - std::mutex m_; - std::mutex ref_m_; - std::condition_variable send_complete_; - std::condition_variable send_wait_; + mutex m_; + mutex ref_m_; + cond send_complete_; + cond send_wait_; }; class tcpsconn { public: - tcpsconn(chanmgr *m1, int port, int lossytest=0); + tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0); ~tcpsconn(); - inline int port() { return port_; } + inline in_port_t port() { return port_; } void accept_conn(); private: - int port_; - std::mutex m_; - std::thread th_; + in_port_t port_; + mutex m_; + thread th_; int pipe_[2]; int tcp_; //file desciptor for accepting connection chanmgr *mgr_; int lossy_; - std::map conns_; + map conns_; void process_accept(); };