-#include <thread>
-#include <sstream>
#include "config.h"
-#include "paxos.h"
#include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as previous view, except with
config::config(const string &_first, const string &_me, config_view_change *_vc)
: my_view_id(0), first(_first), me(_me), vc(_vc),
- paxos_acceptor(this, me == _first, me, me),
- paxos_proposer(this, &paxos_acceptor, me)
+ paxos(this, me == _first, me, me)
{
get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
lock cfg_mutex_lock(cfg_mutex);
void config::restore(const string &s) {
lock cfg_mutex_lock(cfg_mutex);
- paxos_acceptor.restore(s);
+ paxos.restore(s);
reconstruct(cfg_mutex_lock);
}
}
void config::get_view(unsigned instance, vector<string> &m, lock &) {
- string value = paxos_acceptor.value(instance);
+ string value = paxos.value(instance);
LOG("get_view(" << instance << "): returns " << value);
m = members(value);
}
void config::reconstruct(lock &cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
- if (paxos_acceptor.instance() > 0) {
- my_view_id = paxos_acceptor.instance();
+ my_view_id = paxos.instance();
+ if (my_view_id > 0) {
get_view(my_view_id, mems, cfg_mutex_lock);
LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
}
vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
- bool r = paxos_proposer.run(nextvid, cmems, value(m));
+ bool r = paxos.run(nextvid, cmems, value(m));
cfg_mutex_lock.lock();
LOG("config::add: proposer returned " << (r ? "success" : "failure"));
return r;
vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
- bool r = paxos_proposer.run(nextvid, cmems, value(n));
+ bool r = paxos.run(nextvid, cmems, value(n));
cfg_mutex_lock.lock();
LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
return r;
LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
if (vid == my_view_id)
return paxos_protocol::OK;
- else if (paxos_proposer.isrunning()) {
+ else if (paxos.isrunning()) {
VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
return paxos_protocol::OK;
}
#ifndef config_h
#define config_h
-#include <string>
-#include <vector>
+#include "types.h"
#include "paxos.h"
-#include "lock.h"
-
-using std::chrono::steady_clock;
-using std::chrono::seconds;
-using std::string;
-using std::vector;
-using std::thread;
-using std::ostringstream;
-using std::istringstream;
-using std::ostream_iterator;
-using std::istream_iterator;
-using std::copy;
-using std::min;
-using std::min_element;
class config_view_change {
public:
string first;
string me;
config_view_change *vc;
- acceptor paxos_acceptor;
- proposer paxos_proposer;
+ proposer_acceptor paxos;
vector<string> mems;
mutex cfg_mutex;
cond config_cond;
config(const string &_first, const string &_me, config_view_change *_vc);
unsigned view_id() { return my_view_id; }
const string &myaddr() const { return me; }
- string dump() { return paxos_acceptor.dump(); }
+ string dump() { return paxos.dump(); }
void get_view(unsigned instance, vector<string> &m);
void restore(const string &s);
bool add(const string &, unsigned view_id);
bool ismember(const string &m, unsigned view_id);
void heartbeater(void);
void paxos_commit(unsigned instance, const string &v);
- // XXX hack; maybe should have its own port number
- rpcs *get_rpcs() { return paxos_acceptor.get_rpcs(); }
- void breakpoint(int b) { paxos_proposer.breakpoint(b); }
+ rpcs *get_rpcs() { return paxos.get_rpcs(); }
+ void breakpoint(int b) { paxos.breakpoint(b); }
};
#endif
#include "handle.h"
-#include "threaded_log.h"
-#include "lock.h"
-#include <map>
-
-using std::map;
class hinfo {
public:
#ifndef handle_h
#define handle_h
+#include "types.h"
#include "rpc/rpc.h"
-#include <string>
-
-using std::string;
class hinfo;
#ifndef verify_client_h
#define verify_client_h
-#include <stdlib.h>
-#include <assert.h>
+#include <cstdlib>
+#include <cassert>
#ifdef NDEBUG
#define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
+++ /dev/null
-#ifndef lock_h
-#define lock_h
-
-#include <thread>
-#include <mutex>
-
-using std::mutex;
-using lock = std::unique_lock<std::mutex>;
-using cond = std::condition_variable;
-
-#endif
// RPC stubs for clients to talk to lock_server, and cache the locks.
#include "lock_client.h"
-#include "rpc/rpc.h"
-#include <algorithm>
-#include "threaded_log.h"
#include <arpa/inet.h>
-#include "rsm_client.h"
-#include "lock.h"
-
void lock_state::wait(lock & mutex_lock) {
auto self = std::this_thread::get_id();
c[self].wait(mutex_lock);
srandom((uint32_t)time(NULL)^last_port);
rlock_port = ((random()%32000) | (0x1 << 10));
- id = "127.0.0.1:" + std::to_string(rlock_port);
+ id = "127.0.0.1:" + to_string(rlock_port);
last_port = rlock_port;
rpcs *rlsrpc = new rpcs(rlock_port);
rlsrpc->reg(rlock_protocol::revoke, &lock_client::revoke_handler, this);
// lock client interface.
#ifndef lock_client_h
-
#define lock_client_h
#ifdef __cplusplus
-#include <string>
+#include "types.h"
#include "lock_protocol.h"
-#include "rpc/rpc.h"
-#include "lang/verify.h"
#include "rpc/fifo.h"
#include "rsm_client.h"
-#include "lock.h"
class lock_release_user {
public:
virtual ~lock_release_user() {}
};
-using std::string;
-using std::map;
-using std::thread;
-using std::list;
-
class lock_state {
public:
enum {
#include "lock_client.h"
-#include "threaded_log.h"
char log_thread_prefix = 'd';
#ifndef lock_protocol_h
#define lock_protocol_h
+#include "types.h"
#include "rpc/rpc.h"
-#include <string>
-
-using std::string;
class lock_protocol {
public:
enum rpc_numbers : proc_t {
acquire = 0x7001,
release,
- stat
+ stat,
};
};
enum status : status_t { OK, RPCERR };
enum rpc_numbers : proc_t {
revoke = 0x8001,
- retry = 0x8002
+ retry,
};
};
#endif
// the caching lock server implementation
+#include "types.h"
#include "lock_server.h"
-#include <sstream>
#include <unistd.h>
#include <arpa/inet.h>
-#include "lang/verify.h"
#include "handle.h"
-#include "threaded_log.h"
-#include "rpc/marshall.h"
-#include "lock.h"
-
-using std::ostringstream;
-using std::istringstream;
-using std::vector;
lock_state::lock_state():
held(false)
continue;
lock_state &st = get_lock_state(lid);
- holder held_by;
+ holder_t held_by;
{
lock sl(st.m);
held_by = st.held_by;
LOG("Sending retry for " << lid);
lock_state &st = get_lock_state(lid);
- holder front;
+ holder_t front;
{
lock sl(st.m);
if (st.wanted_by.empty())
}
int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
- LOG_FUNC_ENTER_SERVER;
- holder h = holder(id, xid);
+ LOG("lid=" << lid << " client=" << id << "," << xid);
+ holder_t h = holder_t(id, xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
// get in line
bool found = false;
- for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
- if (i->first == id) {
+ for (auto p : st.wanted_by) {
+ if (p.first == id) {
// make sure client is obeying serialization
- if (i->second != xid) {
- LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
+ if (p.second != xid) {
+ LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
return lock_protocol::RPCERR;
}
found = true;
if (!found)
st.wanted_by.push_back(h);
- LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
+ LOG("wanted_by=" << st.wanted_by);
// send revoke if we're first in line
if (st.wanted_by.front() == h)
return lock_protocol::RETRY;
}
-int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
- LOG_FUNC_ENTER_SERVER;
+int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
+ LOG("lid=" << lid << " client=" << id << "," << xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
- if (st.held && st.held_by == holder(id, xid)) {
+ if (st.held && st.held_by == holder_t(id, xid)) {
st.held = false;
LOG("Lock " << lid << " not held");
}
#ifndef lock_server_h
#define lock_server_h
-#include <string>
-
-#include <map>
-#include <vector>
+#include "types.h"
#include "lock_protocol.h"
-#include "rpc/rpc.h"
-#include "rsm_state_transfer.h"
#include "rsm.h"
#include "rpc/fifo.h"
-#include "lock.h"
-
-using std::string;
-using std::pair;
-using std::list;
-using std::map;
-typedef string callback;
-typedef pair<callback, lock_protocol::xid_t> holder;
+typedef string callback_t;
+typedef pair<callback_t, lock_protocol::xid_t> holder_t;
class lock_state {
public:
lock_state();
lock_state(const lock_state &other);
bool held;
- holder held_by;
- list<holder> wanted_by;
- map<callback, lock_protocol::xid_t> old_requests;
+ holder_t held_by;
+ list<holder_t> wanted_by;
+ map<callback_t, lock_protocol::xid_t> old_requests;
mutex m;
lock_state& operator=(const lock_state&);
};
-#include "rpc/rpc.h"
+#include "lock_server.h"
#include <arpa/inet.h>
-#include <stdlib.h>
-#include "threaded_log.h"
#include <unistd.h>
-#include "lock_server.h"
-#include "rsm.h"
// Main loop of lock_server
// Lock server tester
//
-#include "lock_protocol.h"
#include "lock_client.h"
-#include "rpc/rpc.h"
#include <arpa/inet.h>
-#include <vector>
-#include <stdlib.h>
-#include <stdio.h>
-#include "lang/verify.h"
-#include "threaded_log.h"
#include <sys/types.h>
#include <unistd.h>
-#include "lock.h"
char log_thread_prefix = 'c';
lock ml(count_mutex);
int x = lid[0] & 0x0f;
if (ct[x] != 0) {
- fprintf(stderr, "error: server granted %s twice\n", lid.c_str());
- fprintf(stdout, "error: server granted %s twice\n", lid.c_str());
+ cout << "error: server granted " << lid << " twice" << endl;
+ cerr << "error: server granted " << lid << " twice" << endl;
exit(1);
}
ct[x] += 1;
lock ml(count_mutex);
int x = lid[0] & 0x0f;
if (ct[x] != 1) {
- fprintf(stderr, "error: client released un-held lock %s\n", lid.c_str());
+ cerr << "error: client released un-held lock " << lid << endl;
exit(1);
}
ct[x] -= 1;
srandom((uint32_t)getpid());
if (argc < 2) {
- fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
+ cerr << "Usage: " << argv[0] << " [host:]port [test]" << endl;
exit(1);
}
+#include "log.h"
#include "paxos.h"
-#include <fstream>
-#include <iostream>
-#include "threaded_log.h"
// Paxos must maintain some durable state (i.e., that survives power
// failures) to run Paxos correct. This module implements a log with
// all durable state to run Paxos. Since the values chosen correspond
// to views, the log contains all views since the beginning of time.
-log::log(acceptor *_acc, std::string _me) : pxs (_acc) {
+log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) {
name = "paxos-" + _me + ".log";
logread();
}
void log::logread(void) {
- std::ifstream from;
- std::string type;
+ ifstream from(name);
+ string type;
unsigned instance;
- from.open(name.c_str());
LOG("logread");
while (from >> type) {
if (type == "done") {
- std::string v;
+ string v;
from >> instance;
from.get();
getline(from, v);
pxs->n_h.n = 0;
pxs->n_a.n = 0;
} else if (type == "propseen") {
- from >> pxs->n_h.n;
- from >> pxs->n_h.m;
+ from >> pxs->n_h.n >> pxs->n_h.m;
LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
} else if (type == "accepted") {
- std::string v;
- from >> pxs->n_a.n;
- from >> pxs->n_a.m;
+ string v;
+ from >> pxs->n_a.n >> pxs->n_a.m;
from.get();
getline(from, v);
pxs->v_a = v;
from.close();
}
-std::string log::dump() {
- std::ifstream from;
- std::string res;
- std::string v;
- from.open(name.c_str());
+string log::dump() {
+ ifstream from(name);
+ string res;
+ string v;
while (getline(from, v))
res += v + "\n";
from.close();
return res;
}
-void log::restore(std::string s) {
- std::ofstream f;
+void log::restore(string s) {
LOG("restore: " << s);
- f.open(name.c_str(), std::ios::trunc);
+ ofstream f(name, std::ios::trunc);
f << s;
f.close();
}
// XXX should be an atomic operation
-void log::loginstance(unsigned instance, std::string v) {
- std::ofstream f(name, std::ios::app);
+void log::loginstance(unsigned instance, string v) {
+ ofstream f(name, std::ios::app);
f << "done " << instance << " " << v << "\n";
f.close();
}
// an acceptor should call logprop(n_h) when it
// receives a prepare to which it responds prepare_ok().
void log::logprop(prop_t n_h) {
- std::ofstream f;
- f.open(name.c_str(), std::ios::app);
- f << "propseen";
- f << " ";
- f << n_h.n;
- f << " ";
- f << n_h.m;
- f << "\n";
+ ofstream f(name, std::ios::app);
+ f << "propseen " << n_h.n << " " << n_h.m << "\n";
f.close();
}
// an acceptor should call logaccept(n_a, v_a) when it
// receives an accept RPC to which it replies accept_ok().
-void log::logaccept(prop_t n, std::string v) {
- std::ofstream f(name, std::ios::app);
+void log::logaccept(prop_t n, string v) {
+ ofstream f(name, std::ios::app);
f << "accepted " << n.n << " " << n.m << " " << v << "\n";
f.close();
}
#ifndef log_h
#define log_h
-#include <string>
-#include <vector>
+#include "types.h"
+#include "paxos_protocol.h"
-
-class acceptor;
+class proposer_acceptor;
class log {
- private:
- std::string name;
- acceptor *pxs;
- public:
- log (acceptor*, std::string _me);
- std::string dump();
- void restore(std::string s);
- void logread(void);
- /* Log a committed paxos instance*/
- void loginstance(unsigned instance, std::string v);
- /* Log the highest proposal number that the local paxos acceptor has ever seen */
- void logprop(prop_t n_h);
- /* Log the proposal (proposal number and value) that the local paxos acceptor
- accept has ever accepted */
- void logaccept(prop_t n_a, std::string v);
+ private:
+ string name;
+ proposer_acceptor *pxs;
+ public:
+ log (proposer_acceptor*, string _me);
+ string dump();
+ void restore(string s);
+ void logread(void);
+ // Log a committed paxos instance
+ void loginstance(unsigned instance, string v);
+ // Log the highest proposal number that the local paxos acceptor has ever seen
+ void logprop(prop_t n_h);
+ // Log the proposal (proposal number and value) that the local paxos acceptor
+ // accept has ever accepted
+ void logaccept(prop_t n_a, string v);
};
#endif /* log_h */
#include "paxos.h"
#include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
-#include "lock.h"
-using std::stoi;
+string print_members(const nodes_t &nodes) {
+ ostringstream ost;
+ copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
+ return ost.str();
+}
+
+bool isamember(const node_t & m, const nodes_t & nodes) {
+ return find(nodes.begin(), nodes.end(), m) != nodes.end();
+}
+
+// check if l2 contains a majority of the elements of l1
+bool majority(const nodes_t &l1, const nodes_t &l2) {
+ auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+ return overlap >= (l1.size() >> 1) + 1;
+}
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described by Lamport's "Paxos Made
// paxos_commit to inform higher layers of the agreed value for this
// instance.
-bool operator> (const prop_t &a, const prop_t &b) {
- return (a.n > b.n || (a.n == b.n && a.m > b.m));
-}
-
-bool operator>= (const prop_t &a, const prop_t &b) {
- return (a.n > b.n || (a.n == b.n && a.m >= b.m));
-}
-
-string
-print_members(const vector<string> &nodes) {
- string s;
- s.clear();
- for (unsigned i = 0; i < nodes.size(); i++) {
- s += nodes[i];
- if (i < (nodes.size()-1))
- s += ",";
- }
- return s;
-}
-
-
-bool isamember(const string & m, const vector<string> & nodes) {
- for (auto n : nodes) {
- if (n == m)
- return 1;
- }
- return 0;
-}
-
-bool proposer::isrunning() {
- bool r;
- lock ml(pxs_mutex);
- r = !stable;
- return r;
-}
-
-// check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
- unsigned n = 0;
-
- for (unsigned i = 0; i < l1.size(); i++) {
- if (isamember(l1[i], l2))
- n++;
- }
- return n >= (l1.size() >> 1) + 1;
-}
-
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
- : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
- stable (true)
+proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
+ bool _first, const node_t & _me, const value_t & _value)
+ : delegate(_delegate), me (_me)
{
- my_n.n = 0;
- my_n.m = me;
-}
+ // at this point, the log has already been replayed
+ if (instance_h == 0 && _first) {
+ values[1] = _value;
+ l.loginstance(1, _value);
+ instance_h = 1;
+ }
-void proposer::setn()
-{
- my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+ pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
+ pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
+ pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
}
-bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
+bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
{
- vector<string> accepts;
- vector<string> nodes;
- string v;
- bool r = false;
-
- lock ml(pxs_mutex);
+ lock ml(proposer_mutex);
LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
if (!stable) { // already running proposer?
LOG("proposer::run: already running");
return false;
}
stable = false;
- setn();
- accepts.clear();
- v.clear();
+ bool r = false;
+ my_n.n = std::max(n_h.n, my_n.n) + 1;
+ nodes_t accepts;
+ value_t v = newv;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- LOG("paxos::manager: received a majority of prepare responses");
-
- if (v.size() == 0)
- v = newv;
+ LOG("paxos::run: received a majority of prepare responses");
breakpoint1();
- nodes = accepts;
- accepts.clear();
+ nodes_t nodes;
+ nodes.swap(accepts);
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- LOG("paxos::manager: received a majority of accept responses");
+ LOG("paxos::run: received a majority of accept responses");
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- LOG("paxos::manager: no majority of accept responses");
+ LOG("paxos::run: no majority of accept responses");
}
} else {
- LOG("paxos::manager: no majority of prepare responses");
+ LOG("paxos::run: no majority of prepare responses");
}
} else {
- LOG("paxos::manager: prepare is rejected " << stable);
+ LOG("paxos::run: prepare is rejected " << stable);
}
stable = true;
return r;
}
-// proposer::run() calls prepare to send prepare RPCs to nodes
-// and collect responses. if one of those nodes
-// replies with an oldinstance, return false.
-// otherwise fill in accepts with set of nodes that accepted,
-// set v to the v_a with the highest n_a, and return true.
-bool
-proposer::prepare(unsigned instance, vector<string> & accepts,
- const vector<string> & nodes,
- string & v)
-{
- struct paxos_protocol::preparearg arg = { instance, my_n };
- struct paxos_protocol::prepareres res;
- prop_t n_a = { 0, "" };
- rpcc *r;
+bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
+ const nodes_t & nodes, value_t & v) {
+ prepareres res;
+ prop_t highest_n_a{0, ""};
for (auto i : nodes) {
handle h(i);
- if (!(r = h.safebind()))
+ rpcc *r = h.safebind();
+ if (!r)
continue;
- int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
+ auto status = (paxos_protocol::status)r->call_timeout(
+ paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
LOG("commiting old instance!");
- acc->commit(instance, res.v_a);
+ commit(instance, res.v_a);
return false;
}
if (res.accept) {
accepts.push_back(i);
- if (res.n_a >= n_a) {
+ if (res.n_a >= highest_n_a) {
LOG("found a newer accepted proposal");
v = res.v_a;
- n_a = res.n_a;
+ highest_n_a = res.n_a;
}
}
}
return true;
}
-// run() calls this to send out accept RPCs to accepts.
-// fill in accepts with list of nodes that accepted.
-void
-proposer::accept(unsigned instance, vector<string> & accepts,
- const vector<string> & nodes, const string & v)
-{
- struct paxos_protocol::acceptarg arg = { instance, my_n, v };
- rpcc *r;
+void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
+ const nodes_t & nodes, const value_t & v) {
for (auto i : nodes) {
handle h(i);
- if (!(r = h.safebind()))
+ rpcc *r = h.safebind();
+ if (!r)
continue;
bool accept = false;
- int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
+ int status = r->call_timeout(
+ paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
if (status == paxos_protocol::OK && accept)
accepts.push_back(i);
}
}
-void
-proposer::decide(unsigned instance, const vector<string> & accepts,
- const string & v)
-{
- struct paxos_protocol::decidearg arg = { instance, v };
- rpcc *r;
+void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
for (auto i : accepts) {
handle h(i);
- if (!(r = h.safebind()))
+ rpcc *r = h.safebind();
+ if (!r)
continue;
int res = 0;
- r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
+ r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
- const string & _value)
- : cfg(_cfg), me (_me), instance_h(0)
-{
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
-
- l = new log (this, me);
-
- if (instance_h == 0 && _first) {
- values[1] = _value;
- l->loginstance(1, _value);
- instance_h = 1;
- }
-
- pxs = new rpcs((uint32_t)stoi(_me));
- pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
- pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
- pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
-}
-
paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
- paxos_protocol::preparearg a)
-{
- lock ml(pxs_mutex);
+proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+ lock ml(acceptor_mutex);
r.oldinstance = false;
r.accept = false;
r.n_a = n_a;
r.v_a = v_a;
- if (a.instance <= instance_h) {
+ if (instance <= instance_h) {
r.oldinstance = true;
- r.v_a = values[a.instance];
- } else if (a.n > n_h) {
- n_h = a.n;
- l->logprop(n_h);
+ r.v_a = values[instance];
+ } else if (n > n_h) {
+ n_h = n;
+ l.logprop(n_h);
r.accept = true;
} else {
LOG("I totally rejected this request. Ha.");
}
paxos_protocol::status
-acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
-{
- lock ml(pxs_mutex);
+proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
+ lock ml(acceptor_mutex);
r = false;
- if (a.n >= n_h) {
- n_a = a.n;
- v_a = a.v;
- l->logaccept(n_a, v_a);
- r = true;
+ if (instance == instance_h + 1) {
+ if (n >= n_h) {
+ n_a = n;
+ v_a = v;
+ l.logaccept(n_a, v_a);
+ r = true;
+ }
+ return paxos_protocol::OK;
+ } else {
+ return paxos_protocol::ERR;
}
- return paxos_protocol::OK;
}
-// the src argument is only for debugging
paxos_protocol::status
-acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
-{
- lock ml(pxs_mutex);
- LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
- if (a.instance == instance_h + 1) {
- VERIFY(v_a == a.v);
- commit(a.instance, v_a, ml);
- } else if (a.instance <= instance_h) {
- // we are ahead ignore.
+proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
+ lock ml(acceptor_mutex);
+ LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
+ if (instance == instance_h + 1) {
+ VERIFY(v_a == v);
+ commit(instance, v_a, ml);
+ } else if (instance <= instance_h) {
+ // we are ahead; ignore.
} else {
- // we are behind
+ // we are behind.
VERIFY(0);
}
return paxos_protocol::OK;
}
-void
-acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
-{
+void proposer_acceptor::commit(unsigned instance, const value_t & value) {
+ lock ml(acceptor_mutex);
+ commit(instance, value, ml);
+}
+
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
LOG("acceptor::commit: instance=" << instance << " has v=" << value);
if (instance > instance_h) {
- LOG("commit: highestaccepteinstance = " << instance);
+ LOG("commit: highestacceptedinstance = " << instance);
values[instance] = value;
- l->loginstance(instance, value);
+ l.loginstance(instance, value);
instance_h = instance;
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
+ n_a = n_h = {0, me};
v_a.clear();
- if (cfg) {
+ if (delegate) {
pxs_mutex_lock.unlock();
- cfg->paxos_commit(instance, value);
+ delegate->paxos_commit(instance, value);
pxs_mutex_lock.lock();
}
}
}
-void
-acceptor::commit(unsigned instance, const string & value)
-{
- lock ml(pxs_mutex);
- commit(instance, value, ml);
-}
-
-string
-acceptor::dump()
-{
- return l->dump();
-}
-
-void
-acceptor::restore(const string & s)
-{
- l->restore(s);
- l->logread();
-}
-
-
-
// For testing purposes
-
-// Call this from your code between phases prepare and accept of proposer
-void
-proposer::breakpoint1()
-{
+void proposer_acceptor::breakpoint1() {
if (break1) {
LOG("Dying at breakpoint 1!");
exit(1);
}
}
-// Call this from your code between phases accept and decide of proposer
-void
-proposer::breakpoint2()
-{
+void proposer_acceptor::breakpoint2() {
if (break2) {
LOG("Dying at breakpoint 2!");
exit(1);
}
}
-void
-proposer::breakpoint(int b)
-{
+void proposer_acceptor::breakpoint(int b) {
if (b == 3) {
LOG("Proposer: breakpoint 1");
break1 = true;
#ifndef paxos_h
#define paxos_h
-#include <string>
-#include <vector>
-#include <map>
+#include "types.h"
#include "rpc/rpc.h"
#include "paxos_protocol.h"
#include "log.h"
-#include "lock.h"
-using std::string;
-using std::map;
-using std::vector;
+using prepareres = paxos_protocol::prepareres;
+
+using node_t = string;
+using nodes_t = vector<node_t>;
+using value_t = string;
class paxos_change {
public:
- virtual void paxos_commit(unsigned instance, const string & v) = 0;
+ virtual void paxos_commit(unsigned instance, const value_t & v) = 0;
virtual ~paxos_change() {}
};
-class acceptor {
+extern bool isamember(const node_t & m, const nodes_t & nodes);
+extern bool majority(const nodes_t & l1, const nodes_t & l2);
+extern string print_members(const nodes_t & nodes);
+
+class proposer_acceptor {
private:
- log *l;
- rpcs *pxs;
- paxos_change *cfg;
- string me;
- mutex pxs_mutex;
+ mutex proposer_mutex;
+ mutex acceptor_mutex;
- // Acceptor state
- prop_t n_h; // number of the highest proposal seen in a prepare
- prop_t n_a; // number of highest proposal accepted
- string v_a; // value of highest proposal accepted
- unsigned instance_h; // number of the highest instance we have decided
- map<unsigned,string> values; // vals of each instance
-
- void commit(unsigned instance, const string & v, lock & pxs_mutex_lock);
- paxos_protocol::status preparereq(paxos_protocol::prepareres & r,
- const string & src, paxos_protocol::preparearg a);
- paxos_protocol::status acceptreq(bool & r, const string & src,
- paxos_protocol::acceptarg a);
- paxos_protocol::status decidereq(int & r, const string & src,
- paxos_protocol::decidearg a);
+ paxos_change *delegate;
+ node_t me;
- friend class log;
+ rpcs pxs = {(uint32_t)std::stoi(me)};
- public:
- acceptor(class paxos_change *cfg, bool _first, const string & _me,
- const string & _value);
- ~acceptor() {}
- void commit(unsigned instance, const string & v);
- unsigned instance() { return instance_h; }
- const string & value(unsigned instance) { return values[instance]; }
- string dump();
- void restore(const string &);
- rpcs *get_rpcs() { return pxs; }
- prop_t get_n_h() { return n_h; }
- unsigned get_instance_h() { return instance_h; }
-};
+ bool break1 = false;
+ bool break2 = false;
-extern bool isamember(const string & m, const vector<string> & nodes);
-extern string print_members(const vector<string> & nodes);
+ // Proposer state
+ bool stable = true;
+ prop_t my_n = {0, me}; // number of the last proposal used in this instance
-class proposer {
- private:
- log *l;
- paxos_change *cfg;
- acceptor *acc;
- string me;
- bool break1;
- bool break2;
+ // Acceptor state
+ prop_t n_h = {0, me}; // number of the highest proposal seen in a prepare
+ prop_t n_a = {0, me}; // number of highest proposal accepted
+ value_t v_a; // value of highest proposal accepted
+ unsigned instance_h = 0; // number of the highest instance we have decided
+ map<unsigned,value_t> values; // vals of each instance
- mutex pxs_mutex;
+ friend class log;
+ log l = {this, me};
- // Proposer state
- bool stable;
- prop_t my_n; // number of the last proposal used in this instance
-
- void setn();
- bool prepare(unsigned instance, vector<string> & accepts,
- const vector<string> & nodes,
- string & v);
- void accept(unsigned instance, vector<string> & accepts,
- const vector<string> & nodes, const string & v);
- void decide(unsigned instance, const vector<string> & accepts,
- const string & v);
+ void commit(unsigned instance, const value_t & v);
+ void commit(unsigned instance, const value_t & v, lock & pxs_mutex_lock);
+
+ paxos_protocol::status preparereq(prepareres & r, const node_t & src, unsigned instance, prop_t n);
+ paxos_protocol::status acceptreq(bool & r, const node_t & src, unsigned instance, prop_t n, const value_t & v);
+ paxos_protocol::status decidereq(int & r, const node_t & src, unsigned instance, const value_t & v);
+
+ bool prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v);
+ void accept(unsigned instance, nodes_t & accepts, const nodes_t & nodes, const value_t & v);
+ void decide(unsigned instance, const nodes_t & accepts, const value_t & v);
void breakpoint1();
void breakpoint2();
- bool majority(const vector<string> & l1, const vector<string> & l2);
- friend class log;
public:
- proposer(class paxos_change *cfg, class acceptor *_acceptor, const string &_me);
- ~proposer() {}
- bool run(unsigned instance, const vector<string> & cnodes, const string & v);
- bool isrunning();
+ proposer_acceptor(paxos_change *delegate, bool _first, const node_t & _me, const value_t & _value);
+ unsigned instance() { return instance_h; }
+ const value_t & value(unsigned instance) { return values[instance]; }
+ string dump() { return l.dump(); }
+ void restore(const string &s) { l.restore(s); l.logread(); }
+ rpcs *get_rpcs() { return &pxs; }
+
+ bool run(unsigned instance, const nodes_t & cnodes, const value_t & v);
+ bool isrunning() { lock ml(proposer_mutex); return !stable; }
void breakpoint(int b);
};
-
-
-#endif /* paxos_h */
+#endif
#ifndef paxos_protocol_h
#define paxos_protocol_h
+#include "types.h"
#include "rpc/rpc.h"
struct prop_t {
unsigned n;
- std::string m;
+ string m;
};
class paxos_protocol {
heartbeat,
};
- struct preparearg {
- unsigned instance;
- prop_t n;
- };
-
struct prepareres {
bool oldinstance;
bool accept;
prop_t n_a;
- std::string v_a;
- };
-
- struct acceptarg {
- unsigned instance;
- prop_t n;
- std::string v;
- };
-
- struct decidearg {
- unsigned instance;
- std::string v;
+ string v_a;
};
};
-inline unmarshall & operator>>(unmarshall &u, prop_t &a) {
- return u >> a.n >> a.m;
-}
-
-inline marshall & operator<<(marshall &m, prop_t a) {
- return m << a.n << a.m;
-}
-
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::preparearg &a) {
- return u >> a.instance >> a.n;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::preparearg a) {
- return m << a.instance << a.n;
-}
+inline unmarshall & operator>>(unmarshall &u, prop_t &a) { return u >> a.n >> a.m; }
+inline marshall & operator<<(marshall &m, prop_t a) { return m << a.n << a.m; }
+inline bool operator>(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) > tie(b.n, b.m); }
+inline bool operator>=(const prop_t &a, const prop_t &b) { return tie(a.n, a.m) >= tie(b.n, b.m); }
inline unmarshall & operator>>(unmarshall &u, paxos_protocol::prepareres &r) {
return u >> r.oldinstance >> r.accept >> r.n_a >> r.v_a;
return m << r.oldinstance << r.accept << r.n_a << r.v_a;
}
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::acceptarg &a) {
- return u >> a.instance >> a.n >> a.v;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::acceptarg a) {
- return m << a.instance << a.n << a.v;
-}
-
-inline unmarshall & operator>>(unmarshall &u, paxos_protocol::decidearg &a) {
- return u >> a.instance >> a.v;
-}
-
-inline marshall & operator<<(marshall &m, paxos_protocol::decidearg a) {
- return m << a.instance << a.v;
-}
-
#endif
+// std::bind and syscall bind have the same name, so don't use std::bind in this file
+#define LIBT4_NO_FUNCTIONAL
+#include "connection.h"
#include <fcntl.h>
#include <sys/types.h>
-#include <sys/time.h>
#include <netinet/tcp.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
-
-#include "connection.h"
-#include "pollmgr.h"
#include "jsl_log.h"
-#include "lang/verify.h"
-#include "lock.h"
+#include <sys/socket.h>
#define MAX_PDU (10<<20) //maximum PDF is 10M
-
connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
{
signal(SIGPIPE, SIG_IGN);
- create_time_ = std::chrono::steady_clock::now();
+ create_time_ = steady_clock::now();
PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}
flags |= O_NONBLOCK;
fcntl(pipe_[0], F_SETFL, flags);
- th_ = std::thread(&tcpsconn::accept_conn, this);
+ th_ = thread(&tcpsconn::accept_conn, this);
}
tcpsconn::~tcpsconn()
th_.join();
//close all the active connections
- std::map<int, connection *>::iterator i;
+ map<int, connection *>::iterator i;
for (i = conns_.begin(); i != conns_.end(); i++) {
i->second->closeconn();
i->second->decref();
connection *ch = new connection(mgr_, s1, lossy_);
// garbage collect all dead connections with refcount of 1
- std::map<int, connection *>::iterator i;
- for (i = conns_.begin(); i != conns_.end();) {
+ for (auto i = conns_.begin(); i != conns_.end();) {
if (i->second->isdead() && i->second->ref() == 1) {
jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
i->second->channo());
#ifndef connection_h
#define connection_h
+#include "types.h"
#include <sys/types.h>
-#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstddef>
-#include <thread>
-
-#include <map>
-#include <limits>
-
#include "pollmgr.h"
-constexpr size_t size_t_max = std::numeric_limits<size_t>::max();
+constexpr size_t size_t_max = numeric_limits<size_t>::max();
-class thread_exit_exception : std::exception {
-};
+class thread_exit_exception : exception {};
class connection;
charbuf wpdu_;
charbuf rpdu_;
- std::chrono::time_point<std::chrono::steady_clock> create_time_;
+ time_point<steady_clock> create_time_;
int waiters_;
int refno_;
const int lossy_;
- std::mutex m_;
- std::mutex ref_m_;
- std::condition_variable send_complete_;
- std::condition_variable send_wait_;
+ mutex m_;
+ mutex ref_m_;
+ cond send_complete_;
+ cond send_wait_;
};
class tcpsconn {
void accept_conn();
private:
unsigned int port_;
- std::mutex m_;
- std::thread th_;
+ mutex m_;
+ thread th_;
int pipe_[2];
int tcp_; //file desciptor for accepting connection
chanmgr *mgr_;
int lossy_;
- std::map<int, connection *> conns_;
+ map<int, connection *> conns_;
void process_accept();
};
#ifndef fifo_h
#define fifo_h
-#include <list>
-#include "lock.h"
+#include "types.h"
// blocks enq() and deq() when queue is FULL or EMPTY
template<class T>
}
private:
- std::list<T> q_;
+ list<T> q_;
mutex m_;
cond non_empty_c_; // q went non-empty
cond has_space_c_; // q is not longer overfull
#ifndef marshall_h
#define marshall_h
-#include <iostream>
-#include <sstream>
-#include <string>
-#include <vector>
-#include <map>
-#include <stdlib.h>
-#include <string.h>
+#include "types.h"
+#include <cstring>
#include <cstddef>
-#include <inttypes.h>
-#include "lang/verify.h"
+#include <cinttypes>
using proc_t = uint32_t;
using status_t = int32_t;
//size of initial buffer allocation
#define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ (std::max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
+#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
class marshall {
private:
inline void reserve(size_t n) {
if((index_+n) > capacity_){
- capacity_ += std::max(capacity_, n);
+ capacity_ += max(capacity_, n);
VERIFY (buf_ != NULL);
buf_ = (char *)realloc(buf_, capacity_);
VERIFY(buf_);
}
// Return the current content (excluding header) as a string
- std::string get_content() {
- return std::string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
+ string get_content() {
+ return string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
}
// Return the current content (excluding header) as a string
- std::string str() {
+ string str() {
return get_content();
}
marshall& operator<<(marshall &, uint16_t);
marshall& operator<<(marshall &, int16_t);
marshall& operator<<(marshall &, uint64_t);
-marshall& operator<<(marshall &, const std::string &);
+marshall& operator<<(marshall &, const string &);
-template <class A, typename I=void>
-struct is_enumerable : std::false_type {};
-
-template<class A> struct is_enumerable<A,
- decltype(std::declval<A&>().cbegin(), std::declval<A&>().cend(), void())
-> : std::true_type {};
-
-template <class A> typename std::enable_if<is_enumerable<A>::value, marshall>::type &
+template <class A> typename enable_if<is_iterable<A>::value, marshall>::type &
operator<<(marshall &m, const A &x) {
m << (unsigned int) x.size();
for (const auto &a : x)
}
template <class A, class B> marshall &
-operator<<(marshall &m, const std::pair<A,B> &d) {
+operator<<(marshall &m, const pair<A,B> &d) {
return m << d.first << d.second;
}
template<typename E>
-using enum_type_t = typename std::enable_if<std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
+using enum_type_t = typename enable_if<is_enum<E>::value, typename underlying_type<E>::type>::type;
template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
-template <class E> typename std::enable_if<std::is_enum<E>::value, marshall>::type &
+template <class E> typename enable_if<is_enum<E>::value, marshall>::type &
operator<<(marshall &m, E e) {
return m << from_enum(e);
}
unmarshall& operator>>(unmarshall &, size_t &);
unmarshall& operator>>(unmarshall &, uint64_t &);
unmarshall& operator>>(unmarshall &, int64_t &);
-unmarshall& operator>>(unmarshall &, std::string &);
-template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+unmarshall& operator>>(unmarshall &, string &);
+template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
operator>>(unmarshall &u, E &e);
class unmarshall {
public:
unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {}
- unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
+ unmarshall(const string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
{
//take the content which does not exclude a RPC header from a string
take_content(s);
void take_in(unmarshall &another);
//take the content which does not exclude a RPC header from a string
- void take_content(const std::string &s) {
+ void take_content(const string &s) {
sz_ = s.size()+RPC_HEADER_SZ;
buf_ = (char *)realloc(buf_,sz_);
VERIFY(buf_);
bool okdone() const { return ok_ && index_ == sz_; }
uint8_t rawbyte();
- void rawbytes(std::string &s, size_t n);
+ void rawbytes(string &s, size_t n);
template <class T> void rawbytes(T &t);
size_t ind() { return index_;}
}
};
-template <class A> typename std::enable_if<is_enumerable<A>::value, unmarshall>::type &
+template <class A> typename enable_if<is_iterable<A>::value, unmarshall>::type &
operator>>(unmarshall &u, A &x) {
unsigned n = u.grab<unsigned>();
x.clear();
}
template <class A, class B> unmarshall &
-operator>>(unmarshall &u, std::map<A,B> &x) {
+operator>>(unmarshall &u, map<A,B> &x) {
unsigned n = u.grab<unsigned>();
x.clear();
while (n--)
- x.emplace(u.grab<std::pair<A,B>>());
+ x.emplace(u.grab<pair<A,B>>());
return u;
}
template <class A, class B> unmarshall &
-operator>>(unmarshall &u, std::pair<A,B> &d) {
+operator>>(unmarshall &u, pair<A,B> &d) {
return u >> d.first >> d.second;
}
-template <class E> typename std::enable_if<std::is_enum<E>::value, unmarshall>::type &
+template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
operator>>(unmarshall &u, E &e) {
e = to_enum<E>(u.grab<enum_type_t<E>>());
return u;
}
-typedef std::function<int(unmarshall &, marshall &)> handler;
+typedef function<int(unmarshall &, marshall &)> handler;
//
// Automatic marshalling wrappers for RPC handlers
// C++11 does neither of these two things for us:
// 1) Declare variables using a parameter pack expansion, like so
// Args ...args;
-// 2) Call a function with a std::tuple of the arguments it expects
+// 2) Call a function with a tuple of the arguments it expects
//
// We implement an 'invoke' function for functions of the RPC handler
// signature, i.e. int(R & r, const Args...)
// One for function pointers...
template <class F, class R, class RV, class args_type, size_t ...Indices>
-typename std::enable_if<!std::is_member_function_pointer<F>::value, RV>::type
+typename enable_if<!is_member_function_pointer<F>::value, RV>::type
invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
- return f(r, std::move(std::get<Indices>(t))...);
+ return f(r, move(get<Indices>(t))...);
}
// And one for pointers to member functions...
template <class F, class C, class RV, class R, class args_type, size_t ...Indices>
-typename std::enable_if<std::is_member_function_pointer<F>::value, RV>::type
+typename enable_if<is_member_function_pointer<F>::value, RV>::type
invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
- return (c->*f)(r, std::move(std::get<Indices>(t))...);
+ return (c->*f)(r, move(get<Indices>(t))...);
}
// The class marshalled_func_imp uses partial template specialization to
// template parameters running from 0 up to (# args) - 1.
using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
// This type definition represents storage for f's unmarshalled
- // arguments. std::decay is (most notably) stripping off const
+ // arguments. decay is (most notably) stripping off const
// qualifiers.
- using ArgsStorage = std::tuple<typename std::decay<Args>::type...>;
- // Allocate a handler (i.e. std::function) to hold the lambda
+ using ArgsStorage = tuple<typename decay<Args>::type...>;
+ // Allocate a handler (i.e. function) to hold the lambda
// which will unmarshall RPCs and call f.
return new handler([=](unmarshall &u, marshall &m) -> RV {
// Unmarshall each argument with the correct type and store the
// result in a tuple.
- ArgsStorage t = {u.grab<typename std::decay<Args>::type>()...};
+ ArgsStorage t = {u.grab<typename decay<Args>::type>()...};
// Verify successful unmarshalling of the entire input stream.
if (!u.okdone())
return (RV)ErrorHandler::unmarshall_args_failure();
public marshalled_func_imp<F, C, RV(Args...), ErrorHandler> {};
template <class F, class ErrorHandler, class Signature>
-struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
+struct marshalled_func<F, ErrorHandler, function<Signature>> :
public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
#endif
+#include "types.h"
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include "jsl_log.h"
-#include "lang/verify.h"
#include "pollmgr.h"
-#include "lock.h"
PollMgr *PollMgr::instance = NULL;
static std::once_flag pollmgr_is_initialized;
#ifndef pollmgr_h
#define pollmgr_h
+#include "types.h"
#include <sys/select.h>
-#include <vector>
-#include <thread>
#ifdef __linux__
#include <sys/epoll.h>
virtual void watch_fd(int fd, poll_flag flag) = 0;
virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
virtual bool is_watched(int fd, poll_flag flag) = 0;
- virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
+ virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0;
virtual ~aio_mgr() {}
};
static int useless;
private:
- std::mutex m_;
- std::condition_variable changedone_c_;
- std::thread th_;
+ mutex m_;
+ cond changedone_c_;
+ thread th_;
aio_callback *callbacks_[MAX_POLL_FDS];
aio_mgr *aio_;
void watch_fd(int fd, poll_flag flag);
bool unwatch_fd(int fd, poll_flag flag);
bool is_watched(int fd, poll_flag flag);
- void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
+ void wait_ready(vector<int> *readable, vector<int> *writable);
private:
int highfds_;
int pipefd_[2];
- std::mutex m_;
+ mutex m_;
};
void watch_fd(int fd, poll_flag flag);
bool unwatch_fd(int fd, poll_flag flag);
bool is_watched(int fd, poll_flag flag);
- void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
+ void wait_ready(vector<int> *readable, vector<int> *writable);
private:
int pollfd_;
x exited worker threads).
*/
+#include "types.h"
#include "rpc.h"
#include <sys/types.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
-#include "lock.h"
#include "jsl_log.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
-
-using std::stoi;
const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
-rpcc::caller::caller(int xxid, unmarshall *xun)
-: xid(xxid), un(xun), done(false)
-{
-}
-
-rpcc::caller::~caller()
-{
-}
-
-inline
-void set_rand_seed()
-{
- auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
+inline void set_rand_seed() {
+ auto now = time_point_cast<nanoseconds>(steady_clock::now());
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
}
TO curr_to;
- std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
- std::chrono::steady_clock::now() +
- std::chrono::milliseconds(to.to),
- nextdeadline;
+ auto finaldeadline = steady_clock::now() + milliseconds(to.to),
+ nextdeadline = finaldeadline;
curr_to.to = to_min.to;
transmit = false; // only send once on a given channel
}
- if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
+ if(finaldeadline == time_point<steady_clock>::min())
break;
- nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
+ nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
if(nextdeadline > finaldeadline) {
nextdeadline = finaldeadline;
- finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
+ finaldeadline = time_point<steady_clock>::min();
}
{
lock cal(ca.m);
while (!ca.done){
jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
- if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
+ if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
break;
}
djob_t *j = new djob_t(c, b, sz);
c->incref();
- bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
+ bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
if(!succ || !reachable_){
c->decref();
delete j;
if(curr_counts_ == 0){
LOG("RPC STATS: ");
for (auto i = counts_.begin(); i != counts_.end(); i++)
- LOG(std::hex << i->first << ":" << std::dec << i->second);
+ LOG(hex << i->first << ":" << dec << i->second);
lock rwl(reply_window_m_);
- map<unsigned int,list<reply_t> >::iterator clt;
size_t totalrep = 0, maxrep = 0;
- for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- totalrep += clt->second.size();
- if(clt->second.size() > maxrep)
- maxrep = clt->second.size();
+ for (auto clt : reply_window_) {
+ totalrep += clt.second.size();
+ if(clt.second.size() > maxrep)
+ maxrep = clt.second.size();
}
jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
(int) reply_window_.size()-1, totalrep, maxrep);
int past_xid_rep = l.begin()->xid;
- list<reply_t>::iterator start = l.begin(), it;
- it = ++start;
+ list<reply_t>::iterator start = l.begin(), it = ++start;
if (past_xid_rep < xid_rep || past_xid_rep == -1) {
// scan for deletion candidates
}
}
-void
-rpcs::free_reply_window(void)
-{
+void rpcs::free_reply_window(void) {
lock rwl(reply_window_m_);
- for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- for (auto it = clt->second.begin(); it != clt->second.end(); it++){
- if (it->cb_present)
- free(it->buf);
+ for (auto clt : reply_window_) {
+ for (auto it : clt.second){
+ if (it.cb_present)
+ free(it.buf);
}
- clt->second.clear();
+ clt.second.clear();
}
reply_window_.clear();
}
-// rpc handler
-int
-rpcs::rpcbind(unsigned int &r, int)
-{
+int rpcs::rpcbind(unsigned int &r, int) {
jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
r = nonce_;
return 0;
#ifndef rpc_h
#define rpc_h
+#include "types.h"
#include <sys/socket.h>
#include <netinet/in.h>
-#include <list>
-#include <map>
-#include <stdio.h>
#include "thr_pool.h"
#include "marshall.h"
#include "connection.h"
-#include "lock.h"
-
-using std::string;
-using std::map;
-using std::list;
class rpc_const {
public:
//manages per rpc info
struct caller {
- caller(int xxid, unmarshall *un);
- ~caller();
+ caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {}
int xid;
unmarshall *un;
int intret;
- bool done;
+ bool done = false;
mutex m;
cond c;
};
if (intret < 0) return intret;
u >> r;
if (u.okdone() != true) {
- fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
- "You are probably calling RPC 0x%x with wrong return "
- "type.\n", proc);
+ cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " <<
+ "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
VERIFY(0);
return rpc_const::unmarshal_reply_failure;
}
// RPC test and pseudo-documentation.
// generates print statements on failures, but eventually says "rpctest OK"
+#include "types.h"
#include "rpc.h"
#include <arpa/inet.h>
-#include <iostream>
-#include <vector>
-#include <thread>
-#include <stdlib.h>
#include <getopt.h>
#include <sys/types.h>
#include <unistd.h>
#include "jsl_log.h"
-#include "lang/verify.h"
#define NUM_CL 2
char log_thread_prefix = 'r';
-using std::string;
-using std::cout;
-using std::endl;
-using std::vector;
-using std::thread;
-
rpcs *server; // server rpc object
rpcc *clients[NUM_CL]; // client rpc object
string dst; //server's ip address
#include "thr_pool.h"
-#include <stdlib.h>
-#include <errno.h>
-#include "lang/verify.h"
// if blocking, then addJob() blocks when queue is full
// otherwise, addJob() simply returns false when queue is full
#ifndef thr_pool_h
#define thr_pool_h
-#include <vector>
-#include <thread>
-
+#include "types.h"
#include "fifo.h"
typedef std::function<void()> job_t;
// The rule is that a module releases its internal locks before it
// upcalls, but can keep its locks when calling down.
-#include <fstream>
-#include <iostream>
-#include <algorithm>
#include <sys/types.h>
#include <unistd.h>
+#include "types.h"
#include "handle.h"
#include "rsm.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
#include "rsm_client.h"
-#include "lock.h"
rsm::rsm(std::string _first, std::string _me) :
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
insync = true;
cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
- LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end()));
+ LOG("rsm::sync_with_backups " << backups);
sync_cond.wait(rsm_mutex_lock);
insync = false;
return true;
bool rsm::join(std::string m, lock & rsm_mutex_lock) {
handle h(m);
int ret = 0;
- rsm_protocol::joinres r;
+ string log;
LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
rpcc *cl;
rsm_mutex_lock.unlock();
cl = h.safebind();
if (cl != 0) {
- ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r,
+ ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), log,
cfg->myaddr(), last_myvs);
}
rsm_mutex_lock.lock();
LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
return false;
}
- LOG("rsm::join: succeeded " << r.log);
- cfg->restore(r.log);
+ LOG("rsm::join: succeeded " << log);
+ cfg->restore(log);
return true;
}
// a node that wants to join an RSM as a server sends a
// joinreq to the RSM's current primary; this is the
// handler for that RPC.
-rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) {
+rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
last_myvs.vid << "," << last_myvs.seqno << ")");
if (cfg->ismember(m, vid_commit)) {
LOG("joinreq: is still a member");
- r.log = cfg->dump();
+ log = cfg->dump();
} else if (cfg->myaddr() != primary) {
LOG("joinreq: busy");
ret = rsm_protocol::BUSY;
ml.lock();
}
if (cfg->ismember(m, cfg->view_id())) {
- r.log = cfg->dump();
- LOG("joinreq: ret " << ret << " log " << r.log);
+ log = cfg->dump();
+ LOG("joinreq: ret " << ret << " log " << log);
} else {
LOG("joinreq: failed; proposer couldn't add " << succ);
ret = rsm_protocol::BUSY;
#ifndef rsm_h
#define rsm_h
-#include <string>
-#include <vector>
+#include "types.h"
#include "rsm_protocol.h"
-#include "rsm_state_transfer.h"
#include "rpc/rpc.h"
#include <arpa/inet.h>
#include "config.h"
+class rsm_state_transfer {
+ public:
+ virtual string marshal_state() = 0;
+ virtual void unmarshal_state(string) = 0;
+ virtual ~rsm_state_transfer() {}
+};
+
class rsm : public config_view_change {
private:
void reg1(int proc, handler *);
protected:
- std::map<int, handler *> procs;
+ map<int, handler *> procs;
config *cfg;
class rsm_state_transfer *stf;
rpcs *rsmrpc;
// On primary: viewstamp for the next request from rsm_client
viewstamp myvs;
viewstamp last_myvs; // Viewstamp of the last executed request
- std::string primary;
+ string primary;
bool insync;
bool inviewchange;
unsigned vid_commit; // Latest view id that is known to rsm layer
unsigned vid_insync; // The view id that this node is synchronizing for
- std::vector<std::string> backups; // A list of unsynchronized backups
+ vector<string> backups; // A list of unsynchronized backups
// For testing purposes
rpcs *testsvr;
bool break1;
bool break2;
-
- rsm_client_protocol::status client_members(std::vector<std::string> &r, int i);
- rsm_protocol::status invoke(int &, int proc, viewstamp vs, std::string mreq);
- rsm_protocol::status transferreq(rsm_protocol::transferres &r, std::string src,
+ rsm_client_protocol::status client_members(vector<string> &r, int i);
+ rsm_protocol::status invoke(int &, int proc, viewstamp vs, string mreq);
+ rsm_protocol::status transferreq(rsm_protocol::transferres &r, string src,
viewstamp last, unsigned vid);
- rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid);
- rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src,
+ rsm_protocol::status transferdonereq(int &, string m, unsigned vid);
+ rsm_protocol::status joinreq(string & log, string src,
viewstamp last);
rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
- std::mutex rsm_mutex;
- std::mutex invoke_mutex;
- std::condition_variable recovery_cond;
- std::condition_variable sync_cond;
+ mutex rsm_mutex, invoke_mutex;
+ cond recovery_cond, sync_cond;
- void execute(int procno, std::string req, std::string &r);
- rsm_client_protocol::status client_invoke(std::string &r, int procno,
- std::string req);
- bool statetransfer(std::string m, lock & rsm_mutex_lock);
- bool statetransferdone(std::string m, lock & rsm_mutex_lock);
- bool join(std::string m, lock & rsm_mutex_lock);
+ void execute(int procno, string req, string &r);
+ rsm_client_protocol::status client_invoke(string &r, int procno, string req);
+ bool statetransfer(string m, lock & rsm_mutex_lock);
+ bool statetransferdone(string m, lock & rsm_mutex_lock);
+ bool join(string m, lock & rsm_mutex_lock);
void set_primary(unsigned vid);
- std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid);
+ string find_highest(viewstamp &vs, string &m, unsigned &vid);
bool sync_with_backups(lock & rsm_mutex_lock);
bool sync_with_primary(lock & rsm_mutex_lock);
void net_repair(bool heal, lock & rsm_mutex_lock);
void partition1(lock & rsm_mutex_lock);
void commit_change(unsigned vid, lock & rsm_mutex_lock);
public:
- rsm (std::string _first, std::string _me);
+ rsm (string _first, string _me);
~rsm() {}
bool amiprimary();
+#include "types.h"
#include "rsm_client.h"
-#include <vector>
#include <arpa/inet.h>
-#include <stdio.h>
#include <handle.h>
#include <unistd.h>
-#include "lang/verify.h"
-#include "lock.h"
-#include "threaded_log.h"
-rsm_client::rsm_client(std::string dst) : primary(dst) {
+rsm_client::rsm_client(string dst) : primary(dst) {
LOG("create rsm_client");
lock ml(rsm_client_mutex);
VERIFY (init_members(ml));
known_mems.pop_back();
}
-rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
+rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) {
lock ml(rsm_client_mutex);
while (1) {
- LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
+ LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary);
handle h(primary);
ml.unlock();
if (!cl)
goto prim_fail;
- LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
+ LOG("rsm_client::invoke proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
if (ret == rsm_client_protocol::OK)
return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
continue;
}
prim_fail:
- LOG("primary " << primary << " failed ret " << std::dec << ret);
+ LOG("primary " << primary << " failed ret " << dec << ret);
primary_failure(ml);
LOG("rsm_client::invoke: retry new primary " << primary);
}
#ifndef rsm_client_h
#define rsm_client_h
-#include "rpc/rpc.h"
+#include "types.h"
#include "rsm_protocol.h"
-#include <string>
-#include <vector>
-
//
// rsm client interface.
#ifndef rsm_protocol_h
#define rsm_protocol_h
+#include "types.h"
#include "rpc/rpc.h"
class rsm_client_protocol {
public:
enum status : status_t { OK, ERR, BUSY};
enum rpc_numbers : proc_t {
- invoke = 0x10001,
+ invoke = 0xa001,
transferreq,
transferdonereq,
joinreq,
};
struct transferres {
- std::string state;
+ string state;
viewstamp last;
};
-
- struct joinres {
- std::string log;
- };
};
-inline bool operator==(viewstamp a, viewstamp b) {
- return a.vid == b.vid && a.seqno == b.seqno;
-}
-
-inline bool operator>(viewstamp a, viewstamp b) {
- return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
-}
-
-inline bool operator!=(viewstamp a, viewstamp b) {
- return a.vid != b.vid || a.seqno != b.seqno;
-}
+inline bool operator==(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) == tie(b.vid, b.seqno); }
+inline bool operator>(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) > tie(b.vid, b.seqno); }
+inline bool operator!=(viewstamp a, viewstamp b) { return tie(a.vid, a.seqno) != tie(b.vid, b.seqno); }
inline marshall& operator<<(marshall &m, viewstamp v) {
return m << v.vid << v.seqno;
return u >> r.state >> r.last;
}
-inline marshall & operator<<(marshall &m, rsm_protocol::joinres r) {
- return m << r.log;
-}
-
-inline unmarshall & operator>>(unmarshall &u, rsm_protocol::joinres &r) {
- return u >> r.log;
-}
-
class rsm_test_protocol {
public:
enum status : status_t {OK, ERR};
+++ /dev/null
-#ifndef rsm_state_transfer_h
-#define rsm_state_transfer_h
-
-class rsm_state_transfer {
- public:
- virtual std::string marshal_state() = 0;
- virtual void unmarshal_state(std::string) = 0;
- virtual ~rsm_state_transfer() {}
-};
-
-#endif
// RSM test client
//
+#include "types.h"
#include "rsm_protocol.h"
#include "rsmtest_client.h"
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <vector>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string>
char log_thread_prefix = 't';
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
if(argc != 4){
- fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
- exit(1);
+ cerr << "Usage: " << argv[0] << " [host:]port [partition] arg" << endl;
+ return 1;
}
rsmtest_client *lc = new rsmtest_client(argv[1]);
- std::string command(argv[2]);
+ string command(argv[2]);
if (command == "partition") {
- printf("net_repair returned %d\n", lc->net_repair(atoi(argv[3])));
+ cout << "net_repair returned " << lc->net_repair(stoi(argv[3]));
} else if (command == "breakpoint") {
- int b = atoi(argv[3]);
- printf("breakpoint %d returned %d\n", b, lc->breakpoint(b));
+ int b = stoi(argv[3]);
+ cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
} else {
- fprintf(stderr, "Unknown command %s\n", argv[2]);
+ cerr << "Unknown command " << argv[2] << endl;
}
return 0;
}
// RPC stubs for clients to talk to rsmtest_server
#include "rsmtest_client.h"
-#include "rpc/rpc.h"
#include <arpa/inet.h>
-#include <sstream>
-#include <iostream>
-#include <stdio.h>
-
rsmtest_client::rsmtest_client(std::string dst) : cl(dst) {
if (cl.bind() < 0)
- printf("rsmtest_client: call bind\n");
+ cout << "rsmtest_client: call bind" << endl;
}
rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
#ifndef rsmtest_client_h
#define rsmtest_client_h
-#include <string>
+#include "types.h"
#include "rsm_protocol.h"
-#include "rpc/rpc.h"
// Client interface to the rsmtest server
class rsmtest_client {
-#include <sys/time.h>
-#include <stdint.h>
#include "threaded_log.h"
-std::mutex cerr_mutex;
-std::map<std::thread::id, int> thread_name_map;
+mutex cerr_mutex;
+map<thread::id, int> thread_name_map;
int next_thread_num = 0;
-std::map<void *, int> instance_name_map;
+map<void *, int> instance_name_map;
int next_instance_num = 0;
#ifndef threaded_log_h
#define threaded_log_h
-#include <iomanip>
-#include <iostream>
-#include <stdio.h>
-#include <map>
-#include "lock.h"
+#include "types.h"
extern mutex cerr_mutex;
-extern std::map<std::thread::id, int> thread_name_map;
+extern map<thread::id, int> thread_name_map;
extern int next_thread_num;
-extern std::map<void *, int> instance_name_map;
+extern map<void *, int> instance_name_map;
extern int next_instance_num;
extern char log_thread_prefix;
-template <class A>
-struct iterator_pair : public std::pair<A, A> {
- explicit iterator_pair(const A & first, const A & second) : std::pair<A, A>(first, second) {}
-};
-
-template <class A>
-const struct iterator_pair<A> make_iterator_pair(const A & first, const A & second) {
- return iterator_pair<A>(first, second);
-}
-
-template <class A, class B>
-std::ostream & operator<<(std::ostream &o, const std::pair<A,B> &d) {
- o << "<" << d.first << "," << d.second << ">";
- return o;
+namespace std {
+ // This... is an awful hack. But sticking this in std:: makes it possible for
+ // ostream_iterator to use it.
+ template <class A, class B>
+ ostream & operator<<(ostream &o, const pair<A,B> &d) {
+ return o << "<" << d.first << "," << d.second << ">";
+ }
}
template <class A>
-std::ostream & operator<<(std::ostream &o, const iterator_pair<A> &d) {
+typename enable_if<is_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
+operator<<(ostream &o, const A &a) {
o << "[";
- for (auto i=d.first; i!=d.second; i++) {
- o << *i;
- auto j(i);
- if (++j != d.second)
- o << ", ";
- }
+ auto oit = ostream_iterator<typename A::value_type>(o, ", ");
+ copy(a.begin(), a.end(), oit);
o << "]";
return o;
}
#define LOG_PREFIX { \
- cerr_mutex.lock(); \
auto _thread_ = std::this_thread::get_id(); \
int _tid_ = thread_name_map[_thread_]; \
if (_tid_==0) \
_tid_ = thread_name_map[_thread_] = ++next_thread_num; \
- auto _utime_ = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
- std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \
- std::cerr << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << _tid_; \
- std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \
+ auto _utime_ = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000; \
+ cerr << setfill('0') << dec << left << setw(9) << _utime_ << " "; \
+ cerr << setfill(' ') << log_thread_prefix << left << setw(2) << _tid_; \
+ cerr << " " << setw(20) << __FILE__ << " " << setw(18) << __func__; \
}
#define LOG_THIS_POINTER { \
int _self_ = instance_name_map[this]; \
if (_self_==0) \
_self_ = instance_name_map[this] = ++next_instance_num; \
- std::cerr << "#" << std::setw(2) << _self_; \
-}
-#define LOG_SUFFIX { \
- cerr_mutex.unlock(); \
+ cerr << "#" << setw(2) << _self_; \
}
#define LOG_NONMEMBER(_x_) { \
+ lock _cel_(cerr_mutex); \
LOG_PREFIX; \
- std::cerr << _x_ << std::endl; \
- LOG_SUFFIX; \
+ cerr << _x_ << endl; \
}
#define LOG(_x_) { \
+ lock _cel_(cerr_mutex); \
LOG_PREFIX; \
LOG_THIS_POINTER; \
- std::cerr << _x_ << std::endl; \
- LOG_SUFFIX; \
-}
-#define LOG_FUNC_ENTER { \
- LOG_PREFIX; \
- LOG_THIS_POINTER; \
- std::cerr << "lid=" << lid; \
- std::cerr << std::endl; \
- LOG_SUFFIX; \
-}
-#define LOG_FUNC_ENTER_SERVER { \
- LOG_PREFIX; \
- LOG_THIS_POINTER; \
- std::cerr << "lid=" << lid; \
- std::cerr << " client=" << id << "," << xid; \
- std::cerr << std::endl; \
- LOG_SUFFIX; \
-}
-#define LOG_FUNC_EXIT { \
- LOG_PREFIX; \
- LOG_THIS_POINTER; \
- std::cerr << "return" << lid; \
- std::cerr << std::endl; \
- LOG_SUFFIX; \
+ cerr << _x_ << endl; \
}
#endif
--- /dev/null
+#ifndef types_h
+#define types_h
+
+#include <algorithm>
+using std::copy;
+using std::move;
+using std::max;
+using std::min;
+using std::min_element;
+using std::find;
+using std::count_if;
+
+#include <chrono>
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::chrono::microseconds;
+using std::chrono::nanoseconds;
+using std::chrono::steady_clock;
+using std::chrono::system_clock;
+using std::chrono::duration_cast;
+using std::chrono::time_point_cast;
+using std::chrono::time_point;
+
+#include <exception>
+using std::exception;
+
+#include <fstream>
+using std::ofstream;
+using std::ifstream;
+
+#ifndef LIBT4_NO_FUNCTIONAL
+#include <functional>
+using std::function;
+using std::bind;
+using std::placeholders::_1;
+#endif
+
+#include <iomanip>
+#include <iostream>
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::dec;
+using std::hex;
+using std::left;
+using std::setw;
+using std::setfill;
+using std::setprecision;
+using std::ostream;
+using std::istream;
+using std::ostream_iterator;
+using std::istream_iterator;
+
+#include <limits>
+using std::numeric_limits;
+
+#include <list>
+using std::list;
+
+#include <map>
+using std::map;
+
+#include <mutex>
+using std::mutex;
+using lock = std::unique_lock<std::mutex>;
+using cond = std::condition_variable;
+using std::cv_status;
+
+#include <sstream>
+using std::ostringstream;
+using std::istringstream;
+
+#include <string>
+using std::string;
+using std::to_string;
+using std::stoi;
+
+#include <thread>
+using std::thread;
+
+#include <tuple>
+using std::tuple;
+using std::get;
+using std::tie;
+
+#include <type_traits>
+using std::decay;
+using std::true_type;
+using std::false_type;
+using std::is_enum;
+using std::is_member_function_pointer;
+using std::is_same;
+using std::underlying_type;
+using std::enable_if;
+
+#include <utility>
+using std::pair;
+using std::declval;
+
+#include <vector>
+using std::vector;
+
+
+template <class A, typename I=void> struct is_iterable : false_type {};
+
+template<class A> struct is_iterable<A,
+ decltype(declval<A&>().cbegin(), declval<A&>().cend(), void())
+> : true_type {};
+
+#include "lang/verify.h"
+#include "threaded_log.h"
+
+#endif