-CXXFLAGS = -g -MMD -Werror -I. -std=c++11
-LDFLAGS =
-CXX = g++
-CC = g++
+CXXFLAGS ?= -g -MMD -Werror -I. -std=c++11
+LDFLAGS ?=
+CXX ?= g++
+CC ?= g++
+EXTRA_TARGETS ?=
-all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest
+all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o
rm -f $@
-include rpc/*.d
clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester
-.PHONY: clean
+.PHONY: clean $(EXTRA_TARGETS)
clean:
rm -rf $(clean_files)
--- /dev/null
+#PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations
+#PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors
+CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY)
+LDFLAGS = -stdlib=libc++
+CXX = clang++
+CC = clang++
+EXTRA_TARGETS = signatures
+
+socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw
+signatures : lock_server lock_tester rpc/rpctest
+ echo $^ | sudo xargs -n 1 $(socketfilterfw) -s || true
+ echo $^ | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true
rpcc *cl = new rpcc(dstsock);
tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
int ret;
- // Starting with lab 6, our test script assumes that the failure
- // can be detected by paxos and rsm layer within few seconds. We have
- // to set the timeout with a small value to support the assumption.
+ // The test script assumes that the failure can be detected by paxos and
+ // rsm layer within few seconds. We have to set the timeout with a small
+ // value to support the assumption.
//
- // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of
- // lab 6 and lab 7 because the rpc layer may delay your RPC request,
- // and cause a time out failure. Please make sure RPC_LOSSY is set to 0.
+ // With RPC_LOSSY=5, tests may fail due to delays and time outs.
ret = cl->bind(rpcc::to(1000));
if (ret < 0) {
tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
using std::mutex;
using lock = std::unique_lock<std::mutex>;
+using cond = std::condition_variable;
class adopt_lock : public lock {
public:
{
}
+lock_state::lock_state(const lock_state &other) {
+ *this = other;
+}
+
lock_state& lock_state::operator=(const lock_state& o) {
held = o.held;
held_by = o.held_by;
return o;
}
-template <class A>
-marshall & operator<<(marshall &m, const list<A> &d) {
- m << vector<A>(d.begin(), d.end());
- return m;
-}
-
-template <class A>
-unmarshall & operator>>(unmarshall &u, list<A> &d) {
- vector<A> v;
- u >> v;
- d.assign(v.begin(), v.end());
- return u;
-}
-
-
-template <class A, class B>
-marshall & operator<<(marshall &m, const pair<A,B> &d) {
- m << d.first;
- m << d.second;
- return m;
-}
-
-template <class A, class B>
-unmarshall & operator>>(unmarshall &u, pair<A,B> &d) {
- u >> d.first;
- u >> d.second;
- return u;
-}
-
marshall & operator<<(marshall &m, const lock_state &d) {
- m << d.held;
- m << d.held_by;
- m << d.wanted_by;
- return m;
+ return m << d.held << d.held_by << d.wanted_by;
}
unmarshall & operator>>(unmarshall &u, lock_state &d) {
- u >> d.held;
- u >> d.held_by;
- u >> d.wanted_by;
- return u;
+ return u >> d.held >> d.held_by >> d.wanted_by;
}
-
lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
// by the semantics of map, this will create
class lock_state {
public:
lock_state();
+ lock_state(const lock_state &other);
bool held;
holder held_by;
list<holder> wanted_by;
int
main(int argc, char *argv[])
{
- int count = 0;
-
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
exit(1);
}
- char *count_env = getenv("RPC_COUNT");
- if(count_env != NULL){
- count = atoi(count_env);
- }
-
rsm rsm(argv[1], argv[2]);
lock_server_cache_rsm ls(&rsm);
rsm.set_state_transfer((rsm_state_transfer *)&ls);
signal(SIGPIPE, SIG_IGN);
- VERIFY(gettimeofday(&create_time_, NULL) == 0);
+ create_time_ = std::chrono::steady_clock::now();
PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}
int
connection::compare(connection *another)
{
- if (create_time_.tv_sec > another->create_time_.tv_sec)
+ if (create_time_ > another->create_time_)
return 1;
- if (create_time_.tv_sec < another->create_time_.tv_sec)
- return -1;
- if (create_time_.tv_usec > another->create_time_.tv_usec)
- return 1;
- if (create_time_.tv_usec < another->create_time_.tv_usec)
+ if (create_time_ < another->create_time_)
return -1;
return 0;
}
ml.unlock();
PollMgr::Instance()->block_remove_fd(fd_);
ml.lock();
- }else{
+ } else {
if (wpdu_.solong == wpdu_.sz) {
- }else{
+ } else {
//should be rare to need to explicitly add write callback
PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
sin.sin_port = htons(port);
tcp_ = socket(AF_INET, SOCK_STREAM, 0);
- if(tcp_ < 0){
+ if (tcp_ < 0) {
perror("tcpsconn::tcpsconn accept_loop socket:");
VERIFY(0);
}
setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
- if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
+ if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
perror("accept_loop tcp bind:");
VERIFY(0);
}
- if(listen(tcp_, 1000) < 0) {
+ if (listen(tcp_, 1000) < 0) {
perror("tcpsconn::tcpsconn listen:");
VERIFY(0);
}
s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
connection *ch = new connection(mgr_, s1, lossy_);
- // garbage collect all dead connections with refcount of 1
- std::map<int, connection *>::iterator i;
- for (i = conns_.begin(); i != conns_.end();) {
- if (i->second->isdead() && i->second->ref() == 1) {
- jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
- i->second->channo());
- i->second->decref();
- // Careful not to reuse i right after erase. (i++) will
- // be evaluated before the erase call because in C++,
- // there is a sequence point before a function call.
- // See http://en.wikipedia.org/wiki/Sequence_point.
- conns_.erase(i++);
- } else
- ++i;
- }
+ // garbage collect all dead connections with refcount of 1
+ std::map<int, connection *>::iterator i;
+ for (i = conns_.begin(); i != conns_.end();) {
+ if (i->second->isdead() && i->second->ref() == 1) {
+ jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
+ i->second->channo());
+ i->second->decref();
+ // Careful not to reuse i right after erase. (i++) will
+ // be evaluated before the erase call because in C++,
+ // there is a sequence point before a function call.
+ // See http://en.wikipedia.org/wiki/Sequence_point.
+ conns_.erase(i++);
+ } else
+ ++i;
+ }
conns_[ch->channo()] = ch;
}
int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
try {
-
while (1) {
FD_ZERO(&rfds);
FD_SET(pipe_[0], &rfds);
}
catch (thread_exit_exception e)
{
- return;
}
}
connection *
connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
{
- int s= socket(AF_INET, SOCK_STREAM, 0);
+ int s = socket(AF_INET, SOCK_STREAM, 0);
int yes = 1;
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
- if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
+ if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
close(s);
charbuf wpdu_;
charbuf rpdu_;
- struct timeval create_time_;
+ std::chrono::time_point<std::chrono::steady_clock> create_time_;
int waiters_;
int refno_;
#ifndef fifo_h
#define fifo_h
-// fifo template
-// blocks enq() and deq() when queue is FULL or EMPTY
-
-#include <errno.h>
#include <list>
-#include <sys/time.h>
-#include <time.h>
-#include <errno.h>
-#include "lang/verify.h"
#include "lock.h"
+// blocks enq() and deq() when queue is FULL or EMPTY
template<class T>
class fifo {
public:
- fifo(int m=0);
+ fifo(int limit=0) : max_(limit) {};
bool enq(T, bool blocking=true);
void deq(T *);
- bool size();
+ bool size() {
+ lock ml(m_);
+ return q_.size();
+ };
private:
std::list<T> q_;
mutex m_;
- std::condition_variable non_empty_c_; // q went non-empty
- std::condition_variable has_space_c_; // q is not longer overfull
- unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit
+ cond non_empty_c_; // q went non-empty
+ cond has_space_c_; // q is not longer overfull
+ unsigned int max_; // maximum capacity of the queue, block enq threads if exceeds this limit
};
-template<class T>
-fifo<T>::fifo(int limit) : max_(limit)
-{
-}
-
-template<class T> bool
-fifo<T>::size()
-{
- lock ml(m_);
- return q_.size();
-}
-
template<class T> bool
fifo<T>::enq(T e, bool blocking)
{
lock ml(m_);
- while (1) {
- if (!max_ || q_.size() < max_) {
- q_.push_back(e);
- break;
- }
- if (blocking) {
- has_space_c_.wait(ml);
- }
- else
+ while (max_ && q_.size() >= max_) {
+ if (!blocking)
return false;
+ has_space_c_.wait(ml);
}
+ q_.push_back(e);
non_empty_c_.notify_one();
return true;
}
fifo<T>::deq(T *e)
{
lock ml(m_);
-
- while(1) {
- if(q_.empty()){
- non_empty_c_.wait(ml);
- } else {
- *e = q_.front();
- q_.pop_front();
- if (max_ && q_.size() < max_) {
- has_space_c_.notify_one();
- }
- break;
- }
- }
- return;
+ while(q_.empty())
+ non_empty_c_.wait(ml);
+ *e = q_.front();
+ q_.pop_front();
+ if (max_ && q_.size() < max_)
+ has_space_c_.notify_one();
}
#endif
-#include "jsl_log.h"
-
int JSL_DEBUG_LEVEL = 0;
-void
-jsl_set_debug(int level) {
- JSL_DEBUG_LEVEL = level;
-}
-
-
#define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);}
-void jsl_set_debug(int level);
-
#endif
int ret;
};
-typedef uint64_t rpc_checksum_t;
typedef int rpc_sz_t;
//size of initial buffer allocation
#define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ_NO_CHECKSUM (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
-#if RPC_CHECKSUMMING
-//size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
-#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM + sizeof(rpc_checksum_t))
-#else
-#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM)
-#endif
+#define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
class marshall {
private:
int saved_sz = _ind;
//leave the first 4-byte empty for channel to fill size of pdu
_ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
pack(h.xid);
pack(h.proc);
pack((int)h.clt_nonce);
int saved_sz = _ind;
//leave the first 4-byte empty for channel to fill size of pdu
_ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
pack(h.xid);
pack(h.ret);
_ind = saved_sz;
return;
}
};
+
marshall& operator<<(marshall &, bool);
marshall& operator<<(marshall &, unsigned int);
marshall& operator<<(marshall &, int);
marshall& operator<<(marshall &, unsigned long long);
marshall& operator<<(marshall &, const std::string &);
+template <class C> marshall &
+operator<<(marshall &m, std::vector<C> v)
+{
+ m << (unsigned int) v.size();
+ for(unsigned i = 0; i < v.size(); i++)
+ m << v[i];
+ return m;
+}
+
+template <class A, class B> marshall &
+operator<<(marshall &m, const std::map<A,B> &d) {
+ typename std::map<A,B>::const_iterator i;
+
+ m << (unsigned int) d.size();
+
+ for (i = d.begin(); i != d.end(); i++) {
+ m << i->first << i->second;
+ }
+ return m;
+}
+
+template <class A> marshall &
+operator<<(marshall &m, const std::list<A> &d) {
+ m << std::vector<A>(d.begin(), d.end());
+ return m;
+}
+
+template <class A, class B> marshall &
+operator<<(marshall &m, const std::pair<A,B> &d) {
+ m << d.first;
+ m << d.second;
+ return m;
+}
+
class unmarshall {
private:
char *_buf;
void unpack_req_header(req_header *h) {
//the first 4-byte is for channel to fill size of pdu
_ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
unpack(&h->xid);
unpack(&h->proc);
unpack((int *)&h->clt_nonce);
void unpack_reply_header(reply_header *h) {
//the first 4-byte is for channel to fill size of pdu
_ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
unpack(&h->xid);
unpack(&h->ret);
_ind = RPC_HEADER_SZ;
unmarshall& operator>>(unmarshall &, unsigned long long &);
unmarshall& operator>>(unmarshall &, std::string &);
-template <class C> marshall &
-operator<<(marshall &m, std::vector<C> v)
-{
- m << (unsigned int) v.size();
- for(unsigned i = 0; i < v.size(); i++)
- m << v[i];
- return m;
-}
-
template <class C> unmarshall &
operator>>(unmarshall &u, std::vector<C> &v)
{
unsigned n;
u >> n;
- for(unsigned i = 0; i < n; i++){
- C z;
- u >> z;
- v.push_back(z);
- }
+ v.clear();
+ while (n--) {
+ C c;
+ u >> c;
+ v.push_back(c);
+ }
return u;
}
-template <class A, class B> marshall &
-operator<<(marshall &m, const std::map<A,B> &d) {
- typename std::map<A,B>::const_iterator i;
-
- m << (unsigned int) d.size();
-
- for (i = d.begin(); i != d.end(); i++) {
- m << i->first << i->second;
- }
- return m;
-}
-
template <class A, class B> unmarshall &
operator>>(unmarshall &u, std::map<A,B> &d) {
- unsigned int n;
+ unsigned n;
u >> n;
-
d.clear();
-
- for (unsigned int lcv = 0; lcv < n; lcv++) {
- A a;
- B b;
- u >> a >> b;
- d[a] = b;
- }
+ while (n--) {
+ A a;
+ B b;
+ u >> a >> b;
+ d[a] = b;
+ }
return u;
}
+template <class C> unmarshall &
+operator>>(unmarshall &u, std::list<C> &l) {
+ unsigned n;
+ u >> n;
+ l.clear();
+ while (n--) {
+ C c;
+ u >> c;
+ l.push_back(c);
+ }
+ return u;
+}
+
+template <class A, class B> unmarshall &
+operator>>(unmarshall &u, std::pair<A,B> &d) {
+ return u >> d.first >> d.second;
+}
+
#endif
djob_t *j = new djob_t(c, b, sz);
c->incref();
- bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
+ bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
if(!succ || !reachable_){
c->decref();
delete j;
VERIFY(rep.size() == 70000);
printf(" -- small request, big reply .. ok\n");
-#if 0
- // too few arguments
- intret = c->call(22, (std::string)"just one", rep);
- VERIFY(intret < 0);
- printf(" -- too few arguments .. failed ok\n");
-
- // too many arguments; proc #23 expects just one.
- intret = c->call(23, 1001, 1002, rep);
- VERIFY(intret < 0);
- printf(" -- too many arguments .. failed ok\n");
-
- // wrong return value size
- int wrongrep;
- intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
- VERIFY(intret < 0);
- printf(" -- wrong ret value size .. failed ok\n");
-#endif
-
// specify a timeout value to an RPC that should succeed (udp)
int xx = 0;
intret = c->call(23, 77, xx, rpcc::to(3000));
}
if (debug_level > 0) {
- //__loginit.initNow();
- jsl_set_debug(debug_level);
+ JSL_DEBUG_LEVEL = debug_level;
jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
}
#include <errno.h>
#include "lang/verify.h"
-static void
-do_worker(void *arg)
-{
- ThrPool *tp = (ThrPool *)arg;
- while (1) {
- ThrPool::job_t j;
- if (!tp->takeJob(&j))
- break; //die
-
- (void)(j.f)(j.a);
- }
-}
-
-//if blocking, then addJob() blocks when queue is full
-//otherwise, addJob() simply returns false when queue is full
+// if blocking, then addJob() blocks when queue is full
+// otherwise, addJob() simply returns false when queue is full
ThrPool::ThrPool(int sz, bool blocking)
: nthreads_(sz),blockadd_(blocking),jobq_(100*sz)
{
- for (int i = 0; i < sz; i++) {
- th_.push_back(std::thread(do_worker, this));
- }
+ for (int i=0; i<nthreads_; i++)
+ th_.push_back(std::thread(&ThrPool::do_worker, this));
}
-//IMPORTANT: this function can be called only when no external thread
-//will ever use this thread pool again or is currently blocking on it
+// IMPORTANT: this function can be called only when no external thread
+// will ever use this thread pool again or is currently blocking on it
ThrPool::~ThrPool()
{
- for (int i = 0; i < nthreads_; i++) {
- job_t j;
- j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit
- jobq_.enq(j);
- }
+ for (int i=0; i<nthreads_; i++)
+ jobq_.enq(job_t());
- for (int i = 0; i < nthreads_; i++) {
+ for (int i=0; i<nthreads_; i++)
th_[i].join();
- }
}
bool
-ThrPool::addJob(void (*f)(void *), void *a)
+ThrPool::addJob(const job_t &j)
{
- job_t j;
- j.f = f;
- j.a = a;
-
return jobq_.enq(j,blockadd_);
}
-bool
-ThrPool::takeJob(job_t *j)
+void
+ThrPool::do_worker()
{
- jobq_.deq(j);
- return (j->f!=NULL);
+ job_t j;
+ while (1) {
+ jobq_.deq(&j);
+ if (!j)
+ break;
+ j();
+ }
}
-
#include "fifo.h"
+typedef std::function<void()> job_t;
+
class ThrPool {
public:
- struct job_t {
- void (*f)(void *); //function point
- void *a; //function arguments
- };
-
ThrPool(int sz, bool blocking=true);
~ThrPool();
- template<class C, class A> bool addObjJob(C *o, void (C::*m)(A), A a);
- void waitDone();
- bool takeJob(job_t *j);
+ bool addJob(const job_t &j);
private:
int nthreads_;
bool blockadd_;
-
fifo<job_t> jobq_;
std::vector<std::thread> th_;
- bool addJob(void (*f)(void *), void *a);
+ void do_worker();
};
-template <class C, class A> bool
-ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
-{
-
- class objfunc_wrapper {
- public:
- C *o;
- void (C::*m)(A a);
- A a;
- static void func(void *vvv) {
- objfunc_wrapper *x = (objfunc_wrapper*)vvv;
- C *o = x->o;
- void (C::*m)(A ) = x->m;
- A a = x->a;
- (o->*m)(a);
- delete x;
- }
- };
-
- objfunc_wrapper *x = new objfunc_wrapper;
- x->o = o;
- x->m = m;
- x->a = a;
- return addJob(&objfunc_wrapper::func, (void *)x);
-}
-
-
#endif
-
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
- std::thread th;
-
last_myvs.vid = 0;
last_myvs.seqno = 0;
myvs = last_myvs;
{
lock ml(rsm_mutex);
- th = std::thread(&rsm::recovery, this);
+ std::thread(&rsm::recovery, this).detach();
}
}
{
rsm_protocol::transferres r;
handle h(m);
- int ret;
+ int ret = 0;
tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n",
m.c_str(), last_myvs.vid, last_myvs.seqno);
rpcc *cl;
bool rsm::join(std::string m) {
handle h(m);
- int ret;
+ int ret = 0;
rsm_protocol::joinres r;
tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid,
}
rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
- int ret;
+ int ret = 0;
lock ml(rsm_client_mutex);
while (1) {
printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
if (my $pid = fork) {
# parent
push( @logs, "$p-$aa.log" );
- if( $p =~ /config_server/ ) {
- push( @logs, paxos_log($a[1]) );
- }
if( $p =~ /lock_server/ ) {
push( @logs, paxos_log($a[1]) );
}
return spawn( "./lock_server", $master, $port );
}
-sub spawn_config {
- my $master = shift;
- my $port = shift;
- return spawn( "./config_server", $master, $port );
-}
-
sub check_views {
my $l = shift;
if ($command eq "ls") {
@pid = (@pid, spawn_ls($p[0],$p[$i]));
print "Start lock_server on $p[$i]\n";
- }elsif ($command eq "config_server"){
- @pid = (@pid, spawn_config($p[0],$p[$i]));
- print "Start config on $p[$i]\n";
}
sleep 1;