typedef map<lock_protocol::lockid_t, lock_state> lock_map; // one lock_state per lock id
-unsigned int lock_client::last_port = 0;
+in_port_t lock_client::last_port = 0; // out-of-class definition of the static member declared in lock_client, initialised to 0
lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { // returns a reference to the lock_table entry stored under lid
lock sl(lock_table_lock); // constructed from lock_table_lock before lock_table is touched (presumably a scoped guard released on return -- confirm against the `lock` type)
return lock_table[lid]; // creates the lock if it doesn't already exist
}
-lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) {
+lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
cl = new rpcc(xdst);
if (cl->bind() < 0)
LOG("lock_client: call bind");
rpcc *cl;
thread releaser_thread;
rsm_client *rsmc;
- class lock_release_user *lu;
- unsigned int rlock_port;
+ lock_release_user *lu;
+ in_port_t rlock_port;
string hostname;
string id;
mutex xid_mutex;
lock_map lock_table;
lock_state &get_lock_state(lock_protocol::lockid_t lid);
public:
- static unsigned int last_port;
- lock_client(string xdst, class lock_release_user *l = 0);
+ static in_port_t last_port;
+ lock_client(string xdst, lock_release_user *l = 0);
~lock_client() {}
lock_protocol::status acquire(lock_protocol::lockid_t);
lock_protocol::status release(lock_protocol::lockid_t);
return lock_table[lid];
}
-lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
+// Stores r in rsm_, launches the revoker and retryer loops on detached
+// threads, and registers this server with r for state transfer.
+// r may be null: the declaration defaults it to 0, and revoker()/retryer()
+// null-check rsm_ before calling amiprimary().
+lock_server::lock_server(rsm *r) : rsm_ (r) {
thread(&lock_server::revoker, this).detach();
thread(&lock_server::retryer, this).detach();
- rsm->set_state_transfer(this);
+ // Only register when an rsm was supplied; calling through a null rsm_
+ // would dereference a null pointer for a default-constructed server.
+ if (rsm_)
+  rsm_->set_state_transfer(this);
}
void lock_server::revoker() [[noreturn]] {
lock_protocol::lockid_t lid;
revoke_fifo.deq(&lid);
LOG("Revoking " << lid);
- if (rsm && !rsm->amiprimary())
+ if (rsm_ && !rsm_->amiprimary())
continue;
lock_state &st = get_lock_state(lid);
while (1) {
lock_protocol::lockid_t lid;
retry_fifo.deq(&lid);
- if (rsm && !rsm->amiprimary())
+ if (rsm_ && !rsm_->amiprimary())
continue;
LOG("Sending retry for " << lid);
lock_state &get_lock_state(lock_protocol::lockid_t lid);
fifo<lock_protocol::lockid_t> retry_fifo;
fifo<lock_protocol::lockid_t> revoke_fifo;
- class rsm *rsm;
+ rsm *rsm_;
public:
- lock_server(class rsm *rsm = 0);
+ lock_server(rsm *r = 0);
lock_protocol::status stat(int &, lock_protocol::lockid_t);
void revoker();
void retryer();
// paxos_commit to inform higher layers of the agreed value for this
// instance.
-proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
bool _first, const node_t & _me, const value_t & _value)
: delegate(_delegate), me (_me)
{
paxos_change *delegate;
node_t me;
- rpcs pxs = {(uint32_t)stoi(me)};
+ rpcs pxs{(in_port_t)stoi(me)};
bool break1 = false;
bool break2 = false;
-// std::bind and syscall bind have the same name, so don't use std::bind in this file
-#define LIBT4_NO_FUNCTIONAL
#include "connection.h"
+#include "rpc_protocol.h"
#include <cerrno>
#include <csignal>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/socket.h>
-
-#define MAX_PDU (10<<20) //maximum PDF is 10M
+#include "marshall.h"
connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), lossy_(l1)
if (lossy_) {
if ((random()%100) < lossy_) {
- IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+ IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
shutdown(fd_,SHUT_RDWR);
}
}
return true;
if (wpdu_.solong == 0) {
- uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
+ rpc_sz_t sz = hton((rpc_sz_t)(wpdu_.buf.size() - sizeof(uint32_t)));
copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
}
ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
if (n < 0) {
if (errno != EAGAIN) {
- IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
+ IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
wpdu_.solong = size_t_max;
wpdu_.buf.clear();
}
bool connection::readpdu() {
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
- uint32_t sz1;
+ rpc_sz_t sz1;
ssize_t n = read(fd_, &sz1, sizeof(sz1));
if (n == 0) {
return false;
}
- size_t sz = ntohl(sz1);
+ size_t sz = ntoh(sz1);
if (sz > MAX_PDU) {
IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
return true;
}
-tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
- sin.sin_port = htons(port);
+ sin.sin_port = hton(port);
tcp_ = socket(AF_INET, SOCK_STREAM, 0);
if (tcp_ < 0) {
setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
- if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+ // careful to exactly match type signature of bind arguments so we don't
+ // get std::bind instead
+ if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
perror("accept_loop tcp bind:");
VERIFY(0);
}
socklen_t addrlen = sizeof(sin);
VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
- port_ = ntohs(sin.sin_port);
+ port_ = ntoh(sin.sin_port);
IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
throw thread_exit_exception();
}
- IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+ IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
connection *ch = new connection(mgr_, s1, lossy_);
// garbage collect all dead connections with refcount of 1
continue;
} else {
perror("accept_conn select:");
- IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
+ IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
VERIFY(0);
}
}
int yes = 1;
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
- IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+ IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
close(s);
return NULL;
}
- IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+ IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
return new connection(mgr, s, lossy);
}
class tcpsconn {
public:
- tcpsconn(chanmgr *m1, unsigned int port, int lossytest=0);
+ tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
~tcpsconn();
- inline unsigned int port() { return port_; }
+ inline in_port_t port() { return port_; }
void accept_conn();
private:
- unsigned int port_;
+ in_port_t port_;
mutex m_;
thread th_;
int pipe_[2];
#define marshall_h
#include "types.h"
+#include "rpc_protocol.h"
// for structs or classes containing a MEMBERS declaration
class marshall;
#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
-using proc_t = uint32_t;
-using status_t = int32_t;
-
-struct request_header {
- int xid;
- proc_t proc;
- unsigned int clt_nonce;
- unsigned int srv_nonce;
- int xid_rep;
-
- MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
-};
-
FORWARD_MARSHALLABLE(request_header)
ENDIAN_SWAPPABLE(request_header)
-struct reply_header {
- int xid;
- int ret;
-
- MEMBERS(xid, ret)
-};
-
FORWARD_MARSHALLABLE(reply_header)
ENDIAN_SWAPPABLE(reply_header)
-typedef int rpc_sz_t;
-
-const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
-const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
-
// Template parameter pack expansion is not allowed in certain contexts, but
// brace initializers (for instance, calls to constructors of empty structs)
// are fair game.
return;
} else {
perror("select:");
- IF_LEVEL(0) LOG("PollMgr::select_loop failure errno " << errno);
+ IF_LEVEL(0) LOG("select_loop failure errno " << errno);
VERIFY(0);
}
}
IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
" xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
- ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
+ ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
if(ch)
ch->decref();
}
}
-rpcs::rpcs(unsigned int p1, size_t count)
+rpcs::rpcs(in_port_t p1, size_t count)
: port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
{
set_rand_seed();
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
dst.sin_addr.s_addr = a.s_addr;
}
- dst.sin_port = hton((uint16_t)stoi(port));
+ dst.sin_port = hton((in_port_t)stoi(port));
return dst;
}
string buf; // the reply buffer
};
- unsigned int port_;
+ in_port_t port_;
unsigned int nonce_;
// provide at most once semantics by maintaining a window of replies
tcpsconn *listener_;
public:
- rpcs(unsigned int port, size_t counts=0);
+ rpcs(in_port_t port, size_t counts=0);
~rpcs();
- inline unsigned int port() { return listener_->port(); }
+ inline in_port_t port() { return listener_->port(); }
//RPC handler for clients binding
int rpcbind(unsigned int &r, int a);
--- /dev/null
+#ifndef rpc_protocol_h
+#define rpc_protocol_h
+
+#include "types.h"
+
+using proc_t = uint32_t; // type of request_header::proc
+using status_t = int32_t;
+using rpc_sz_t = uint32_t; // type of the length prefix that connection::writepdu writes and connection::readpdu reads
+
+struct request_header { // its size is one of the two inputs to RPC_HEADER_SZ; marshall.h marks it FORWARD_MARSHALLABLE and ENDIAN_SWAPPABLE
+ int xid; // presumably the id of this request -- TODO confirm against rpcc/rpcs
+ proc_t proc; // presumably the number of the procedure being called (cf. the ids passed to rpcs::reg) -- TODO confirm
+ unsigned int clt_nonce; // NOTE(review): looks like the sending client's nonce -- confirm
+ unsigned int srv_nonce; // NOTE(review): looks like the server nonce the client believes it is talking to -- confirm
+ int xid_rep; // NOTE(review): meaning not visible in this chunk -- confirm against rpcc::call1
+
+ MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) // lists every field, in declaration order, for the MEMBERS macro (defined outside this chunk)
+};
+
+struct reply_header { // its size is one of the two inputs to RPC_HEADER_SZ; marshall.h marks it FORWARD_MARSHALLABLE and ENDIAN_SWAPPABLE
+ int xid; // presumably echoes the xid of the request being answered -- TODO confirm
+ int ret; // presumably the handler's integer return status -- TODO confirm
+
+ MEMBERS(xid, ret) // lists every field, in declaration order, for the MEMBERS macro (defined outside this chunk)
+};
+
+const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); // larger of the two header structs plus one rpc_sz_t
+const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
+const size_t MAX_PDU = 10<<20; // maximum PDU size is 10 MiB (10485760 bytes); connection::readpdu reports larger length prefixes as "TOO BIG"
+
+#endif
rpcs *server; // server rpc object
rpcc *clients[NUM_CL]; // client rpc object
string dst; //server's ip address
-int port;
+in_port_t port;
// server-side handlers. they must be methods of some class
// to simplify rpcs::reg(). a server process can have handlers
void startserver()
{
- server = new rpcs((unsigned int)port);
+ server = new rpcs(port);
server->reg(22, &srv::handle_22, &service);
server->reg(23, &srv::handle_fast, &service);
server->reg(24, &srv::handle_slow, &service);
debug_level = atoi(optarg);
break;
case 'p':
- port = atoi(optarg);
+ port = (in_port_t)atoi(optarg);
break;
case 'l':
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
- last_myvs.vid = 0;
- last_myvs.seqno = 0;
- myvs = last_myvs;
- myvs.seqno = 1;
-
cfg = new config(_first, _me, this);
if (_first == _me) {
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = new rpcs((uint32_t)stoi(_me) + 1);
+ testsvr = new rpcs((in_port_t)stoi(_me) + 1);
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
protected:
map<int, handler *> procs;
config *cfg;
- class rsm_state_transfer *stf;
+ rsm_state_transfer *stf = nullptr;
rpcs *rsmrpc;
// On slave: expected viewstamp of next invoke request
// On primary: viewstamp for the next request from rsm_client
- viewstamp myvs;
- viewstamp last_myvs; // Viewstamp of the last executed request
+ viewstamp last_myvs{0, 0}; // Viewstamp of the last executed request
+ viewstamp myvs{0, 1};
string primary;
bool insync;
bool inviewchange;
};
struct viewstamp {
- viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : vid(_vid), seqno(_seqno) {}
unsigned int vid;
unsigned int seqno;
inline void operator++(int) { seqno++; }