bool remove(const string &, lock &cfg_mutex_lock);
void reconstruct(lock &cfg_mutex_lock);
typedef enum {
- OK, // response and same view #
- VIEWERR, // response but different view #
- FAILURE, // no response
+ OK, // got a response and the view # is the same
+ VIEWERR, // got a response but the view # is different
+ FAILURE, // got no response at all
} heartbeat_t;
heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock);
public:
// blocks enq() and deq() when queue is FULL or EMPTY
template<class T>
class fifo {
- public:
- fifo(size_t limit=0) : max_(limit) {}
- bool enq(T, bool blocking=true);
- void deq(T *);
- bool size() {
+ public:
+ fifo(size_t limit=0) : max_(limit) {} // limit is stored as max_; the default 0 leaves max_ at zero
+ bool enq(T, bool blocking=true); // returns false only when the queue is at max_ and blocking is false
+ void deq(T *); // waits on non_empty_c_ while q_ is empty, then copies the front element out
+ bool size() { // NOTE(review): declared bool but returns q_.size(), so any non-zero count converts to true -- confirm size_t was not intended
lock ml(m_);
return q_.size();
}
- private:
- list<T> q_;
+ private:
+ list<T> q_; // queued elements; size() constructs a lock on m_ before reading it
mutex m_;
cond non_empty_c_; // q went non-empty
- cond has_space_c_; // q is not longer overfull
- size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
+ cond has_space_c_; // q is no longer overfull
+ size_t max_; // maximum capacity of the queue; enq() waits (or returns false) once q_ reaches it, and 0 disables the check
};
template<class T> bool
fifo<T>::enq(T e, bool blocking)
{
lock ml(m_);
- while (max_ && q_.size() >= max_) {
- if (!blocking)
- return false;
+ while (max_ && q_.size() >= max_) { // capacity check; a zero max_ skips it entirely
+ if (!blocking) // queue is at capacity and the caller declined to wait
+ return false; // e is not queued on this path
has_space_c_.wait(ml);
- }
+ } // the condition is re-tested after every wake-up on has_space_c_
q_.push_back(e);
non_empty_c_.notify_one();
- return true;
+ return true; // reached only after e was appended to q_ and non_empty_c_ was notified
}
template<class T> void
fifo<T>::deq(T *e)
{
- lock ml(m_);
- while(q_.empty())
+ lock ml(m_);
+ while(q_.empty())
non_empty_c_.wait(ml);
*e = q_.front();
q_.pop_front();
bool ok_ = false;
public:
- unmarshall() {}
unmarshall(const string &s, bool has_header)
: buf_(s),index_(RPC_HEADER_SZ) {
if (!has_header)
#define MAX_POLL_FDS 128
typedef enum {
- CB_NONE = 0x0,
- CB_RDONLY = 0x1,
- CB_WRONLY = 0x10,
- CB_RDWR = 0x11,
- CB_MASK = ~0x11,
+ CB_NONE = 0x0, // no flag bits set
+ CB_RDONLY = 0x1, // bit 0
+ CB_WRONLY = 0x10, // hexadecimal 0x10 (decimal 16), i.e. bit 4 rather than bit 1
+ CB_RDWR = 0x11, // equals CB_RDONLY | CB_WRONLY
+ CB_MASK = ~0x11, // complement of CB_RDWR: every bit except the two flag bits
} poll_flag;
class aio_mgr {
- public:
- virtual void watch_fd(int fd, poll_flag flag) = 0;
- virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
- virtual bool is_watched(int fd, poll_flag flag) = 0;
- virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0;
- virtual ~aio_mgr() {}
+ public: // abstract interface: every operation below is pure virtual; SelectAIO and EPollAIO (declared below) derive from it
+ virtual void watch_fd(int fd, poll_flag flag) = 0;
+ virtual bool unwatch_fd(int fd, poll_flag flag) = 0; // NOTE(review): meaning of the bool result is not visible here -- confirm in an implementation
+ virtual bool is_watched(int fd, poll_flag flag) = 0;
+ virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0; // presumably fills the two vectors with ready fds -- confirm in an implementation
+ virtual ~aio_mgr() {} // virtual, so deleting through an aio_mgr* runs the derived destructor
};
class aio_callback {
- public:
- virtual void read_cb(int fd) = 0;
- virtual void write_cb(int fd) = 0;
- virtual ~aio_callback() {}
+ public: // abstract callback interface; PollMgr keeps aio_callback pointers in its callbacks_ table
+ virtual void read_cb(int fd) = 0; // presumably invoked when fd becomes readable -- confirm in PollMgr::wait_loop
+ virtual void write_cb(int fd) = 0; // presumably invoked when fd becomes writable -- confirm in PollMgr::wait_loop
+ virtual ~aio_callback() {} // virtual, so deleting through an aio_callback* runs the derived destructor
};
class PollMgr {
- public:
- PollMgr();
- ~PollMgr();
+ public:
+ PollMgr();
+ ~PollMgr();
- static PollMgr *Instance();
- static PollMgr *CreateInst();
+ static PollMgr *Instance(); // NOTE(review): presumably hands out the static instance pointer declared below -- confirm in the implementation
+ static PollMgr *CreateInst();
- void add_callback(int fd, poll_flag flag, aio_callback *ch);
- void del_callback(int fd, poll_flag flag);
- bool has_callback(int fd, poll_flag flag, aio_callback *ch);
- void block_remove_fd(int fd);
- void wait_loop();
+ void add_callback(int fd, poll_flag flag, aio_callback *ch); // ch is a raw aio_callback pointer, the same type the private callbacks_ table holds
+ void del_callback(int fd, poll_flag flag);
+ bool has_callback(int fd, poll_flag flag, aio_callback *ch);
+ void block_remove_fd(int fd);
+ void wait_loop(); // presumably the body run by the th_ member -- confirm in the constructor
- static PollMgr *instance;
- static int useful;
- static int useless;
+ static PollMgr *instance; // declared in the public section, so it is writable from outside the class
+ static int useful; // NOTE(review): purpose is not visible in this hunk; public like instance
+ static int useless; // NOTE(review): purpose is not visible in this hunk; public like instance
- private:
+ private:
mutex m_;
cond changedone_c_;
thread th_;
- aio_callback *callbacks_[MAX_POLL_FDS];
- aio_mgr *aio_;
- bool pending_change_;
+ aio_callback *callbacks_[MAX_POLL_FDS]; // fixed table of MAX_POLL_FDS callback pointers; presumably indexed by fd -- confirm
+ aio_mgr *aio_; // raw pointer to an aio_mgr; who allocates and frees it is not visible here
+ bool pending_change_; // presumably signalled through changedone_c_ -- confirm
};
class SelectAIO : public aio_mgr {
- public :
+ public :
- SelectAIO();
- ~SelectAIO();
- void watch_fd(int fd, poll_flag flag);
- bool unwatch_fd(int fd, poll_flag flag);
- bool is_watched(int fd, poll_flag flag);
- void wait_ready(vector<int> *readable, vector<int> *writable);
+ SelectAIO();
+ ~SelectAIO();
+ void watch_fd(int fd, poll_flag flag);
+ bool unwatch_fd(int fd, poll_flag flag);
+ bool is_watched(int fd, poll_flag flag);
+ void wait_ready(vector<int> *readable, vector<int> *writable);
- private:
+ private:
- fd_set rfds_;
- fd_set wfds_;
- int highfds_;
- int pipefd_[2];
+ fd_set rfds_;
+ fd_set wfds_;
+ int highfds_;
+ int pipefd_[2];
mutex m_;
#ifdef __linux__
class EPollAIO : public aio_mgr {
- public:
- EPollAIO();
- ~EPollAIO();
- void watch_fd(int fd, poll_flag flag);
- bool unwatch_fd(int fd, poll_flag flag);
- bool is_watched(int fd, poll_flag flag);
- void wait_ready(vector<int> *readable, vector<int> *writable);
-
- private:
- int pollfd_;
- struct epoll_event ready_[MAX_POLL_FDS];
- int fdstatus_[MAX_POLL_FDS];
+ public: // aio_mgr implementation; the surrounding #ifdef __linux__ means it is compiled on Linux only
+ EPollAIO();
+ ~EPollAIO();
+ void watch_fd(int fd, poll_flag flag); // the four members from here down match aio_mgr's pure virtual signatures
+ bool unwatch_fd(int fd, poll_flag flag);
+ bool is_watched(int fd, poll_flag flag);
+ void wait_ready(vector<int> *readable, vector<int> *writable);
+
+ private:
+ int pollfd_; // presumably the epoll instance descriptor -- confirm in the constructor
+ struct epoll_event ready_[MAX_POLL_FDS]; // buffer with room for MAX_POLL_FDS epoll_event entries
+ int fdstatus_[MAX_POLL_FDS]; // MAX_POLL_FDS ints; presumably the watched poll_flag bits per fd -- confirm
};
#endif /* __linux */
// otherwise, addJob() simply returns false when queue is full
ThrPool::ThrPool(size_t sz, bool blocking)
: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
- for (size_t i=0; i<nthreads_; i++)
+ for (size_t i=0; i<nthreads_; i++) // start sz workers, each running do_worker() on this pool; jobq_ was built with limit 100*sz. NOTE(review): sz == 0 gives limit 0 (treated by fifo as no limit) and no workers -- confirm callers never pass 0
th_.emplace_back(&ThrPool::do_worker, this);
}
// IMPORTANT: this function can be called only when no external thread
// will ever use this thread pool again or is currently blocking on it
ThrPool::~ThrPool() {
- for (size_t i=0; i<nthreads_; i++)
- jobq_.enq(job_t());
+ for (size_t i=0; i<nthreads_; i++) // one stop sentinel per worker thread
+ jobq_.enq(job_t()); // an empty job_t is the stop signal (do_worker() breaks on !j); enq uses its default blocking=true here
- for (size_t i=0; i<nthreads_; i++)
+ for (size_t i=0; i<nthreads_; i++) // then wait for every worker thread to finish
th_[i].join();
}
bool ThrPool::addJob(const job_t &j) {
- return jobq_.enq(j,blockadd_);
+ return jobq_.enq(j,blockadd_); // forwards enq()'s result: false only when the queue is at its limit and blockadd_ is false
}
void ThrPool::do_worker() {
job_t j;
- while (1) {
+ while (1) { // keep taking jobs until a stop sentinel arrives
jobq_.deq(&j);
- if (!j)
- break;
- j();
- }
+ if (!j) // empty function object: the sentinel that ~ThrPool() enqueues
+ break; // leaves the loop, so the function returns
+ j(); // run the dequeued job on this worker thread
+ }
}
typedef function<void()> job_t;
class ThrPool {
- public:
- ThrPool(size_t sz, bool blocking=true);
- ~ThrPool();
+ public:
+ ThrPool(size_t sz, bool blocking=true); // sz becomes nthreads_ and blocking becomes blockadd_
+ ~ThrPool(); // enqueues one empty job per worker, then joins every thread in th_
- bool addJob(const job_t &j);
+ bool addJob(const job_t &j); // returns the result of jobq_.enq(j, blockadd_)
- private:
+ private:
size_t nthreads_;
- bool blockadd_;
+ bool blockadd_; // passed as enq()'s blocking argument by addJob()
- fifo<job_t> jobq_;
- vector<thread> th_;
+ fifo<job_t> jobq_; // pending jobs; the constructor gives it a limit of 100*sz
+ vector<thread> th_; // worker threads created in the constructor
void do_worker();
};
my $start = time();
while( (get_num_views( $log, $including ) < $num_views) and
($start + $timeout > time()) ) {
- my $lastv = `grep done $log | tail -n 1`;
- chomp $lastv;
+ my $lastv = `grep done $log | tail -n 1`; # last line of $log that contains "done"
+ chomp $lastv; # drop the trailing newline before $lastv is interpolated into the message below
print " Waiting for $including to be present in >=$num_views views in $log (Last view: $lastv)\n";
sleep 1;
}
my $command = shift;
for (my $i = 0; $i < $n; $i++) {
- if ($command eq "ls") {
- @pid = (@pid, spawn_ls($p[0],$p[$i]));
- print "Start lock_server on $p[$i]\n";
- }
+ if ($command eq "ls") {
+ @pid = (@pid, spawn_ls($p[0],$p[$i]));
+ print "Start lock_server on $p[$i]\n";
+ }
sleep 1;
my @vv = @p[0..$i];
foreach my $port (@lastv) {
wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
}
- sleep 10;
+ sleep 10;
# now check the paxos logs and make sure the logs go through the right
# views
if ($do_run[8]) {
print "test8: start 3-process lock service\n";
- start_nodes(3,"ls");
+ start_nodes(3,"ls");
print "Start lock_tester $p[0]\n";
$t = spawn("./lock_tester", $p[0]);
if ($do_run[9]) {
print "test9: start 3-process rsm, kill second slave while lock_tester is running\n";
- start_nodes(3,"ls");
+ start_nodes(3,"ls");
print "Start lock_tester $p[0]\n";
$t = spawn("./lock_tester", $p[0]);
if ($do_run[10]) {
print "test10: start 3-process rsm, kill second slave and restarts it later while lock_tester is running\n";
- start_nodes(3,"ls");
+ start_nodes(3,"ls");
print "Start lock_tester $p[0]\n";
$t = spawn("./lock_tester", $p[0]);
if ($do_run[11]) {
print "test11: start 3-process rsm, kill primary while lock_tester is running\n";
- start_nodes(3,"ls");
+ start_nodes(3,"ls");
print "Start lock_tester $p[0]\n";
$t = spawn("./lock_tester", $p[0]);