Got rid of most using directives. Ported tests to Python.
[invirt/third/libt4.git] / rpc / connection.h
index da48cf4..68bd902 100644 (file)
 #ifndef connection_h
-#define connection_h 1
+#define connection_h
 
-#include <sys/types.h>
-#include <sys/socket.h>
+#include "types.h"
 #include <arpa/inet.h>
 #include <netinet/in.h>
-#include <cstddef>
-
-#include <map>
-
-#include "pollmgr.h"
+#include "poll_mgr.h"
+#include "file.h"
 
 class connection;
 
-class chanmgr {
-       public:
-               virtual bool got_pdu(connection *c, char *b, int sz) = 0;
-               virtual ~chanmgr() {}
+class connection_delegate { // abstract callback interface; connection and connection_listener each hold a connection_delegate *
+    public:
+        virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0; // pure virtual; presumably c is the receiving connection and b the PDU bytes — meaning of the returned bool is not shown here
+        virtual ~connection_delegate(); // declared without an inline body
 };
 
-class connection : public aio_callback {
-       public:
-               struct charbuf {
-                       charbuf(): buf(NULL), sz(0), solong(0) {}
-                       charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
-                       char *buf;
-                       int sz;
-                       int solong; //amount of bytes written or read so far
-               };
-
-               connection(chanmgr *m1, int f1, int lossytest=0);
-               ~connection();
-
-               int channo() { return fd_; }
-               bool isdead();
-               void closeconn();
-
-               bool send(char *b, int sz);
-               void write_cb(int s);
-               void read_cb(int s);
-
-               void incref();
-               void decref();
-               int ref();
-                
-                int compare(connection *another);
-       private:
-
-               bool readpdu();
-               bool writepdu();
-
-               chanmgr *mgr_;
-               const int fd_;
-               bool dead_;
-
-               charbuf wpdu_;
-               charbuf rpdu_;
-                
-                struct timeval create_time_;
-
-               int waiters_;
-               int refno_;
-               const int lossy_;
-
-               pthread_mutex_t m_;
-               pthread_mutex_t ref_m_;
-               pthread_cond_t send_complete_;
-               pthread_cond_t send_wait_;
-};
+using std::chrono::steady_clock; // NOTE(review): using-declaration at header scope, so the name is visible to every file that includes this header
+using time_point = std::chrono::time_point<steady_clock>; // alias used as the type of connection::create_time
 
-class tcpsconn {
-       public:
-               tcpsconn(chanmgr *m1, int port, int lossytest=0);
-               ~tcpsconn();
-                inline int port() { return port_; }
-               void accept_conn();
-       private:
-                int port_;
-               pthread_mutex_t m_;
-               pthread_t th_;
-               int pipe_[2];
-
-               int tcp_; //file desciptor for accepting connection
-               chanmgr *mgr_;
-               int lossy_;
-               std::map<int, connection *> conns_;
-
-               void process_accept();
-};
+class connection : private aio_callback, public std::enable_shared_from_this<connection> { // aio_callback is a private base; enable_shared_from_this allows obtaining a shared_ptr to *this
+    public:
+        connection(connection_delegate * delegate, socket_t && f1, int lossytest=0); // f1 is taken by rvalue reference; presumably it ends up in fd — confirm in the .cc
+        ~connection();
+
+        bool isdead() { return dead_; } // plain read of dead_; m_ is not taken here
+
+        bool send(const string & b);
+
+        static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0); // static factory with the same parameters as the removed free function connect_to_dst; presumably connects to dst
+
+        const time_point create_time = steady_clock::now(); // initialised from steady_clock when the object is constructed
+        const file_t fd;
 
-struct bundle {
-       bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
-       chanmgr *mgr;
-       int tcp;
-       int lossy;
+    private:
+        void write_cb(int s); // write_cb/read_cb: presumably the aio_callback hooks (aio_callback is declared outside this header)
+        void read_cb(int s);
+
+        bool readpdu();
+        bool writepdu();
+
+        connection_delegate * delegate_; // raw pointer; ownership/lifetime contract is not visible in this header
+        bool dead_ = false;
+
+        enum charbuf_status_t { unused, inflight, error }; // state tag stored in charbuf::status
+
+        struct charbuf {
+            charbuf_status_t status;
+            string buf;
+            size_t cursor; // number of bytes written or read so far
+        };
+
+        charbuf wpdu_ = {unused, "", 0}; // both buffers start as {unused, empty string, cursor 0}; presumably w = outgoing and r = incoming PDU
+        charbuf rpdu_ = {unused, "", 0};
+
+        int lossy_ = 0;
+
+        std::mutex m_;
+        cond send_complete_;
+        cond send_wait_;
 };
 
-void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0);
-connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
+class connection_listener : private aio_callback { // appears to take over from the removed tcpsconn (same port_/tcp_/lossy_/conns_ member names)
+    public:
+        connection_listener(connection_delegate * delegate, in_port_t port, int lossytest=0);
+        ~connection_listener();
+        inline in_port_t port() { return port_; } // returns the stored port_
+    private:
+        void write_cb(int) {} // empty body: nothing is done on write_cb
+        void read_cb(int s); // presumably where incoming connections are accepted — confirm in the .cc
+
+        in_port_t port_;
+        std::mutex m_;
+
+        socket_t tcp_; // listens for connections
+        connection_delegate * delegate_;
+        int lossy_;
+        std::map<int, shared_ptr<connection>> conns_; // connections held by shared_ptr, keyed by an int (presumably the fd/channel number — verify)
+};
 #endif