all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o # RPC static library: the recipe deletes any existing archive, rebuilds it from all prerequisites with "ar cq", then runs ranlib on it
rm -f $@
ar cq $@ $^
ranlib rpc/librpc.a
#include <unistd.h>
#include "marshall.h"
-connection::connection(chanmgr *m1, int f1, int l1)
-: mgr_(m1), fd_(f1), lossy_(l1)
+connection::connection(connection_delegate *m1, socket_t && f1, int l1) // m1: delegate whose got_pdu() receives complete PDUs; f1: socket handle to adopt; l1: stored in lossy_
+: mgr_(m1), fd_(move(f1)), lossy_(l1) // the caller's socket_t is moved into fd_
{
fd_.flags() |= O_NONBLOCK;
create_time_ = steady_clock::now();
- PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
+ poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this); // register this object as the CB_RDONLY callback for fd_ with the shared poll manager
}
connection::~connection() {
VERIFY(!wpdu_.buf.size());
}
+shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) { // Creates a TCP socket, connects it to dst and wraps it in a connection that reports to mgr; returns nullptr when connect() fails.
+ socket_t s = socket(AF_INET, SOCK_STREAM, 0); // NOTE(review): the socket() result is not checked before use
+ s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); // turn on TCP_NODELAY for this socket
+ if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { // O_NONBLOCK is only added later, in the connection constructor
+ IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+ close(s); // NOTE(review): if socket_t also closes its descriptor on destruction, this is a double close -- confirm against file.h
+ return nullptr;
+ }
+ IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+ return make_shared<connection>(mgr, move(s), lossy); // the socket is moved into the new connection
+}
+
void connection::closeconn() {
{
lock ml(m_);
}
//after block_remove_fd, select will never wait on fd_
//and no callbacks will be active
- PollMgr::Instance().block_remove_fd(fd_);
+ poll_mgr::shared_mgr.block_remove_fd(fd_);
}
bool connection::send(const string & b) {
if (!writepdu()) {
dead_ = true;
ml.unlock();
- PollMgr::Instance().block_remove_fd(fd_);
+ poll_mgr::shared_mgr.block_remove_fd(fd_);
ml.lock();
} else if (wpdu_.solong != wpdu_.buf.size()) {
// should be rare to need to explicitly add write callback
- PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
+ poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
send_complete_.wait(ml);
}
VERIFY(!dead_);
VERIFY(fd_ == s);
if (wpdu_.buf.size() == 0) {
- PollMgr::Instance().del_callback(fd_,CB_WRONLY);
+ poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
return;
}
if (!writepdu()) {
- PollMgr::Instance().del_callback(fd_, CB_RDWR);
+ poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
dead_ = true;
} else {
VERIFY(wpdu_.solong != size_t_max);
void connection::read_cb(int s) {
lock ml(m_);
VERIFY(fd_ == s);
- if (dead_) {
+ if (dead_)
return;
- }
IF_LEVEL(5) LOG("got data on fd " << s);
bool succ = true;
- if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
+ if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
succ = readpdu();
- }
if (!succ) {
IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
- PollMgr::Instance().del_callback(fd_,CB_RDWR);
+ poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
dead_ = true;
send_complete_.notify_one();
}
if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
- // chanmgr has successfully consumed the pdu
+ // connection_delegate has successfully consumed the pdu
rpdu_.buf.clear();
rpdu_.solong = 0;
}
return true;
}
-tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
+tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
{
- struct sockaddr_in sin;
+ sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = hton(port);
tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
- struct timeval timeout = {0, 50000};
-
- if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+ if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0)
perror("accept_loop setsockopt");
- if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+ if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0)
perror("accept_loop setsockopt");
// careful to exactly match type signature of bind arguments so we don't
// get std::bind instead
- if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+ if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
perror("accept_loop bind");
VERIFY(0);
}
if (FD_ISSET(pipe_[0], &rfds))
return;
- if (!FD_ISSET(tcp_, &rfds))
- VERIFY(0);
+ VERIFY(FD_ISSET(tcp_, &rfds));
try {
process_accept();
}
}
-shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
- int s = socket(AF_INET, SOCK_STREAM, 0);
- int yes = 1;
- setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
- if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
- IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
- close(s);
- return nullptr;
- }
- IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
- return make_shared<connection>(mgr, s, lossy);
-}
-
#include "types.h"
#include <arpa/inet.h>
#include <netinet/in.h>
-#include "pollmgr.h"
+#include "poll_mgr.h"
#include "file.h"
constexpr size_t size_t_max = numeric_limits<size_t>::max();
class connection;
-class chanmgr {
+class connection_delegate { // Callback interface a connection uses to hand a fully read PDU to its owner; when got_pdu() returns true, connection::read_cb clears its read buffer.
public:
virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
- virtual ~chanmgr() {}
+ virtual ~connection_delegate() {} // virtual so implementations can be destroyed through a connection_delegate pointer
};
class connection : public aio_callback, public enable_shared_from_this<connection> {
size_t solong = 0; // number of bytes written or read so far
};
- connection(chanmgr *m1, int f1, int lossytest=0);
+ connection(connection_delegate *m1, socket_t && f1, int lossytest=0);
~connection();
int channo() { return fd_; }
time_point<steady_clock> create_time() const { return create_time_; }
+ static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
+
private:
bool readpdu();
bool writepdu();
- chanmgr *mgr_;
+ connection_delegate *mgr_;
const file_t fd_;
bool dead_ = false;
class tcpsconn {
public:
- tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
+ tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0);
~tcpsconn();
inline in_port_t port() { return port_; }
void accept_conn();
file_t pipe_[2];
socket_t tcp_; // listens for connections
- chanmgr *mgr_;
+ connection_delegate *mgr_;
int lossy_;
map<int, shared_ptr<connection>> conns_;
void process_accept();
};
-
-struct bundle {
- bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
- chanmgr *mgr;
- int tcp;
- int lossy;
-};
-
-shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
#endif
-#include "pollmgr.h"
+#include "poll_mgr.h"
#include <errno.h>
#include <sys/select.h>
#include "file.h"
#include <sys/epoll.h>
#endif
-static PollMgr instance;
-
-PollMgr & PollMgr::Instance() { return instance; }
+poll_mgr poll_mgr::shared_mgr;
class wait_manager {
public:
#endif
-PollMgr::PollMgr() : aio_(new SelectAIO()) {
- th_ = thread(&PollMgr::wait_loop, this);
+poll_mgr::poll_mgr() : aio_(new SelectAIO()) { // allocates the SelectAIO wait backend held in aio_; for the static shared_mgr object this constructor runs during static initialization
+ th_ = thread(&poll_mgr::wait_loop, this); // start the thread that runs wait_loop() on this manager
}
-PollMgr::~PollMgr()
+poll_mgr::~poll_mgr()
{
lock ml(m_);
for (auto p : callbacks_)
pending_change_ = true;
shutdown_ = true;
changedone_c_.wait(ml);
- delete aio_;
+ aio_ = nullptr;
th_.join();
}
void
-PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
+poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
{
lock ml(m_);
aio_->watch_fd(fd, flag);
// Remove all callbacks related to fd. After this returns, we guarantee that
// callbacks related to fd will never be called again.
-void PollMgr::block_remove_fd(int fd) {
+void poll_mgr::block_remove_fd(int fd) {
lock ml(m_);
aio_->unwatch_fd(fd, CB_RDWR);
pending_change_ = true;
callbacks_[fd] = nullptr;
}
-void PollMgr::del_callback(int fd, poll_flag flag) {
+void poll_mgr::del_callback(int fd, poll_flag flag) { // Under m_, stops watching fd for the events in flag; the callback slot for fd is cleared only when unwatch_fd() returns true (presumably meaning nothing is watched on fd any more -- confirm).
lock ml(m_);
if (aio_->unwatch_fd(fd, flag))
callbacks_[fd] = nullptr;
}
-void PollMgr::wait_loop() {
+void poll_mgr::wait_loop() {
vector<int> readable;
vector<int> writable;
aio_callback * cb;
-#ifndef pollmgr_h
-#define pollmgr_h
+#ifndef poll_mgr_h
+#define poll_mgr_h
#include "types.h"
virtual ~aio_callback() {}
};
-class PollMgr {
+class poll_mgr {
public:
- PollMgr();
- ~PollMgr();
+ poll_mgr();
+ ~poll_mgr();
- static PollMgr & Instance();
+ static poll_mgr shared_mgr;
void add_callback(int fd, poll_flag flag, aio_callback *ch);
void del_callback(int fd, poll_flag flag);
cond changedone_c_;
map<int, aio_callback *> callbacks_;
- class wait_manager *aio_;
+ unique_ptr<class wait_manager> aio_;
bool pending_change_=false, shutdown_=false;
thread th_;
{
lock ml(chan_m_);
if (!chan_ || chan_->isdead())
- chan_ = connect_to_dst(dst_, this, lossytest_);
+ chan_ = connection::to_dst(dst_, this, lossytest_);
if (chan_)
ch = chan_;
IF_LEVEL(2) LOG("created with nonce " << nonce_);
reg(rpc_const::bind, &rpcs::rpcbind, this);
- dispatchpool_ = unique_ptr<ThrPool>(new ThrPool(6, false));
+ dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
}
void rpcs::start() {
// rpc client endpoint.
// manages a xid space per destination socket
// threaded: multiple threads can be sending RPCs,
-class rpcc : public chanmgr {
+class rpcc : public connection_delegate {
private:
//manages per rpc info
}
// rpc server endpoint.
-class rpcs : public chanmgr {
+class rpcs : public connection_delegate {
typedef enum {
NEW, // new RPC, not a duplicate
// internal handler registration
void reg1(proc_t proc, handler *);
- unique_ptr<ThrPool> dispatchpool_;
+ unique_ptr<thread_pool> dispatchpool_;
unique_ptr<tcpsconn> listener_;
public:
// if blocking, then addJob() blocks when queue is full
// otherwise, addJob() simply returns false when queue is full
-ThrPool::ThrPool(size_t sz, bool blocking)
+thread_pool::thread_pool(size_t sz, bool blocking) // sz: number of worker threads to start; blocking: stored in blockadd_ and later passed to jobq_.enq() by addJob(); jobq_ is built with 100*sz (presumably its capacity -- confirm)
: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
for (size_t i=0; i<nthreads_; i++)
- th_.emplace_back(&ThrPool::do_worker, this);
+ th_.emplace_back(&thread_pool::do_worker, this); // each worker thread runs do_worker() on this pool
}
// IMPORTANT: this function can be called only when no external thread
// will ever use this thread pool again or is currently blocking on it
-ThrPool::~ThrPool() {
+thread_pool::~thread_pool() {
for (size_t i=0; i<nthreads_; i++)
jobq_.enq(job_t());
th_[i].join();
}
-bool ThrPool::addJob(const job_t &j) {
+bool thread_pool::addJob(const job_t &j) { // Enqueues j on jobq_ and returns enq()'s result; blockadd_ selects whether a full queue blocks the caller or makes this return false.
return jobq_.enq(j,blockadd_);
}
-void ThrPool::do_worker() {
+void thread_pool::do_worker() {
job_t j;
while (1) {
jobq_.deq(&j);
typedef function<void()> job_t;
-class ThrPool {
+class thread_pool {
public:
- ThrPool(size_t sz, bool blocking=true);
- ~ThrPool();
+ thread_pool(size_t sz, bool blocking=true);
+ ~thread_pool();
bool addJob(const job_t &j);
using std::make_shared;
using std::shared_ptr;
using std::unique_ptr;
+using std::weak_ptr;
#include <mutex>
using std::mutex;