ar cq $@ $^
ranlib rpc/librpc.a
-rpc/rpctest: rpc/rpctest.o tprintf.o rpc/librpc.a
+rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
-lock_demo=lock_demo.o lock_client.o tprintf.o rsm_client.o handle.o
+lock_demo=lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o
lock_demo : $(lock_demo) rpc/librpc.a
-lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o
+lock_tester=lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o
lock_tester : $(lock_tester) rpc/librpc.a
-lock_server=lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server.o
+lock_server=lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o
lock_server : $(lock_server) rpc/librpc.a
-rsm_tester=rsm_tester.o rsmtest_client.o tprintf.o
+rsm_tester=rsm_tester.o rsmtest_client.o threaded_log.o
rsm_tester: $(rsm_tester) rpc/librpc.a
%.o: %.cc
-#PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations
-#PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors
+PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes -Wmissing-declarations
+PEDANTRY += -Wno-weak-vtables -Wno-global-constructors -Wno-exit-time-destructors
CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY)
LDFLAGS = -stdlib=libc++
CXX = clang++
#include <thread>
#include <sstream>
-#include <iostream>
-#include <stdio.h>
#include "config.h"
#include "paxos.h"
#include "handle.h"
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lang/verify.h"
-using namespace std::chrono;
-using std::string;
-using std::vector;
-using std::thread;
-using std::ostringstream;
-using std::istringstream;
-
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as previous view, except with
// the new node added or removed. The first view contains only node
// all views, the other nodes can bring this re-joined node up to
// date.
-config::config(
- const string &_first,
- const string &_me,
- config_view_change *_vc)
- : my_view_id(0), first(_first), me(_me), vc(_vc)
+config::config(const string &_first, const string &_me, config_view_change *_vc)
+ : my_view_id(0), first(_first), me(_me), vc(_vc),
+ paxos_acceptor(this, me == _first, me, me),
+ paxos_proposer(this, &paxos_acceptor, me)
{
- paxos_acceptor = new acceptor(this, me == _first, me, me);
- paxos_proposer = new proposer(this, paxos_acceptor, me);
-
- // XXX hack; maybe should have its own port number
- paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
-
- {
- lock ml(cfg_mutex);
- reconstruct(ml);
- thread(&config::heartbeater, this).detach();
- }
+ get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
+ lock cfg_mutex_lock(cfg_mutex);
+ reconstruct(cfg_mutex_lock);
+ thread(&config::heartbeater, this).detach();
}
-void
-config::restore(const string &s)
-{
- lock ml(cfg_mutex);
- paxos_acceptor->restore(s);
- reconstruct(ml);
+void config::restore(const string &s) {
+ lock cfg_mutex_lock(cfg_mutex);
+ paxos_acceptor.restore(s);
+ reconstruct(cfg_mutex_lock);
}
-void
-config::get_view(unsigned instance, vector<string> &m)
-{
- lock ml(cfg_mutex);
- get_view(instance, m, ml);
+void config::get_view(unsigned instance, vector<string> &m) {
+ lock cfg_mutex_lock(cfg_mutex);
+ get_view(instance, m, cfg_mutex_lock);
}
-// caller should hold cfg_mutex
-void
-config::get_view(unsigned instance, vector<string> &m, lock &)
-{
- string value = paxos_acceptor->value(instance);
- tprintf("get_view(%d): returns %s\n", instance, value.c_str());
- members(value, m);
+void config::get_view(unsigned instance, vector<string> &m, lock &) {
+ string value = paxos_acceptor.value(instance);
+ LOG("get_view(" << instance << "): returns " << value);
+ m = members(value);
}
-void
-config::members(const string &value, vector<string> &view) const
-{
+vector<string> config::members(const string &value) const {
istringstream ist(value);
- string m;
- view.clear();
- while (ist >> m)
- view.push_back(m);
+ using it = istream_iterator<string>;
+ return {it(ist), it()};
}
-string
-config::value(const vector<string> &m) const
-{
+string config::value(const vector<string> &m) const {
ostringstream ost;
- for (unsigned i = 0; i < m.size(); i++) {
- ost << m[i];
- ost << " ";
- }
+ copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
return ost.str();
}
-void
-config::reconstruct(lock &cfg_mutex_lock)
-{
+void config::reconstruct(lock &cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
- if (paxos_acceptor->instance() > 0) {
- my_view_id = paxos_acceptor->instance();
+ if (paxos_acceptor.instance() > 0) {
+ my_view_id = paxos_acceptor.instance();
get_view(my_view_id, mems, cfg_mutex_lock);
- tprintf("config::reconstruct: %d %s\n",
- my_view_id, print_members(mems).c_str());
+ LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
}
}
// Called by Paxos's acceptor.
-void
-config::paxos_commit(unsigned instance, const string &value)
-{
- vector<string> newmem;
- lock ml(cfg_mutex);
+void config::paxos_commit(unsigned instance, const string &value) {
+ lock cfg_mutex_lock(cfg_mutex);
- members(value, newmem);
- tprintf("config::paxos_commit: %d: %s\n", instance,
- print_members(newmem).c_str());
+ vector<string> newmem = members(value);
+ LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
- for (unsigned i = 0; i < mems.size(); i++) {
- tprintf("config::paxos_commit: is %s still a member?\n",
- mems[i].c_str());
- if (!isamember(mems[i], newmem) && me != mems[i]) {
- tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
- mgr.delete_handle(mems[i]);
+ for (auto mem : mems) {
+ LOG("config::paxos_commit: is " << mem << " still a member?");
+ if (!isamember(mem, newmem) && me != mem) {
+ LOG("config::paxos_commit: delete " << mem);
+ invalidate_handle(mem);
}
}
mems = newmem;
my_view_id = instance;
if (vc) {
- ml.unlock();
+ cfg_mutex_lock.unlock();
vc->commit_change(instance);
- ml.lock();
+ cfg_mutex_lock.lock();
}
}
-bool
-config::ismember(const string &m, unsigned vid)
-{
- lock ml(cfg_mutex);
+bool config::ismember(const string &m, unsigned vid) {
+ lock cfg_mutex_lock(cfg_mutex);
vector<string> v;
- get_view(vid, v, ml);
+ get_view(vid, v, cfg_mutex_lock);
return isamember(m, v);
}
-bool
-config::add(const string &new_m, unsigned vid)
-{
- vector<string> m;
- vector<string> curm;
- lock ml(cfg_mutex);
+bool config::add(const string &new_m, unsigned vid) {
+ lock cfg_mutex_lock(cfg_mutex);
if (vid != my_view_id)
return false;
- tprintf("config::add %s\n", new_m.c_str());
- m = mems;
+ LOG("config::add " << new_m);
+ vector<string> m = mems;
m.push_back(new_m);
- curm = mems;
- string v = value(m);
+ vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
- bool r;
- {
- ml.unlock();
- r = paxos_proposer->run(nextvid, curm, v);
- ml.lock();
- }
- tprintf("config::add: proposer returned %s\n",
- r ? "success" : "failure");
+ cfg_mutex_lock.unlock();
+ bool r = paxos_proposer.run(nextvid, cmems, value(m));
+ cfg_mutex_lock.lock();
+ LOG("config::add: proposer returned " << (r ? "success" : "failure"));
return r;
}
// caller should hold cfg_mutex
-bool
-config::remove(const string &m)
-{
- adopt_lock ml(cfg_mutex);
- tprintf("config::remove: my_view_id %d remove? %s\n",
- my_view_id, m.c_str());
+bool config::remove(const string &m, lock &cfg_mutex_lock) {
+ LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
vector<string> n;
- for (unsigned i = 0; i < mems.size(); i++) {
- if (mems[i] != m)
- n.push_back(mems[i]);
+ for (auto mem : mems) {
+ if (mem != m)
+ n.push_back(mem);
}
- string v = value(n);
vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
- bool r;
- {
- ml.unlock();
- r = paxos_proposer->run(nextvid, cmems, v);
- ml.lock();
- }
- tprintf("config::remove: proposer returned %s\n",
- r ? "success" : "failure");
+ cfg_mutex_lock.unlock();
+ bool r = paxos_proposer.run(nextvid, cmems, value(n));
+ cfg_mutex_lock.lock();
+ LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
return r;
}
-void
-config::heartbeater() [[noreturn]]
-{
- string m;
- heartbeat_t h;
- bool stable;
- unsigned vid;
- vector<string> cmems;
- lock ml(cfg_mutex);
+void config::heartbeater() [[noreturn]] {
+ lock cfg_mutex_lock(cfg_mutex);
while (1) {
auto next_timeout = steady_clock::now() + seconds(3);
- tprintf("heartbeater: go to sleep\n");
- config_cond.wait_until(ml, next_timeout);
+ LOG("heartbeater: go to sleep");
+ config_cond.wait_until(cfg_mutex_lock, next_timeout);
- stable = true;
- vid = my_view_id;
- get_view(vid, cmems, ml);
- tprintf("heartbeater: current membership %s\n",
- print_members(cmems).c_str());
+ unsigned vid = my_view_id;
+ vector<string> cmems;
+ get_view(vid, cmems, cfg_mutex_lock);
+ LOG("heartbeater: current membership " << print_members(cmems));
if (!isamember(me, cmems)) {
- tprintf("heartbeater: not member yet; skip hearbeat\n");
+ LOG("heartbeater: not member yet; skip hearbeat");
continue;
}
// who has the smallest ID?
- m = me;
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (m > cmems[i])
- m = cmems[i];
- }
+ string m = min(me, *min_element(cmems.begin(), cmems.end()));
if (m == me) {
// ping the other nodes
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (cmems[i] != me) {
- if ((h = doheartbeat(cmems[i])) != OK) {
- stable = false;
- m = cmems[i];
- break;
- }
- }
+ for (string mem : cmems) {
+ if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
+ continue;
+ if (vid == my_view_id)
+ remove(mem, cfg_mutex_lock);
+ break;
}
} else {
// ping the node with the smallest ID
- if ((h = doheartbeat(m)) != OK)
- stable = false;
- }
-
- if (!stable && vid == my_view_id) {
- remove(m);
+ if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
+ remove(m, cfg_mutex_lock);
}
}
}
-paxos_protocol::status
-config::heartbeat(int &r, string m, unsigned vid)
-{
- lock ml(cfg_mutex);
- int ret = paxos_protocol::ERR;
+paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+ lock cfg_mutex_lock(cfg_mutex);
r = (int) my_view_id;
- tprintf("heartbeat from %s(%d) my_view_id %d\n",
- m.c_str(), vid, my_view_id);
- if (vid == my_view_id) {
- ret = paxos_protocol::OK;
- } else if (paxos_proposer->isrunning()) {
+ LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+ if (vid == my_view_id)
+ return paxos_protocol::OK;
+ else if (paxos_proposer.isrunning()) {
VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
- ret = paxos_protocol::OK;
- } else {
- ret = paxos_protocol::ERR;
+ return paxos_protocol::OK;
}
- return ret;
+ return paxos_protocol::ERR;
}
-config::heartbeat_t
-config::doheartbeat(const string &m)
-{
- adopt_lock ml(cfg_mutex);
- int ret = rpc_const::timeout_failure;
- int r = 0;
+config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
unsigned vid = my_view_id;
- heartbeat_t res = OK;
-
- tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+ LOG("doheartbeater to " << m << " (" << vid << ")");
handle h(m);
- {
- ml.unlock();
- rpcc *cl = h.safebind();
- if (cl) {
- ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
- }
- ml.lock();
- }
- if (ret != paxos_protocol::OK) {
- if (ret == rpc_const::atmostonce_failure ||
- ret == rpc_const::oldsrv_failure) {
- mgr.delete_handle(m);
- } else {
- tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
- m.c_str(), ret, vid, r);
- if (ret < 0) res = FAILURE;
- else res = VIEWERR;
- }
+
+ cfg_mutex_lock.unlock();
+ int r = 0, ret = rpc_const::bind_failure;
+ if (rpcc *cl = h.safebind())
+ ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
+ cfg_mutex_lock.lock();
+
+ heartbeat_t res = OK;
+ switch (ret) {
+ case paxos_protocol::OK:
+ break;
+ case rpc_const::atmostonce_failure:
+ case rpc_const::oldsrv_failure:
+ invalidate_handle(m);
+ break;
+ default:
+ LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+ res = (ret < 0) ? FAILURE : VIEWERR;
}
- tprintf("doheartbeat done %d\n", res);
+ LOG("doheartbeat done " << res);
return res;
}
-
#include "paxos.h"
#include "lock.h"
+using std::chrono::steady_clock;
+using std::chrono::seconds;
+using std::string;
+using std::vector;
+using std::thread;
+using std::ostringstream;
+using std::istringstream;
+using std::ostream_iterator;
+using std::istream_iterator;
+using std::copy;
+using std::min;
+using std::min_element;
+
class config_view_change {
public:
virtual void commit_change(unsigned view_id) = 0;
class config : public paxos_change {
private:
- acceptor *paxos_acceptor;
- proposer *paxos_proposer;
unsigned my_view_id;
- std::string first;
- std::string me;
+ string first;
+ string me;
config_view_change *vc;
- std::vector<std::string> mems;
+ acceptor paxos_acceptor;
+ proposer paxos_proposer;
+ vector<string> mems;
mutex cfg_mutex;
- std::condition_variable config_cond;
- paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance);
- std::string value(const std::vector<std::string> &mems) const;
- void members(const std::string &v, std::vector<std::string> &m) const;
- void get_view(unsigned instance, std::vector<std::string> &m, lock &cfg_mutex_lock);
- bool remove(const std::string &);
+ cond config_cond;
+ paxos_protocol::status heartbeat(int &r, string m, unsigned instance);
+ string value(const vector<string> &mems) const;
+ vector<string> members(const string &v) const;
+ void get_view(unsigned instance, vector<string> &m, lock &cfg_mutex_lock);
+ bool remove(const string &, lock &cfg_mutex_lock);
void reconstruct(lock &cfg_mutex_lock);
typedef enum {
OK, // response and same view #
VIEWERR, // response but different view #
FAILURE, // no response
} heartbeat_t;
- heartbeat_t doheartbeat(const std::string &m);
+ heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock);
public:
- config(const std::string &_first,
- const std::string &_me,
- config_view_change *_vc);
+ config(const string &_first, const string &_me, config_view_change *_vc);
unsigned view_id() { return my_view_id; }
- const std::string &myaddr() const { return me; }
- std::string dump() { return paxos_acceptor->dump(); }
- void get_view(unsigned instance, std::vector<std::string> &m);
- void restore(const std::string &s);
- bool add(const std::string &, unsigned view_id);
- bool ismember(const std::string &m, unsigned view_id);
+ const string &myaddr() const { return me; }
+ string dump() { return paxos_acceptor.dump(); }
+ void get_view(unsigned instance, vector<string> &m);
+ void restore(const string &s);
+ bool add(const string &, unsigned view_id);
+ bool ismember(const string &m, unsigned view_id);
void heartbeater(void);
- void paxos_commit(unsigned instance, const std::string &v);
- rpcs *get_rpcs() { return paxos_acceptor->get_rpcs(); }
- void breakpoint(int b) { paxos_proposer->breakpoint(b); }
+ void paxos_commit(unsigned instance, const string &v);
+ // XXX hack; maybe should have its own port number
+ rpcs *get_rpcs() { return paxos_acceptor.get_rpcs(); }
+ void breakpoint(int b) { paxos_proposer.breakpoint(b); }
};
#endif
#include "handle.h"
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lock.h"
+#include <map>
-handle_mgr mgr;
+using std::map;
-handle::handle(std::string m)
-{
- h = mgr.get_handle(m);
-}
+class hinfo {
+public:
+ rpcc *cl = nullptr;
+ int refcnt = 0;
+ bool del = false;
+ string m;
+ mutex client_mutex;
+ hinfo(const string & m_) : m(m_) {}
+};
+
+class handle_mgr {
+ private:
+ mutex mgr_mutex;
+ map<string, hinfo *> hmap;
+ void delete_handle(const string & m, lock & handle_mutex_lock);
+ public:
+ hinfo *acquire_handle(string m);
+ void release_handle(hinfo *h);
+ void delete_handle(const string & m);
+};
+
+static handle_mgr mgr;
-rpcc *
-handle::safebind()
-{
+handle::handle(const string & m) : h(mgr.acquire_handle(m)) {}
+
+rpcc * handle::safebind() {
if (!h)
- return NULL;
- lock ml(h->cl_mutex);
+ return nullptr;
+ lock ml(h->client_mutex);
if (h->del)
- return NULL;
+ return nullptr;
if (h->cl)
return h->cl;
- sockaddr_in dstsock;
- make_sockaddr(h->m.c_str(), &dstsock);
- rpcc *cl = new rpcc(dstsock);
- tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
- int ret;
+ rpcc *cl = new rpcc(h->m);
+ LOG("handler_mgr::acquire_handle trying to bind..." << h->m);
// The test script assumes that the failure can be detected by paxos and
// rsm layer within few seconds. We have to set the timeout with a small
// value to support the assumption.
//
// With RPC_LOSSY=5, tests may fail due to delays and time outs.
- ret = cl->bind(rpcc::to(1000));
+ int ret = cl->bind(rpcc::to(1000));
if (ret < 0) {
- tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
+ LOG("handle_mgr::acquire_handle bind failure! " << h->m << " " << ret);
delete cl;
h->del = true;
} else {
- tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
+ LOG("handle_mgr::acquire_handle bind succeeded " << h->m);
h->cl = cl;
}
return h->cl;
}
-handle::~handle()
-{
- if (h) mgr.done_handle(h);
-}
-
-handle_mgr::handle_mgr()
-{
+handle::~handle() {
+ if (h) mgr.release_handle(h);
}
-struct hinfo *
-handle_mgr::get_handle(std::string m)
-{
- lock ml(handle_mutex);
- struct hinfo *h = 0;
+hinfo * handle_mgr::acquire_handle(string m) {
+ lock ml(mgr_mutex);
+ hinfo *h = nullptr;
if (hmap.find(m) == hmap.end()) {
- h = new hinfo;
- h->cl = NULL;
- h->del = false;
- h->refcnt = 1;
- h->m = m;
+ h = new hinfo(m);
hmap[m] = h;
} else if (!hmap[m]->del) {
h = hmap[m];
- h->refcnt ++;
}
+ h->refcnt++;
return h;
}
-void
-handle_mgr::done_handle(struct hinfo *h)
-{
- lock ml(handle_mutex);
- h->refcnt--;
- if (h->refcnt == 0 && h->del)
- delete_handle_wo(h->m);
+void handle_mgr::release_handle(hinfo *h) {
+ lock ml(mgr_mutex);
+ if (--h->refcnt == 0 && h->del)
+ delete_handle(h->m, ml);
}
-void
-handle_mgr::delete_handle(std::string m)
-{
- lock ml(handle_mutex);
- delete_handle_wo(m);
+void handle_mgr::delete_handle(const string & m) {
+ lock ml(mgr_mutex);
+ delete_handle(m, ml);
}
-// Must be called with handle_mutex locked.
-void
-handle_mgr::delete_handle_wo(std::string m)
-{
+void handle_mgr::delete_handle(const string & m, lock &) {
if (hmap.find(m) == hmap.end()) {
- tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
- } else {
- tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
- hmap[m]->refcnt);
- struct hinfo *h = hmap[m];
- if (h->refcnt == 0) {
- if (h->cl) {
- h->cl->cancel();
- delete h->cl;
- }
- hmap.erase(m);
- delete h;
- } else {
- h->del = true;
- }
+ LOG("handle_mgr::delete_handle: cl " << m << " isn't in cl list");
+ return;
}
+ LOG("handle_mgr::delete_handle: cl " << m << " refcnt " << hmap[m]->refcnt);
+ hinfo *h = hmap[m];
+ if (h->refcnt == 0) {
+ if (h->cl) {
+ h->cl->cancel();
+ delete h->cl;
+ }
+ hmap.erase(m);
+ delete h;
+ } else
+ h->del = true;
+}
+
+void invalidate_handle(const string & m) {
+ mgr.delete_handle(m);
}
#ifndef handle_h
#define handle_h
-#include <string>
-#include <vector>
#include "rpc/rpc.h"
+#include <string>
-struct hinfo {
- rpcc *cl;
- int refcnt;
- bool del;
- std::string m;
- std::mutex cl_mutex;
-};
+using std::string;
+
+class hinfo;
class handle {
private:
- struct hinfo *h;
+ hinfo *h;
public:
- handle(std::string m);
+ handle(const string & m);
~handle();
/* safebind will try to bind with the rpc server on the first call.
* Since bind may block, the caller probably should not hold a mutex
rpcc *safebind();
};
-class handle_mgr {
- private:
- std::mutex handle_mutex;
- std::map<std::string, struct hinfo *> hmap;
- public:
- handle_mgr();
- struct hinfo *get_handle(std::string m);
- void done_handle(struct hinfo *h);
- void delete_handle(std::string m);
- void delete_handle_wo(std::string m);
-};
-
-extern class handle_mgr mgr;
+void invalidate_handle(const string & m);
#endif
using lock = std::unique_lock<std::mutex>;
using cond = std::condition_variable;
-class adopt_lock : public lock {
-public:
- explicit inline adopt_lock(class mutex &m) : std::unique_lock<std::mutex>(m, std::adopt_lock) {
- }
- inline ~adopt_lock() {
- release();
- }
-};
-
#endif
#include "lock_client.h"
#include "rpc/rpc.h"
-#include <sstream>
-#include <iostream>
#include <algorithm>
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
#include <arpa/inet.h>
#include "rsm_client.h"
#include "lock.h"
-using std::ostringstream;
-
-lock_state::lock_state():
- state(none)
-{
-}
-
-void lock_state::wait() {
+void lock_state::wait(lock & mutex_lock) {
auto self = std::this_thread::get_id();
- {
- adopt_lock ml(m);
- c[self].wait(ml);
- }
+ c[self].wait(mutex_lock);
c.erase(self);
}
c.begin()->second.notify_one();
}
-void lock_state::signal(std::thread::id who) {
+void lock_state::signal(thread::id who) {
if (c.count(who))
c[who].notify_one();
}
+typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+
unsigned int lock_client::last_port = 0;
lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
- // by the semantics of std::map, this will create
- // the lock if it doesn't already exist
- return lock_table[lid];
+ return lock_table[lid]; // creates the lock if it doesn't already exist
}
-lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) {
- sockaddr_in dstsock;
- make_sockaddr(xdst.c_str(), &dstsock);
- cl = new rpcc(dstsock);
- if (cl->bind() < 0) {
+lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu), next_xid(0) {
+ cl = new rpcc(xdst);
+ if (cl->bind() < 0)
LOG("lock_client: call bind");
- }
srandom((uint32_t)time(NULL)^last_port);
rlock_port = ((random()%32000) | (0x1 << 10));
- const char *hname;
- // VERIFY(gethostname(hname, 100) == 0);
- hname = "127.0.0.1";
- ostringstream host;
- host << hname << ":" << rlock_port;
- id = host.str();
+ id = "127.0.0.1:" + std::to_string(rlock_port);
last_port = rlock_port;
rpcs *rlsrpc = new rpcs(rlock_port);
rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
- {
- lock sl(xid_mutex);
- next_xid = 0;
- }
rsmc = new rsm_client(xdst);
- releaser_thread = std::thread(&lock_client::releaser, this);
+ releaser_thread = thread(&lock_client::releaser, this);
}
void lock_client::releaser() [[noreturn]] {
int lock_client::stat(lock_protocol::lockid_t lid) {
VERIFY(0);
int r;
- lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid);
+ auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
VERIFY (ret == lock_protocol::OK);
return r;
}
{
sl.unlock();
int r;
- result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
+ result = (lock_protocol::status)rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
sl.lock();
}
LOG("acquire returned " << result);
}
LOG("waiting...");
- st.wait();
+ st.wait(sl);
LOG("wait ended");
}
}
t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
- return ((lock_client *)client)->acquire(lid);
+ return ((lock_client *)client)->release(lid);
}
t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
#include "lang/verify.h"
#include "rpc/fifo.h"
#include "rsm_client.h"
+#include "lock.h"
class lock_release_user {
public:
};
using std::string;
+using std::map;
using std::thread;
using std::list;
-using std::map;
-
-typedef string callback;
class lock_state {
public:
- lock_state();
enum {
none = 0,
retrying,
locked,
acquiring,
releasing
- } state;
- std::thread::id held_by;
- list<std::thread::id> wanted_by;
+ } state = none;
+ thread::id held_by;
+ list<thread::id> wanted_by;
mutex m;
- map<std::thread::id, std::condition_variable> c;
+ map<thread::id, cond> c;
lock_protocol::xid_t xid;
- void wait();
+ void wait(lock & mutex_lock);
void signal();
- void signal(std::thread::id who);
+ void signal(thread::id who);
};
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
class lock_client {
private:
rpcc *cl;
- std::thread releaser_thread;
+ thread releaser_thread;
rsm_client *rsmc;
class lock_release_user *lu;
unsigned int rlock_port;
#include "lock_client.h"
-#include "tprintf.h"
+#include "threaded_log.h"
-char tprintf_thread_prefix = 'd';
+char log_thread_prefix = 'd';
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
if(argc != 2) {
fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
return 1;
#include "rpc/rpc.h"
#include <string>
+using std::string;
+
class lock_protocol {
public:
- enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR };
- typedef int status;
- typedef std::string lockid_t;
- typedef unsigned long long xid_t;
- enum rpc_numbers {
+ enum status : status_t { OK, RETRY, RPCERR, NOENT, IOERR };
+ using lockid_t = string;
+ using xid_t = uint64_t;
+ enum rpc_numbers : proc_t {
acquire = 0x7001,
release,
stat
class rlock_protocol {
public:
- enum xxstatus { OK, RPCERR };
- typedef int status;
- enum rpc_numbers {
+ enum status : status_t { OK, RPCERR };
+ enum rpc_numbers : proc_t {
revoke = 0x8001,
retry = 0x8002
};
#include <arpa/inet.h>
#include "lang/verify.h"
#include "handle.h"
-#include "tprintf.h"
+#include "threaded_log.h"
#include "rpc/marshall.h"
#include "lock.h"
proxy = handle(held_by.first).safebind();
if (proxy) {
int r;
- rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
+ auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
LOG("Revoke returned " << ret);
}
}
front = st.wanted_by.front();
}
- rlock_protocol::status ret = -1;
-
rpcc *proxy = NULL;
// try a few times?
//int t=5;
proxy = handle(front.first).safebind();
if (proxy) {
int r;
- ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
+ auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
LOG("Retry returned " << ret);
}
}
#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <stdlib.h>
-#include "tprintf.h"
+#include "threaded_log.h"
#include <unistd.h>
#include "lock_server.h"
#include "rsm.h"
// Main loop of lock_server
-char tprintf_thread_prefix = 's';
+char log_thread_prefix = 's';
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
#include <stdlib.h>
#include <stdio.h>
#include "lang/verify.h"
-#include "tprintf.h"
+#include "threaded_log.h"
#include <sys/types.h>
#include <unistd.h>
#include "lock.h"
-char tprintf_thread_prefix = 'c';
+char log_thread_prefix = 'c';
// must be >= 2
const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
int ct[256];
std::mutex count_mutex;
-void
-check_grant(lock_protocol::lockid_t lid)
-{
+void check_grant(lock_protocol::lockid_t lid) {
lock ml(count_mutex);
int x = lid[0] & 0x0f;
- if(ct[x] != 0){
+ if (ct[x] != 0) {
fprintf(stderr, "error: server granted %s twice\n", lid.c_str());
fprintf(stdout, "error: server granted %s twice\n", lid.c_str());
exit(1);
ct[x] += 1;
}
-void
-check_release(lock_protocol::lockid_t lid)
-{
+void check_release(lock_protocol::lockid_t lid) {
lock ml(count_mutex);
int x = lid[0] & 0x0f;
- if(ct[x] != 1){
+ if (ct[x] != 1) {
fprintf(stderr, "error: client released un-held lock %s\n", lid.c_str());
exit(1);
}
ct[x] -= 1;
}
-void
-test1(void)
-{
- tprintf ("acquire a release a acquire a release a\n");
+void test1(void) {
+ LOG_NONMEMBER("acquire a release a acquire a release a");
lc[0]->acquire(a);
check_grant(a);
lc[0]->release(a);
lc[0]->release(a);
check_release(a);
- tprintf ("acquire a acquire b release b release a\n");
+ LOG_NONMEMBER("acquire a acquire b release b release a");
lc[0]->acquire(a);
check_grant(a);
lc[0]->acquire(b);
check_release(a);
}
-void *
-test2(int i)
-{
- tprintf ("test2: client %d acquire a release a\n", i);
+void test2(int i) {
+ LOG_NONMEMBER("test2: client " << i << " acquire a release a");
lc[i]->acquire(a);
- tprintf ("test2: client %d acquire done\n", i);
+ LOG_NONMEMBER("test2: client " << i << " acquire done");
check_grant(a);
sleep(1);
- tprintf ("test2: client %d release\n", i);
+ LOG_NONMEMBER("test2: client " << i << " release");
check_release(a);
lc[i]->release(a);
- tprintf ("test2: client %d release done\n", i);
- return 0;
+ LOG_NONMEMBER("test2: client " << i << " release done");
}
-void *
-test3(int i)
-{
- tprintf ("test3: client %d acquire a release a concurrent\n", i);
+void test3(int i) {
+ LOG_NONMEMBER("test3: client " << i << " acquire a release a concurrent");
for (int j = 0; j < 10; j++) {
lc[i]->acquire(a);
check_grant(a);
- tprintf ("test3: client %d got lock\n", i);
+ LOG_NONMEMBER("test3: client " << i << " got lock");
check_release(a);
lc[i]->release(a);
}
- return 0;
}
-void *
-test4(int i)
-{
- tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
+void test4(int i) {
+ LOG_NONMEMBER("test4: thread " << i << " acquire a release a concurrent; same clnt");
for (int j = 0; j < 10; j++) {
lc[0]->acquire(a);
check_grant(a);
- tprintf ("test4: thread %d on client 0 got lock\n", i);
+ LOG_NONMEMBER("test4: thread " << i << " on client 0 got lock");
check_release(a);
lc[0]->release(a);
}
- return 0;
}
-void *
-test5(int i)
-{
- tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
+void test5(int i) {
+ LOG_NONMEMBER("test5: client " << i << " acquire a release a concurrent; same and diff clnt");
for (int j = 0; j < 10; j++) {
if (i < 5) lc[0]->acquire(a);
else lc[1]->acquire(a);
check_grant(a);
- tprintf ("test5: client %d got lock\n", i);
+ LOG_NONMEMBER("test5: client " << i << " got lock");
check_release(a);
if (i < 5) lc[0]->release(a);
else lc[1]->release(a);
}
- return 0;
}
int
setvbuf(stderr, NULL, _IONBF, 0);
srandom((uint32_t)getpid());
- if(argc < 2) {
+ if (argc < 2) {
fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
exit(1);
}
if (argc > 2) {
test = atoi(argv[2]);
- if(test < 1 || test > 5){
- tprintf("Test number must be between 1 and 5\n");
+ if (test < 1 || test > 5) {
+ LOG_NONMEMBER("Test number must be between 1 and 5");
exit(1);
}
}
- tprintf("cache lock client\n");
+ LOG_NONMEMBER("cache lock client");
for (int i = 0; i < nt; i++) lc[i] = new lock_client(dst);
- if(!test || test == 1){
+ if (!test || test == 1) {
test1();
}
- if(!test || test == 2){
+ if (!test || test == 2) {
// test2
for (int i = 0; i < nt; i++)
th[i] = std::thread(test2, i);
th[i].join();
}
- if(!test || test == 3){
- tprintf("test 3\n");
+ if (!test || test == 3) {
+ LOG_NONMEMBER("test 3");
- // test3
for (int i = 0; i < nt; i++)
th[i] = std::thread(test3, i);
for (int i = 0; i < nt; i++)
th[i].join();
}
- if(!test || test == 4){
- tprintf("test 4\n");
+ if (!test || test == 4) {
+ LOG_NONMEMBER("test 4");
- // test 4
for (int i = 0; i < 2; i++)
th[i] = std::thread(test4, i);
for (int i = 0; i < 2; i++)
th[i].join();
}
- if(!test || test == 5){
- tprintf("test 5\n");
+ if (!test || test == 5) {
+ LOG_NONMEMBER("test 5");
- // test 5
for (int i = 0; i < nt; i++)
th[i] = std::thread(test5, i);
for (int i = 0; i < nt; i++)
th[i].join();
}
- tprintf ("%s: passed all tests successfully\n", argv[0]);
+ LOG_NONMEMBER(argv[0] << ": passed all tests successfully");
}
#include "paxos.h"
#include <fstream>
#include <iostream>
-#include "tprintf.h"
+#include "threaded_log.h"
// Paxos must maintain some durable state (i.e., that survives power
// failures) to run Paxos correct. This module implements a log with
// all durable state to run Paxos. Since the values chosen correspond
// to views, the log contains all views since the beginning of time.
-log::log(acceptor *_acc, std::string _me)
- : pxs (_acc)
-{
- name = "paxos-" + _me + ".log";
- logread();
+log::log(acceptor *_acc, std::string _me) : pxs (_acc) {
+ name = "paxos-" + _me + ".log";
+ logread();
}
-void
-log::logread(void)
-{
- std::ifstream from;
- std::string type;
- unsigned instance;
+void log::logread(void) {
+ std::ifstream from;
+ std::string type;
+ unsigned instance;
- from.open(name.c_str());
- LOG("logread");
- while (from >> type) {
- if (type == "done") {
- std::string v;
- from >> instance;
- from.get();
- getline(from, v);
- pxs->values[instance] = v;
- pxs->instance_h = instance;
- LOG("logread: instance: " << instance << " w. v = " <<
- pxs->values[instance]);
- pxs->v_a.clear();
- pxs->n_h.n = 0;
- pxs->n_a.n = 0;
- } else if (type == "propseen") {
- from >> pxs->n_h.n;
- from >> pxs->n_h.m;
- LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
- } else if (type == "accepted") {
- std::string v;
- from >> pxs->n_a.n;
- from >> pxs->n_a.m;
- from.get();
- getline(from, v);
- pxs->v_a = v;
- LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
- } else {
- LOG("logread: unknown log record");
- VERIFY(0);
- }
- }
- from.close();
+ from.open(name.c_str());
+ LOG("logread");
+ while (from >> type) {
+ if (type == "done") {
+ std::string v;
+ from >> instance;
+ from.get();
+ getline(from, v);
+ pxs->values[instance] = v;
+ pxs->instance_h = instance;
+ LOG("logread: instance: " << instance << " w. v = " <<
+ pxs->values[instance]);
+ pxs->v_a.clear();
+ pxs->n_h.n = 0;
+ pxs->n_a.n = 0;
+ } else if (type == "propseen") {
+ from >> pxs->n_h.n;
+ from >> pxs->n_h.m;
+ LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
+ } else if (type == "accepted") {
+ std::string v;
+ from >> pxs->n_a.n;
+ from >> pxs->n_a.m;
+ from.get();
+ getline(from, v);
+ pxs->v_a = v;
+ LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
+ } else {
+ LOG("logread: unknown log record");
+ VERIFY(0);
+ }
+ }
+ from.close();
}
-std::string
-log::dump()
-{
- std::ifstream from;
- std::string res;
- std::string v;
- from.open(name.c_str());
- while (getline(from, v)) {
- res = res + v + "\n";
- }
- from.close();
- return res;
+std::string log::dump() {
+ std::ifstream from;
+ std::string res;
+ std::string v;
+ from.open(name.c_str());
+ while (getline(from, v))
+ res += v + "\n";
+ from.close();
+ return res;
}
-void
-log::restore(std::string s)
-{
- std::ofstream f;
- LOG("restore: " << s);
- f.open(name.c_str(), std::ios::trunc);
- f << s;
- f.close();
+void log::restore(std::string s) {
+ std::ofstream f;
+ LOG("restore: " << s);
+ f.open(name.c_str(), std::ios::trunc);
+ f << s;
+ f.close();
}
// XXX should be an atomic operation
-void
-log::loginstance(unsigned instance, std::string v)
-{
- std::ofstream f;
- f.open(name.c_str(), std::ios::app);
- f << "done";
- f << " ";
- f << instance;
- f << " ";
- f << v;
- f << "\n";
- f.close();
+void log::loginstance(unsigned instance, std::string v) {
+ std::ofstream f(name, std::ios::app);
+ f << "done " << instance << " " << v << "\n";
+ f.close();
}
// an acceptor should call logprop(n_h) when it
// receives a prepare to which it responds prepare_ok().
-void
-log::logprop(prop_t n_h)
-{
- std::ofstream f;
- f.open(name.c_str(), std::ios::app);
- f << "propseen";
- f << " ";
- f << n_h.n;
- f << " ";
- f << n_h.m;
- f << "\n";
- f.close();
+void log::logprop(prop_t n_h) {
+ std::ofstream f;
+ f.open(name.c_str(), std::ios::app);
+ f << "propseen";
+ f << " ";
+ f << n_h.n;
+ f << " ";
+ f << n_h.m;
+ f << "\n";
+ f.close();
}
// an acceptor should call logaccept(n_a, v_a) when it
// receives an accept RPC to which it replies accept_ok().
-void
-log::logaccept(prop_t n, std::string v)
-{
- std::ofstream f;
- f.open(name.c_str(), std::ios::app);
- f << "accepted";
- f << " ";
- f << n.n;
- f << " ";
- f << n.m;
- f << " ";
- f << v;
- f << "\n";
- f.close();
+void log::logaccept(prop_t n, std::string v) {
+ std::ofstream f(name, std::ios::app);
+ f << "accepted " << n.n << " " << n.m << " " << v << "\n";
+ f.close();
}
-
#include "paxos.h"
#include "handle.h"
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lang/verify.h"
#include "lock.h"
+using std::stoi;
+
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described by Lamport's "Paxos Made
// Simple". To kick off an instance of Paxos, the caller supplies a
return (a.n > b.n || (a.n == b.n && a.m >= b.m));
}
-std::string
-print_members(const std::vector<std::string> &nodes) {
- std::string s;
+string
+print_members(const vector<string> &nodes) {
+ string s;
s.clear();
for (unsigned i = 0; i < nodes.size(); i++) {
s += nodes[i];
}
-bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+bool isamember(const string & m, const vector<string> & nodes) {
for (auto n : nodes) {
if (n == m)
return 1;
}
// check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const std::vector<std::string> &l1,
- const std::vector<std::string> &l2) {
+bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
unsigned n = 0;
for (unsigned i = 0; i < l1.size(); i++) {
return n >= (l1.size() >> 1) + 1;
}
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
- const std::string &_me)
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
: cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
stable (true)
{
my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
}
-bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
- const std::string & newv)
+bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
{
- std::vector<std::string> accepts;
- std::vector<std::string> nodes;
- std::string v;
+ vector<string> accepts;
+ vector<string> nodes;
+ string v;
bool r = false;
lock ml(pxs_mutex);
- tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
- print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+ LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
if (!stable) { // already running proposer?
- tprintf("proposer::run: already running\n");
+ LOG("proposer::run: already running");
return false;
}
stable = false;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of prepare responses\n");
+ LOG("paxos::manager: received a majority of prepare responses");
if (v.size() == 0)
v = newv;
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of accept responses\n");
+ LOG("paxos::manager: received a majority of accept responses");
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- tprintf("paxos::manager: no majority of accept responses\n");
+ LOG("paxos::manager: no majority of accept responses");
}
} else {
- tprintf("paxos::manager: no majority of prepare responses\n");
+ LOG("paxos::manager: no majority of prepare responses");
}
} else {
- tprintf("paxos::manager: prepare is rejected %d\n", stable);
+ LOG("paxos::manager: prepare is rejected " << stable);
}
stable = true;
return r;
// otherwise fill in accepts with set of nodes that accepted,
// set v to the v_a with the highest n_a, and return true.
bool
-proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes,
- std::string & v)
+proposer::prepare(unsigned instance, vector<string> & accepts,
+ const vector<string> & nodes,
+ string & v)
{
struct paxos_protocol::preparearg arg = { instance, my_n };
struct paxos_protocol::prepareres res;
int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
- tprintf("commiting old instance!\n");
+ LOG("commiting old instance!");
acc->commit(instance, res.v_a);
return false;
}
if (res.accept) {
accepts.push_back(i);
if (res.n_a >= n_a) {
- tprintf("found a newer accepted proposal\n");
+ LOG("found a newer accepted proposal");
v = res.v_a;
n_a = res.n_a;
}
// run() calls this to send out accept RPCs to accepts.
// fill in accepts with list of nodes that accepted.
void
-proposer::accept(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes, const std::string & v)
+proposer::accept(unsigned instance, vector<string> & accepts,
+ const vector<string> & nodes, const string & v)
{
struct paxos_protocol::acceptarg arg = { instance, my_n, v };
rpcc *r;
}
void
-proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
- const std::string & v)
+proposer::decide(unsigned instance, const vector<string> & accepts,
+ const string & v)
{
struct paxos_protocol::decidearg arg = { instance, v };
rpcc *r;
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
- const std::string & _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
+ const string & _value)
: cfg(_cfg), me (_me), instance_h(0)
{
n_h.n = 0;
instance_h = 1;
}
- pxs = new rpcs((uint32_t)std::stoi(_me));
+ pxs = new rpcs((uint32_t)stoi(_me));
pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
}
paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
paxos_protocol::preparearg a)
{
lock ml(pxs_mutex);
l->logprop(n_h);
r.accept = true;
} else {
- tprintf("I totally rejected this request. Ha.\n");
+ LOG("I totally rejected this request. Ha.");
}
return paxos_protocol::OK;
}
paxos_protocol::status
-acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
{
lock ml(pxs_mutex);
r = false;
// the src argument is only for debugging
paxos_protocol::status
-acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
+acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
{
lock ml(pxs_mutex);
- tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
- a.instance, instance_h, v_a.c_str());
+ LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
if (a.instance == instance_h + 1) {
VERIFY(v_a == a.v);
commit(a.instance, v_a, ml);
}
void
-acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
+acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
{
- tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+ LOG("acceptor::commit: instance=" << instance << " has v=" << value);
if (instance > instance_h) {
- tprintf("commit: highestaccepteinstance = %d\n", instance);
+ LOG("commit: highestaccepteinstance = " << instance);
values[instance] = value;
l->loginstance(instance, value);
instance_h = instance;
}
void
-acceptor::commit(unsigned instance, const std::string & value)
+acceptor::commit(unsigned instance, const string & value)
{
lock ml(pxs_mutex);
commit(instance, value, ml);
}
-std::string
+string
acceptor::dump()
{
return l->dump();
}
void
-acceptor::restore(const std::string & s)
+acceptor::restore(const string & s)
{
l->restore(s);
l->logread();
proposer::breakpoint1()
{
if (break1) {
- tprintf("Dying at breakpoint 1!\n");
+ LOG("Dying at breakpoint 1!");
exit(1);
}
}
proposer::breakpoint2()
{
if (break2) {
- tprintf("Dying at breakpoint 2!\n");
+ LOG("Dying at breakpoint 2!");
exit(1);
}
}
proposer::breakpoint(int b)
{
if (b == 3) {
- tprintf("Proposer: breakpoint 1\n");
+ LOG("Proposer: breakpoint 1");
break1 = true;
} else if (b == 4) {
- tprintf("Proposer: breakpoint 2\n");
+ LOG("Proposer: breakpoint 2");
break2 = true;
}
}
#include <string>
#include <vector>
+#include <map>
#include "rpc/rpc.h"
#include "paxos_protocol.h"
#include "log.h"
#include "lock.h"
+using std::string;
+using std::map;
+using std::vector;
class paxos_change {
public:
- virtual void paxos_commit(unsigned instance, const std::string & v) = 0;
+ virtual void paxos_commit(unsigned instance, const string & v) = 0;
virtual ~paxos_change() {}
};
log *l;
rpcs *pxs;
paxos_change *cfg;
- std::string me;
+ string me;
mutex pxs_mutex;
// Acceptor state
prop_t n_h; // number of the highest proposal seen in a prepare
prop_t n_a; // number of highest proposal accepted
- std::string v_a; // value of highest proposal accepted
+ string v_a; // value of highest proposal accepted
unsigned instance_h; // number of the highest instance we have decided
- std::map<unsigned,std::string> values; // vals of each instance
+ map<unsigned,string> values; // vals of each instance
- void commit(unsigned instance, const std::string & v, lock & pxs_mutex_lock);
+ void commit(unsigned instance, const string & v, lock & pxs_mutex_lock);
paxos_protocol::status preparereq(paxos_protocol::prepareres & r,
- const std::string & src, paxos_protocol::preparearg a);
- paxos_protocol::status acceptreq(bool & r, const std::string & src,
+ const string & src, paxos_protocol::preparearg a);
+ paxos_protocol::status acceptreq(bool & r, const string & src,
paxos_protocol::acceptarg a);
- paxos_protocol::status decidereq(int & r, const std::string & src,
+ paxos_protocol::status decidereq(int & r, const string & src,
paxos_protocol::decidearg a);
friend class log;
public:
- acceptor(class paxos_change *cfg, bool _first, const std::string & _me,
- const std::string & _value);
+ acceptor(class paxos_change *cfg, bool _first, const string & _me,
+ const string & _value);
~acceptor() {}
- void commit(unsigned instance, const std::string & v);
+ void commit(unsigned instance, const string & v);
unsigned instance() { return instance_h; }
- const std::string & value(unsigned instance) { return values[instance]; }
- std::string dump();
- void restore(const std::string &);
+ const string & value(unsigned instance) { return values[instance]; }
+ string dump();
+ void restore(const string &);
rpcs *get_rpcs() { return pxs; }
prop_t get_n_h() { return n_h; }
unsigned get_instance_h() { return instance_h; }
};
-extern bool isamember(const std::string & m, const std::vector<std::string> & nodes);
-extern std::string print_members(const std::vector<std::string> & nodes);
+extern bool isamember(const string & m, const vector<string> & nodes);
+extern string print_members(const vector<string> & nodes);
class proposer {
private:
log *l;
paxos_change *cfg;
acceptor *acc;
- std::string me;
+ string me;
bool break1;
bool break2;
prop_t my_n; // number of the last proposal used in this instance
void setn();
- bool prepare(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes,
- std::string & v);
- void accept(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes, const std::string & v);
- void decide(unsigned instance, const std::vector<std::string> & accepts,
- const std::string & v);
+ bool prepare(unsigned instance, vector<string> & accepts,
+ const vector<string> & nodes,
+ string & v);
+ void accept(unsigned instance, vector<string> & accepts,
+ const vector<string> & nodes, const string & v);
+ void decide(unsigned instance, const vector<string> & accepts,
+ const string & v);
void breakpoint1();
void breakpoint2();
- bool majority(const std::vector<std::string> & l1, const std::vector<std::string> & l2);
+ bool majority(const vector<string> & l1, const vector<string> & l2);
friend class log;
public:
- proposer(class paxos_change *cfg, class acceptor *_acceptor, const std::string &_me);
+ proposer(class paxos_change *cfg, class acceptor *_acceptor, const string &_me);
~proposer() {}
- bool run(unsigned instance, const std::vector<std::string> & cnodes, const std::string & v);
+ bool run(unsigned instance, const vector<string> & cnodes, const string & v);
bool isrunning();
void breakpoint(int b);
};
#include "rpc/rpc.h"
struct prop_t {
- unsigned n;
- std::string m;
+ unsigned n;
+ std::string m;
};
class paxos_protocol {
- public:
- enum xxstatus { OK, ERR };
- typedef int status;
- enum rpc_numbers {
- preparereq = 0x11001,
- acceptreq,
- decidereq,
- heartbeat,
- };
-
- struct preparearg {
- unsigned instance;
- prop_t n;
- };
-
- struct prepareres {
- bool oldinstance;
- bool accept;
- prop_t n_a;
- std::string v_a;
- };
-
- struct acceptarg {
- unsigned instance;
- prop_t n;
- std::string v;
- };
-
- struct decidearg {
- unsigned instance;
- std::string v;
- };
-
+ public:
+ enum status : status_t { OK, ERR };
+ enum rpc_numbers : proc_t {
+ preparereq = 0x11001,
+ acceptreq,
+ decidereq,
+ heartbeat,
+ };
+
+ struct preparearg {
+ unsigned instance;
+ prop_t n;
+ };
+
+ struct prepareres {
+ bool oldinstance;
+ bool accept;
+ prop_t n_a;
+ std::string v_a;
+ };
+
+ struct acceptarg {
+ unsigned instance;
+ prop_t n;
+ std::string v;
+ };
+
+ struct decidearg {
+ unsigned instance;
+ std::string v;
+ };
};
-inline unmarshall &
-operator>>(unmarshall &u, prop_t &a)
-{
- u >> a.n;
- u >> a.m;
- return u;
+inline unmarshall & operator>>(unmarshall &u, prop_t &a) {
+ return u >> a.n >> a.m;
}
-inline marshall &
-operator<<(marshall &m, prop_t a)
-{
- m << a.n;
- m << a.m;
- return m;
+inline marshall & operator<<(marshall &m, prop_t a) {
+ return m << a.n << a.m;
}
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::preparearg &a)
-{
- u >> a.instance;
- u >> a.n;
- return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::preparearg &a) {
+ return u >> a.instance >> a.n;
}
-inline marshall &
-operator<<(marshall &m, paxos_protocol::preparearg a)
-{
- m << a.instance;
- m << a.n;
- return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::preparearg a) {
+ return m << a.instance << a.n;
}
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::prepareres &r)
-{
- u >> r.oldinstance;
- u >> r.accept;
- u >> r.n_a;
- u >> r.v_a;
- return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) {
+ return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a;
}
-inline marshall &
-operator<<(marshall &m, paxos_protocol::prepareres r)
-{
- m << r.oldinstance;
- m << r.accept;
- m << r.n_a;
- m << r.v_a;
- return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::prepareres r) {
+ return m << r.oldinstance << r.accept << r.n_a << r.v_a;
}
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::acceptarg &a)
-{
- u >> a.instance;
- u >> a.n;
- u >> a.v;
- return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::acceptarg &a) {
+ return u >> a.instance >> a.n >> a.v;
}
-inline marshall &
-operator<<(marshall &m, paxos_protocol::acceptarg a)
-{
- m << a.instance;
- m << a.n;
- m << a.v;
- return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::acceptarg a) {
+ return m << a.instance << a.n << a.v;
}
-inline unmarshall &
-operator>>(unmarshall &u, paxos_protocol::decidearg &a)
-{
- u >> a.instance;
- u >> a.v;
- return u;
+inline unmarshall & operator>>(unmarshall &u, paxos_protocol::decidearg &a) {
+ return u >> a.instance >> a.v;
}
-inline marshall &
-operator<<(marshall &m, paxos_protocol::decidearg a)
-{
- m << a.instance;
- m << a.v;
- return m;
+inline marshall & operator<<(marshall &m, paxos_protocol::decidearg a) {
+ return m << a.instance << a.v;
}
#endif
#include <inttypes.h>
#include "lang/verify.h"
+using proc_t = uint32_t;
+using status_t = int32_t;
+
struct request_header {
- request_header(int x=0, int p=0, unsigned c=0, unsigned s=0, int xi=0) :
+ request_header(int x=0, proc_t p=0, unsigned c=0, unsigned s=0, int xi=0) :
xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
int xid;
- int proc;
+ proc_t proc;
unsigned int clt_nonce;
unsigned int srv_nonce;
int xid_rep;
marshall& operator<<(marshall &, uint64_t);
marshall& operator<<(marshall &, const std::string &);
-template <class A> marshall &
+template <class A, typename I=void>
+struct is_enumerable : std::false_type {};
+
+template<class A> struct is_enumerable<A,
+ decltype(std::declval<A&>().cbegin(), std::declval<A&>().cend(), void())
+> : std::true_type {};
+
+template <class A> typename std::enable_if<is_enumerable<A>::value, marshall>::type &
operator<<(marshall &m, const A &x) {
m << (unsigned int) x.size();
for (const auto &a : x)
template <class A, class B> marshall &
operator<<(marshall &m, const std::pair<A,B> &d) {
- m << d.first;
- m << d.second;
- return m;
+ return m << d.first << d.second;
+}
+
+template<typename E>
+using enum_type_t = typename std::enable_if<std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
+template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
+template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
+
+template <class E> typename std::enable_if<std::is_enum<E>::value, marshall>::type &
+operator<<(marshall &m, E e) {
+ return m << from_enum(e);
}
class unmarshall;
unmarshall& operator>>(unmarshall &, uint64_t &);
unmarshall& operator>>(unmarshall &, int64_t &);
unmarshall& operator>>(unmarshall &, std::string &);
+template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+operator>>(unmarshall &u, E &e);
class unmarshall {
private:
}
};
-template <class A> unmarshall & operator>>(unmarshall &u, A &x) {
+template <class A> typename std::enable_if<is_enumerable<A>::value, unmarshall>::type &
+operator>>(unmarshall &u, A &x) {
unsigned n = u.grab<unsigned>();
x.clear();
while (n--)
return u >> d.first >> d.second;
}
+template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+operator>>(unmarshall &u, E &e) {
+ e = to_enum<E>(u.grab<enum_type_t<E>>());
+ return u;
+}
+
typedef std::function<int(unmarshall &, marshall &)> handler;
//
// One for function pointers...
-template <class F, class R, class args_type, size_t ...Indices>
-typename std::enable_if<!std::is_member_function_pointer<F>::value, int>::type
-invoke(F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
+template <class F, class R, class RV, class args_type, size_t ...Indices>
+typename std::enable_if<!std::is_member_function_pointer<F>::value, RV>::type
+invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
return f(r, std::move(std::get<Indices>(t))...);
}
// And one for pointers to member functions...
-template <class F, class C, class R, class args_type, size_t ...Indices>
-typename std::enable_if<std::is_member_function_pointer<F>::value, int>::type
-invoke(F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
+template <class F, class C, class RV, class R, class args_type, size_t ...Indices>
+typename std::enable_if<std::is_member_function_pointer<F>::value, RV>::type
+invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
return (c->*f)(r, std::move(std::get<Indices>(t))...);
}
// the same pattern as Signature; this allows us to ignore the distinctions
// between various types of callable objects at this level of abstraction.
-template <class F, class C, class ErrorHandler, class R, class... Args>
-struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
+template <class F, class C, class ErrorHandler, class R, class RV, class... Args>
+struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
static inline handler *wrap(F f, C *c=nullptr) {
// This type definition corresponds to an empty struct with
// template parameters running from 0 up to (# args) - 1.
using ArgsStorage = std::tuple<typename std::decay<Args>::type...>;
// Allocate a handler (i.e. std::function) to hold the lambda
// which will unmarshall RPCs and call f.
- return new handler([=](unmarshall &u, marshall &m) -> int {
+ return new handler([=](unmarshall &u, marshall &m) -> RV {
// Unmarshall each argument with the correct type and store the
// result in a tuple.
ArgsStorage t = {u.grab<typename std::decay<Args>::type>()...};
// Verify successful unmarshalling of the entire input stream.
if (!u.okdone())
- return ErrorHandler::unmarshall_args_failure();
+ return (RV)ErrorHandler::unmarshall_args_failure();
// Allocate space for the RPC response -- will be passed into the
// function as an lvalue reference.
R r;
// Perform the invocation. Note that Indices() calls the default
// constructor of the empty struct with the special template
// parameters.
- int b = invoke(f, c, r, t, Indices());
+ RV b = invoke(RV(), f, c, r, t, Indices());
// Marshall the response.
m << r;
// Make like a tree.
template <class Functor, class ErrorHandler=VerifyOnFailure,
class Signature=Functor> struct marshalled_func;
-template <class F, class ErrorHandler, class... Args>
-struct marshalled_func<F, ErrorHandler, int(*)(Args...)> :
- public marshalled_func_imp<F, void, int(Args...), ErrorHandler> {};
+template <class F, class ErrorHandler, class RV, class... Args>
+struct marshalled_func<F, ErrorHandler, RV(*)(Args...)> :
+ public marshalled_func_imp<F, void, RV(Args...), ErrorHandler> {};
-template <class F, class ErrorHandler, class C, class... Args>
-struct marshalled_func<F, ErrorHandler, int(C::*)(Args...)> :
- public marshalled_func_imp<F, C, int(Args...), ErrorHandler> {};
+template <class F, class ErrorHandler, class RV, class C, class... Args>
+struct marshalled_func<F, ErrorHandler, RV(C::*)(Args...)> :
+ public marshalled_func_imp<F, C, RV(Args...), ErrorHandler> {};
template <class F, class ErrorHandler, class Signature>
struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
#include "lock.h"
#include "jsl_log.h"
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lang/verify.h"
+using std::stoi;
+
const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
-rpcc::rpcc(sockaddr_in d, bool retrans) :
- dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+rpcc::rpcc(const string & d, bool retrans) :
+ dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
{
if(retrans){
rpcc::cancel(void)
{
lock ml(m_);
- tprintf("rpcc::cancel: force callers to fail");
+ LOG("rpcc::cancel: force callers to fail");
for(auto &p : calls_){
caller *ca = p.second;
destroy_wait_ = true;
destroy_wait_c_.wait(ml);
}
- tprintf("rpcc::cancel: done");
+ LOG("rpcc::cancel: done");
}
int
-rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
+rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
TO to)
{
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
xid_rep = xid_rep_window_.front();
}
}
void
-rpcs::reg1(unsigned int proc, handler *h)
+rpcs::reg1(proc_t proc, handler *h)
{
lock pl(procs_m_);
VERIFY(procs_.count(proc) == 0);
}
void
-rpcs::updatestat(unsigned int proc)
+rpcs::updatestat(proc_t proc)
{
lock cl(count_m_);
counts_[proc]++;
curr_counts_--;
if(curr_counts_ == 0){
- tprintf("RPC STATS: ");
+ LOG("RPC STATS: ");
for (auto i = counts_.begin(); i != counts_.end(); i++)
- tprintf("%x:%lu ", i->first, i->second);
+ LOG(std::hex << i->first << ":" << std::dec << i->second);
lock rwl(reply_window_m_);
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
+ map<unsigned int,list<reply_t> >::iterator clt;
size_t totalrep = 0, maxrep = 0;
for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
request_header h;
req.unpack_req_header(&h);
- unsigned int proc = (unsigned int)h.proc;
+ proc_t proc = h.proc;
if(!req.ok()){
jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
{
lock rwl(reply_window_m_);
- std::list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> &l = reply_window_[clt_nonce];
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
int past_xid_rep = l.begin()->xid;
- std::list<reply_t>::iterator start = l.begin(), it;
+ list<reply_t>::iterator start = l.begin(), it;
it = ++start;
if (past_xid_rep < xid_rep || past_xid_rep == -1) {
{
lock rwl(reply_window_m_);
// remember the RPC reply value
- std::list<reply_t> &l = reply_window_[clt_nonce];
- std::list<reply_t>::iterator it = l.begin();
+ list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t>::iterator it = l.begin();
// skip to our place in the list
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
marshall &
-operator<<(marshall &m, const std::string &s) {
+operator<<(marshall &m, const string &s) {
m << (unsigned int) s.size();
m.rawbytes(s.data(), s.size());
return m;
}
void
-unmarshall::rawbytes(std::string &ss, size_t n)
+unmarshall::rawbytes(string &ss, size_t n)
{
VERIFY(ensure(n));
ss.assign(buf_+index_, n);
unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, std::string &s) {
+unmarshall & operator>>(unmarshall &u, string &s) {
unsigned sz = u.grab<unsigned>();
if(u.ok())
u.rawbytes(s, sz);
}
/*---------------auxilary function--------------*/
-void
-make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
+sockaddr_in make_sockaddr(const string &hostandport) {
auto colon = hostandport.find(':');
- if (colon == std::string::npos)
- make_sockaddr("127.0.0.1", hostandport, dst);
+ if (colon == string::npos)
+ return make_sockaddr("127.0.0.1", hostandport);
else
- make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst);
+ return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
}
-void
-make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
- bzero(dst, sizeof(*dst));
- dst->sin_family = AF_INET;
+sockaddr_in make_sockaddr(const string &host, const string &port) {
+ sockaddr_in dst;
+ bzero(&dst, sizeof(dst));
+ dst.sin_family = AF_INET;
struct in_addr a{inet_addr(host.c_str())};
if(a.s_addr != INADDR_NONE)
- dst->sin_addr.s_addr = a.s_addr;
+ dst.sin_addr.s_addr = a.s_addr;
else {
struct hostent *hp = gethostbyname(host.c_str());
exit(1);
}
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
- dst->sin_addr.s_addr = a.s_addr;
+ dst.sin_addr.s_addr = a.s_addr;
}
- dst->sin_port = hton((uint16_t)std::stoi(port));
+ dst.sin_port = hton((uint16_t)stoi(port));
+ return dst;
}
#include "thr_pool.h"
#include "marshall.h"
#include "connection.h"
+#include "lock.h"
-#ifdef DMALLOC
-#include "dmalloc.h"
-#endif
+using std::string;
+using std::map;
+using std::list;
class rpc_const {
public:
unmarshall *un;
int intret;
bool done;
- std::mutex m;
- std::condition_variable c;
+ mutex m;
+ cond c;
};
void get_refconn(connection **ch);
connection *chan_;
- std::mutex m_; // protect insert/delete to calls[]
- std::mutex chan_m_;
+ mutex m_; // protect insert/delete to calls[]
+ mutex chan_m_;
bool destroy_wait_;
- std::condition_variable destroy_wait_c_;
+ cond destroy_wait_c_;
- std::map<int, caller *> calls_;
- std::list<int> xid_rep_window_;
+ map<int, caller *> calls_;
+ list<int> xid_rep_window_;
struct request {
request() { clear(); }
void clear() { buf.clear(); xid = -1; }
bool isvalid() { return xid != -1; }
- std::string buf;
+ string buf;
int xid;
};
struct request dup_req_;
int xid_rep_done_;
public:
- rpcc(sockaddr_in d, bool retrans=true);
+ rpcc(const string & d, bool retrans=true);
~rpcc();
struct TO {
void set_reachable(bool r) { reachable_ = r; }
void cancel();
-
- int islossy() { return lossytest_ > 0; }
- int call1(unsigned int proc,
+ int islossy() { return lossytest_ > 0; }
+
+ int call1(proc_t proc,
marshall &req, unmarshall &rep, TO to);
bool got_pdu(connection *c, char *b, size_t sz);
-
template<class R>
- int call_m(unsigned int proc, marshall &req, R & r, TO to);
+ int call_m(proc_t proc, marshall &req, R & r, TO to);
template<class R, typename ...Args>
- inline int call(unsigned int proc, R & r, const Args&... args);
+ inline int call(proc_t proc, R & r, const Args&... args);
template<class R, typename ...Args>
- inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
+ inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
};
template<class R> int
-rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to)
+rpcc::call_m(proc_t proc, marshall &req, R & r, TO to)
{
unmarshall u;
int intret = call1(proc, req, u, to);
}
template<class R, typename... Args> inline int
-rpcc::call(unsigned int proc, R & r, const Args&... args)
+rpcc::call(proc_t proc, R & r, const Args&... args)
{
return call_timeout(proc, rpcc::to_max, r, args...);
}
template<class R, typename... Args> inline int
-rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
+rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
{
marshall m{args...};
return call_m(proc, m, r, to);
// provide at most once semantics by maintaining a window of replies
// per client that that client hasn't acknowledged receiving yet.
// indexed by client nonce.
- std::map<unsigned int, std::list<reply_t> > reply_window_;
+ map<unsigned int, list<reply_t> > reply_window_;
void free_reply_window(void);
void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
int xid, int rep_xid,
char **b, size_t *sz);
- void updatestat(unsigned int proc);
+ void updatestat(proc_t proc);
// latest connection to the client
- std::map<unsigned int, connection *> conns_;
+ map<unsigned int, connection *> conns_;
// counting
const size_t counting_;
size_t curr_counts_;
- std::map<unsigned int, size_t> counts_;
+ map<proc_t, size_t> counts_;
int lossytest_;
bool reachable_;
// map proc # to function
- std::map<unsigned int, handler *> procs_;
+ map<proc_t, handler *> procs_;
- std::mutex procs_m_; // protect insert/delete to procs[]
- std::mutex count_m_; //protect modification of counts
- std::mutex reply_window_m_; // protect reply window et al
- std::mutex conss_m_; // protect conns_
+ mutex procs_m_; // protect insert/delete to procs[]
+ mutex count_m_; //protect modification of counts
+ mutex reply_window_m_; // protect reply window et al
+ mutex conss_m_; // protect conns_
protected:
void dispatch(djob_t *);
// internal handler registration
- void reg1(unsigned int proc, handler *);
+ void reg1(proc_t proc, handler *);
ThrPool* dispatchpool_;
tcpsconn* listener_;
bool got_pdu(connection *c, char *b, size_t sz);
- template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
+ template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
};
struct ReturnOnFailure {
}
};
-template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
+template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
}
-void make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst);
-void make_sockaddr(const std::string &host, const std::string &port, struct
- sockaddr_in *dst);
+sockaddr_in make_sockaddr(const string &hostandport);
+sockaddr_in make_sockaddr(const string &host, const string &port);
#endif
#include "rpc.h"
#include <arpa/inet.h>
-#include <stdio.h>
+#include <iostream>
+#include <vector>
+#include <thread>
#include <stdlib.h>
-#include <string.h>
#include <getopt.h>
#include <sys/types.h>
#include <unistd.h>
#define NUM_CL 2
-char tprintf_thread_prefix = 'r';
+char log_thread_prefix = 'r';
+
+using std::string;
+using std::cout;
+using std::endl;
+using std::vector;
+using std::thread;
rpcs *server; // server rpc object
rpcc *clients[NUM_CL]; // client rpc object
-struct sockaddr_in dst; //server's ip address
+string dst; //server's ip address
int port;
// server-side handlers. they must be methods of some class
// from multiple classes.
class srv {
public:
- int handle_22(std::string & r, const std::string a, const std::string b);
+ int handle_22(string & r, const string a, const string b);
int handle_fast(int &r, const int a);
int handle_slow(int &r, const int a);
- int handle_bigrep(std::string &r, const size_t a);
+ int handle_bigrep(string &r, const size_t a);
};
// a handler. a and b are arguments, r is the result.
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
int
-srv::handle_22(std::string &r, const std::string a, std::string b)
+srv::handle_22(string &r, const string a, string b)
{
r = a + b;
return 0;
}
int
-srv::handle_bigrep(std::string &r, const size_t len)
+srv::handle_bigrep(string &r, const size_t len)
{
- r = std::string((size_t)len, 'x');
+ r = string((size_t)len, 'x');
return 0;
}
VERIFY(m.size()==RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
- std::string s = std::string("hallo....");
+ string s = "hallo....";
m << i;
m << l;
m << s;
VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
int i1;
unsigned long long l1;
- std::string s1;
+ string s1;
un >> i1;
un >> l1;
un >> s1;
for(int i = 0; i < 100; i++){
int arg = (random() % 2000);
- std::string rep;
+ string rep;
int ret = clients[which_cl]->call(25, rep, arg);
VERIFY(ret == 0);
- if ((int)rep.size()!=arg) {
- printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
- }
+ if ((int)rep.size()!=arg)
+ cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
VERIFY((int)rep.size() == arg);
}
auto end = std::chrono::steady_clock::now();
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (ret != 0)
- printf("%d ms have elapsed!!!\n", (int)diff);
+ cout << diff << " ms have elapsed!!!" << endl;
VERIFY(ret == 0);
VERIFY(rep == (which ? arg+1 : arg+2));
}
while(time(0) - t1 < 10){
int arg = (random() % 2000);
- std::string rep;
+ string rep;
int ret = clients[which_cl]->call(25, rep, arg);
- if ((int)rep.size()!=arg) {
- printf("ask for %d reply got %d ret %d\n",
- arg, (int)rep.size(), ret);
- }
+ if ((int)rep.size()!=arg)
+ cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
VERIFY((int)rep.size() == arg);
}
}
void
simple_tests(rpcc *c)
{
- printf("simple_tests\n");
+ cout << "simple_tests" << endl;
// an RPC call to procedure #22.
// rpcc::call() looks at the argument types to decide how
// to marshall the RPC call packet, and how to unmarshall
// the reply packet.
- std::string rep;
- int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+ string rep;
+ int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == 0); // this is what handle_22 returns
VERIFY(rep == "hello goodbye");
- printf(" -- string concat RPC .. ok\n");
+ cout << " -- string concat RPC .. ok" << endl;
// small request, big reply (perhaps req via UDP, reply via TCP)
intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
VERIFY(intret == 0);
VERIFY(rep.size() == 70000);
- printf(" -- small request, big reply .. ok\n");
+ cout << " -- small request, big reply .. ok" << endl;
// specify a timeout value to an RPC that should succeed (udp)
int xx = 0;
intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
VERIFY(intret == 0 && xx == 78);
- printf(" -- no spurious timeout .. ok\n");
+ cout << " -- no spurious timeout .. ok" << endl;
// specify a timeout value to an RPC that should succeed (tcp)
{
- std::string arg(1000, 'x');
- std::string rep2;
- c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
+ string arg(1000, 'x');
+ string rep2;
+ c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
VERIFY(rep2.size() == 1001);
- printf(" -- no spurious timeout .. ok\n");
+ cout << " -- no spurious timeout .. ok" << endl;
}
// huge RPC
- std::string big(1000000, 'x');
- intret = c->call(22, rep, big, (std::string)"z");
+ string big(1000000, 'x');
+ intret = c->call(22, rep, big, (string)"z");
VERIFY(rep.size() == 1000001);
- printf(" -- huge 1M rpc request .. ok\n");
+ cout << " -- huge 1M rpc request .. ok" << endl;
// specify a timeout value to an RPC that should timeout (udp)
- struct sockaddr_in non_existent;
- memset(&non_existent, 0, sizeof(non_existent));
- non_existent.sin_family = AF_INET;
- non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
- non_existent.sin_port = htons(7661);
+ string non_existent = "127.0.0.1:7661";
rpcc *c1 = new rpcc(non_existent);
time_t t0 = time(0);
intret = c1->bind(rpcc::to(3000));
time_t t1 = time(0);
VERIFY(intret < 0 && (t1 - t0) <= 4);
- printf(" -- rpc timeout .. ok\n");
- printf("simple_tests OK\n");
+ cout << " -- rpc timeout .. ok" << endl;
+ cout << "simple_tests OK" << endl;
}
void
// create threads that make lots of calls in parallel,
// to test thread synchronization for concurrent calls
// and dispatches.
- printf("start concurrent_test (%lu threads) ...", nt);
+ cout << "start concurrent_test (" << nt << " threads) ...";
- std::vector<std::thread> th(nt);
+ vector<thread> th(nt);
for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client1, i);
+ th[i] = thread(client1, i);
for(size_t i = 0; i < nt; i++)
th[i].join();
- printf(" OK\n");
+ cout << " OK" << endl;
}
void
lossy_test()
{
- printf("start lossy_test ...");
+ cout << "start lossy_test ...";
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
if (server) {
size_t nt = 1;
- std::vector<std::thread> th(nt);
+ vector<thread> th(nt);
for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client2, i);
+ th[i] = thread(client2, i);
for(size_t i = 0; i < nt; i++)
th[i].join();
- printf(".. OK\n");
+ cout << ".. OK" << endl;
VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
}
rpcc *client1;
rpcc *client = clients[0];
- printf("failure_test\n");
+ cout << "failure_test" << endl;
delete server;
client1 = new rpcc(dst);
VERIFY (client1->bind(rpcc::to(3000)) < 0);
- printf(" -- create new client and try to bind to failed server .. failed ok\n");
+ cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
delete client1;
startserver();
- std::string rep;
- int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+ string rep;
+ int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == rpc_const::oldsrv_failure);
- printf(" -- call recovered server with old client .. failed ok\n");
+ cout << " -- call recovered server with old client .. failed ok" << endl;
delete client;
VERIFY (client->bind() >= 0);
VERIFY (client->bind() < 0);
- intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+ intret = client->call(22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == 0);
VERIFY(rep == "hello goodbye");
- printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
+ cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
size_t nt = 10;
- printf(" -- concurrent test on new rpc client w/ %lu threads ..", nt);
+ cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
- std::vector<std::thread> th(nt);
+ vector<thread> th(nt);
for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client3, client);
+ th[i] = thread(client3, client);
for(size_t i = 0; i < nt; i++)
th[i].join();
- printf("ok\n");
+ cout << "ok" << endl;
delete server;
delete client;
startserver();
clients[0] = client = new rpcc(dst);
VERIFY (client->bind() >= 0);
- printf(" -- delete existing rpc client and server, create replacements.. ok\n");
+ cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
- printf(" -- concurrent test on new client and server w/ %lu threads ..", nt);
+ cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client3, client);
+ th[i] = thread(client3, client);
for(size_t i = 0; i < nt; i++)
th[i].join();
- printf("ok\n");
+ cout << "ok" << endl;
- printf("failure_test OK\n");
+ cout << "failure_test OK" << endl;
}
int
testmarshall();
if (isserver) {
- printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
+ cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
startserver();
}
if (isclient) {
// server's address.
- memset(&dst, 0, sizeof(dst));
- dst.sin_family = AF_INET;
- dst.sin_addr.s_addr = inet_addr("127.0.0.1");
- dst.sin_port = htons(port);
+ dst = "127.0.0.1:" + std::to_string(port);
// start the client. bind it to the server.
failure_test();
}
- printf("rpctest OK\n");
+ cout << "rpctest OK" << endl;
exit(0);
}
#include "handle.h"
#include "rsm.h"
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lang/verify.h"
#include "rsm_client.h"
#include "lock.h"
while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
// XXX iannucci 2013/09/15 -- I don't understand whether accessing
// cfg->view_id in this manner involves a race. I suspect not.
- if (join(primary)) {
+ if (join(primary, ml)) {
LOG("recovery: joined");
- commit_change_wo(cfg->view_id());
+ commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
vid_insync = vid_commit;
LOG("recovery: sync vid_insync " << vid_insync);
if (primary == cfg->myaddr()) {
- r = sync_with_backups();
+ r = sync_with_backups(ml);
} else {
- r = sync_with_primary();
+ r = sync_with_primary(ml);
}
LOG("recovery: sync done");
}
}
-bool rsm::sync_with_backups() {
- adopt_lock ml(rsm_mutex);
- ml.unlock();
+bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
+ rsm_mutex_lock.unlock();
{
// Make sure that the state of lock_server is stable during
// synchronization; otherwise, the primary's state may be more recent
// than replicas after the synchronization.
- lock ml2(invoke_mutex);
+ lock invoke_mutex_lock(invoke_mutex);
// By acquiring and releasing the invoke_mutex once, we make sure that
// the state of lock_server will not be changed until all
// replicas are synchronized. The reason is that client_invoke arrives
// after this point of time will see inviewchange == true, and returns
// BUSY.
}
- ml.lock();
+ rsm_mutex_lock.lock();
// Start accepting synchronization request (statetransferreq) now!
insync = true;
cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end()));
- sync_cond.wait(ml);
+ sync_cond.wait(rsm_mutex_lock);
insync = false;
return true;
}
-bool rsm::sync_with_primary() {
+bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
// Remember the primary of vid_insync
std::string m = primary;
while (vid_insync == vid_commit) {
- if (statetransfer(m))
+ if (statetransfer(m, rsm_mutex_lock))
break;
}
- return statetransferdone(m);
+ return statetransferdone(m, rsm_mutex_lock);
}
* Call to transfer state from m to the local node.
* Assumes that rsm_mutex is already held.
*/
-bool rsm::statetransfer(std::string m)
+bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
{
rsm_protocol::transferres r;
handle h(m);
int ret = 0;
- tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n",
- m.c_str(), last_myvs.vid, last_myvs.seqno);
+ LOG("rsm::statetransfer: contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
rpcc *cl;
{
- adopt_lock ml(rsm_mutex);
- ml.unlock();
+ rsm_mutex_lock.unlock();
cl = h.safebind();
if (cl) {
ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000),
r, cfg->myaddr(), last_myvs, vid_insync);
}
- ml.lock();
+ rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(),
- (long unsigned) cl, ret);
+ LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
return false;
}
if (stf && last_myvs != r.last) {
stf->unmarshal_state(r.state);
}
last_myvs = r.last;
- tprintf("rsm::statetransfer transfer from %s success, vs(%d,%d)\n",
- m.c_str(), last_myvs.vid, last_myvs.seqno);
+ LOG("rsm::statetransfer transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
return true;
}
-bool rsm::statetransferdone(std::string m) {
- adopt_lock ml(rsm_mutex);
- ml.unlock();
+bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
+ rsm_mutex_lock.unlock();
handle h(m);
rpcc *cl = h.safebind();
bool done = false;
if (cl) {
int r;
- rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
+ auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
done = (ret == rsm_protocol::OK);
}
- ml.lock();
+ rsm_mutex_lock.lock();
return done;
}
-bool rsm::join(std::string m) {
+bool rsm::join(std::string m, lock & rsm_mutex_lock) {
handle h(m);
int ret = 0;
rsm_protocol::joinres r;
LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
rpcc *cl;
{
- adopt_lock ml(rsm_mutex);
- ml.unlock();
+ rsm_mutex_lock.unlock();
cl = h.safebind();
if (cl != 0) {
ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r,
cfg->myaddr(), last_myvs);
}
- ml.lock();
+ rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
*/
void rsm::commit_change(unsigned vid) {
lock ml(rsm_mutex);
- commit_change_wo(vid);
+ commit_change(vid, ml);
if (cfg->ismember(cfg->myaddr(), vid_commit))
breakpoint2();
}
-void rsm::commit_change_wo(unsigned vid) {
+void rsm::commit_change(unsigned vid, lock &) {
if (vid <= vid_commit)
return;
- tprintf("commit_change: new view (%d) last vs (%d,%d) %s insync %d\n",
- vid, last_myvs.vid, last_myvs.seqno, primary.c_str(), insync);
+ LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
+ last_myvs.seqno << ") " << primary << " insync " << insync);
vid_commit = vid;
inviewchange = true;
set_primary(vid);
void rsm::execute(int procno, std::string req, std::string &r) {
- tprintf("execute\n");
+ LOG("execute");
handler *h = procs[procno];
VERIFY(h);
unmarshall args(req);
marshall rep;
std::string reps;
- rsm_protocol::status ret = (*h)(args, rep);
+ auto ret = (rsm_protocol::status)(*h)(args, rep);
marshall rep1;
rep1 << ret;
rep1 << rep.str();
rpcc *cl = h.safebind();
if (!cl)
return rsm_client_protocol::BUSY;
- rsm_protocol::status ret;
int ignored_rval;
- ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req);
+ auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req);
LOG("Invoke returned " << ret);
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
breakpoint1();
- partition1();
+ lock rsm_mutex_lock(rsm_mutex);
+ partition1(rsm_mutex_lock);
}
}
execute(procno, req, r);
rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
- int ret = rsm_protocol::OK;
- tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
- last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
- if (!insync || vid != vid_insync) {
+ LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
+ last_myvs.vid << "," << last_myvs.seqno << ")");
+ if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
- }
if (stf && last != last_myvs)
r.state = stf->marshal_state();
r.last = last_myvs;
- return ret;
+ return rsm_protocol::OK;
}
/**
// joinreq to the RSM's current primary; this is the
// handler for that RPC.
rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) {
- int ret = rsm_protocol::OK;
+ auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
- tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
- last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
+ LOG("joinreq: src " << m << " last (" << last.vid << "," << last.seqno << ") mylast (" <<
+ last_myvs.vid << "," << last_myvs.seqno << ")");
if (cfg->ismember(m, vid_commit)) {
- tprintf("joinreq: is still a member\n");
+ LOG("joinreq: is still a member");
r.log = cfg->dump();
} else if (cfg->myaddr() != primary) {
- tprintf("joinreq: busy\n");
+ LOG("joinreq: busy");
ret = rsm_protocol::BUSY;
} else {
// We cache vid_commit to avoid adding m to a view which already contains
}
if (cfg->ismember(m, cfg->view_id())) {
r.log = cfg->dump();
- tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str());
+ LOG("joinreq: ret " << ret << " log " << r.log);
} else {
- tprintf("joinreq: failed; proposer couldn't add %d\n", succ);
+ LOG("joinreq: failed; proposer couldn't add " << succ);
ret = rsm_protocol::BUSY;
}
}
cfg->get_view(vid_commit, m);
m.push_back(primary);
r = m;
- tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
- primary.c_str());
+ LOG("rsm::client_members return " << print_members(m) << " m " << primary);
return rsm_client_protocol::OK;
}
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
- tprintf("set_primary: primary stays %s\n", primary.c_str());
+ LOG("set_primary: primary stays " << primary);
return;
}
for (unsigned i = 0; i < p.size(); i++) {
if (isamember(p[i], c)) {
primary = p[i];
- tprintf("set_primary: primary is %s\n", primary.c_str());
+ LOG("set_primary: primary is " << primary);
return;
}
}
// Simulate partitions
// assumes caller holds rsm_mutex
-void rsm::net_repair_wo(bool heal) {
+void rsm::net_repair(bool heal, lock &) {
std::vector<std::string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
handle h(m[i]);
- tprintf("rsm::net_repair_wo: %s %d\n", m[i].c_str(), heal);
+ LOG("rsm::net_repair: " << m[i] << " " << heal);
if (h.safebind()) h.safebind()->set_reachable(heal);
}
}
rsmrpc->set_reachable(heal);
}
-rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
lock ml(rsm_mutex);
- tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
- heal, dopartition, partitioned);
+ LOG("rsm::test_net_repairreq: " << heal << " (dopartition " <<
+ dopartition << ", partitioned " << partitioned << ")");
if (heal) {
- net_repair_wo(heal);
+ net_repair(heal, ml);
partitioned = false;
} else {
dopartition = true;
void rsm::breakpoint1() {
if (break1) {
- tprintf("Dying at breakpoint 1 in rsm!\n");
+ LOG("Dying at breakpoint 1 in rsm!");
exit(1);
}
}
void rsm::breakpoint2() {
if (break2) {
- tprintf("Dying at breakpoint 2 in rsm!\n");
+ LOG("Dying at breakpoint 2 in rsm!");
exit(1);
}
}
-void rsm::partition1() {
+void rsm::partition1(lock & rsm_mutex_lock) {
if (dopartition) {
- net_repair_wo(false);
+ net_repair(false, rsm_mutex_lock);
dopartition = false;
partitioned = true;
}
}
-rsm_test_protocol::status rsm::breakpointreq(int &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
- tprintf("rsm::breakpointreq: %d\n", b);
+ LOG("rsm::breakpointreq: " << b);
if (b == 1) break1 = true;
else if (b == 2) break2 = true;
else if (b == 3 || b == 4) cfg->breakpoint(b);
rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid);
rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src,
viewstamp last);
- rsm_test_protocol::status test_net_repairreq(int &r, int heal);
- rsm_test_protocol::status breakpointreq(int &r, int b);
+ rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
+ rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
std::mutex rsm_mutex;
std::mutex invoke_mutex;
void execute(int procno, std::string req, std::string &r);
rsm_client_protocol::status client_invoke(std::string &r, int procno,
std::string req);
- bool statetransfer(std::string m);
- bool statetransferdone(std::string m);
- bool join(std::string m);
+ bool statetransfer(std::string m, lock & rsm_mutex_lock);
+ bool statetransferdone(std::string m, lock & rsm_mutex_lock);
+ bool join(std::string m, lock & rsm_mutex_lock);
void set_primary(unsigned vid);
std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid);
- bool sync_with_backups();
- bool sync_with_primary();
- void net_repair_wo(bool heal);
+ bool sync_with_backups(lock & rsm_mutex_lock);
+ bool sync_with_primary(lock & rsm_mutex_lock);
+ void net_repair(bool heal, lock & rsm_mutex_lock);
void breakpoint1();
void breakpoint2();
- void partition1();
- void commit_change_wo(unsigned vid);
+ void partition1(lock & rsm_mutex_lock);
+ void commit_change(unsigned vid, lock & rsm_mutex_lock);
public:
rsm (std::string _first, std::string _me);
~rsm() {}
#include <unistd.h>
#include "lang/verify.h"
#include "lock.h"
-#include "tprintf.h"
+#include "threaded_log.h"
-rsm_client::rsm_client(std::string dst) {
+rsm_client::rsm_client(std::string dst) : primary(dst) {
LOG("create rsm_client");
- std::vector<std::string> mems;
-
- sockaddr_in dstsock;
- make_sockaddr(dst.c_str(), &dstsock);
- primary = dst;
-
- {
- lock ml(rsm_client_mutex);
- VERIFY (init_members());
- }
+ lock ml(rsm_client_mutex);
+ VERIFY (init_members(ml));
LOG("rsm_client: done");
}
-// Assumes caller holds rsm_client_mutex
-void rsm_client::primary_failure() {
+void rsm_client::primary_failure(lock &) {
primary = known_mems.back();
known_mems.pop_back();
}
rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
- int ret = 0;
lock ml(rsm_client_mutex);
while (1) {
LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
ml.unlock();
rpcc *cl = h.safebind();
+ auto ret = rsm_client_protocol::OK;
if (cl)
- ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
+ ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
ml.lock();
if (!cl)
LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
if (ret == rsm_client_protocol::OK)
- break;
+ return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
LOG("rsm is busy " << primary);
sleep(3);
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
- if (init_members())
+ if (init_members(ml))
continue;
}
prim_fail:
LOG("primary " << primary << " failed ret " << std::dec << ret);
- primary_failure();
+ primary_failure(ml);
LOG("rsm_client::invoke: retry new primary " << primary);
}
- return ret;
}
-bool rsm_client::init_members() {
+bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
LOG("rsm_client::init_members get members!");
handle h(primary);
int ret = rsm_client_protocol::ERR;
rpcc *cl;
{
- adopt_lock ml(rsm_client_mutex);
- ml.unlock();
+ rsm_client_mutex_lock.unlock();
cl = h.safebind();
if (cl)
ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0);
- ml.lock();
+ rsm_client_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK)
return false;
std::string primary;
std::vector<std::string> known_mems;
std::mutex rsm_client_mutex;
- void primary_failure();
- bool init_members();
+ void primary_failure(lock & rsm_client_mutex_lock);
+ bool init_members(lock & rsm_client_mutex_lock);
public:
rsm_client(std::string dst);
rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req);
template<class R>
int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
- std::string rep;
- std::string res;
- int intret = invoke(proc, rep, req.cstr());
- VERIFY( intret == rsm_client_protocol::OK );
- unmarshall u(rep);
- u >> intret;
- if (intret < 0) return intret;
- u >> res;
- if (!u.okdone()) {
- fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
- "You probably forgot to set the reply string in "
- "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
- "type\n", proc);
- VERIFY(0);
- return rpc_const::unmarshal_reply_failure;
- }
- unmarshall u1(res);
- u1 >> r;
- if(!u1.okdone()) {
- fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
- "You are probably calling RPC 0x%x with wrong return "
- "type.\n", proc);
- VERIFY(0);
- return rpc_const::unmarshal_reply_failure;
- }
- return intret;
+ std::string rep;
+ std::string res;
+ int intret = invoke(proc, rep, req.cstr());
+ VERIFY( intret == rsm_client_protocol::OK );
+ unmarshall u(rep);
+ u >> intret;
+ if (intret < 0) return intret;
+ u >> res;
+ if (!u.okdone()) {
+ fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+ "You probably forgot to set the reply string in "
+ "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
+ "type\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ unmarshall u1(res);
+ u1 >> r;
+ if(!u1.okdone()) {
+ fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+ "You are probably calling RPC 0x%x with wrong return "
+ "type.\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ return intret;
}
template<class R, class ...Args>
#include "rpc/rpc.h"
-
class rsm_client_protocol {
- public:
- enum xxstatus { OK, ERR, NOTPRIMARY, BUSY};
- typedef int status;
- enum rpc_numbers {
- invoke = 0x9001,
- members,
- };
+ public:
+ enum status : status_t {OK, ERR, NOTPRIMARY, BUSY};
+ enum rpc_numbers : proc_t {
+ invoke = 0x9001,
+ members,
+ };
};
-
struct viewstamp {
- viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) :
- vid(_vid), seqno(_seqno) {}
- unsigned int vid;
- unsigned int seqno;
- inline void operator++(int) { seqno++; }
+ viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) : vid(_vid), seqno(_seqno) {}
+ unsigned int vid;
+ unsigned int seqno;
+ inline void operator++(int) { seqno++; }
};
class rsm_protocol {
- public:
- enum xxstatus { OK, ERR, BUSY};
- typedef int status;
- enum rpc_numbers {
- invoke = 0x10001,
- transferreq,
- transferdonereq,
- joinreq,
- };
-
- struct transferres {
- std::string state;
- viewstamp last;
- };
-
- struct joinres {
- std::string log;
- };
+ public:
+ enum status : status_t { OK, ERR, BUSY};
+ enum rpc_numbers : proc_t {
+ invoke = 0x10001,
+ transferreq,
+ transferdonereq,
+ joinreq,
+ };
+
+ struct transferres {
+ std::string state;
+ viewstamp last;
+ };
+
+ struct joinres {
+ std::string log;
+ };
};
inline bool operator==(viewstamp a, viewstamp b) {
- return a.vid == b.vid && a.seqno == b.seqno;
+ return a.vid == b.vid && a.seqno == b.seqno;
}
inline bool operator>(viewstamp a, viewstamp b) {
- return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
+ return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
}
inline bool operator!=(viewstamp a, viewstamp b) {
- return a.vid != b.vid || a.seqno != b.seqno;
+ return a.vid != b.vid || a.seqno != b.seqno;
}
-inline marshall& operator<<(marshall &m, viewstamp v)
-{
- m << v.vid;
- m << v.seqno;
- return m;
+inline marshall& operator<<(marshall &m, viewstamp v) {
+ return m << v.vid << v.seqno;
}
inline unmarshall& operator>>(unmarshall &u, viewstamp &v) {
- u >> v.vid;
- u >> v.seqno;
- return u;
+ return u >> v.vid >> v.seqno;
}
-inline marshall &
-operator<<(marshall &m, rsm_protocol::transferres r)
-{
- m << r.state;
- m << r.last;
- return m;
+inline marshall & operator<<(marshall &m, rsm_protocol::transferres r) {
+ return m << r.state << r.last;
}
-inline unmarshall &
-operator>>(unmarshall &u, rsm_protocol::transferres &r)
-{
- u >> r.state;
- u >> r.last;
- return u;
+inline unmarshall & operator>>(unmarshall &u, rsm_protocol::transferres &r) {
+ return u >> r.state >> r.last;
}
-inline marshall &
-operator<<(marshall &m, rsm_protocol::joinres r)
-{
- m << r.log;
- return m;
+inline marshall & operator<<(marshall &m, rsm_protocol::joinres r) {
+ return m << r.log;
}
-inline unmarshall &
-operator>>(unmarshall &u, rsm_protocol::joinres &r)
-{
- u >> r.log;
- return u;
+inline unmarshall & operator>>(unmarshall &u, rsm_protocol::joinres &r) {
+ return u >> r.log;
}
class rsm_test_protocol {
- public:
- enum xxstatus { OK, ERR};
- typedef int status;
- enum rpc_numbers {
- net_repair = 0x12001,
- breakpoint = 0x12002,
- };
+ public:
+ enum status : status_t {OK, ERR};
+ enum rpc_numbers : proc_t {
+ net_repair = 0x12001,
+ breakpoint = 0x12002,
+ };
};
#endif
#include <stdio.h>
#include <string>
-char tprintf_thread_prefix = 't';
+char log_thread_prefix = 't';
int
main(int argc, char *argv[])
#include <iostream>
#include <stdio.h>
-rsmtest_client::rsmtest_client(std::string dst)
-{
- sockaddr_in dstsock;
- make_sockaddr(dst.c_str(), &dstsock);
- cl = new rpcc(dstsock);
- if (cl->bind() < 0) {
+rsmtest_client::rsmtest_client(std::string dst) : cl(dst) {
+ if (cl.bind() < 0)
printf("rsmtest_client: call bind\n");
- }
}
-int
-rsmtest_client::net_repair(int heal)
-{
- int r;
- int ret = cl->call(rsm_test_protocol::net_repair, r, heal);
+rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
+ rsm_test_protocol::status r;
+ auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::net_repair, r, heal);
VERIFY (ret == rsm_test_protocol::OK);
return r;
}
-int
-rsmtest_client::breakpoint(int b)
-{
- int r;
- int ret = cl->call(rsm_test_protocol::breakpoint, r, b);
+rsm_test_protocol::status rsmtest_client::breakpoint(int b) {
+ rsm_test_protocol::status r;
+ auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::breakpoint, r, b);
VERIFY (ret == rsm_test_protocol::OK);
return r;
}
// Client interface to the rsmtest server
class rsmtest_client {
protected:
- rpcc *cl;
+ rpcc cl;
public:
rsmtest_client(std::string d);
virtual ~rsmtest_client() {}
#include <sys/time.h>
#include <stdint.h>
-#include "tprintf.h"
+#include "threaded_log.h"
std::mutex cerr_mutex;
std::map<std::thread::id, int> thread_name_map;
-#ifndef tprintf_h
-#define tprintf_h
+#ifndef threaded_log_h
+#define threaded_log_h
#include <iomanip>
#include <iostream>
extern int next_thread_num;
extern std::map<void *, int> instance_name_map;
extern int next_instance_num;
-extern char tprintf_thread_prefix;
+extern char log_thread_prefix;
template <class A>
struct iterator_pair : public std::pair<A, A> {
_tid_ = thread_name_map[_thread_] = ++next_thread_num; \
auto _utime_ = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \
- std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << _tid_; \
+ std::cerr << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << _tid_; \
std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \
}
#define LOG_THIS_POINTER { \
LOG_SUFFIX; \
}
-#define tprintf(...) { \
- char *buf = nullptr; \
- int len = asprintf(&buf, __VA_ARGS__); \
- if (buf[len-1]=='\n') \
- buf[len-1] = '\0'; \
- LOG_NONMEMBER(buf); \
- free(buf); \
-}
-
#endif