From: Peter Iannucci Date: Mon, 30 Sep 2013 14:55:25 +0000 (-0400) Subject: Clean-ups to types. X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/ba03b19875aa2e3586e49b10904563cdd3b91de0 Clean-ups to types. --- diff --git a/lock_client.cc b/lock_client.cc index de357f1..0b071f5 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -22,14 +22,14 @@ void lock_state::signal(thread::id who) { typedef map lock_map; -unsigned int lock_client::last_port = 0; +in_port_t lock_client::last_port = 0; lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); return lock_table[lid]; // creates the lock if it doesn't already exist } -lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) { +lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) { cl = new rpcc(xdst); if (cl->bind() < 0) LOG("lock_client: call bind"); diff --git a/lock_client.h b/lock_client.h index 36ee3a2..5db2cbf 100644 --- a/lock_client.h +++ b/lock_client.h @@ -45,8 +45,8 @@ class lock_client { rpcc *cl; thread releaser_thread; rsm_client *rsmc; - class lock_release_user *lu; - unsigned int rlock_port; + lock_release_user *lu; + in_port_t rlock_port; string hostname; string id; mutex xid_mutex; @@ -56,8 +56,8 @@ class lock_client { lock_map lock_table; lock_state &get_lock_state(lock_protocol::lockid_t lid); public: - static unsigned int last_port; - lock_client(string xdst, class lock_release_user *l = 0); + static in_port_t last_port; + lock_client(string xdst, lock_release_user *l = 0); ~lock_client() {} lock_protocol::status acquire(lock_protocol::lockid_t); lock_protocol::status release(lock_protocol::lockid_t); diff --git a/lock_server.cc b/lock_server.cc index b724140..81cd805 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -29,10 +29,10 @@ lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { return lock_table[lid]; } -lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) { +lock_server::lock_server(rsm *r) : rsm_ (r) { thread(&lock_server::revoker, this).detach(); thread(&lock_server::retryer, this).detach(); - rsm->set_state_transfer(this); + rsm_->set_state_transfer(this); } void lock_server::revoker() [[noreturn]] { @@ -40,7 +40,7 @@ void lock_server::revoker() [[noreturn]] { lock_protocol::lockid_t lid; revoke_fifo.deq(&lid); LOG("Revoking " << lid); - if (rsm && !rsm->amiprimary()) + if (rsm_ && !rsm_->amiprimary()) continue; lock_state &st = get_lock_state(lid); @@ -67,7 +67,7 @@ void lock_server::retryer() [[noreturn]] { while (1) { lock_protocol::lockid_t lid; retry_fifo.deq(&lid); - if (rsm && !rsm->amiprimary()) + if (rsm_ && !rsm_->amiprimary()) continue; LOG("Sending retry for " << lid); diff --git a/lock_server.h b/lock_server.h index 5c182e0..560167f 100644 --- a/lock_server.h +++ b/lock_server.h @@ -35,9 +35,9 @@ class lock_server : public rsm_state_transfer { lock_state &get_lock_state(lock_protocol::lockid_t lid); fifo retry_fifo; fifo revoke_fifo; - class rsm *rsm; + rsm *rsm_; public: - lock_server(class rsm *rsm = 0); + lock_server(rsm *r = 0); lock_protocol::status stat(int &, lock_protocol::lockid_t); void revoker(); void retryer(); diff --git a/paxos.cc b/paxos.cc index b39fa5b..3166c92 100644 --- a/paxos.cc +++ b/paxos.cc @@ -20,7 +20,7 @@ bool majority(const nodes_t &l1, const nodes_t &l2) { // paxos_commit to inform higher layers of the agreed value for this // instance. -proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, +proposer_acceptor::proposer_acceptor(paxos_change *_delegate, bool _first, const node_t & _me, const value_t & _value) : delegate(_delegate), me (_me) { diff --git a/paxos.h b/paxos.h index 186daab..642d3ff 100644 --- a/paxos.h +++ b/paxos.h @@ -29,7 +29,7 @@ class proposer_acceptor { paxos_change *delegate; node_t me; - rpcs pxs = {(uint32_t)stoi(me)}; + rpcs pxs{(in_port_t)stoi(me)}; bool break1 = false; bool break2 = false; diff --git a/rpc/connection.cc b/rpc/connection.cc index 55e374a..c16f6dc 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,6 +1,5 @@ -// std::bind and syscall bind have the same name, so don't use std::bind in this file -#define LIBT4_NO_FUNCTIONAL #include "connection.h" +#include "rpc_protocol.h" #include #include #include @@ -8,8 +7,7 @@ #include #include #include - -#define MAX_PDU (10<<20) //maximum PDF is 10M +#include "marshall.h" connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), lossy_(l1) @@ -92,7 +90,7 @@ bool connection::send(const string & b) { if (lossy_) { if ((random()%100) < lossy_) { - IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_); + IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_); shutdown(fd_,SHUT_RDWR); } } @@ -174,13 +172,13 @@ bool connection::writepdu() { return true; if (wpdu_.solong == 0) { - uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t)); + rpc_sz_t sz = hton((rpc_sz_t)(wpdu_.buf.size() - sizeof(uint32_t))); copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]); } ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { - IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno); + IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno); wpdu_.solong = size_t_max; wpdu_.buf.clear(); } @@ -193,7 +191,7 @@ bool connection::writepdu() { bool connection::readpdu() { IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); if (!rpdu_.buf.size()) { - uint32_t sz1; + rpc_sz_t sz1; ssize_t n = read(fd_, &sz1, sizeof(sz1)); if (n == 0) { @@ -210,7 +208,7 @@ bool connection::readpdu() { return false; } - size_t sz = ntohl(sz1); + size_t sz = ntoh(sz1); if (sz > MAX_PDU) { IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); @@ -240,13 +238,13 @@ bool connection::readpdu() { return true; } -tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) : mgr_(m1), lossy_(lossytest) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; - sin.sin_port = htons(port); + sin.sin_port = hton(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); if (tcp_ < 0) { @@ -258,7 +256,9 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { + // careful to exactly match type signature of bind arguments so we don't + // get std::bind instead + if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { perror("accept_loop tcp bind:"); VERIFY(0); } @@ -270,7 +270,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) socklen_t addrlen = sizeof(sin); VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); - port_ = ntohs(sin.sin_port); + port_ = ntoh(sin.sin_port); IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); @@ -308,7 +308,7 @@ void tcpsconn::process_accept() { throw thread_exit_exception(); } - IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port)); + IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); // garbage collect all dead connections with refcount of 1 @@ -345,7 +345,7 @@ void tcpsconn::accept_conn() { continue; } else { perror("accept_conn select:"); - IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno); + IF_LEVEL(0) LOG("accept_conn failure errno " << errno); VERIFY(0); } } @@ -372,11 +372,11 @@ connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); close(s); return NULL; } - IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); return new connection(mgr, s, lossy); } diff --git a/rpc/connection.h b/rpc/connection.h index 882c1e0..1eb625b 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -69,12 +69,12 @@ class connection : public aio_callback { class tcpsconn { public: - tcpsconn(chanmgr *m1, unsigned int port, int lossytest=0); + tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0); ~tcpsconn(); - inline unsigned int port() { return port_; } + inline in_port_t port() { return port_; } void accept_conn(); private: - unsigned int port_; + in_port_t port_; mutex m_; thread th_; int pipe_[2]; diff --git a/rpc/marshall.h b/rpc/marshall.h index d7f1dff..ecf16f7 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -2,6 +2,7 @@ #define marshall_h #include "types.h" +#include "rpc_protocol.h" // for structs or classes containing a MEMBERS declaration class marshall; @@ -20,37 +21,12 @@ unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d #define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_) -using proc_t = uint32_t; -using status_t = int32_t; - -struct request_header { - int xid; - proc_t proc; - unsigned int clt_nonce; - unsigned int srv_nonce; - int xid_rep; - - MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) -}; - FORWARD_MARSHALLABLE(request_header) ENDIAN_SWAPPABLE(request_header) -struct reply_header { - int xid; - int ret; - - MEMBERS(xid, ret) -}; - FORWARD_MARSHALLABLE(reply_header) ENDIAN_SWAPPABLE(reply_header) -typedef int rpc_sz_t; - -const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); -const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation - // Template parameter pack expansion is not allowed in certain contexts, but // brace initializers (for instance, calls to constructors of empty structs) // are fair game. diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 4254b4f..a938284 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -225,7 +225,7 @@ SelectAIO::wait_ready(vector *readable, vector *writable) return; } else { perror("select:"); - IF_LEVEL(0) LOG("PollMgr::select_loop failure errno " << errno); + IF_LEVEL(0) LOG("select_loop failure errno " << errno); VERIFY(0); } } diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 62003dd..32b25ab 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -262,7 +262,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) { IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << - ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); + ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); if(ch) ch->decref(); @@ -353,7 +353,7 @@ compress: } } -rpcs::rpcs(unsigned int p1, size_t count) +rpcs::rpcs(in_port_t p1, size_t count) : port_(p1), counting_(count), curr_counts_(count), reachable_ (true) { set_rand_seed(); @@ -682,6 +682,6 @@ static sockaddr_in make_sockaddr(const string &host, const string &port) { memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); dst.sin_addr.s_addr = a.s_addr; } - dst.sin_port = hton((uint16_t)stoi(port)); + dst.sin_port = hton((in_port_t)stoi(port)); return dst; } diff --git a/rpc/rpc.h b/rpc/rpc.h index 5dabe4b..2b32e28 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -163,7 +163,7 @@ class rpcs : public chanmgr { string buf; // the reply buffer }; - unsigned int port_; + in_port_t port_; unsigned int nonce_; // provide at most once semantics by maintaining a window of replies @@ -213,9 +213,9 @@ class rpcs : public chanmgr { tcpsconn *listener_; public: - rpcs(unsigned int port, size_t counts=0); + rpcs(in_port_t port, size_t counts=0); ~rpcs(); - inline unsigned int port() { return listener_->port(); } + inline in_port_t port() { return listener_->port(); } //RPC handler for clients binding int rpcbind(unsigned int &r, int a); diff --git a/rpc/rpc_protocol.h b/rpc/rpc_protocol.h new file mode 100644 index 0000000..2ef9ab2 --- /dev/null +++ b/rpc/rpc_protocol.h @@ -0,0 +1,31 @@ +#ifndef rpc_protocol_h +#define rpc_protocol_h + +#include "types.h" + +using proc_t = uint32_t; +using status_t = int32_t; +using rpc_sz_t = uint32_t; + +struct request_header { + int xid; + proc_t proc; + unsigned int clt_nonce; + unsigned int srv_nonce; + int xid_rep; + + MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) +}; + +struct reply_header { + int xid; + int ret; + + MEMBERS(xid, ret) +}; + +const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); +const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation +const size_t MAX_PDU = 10<<20; //maximum PDF is 10M + +#endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index b90d19a..47d5bce 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -15,7 +15,7 @@ char log_thread_prefix = 'r'; rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object string dst; //server's ip address -int port; +in_port_t port; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers @@ -68,7 +68,7 @@ srv service; void startserver() { - server = new rpcs((unsigned int)port); + server = new rpcs(port); server->reg(22, &srv::handle_22, &service); server->reg(23, &srv::handle_fast, &service); server->reg(24, &srv::handle_slow, &service); @@ -371,7 +371,7 @@ main(int argc, char *argv[]) debug_level = atoi(optarg); break; case 'p': - port = atoi(optarg); + port = (in_port_t)atoi(optarg); break; case 'l': VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); diff --git a/rsm.cc b/rsm.cc index 843418a..0c2e5bf 100644 --- a/rsm.cc +++ b/rsm.cc @@ -90,11 +90,6 @@ rsm::rsm(string _first, string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - last_myvs.vid = 0; - last_myvs.seqno = 0; - myvs = last_myvs; - myvs.seqno = 1; - cfg = new config(_first, _me, this); if (_first == _me) { @@ -111,7 +106,7 @@ rsm::rsm(string _first, string _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = new rpcs((uint32_t)stoi(_me) + 1); + testsvr = new rpcs((in_port_t)stoi(_me) + 1); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); diff --git a/rsm.h b/rsm.h index 18ac8af..ef919ff 100644 --- a/rsm.h +++ b/rsm.h @@ -22,12 +22,12 @@ class rsm : public config_view_change { protected: map procs; config *cfg; - class rsm_state_transfer *stf; + rsm_state_transfer *stf = nullptr; rpcs *rsmrpc; // On slave: expected viewstamp of next invoke request // On primary: viewstamp for the next request from rsm_client - viewstamp myvs; - viewstamp last_myvs; // Viewstamp of the last executed request + viewstamp last_myvs{0, 0}; // Viewstamp of the last executed request + viewstamp myvs{0, 1}; string primary; bool insync; bool inviewchange; diff --git a/rsm_protocol.h b/rsm_protocol.h index 1601bcb..a2d13c2 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -14,7 +14,6 @@ class rsm_client_protocol { }; struct viewstamp { - viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : vid(_vid), seqno(_seqno) {} unsigned int vid; unsigned int seqno; inline void operator++(int) { seqno++; }