Clean-ups to types.
authorPeter Iannucci <iannucci@mit.edu>
Mon, 30 Sep 2013 14:55:25 +0000 (10:55 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Mon, 30 Sep 2013 14:57:40 +0000 (10:57 -0400)
17 files changed:
lock_client.cc
lock_client.h
lock_server.cc
lock_server.h
paxos.cc
paxos.h
rpc/connection.cc
rpc/connection.h
rpc/marshall.h
rpc/pollmgr.cc
rpc/rpc.cc
rpc/rpc.h
rpc/rpc_protocol.h [new file with mode: 0644]
rpc/rpctest.cc
rsm.cc
rsm.h
rsm_protocol.h

index de357f1..0b071f5 100644 (file)
@@ -22,14 +22,14 @@ void lock_state::signal(thread::id who) {
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 
-unsigned int lock_client::last_port = 0;
+in_port_t lock_client::last_port = 0;
 
 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
     return lock_table[lid]; // creates the lock if it doesn't already exist
 }
 
-lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) {
+lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
     cl = new rpcc(xdst);
     if (cl->bind() < 0)
         LOG("lock_client: call bind");
index 36ee3a2..5db2cbf 100644 (file)
@@ -45,8 +45,8 @@ class lock_client {
         rpcc *cl;
         thread releaser_thread;
         rsm_client *rsmc;
-        class lock_release_user *lu;
-        unsigned int rlock_port;
+        lock_release_user *lu;
+        in_port_t rlock_port;
         string hostname;
         string id;
         mutex xid_mutex;
@@ -56,8 +56,8 @@ class lock_client {
         lock_map lock_table;
         lock_state &get_lock_state(lock_protocol::lockid_t lid);
     public:
-        static unsigned int last_port;
-        lock_client(string xdst, class lock_release_user *l = 0);
+        static in_port_t last_port;
+        lock_client(string xdst, lock_release_user *l = 0);
         ~lock_client() {}
         lock_protocol::status acquire(lock_protocol::lockid_t);
         lock_protocol::status release(lock_protocol::lockid_t);
index b724140..81cd805 100644 (file)
@@ -29,10 +29,10 @@ lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
     return lock_table[lid];
 }
 
-lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
+lock_server::lock_server(rsm *r) : rsm_ (r) {
     thread(&lock_server::revoker, this).detach();
     thread(&lock_server::retryer, this).detach();
-    rsm->set_state_transfer(this);
+    rsm_->set_state_transfer(this);
 }
 
 void lock_server::revoker() [[noreturn]] {
@@ -40,7 +40,7 @@ void lock_server::revoker() [[noreturn]] {
         lock_protocol::lockid_t lid;
         revoke_fifo.deq(&lid);
         LOG("Revoking " << lid);
-        if (rsm && !rsm->amiprimary())
+        if (rsm_ && !rsm_->amiprimary())
             continue;
 
         lock_state &st = get_lock_state(lid);
@@ -67,7 +67,7 @@ void lock_server::retryer() [[noreturn]] {
     while (1) {
         lock_protocol::lockid_t lid;
         retry_fifo.deq(&lid);
-        if (rsm && !rsm->amiprimary())
+        if (rsm_ && !rsm_->amiprimary())
             continue;
 
         LOG("Sending retry for " << lid);
index 5c182e0..560167f 100644 (file)
@@ -35,9 +35,9 @@ class lock_server : public rsm_state_transfer {
         lock_state &get_lock_state(lock_protocol::lockid_t lid);
         fifo<lock_protocol::lockid_t> retry_fifo;
         fifo<lock_protocol::lockid_t> revoke_fifo;
-        class rsm *rsm;
+        rsm *rsm_;
     public:
-        lock_server(class rsm *rsm = 0);
+        lock_server(rsm *r = 0);
         lock_protocol::status stat(int &, lock_protocol::lockid_t);
         void revoker();
         void retryer();
index b39fa5b..3166c92 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -20,7 +20,7 @@ bool majority(const nodes_t &l1, const nodes_t &l2) {
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
-proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
         bool _first, const node_t & _me, const value_t & _value)
     : delegate(_delegate), me (_me)
 {
diff --git a/paxos.h b/paxos.h
index 186daab..642d3ff 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -29,7 +29,7 @@ class proposer_acceptor {
         paxos_change *delegate;
         node_t me;
 
-        rpcs pxs = {(uint32_t)stoi(me)};
+        rpcs pxs{(in_port_t)stoi(me)};
 
         bool break1 = false;
         bool break2 = false;
index 55e374a..c16f6dc 100644 (file)
@@ -1,6 +1,5 @@
-// std::bind and syscall bind have the same name, so don't use std::bind in this file
-#define LIBT4_NO_FUNCTIONAL
 #include "connection.h"
+#include "rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
 #include <fcntl.h>
@@ -8,8 +7,7 @@
 #include <netinet/tcp.h>
 #include <unistd.h>
 #include <sys/socket.h>
-
-#define MAX_PDU (10<<20) //maximum PDF is 10M
+#include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
@@ -92,7 +90,7 @@ bool connection::send(const string & b) {
 
     if (lossy_) {
         if ((random()%100) < lossy_) {
-            IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+            IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
             shutdown(fd_,SHUT_RDWR);
         }
     }
@@ -174,13 +172,13 @@ bool connection::writepdu() {
         return true;
 
     if (wpdu_.solong == 0) {
-        uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
+        rpc_sz_t sz = hton((rpc_sz_t)(wpdu_.buf.size() - sizeof(uint32_t)));
         copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
     }
     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
     if (n < 0) {
         if (errno != EAGAIN) {
-            IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
+            IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
             wpdu_.solong = size_t_max;
             wpdu_.buf.clear();
         }
@@ -193,7 +191,7 @@ bool connection::writepdu() {
 bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
-        uint32_t sz1;
+        rpc_sz_t sz1;
         ssize_t n = read(fd_, &sz1, sizeof(sz1));
 
         if (n == 0) {
@@ -210,7 +208,7 @@ bool connection::readpdu() {
             return false;
         }
 
-        size_t sz = ntohl(sz1);
+        size_t sz = ntoh(sz1);
 
         if (sz > MAX_PDU) {
             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
@@ -240,13 +238,13 @@ bool connection::readpdu() {
     return true;
 }
 
-tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 : mgr_(m1), lossy_(lossytest)
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
-    sin.sin_port = htons(port);
+    sin.sin_port = hton(port);
 
     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp_ < 0) {
@@ -258,7 +256,9 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
 
-    if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+    // careful to exactly match type signature of bind arguments so we don't
+    // get std::bind instead
+    if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
         perror("accept_loop tcp bind:");
         VERIFY(0);
     }
@@ -270,7 +270,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
 
     socklen_t addrlen = sizeof(sin);
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
-    port_ = ntohs(sin.sin_port);
+    port_ = ntoh(sin.sin_port);
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
@@ -308,7 +308,7 @@ void tcpsconn::process_accept() {
         throw thread_exit_exception();
     }
 
-    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
     connection *ch = new connection(mgr_, s1, lossy_);
 
     // garbage collect all dead connections with refcount of 1
@@ -345,7 +345,7 @@ void tcpsconn::accept_conn() {
                     continue;
                 } else {
                     perror("accept_conn select:");
-                    IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
+                    IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
                     VERIFY(0);
                 }
             }
@@ -372,11 +372,11 @@ connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
     int yes = 1;
     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-        IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+        IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
         close(s);
         return NULL;
     }
-    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
     return new connection(mgr, s, lossy);
 }
 
index 882c1e0..1eb625b 100644 (file)
@@ -69,12 +69,12 @@ class connection : public aio_callback {
 
 class tcpsconn {
     public:
-        tcpsconn(chanmgr *m1, unsigned int port, int lossytest=0);
+        tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
         ~tcpsconn();
-        inline unsigned int port() { return port_; }
+        inline in_port_t port() { return port_; }
         void accept_conn();
     private:
-        unsigned int port_;
+        in_port_t port_;
         mutex m_;
         thread th_;
         int pipe_[2];
index d7f1dff..ecf16f7 100644 (file)
@@ -2,6 +2,7 @@
 #define marshall_h
 
 #include "types.h"
+#include "rpc_protocol.h"
 
 // for structs or classes containing a MEMBERS declaration
 class marshall;
@@ -20,37 +21,12 @@ unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d
 
 #define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
 
-using proc_t = uint32_t;
-using status_t = int32_t;
-
-struct request_header {
-    int xid;
-    proc_t proc;
-    unsigned int clt_nonce;
-    unsigned int srv_nonce;
-    int xid_rep;
-
-    MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
-};
-
 FORWARD_MARSHALLABLE(request_header)
 ENDIAN_SWAPPABLE(request_header)
 
-struct reply_header {
-    int xid;
-    int ret;
-
-    MEMBERS(xid, ret)
-};
-
 FORWARD_MARSHALLABLE(reply_header)
 ENDIAN_SWAPPABLE(reply_header)
 
-typedef int rpc_sz_t;
-
-const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
-const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
-
 // Template parameter pack expansion is not allowed in certain contexts, but
 // brace initializers (for instance, calls to constructors of empty structs)
 // are fair game.  
index 4254b4f..a938284 100644 (file)
@@ -225,7 +225,7 @@ SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
             return;
         } else {
             perror("select:");
-            IF_LEVEL(0) LOG("PollMgr::select_loop failure errno " << errno);
+            IF_LEVEL(0) LOG("select_loop failure errno " << errno);
             VERIFY(0);
         }
     }
index 62003dd..32b25ab 100644 (file)
@@ -262,7 +262,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
 
     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
-                    ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
+                    ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
     if(ch)
         ch->decref();
@@ -353,7 +353,7 @@ compress:
     }
 }
 
-rpcs::rpcs(unsigned int p1, size_t count)
+rpcs::rpcs(in_port_t p1, size_t count)
   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
 {
     set_rand_seed();
@@ -682,6 +682,6 @@ static sockaddr_in make_sockaddr(const string &host, const string &port) {
         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
         dst.sin_addr.s_addr = a.s_addr;
     }
-    dst.sin_port = hton((uint16_t)stoi(port));
+    dst.sin_port = hton((in_port_t)stoi(port));
     return dst;
 }
index 5dabe4b..2b32e28 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -163,7 +163,7 @@ class rpcs : public chanmgr {
         string buf;      // the reply buffer
     };
 
-    unsigned int port_;
+    in_port_t port_;
     unsigned int nonce_;
 
     // provide at most once semantics by maintaining a window of replies
@@ -213,9 +213,9 @@ class rpcs : public chanmgr {
     tcpsconn *listener_;
 
     public:
-    rpcs(unsigned int port, size_t counts=0);
+    rpcs(in_port_t port, size_t counts=0);
     ~rpcs();
-    inline unsigned int port() { return listener_->port(); }
+    inline in_port_t port() { return listener_->port(); }
     //RPC handler for clients binding
     int rpcbind(unsigned int &r, int a);
 
diff --git a/rpc/rpc_protocol.h b/rpc/rpc_protocol.h
new file mode 100644 (file)
index 0000000..2ef9ab2
--- /dev/null
@@ -0,0 +1,31 @@
+#ifndef rpc_protocol_h
+#define rpc_protocol_h
+
+#include "types.h"
+
+using proc_t = uint32_t;
+using status_t = int32_t;
+using rpc_sz_t = uint32_t;
+
+struct request_header {
+    int xid;
+    proc_t proc;
+    unsigned int clt_nonce;
+    unsigned int srv_nonce;
+    int xid_rep;
+
+    MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
+};
+
+struct reply_header {
+    int xid;
+    int ret;
+
+    MEMBERS(xid, ret)
+};
+
+const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
+const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
+const size_t MAX_PDU = 10<<20; //maximum PDF is 10M
+
+#endif
index b90d19a..47d5bce 100644 (file)
@@ -15,7 +15,7 @@ char log_thread_prefix = 'r';
 rpcs *server;  // server rpc object
 rpcc *clients[NUM_CL];  // client rpc object
 string dst; //server's ip address
-int port;
+in_port_t port;
 
 // server-side handlers. they must be methods of some class
 // to simplify rpcs::reg(). a server process can have handlers
@@ -68,7 +68,7 @@ srv service;
 
 void startserver()
 {
-    server = new rpcs((unsigned int)port);
+    server = new rpcs(port);
     server->reg(22, &srv::handle_22, &service);
     server->reg(23, &srv::handle_fast, &service);
     server->reg(24, &srv::handle_slow, &service);
@@ -371,7 +371,7 @@ main(int argc, char *argv[])
                 debug_level = atoi(optarg);
                 break;
             case 'p':
-                port = atoi(optarg);
+                port = (in_port_t)atoi(optarg);
                 break;
             case 'l':
                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
diff --git a/rsm.cc b/rsm.cc
index 843418a..0c2e5bf 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -90,11 +90,6 @@ rsm::rsm(string _first, string _me) :
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
     partitioned (false), dopartition(false), break1(false), break2(false)
 {
-    last_myvs.vid = 0;
-    last_myvs.seqno = 0;
-    myvs = last_myvs;
-    myvs.seqno = 1;
-
     cfg = new config(_first, _me, this);
 
     if (_first == _me) {
@@ -111,7 +106,7 @@ rsm::rsm(string _first, string _me) :
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr = new rpcs((uint32_t)stoi(_me) + 1);
+    testsvr = new rpcs((in_port_t)stoi(_me) + 1);
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 
diff --git a/rsm.h b/rsm.h
index 18ac8af..ef919ff 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -22,12 +22,12 @@ class rsm : public config_view_change {
     protected:
         map<int, handler *> procs;
         config *cfg;
-        class rsm_state_transfer *stf;
+        rsm_state_transfer *stf = nullptr;
         rpcs *rsmrpc;
         // On slave: expected viewstamp of next invoke request
         // On primary: viewstamp for the next request from rsm_client
-        viewstamp myvs;
-        viewstamp last_myvs;   // Viewstamp of the last executed request
+        viewstamp last_myvs{0, 0};   // Viewstamp of the last executed request
+        viewstamp myvs{0, 1};
         string primary;
         bool insync;
         bool inviewchange;
index 1601bcb..a2d13c2 100644 (file)
@@ -14,7 +14,6 @@ class rsm_client_protocol {
 };
 
 struct viewstamp {
-    viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : vid(_vid), seqno(_seqno) {}
     unsigned int vid;
     unsigned int seqno;
     inline void operator++(int) { seqno++; }