lock_tester
lock_demo
lock_server
-*.swp
-*.swo
+*.sw*
*.a
*.log
rsm_tester
ar cq $@ $^
ranlib rpc/librpc.a
-rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
+rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a t4.o
-lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
+lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o
-lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
+lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o
-lock_server : lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a
+lock_server : lock_smain.o threaded_log.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o
-rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a
+rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a t4.o
%.o: %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@
#include "config.h"
-#include "handle.h"
using std::vector;
LOG << "is " << mem << " still a member?";
if (!isamember(mem, newmem) && me != mem) {
LOG << "delete " << mem;
- handle(mem).invalidate();
+ rpcc::unbind_cached(mem);
}
}
VERIFY(cfg_mutex_lock);
unsigned vid = my_view_id;
LOG << "heartbeat to " << m << " (" << vid << ")";
- handle h(m);
cfg_mutex_lock.unlock();
int r = 0, ret = rpc_protocol::bind_failure;
- if (rpcc *cl = h.safebind())
+ if (auto cl = rpcc::bind_cached(m))
ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
cfg_mutex_lock.lock();
break;
case rpc_protocol::atmostonce_failure:
case rpc_protocol::oldsrv_failure:
- h.invalidate();
+ rpcc::unbind_cached(m);
break;
default:
LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
+++ /dev/null
-#include "handle.h"
-
-class hinfo {
-public:
- unique_ptr<rpcc> client;
- bool valid = true;
- string destination;
- std::mutex client_mutex;
- hinfo(const string & destination_) : destination(destination_) {}
-};
-
-static std::mutex mgr_mutex;
-static std::map<string, shared_ptr<hinfo>> hmap;
-
-void handle::shutdown() {
- lock ml(mgr_mutex);
- LOG_NONMEMBER << "Shutting down handle manager";
- for (auto p : hmap) {
- p.second->valid = false;
- LOG_NONMEMBER << "cl " << p.first << " refcnt " << p.second.use_count();
- }
- hmap.clear();
-}
-
-handle::handle(const string & destination) : destination_(destination) {
- lock ml(mgr_mutex);
- h = hmap[destination];
- if (!h || !h->valid)
- h = (hmap[destination] = std::make_shared<hinfo>(destination));
-}
-
-rpcc * handle::safebind() {
- if (!h)
- return nullptr;
- lock cl(h->client_mutex);
- if (!h->valid)
- return nullptr;
- if (!h->client) {
- unique_ptr<rpcc> client(new rpcc(h->destination));
- LOG << "bind(\"" << h->destination << "\")";
- int ret = client->bind(milliseconds(1000));
- if (ret < 0) {
- LOG << "bind failure! " << h->destination << " " << ret;
- h->valid = false;
- } else {
- LOG << "bind succeeded " << h->destination;
- h->client = std::move(client);
- }
- }
- return h->client.get();
-}
-
-void handle::invalidate() {
- h.reset();
- lock ml(mgr_mutex);
- if (hmap.find(destination_) != hmap.end()) {
- hmap[destination_]->valid = false;
- LOG << "cl " << destination_ << " refcnt " << hmap[destination_].use_count();
- hmap.erase(destination_);
- }
-}
+++ /dev/null
-#ifndef handle_h
-#define handle_h
-
-#include "types.h"
-#include "rpc/rpc.h"
-
-// Manage a cache of RPC connections. Typical usage:
-// handle h(dst);
-// rpc_protocol::status ret = rpc_protocol::bind_failure;
-// if (rpcc *cl = h.safebind())
-// ret = cl->call(...);
-// assuming dst is a string holding the host:port of the RPC server you want to
-// talk to.
-//
-// If the calling program has not contacted dst before, safebind() will create
-// a new connection, call bind(), and return an rpcc*, or 0 if bind() failed.
-// if the program has previously contacted dst, safebind() just returns the
-// previously created rpcc*. Because safebind() may block, callers should
-// probably not hold mutexes.
-
-class handle {
- private:
- shared_ptr<class hinfo> h;
- const string destination_;
- public:
- handle(const string & destination);
- rpcc *safebind();
- void invalidate();
- static void shutdown();
-};
-
-#endif
#include "lock_client.h"
-char log_thread_prefix = 'd';
-
int main(int argc, char *argv[]) {
+ global = new t4_state('d');
if(argc != 2) {
LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port";
return 1;
#include "lock_server.h"
#include <unistd.h>
#include <arpa/inet.h>
-#include "handle.h"
lock_state::lock_state():
held(false)
continue;
lock_state & st = get_lock_state(lid);
- holder_t held_by;
- {
- lock sl(st.m);
- held_by = st.held_by;
- }
+ lock sl(st.m);
+ holder_t held_by = st.held_by;
+ sl.unlock();
- rpcc *proxy = NULL;
- // try a few times?
- //int t=5;
- //while (t-- && !proxy)
- proxy = handle(held_by.first).safebind();
- if (proxy) {
+ if (auto cl = rpcc::bind_cached(held_by.first)) {
int r;
- auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+ auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
LOG << "Revoke returned " << ret;
}
}
front = st.wanted_by.front();
}
- rpcc *proxy = NULL;
- // try a few times?
- //int t=5;
- //while (t-- && !proxy)
- proxy = handle(front.first).safebind();
- if (proxy) {
+ if (auto cl = rpcc::bind_cached(front.first)) {
int r;
- auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
+ auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
LOG << "Retry returned " << ret;
}
}
// Main loop of lock_server
-char log_thread_prefix = 's';
-
int main(int argc, char *argv[]) {
+ global = new t4_state('s');
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
#include "lock_client.h"
#include <arpa/inet.h>
#include <unistd.h>
-#include "handle.h"
-
-char log_thread_prefix = 'c';
// must be >= 2
const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
int
main(int argc, char *argv[])
{
+ global = new t4_state('c');
thread th[nt];
int test = 0;
for (int i = 0; i < nt; i++)
delete lc[i];
-
- handle::shutdown();
- poll_mgr::shared_mgr.shutdown();
-
- LOG_NONMEMBER << argv[0] << ": clean-up complete";
}
#include "paxos.h"
-#include "handle.h"
using namespace std::placeholders;
prepareres res;
prop_t highest_n_a{0, ""};
for (auto i : nodes) {
- handle h(i);
- rpcc *r = h.safebind();
+ auto r = rpcc::bind_cached(i);
if (!r)
continue;
auto status = (paxos_protocol::status)r->call_timeout(
void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, const value_t & v) {
for (auto i : nodes) {
- handle h(i);
- rpcc *r = h.safebind();
+ auto r = rpcc::bind_cached(i);
if (!r)
continue;
bool accept = false;
void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
for (auto i : accepts) {
- handle h(i);
- rpcc *r = h.safebind();
+ auto r = rpcc::bind_cached(i);
if (!r)
continue;
int res = 0;
signal(SIGPIPE, SIG_IGN);
- poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
+ global->shared_mgr.add_callback(fd, CB_RDONLY, this);
}
connection::~connection() {
}
// after block_remove_fd, select will never wait on fd and no callbacks
// will be active
- poll_mgr::shared_mgr.block_remove_fd(fd);
+ global->shared_mgr.block_remove_fd(fd);
VERIFY(dead_);
VERIFY(wpdu_.status == unused);
}
if (!writepdu()) {
dead_ = true;
ml.unlock();
- poll_mgr::shared_mgr.block_remove_fd(fd);
+ global->shared_mgr.block_remove_fd(fd);
ml.lock();
} else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) {
// should be rare to need to explicitly add write callback
- poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
+ global->shared_mgr.add_callback(fd, CB_WRONLY, this);
while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size())
send_complete_.wait(ml);
}
VERIFY(!dead_);
VERIFY(fd == s);
if (wpdu_.status != inflight) {
- poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
+ global->shared_mgr.del_callback(fd, CB_WRONLY);
return;
}
if (!writepdu()) {
- poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
+ global->shared_mgr.del_callback(fd, CB_RDWR);
dead_ = true;
} else {
VERIFY(wpdu_.status != error);
if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
if (!readpdu()) {
IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
- poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
+ global->shared_mgr.del_callback(fd, CB_RDWR);
dead_ = true;
send_complete_.notify_one();
}
IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
- poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
+ global->shared_mgr.add_callback(tcp_, CB_RDONLY, this);
}
connection_listener::~connection_listener() {
- poll_mgr::shared_mgr.block_remove_fd(tcp_);
+ global->shared_mgr.block_remove_fd(tcp_);
}
void connection_listener::read_cb(int) {
#include "types.h"
#include <arpa/inet.h>
#include <netinet/in.h>
+#include "t4.h"
#include "poll_mgr.h"
#include "file.h"
+#include "threaded_log.h"
class connection;
#include <errno.h>
#include <sys/select.h>
#include "file.h"
+#include "threaded_log.h"
#ifdef __linux__
#include <sys/epoll.h>
aio_callback::~aio_callback() {}
-poll_mgr poll_mgr::shared_mgr;
-
class wait_manager {
public:
virtual void watch_fd(int fd, poll_flag flag) = 0;
poll_mgr();
~poll_mgr();
- static poll_mgr shared_mgr;
-
void add_callback(int fd, poll_flag flag, aio_callback *ch);
void del_callback(int fd, poll_flag flag);
void block_remove_fd(int fd);
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc() {
- cancel();
+ lock ml(m_);
+ cancel(ml);
IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1);
chan_.reset();
VERIFY(calls_.size() == 0);
return ret;
}
+// Return the cached rpcc for `destination`, binding it first if necessary.
+//
+// The client object comes from global->get_handle(); bind_m_ serializes
+// concurrent bind attempts on that one client.  If bind() reports failure
+// (negative return) a null pointer is returned.  This function does not
+// remove the cache entry on failure, so a later call will attempt the bind
+// again on the same client.
+// NOTE(review): bind_done_ is presumably set by a successful bind() -- confirm.
+//
+// bind() is given a 1000 ms timeout, so this call may block; callers should
+// not hold their own mutexes across it.
+shared_ptr<rpcc> rpcc::bind_cached(const string & destination) {
+    auto client = global->get_handle(destination);
+    lock cl = lock(client->bind_m_);
+    if (!client->bind_done_) {
+        LOG_NONMEMBER << "bind(\"" << destination << "\")";
+        int ret = client->bind(milliseconds(1000));
+        if (ret < 0) {
+            LOG_NONMEMBER << "bind failure! " << destination << " " << ret;
+            // Release bind_m_ before dropping our reference.  If the cache
+            // entry was erased concurrently (unbind_cached), `client` may be
+            // the last owner, and reset() would then destroy the rpcc -- and
+            // its bind_m_ -- while `cl` still holds it locked.
+            cl.unlock();
+            client.reset();
+        } else {
+            LOG_NONMEMBER << "bind succeeded " << destination;
+        }
+    }
+    return client;
+}
+
+// Remove `destination` from the process-wide rpcc cache by forwarding to
+// global->erase_handle().
+void rpcc::unbind_cached(const string & destination)
+{
+    global->erase_handle(destination);
+}
+
// Cancel all outstanding calls
-void rpcc::cancel(void) {
- lock ml(m_);
+void rpcc::cancel(lock & m_lock) {
+ VERIFY(m_lock);
if (calls_.size()) {
LOG << "force callers to fail";
for (auto & p : calls_) {
destroy_wait_ = true;
while (calls_.size () > 0)
- destroy_wait_c_.wait(ml);
+ destroy_wait_c_.wait(m_lock);
LOG << "done";
}
#include "marshall.h"
#include "marshall_wrap.h"
#include "connection.h"
+#include "threaded_log.h"
using std::chrono::milliseconds;
std::mutex m_; // protect insert/delete to calls[]
std::mutex chan_m_;
+ std::mutex bind_m_; // protect bind operations
bool destroy_wait_ = false;
cond destroy_wait_c_;
int bind(milliseconds to = rpc::to_max);
+ // Manages a cache of RPC connections. Usage:
+ // if (auto cl = rpcc::bind_cached(dst))
+ // ret = cl->call(...);
+ // where the string dst has the form "host:port". Because bind_cached()
+ // may block, callers should probably not hold mutexes.
+ static shared_ptr<rpcc> bind_cached(const string & destination);
+ static void unbind_cached(const string & destination);
+
void set_reachable(bool r) { reachable_ = r; }
- void cancel();
+ void cancel(lock & m_lock);
template<class P, class R, typename ...Args>
inline int call(proc_t<P> proc, R & r, const Args & ... args) {
#include <getopt.h>
#include <unistd.h>
#include <string.h>
+#include "threaded_log.h"
#define NUM_CL 2
-char log_thread_prefix = 'r';
-
static rpcs *server; // server rpc object
static rpcc *clients[NUM_CL]; // client rpc object
static string dst; //server's ip address
}
int main(int argc, char *argv[]) {
+ global = new t4_state('r');
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
// upcalls, but can keep its locks when calling down.
#include "rsm.h"
-#include "handle.h"
#include "rsm_client.h"
#include <unistd.h>
bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
{
rsm_protocol::transferres r;
- handle h(m);
int ret = 0;
LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
- rpcc *cl;
+ shared_ptr<rpcc> cl;
{
rsm_mutex_lock.unlock();
- cl = h.safebind();
+ cl = rpcc::bind_cached(m);
if (cl) {
ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
r, cfg->myaddr(), last_myvs, vid_insync);
bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
rsm_mutex_lock.unlock();
- handle h(m);
- rpcc *cl = h.safebind();
bool done = false;
- if (cl) {
+ if (auto cl = rpcc::bind_cached(m)) {
int r;
auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
done = (ret == rsm_protocol::OK);
bool rsm::join(const string & m, lock & rsm_mutex_lock) {
- handle h(m);
int ret = 0;
string log;
LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
- rpcc *cl;
- {
- rsm_mutex_lock.unlock();
- cl = h.safebind();
- if (cl != 0) {
- ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
- cfg->myaddr(), last_myvs);
- }
- rsm_mutex_lock.lock();
- }
+
+ rsm_mutex_lock.unlock();
+ auto cl = rpcc::bind_cached(m);
+ if (cl)
+ ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+ rsm_mutex_lock.lock();
if (cl == 0 || ret != rsm_protocol::OK) {
LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != myaddr) {
// if invoke on slave fails, return rsm_client_protocol::BUSY
- handle h(m[i]);
LOG << "Sending invoke to " << m[i];
- rpcc *cl = h.safebind();
+ auto cl = rpcc::bind_cached(m[i]);
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
- handle h(m[i]);
LOG << "member " << m[i] << " " << heal;
- if (h.safebind()) h.safebind()->set_reachable(heal);
+ if (auto cl = rpcc::bind_cached(m[i]))
+ cl->set_reachable(heal);
}
}
rsmrpc->set_reachable(heal);
#include "rsm_client.h"
#include <arpa/inet.h>
-#include <handle.h>
#include <unistd.h>
rsm_client::rsm_client(string dst) : primary(dst) {
lock ml(rsm_client_mutex);
while (1) {
LOG << "proc " << std::hex << proc << " primary " << primary;
- handle h(primary);
+ string prim = primary;
ml.unlock();
- rpcc *cl = h.safebind();
+ auto cl = rpcc::bind_cached(prim);
auto ret = rsm_client_protocol::OK;
if (cl)
ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
if (!cl)
goto prim_fail;
- LOG << "proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret;
+ LOG << "proc " << std::hex << proc << " primary " << prim << " ret " << std::dec << ret;
if (ret == rsm_client_protocol::OK)
return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
- LOG << "rsm is busy " << primary;
+ LOG << "rsm is busy " << prim;
usleep(300000);
continue;
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
- LOG << "primary " << primary << " isn't the primary--let's get a complete list of mems";
+ LOG << "primary " << prim << " isn't the primary--let's get a complete list of mems";
if (init_members(ml))
continue;
}
prim_fail:
- LOG << "primary " << primary << " failed ret " << std::dec << ret;
+ LOG << "primary " << prim << " failed ret " << std::dec << ret;
primary_failure(ml);
+ // Log the member, not `prim`: `prim` is the snapshot taken before
+ // primary_failure() ran, so it still names the primary that just failed.
+ // NOTE(review): assumes the client mutex is held here, as it was for the
+ // pre-existing read of `primary` on this line -- confirm.
LOG << "retry new primary " << primary;
}
}
bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
LOG << "get members!";
- handle h(primary);
+ string prim = primary;
int ret = rsm_client_protocol::ERR;
- rpcc *cl;
+ shared_ptr<rpcc> cl;
{
rsm_client_mutex_lock.unlock();
- cl = h.safebind();
+ cl = rpcc::bind_cached(prim);
if (cl)
ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
rsm_client_mutex_lock.lock();
#include "rsm_protocol.h"
#include "rsmtest_client.h"
-char log_thread_prefix = 't';
-
int main(int argc, char *argv[]) {
+ global = new t4_state('t');
if(argc != 4){
LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [partition] arg";
return 1;
--- /dev/null
+#include "t4.h"
+#include <unistd.h>
+#include "rpc/rpc.h"
+
+using namespace std::chrono;
+
+t4_state *global;
+
+// Construct the process-wide state.
+//
+// log_prefix is stored in log_thread_prefix.  random_generator is seeded
+// with the XOR of four 32-bit values: one std::random_device sample, the
+// wall-clock time in nanoseconds, the process id, and a hash of the
+// constructing thread's id (the last three truncated to 32 bits).
+t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) {
+    const uint32_t entropy = std::random_device()();
+    const auto since_epoch = system_clock::now().time_since_epoch();
+    const auto nanos = duration_cast<nanoseconds>(since_epoch).count();
+    const auto process_id = getpid();
+    const auto thread_hash = std::hash<std::thread::id>()(std::this_thread::get_id());
+
+    const uint32_t seed = entropy
+        ^ static_cast<uint32_t>(nanos)
+        ^ static_cast<uint32_t>(process_id)
+        ^ static_cast<uint32_t>(thread_hash);
+    random_generator.seed(seed);
+
+    // make sure the clock will read differently next time!
+    std::this_thread::sleep_for(microseconds(1));
+}
+
+// Tear down the process-wide state: with handle_cache_mutex held, drop every
+// cached rpcc reference and then shut down the shared poll manager.
+t4_state::~t4_state()
+{
+    lock cache_guard(handle_cache_mutex);
+    handle_cache.clear();
+    shared_mgr.shutdown();
+}
+
+// Return the cached rpcc for `destination`, constructing (but not binding)
+// a new one on first use.  The whole lookup-or-insert runs under
+// handle_cache_mutex.
+shared_ptr<rpcc> t4_state::get_handle(const string & destination) {
+    lock ml(handle_cache_mutex);
+    // One map lookup: operator[] default-inserts a null shared_ptr when the
+    // key is absent, and the reference lets us fill that slot in place.
+    auto & entry = handle_cache[destination];
+    if (!entry)
+        entry = std::make_shared<rpcc>(destination);
+    return entry;
+}
+
+// Remove the cache entry for `destination`, if any, under
+// handle_cache_mutex.  Only the cache's own reference is dropped here.
+void t4_state::erase_handle(const string & destination)
+{
+    lock cache_guard(handle_cache_mutex);
+    handle_cache.erase(destination);
+}
--- /dev/null
+#ifndef t4_h
+#define t4_h
+
+#include "types.h"
+#include "rpc/poll_mgr.h"
+
+struct t4_state { // Process-wide state, reached through the `global` pointer declared below.
+ std::mutex log_mutex; // NOTE(review): presumably serializes log output; no user is visible here -- confirm
+ std::map<thread::id, int> thread_name_map; // NOTE(review): presumably thread id -> number shown in log lines -- confirm
+ std::map<const void *, int> instance_name_map; // NOTE(review): presumably object address -> number shown in log lines -- confirm
+
+ poll_mgr shared_mgr; // Takes over from the removed static poll_mgr::shared_mgr; connection code registers fds via global->shared_mgr.
+
+ int next_thread_num = 0;
+ int next_instance_num = 0;
+ int DEBUG_LEVEL = 0;
+ char log_thread_prefix; // Character written in front of the thread number on each log line; set by the constructor.
+
+ std::mutex handle_cache_mutex; // Guards handle_cache.
+ std::map<string, shared_ptr<class rpcc>> handle_cache; // destination string -> cached rpcc client. Do not reorder: declared after shared_mgr, so it is destroyed first.
+
+ shared_ptr<class rpcc> get_handle(const string & destination); // Returns the cached rpcc for destination, creating it on first use.
+ void erase_handle(const string & destination); // Removes destination's entry from handle_cache.
+
+ std::mt19937_64 random_generator; // Seeded in the constructor.
+
+ t4_state(char log_prefix = ' '); // log_prefix initializes log_thread_prefix.
+ ~t4_state(); // Clears handle_cache, then shuts down shared_mgr.
+};
+
+extern t4_state *global;
+
+#endif
#include "threaded_log.h"
+#include "t4.h"
static std::mutex log_mutex;
static std::map<thread::id, int> thread_name_map;
auto utime = duration_cast<microseconds>(
system_clock::now().time_since_epoch()).count() % 1000000000;
f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " ";
- f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid;
+ f << std::setfill(' ') << global->log_thread_prefix << std::left << std::setw(2) << tid;
f << " " << std::setw(20) << file << " " << std::setw(18) << func;
return std::move(f);
}
#include <map>
#include <memory>
#include <mutex>
+#include <random>
#include <stdexcept>
#include <sstream>
#include <string>
}
#include "verify.h"
-#include "threaded_log.h"
// struct tuple adapter, useful for marshalling and endian swapping. usage:
//