all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o
rm -f $@
ar cq $@ $^
ranlib rpc/librpc.a
}
vector<string> config::members(const string &value) const {
- istringstream ist(value);
- using it = istream_iterator<string>;
- return {it(ist), it()};
+ return explode(value);
}
-string config::value(const vector<string> &m) const {
- ostringstream ost;
- copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
- return ost.str();
+string config::value(const vector<string> &members) const {
+ return implode(members);
}
void config::reconstruct(lock &cfg_mutex_lock) {
my_view_id = paxos.instance();
if (my_view_id > 0) {
get_view(my_view_id, mems, cfg_mutex_lock);
- LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
+ LOG("config::reconstruct: " << my_view_id << " " << mems);
}
}
lock cfg_mutex_lock(cfg_mutex);
vector<string> newmem = members(value);
- LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
+ LOG("config::paxos_commit: " << instance << ": " << newmem);
for (auto mem : mems) {
LOG("config::paxos_commit: is " << mem << " still a member?");
bool config::add(const string &new_m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
- if (vid != my_view_id)
+ LOG("adding " << new_m << " to " << vid);
+ if (vid != my_view_id) {
+ LOG("that's not my view id, " << my_view_id << "!");
return false;
- LOG("config::add " << new_m);
+ }
vector<string> m = mems;
m.push_back(new_m);
vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
+ LOG("calling down to paxos layer");
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, value(m));
cfg_mutex_lock.lock();
- LOG("config::add: proposer returned " << (r ? "success" : "failure"));
+ LOG("paxos proposer returned " << (r ? "success" : "failure"));
return r;
}
unsigned vid = my_view_id;
vector<string> cmems;
get_view(vid, cmems, cfg_mutex_lock);
- LOG("heartbeater: current membership " << print_members(cmems));
+ LOG("heartbeater: current membership " << cmems);
if (!isamember(me, cmems)) {
LOG("heartbeater: not member yet; skip hearbeat");
if (hmap.find(m) == hmap.end()) {
h = new hinfo(m);
hmap[m] = h;
+ h->refcnt++;
} else if (!hmap[m]->del) {
h = hmap[m];
+ h->refcnt++;
}
- h->refcnt++;
return h;
}
int main(int argc, char *argv[]) {
if(argc != 2) {
- fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
+ cerr << "Usage: " << argv[0] << " [host:]port" << endl;
return 1;
}
return *this;
}
-marshall & operator<<(marshall &m, const lock_state &d) {
- return m << d.held << d.held_by << d.wanted_by;
-}
-
-unmarshall & operator>>(unmarshall &u, lock_state &d) {
- return u >> d.held >> d.held_by >> d.wanted_by;
-}
-
lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
- // by the semantics of map, this will create
- // the lock if it doesn't already exist
+ // this will create the lock if it doesn't already exist
return lock_table[lid];
}
map<callback_t, lock_protocol::xid_t> old_requests;
mutex m;
lock_state& operator=(const lock_state&);
+
+ MEMBERS(held, held_by, wanted_by)
};
+MARSHALLABLE(lock_state)
+
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
class lock_server : public rsm_state_transfer {
srandom((uint32_t)getpid());
if(argc != 3){
- fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+ cerr << "Usage: " << argv[0] << " [master:]port [me:]port" << endl;
exit(1);
}
pxs->instance_h = instance;
LOG("logread: instance: " << instance << " w. v = " <<
pxs->values[instance]);
- pxs->v_a.clear();
- pxs->n_h.n = 0;
- pxs->n_a.n = 0;
+ pxs->accepted_value.clear();
+ pxs->promise.n = 0;
+ pxs->accepted.n = 0;
} else if (type == "propseen") {
- from >> pxs->n_h.n >> pxs->n_h.m;
- LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
+ from >> pxs->promise.n >> pxs->promise.m;
+ LOG("logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")");
} else if (type == "accepted") {
string v;
- from >> pxs->n_a.n >> pxs->n_a.m;
+ from >> pxs->accepted.n >> pxs->accepted.m;
from.get();
getline(from, v);
- pxs->v_a = v;
- LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
+ pxs->accepted_value = v;
+ LOG("logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value);
} else {
LOG("logread: unknown log record");
VERIFY(0);
f.close();
}
-// an acceptor should call logprop(n_h) when it
+// an acceptor should call logprop(promise) when it
// receives a prepare to which it responds prepare_ok().
-void log::logprop(prop_t n_h) {
+void log::logprop(prop_t promise) {
ofstream f(name, std::ios::app);
- f << "propseen " << n_h.n << " " << n_h.m << "\n";
+ f << "propseen " << promise.n << " " << promise.m << "\n";
f.close();
}
-// an acceptor should call logaccept(n_a, v_a) when it
+// an acceptor should call logaccept(accepted, accepted_value) when it
// receives an accept RPC to which it replies accept_ok().
void log::logaccept(prop_t n, string v) {
ofstream f(name, std::ios::app);
#include "paxos.h"
#include "handle.h"
-string print_members(const nodes_t &nodes) {
- ostringstream ost;
- copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
- return ost.str();
-}
-
bool isamember(const node_t & m, const nodes_t & nodes) {
return find(nodes.begin(), nodes.end(), m) != nodes.end();
}
bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
{
lock ml(proposer_mutex);
- LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
+ LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
if (!stable) { // already running proposer?
- LOG("proposer::run: already running");
+ LOG("paxos proposer already running");
return false;
}
stable = false;
bool r = false;
- my_n.n = std::max(n_h.n, my_n.n) + 1;
+ proposal.n = std::max(promise.n, proposal.n) + 1;
nodes_t accepts;
value_t v = newv;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- LOG("paxos::run: received a majority of prepare responses");
+ LOG("received a majority of prepare responses");
breakpoint1();
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- LOG("paxos::run: received a majority of accept responses");
+ LOG("received a majority of accept responses");
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- LOG("paxos::run: no majority of accept responses");
+ LOG("no majority of accept responses");
}
} else {
- LOG("paxos::run: no majority of prepare responses");
+ LOG("no majority of prepare responses");
}
} else {
- LOG("paxos::run: prepare is rejected " << stable);
+ LOG("prepare is rejected " << stable);
}
stable = true;
return r;
bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, value_t & v) {
+ LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
prepareres res;
prop_t highest_n_a{0, ""};
for (auto i : nodes) {
if (!r)
continue;
auto status = (paxos_protocol::status)r->call_timeout(
- paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
+ paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
LOG("commiting old instance!");
commit(instance, res.v_a);
return false;
}
+ LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
+ "v_a=\"" << res.v_a << "\"");
if (res.accept) {
accepts.push_back(i);
if (res.n_a >= highest_n_a) {
- LOG("found a newer accepted proposal");
+ LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
v = res.v_a;
highest_n_a = res.n_a;
}
continue;
bool accept = false;
int status = r->call_timeout(
- paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
+ paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v);
if (status == paxos_protocol::OK && accept)
accepts.push_back(i);
}
paxos_protocol::status
proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+ LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
lock ml(acceptor_mutex);
r.oldinstance = false;
r.accept = false;
- r.n_a = n_a;
- r.v_a = v_a;
+ r.n_a = accepted;
+ r.v_a = accepted_value;
if (instance <= instance_h) {
+ LOG("old instance " << instance << " has value " << values[instance]);
r.oldinstance = true;
r.v_a = values[instance];
- } else if (n > n_h) {
- n_h = n;
- l.logprop(n_h);
+ } else if (n > promise) {
+ LOG("looks good to me");
+ promise = n;
+ l.logprop(promise);
r.accept = true;
} else {
LOG("I totally rejected this request. Ha.");
}
+ LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
+ "v_a=\"" << r.v_a << "\"");
return paxos_protocol::OK;
}
lock ml(acceptor_mutex);
r = false;
if (instance == instance_h + 1) {
- if (n >= n_h) {
- n_a = n;
- v_a = v;
- l.logaccept(n_a, v_a);
+ if (n >= promise) {
+ accepted = n;
+ accepted_value = v;
+ l.logaccept(accepted, accepted_value);
r = true;
}
return paxos_protocol::OK;
paxos_protocol::status
proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
lock ml(acceptor_mutex);
- LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
+ LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
if (instance == instance_h + 1) {
- VERIFY(v_a == v);
- commit(instance, v_a, ml);
+ VERIFY(accepted_value == v);
+ commit(instance, accepted_value, ml);
} else if (instance <= instance_h) {
// we are ahead; ignore.
} else {
values[instance] = value;
l.loginstance(instance, value);
instance_h = instance;
- n_a = n_h = {0, me};
- v_a.clear();
+ accepted = promise = {0, me};
+ accepted_value.clear();
if (delegate) {
pxs_mutex_lock.unlock();
delegate->paxos_commit(instance, value);
extern bool isamember(const node_t & m, const nodes_t & nodes);
extern bool majority(const nodes_t & l1, const nodes_t & l2);
-extern string print_members(const nodes_t & nodes);
class proposer_acceptor {
private:
// Proposer state
bool stable = true;
- prop_t my_n = {0, me}; // number of the last proposal used in this instance
+ prop_t proposal = {0, me}; // number of the last proposal used in this instance
// Acceptor state
- prop_t n_h = {0, me}; // number of the highest proposal seen in a prepare
- prop_t n_a = {0, me}; // number of highest proposal accepted
- value_t v_a; // value of highest proposal accepted
+ prop_t promise = {0, me}; // number of the highest proposal seen in a prepare
+ prop_t accepted = {0, me}; // number of highest proposal accepted
+ value_t accepted_value; // value of highest proposal accepted
unsigned instance_h = 0; // number of the highest instance we have decided
map<unsigned,value_t> values; // vals of each instance
struct prop_t {
unsigned n;
string m;
+
+ MEMBERS(n, m)
+ LEXICOGRAPHIC_COMPARISON(prop_t)
};
+MARSHALLABLE(prop_t)
+
class paxos_protocol {
public:
enum status : status_t { OK, ERR };
bool accept;
prop_t n_a;
string v_a;
+
+ MEMBERS(oldinstance, accept, n_a, v_a)
};
};
-inline unmarshall & operator>>(unmarshall &u, prop_t &a) { return u >> a.n >> a.m; }
-inline marshall & operator<<(marshall &m, prop_t a) { return m << a.n << a.m; }
-inline bool operator>(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) > tie(b.n, b.m); }
-inline bool operator>=(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) >= tie(b.n, b.m); }
-
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) {
- return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) {
- return m << r.oldinstance << r.accept << r.n_a << r.v_a;
-}
+MARSHALLABLE(paxos_protocol::prepareres)
#endif
#include <errno.h>
#include <signal.h>
#include <unistd.h>
-#include "jsl_log.h"
#include <sys/socket.h>
#define MAX_PDU (10<<20) //maximum PDF is 10M
connection::connection(chanmgr *m1, int f1, int l1)
-: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
+: mgr_(m1), fd_(f1), lossy_(l1)
{
-
int flags = fcntl(fd_, F_GETFL, NULL);
flags |= O_NONBLOCK;
fcntl(fd_, F_SETFL, flags);
PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}
-connection::~connection()
-{
+connection::~connection() {
VERIFY(dead_);
if (rpdu_.buf)
free(rpdu_.buf);
close(fd_);
}
-void
-connection::incref()
-{
+void connection::incref() {
lock rl(ref_m_);
refno_++;
}
-bool
-connection::isdead()
-{
+bool connection::isdead() {
lock ml(m_);
return dead_;
}
-void
-connection::closeconn()
-{
+void connection::closeconn() {
{
lock ml(m_);
if (!dead_) {
PollMgr::Instance()->block_remove_fd(fd_);
}
-void
-connection::decref()
-{
+void connection::decref() {
bool dead = false;
{
lock rl(ref_m_);
dead = dead_;
}
}
- if (dead) {
+ if (dead)
delete this;
- }
-}
-
-int
-connection::ref()
-{
- lock rl(ref_m_);
- return refno_;
}
-int
-connection::compare(connection *another)
-{
+int connection::compare(connection *another) {
if (create_time_ > another->create_time_)
return 1;
if (create_time_ < another->create_time_)
return 0;
}
-bool
-connection::send(char *b, size_t sz)
-{
+bool connection::send(char *b, size_t sz) {
lock ml(m_);
- waiters_++;
- while (!dead_ && wpdu_.buf) {
+ waiters_++;
+ while (!dead_ && wpdu_.buf) {
send_wait_.wait(ml);
- }
- waiters_--;
- if (dead_) {
- return false;
- }
- wpdu_.buf = b;
- wpdu_.sz = sz;
- wpdu_.solong = 0;
-
- if (lossy_) {
- if ((random()%100) < lossy_) {
- jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
- shutdown(fd_,SHUT_RDWR);
- }
- }
-
- if (!writepdu()) {
- dead_ = true;
+ }
+ waiters_--;
+ if (dead_) {
+ return false;
+ }
+ wpdu_.buf = b;
+ wpdu_.sz = sz;
+ wpdu_.solong = 0;
+
+ if (lossy_) {
+ if ((random()%100) < lossy_) {
+ IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+ shutdown(fd_,SHUT_RDWR);
+ }
+ }
+
+ if (!writepdu()) {
+ dead_ = true;
ml.unlock();
- PollMgr::Instance()->block_remove_fd(fd_);
+ PollMgr::Instance()->block_remove_fd(fd_);
ml.lock();
- } else {
- if (wpdu_.solong == wpdu_.sz) {
- } else {
- //should be rare to need to explicitly add write callback
- PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
- while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
+ } else {
+ if (wpdu_.solong == wpdu_.sz) {
+ } else {
+ //should be rare to need to explicitly add write callback
+ PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+ while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
send_complete_.wait(ml);
- }
- }
- }
- bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
- wpdu_.solong = wpdu_.sz = 0;
- wpdu_.buf = NULL;
- if (waiters_ > 0)
+ }
+ }
+ }
+ bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
+ wpdu_.solong = wpdu_.sz = 0;
+ wpdu_.buf = NULL;
+ if (waiters_ > 0)
send_wait_.notify_all();
- return ret;
+ return ret;
}
//fd_ is ready to be written
-void
-connection::write_cb(int s)
-{
+void connection::write_cb(int s) {
lock ml(m_);
- VERIFY(!dead_);
- VERIFY(fd_ == s);
- if (wpdu_.sz == 0) {
- PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
- return;
- }
- if (!writepdu()) {
- PollMgr::Instance()->del_callback(fd_, CB_RDWR);
- dead_ = true;
- } else {
- VERIFY(wpdu_.solong != size_t_max);
- if (wpdu_.solong < wpdu_.sz) {
- return;
- }
+ VERIFY(!dead_);
+ VERIFY(fd_ == s);
+ if (wpdu_.sz == 0) {
+ PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+ return;
+ }
+ if (!writepdu()) {
+ PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+ dead_ = true;
+ } else {
+ VERIFY(wpdu_.solong != size_t_max);
+ if (wpdu_.solong < wpdu_.sz) {
+ return;
+ }
}
- send_complete_.notify_one();
+ send_complete_.notify_one();
}
//fd_ is ready to be read
-void
-connection::read_cb(int s)
-{
+void connection::read_cb(int s) {
lock ml(m_);
- VERIFY(fd_ == s);
- if (dead_) {
- return;
- }
-
- bool succ = true;
- if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
- succ = readpdu();
- }
-
- if (!succ) {
- PollMgr::Instance()->del_callback(fd_,CB_RDWR);
- dead_ = true;
- send_complete_.notify_one();
- }
-
- if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
- if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
- //chanmgr has successfully consumed the pdu
- rpdu_.buf = NULL;
- rpdu_.sz = rpdu_.solong = 0;
- }
- }
+ VERIFY(fd_ == s);
+ if (dead_) {
+ return;
+ }
+
+ bool succ = true;
+ if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+ succ = readpdu();
+ }
+
+ if (!succ) {
+ PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+ dead_ = true;
+ send_complete_.notify_one();
+ }
+
+ if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
+ if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+ //chanmgr has successfully consumed the pdu
+ rpdu_.buf = NULL;
+ rpdu_.sz = rpdu_.solong = 0;
+ }
+ }
}
-bool
-connection::writepdu()
-{
- VERIFY(wpdu_.solong != size_t_max);
- if (wpdu_.solong == wpdu_.sz)
- return true;
-
- if (wpdu_.solong == 0) {
- uint32_t sz = htonl((uint32_t)wpdu_.sz);
- bcopy(&sz,wpdu_.buf,sizeof(sz));
- }
- ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
- if (n < 0) {
- if (errno != EAGAIN) {
- jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
- wpdu_.solong = size_t_max;
- wpdu_.sz = 0;
- }
- return (errno == EAGAIN);
- }
- wpdu_.solong += (size_t)n;
- return true;
+// Pushes as much of the pending outgoing PDU (wpdu_) to fd_ as the socket
+// accepts in a single write().
+//
+// Returns true when the PDU is already fully sent, when some bytes were
+// written, or when the socket would block (EAGAIN) and the write should be
+// retried later.  Returns false on any other write error; in that case
+// wpdu_.solong is set to size_t_max and wpdu_.sz to 0 to mark the failure.
+bool connection::writepdu() {
+    VERIFY(wpdu_.solong != size_t_max);
+    if (wpdu_.solong == wpdu_.sz)
+        return true;
+
+    if (wpdu_.solong == 0) {
+        // First write of this PDU: stamp its total size, in network byte
+        // order, over the first bytes of the buffer.
+        uint32_t sz = htonl((uint32_t)wpdu_.sz);
+        bcopy(&sz,wpdu_.buf,sizeof(sz));
+    }
+    ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+    if (n < 0) {
+        // Capture errno immediately: the logging call below performs I/O of
+        // its own and may overwrite it before we decide what to return.
+        const int err = errno;
+        if (err == EAGAIN)
+            return true;
+        IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << err);
+        wpdu_.solong = size_t_max;
+        wpdu_.sz = 0;
+        return false;
+    }
+    wpdu_.solong += (size_t)n;
+    return true;
+}
-bool
-connection::readpdu()
-{
- if (!rpdu_.sz) {
- uint32_t sz1;
- ssize_t n = read(fd_, &sz1, sizeof(sz1));
-
- if (n == 0) {
- return false;
- }
-
- if (n < 0) {
- VERIFY(errno!=EAGAIN);
- return false;
- }
-
- if (n > 0 && n != sizeof(sz1)) {
- jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
- return false;
- }
-
- size_t sz = ntohl(sz1);
-
- if (sz > MAX_PDU) {
- char *tmpb = (char *)&sz1;
- jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %lu network order=%x %x %x %x %x\n", sz,
- sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
- return false;
- }
-
- rpdu_.sz = sz;
- VERIFY(rpdu_.buf == NULL);
- rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
- VERIFY(rpdu_.buf);
- bcopy(&sz1,rpdu_.buf,sizeof(sz1));
- rpdu_.solong = sizeof(sz1);
- }
-
- ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
- if (n <= 0) {
- if (errno == EAGAIN)
- return true;
- if (rpdu_.buf)
- free(rpdu_.buf);
- rpdu_.buf = NULL;
- rpdu_.sz = rpdu_.solong = 0;
- return (errno == EAGAIN);
- }
- rpdu_.solong += (size_t)n;
- return true;
+// Reads as much of the next incoming PDU as is currently available on fd_
+// into rpdu_.
+//
+// A PDU begins with a 32-bit network-order length word.  That length covers
+// the whole PDU including the length word itself: the word is copied to the
+// front of the buffer and counted in rpdu_.solong, and the PDU is complete
+// once rpdu_.solong == rpdu_.sz.
+//
+// Returns true if bytes were read or the read should be retried later
+// (EAGAIN on the body).  Returns false on EOF, on a read error, or on a
+// malformed length word; on a failed body read the partial buffer is freed
+// and rpdu_ is reset.
+bool connection::readpdu() {
+    if (!rpdu_.sz) {
+        uint32_t sz1;
+        ssize_t n = read(fd_, &sz1, sizeof(sz1));
+
+        if (n == 0) {
+            return false;
+        }
+
+        if (n < 0) {
+            VERIFY(errno!=EAGAIN);
+            return false;
+        }
+
+        if (n > 0 && n != sizeof(sz1)) {
+            IF_LEVEL(0) LOG("connection::readpdu short read of sz");
+            return false;
+        }
+
+        size_t sz = ntohl(sz1);
+
+        // The length comes straight off the network.  Anything smaller than
+        // the length word itself is malformed, and accepting it would make
+        // the unsigned subtraction in the body read below wrap around,
+        // asking read() for far more bytes than the buffer holds.
+        if (sz < sizeof(sz1)) {
+            IF_LEVEL(2) LOG("connection::readpdu read pdu TOO SMALL " << sz);
+            return false;
+        }
+
+        if (sz > MAX_PDU) {
+            // hex is sticky on a stream; switch back to dec so later output
+            // on the same stream is not affected.
+            IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1 << dec);
+            return false;
+        }
+
+        rpdu_.sz = sz;
+        VERIFY(rpdu_.buf == NULL);
+        rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
+        VERIFY(rpdu_.buf);
+        bcopy(&sz1,rpdu_.buf,sizeof(sz1));
+        rpdu_.solong = sizeof(sz1);
+    }
+
+    ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+    // errno is only meaningful when read() reported an error; on EOF (n == 0)
+    // it may still hold a stale EAGAIN from an earlier call.
+    if (n < 0 && errno == EAGAIN)
+        return true;
+    if (n <= 0) {
+        // EOF or hard error: drop the partially received PDU.
+        free(rpdu_.buf);
+        rpdu_.buf = NULL;
+        rpdu_.sz = rpdu_.solong = 0;
+        return false;
+    }
+    rpdu_.solong += (size_t)n;
+    return true;
+}
tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{
- struct sockaddr_in sin;
- memset(&sin, 0, sizeof(sin));
- sin.sin_family = AF_INET;
- sin.sin_port = htons(port);
-
- tcp_ = socket(AF_INET, SOCK_STREAM, 0);
- if (tcp_ < 0) {
- perror("tcpsconn::tcpsconn accept_loop socket:");
- VERIFY(0);
- }
-
- int yes = 1;
- setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
- setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
-
- if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
- perror("accept_loop tcp bind:");
- VERIFY(0);
- }
-
- if (listen(tcp_, 1000) < 0) {
- perror("tcpsconn::tcpsconn listen:");
- VERIFY(0);
- }
+ struct sockaddr_in sin;
+ memset(&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(port);
+
+ tcp_ = socket(AF_INET, SOCK_STREAM, 0);
+ if (tcp_ < 0) {
+ perror("tcpsconn::tcpsconn accept_loop socket:");
+ VERIFY(0);
+ }
+
+ int yes = 1;
+ setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
+ setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+
+ if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+ perror("accept_loop tcp bind:");
+ VERIFY(0);
+ }
+
+ if (listen(tcp_, 1000) < 0) {
+ perror("tcpsconn::tcpsconn listen:");
+ VERIFY(0);
+ }
socklen_t addrlen = sizeof(sin);
VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
port_ = ntohs(sin.sin_port);
- jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
- sin.sin_port);
+ IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
- if (pipe(pipe_) < 0) {
- perror("accept_loop pipe:");
- VERIFY(0);
- }
+ if (pipe(pipe_) < 0) {
+ perror("accept_loop pipe:");
+ VERIFY(0);
+ }
- int flags = fcntl(pipe_[0], F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(pipe_[0], F_SETFL, flags);
+ int flags = fcntl(pipe_[0], F_GETFL, NULL);
+ flags |= O_NONBLOCK;
+ fcntl(pipe_[0], F_SETFL, flags);
th_ = thread(&tcpsconn::accept_conn, this);
}
tcpsconn::~tcpsconn()
{
- VERIFY(close(pipe_[1]) == 0);
+ VERIFY(close(pipe_[1]) == 0);
th_.join();
- //close all the active connections
- map<int, connection *>::iterator i;
- for (i = conns_.begin(); i != conns_.end(); i++) {
- i->second->closeconn();
- i->second->decref();
- }
+ //close all the active connections
+ map<int, connection *>::iterator i;
+ for (i = conns_.begin(); i != conns_.end(); i++) {
+ i->second->closeconn();
+ i->second->decref();
+ }
}
-void
-tcpsconn::process_accept()
-{
- sockaddr_in sin;
- socklen_t slen = sizeof(sin);
- int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
- if (s1 < 0) {
- perror("tcpsconn::accept_conn error");
- throw thread_exit_exception();
- }
-
- jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
- s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
- connection *ch = new connection(mgr_, s1, lossy_);
+void tcpsconn::process_accept() {
+ sockaddr_in sin;
+ socklen_t slen = sizeof(sin);
+ int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
+ if (s1 < 0) {
+ perror("tcpsconn::accept_conn error");
+ throw thread_exit_exception();
+ }
+
+ IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+ connection *ch = new connection(mgr_, s1, lossy_);
// garbage collect all dead connections with refcount of 1
for (auto i = conns_.begin(); i != conns_.end();) {
if (i->second->isdead() && i->second->ref() == 1) {
- jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
- i->second->channo());
+ IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
i->second->decref();
// Careful not to reuse i right after erase. (i++) will
// be evaluated before the erase call because in C++,
++i;
}
- conns_[ch->channo()] = ch;
+ conns_[ch->channo()] = ch;
}
-void
-tcpsconn::accept_conn()
-{
- fd_set rfds;
- int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
+void tcpsconn::accept_conn() {
+ fd_set rfds;
+ int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
try {
while (1) {
continue;
} else {
perror("accept_conn select:");
- jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
+ IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
VERIFY(0);
}
}
}
}
-connection *
-connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
-{
- int s = socket(AF_INET, SOCK_STREAM, 0);
- int yes = 1;
- setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
- if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
- jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
- inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
- close(s);
- return NULL;
- }
- jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n",
- s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
- return new connection(mgr, s, lossy);
+// Opens a TCP connection to dst with TCP_NODELAY set and wraps the socket in
+// a new connection object owned by the caller.
+//
+// mgr and lossy are passed through unchanged to the connection constructor.
+// Returns NULL if the socket cannot be created or the connect fails; the
+// socket is closed before returning in the latter case.
+connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
+    int s = socket(AF_INET, SOCK_STREAM, 0);
+    if (s < 0) {
+        // Without a descriptor there is nothing to configure, connect or
+        // close; report the real cause instead of a connect failure.
+        IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst socket failed errno=" << errno);
+        return NULL;
+    }
+    int yes = 1;
+    setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+    if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
+        IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+        close(s);
+        return NULL;
+    }
+    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+    return new connection(mgr, s, lossy);
+}
void incref();
void decref();
- int ref();
+ int ref() { lock rl(ref_m_); return refno_; }
int compare(connection *another);
private:
chanmgr *mgr_;
const int fd_;
- bool dead_;
+ bool dead_ = false;
charbuf wpdu_;
charbuf rpdu_;
time_point<steady_clock> create_time_;
- int waiters_;
- int refno_;
+ int waiters_ = 0;
+ int refno_ = 1;
const int lossy_;
mutex m_;
+++ /dev/null
-int JSL_DEBUG_LEVEL = 0;
+++ /dev/null
-#ifndef jsl_log_h
-#define jsl_log_h
-
-enum dbcode {
- JSL_DBG_OFF = 0,
- JSL_DBG_1 = 1, // Critical
- JSL_DBG_2 = 2, // Error
- JSL_DBG_3 = 3, // Info
- JSL_DBG_4 = 4, // Debugging
-};
-
-extern int JSL_DEBUG_LEVEL;
-
-#define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);}
-
-#endif
#define DEFAULT_RPC_SZ 1024
#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
+struct pass { template <typename... Args> inline pass(Args&&...) {} };
+
class marshall {
private:
char *buf_; // Base of the raw bytes buffer (dynamically readjusted)
}
}
public:
- struct pass { template <typename... Args> inline pass(Args&&...) {} };
-
template <typename... Args>
-
marshall(const Args&... args) {
buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
VERIFY(buf_);
struct marshalled_func<F, ErrorHandler, function<Signature>> :
public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
+template <class ...Args, size_t ...Indices> unmarshall &
+tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
+ (void)pass{(u >> get<Indices>(t))...};
+ return u;
+}
+
+template <class... Args> unmarshall &
+operator>>(unmarshall & u, tuple<Args &...> && t) {
+ using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+ return tuple_unmarshall_imp(u, t, Indices());
+}
+
+template <class ...Args, size_t ...Indices> marshall &
+tuple_marshall_imp(marshall & m, tuple<Args...> & t, tuple_indices<Indices...>) {
+ (void)pass{(m << get<Indices>(t))...};
+ return m;
+}
+
+template <class... Args> marshall &
+operator<<(marshall & m, tuple<Args...> && t) {
+ using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+ return tuple_marshall_imp(m, t, Indices());
+}
+
+// for structs or classes containing a MEMBERS declaration
+#define MARSHALLABLE(_c_) \
+inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
+inline marshall & operator<<(marshall &m, _c_ a) { return m << a._tuple_(); }
+
#endif
#include <fcntl.h>
#include <unistd.h>
-#include "jsl_log.h"
#include "pollmgr.h"
PollMgr *PollMgr::instance = NULL;
static void
PollMgrInit()
{
- PollMgr::instance = new PollMgr();
+ PollMgr::instance = new PollMgr();
}
PollMgr *
PollMgr::Instance()
{
std::call_once(pollmgr_is_initialized, PollMgrInit);
- return instance;
+ return instance;
}
PollMgr::PollMgr() : pending_change_(false)
{
- bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
- aio_ = new SelectAIO();
- //aio_ = new EPollAIO();
+ bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
+ aio_ = new SelectAIO();
+ //aio_ = new EPollAIO();
th_ = std::thread(&PollMgr::wait_loop, this);
}
PollMgr::~PollMgr() [[noreturn]]
{
- //never kill me!!!
- VERIFY(0);
+ //never kill me!!!
+ VERIFY(0);
}
void
PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
{
- VERIFY(fd < MAX_POLL_FDS);
+ VERIFY(fd < MAX_POLL_FDS);
lock ml(m_);
- aio_->watch_fd(fd, flag);
+ aio_->watch_fd(fd, flag);
- VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
- callbacks_[fd] = ch;
+ VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
+ callbacks_[fd] = ch;
}
//remove all callbacks related to fd
PollMgr::block_remove_fd(int fd)
{
+ // Stops watching fd for both reads and writes, then blocks until wait_loop()
+ // has acknowledged the change (it clears pending_change_ under m_ and
+ // notifies changedone_c_), and only then clears the callback slot.
+ // fd indexes callbacks_, so reject anything outside the table first.
+ VERIFY(fd >= 0 && fd < MAX_POLL_FDS);
lock ml(m_);
- aio_->unwatch_fd(fd, CB_RDWR);
- pending_change_ = true;
+ aio_->unwatch_fd(fd, CB_RDWR);
+ pending_change_ = true;
-changedone_c_.wait(ml);
+ // Wait with a predicate: a bare wait() may return on a spurious wakeup,
+ // before the poll loop has actually observed the removal.
+ changedone_c_.wait(ml, [this] { return !pending_change_; });
- callbacks_[fd] = NULL;
+ callbacks_[fd] = NULL;
}
void
PollMgr::del_callback(int fd, poll_flag flag)
{
lock ml(m_);
- if (aio_->unwatch_fd(fd, flag)) {
- callbacks_[fd] = NULL;
- }
+ if (aio_->unwatch_fd(fd, flag)) {
+ callbacks_[fd] = NULL;
+ }
}
bool
PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
{
lock ml(m_);
- if (!callbacks_[fd] || callbacks_[fd]!=c)
- return false;
+ if (!callbacks_[fd] || callbacks_[fd]!=c)
+ return false;
- return aio_->is_watched(fd, flag);
+ return aio_->is_watched(fd, flag);
}
// Body of the polling thread; loops forever. Each iteration:
//  1. under m_, clears pending_change_ if it is set and notifies every waiter
//     on changedone_c_;
//  2. asks the AIO backend for the descriptors that are ready;
//  3. calls read_cb()/write_cb() on the registered callback of each ready fd,
//     skipping descriptors with no callback. Callbacks run without m_ held
//     (see the comment inside the loop).
void
PollMgr::wait_loop() [[noreturn]]
{
- std::vector<int> readable;
- std::vector<int> writable;
+ std::vector<int> readable;
+ std::vector<int> writable;
- while (1) {
- {
+ while (1) {
+ {
lock ml(m_);
- if (pending_change_) {
- pending_change_ = false;
+ if (pending_change_) {
+ pending_change_ = false;
changedone_c_.notify_all();
- }
- }
- readable.clear();
- writable.clear();
- aio_->wait_ready(&readable,&writable);
-
- if (!readable.size() && !writable.size()) {
- continue;
- }
- //no locking of m_
- //because no add_callback() and del_callback should
- //modify callbacks_[fd] while the fd is not dead
- for (unsigned int i = 0; i < readable.size(); i++) {
- int fd = readable[i];
- if (callbacks_[fd])
- callbacks_[fd]->read_cb(fd);
- }
-
- for (unsigned int i = 0; i < writable.size(); i++) {
- int fd = writable[i];
- if (callbacks_[fd])
- callbacks_[fd]->write_cb(fd);
- }
- }
+ }
+ }
+ readable.clear();
+ writable.clear();
+ aio_->wait_ready(&readable,&writable);
+
+ if (!readable.size() && !writable.size()) {
+ continue;
+ }
+ //no locking of m_
+ //because no add_callback() and del_callback should
+ //modify callbacks_[fd] while the fd is not dead
+ for (unsigned int i = 0; i < readable.size(); i++) {
+ int fd = readable[i];
+ if (callbacks_[fd])
+ callbacks_[fd]->read_cb(fd);
+ }
+
+ for (unsigned int i = 0; i < writable.size(); i++) {
+ int fd = writable[i];
+ if (callbacks_[fd])
+ callbacks_[fd]->write_cb(fd);
+ }
+ }
}
SelectAIO::SelectAIO() : highfds_(0)
{
+ // Starts with empty read/write sets, then creates the self-pipe that
+ // watch_fd()/unwatch_fd() write to in order to interrupt a blocked select().
+ // The pipe's read end is always part of the read set.
- FD_ZERO(&rfds_);
- FD_ZERO(&wfds_);
+ FD_ZERO(&rfds_);
+ FD_ZERO(&wfds_);
- VERIFY(pipe(pipefd_) == 0);
- FD_SET(pipefd_[0], &rfds_);
- highfds_ = pipefd_[0];
+ VERIFY(pipe(pipefd_) == 0);
+ FD_SET(pipefd_[0], &rfds_);
+ highfds_ = pipefd_[0];
- int flags = fcntl(pipefd_[0], F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(pipefd_[0], F_SETFL, flags);
+ // Make the read end non-blocking. Check both fcntl() calls: a failed
+ // F_GETFL returns -1, which must never be OR-ed and fed back to F_SETFL.
+ int flags = fcntl(pipefd_[0], F_GETFL, 0);
+ VERIFY(flags != -1);
+ VERIFY(fcntl(pipefd_[0], F_SETFL, flags | O_NONBLOCK) == 0);
}
SelectAIO::~SelectAIO()
SelectAIO::watch_fd(int fd, poll_flag flag)
{
// Adds fd to the read set, the write set, or both (any flag other than
// CB_RDONLY / CB_WRONLY selects both), raises highfds_ if needed, and writes
// one byte to the self-pipe so that a select() blocked in wait_ready() returns.
lock ml(m_);
- if (highfds_ <= fd)
- highfds_ = fd;
-
- if (flag == CB_RDONLY) {
- FD_SET(fd,&rfds_);
- }else if (flag == CB_WRONLY) {
- FD_SET(fd,&wfds_);
- }else {
- FD_SET(fd,&rfds_);
- FD_SET(fd,&wfds_);
- }
-
- char tmp = 1;
- VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ if (highfds_ <= fd)
+ highfds_ = fd;
+
+ if (flag == CB_RDONLY) {
+ FD_SET(fd,&rfds_);
+ }else if (flag == CB_WRONLY) {
+ FD_SET(fd,&wfds_);
+ }else {
+ FD_SET(fd,&rfds_);
+ FD_SET(fd,&wfds_);
+ }
+
+ char tmp = 1;
+ VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
}
// Reports whether fd is in the set(s) selected by flag: the read set for
// CB_RDONLY, the write set for CB_WRONLY, and both sets for any other flag.
bool
SelectAIO::is_watched(int fd, poll_flag flag)
{
lock ml(m_);
- if (flag == CB_RDONLY) {
- return FD_ISSET(fd,&rfds_);
- }else if (flag == CB_WRONLY) {
- return FD_ISSET(fd,&wfds_);
- }else{
- return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
- }
+ if (flag == CB_RDONLY) {
+ return FD_ISSET(fd,&rfds_);
+ }else if (flag == CB_WRONLY) {
+ return FD_ISSET(fd,&wfds_);
+ }else{
+ return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
+ }
}
// Removes fd from the read and/or write set according to flag (aborts on any
// flag other than CB_RDONLY, CB_WRONLY or CB_RDWR). If fd is then in neither
// set and was the highest watched descriptor, highfds_ is recomputed by
// scanning both sets, defaulting to the self-pipe's read end. Only a CB_RDWR
// removal writes a byte to the self-pipe to wake a blocked select().
// Returns true when fd is no longer in either set.
bool
SelectAIO::unwatch_fd(int fd, poll_flag flag)
{
lock ml(m_);
- if (flag == CB_RDONLY) {
- FD_CLR(fd, &rfds_);
- }else if (flag == CB_WRONLY) {
- FD_CLR(fd, &wfds_);
- }else if (flag == CB_RDWR) {
- FD_CLR(fd, &wfds_);
- FD_CLR(fd, &rfds_);
- }else{
- VERIFY(0);
- }
-
- if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
- if (fd == highfds_) {
- int newh = pipefd_[0];
- for (int i = 0; i <= highfds_; i++) {
- if (FD_ISSET(i, &rfds_)) {
- newh = i;
- }else if (FD_ISSET(i, &wfds_)) {
- newh = i;
- }
- }
- highfds_ = newh;
- }
- }
- if (flag == CB_RDWR) {
- char tmp = 1;
- VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
- }
- return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
+ if (flag == CB_RDONLY) {
+ FD_CLR(fd, &rfds_);
+ }else if (flag == CB_WRONLY) {
+ FD_CLR(fd, &wfds_);
+ }else if (flag == CB_RDWR) {
+ FD_CLR(fd, &wfds_);
+ FD_CLR(fd, &rfds_);
+ }else{
+ VERIFY(0);
+ }
+
+ if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
+ if (fd == highfds_) {
+ int newh = pipefd_[0];
+ for (int i = 0; i <= highfds_; i++) {
+ if (FD_ISSET(i, &rfds_)) {
+ newh = i;
+ }else if (FD_ISSET(i, &wfds_)) {
+ newh = i;
+ }
+ }
+ highfds_ = newh;
+ }
+ }
+ if (flag == CB_RDWR) {
+ char tmp = 1;
+ VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ }
+ return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
}
void
SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
{
- fd_set trfds, twfds;
- int high;
+ fd_set trfds, twfds;
+ int high;
- {
+ {
lock ml(m_);
- trfds = rfds_;
- twfds = wfds_;
- high = highfds_;
- }
-
- int ret = select(high+1, &trfds, &twfds, NULL, NULL);
-
- if (ret < 0) {
- if (errno == EINTR) {
- return;
- } else {
- perror("select:");
- jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
- VERIFY(0);
- }
- }
-
- for (int fd = 0; fd <= high; fd++) {
- if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
- char tmp;
- VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
- VERIFY(tmp==1);
- }else {
- if (FD_ISSET(fd, &twfds)) {
- writable->push_back(fd);
- }
- if (FD_ISSET(fd, &trfds)) {
- readable->push_back(fd);
- }
- }
- }
+ trfds = rfds_;
+ twfds = wfds_;
+ high = highfds_;
+ }
+
+ int ret = select(high+1, &trfds, &twfds, NULL, NULL);
+
+ if (ret < 0) {
+ if (errno == EINTR) {
+ return;
+ } else {
+ perror("select:");
+ IF_LEVEL(0) LOG("PollMgr::select_loop failure errno " << errno);
+ VERIFY(0);
+ }
+ }
+
+ for (int fd = 0; fd <= high; fd++) {
+ if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+ char tmp;
+ VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+ VERIFY(tmp==1);
+ }else {
+ if (FD_ISSET(fd, &twfds)) {
+ writable->push_back(fd);
+ }
+ if (FD_ISSET(fd, &trfds)) {
+ readable->push_back(fd);
+ }
+ }
+ }
}
#ifdef __linux__
// Creates the epoll instance (aborting if epoll_create fails) and zeroes the
// per-descriptor status table.
EPollAIO::EPollAIO()
{
- pollfd_ = epoll_create(MAX_POLL_FDS);
- VERIFY(pollfd_ >= 0);
- bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
+ pollfd_ = epoll_create(MAX_POLL_FDS);
+ VERIFY(pollfd_ >= 0);
+ bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
}
// Closes the epoll descriptor opened by the constructor.
EPollAIO::~EPollAIO()
{
- close(pollfd_);
+ close(pollfd_);
}
// Maps a poll_flag to epoll event bits: CB_RDONLY -> EPOLLIN,
// CB_WRONLY -> EPOLLOUT, anything else -> EPOLLIN | EPOLLOUT.
static inline
int poll_flag_to_event(poll_flag flag)
{
- int f;
- if (flag == CB_RDONLY) {
- f = EPOLLIN;
- }else if (flag == CB_WRONLY) {
- f = EPOLLOUT;
- }else { //flag == CB_RDWR
- f = EPOLLIN | EPOLLOUT;
- }
- return f;
+ int f;
+ if (flag == CB_RDONLY) {
+ f = EPOLLIN;
+ }else if (flag == CB_WRONLY) {
+ f = EPOLLOUT;
+ }else { //flag == CB_RDWR
+ f = EPOLLIN | EPOLLOUT;
+ }
+ return f;
}
// Adds flag to the recorded status of fd and registers fd with epoll
// (EPOLL_CTL_ADD when nothing was recorded for fd before, EPOLL_CTL_MOD
// otherwise). The event mask is always edge-triggered (EPOLLET) plus
// EPOLLIN/EPOLLOUT according to the accumulated status. Aborts if fd is not
// below MAX_POLL_FDS or if epoll_ctl fails.
void
EPollAIO::watch_fd(int fd, poll_flag flag)
{
- VERIFY(fd < MAX_POLL_FDS);
+ VERIFY(fd < MAX_POLL_FDS);
- struct epoll_event ev;
- int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
- fdstatus_[fd] |= (int)flag;
+ struct epoll_event ev;
+ int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+ fdstatus_[fd] |= (int)flag;
- ev.events = EPOLLET;
- ev.data.fd = fd;
+ ev.events = EPOLLET;
+ ev.data.fd = fd;
- if (fdstatus_[fd] & CB_RDONLY) {
- ev.events |= EPOLLIN;
- }
- if (fdstatus_[fd] & CB_WRONLY) {
- ev.events |= EPOLLOUT;
- }
+ if (fdstatus_[fd] & CB_RDONLY) {
+ ev.events |= EPOLLIN;
+ }
+ if (fdstatus_[fd] & CB_WRONLY) {
+ ev.events |= EPOLLOUT;
+ }
- if (flag == CB_RDWR) {
- VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
- }
+ if (flag == CB_RDWR) {
+ VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
+ }
- VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+ VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
}
// Clears flag from the recorded status of fd and updates epoll to match:
// EPOLL_CTL_MOD with the remaining events if any status bits are left,
// EPOLL_CTL_DEL otherwise. Returns true when fd was removed from epoll
// entirely. Aborts if fd is not below MAX_POLL_FDS or if epoll_ctl fails.
bool
EPollAIO::unwatch_fd(int fd, poll_flag flag)
{
- VERIFY(fd < MAX_POLL_FDS);
- fdstatus_[fd] &= ~(int)flag;
-
- struct epoll_event ev;
- int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
-
- ev.events = EPOLLET;
- ev.data.fd = fd;
-
- if (fdstatus_[fd] & CB_RDONLY) {
- ev.events |= EPOLLIN;
- }
- if (fdstatus_[fd] & CB_WRONLY) {
- ev.events |= EPOLLOUT;
- }
-
- if (flag == CB_RDWR) {
- VERIFY(op == EPOLL_CTL_DEL);
- }
- VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
- return (op == EPOLL_CTL_DEL);
+ VERIFY(fd < MAX_POLL_FDS);
+ fdstatus_[fd] &= ~(int)flag;
+
+ struct epoll_event ev;
+ int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+
+ ev.events = EPOLLET;
+ ev.data.fd = fd;
+
+ if (fdstatus_[fd] & CB_RDONLY) {
+ ev.events |= EPOLLIN;
+ }
+ if (fdstatus_[fd] & CB_WRONLY) {
+ ev.events |= EPOLLOUT;
+ }
+
+ if (flag == CB_RDWR) {
+ VERIFY(op == EPOLL_CTL_DEL);
+ }
+ VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+ return (op == EPOLL_CTL_DEL);
}
bool
EPollAIO::is_watched(int fd, poll_flag flag)
{
- VERIFY(fd < MAX_POLL_FDS);
- return ((fdstatus_[fd] & CB_MASK) == flag);
+ VERIFY(fd < MAX_POLL_FDS);
+ return ((fdstatus_[fd] & CB_MASK) == flag);
}
void
EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
{
- int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
- for (int i = 0; i < nfds; i++) {
- if (ready_[i].events & EPOLLIN) {
- readable->push_back(ready_[i].data.fd);
- }
- if (ready_[i].events & EPOLLOUT) {
- writable->push_back(ready_[i].data.fd);
- }
- }
+ int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+ for (int i = 0; i < nfds; i++) {
+ if (ready_[i].events & EPOLLIN) {
+ readable->push_back(ready_[i].data.fd);
+ }
+ if (ready_[i].events & EPOLLOUT) {
+ writable->push_back(ready_[i].data.fd);
+ }
+ }
}
#endif
#include <netdb.h>
#include <unistd.h>
-#include "jsl_log.h"
-
const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
// xid starts with 1 and latest received reply starts with 0
xid_rep_window_.push_back(0);
- jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
- clt_nonce_, lossytest_);
+ IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
}
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
-rpcc::~rpcc()
-{
- jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
- clt_nonce_, chan_?chan_->channo():-1);
+rpcc::~rpcc() {
+ IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
if(chan_){
chan_->closeconn();
chan_->decref();
VERIFY(calls_.size() == 0);
}
// Issues the bind RPC with timeout `to`. On success (return value 0) records
// that binding is done and stores the value the server returned as srv_nonce_;
// on failure logs the destination address and the error at level 2.
// Returns the RPC's return code unchanged.
-int
-rpcc::bind(TO to)
-{
+int rpcc::bind(TO to) {
unsigned int r;
int ret = call_timeout(rpc_const::bind, to, r, 0);
if(ret == 0){
bind_done_ = true;
srv_nonce_ = r;
} else {
- jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
- inet_ntoa(dst_.sin_addr), ret);
+ IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
}
return ret;
};
// Cancel all outstanding calls
- void
-rpcc::cancel(void)
-{
+void rpcc::cancel(void) {
lock ml(m_);
LOG("rpcc::cancel: force callers to fail");
for(auto &p : calls_){
caller *ca = p.second;
- jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
+ IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail");
{
lock cl(ca->m);
ca->done = true;
LOG("rpcc::cancel: done");
}
-int
-rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
- TO to)
-{
+int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
caller ca(0, &rep);
int xid_rep;
if((proc != rpc_const::bind && !bind_done_) ||
(proc == rpc_const::bind && bind_done_)){
- jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
+ IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice");
return rpc_const::bind_failure;
}
ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
ch->send(req.cstr(), req.size());
}
- else jsl_log(JSL_DBG_1, "not reachable\n");
- jsl_log(JSL_DBG_2,
- "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
- clt_nonce_, proc, ca.xid, clt_nonce_);
+ else IF_LEVEL(1) LOG("not reachable");
+ IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc <<
+ " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
}
transmit = false; // only send once on a given channel
}
{
lock cal(ca.m);
while (!ca.done){
- jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
+ IF_LEVEL(2) LOG("rpcc:call1: wait");
if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
- jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
+ IF_LEVEL(2) LOG("rpcc::call1: timeout");
break;
}
}
if(ca.done){
- jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
+ IF_LEVEL(2) LOG("rpcc::call1: reply received");
break;
}
}
lock cal(ca.m);
- jsl_log(JSL_DBG_2,
- "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
- clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
- ntohs(dst_.sin_port), ca.done, ca.intret);
+ IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc <<
+ " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
+ ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
if(ch)
ch->decref();
rep.unpack_reply_header(&h);
if(!rep.ok()){
- jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
+ IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!");
return true;
}
update_xid_rep(h.xid);
if(calls_.find(h.xid) == calls_.end()){
- jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
+ IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request");
return true;
}
caller *ca = calls_[h.xid];
ca->un->take_in(rep);
ca->intret = h.ret;
if(ca->intret < 0){
- jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
- h.xid, ca->intret);
+ IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret);
}
ca->done = 1;
}
{
set_rand_seed();
nonce_ = (unsigned int)random();
- jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
+ IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_);
char *loss_env = getenv("RPC_LOSSY");
if(loss_env != NULL){
rpcs::got_pdu(connection *c, char *b, size_t sz)
{
if(!reachable_){
- jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
+ IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable");
return true;
}
if(clt.second.size() > maxrep)
maxrep = clt.second.size();
}
- jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
- (int) reply_window_.size()-1, totalrep, maxrep);
+ IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
+ totalrep << " max per client " << maxrep);
curr_counts_ = counting_;
}
}
proc_t proc = h.proc;
if(!req.ok()){
- jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
+ IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!");
c->decref();
return;
}
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
- h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
+ IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+ dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
marshall rep;
reply_header rh(h.xid,0);
// is client sending to an old instance of server?
if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
- h.srv_nonce, nonce_, h.proc);
+ IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce <<
+ " (current " << nonce_ << ") proc " << hex << h.proc);
rh.ret = rpc_const::oldsrv_failure;
rep.pack_reply_header(rh);
c->send(rep.cstr(),rep.size());
{
lock pl(procs_m_);
if(procs_.count(proc) < 1){
- fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
- proc);
+ cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl;
c->decref();
- VERIFY(0);
+ VERIFY(0);
return;
}
if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
- h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
+ IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid <<
+ " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
}
}
rh.ret = (*f)(req, rep);
if (rh.ret == rpc_const::unmarshal_args_failure) {
- fprintf(stderr, "rpcs::dispatch: failed to"
- " unmarshall the arguments. You are"
- " probably calling RPC 0x%x with wrong"
- " types of arguments.\n", proc);
+ cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " <<
+ "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+ "types of arguments." << endl;
VERIFY(0);
}
VERIFY(rh.ret >= 0);
rep.pack_reply_header(rh);
rep.take_buf(&b1,&sz1);
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
- sz1, h.xid, proc, rh.ret, h.clt_nonce);
+ IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " <<
+ h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
if(h.clt_nonce > 0){
// only record replies for clients that require at-most-once logic
c->send(b1, sz1);
break;
case FORGOTTEN: // very old request and we don't have the response anymore
- jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
- h.xid, h.clt_nonce);
+ IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce);
rh.ret = rpc_const::atmostonce_failure;
rep.pack_reply_header(rh);
c->send(rep.cstr(),rep.size());
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
- fprintf(stderr, "Could not find reply struct in add_reply");
+ cerr << "Could not find reply struct in add_reply" << endl;
l.insert(it, reply_t(xid, b, sz));
} else {
*it = reply_t(xid, b, sz);
}
// Handler for the bind RPC: logs at level 2, stores this server's nonce_ in r
// and returns 0. The second (unnamed) argument is ignored.
int rpcs::rpcbind(unsigned int &r, int) {
- jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
+ IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_);
r = nonce_;
return 0;
}
struct hostent *hp = gethostbyname(host.c_str());
if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
- fprintf(stderr, "cannot find host name %s\n", host.c_str());
+ cerr << "cannot find host name " << host << endl;
exit(1);
}
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
#include <getopt.h>
#include <sys/types.h>
#include <unistd.h>
-#include "jsl_log.h"
#define NUM_CL 2
// to simplify rpcs::reg(). a server process can have handlers
// from multiple classes.
// Test service whose member functions are registered as RPC handlers in
// startserver(). In each handler the first (reference) parameter receives the
// result and the remaining parameters are the call arguments.
class srv {
- public:
- int handle_22(string & r, const string a, const string b);
- int handle_fast(int &r, const int a);
- int handle_slow(int &r, const int a);
- int handle_bigrep(string &r, const size_t a);
+ public:
+ int handle_22(string & r, const string a, const string b);
+ int handle_fast(int &r, const int a);
+ int handle_slow(int &r, const int a);
+ int handle_bigrep(string &r, const size_t a);
};
// a handler. a and b are arguments, r is the result.
// Sets r to the concatenation a + b and returns 0.
// (b is declared `const string` in the class and `string` here; top-level
// const on a by-value parameter does not change the function's signature.)
int
srv::handle_22(string &r, const string a, string b)
{
- r = a + b;
- return 0;
+ r = a + b;
+ return 0;
}
// Sets r to a + 1 and returns 0 without delay.
int
srv::handle_fast(int &r, const int a)
{
- r = a + 1;
- return 0;
+ r = a + 1;
+ return 0;
}
// Sleeps for a random duration below 5000 microseconds, then sets r to a + 2
// and returns 0.
int
srv::handle_slow(int &r, const int a)
{
- usleep(random() % 5000);
- r = a + 2;
- return 0;
+ usleep(random() % 5000);
+ r = a + 2;
+ return 0;
}
// Sets r to a string of len 'x' characters and returns 0.
int
srv::handle_bigrep(string &r, const size_t len)
{
- r = string((size_t)len, 'x');
- return 0;
+ r = string((size_t)len, 'x');
+ return 0;
}
srv service;
// Creates the RPC server on `port` and registers the four srv handlers under
// procedure numbers 22 (handle_22), 23 (handle_fast), 24 (handle_slow) and
// 25 (handle_bigrep), all bound to the global `service` object.
void startserver()
{
- server = new rpcs((unsigned int)port);
- server->reg(22, &srv::handle_22, &service);
- server->reg(23, &srv::handle_fast, &service);
- server->reg(24, &srv::handle_slow, &service);
- server->reg(25, &srv::handle_bigrep, &service);
+ server = new rpcs((unsigned int)port);
+ server->reg(22, &srv::handle_22, &service);
+ server->reg(23, &srv::handle_fast, &service);
+ server->reg(24, &srv::handle_slow, &service);
+ server->reg(25, &srv::handle_bigrep, &service);
}
// Round-trip test for marshall/unmarshall: packs a request header followed by
// an int, an unsigned long long and a string, checks the resulting buffer
// size, then unpacks from that buffer and verifies that the header and all
// three values match what was packed.
// NOTE(review): ownership of the buffer returned by take_buf() is not visible
// here -- confirm whether unmarshall frees it.
void
testmarshall()
{
- marshall m;
- request_header rh{1,2,3,4,5};
- m.pack_req_header(rh);
- VERIFY(m.size()==RPC_HEADER_SZ);
- int i = 12345;
- unsigned long long l = 1223344455L;
- string s = "hallo....";
- m << i;
- m << l;
- m << s;
-
- char *b;
- size_t sz;
- m.take_buf(&b,&sz);
- VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
-
- unmarshall un(b,sz);
- request_header rh1;
- un.unpack_req_header(&rh1);
- VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
- int i1;
- unsigned long long l1;
- string s1;
- un >> i1;
- un >> l1;
- un >> s1;
- VERIFY(un.okdone());
- VERIFY(i1==i && l1==l && s1==s);
+ marshall m;
+ request_header rh{1,2,3,4,5};
+ m.pack_req_header(rh);
+ VERIFY(m.size()==RPC_HEADER_SZ);
+ int i = 12345;
+ unsigned long long l = 1223344455L;
+ string s = "hallo....";
+ m << i;
+ m << l;
+ m << s;
+
+ char *b;
+ size_t sz;
+ m.take_buf(&b,&sz);
+ VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+
+ unmarshall un(b,sz);
+ request_header rh1;
+ un.unpack_req_header(&rh1);
+ VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
+ int i1;
+ unsigned long long l1;
+ string s1;
+ un >> i1;
+ un >> l1;
+ un >> s1;
+ VERIFY(un.okdone());
+ VERIFY(i1==i && l1==l && s1==s);
}
// Worker for concurrent_test(); uses client number cl % NUM_CL.
// Phase 1: 100 calls to proc 25 with a random length below 2000, verifying the
// reply has exactly that length.
// Phase 2: 100 calls randomly split between proc 23 and proc 24, verifying
// the reply is arg+1 or arg+2 respectively; the elapsed time is printed only
// when a call fails.
void
client1(size_t cl)
{
- // test concurrency.
- size_t which_cl = cl % NUM_CL;
-
- for(int i = 0; i < 100; i++){
- int arg = (random() % 2000);
- string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- VERIFY(ret == 0);
- if ((int)rep.size()!=arg)
- cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
- VERIFY((int)rep.size() == arg);
- }
-
- // test rpc replies coming back not in the order of
- // the original calls -- i.e. does xid reply dispatch work.
- for(int i = 0; i < 100; i++){
- int which = (random() % 2);
- int arg = (random() % 1000);
- int rep;
-
- auto start = std::chrono::steady_clock::now();
-
- int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
- auto end = std::chrono::steady_clock::now();
- auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
- if (ret != 0)
- cout << diff << " ms have elapsed!!!" << endl;
- VERIFY(ret == 0);
- VERIFY(rep == (which ? arg+1 : arg+2));
- }
+ // test concurrency.
+ size_t which_cl = cl % NUM_CL;
+
+ for(int i = 0; i < 100; i++){
+ int arg = (random() % 2000);
+ string rep;
+ int ret = clients[which_cl]->call(25, rep, arg);
+ VERIFY(ret == 0);
+ if ((int)rep.size()!=arg)
+ cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
+ VERIFY((int)rep.size() == arg);
+ }
+
+ // test rpc replies coming back not in the order of
+ // the original calls -- i.e. does xid reply dispatch work.
+ for(int i = 0; i < 100; i++){
+ int which = (random() % 2);
+ int arg = (random() % 1000);
+ int rep;
+
+ auto start = std::chrono::steady_clock::now();
+
+ int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+ auto end = std::chrono::steady_clock::now();
+ auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+ if (ret != 0)
+ cout << diff << " ms have elapsed!!!" << endl;
+ VERIFY(ret == 0);
+ VERIFY(rep == (which ? arg+1 : arg+2));
+ }
}
// Worker for lossy_test(); uses client number cl % NUM_CL. For roughly ten
// seconds it repeatedly calls proc 25 with a random length below 2000 and
// verifies the reply has that length. The call's return code is only printed
// on a size mismatch, not verified.
void
client2(size_t cl)
{
- size_t which_cl = cl % NUM_CL;
-
- time_t t1;
- time(&t1);
-
- while(time(0) - t1 < 10){
- int arg = (random() % 2000);
- string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- if ((int)rep.size()!=arg)
- cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
- VERIFY((int)rep.size() == arg);
- }
+ size_t which_cl = cl % NUM_CL;
+
+ time_t t1;
+ time(&t1);
+
+ while(time(0) - t1 < 10){
+ int arg = (random() % 2000);
+ string rep;
+ int ret = clients[which_cl]->call(25, rep, arg);
+ if ((int)rep.size()!=arg)
+ cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
+ VERIFY((int)rep.size() == arg);
+ }
}
// Worker for failure_test(): xx is the rpcc to use. Makes four calls to
// proc 24 with a 3000 timeout value; each must either time out or return i+2.
// The added side initialises rep to 0 so the check never reads an
// indeterminate value.
void
client3(void *xx)
{
- rpcc *c = (rpcc *) xx;
+ rpcc *c = (rpcc *) xx;
- for(int i = 0; i < 4; i++){
- int rep;
- int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
- VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
- }
+ for(int i = 0; i < 4; i++){
+ int rep = 0;
+ int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
+ VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+ }
}
void
simple_tests(rpcc *c)
{
- cout << "simple_tests" << endl;
- // an RPC call to procedure #22.
- // rpcc::call() looks at the argument types to decide how
- // to marshall the RPC call packet, and how to unmarshall
- // the reply packet.
- string rep;
- int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
- VERIFY(intret == 0); // this is what handle_22 returns
- VERIFY(rep == "hello goodbye");
- cout << " -- string concat RPC .. ok" << endl;
-
- // small request, big reply (perhaps req via UDP, reply via TCP)
- intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
- VERIFY(intret == 0);
- VERIFY(rep.size() == 70000);
- cout << " -- small request, big reply .. ok" << endl;
-
- // specify a timeout value to an RPC that should succeed (udp)
- int xx = 0;
- intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
- VERIFY(intret == 0 && xx == 78);
- cout << " -- no spurious timeout .. ok" << endl;
-
- // specify a timeout value to an RPC that should succeed (tcp)
- {
- string arg(1000, 'x');
- string rep2;
- c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
- VERIFY(rep2.size() == 1001);
- cout << " -- no spurious timeout .. ok" << endl;
- }
-
- // huge RPC
- string big(1000000, 'x');
- intret = c->call(22, rep, big, (string)"z");
- VERIFY(rep.size() == 1000001);
- cout << " -- huge 1M rpc request .. ok" << endl;
-
- // specify a timeout value to an RPC that should timeout (udp)
+ cout << "simple_tests" << endl;
+ // an RPC call to procedure #22.
+ // rpcc::call() looks at the argument types to decide how
+ // to marshall the RPC call packet, and how to unmarshall
+ // the reply packet.
+ string rep;
+ int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == 0); // this is what handle_22 returns
+ VERIFY(rep == "hello goodbye");
+ cout << " -- string concat RPC .. ok" << endl;
+
+ // small request, big reply (perhaps req via UDP, reply via TCP)
+ intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
+ VERIFY(intret == 0);
+ VERIFY(rep.size() == 70000);
+ cout << " -- small request, big reply .. ok" << endl;
+
+ // specify a timeout value to an RPC that should succeed (udp)
+ int xx = 0;
+ intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
+ VERIFY(intret == 0 && xx == 78);
+ cout << " -- no spurious timeout .. ok" << endl;
+
+ // specify a timeout value to an RPC that should succeed (tcp)
+ {
+ string arg(1000, 'x');
+ string rep2;
+ c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
+ VERIFY(rep2.size() == 1001);
+ cout << " -- no spurious timeout .. ok" << endl;
+ }
+
+ // huge RPC
+ string big(1000000, 'x');
+ intret = c->call(22, rep, big, (string)"z");
+ VERIFY(rep.size() == 1000001);
+ cout << " -- huge 1M rpc request .. ok" << endl;
+
+ // specify a timeout value to an RPC that should timeout (udp)
string non_existent = "127.0.0.1:7661";
- rpcc *c1 = new rpcc(non_existent);
- time_t t0 = time(0);
- intret = c1->bind(rpcc::to(3000));
- time_t t1 = time(0);
- VERIFY(intret < 0 && (t1 - t0) <= 4);
- cout << " -- rpc timeout .. ok" << endl;
- cout << "simple_tests OK" << endl;
+ rpcc *c1 = new rpcc(non_existent);
+ time_t t0 = time(0);
+ intret = c1->bind(rpcc::to(3000));
+ time_t t1 = time(0);
+ VERIFY(intret < 0 && (t1 - t0) <= 4);
+ cout << " -- rpc timeout .. ok" << endl;
+ cout << "simple_tests OK" << endl;
}
// Starts nt threads, each running client1(i), and joins them all.
void
concurrent_test(size_t nt)
{
- // create threads that make lots of calls in parallel,
- // to test thread synchronization for concurrent calls
- // and dispatches.
- cout << "start concurrent_test (" << nt << " threads) ...";
+ // create threads that make lots of calls in parallel,
+ // to test thread synchronization for concurrent calls
+ // and dispatches.
+ cout << "start concurrent_test (" << nt << " threads) ...";
vector<thread> th(nt);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i] = thread(client1, i);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- cout << " OK" << endl;
+ cout << " OK" << endl;
}
// Sets RPC_LOSSY=5 in the environment, recreates the server (if this process
// hosts one) and every client so they are constructed with that setting, runs
// one client2 worker thread to completion, then sets RPC_LOSSY back to "0".
void
lossy_test()
{
- cout << "start lossy_test ...";
- VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ cout << "start lossy_test ...";
+ VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
- if (server) {
- delete server;
- startserver();
- }
+ if (server) {
+ delete server;
+ startserver();
+ }
- for (int i = 0; i < NUM_CL; i++) {
- delete clients[i];
- clients[i] = new rpcc(dst);
- VERIFY(clients[i]->bind()==0);
- }
+ for (int i = 0; i < NUM_CL; i++) {
+ delete clients[i];
+ clients[i] = new rpcc(dst);
+ VERIFY(clients[i]->bind()==0);
+ }
- size_t nt = 1;
+ size_t nt = 1;
vector<thread> th(nt);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i] = thread(client2, i);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- cout << ".. OK" << endl;
- VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
+ cout << ".. OK" << endl;
+ VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
}
// Server/client restart scenarios, in order:
//  1. with the server deleted, a new client's bind must fail;
//  2. after the server is restarted, a call from the old client must return
//     oldsrv_failure;
//  3. a replacement client binds once (a second bind must fail) and a call
//     succeeds, followed by 10 concurrent client3 workers;
//  4. both server and client are replaced and the concurrent run is repeated.
void
failure_test()
{
- rpcc *client1;
- rpcc *client = clients[0];
+ rpcc *client1;
+ rpcc *client = clients[0];
- cout << "failure_test" << endl;
+ cout << "failure_test" << endl;
- delete server;
+ delete server;
- client1 = new rpcc(dst);
- VERIFY (client1->bind(rpcc::to(3000)) < 0);
- cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
+ client1 = new rpcc(dst);
+ VERIFY (client1->bind(rpcc::to(3000)) < 0);
+ cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
- delete client1;
+ delete client1;
- startserver();
+ startserver();
- string rep;
- int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
- VERIFY(intret == rpc_const::oldsrv_failure);
- cout << " -- call recovered server with old client .. failed ok" << endl;
+ string rep;
+ int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == rpc_const::oldsrv_failure);
+ cout << " -- call recovered server with old client .. failed ok" << endl;
- delete client;
+ delete client;
- clients[0] = client = new rpcc(dst);
- VERIFY (client->bind() >= 0);
- VERIFY (client->bind() < 0);
+ clients[0] = client = new rpcc(dst);
+ VERIFY (client->bind() >= 0);
+ VERIFY (client->bind() < 0);
- intret = client->call(22, rep, (string)"hello", (string)" goodbye");
- VERIFY(intret == 0);
- VERIFY(rep == "hello goodbye");
+ intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == 0);
+ VERIFY(rep == "hello goodbye");
- cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
+ cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
- size_t nt = 10;
- cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
+ size_t nt = 10;
+ cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
vector<thread> th(nt);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i] = thread(client3, client);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- cout << "ok" << endl;
+ cout << "ok" << endl;
- delete server;
- delete client;
+ delete server;
+ delete client;
- startserver();
- clients[0] = client = new rpcc(dst);
- VERIFY (client->bind() >= 0);
- cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
+ startserver();
+ clients[0] = client = new rpcc(dst);
+ VERIFY (client->bind() >= 0);
+ cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
- cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
+ cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i] = thread(client3, client);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- cout << "ok" << endl;
+ cout << "ok" << endl;
- cout << "failure_test OK" << endl;
+ cout << "failure_test OK" << endl;
}
// Test driver. Options: -c run as client, -s run as server, -d <level> set the
// debug level, -p <port> override the port (default 20000 + pid % 10000),
// -l set RPC_LOSSY=5. With neither -c nor -s the process acts as both.
// Always runs testmarshall(); starts the server if requested; in client mode
// binds NUM_CL clients to 127.0.0.1:<port>, runs the test suites
// (failure_test only when this process also hosts the server) and exits 0.
// A server-only process sleeps forever.
int
main(int argc, char *argv[])
{
- setvbuf(stdout, NULL, _IONBF, 0);
- setvbuf(stderr, NULL, _IONBF, 0);
- int debug_level = 0;
-
- bool isclient = false;
- bool isserver = false;
-
- srandom((uint32_t)getpid());
- port = 20000 + (getpid() % 10000);
-
- int ch = 0;
- while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
- switch (ch) {
- case 'c':
- isclient = true;
- break;
- case 's':
- isserver = true;
- break;
- case 'd':
- debug_level = atoi(optarg);
- break;
- case 'p':
- port = atoi(optarg);
- break;
- case 'l':
- VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
+ int debug_level = 0;
+
+ bool isclient = false;
+ bool isserver = false;
+
+ srandom((uint32_t)getpid());
+ port = 20000 + (getpid() % 10000);
+
+ int ch = 0;
+ while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
+ switch (ch) {
+ case 'c':
+ isclient = true;
+ break;
+ case 's':
+ isserver = true;
+ break;
+ case 'd':
+ debug_level = atoi(optarg);
+ break;
+ case 'p':
+ port = atoi(optarg);
+ break;
+ case 'l':
+ VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ break;
+ default:
break;
- default:
- break;
- }
- }
+ }
+ }
- if (!isserver && !isclient) {
- isserver = isclient = true;
- }
+ if (!isserver && !isclient) {
+ isserver = isclient = true;
+ }
- if (debug_level > 0) {
- JSL_DEBUG_LEVEL = debug_level;
- jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
- }
+ if (debug_level > 0) {
+ DEBUG_LEVEL = debug_level;
+ IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+ }
- testmarshall();
+ testmarshall();
- if (isserver) {
- cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
- startserver();
- }
+ if (isserver) {
+ cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+ startserver();
+ }
- if (isclient) {
- // server's address.
+ if (isclient) {
+ // server's address.
dst = "127.0.0.1:" + std::to_string(port);
- // start the client. bind it to the server.
- // starts a thread to listen for replies and hand them to
- // the correct waiting caller thread. there should probably
- // be only one rpcc per process. you probably need one
- // rpcc per server.
- for (int i = 0; i < NUM_CL; i++) {
- clients[i] = new rpcc(dst);
- VERIFY (clients[i]->bind() == 0);
- }
+ // start the client. bind it to the server.
+ // starts a thread to listen for replies and hand them to
+ // the correct waiting caller thread. there should probably
+ // be only one rpcc per process. you probably need one
+ // rpcc per server.
+ for (int i = 0; i < NUM_CL; i++) {
+ clients[i] = new rpcc(dst);
+ VERIFY (clients[i]->bind() == 0);
+ }
- simple_tests(clients[0]);
- concurrent_test(10);
- lossy_test();
- if (isserver) {
- failure_test();
- }
+ simple_tests(clients[0]);
+ concurrent_test(10);
+ lossy_test();
+ if (isserver) {
+ failure_test();
+ }
- cout << "rpctest OK" << endl;
+ cout << "rpctest OK" << endl;
- exit(0);
- }
+ exit(0);
+ }
- while (1) {
- sleep(1);
- }
+ while (1) {
+ sleep(1);
+ }
}
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
- LOG("joinreq: src " << m << " last (" << last.vid << "," << last.seqno << ") mylast (" <<
+ LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
last_myvs.vid << "," << last_myvs.seqno << ")");
if (cfg->ismember(m, vid_commit)) {
- LOG("joinreq: is still a member");
+ LOG(m << " is still a member -- nothing to do");
log = cfg->dump();
} else if (cfg->myaddr() != primary) {
- LOG("joinreq: busy");
+ LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
ret = rsm_protocol::BUSY;
} else {
// We cache vid_commit to avoid adding m to a view which already contains
// m due to race condition
+ LOG("calling down to config layer");
unsigned vid_cache = vid_commit;
bool succ;
{
}
if (cfg->ismember(m, cfg->view_id())) {
log = cfg->dump();
- LOG("joinreq: ret " << ret << " log " << log);
+ LOG("ret " << ret << " log " << log);
} else {
- LOG("joinreq: failed; proposer couldn't add " << succ);
+ LOG("failed; proposer couldn't add " << succ);
ret = rsm_protocol::BUSY;
}
}
cfg->get_view(vid_commit, m);
m.push_back(primary);
r = m;
- LOG("rsm::client_members return " << print_members(m) << " m " << primary);
+ LOG("rsm::client_members return " << m << " m " << primary);
return rsm_client_protocol::OK;
}
bool statetransferdone(string m, lock & rsm_mutex_lock);
bool join(string m, lock & rsm_mutex_lock);
void set_primary(unsigned vid);
- string find_highest(viewstamp &vs, string &m, unsigned &vid);
bool sync_with_backups(lock & rsm_mutex_lock);
bool sync_with_primary(lock & rsm_mutex_lock);
void net_repair(bool heal, lock & rsm_mutex_lock);
if (intret < 0) return intret;
u >> res;
if (!u.okdone()) {
- fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
- "You probably forgot to set the reply string in "
- "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
- "type\n", proc);
+ cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
+ cerr << "You probably forgot to set the reply string in "
+ "rsm::client_invoke, or you may have called RPC 0x" << hex <<
+ proc << " with the wrong return type" << endl;
VERIFY(0);
return rpc_const::unmarshal_reply_failure;
}
unmarshall u1(res);
u1 >> r;
if(!u1.okdone()) {
- fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
- "You are probably calling RPC 0x%x with wrong return "
- "type.\n", proc);
+ cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
+ cerr << "You are probably calling RPC 0x" << hex << proc <<
+ " with the wrong return type." << endl;
VERIFY(0);
return rpc_const::unmarshal_reply_failure;
}
unsigned int vid;
unsigned int seqno;
inline void operator++(int) { seqno++; }
+
+ MEMBERS(vid, seqno)
+ LEXICOGRAPHIC_COMPARISON(viewstamp)
};
+MARSHALLABLE(viewstamp)
+
class rsm_protocol {
public:
enum status : status_t { OK, ERR, BUSY};
struct transferres {
string state;
viewstamp last;
+
+ MEMBERS(state, last)
};
};
-inline bool operator==(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) == tie(b.vid, b.seqno); }
-inline bool operator>(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) > tie(b.vid, b.seqno); }
-inline bool operator!=(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) != tie(b.vid, b.seqno); }
-
-inline marshall& operator<<(marshall &m, viewstamp v) {
- return m << v.vid << v.seqno;
-}
-
-inline unmarshall& operator>>(unmarshall &u, viewstamp &v) {
- return u >> v.vid >> v.seqno;
-}
-
-inline marshall & operator<<(marshall &m, rsm_protocol::transferres r) {
- return m << r.state << r.last;
-}
-
-inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) {
- return u >> r.state >> r.last;
-}
+MARSHALLABLE(rsm_protocol::transferres)
class rsm_test_protocol {
public:
VERIFY (ret == rsm_test_protocol::OK);
return r;
}
-
-
+++ /dev/null
-#!/usr/bin/env bash
-
-ulimit -c unlimited
-
-NUM_LS=${1:-0}
-
-BASE_PORT=$RANDOM
-BASE_PORT=$[BASE_PORT+2000]
-LOCK_PORT=$[BASE_PORT+6]
-
-if [ $NUM_LS -gt 1 ]; then
- x=0
- rm config
- while [ $x -lt $NUM_LS ]; do
- port=$[LOCK_PORT+2*x]
- x=$[x+1]
- echo $port >> config
- done
- x=0
- while [ $x -lt $NUM_LS ]; do
- port=$[LOCK_PORT+2*x]
- x=$[x+1]
- echo "starting ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &"
- ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &
- sleep 1
- done
-else
- echo "starting ./lock_server $LOCK_PORT > lock_server.log 2>&1 &"
- ./lock_server $LOCK_PORT > lock_server.log 2>&1 &
- sleep 1
-fi
+++ /dev/null
-#!/usr/bin/env bash
-
-pkill -u $USER lock_server
int next_thread_num = 0;
map<void *, int> instance_name_map;
int next_instance_num = 0;
+// Process-wide debug verbosity threshold; starts at 0.
+int DEBUG_LEVEL = 0;
extern char log_thread_prefix;
namespace std {
- // This... is an awful hack. But sticking this in std:: makes it possible for
+ // This is an awful hack. But sticking this in std:: makes it possible for
// ostream_iterator to use it.
template <class A, class B>
ostream & operator<<(ostream &o, const pair<A,B> &d) {
template <class A>
typename enable_if<is_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
operator<<(ostream &o, const A &a) {
- o << "[";
- auto oit = ostream_iterator<typename A::value_type>(o, ", ");
- copy(a.begin(), a.end(), oit);
- o << "]";
- return o;
+ return o << "[" << implode(a, ", ") << "]";
}
#define LOG_PREFIX { \
cerr << _x_ << endl; \
}
+// Current debug verbosity; defined in exactly one translation unit.
+extern int DEBUG_LEVEL;
+
+// IF_LEVEL(level) stmt;  runs stmt only when DEBUG_LEVEL >= abs(level).
+// The test is written as "if (!cond) {} else" so that the macro's own `if`
+// always carries an `else`; an `else` written by the caller after the guarded
+// statement therefore binds to the caller's enclosing `if`, not to this one.
+#define IF_LEVEL(level) if (DEBUG_LEVEL < abs(level)) {} else
+
#endif
decltype(declval<A&>().cbegin(), declval<A&>().cend(), void())
> : true_type {};
+// Joins the elements of `v` into a single string, streaming each element
+// with operator<< and writing `delim` between consecutive elements. No
+// delimiter is written before the first or after the last element; an empty
+// container yields an empty string.
+//
+// Only forward iteration is used, so containers without bidirectional
+// iterators (unordered containers, forward_list) are handled as well.
+template <class C>
+inline typename enable_if<is_iterable<C>::value, string>::type
+implode(const C & v, string delim=" ") {
+    ostringstream oss;
+    auto it = v.begin();
+    const auto last = v.end();
+    if (it != last) {
+        oss << *it;
+        // Every remaining element is preceded by the delimiter.
+        for (++it; it != last; ++it)
+            oss << delim << *it;
+    }
+    return oss.str();
+}
+
+// Splits `s` at every occurrence of `delim` and returns the pieces in order.
+// Empty pieces are kept (adjacent, leading or trailing delimiters produce
+// empty strings), so the result always has at least one element; in
+// particular explode("") returns a vector holding one empty string.
+// An empty `delim` cannot split anything: the whole of `s` is returned as
+// the only piece.
+inline vector<string> explode(const string &s, string delim=" ") {
+    vector<string> out;
+    if (delim.empty()) {
+        out.push_back(s);
+        return out;
+    }
+    size_t start = 0, end = 0;
+    while ((end = s.find(delim, start)) != string::npos) {
+        out.push_back(s.substr(start, end - start));
+        // Step over the whole delimiter, not just its first character.
+        start = end + delim.size();
+    }
+    out.push_back(s.substr(start));
+    return out;
+}
+
#include "lang/verify.h"
#include "threaded_log.h"
+// MEMBERS(a, b, ...): use inside a class body. Defines two _tuple_() member
+// functions (non-const and const) that each return tie(a, b, ...) over the
+// listed data members (presumably std::tie, i.e. a tuple of references).
+#define MEMBERS(...) \
+inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \
+inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); }
+
+// LEXICOGRAPHIC_OPERATOR(Class, op): defines the member `operator op` for
+// Class by applying `op` to the two objects' _tuple_() results, so the class
+// must also use MEMBERS(...).
+#define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \
+inline bool operator _op_(const _c_ &b) const { return _tuple_() _op_ b._tuple_(); }
+
+// LEXICOGRAPHIC_COMPARISON(Class): defines all six comparison operators
+// (<, <=, >, >=, ==, !=) via LEXICOGRAPHIC_OPERATOR.
+#define LEXICOGRAPHIC_COMPARISON(_c_) \
+LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \
+LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \
+LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
+
#endif