rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
-lock_demo=lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o
-lock_demo : $(lock_demo) rpc/librpc.a
+lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
-lock_tester=lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o
-lock_tester : $(lock_tester) rpc/librpc.a
+lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
-lock_server=lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o
-lock_server : $(lock_server) rpc/librpc.a
+lock_server : lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a
-rsm_tester=rsm_tester.o rsmtest_client.o threaded_log.o
-rsm_tester: $(rsm_tester) rpc/librpc.a
+rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a
%.o: %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@
--- /dev/null
+#ifndef endian_h
+#define endian_h
+// Byte-order helpers: hton() overloads for fixed-width integers and tuples, with ntoh() defined in terms of hton().
+#include <cinttypes>
+// Initialised through the 32-bit member with the value 1; the 8-bit member is then read to tell whether the host is little-endian.
+constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
+// 8-bit values are returned unchanged; 16- and 32-bit values go through htons()/htonl() (signed ones via a cast to the unsigned type).
+inline uint8_t hton(uint8_t t) { return t; }
+inline int8_t hton(int8_t t) { return t; }
+inline uint16_t hton(uint16_t t) { return htons(t); }
+inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); }
+inline uint32_t hton(uint32_t t) { return htonl(t); }
+inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); }
+inline uint64_t hton(uint64_t t) { // 64-bit swap built from two htonl() calls
+ if (!endianness.is_little_endian)
+ return t; // not little-endian: value is returned as is
+ return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32); // swapped high half becomes the low half and vice versa
+}
+inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
+// ntoh() just calls hton(). NOTE(review): it passes an lvalue, which cannot bind to the rvalue-reference hton overloads below (tuple, ENDIAN_SWAPPABLE) — confirm ntoh is only used on integers.
+template <class T> inline T ntoh(T t) { return hton(t); }
+// Applies hton() to every element selected by Indices and returns the results as a tuple of non-reference types.
+template <class... Args, size_t... Indices> inline tuple<typename remove_reference<Args>::type...>
+tuple_hton_imp(tuple<Args...> && t, tuple_indices<Indices...>) {
+ return tuple<typename remove_reference<Args>::type...>(hton(get<Indices>(t))...);
+}
+// hton() for tuples: builds the index pack for sizeof...(Args) and forwards the tuple to tuple_hton_imp().
+template <class... Args> inline tuple<typename remove_reference<Args>::type...>
+hton(tuple<Args...> && t) {
+ using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+ return tuple_hton_imp(forward<tuple<Args...>>(t), Indices());
+}
+// ENDIAN_SWAPPABLE(C) defines hton(C &&): it default-constructs a C and assigns hton(t._tuple_()) to result._tuple_(), so C must provide _tuple_().
+#define ENDIAN_SWAPPABLE(_c_) \
+inline _c_ hton(_c_ && t) { \
+ _c_ result; \
+ result._tuple_() = hton(t._tuple_()); \
+ return result; \
+}
+
+#endif
// manage a cache of RPC connections.
-// assuming cid is a std::string holding the
+// assuming cid is a string holding the
// host:port of the RPC server you want
// to talk to:
//
-// safe assertions.
-
#ifndef verify_client_h
#define verify_client_h
#include <cassert>
#ifdef NDEBUG
// With NDEBUG defined assert() expands to nothing, so VERIFY performs the
// check itself and calls abort() on failure. The do { } while (0) wrapper
// makes the expansion a single statement that consumes the trailing ';',
// so `if (c) VERIFY(x); else ...` still parses; a bare { } block would leave
// a stray empty statement in front of the else.
#define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
#else
#define VERIFY(expr) assert(expr)
#endif
#include <arpa/inet.h>
// Waits on the entry of c keyed by the calling thread's id, passing
// mutex_lock to that entry's wait(); the entry is erased once wait() returns.
void lock_state::wait(lock & mutex_lock) {
- auto self = std::this_thread::get_id();
+ auto self = this_thread::get_id();
c[self].wait(mutex_lock); // presumably c maps thread ids to condition variables — TODO confirm against the lock_state declaration
c.erase(self);
}
lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
lock_state &st = get_lock_state(lid);
lock sl(st.m);
- auto self = std::this_thread::get_id();
+ auto self = this_thread::get_id();
// check for reentrancy
VERIFY(st.state != lock_state::locked || st.held_by != self);
lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
lock_state &st = get_lock_state(lid);
lock sl(st.m);
- auto self = std::this_thread::get_id();
+ auto self = this_thread::get_id();
VERIFY(st.state == lock_state::locked && st.held_by == self);
st.state = lock_state::free;
LOG("Lock " << lid << ": free");
}
// Stores the rsm pointer, launches revoker() and retryer() on detached
// threads bound to this object, and registers this object with the rsm
// through set_state_transfer().
lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
- std::thread(&lock_server::revoker, this).detach();
- std::thread(&lock_server::retryer, this).detach();
+ thread(&lock_server::revoker, this).detach(); // detached: never joined by this class
+ thread(&lock_server::retryer, this).detach();
rsm->set_state_transfer(this);
}
// Serialises nacquire followed by lock_table while holding lock_table_lock
// and returns the marshalled bytes as a string.
string lock_server::marshal_state() {
lock sl(lock_table_lock);
marshall rep;
- rep << nacquire;
- rep << lock_table;
- return rep.str();
+ rep << nacquire << lock_table;
+ return rep.content(); // content() returns the buffer without the RPC header area
}
// Reads nacquire and then lock_table from state (the same order
// marshal_state() writes them), holding lock_table_lock throughout.
void lock_server::unmarshal_state(string state) {
lock sl(lock_table_lock);
- unmarshall rep(state);
- rep >> nacquire;
- rep >> lock_table;
+ unmarshall rep(state, false); // false: state does not begin with an RPC header
+ rep >> nacquire >> lock_table;
}
lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
// must be >= 2
const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
-std::string dst;
+string dst;
lock_client **lc = new lock_client * [nt];
lock_protocol::lockid_t a = "1";
lock_protocol::lockid_t b = "2";
// doesn't grant the same lock to both clients.
// it assumes that lock names are distinct in the first byte.
int ct[256];
-std::mutex count_mutex;
+mutex count_mutex;
void check_grant(lock_protocol::lockid_t lid) {
lock ml(count_mutex);
int
main(int argc, char *argv[])
{
- std::thread th[nt];
+ thread th[nt];
int test = 0;
setvbuf(stdout, NULL, _IONBF, 0);
if (!test || test == 2) {
// test2
for (int i = 0; i < nt; i++)
- th[i] = std::thread(test2, i);
+ th[i] = thread(test2, i);
for (int i = 0; i < nt; i++)
th[i].join();
}
LOG_NONMEMBER("test 3");
for (int i = 0; i < nt; i++)
- th[i] = std::thread(test3, i);
+ th[i] = thread(test3, i);
for (int i = 0; i < nt; i++)
th[i].join();
}
LOG_NONMEMBER("test 4");
for (int i = 0; i < 2; i++)
- th[i] = std::thread(test4, i);
+ th[i] = thread(test4, i);
for (int i = 0; i < 2; i++)
th[i].join();
}
LOG_NONMEMBER("test 5");
for (int i = 0; i < nt; i++)
- th[i] = std::thread(test5, i);
+ th[i] = thread(test5, i);
for (int i = 0; i < nt; i++)
th[i].join();
}
// Logs s, then replaces the contents of the file `name` with s.
void log::restore(string s) {
LOG("restore: " << s);
- ofstream f(name, std::ios::trunc);
+ ofstream f(name, ios::trunc); // trunc discards whatever the file held before
f << s;
f.close();
}
// XXX should be an atomic operation
// Appends the line "done <instance> <v>" to the file `name`.
void log::loginstance(unsigned instance, string v) {
- ofstream f(name, std::ios::app);
+ ofstream f(name, ios::app); // app: every write goes to the end of the file
f << "done " << instance << " " << v << "\n";
f.close();
}
// An acceptor should call logprop(promise) when it
// receives a prepare to which it responds prepare_ok().
// Appends the line "propseen <promise.n> <promise.m>" to the file `name`.
void log::logprop(prop_t promise) {
- ofstream f(name, std::ios::app);
+ ofstream f(name, ios::app); // app: every write goes to the end of the file
f << "propseen " << promise.n << " " << promise.m << "\n";
f.close();
}
// An acceptor should call logaccept(accepted, accepted_value) when it
// receives an accept RPC to which it replies accept_ok().
// Appends the line "accepted <n.n> <n.m> <v>" to the file `name`.
void log::logaccept(prop_t n, string v) {
- ofstream f(name, std::ios::app);
+ ofstream f(name, ios::app); // app: every write goes to the end of the file
f << "accepted " << n.n << " " << n.m << " " << v << "\n";
f.close();
}
}
stable = false;
bool r = false;
- proposal.n = std::max(promise.n, proposal.n) + 1;
+ proposal.n = max(promise.n, proposal.n) + 1;
nodes_t accepts;
value_t v = newv;
if (prepare(instance, accepts, cur_nodes, v)) {
paxos_change *delegate;
node_t me;
- rpcs pxs = {(uint32_t)std::stoi(me)};
+ rpcs pxs = {(uint32_t)stoi(me)};
bool break1 = false;
bool break2 = false;
// std::bind and syscall bind have the same name, so don't use std::bind in this file
#define LIBT4_NO_FUNCTIONAL
#include "connection.h"
+#include <cerrno>
+#include <csignal>
#include <fcntl.h>
#include <sys/types.h>
#include <netinet/tcp.h>
-#include <errno.h>
-#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
: mgr_(m1), fd_(f1), lossy_(l1)
{
int flags = fcntl(fd_, F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(fd_, F_SETFL, flags);
+ fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
signal(SIGPIPE, SIG_IGN);
// Closes fd_. The connection must already be marked dead and must have no
// pending outgoing bytes; both conditions are enforced with VERIFY.
connection::~connection() {
VERIFY(dead_);
- if (rpdu_.buf)
- free(rpdu_.buf);
- VERIFY(!wpdu_.buf);
+ VERIFY(!wpdu_.buf.size()); // the write buffer is a string now, so "no pending write" means it is empty
close(fd_);
}
void connection::closeconn() {
{
lock ml(m_);
- if (!dead_) {
- dead_ = true;
- shutdown(fd_,SHUT_RDWR);
- } else {
+ if (dead_)
return;
- }
+ dead_ = true;
+ shutdown(fd_,SHUT_RDWR);
}
//after block_remove_fd, select will never wait on fd_
//and no callbacks will be active
return 0;
}
-bool connection::send(char *b, size_t sz) {
+bool connection::send(const string & b) {
lock ml(m_);
+
waiters_++;
- while (!dead_ && wpdu_.buf) {
+ while (!dead_ && wpdu_.buf.size())
send_wait_.wait(ml);
- }
waiters_--;
- if (dead_) {
+
+ if (dead_)
return false;
- }
+
wpdu_.buf = b;
- wpdu_.sz = sz;
wpdu_.solong = 0;
if (lossy_) {
ml.unlock();
PollMgr::Instance()->block_remove_fd(fd_);
ml.lock();
- } else {
- if (wpdu_.solong == wpdu_.sz) {
- } else {
- //should be rare to need to explicitly add write callback
- PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
- while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
- send_complete_.wait(ml);
- }
- }
+ } else if (wpdu_.solong != wpdu_.buf.size()) {
+ // should be rare to need to explicitly add write callback
+ PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+ while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
+ send_complete_.wait(ml);
}
- bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
- wpdu_.solong = wpdu_.sz = 0;
- wpdu_.buf = NULL;
+ bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
+ wpdu_.solong = 0;
+ wpdu_.buf.clear();
if (waiters_ > 0)
send_wait_.notify_all();
return ret;
lock ml(m_);
VERIFY(!dead_);
VERIFY(fd_ == s);
- if (wpdu_.sz == 0) {
+ if (wpdu_.buf.size() == 0) {
PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
return;
}
dead_ = true;
} else {
VERIFY(wpdu_.solong != size_t_max);
- if (wpdu_.solong < wpdu_.sz) {
+ if (wpdu_.solong < wpdu_.buf.size()) {
return;
}
}
return;
}
+ IF_LEVEL(5) LOG("got data on fd " << s);
+
bool succ = true;
- if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+ if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
succ = readpdu();
}
if (!succ) {
+ IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
PollMgr::Instance()->del_callback(fd_,CB_RDWR);
dead_ = true;
send_complete_.notify_one();
}
- if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
- if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+ if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
+ if (mgr_->got_pdu(this, rpdu_.buf)) {
//chanmgr has successfully consumed the pdu
- rpdu_.buf = NULL;
- rpdu_.sz = rpdu_.solong = 0;
+ rpdu_.buf.clear();
+ rpdu_.solong = 0;
}
}
}
bool connection::writepdu() {
VERIFY(wpdu_.solong != size_t_max);
- if (wpdu_.solong == wpdu_.sz)
+ if (wpdu_.solong == wpdu_.buf.size())
return true;
if (wpdu_.solong == 0) {
- uint32_t sz = htonl((uint32_t)wpdu_.sz);
- bcopy(&sz,wpdu_.buf,sizeof(sz));
+ uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
+ copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
}
- ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+ ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
if (n < 0) {
if (errno != EAGAIN) {
IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
wpdu_.solong = size_t_max;
- wpdu_.sz = 0;
+ wpdu_.buf.clear();
}
return (errno == EAGAIN);
}
}
bool connection::readpdu() {
- if (!rpdu_.sz) {
+ IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
+ if (!rpdu_.buf.size()) {
uint32_t sz1;
ssize_t n = read(fd_, &sz1, sizeof(sz1));
}
if (n > 0 && n != sizeof(sz1)) {
- IF_LEVEL(0) LOG("connection::readpdu short read of sz");
+ IF_LEVEL(0) LOG("short read of sz");
return false;
}
size_t sz = ntohl(sz1);
if (sz > MAX_PDU) {
- IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1);
+ IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
return false;
}
- rpdu_.sz = sz;
- VERIFY(rpdu_.buf == NULL);
- rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
- VERIFY(rpdu_.buf);
- bcopy(&sz1,rpdu_.buf,sizeof(sz1));
+ IF_LEVEL(5) LOG("read size of datagram = " << sz);
+
+ VERIFY(rpdu_.buf.size() == 0);
+ rpdu_.buf = string(sz+sizeof(sz1), 0);
+ copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]);
rpdu_.solong = sizeof(sz1);
}
- ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+ ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+
+ IF_LEVEL(5) LOG("read " << n << " bytes");
+
if (n <= 0) {
if (errno == EAGAIN)
return true;
- if (rpdu_.buf)
- free(rpdu_.buf);
- rpdu_.buf = NULL;
- rpdu_.sz = rpdu_.solong = 0;
+ rpdu_.buf.clear();
+ rpdu_.solong = 0;
return (errno == EAGAIN);
}
rpdu_.solong += (size_t)n;
tcp_ = socket(AF_INET, SOCK_STREAM, 0);
if (tcp_ < 0) {
- perror("tcpsconn::tcpsconn accept_loop socket:");
+ perror("accept_loop socket:");
VERIFY(0);
}
}
if (listen(tcp_, 1000) < 0) {
- perror("tcpsconn::tcpsconn listen:");
+ perror("listen:");
VERIFY(0);
}
VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
port_ = ntohs(sin.sin_port);
- IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
+ IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
if (pipe(pipe_) < 0) {
perror("accept_loop pipe:");
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
-#include <cstddef>
#include "pollmgr.h"
constexpr size_t size_t_max = numeric_limits<size_t>::max();
// Interface through which a connection hands a fully received PDU to its owner.
class chanmgr {
public:
- virtual bool got_pdu(connection *c, char *b, size_t sz) = 0;
+ virtual bool got_pdu(connection *c, const string & b) = 0; // returning true tells the connection the PDU was consumed, after which read_cb clears its receive buffer
virtual ~chanmgr() {}
};
class connection : public aio_callback {
public:
// A byte buffer plus a progress counter; presumably the type of the rpdu_ and
// wpdu_ members used by connection's read and write paths — TODO confirm.
struct charbuf {
- charbuf(): buf(NULL), sz(0), solong(0) {}
- charbuf (char *b, size_t s) : buf(b), sz(s), solong(0){}
- char *buf;
- size_t sz;
- size_t solong; // number of bytes written or read so far
+ string buf; // the bytes themselves; buf.size() takes over the role of the removed sz field
+ size_t solong = 0; // number of bytes written or read so far
};
connection(chanmgr *m1, int f1, int lossytest=0);
bool isdead();
void closeconn();
- bool send(char *b, size_t sz);
+ bool send(const string & b);
void write_cb(int s);
void read_cb(int s);
int ref() { lock rl(ref_m_); return refno_; }
int compare(connection *another);
+
private:
bool readpdu();
int waiters_ = 0;
int refno_ = 1;
- const int lossy_;
+ int lossy_ = 0;
mutex m_;
mutex ref_m_;
#include "types.h"
#include "marshall.h"
-marshall &
-operator<<(marshall &m, uint8_t x) {
- m.rawbyte(x);
- return m;
-}
-
-marshall &
-operator<<(marshall &m, uint16_t x) {
- x = hton(x);
- m.rawbytes((char *)&x, 2);
- return m;
-}
-
-marshall &
-operator<<(marshall &m, uint32_t x) {
- x = hton(x);
- m.rawbytes((char *)&x, 4);
- return m;
-}
-
-marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
-marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
-marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
-marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
-marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
-
-marshall &
-operator<<(marshall &m, const string &s) {
- m << (unsigned int) s.size();
+MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(int8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint16_t)
+MARSHALL_RAW_NETWORK_ORDER(int16_t)
+MARSHALL_RAW_NETWORK_ORDER(uint32_t)
+MARSHALL_RAW_NETWORK_ORDER(int32_t)
+MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint32_t)
+MARSHALL_RAW_NETWORK_ORDER(uint64_t)
+MARSHALL_RAW_NETWORK_ORDER(int64_t)
+// Writes the length of s as a uint32_t, followed by the raw bytes of s.
+marshall & operator<<(marshall &m, const string &s) {
+ m << (uint32_t)s.size(); // the cast keeps only the low 32 bits of the length
m.rawbytes(s.data(), s.size());
return m;
}
-void marshall::pack_req_header(const request_header &h) {
- size_t saved_sz = index_;
- //leave the first 4-byte empty for channel to fill size of pdu
- index_ = sizeof(rpc_sz_t);
- *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
- index_ = saved_sz;
-}
-
-void marshall::pack_reply_header(const reply_header &h) {
- size_t saved_sz = index_;
- //leave the first 4-byte empty for channel to fill size of pdu
- index_ = sizeof(rpc_sz_t);
- *this << h.xid << h.ret;
- index_ = saved_sz;
-}
-
-// take the contents from another unmarshall object
-void
-unmarshall::take_in(unmarshall &another)
-{
- if(buf_)
- free(buf_);
- another.take_buf(&buf_, &sz_);
- index_ = RPC_HEADER_SZ;
- ok_ = sz_ >= RPC_HEADER_SZ?true:false;
-}
-
-inline bool
-unmarshall::ensure(size_t n) {
- if (index_+n > sz_)
- ok_ = false;
- return ok_;
-}
-
-inline uint8_t
-unmarshall::rawbyte()
-{
- if (!ensure(1))
- return 0;
- return (uint8_t)buf_[index_++];
-}
-
-void
-unmarshall::rawbytes(string &ss, size_t n)
-{
- VERIFY(ensure(n));
- ss.assign(buf_+index_, n);
- index_ += n;
-}
-
-template <class T>
-void
-unmarshall::rawbytes(T &t)
-{
- const size_t n = sizeof(T);
- VERIFY(ensure(n));
- memcpy(&t, buf_+index_, n);
- t = ntoh(t);
- index_ += n;
-}
-
-unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
-unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
-unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
-unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
-unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
-
// Reads a uint32_t length and then that many raw bytes into s.
// s is left untouched when reading the length already failed.
// NOTE(review): s.resize(sz) runs before any bounds check, and rawbytes()
// wraps its bounds check in VERIFY, so a length larger than the remaining
// buffer aborts the process instead of clearing ok() — confirm this is intended.
unmarshall & operator>>(unmarshall &u, string &s) {
- unsigned sz = u.grab<unsigned>();
- if(u.ok())
- u.rawbytes(s, sz);
+ uint32_t sz = u.grab<uint32_t>();
+ if (u.ok()) {
+ s.resize(sz);
+ u.rawbytes(&s[0], sz); // copies straight into the string's own storage
+ }
return u;
}
#define marshall_h
#include "types.h"
-#include <cstring>
-#include <cstddef>
-#include <cinttypes>
+
+// for structs or classes containing a MEMBERS declaration
+class marshall;
+class unmarshall;
+#define FORWARD_MARSHALLABLE(_c_) \
+extern unmarshall & operator>>(unmarshall &u, typename remove_reference<_c_>::type &a); \
+extern marshall & operator<<(marshall &m, const _c_ a);
+#define MARSHALLABLE(_c_) \
+inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
+inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
+
+// for plain old data: a _c_ value is cast to _d_, and the _d_ is what gets written/read as raw bytes after hton()/ntoh()
+#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
+marshall & operator<<(marshall &m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
+unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+// shorthand for types that are transferred as themselves
+#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
using proc_t = uint32_t;
using status_t = int32_t;
// Header written at the front of an RPC request; rpcc::call1 builds one from
// its xid, the procedure number, both nonces and the front of its reply window
// and stores it with marshall::pack_header().
struct request_header {
- request_header(int x=0, proc_t p=0, unsigned c=0, unsigned s=0, int xi=0) :
- xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
int xid;
proc_t proc;
unsigned int clt_nonce;
unsigned int srv_nonce;
int xid_rep;
+ // NOTE(review): MEMBERS is defined elsewhere; presumably it supplies the _tuple_() accessor used by MARSHALLABLE and ENDIAN_SWAPPABLE — confirm
+ MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
};
+FORWARD_MARSHALLABLE(request_header)
+ENDIAN_SWAPPABLE(request_header)
+
struct reply_header {
- reply_header(int x=0, int r=0): xid(x), ret(r) {}
int xid;
int ret;
-};
-
-template<class T> inline T hton(T t);
-
-constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
-template<> inline uint8_t hton(uint8_t t) { return t; }
-template<> inline int8_t hton(int8_t t) { return t; }
-template<> inline uint16_t hton(uint16_t t) { return htons(t); }
-template<> inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); }
-template<> inline uint32_t hton(uint32_t t) { return htonl(t); }
-template<> inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); }
-template<> inline uint64_t hton(uint64_t t) {
- if (!endianness.is_little_endian)
- return t;
- return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32);
-}
-template<> inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
-template<> inline request_header hton(request_header h) { return {hton(h.xid), hton(h.proc), hton(h.clt_nonce), hton(h.srv_nonce), hton(h.xid_rep)}; }
-template<> inline reply_header hton(reply_header h) { return {hton(h.xid), hton(h.ret)}; }
+ MEMBERS(xid, ret)
+};
-template <class T> inline T ntoh(T t) { return hton(t); }
+FORWARD_MARSHALLABLE(reply_header)
+ENDIAN_SWAPPABLE(reply_header)
typedef int rpc_sz_t;
-//size of initial buffer allocation
-#define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
+const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
+const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
+// Template parameter pack expansion is not allowed in certain contexts, but
+// brace initializers (for instance, calls to constructors of empty structs)
+// are fair game.
struct pass { template <typename... Args> inline pass(Args&&...) {} };
class marshall {
private:
- char *buf_; // Base of the raw bytes buffer (dynamically readjusted)
- size_t capacity_; // Capacity of the buffer
- size_t index_; // Read/write head position
+ string buf_ = string(DEFAULT_RPC_SZ, 0); // Raw bytes buffer
+ size_t index_ = RPC_HEADER_SZ; // Read/write head position
// Makes sure buf_ holds at least index_+n bytes, growing it when it does not.
inline void reserve(size_t n) {
- if((index_+n) > capacity_){
- capacity_ += max(capacity_, n);
- VERIFY (buf_ != NULL);
- buf_ = (char *)realloc(buf_, capacity_);
- VERIFY(buf_);
- }
+ if (index_+n > buf_.size())
+ buf_.resize(index_+n); // grows to exactly the size needed; never shrinks
}
public:
template <typename... Args>
marshall(const Args&... args) {
- buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
- VERIFY(buf_);
- capacity_ = DEFAULT_RPC_SZ;
- index_ = RPC_HEADER_SZ;
(void)pass{(*this << args)...};
}
- ~marshall() {
- if (buf_)
- free(buf_);
- }
-
- size_t size() { return index_;}
- char *cstr() { return buf_;}
- const char *cstr() const { return buf_;}
-
- void rawbyte(uint8_t x) {
- reserve(1);
- buf_[index_++] = (int8_t)x;
- }
-
- void rawbytes(const char *p, size_t n) {
+ void rawbytes(const void *p, size_t n) {
reserve(n);
- memcpy(buf_+index_, p, n);
+ copy((char *)p, (char *)p+n, &buf_[index_]);
index_ += n;
}
- // Return the current content (excluding header) as a string
- string get_content() {
- return string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
- }
-
- // Return the current content (excluding header) as a string
- string str() {
- return get_content();
- }
-
- void pack_req_header(const request_header &h);
- void pack_reply_header(const reply_header &h);
-
- void take_buf(char **b, size_t *s) {
- *b = buf_;
- *s = index_;
- buf_ = NULL;
- index_ = 0;
- return;
+ // with header
+ operator string () const { return buf_.substr(0,index_); }
+ // without header
+ string content() { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); }
+ // Marshals h into the header area just past the length field, then restores the write position so the payload is untouched.
+ template <class T>
+ void pack_header(const T &h) {
+ VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ); // h plus the length field must fit inside the reserved header area
+ size_t saved_sz = index_;
+ index_ = sizeof(rpc_sz_t); // first 4 bytes hold length field
+ *this << h; // relies on an operator<< being available for T
+ index_ = saved_sz;
}
};
-marshall& operator<<(marshall &, bool);
-marshall& operator<<(marshall &, uint32_t);
-marshall& operator<<(marshall &, int32_t);
-marshall& operator<<(marshall &, uint8_t);
-marshall& operator<<(marshall &, int8_t);
-marshall& operator<<(marshall &, uint16_t);
-marshall& operator<<(marshall &, int16_t);
-marshall& operator<<(marshall &, uint64_t);
-marshall& operator<<(marshall &, const string &);
+FORWARD_MARSHALLABLE(bool);
+FORWARD_MARSHALLABLE(uint8_t);
+FORWARD_MARSHALLABLE(int8_t);
+FORWARD_MARSHALLABLE(uint16_t);
+FORWARD_MARSHALLABLE(int16_t);
+FORWARD_MARSHALLABLE(uint32_t);
+FORWARD_MARSHALLABLE(int32_t);
+FORWARD_MARSHALLABLE(size_t);
+FORWARD_MARSHALLABLE(uint64_t);
+FORWARD_MARSHALLABLE(int64_t);
+FORWARD_MARSHALLABLE(string &);
template <class A> typename enable_if<is_iterable<A>::value, marshall>::type &
operator<<(marshall &m, const A &x) {
- m << (unsigned int) x.size();
+ m << (unsigned int)x.size();
for (const auto &a : x)
m << a;
return m;
return m << from_enum(e);
}
-class unmarshall;
-
-unmarshall& operator>>(unmarshall &, bool &);
-unmarshall& operator>>(unmarshall &, uint8_t &);
-unmarshall& operator>>(unmarshall &, int8_t &);
-unmarshall& operator>>(unmarshall &, uint16_t &);
-unmarshall& operator>>(unmarshall &, int16_t &);
-unmarshall& operator>>(unmarshall &, uint32_t &);
-unmarshall& operator>>(unmarshall &, int32_t &);
-unmarshall& operator>>(unmarshall &, size_t &);
-unmarshall& operator>>(unmarshall &, uint64_t &);
-unmarshall& operator>>(unmarshall &, int64_t &);
-unmarshall& operator>>(unmarshall &, string &);
template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
operator>>(unmarshall &u, E &e);
class unmarshall {
private:
- char *buf_;
- size_t sz_;
- size_t index_;
- bool ok_;
-
- inline bool ensure(size_t n);
- public:
- unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
- unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {}
- unmarshall(const string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
- {
- //take the content which does not exclude a RPC header from a string
- take_content(s);
- }
- ~unmarshall() {
- if (buf_) free(buf_);
+ string buf_;
+ size_t index_ = 0;
+ bool ok_ = false;
+
+ inline bool ensure(size_t n) {
+ if (index_+n > buf_.size())
+ ok_ = false;
+ return ok_;
}
-
- //take contents from another unmarshall object
- void take_in(unmarshall &another);
-
- //take the content which does not exclude a RPC header from a string
- void take_content(const string &s) {
- sz_ = s.size()+RPC_HEADER_SZ;
- buf_ = (char *)realloc(buf_,sz_);
- VERIFY(buf_);
- index_ = RPC_HEADER_SZ;
- memcpy(buf_+index_, s.data(), s.size());
- ok_ = true;
+ public:
+ unmarshall() {}
+ unmarshall(const string &s, bool has_header) // s: bytes to decode; has_header: whether s already starts with the RPC header area
+ : buf_(s),index_(RPC_HEADER_SZ) {
+ if (!has_header)
+ buf_.insert(0, RPC_HEADER_SZ, 0); // prepend RPC_HEADER_SZ zero bytes so index_ points at the first byte of s
+ ok_ = (buf_.size() >= RPC_HEADER_SZ); // a buffer shorter than the header area is marked not ok
}
bool ok() const { return ok_; }
- char *cstr() { return buf_;}
- bool okdone() const { return ok_ && index_ == sz_; }
-
- uint8_t rawbyte();
- void rawbytes(string &s, size_t n);
- template <class T> void rawbytes(T &t);
-
- size_t ind() { return index_;}
- size_t size() { return sz_;}
- void take_buf(char **b, size_t *sz) {
- *b = buf_;
- *sz = sz_;
- sz_ = index_ = 0;
- buf_ = NULL;
- }
+ bool okdone() const { return ok_ && index_ == buf_.size(); }
- void unpack_req_header(request_header *h) {
- //the first 4-byte is for channel to fill size of pdu
- index_ = sizeof(rpc_sz_t);
- *this >> h->xid >> h->proc >> h->clt_nonce >> h->srv_nonce >> h->xid_rep;
- index_ = RPC_HEADER_SZ;
+ void rawbytes(void * t, size_t n) { // copies the next n buffered bytes to t and advances the read position
+ VERIFY(ensure(n)); // running past the end of the buffer fails the VERIFY rather than returning an error
+ copy(&buf_[index_], &buf_[index_+n], (char *)t);
+ index_ += n;
}
- void unpack_reply_header(reply_header *h) {
- //the first 4-byte is for channel to fill size of pdu
+ template <class T>
+ void unpack_header(T & h) {
+ // first 4 bytes hold length field
+ VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ);
index_ = sizeof(rpc_sz_t);
- *this >> h->xid >> h->ret;
+ *this >> h;
index_ = RPC_HEADER_SZ;
}
- template <class A>
- inline A grab() {
- A a;
- *this >> a;
- return a;
- }
+ template <class T> inline T grab() { T t; *this >> t; return t; }
};
template <class A> typename enable_if<is_iterable<A>::value, unmarshall>::type &
// PAI 2013/09/19
// C++11 does neither of these two things for us:
// 1) Declare variables using a parameter pack expansion, like so
-// Args ...args;
+// Args... args;
// 2) Call a function with a tuple of the arguments it expects
//
// We implement an 'invoke' function for functions of the RPC handler
// 'invoke' as a parameter which will be ignored, but its type will force the
// compiler to specialize 'invoke' appropriately.
-// The following implementation of tuple_indices is redistributed under the MIT
-// License as an insubstantial portion of the LLVM compiler infrastructure.
-
-template <size_t...> struct tuple_indices {};
-template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
-template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
- typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
-};
-template <size_t E, size_t ...Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
- typedef tuple_indices<Indices...> type;
-};
-template <size_t E, size_t S=0> struct make_tuple_indices {
- typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
-};
-
// This class encapsulates the default response to runtime unmarshalling
// failures. The templated wrappers below may optionally use a different
// class.
// One for function pointers...
-template <class F, class R, class RV, class args_type, size_t ...Indices>
+template <class F, class R, class RV, class args_type, size_t... Indices>
typename enable_if<!is_member_function_pointer<F>::value, RV>::type
invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
return f(r, move(get<Indices>(t))...);
// And one for pointers to member functions...
-template <class F, class C, class RV, class R, class args_type, size_t ...Indices>
+template <class F, class C, class RV, class R, class args_type, size_t... Indices>
typename enable_if<is_member_function_pointer<F>::value, RV>::type
invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
return (c->*f)(r, move(get<Indices>(t))...);
struct marshalled_func<F, ErrorHandler, function<Signature>> :
public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
-template <class ...Args, size_t ...Indices> unmarshall &
+template <class... Args, size_t... Indices> unmarshall &
tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
(void)pass{(u >> get<Indices>(t))...};
return u;
return tuple_unmarshall_imp(u, t, Indices());
}
-template <class ...Args, size_t ...Indices> marshall &
+template <class... Args, size_t... Indices> marshall &
tuple_marshall_imp(marshall & m, tuple<Args...> & t, tuple_indices<Indices...>) {
(void)pass{(m << get<Indices>(t))...};
return m;
return tuple_marshall_imp(m, t, Indices());
}
-// for structs or classes containing a MEMBERS declaration
-#define MARSHALLABLE(_c_) \
-inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
-inline marshall & operator<<(marshall &m, _c_ a) { return m << a._tuple_(); }
+MARSHALLABLE(request_header)
+MARSHALLABLE(reply_header)
#endif
#include "pollmgr.h"
PollMgr *PollMgr::instance = NULL;
-static std::once_flag pollmgr_is_initialized;
+static once_flag pollmgr_is_initialized;
static void
PollMgrInit()
// Returns the static instance pointer after making sure, through call_once,
// that PollMgrInit has run.
PollMgr *
PollMgr::Instance()
{
- std::call_once(pollmgr_is_initialized, PollMgrInit);
+ call_once(pollmgr_is_initialized, PollMgrInit); // later calls skip PollMgrInit once it has completed
return instance;
}
aio_ = new SelectAIO();
//aio_ = new EPollAIO();
- th_ = std::thread(&PollMgr::wait_loop, this);
+ th_ = thread(&PollMgr::wait_loop, this);
}
PollMgr::~PollMgr() [[noreturn]]
PollMgr::wait_loop() [[noreturn]]
{
- std::vector<int> readable;
- std::vector<int> writable;
+ vector<int> readable;
+ vector<int> writable;
while (1) {
{
}
void
-SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
{
fd_set trfds, twfds;
int high;
}
void
-EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
{
int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
for (int i = 0; i < nfds; i++) {
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
+static sockaddr_in make_sockaddr(const string &hostandport);
+
rpcc::rpcc(const string & d, bool retrans) :
dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
}
char *loss_env = getenv("RPC_LOSSY");
- if(loss_env != NULL){
+ if(loss_env)
lossytest_ = atoi(loss_env);
- }
// xid starts with 1 and latest received reply starts with 0
xid_rep_window_.push_back(0);
- IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
+ IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
}
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc() {
- IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
+ IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
if(chan_){
chan_->closeconn();
chan_->decref();
bind_done_ = true;
srv_nonce_ = r;
} else {
- IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
+ IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
}
return ret;
};
// Cancel all outstanding calls
void rpcc::cancel(void) {
lock ml(m_);
- LOG("rpcc::cancel: force callers to fail");
+ LOG("force callers to fail");
for(auto &p : calls_){
caller *ca = p.second;
- IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail");
+ IF_LEVEL(2) LOG("force caller to fail");
{
lock cl(ca->m);
ca->done = true;
destroy_wait_ = true;
destroy_wait_c_.wait(ml);
}
- LOG("rpcc::cancel: done");
+ LOG("done");
}
-int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
caller ca(0, &rep);
int xid_rep;
if((proc != rpc_const::bind && !bind_done_) ||
(proc == rpc_const::bind && bind_done_)){
- IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice");
+ IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
return rpc_const::bind_failure;
}
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
xid_rep = xid_rep_window_.front();
}
}
}
if (forgot.isvalid())
- ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
- ch->send(req.cstr(), req.size());
+ ch->send(forgot.buf);
+ ch->send(req);
}
else IF_LEVEL(1) LOG("not reachable");
- IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc <<
+ IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
" xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
}
transmit = false; // only send once on a given channel
{
lock cal(ca.m);
while (!ca.done){
- IF_LEVEL(2) LOG("rpcc:call1: wait");
+ IF_LEVEL(2) LOG("wait");
if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
- IF_LEVEL(2) LOG("rpcc::call1: timeout");
+ IF_LEVEL(2) LOG("timeout");
break;
}
}
if(ca.done){
- IF_LEVEL(2) LOG("rpcc::call1: reply received");
+ IF_LEVEL(2) LOG("reply received");
break;
}
}
{
lock ml(m_);
if (!dup_req_.isvalid()) {
- dup_req_.buf.assign(req.cstr(), req.size());
+ dup_req_.buf = req;
dup_req_.xid = ca.xid;
}
if (xid_rep > xid_rep_done_)
lock cal(ca.m);
- IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc <<
+ IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
" xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
//
// this function keeps no reference for connection *c
bool
-rpcc::got_pdu(connection *, char *b, size_t sz)
+rpcc::got_pdu(connection *, const string & b)
{
- unmarshall rep(b, sz);
+ unmarshall rep(b, true);
reply_header h;
- rep.unpack_reply_header(&h);
+ rep.unpack_header(h);
if(!rep.ok()){
- IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!");
+ IF_LEVEL(1) LOG("unmarshall header failed!!!");
return true;
}
update_xid_rep(h.xid);
if(calls_.find(h.xid) == calls_.end()){
- IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request");
+ IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
return true;
}
caller *ca = calls_[h.xid];
lock cl(ca->m);
if(!ca->done){
- ca->un->take_in(rep);
+ *ca->rep = b;
ca->intret = h.ret;
if(ca->intret < 0){
- IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret);
+ IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
}
ca->done = 1;
}
}
// Constructs an RPC server endpoint for port p1. `count` initializes both
// counting_ and curr_counts_. The constructor picks a random nonce,
// registers the bind handler, creates the dispatch thread pool and finally
// creates the TCP listener.
rpcs::rpcs(unsigned int p1, size_t count)
- : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
+ : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
{
// nonce_ identifies this server instance; dispatch compares incoming
// srv_nonce values against it to detect requests meant for an older instance.
set_rand_seed();
nonce_ = (unsigned int)random();
- IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_);
-
- char *loss_env = getenv("RPC_LOSSY");
- if(loss_env != NULL){
- lossytest_ = atoi(loss_env);
- }
+ IF_LEVEL(2) LOG("created with nonce " << nonce_);
// rpcbind hands nonce_ back to the caller of rpc_const::bind
reg(rpc_const::bind, &rpcs::rpcbind, this);
// six worker threads; `false` selects the non-blocking addJob mode
- dispatchpool_ = new ThrPool(6,false);
+ dispatchpool_ = new ThrPool(6, false);
// the RPC_LOSSY environment variable, when set, is parsed with atoi and
// passed to the listener; otherwise 0 is passed
- listener_ = new tcpsconn(this, port_, lossytest_);
+ char *loss_env = getenv("RPC_LOSSY");
+ listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
}
rpcs::~rpcs()
}
bool
-rpcs::got_pdu(connection *c, char *b, size_t sz)
+rpcs::got_pdu(connection *c, const string & b)
{
- if(!reachable_){
- IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable");
- return true;
- }
+ if(!reachable_){
+ IF_LEVEL(1) LOG("not reachable");
+ return true;
+ }
- djob_t *j = new djob_t(c, b, sz);
+ djob_t *j = new djob_t{c, b};
c->incref();
bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
if(!succ || !reachable_){
rpcs::dispatch(djob_t *j)
{
connection *c = j->conn;
- unmarshall req(j->buf, j->sz);
+ unmarshall req(j->buf, true);
delete j;
request_header h;
- req.unpack_req_header(&h);
+ req.unpack_header(h);
proc_t proc = h.proc;
if(!req.ok()){
- IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!");
+ IF_LEVEL(1) LOG("unmarshall header failed!!!");
c->decref();
return;
}
- IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+ IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
marshall rep;
- reply_header rh(h.xid,0);
+ reply_header rh{h.xid,0};
// is client sending to an old instance of server?
if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
- IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce <<
+ IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
" (current " << nonce_ << ") proc " << hex << h.proc);
rh.ret = rpc_const::oldsrv_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
+ rep.pack_header(rh);
+ c->send(rep);
return;
}
{
lock pl(procs_m_);
if(procs_.count(proc) < 1){
- cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl;
+ cerr << "unknown proc " << hex << proc << "." << endl;
c->decref();
VERIFY(0);
return;
}
rpcs::rpcstate_t stat;
- char *b1 = nullptr;
- size_t sz1 = 0;
+ string b1;
if(h.clt_nonce){
// have i seen this client before?
if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid <<
+ IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
" chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
}
}
}
}
- stat = checkduplicate_and_update(h.clt_nonce, h.xid,
- h.xid_rep, &b1, &sz1);
+ stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
} else {
// this client does not require at most once logic
stat = NEW;
}
- switch (stat){
+ switch (stat) {
case NEW: // new request
- if(counting_){
+ if (counting_){
updatestat(proc);
}
rh.ret = (*f)(req, rep);
if (rh.ret == rpc_const::unmarshal_args_failure) {
- cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " <<
+ cerr << "failed to unmarshall the arguments. You are " <<
"probably calling RPC 0x" << hex << proc << " with the wrong " <<
"types of arguments." << endl;
VERIFY(0);
}
VERIFY(rh.ret >= 0);
- rep.pack_reply_header(rh);
- rep.take_buf(&b1,&sz1);
+ rep.pack_header(rh);
+ b1 = rep;
- IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " <<
+ IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
- if(h.clt_nonce > 0){
+ if (h.clt_nonce > 0) {
// only record replies for clients that require at-most-once logic
- add_reply(h.clt_nonce, h.xid, b1, sz1);
+ add_reply(h.clt_nonce, h.xid, b1);
}
// get the latest connection to the client
}
}
- c->send(b1, sz1);
- if(h.clt_nonce == 0){
- // reply is not added to at-most-once window, free it
- free(b1);
- }
+ c->send(rep);
break;
case INPROGRESS: // server is working on this request
break;
case DONE: // duplicate and we still have the response
- c->send(b1, sz1);
+ c->send(b1);
break;
case FORGOTTEN: // very old request and we don't have the response anymore
- IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce);
+ IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
rh.ret = rpc_const::atmostonce_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
+ rep.pack_header(rh);
+ c->send(rep);
break;
}
c->decref();
// returns one of:
// NEW: never seen this xid before.
// INPROGRESS: seen this xid, and still processing it.
-// DONE: seen this xid, previous reply returned in *b and *sz.
+// DONE: seen this xid, previous reply returned in b.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::rpcstate_t
rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
- int xid_rep, char **b, size_t *sz)
+ int xid_rep, string & b)
{
lock rwl(reply_window_m_);
if (past_xid_rep < xid_rep || past_xid_rep == -1) {
// scan for deletion candidates
- for (; it != l.end() && it->xid < xid_rep; it++) {
- if (it->cb_present)
- free(it->buf);
- }
+ while (it != l.end() && it->xid < xid_rep)
+ it++;
l.erase(start, it);
l.begin()->xid = xid_rep;
}
if (it != l.end() && it->xid == xid) {
if (it->cb_present) {
// return information about the remembered reply
- *b = it->buf;
- *sz = it->sz;
+ b = it->buf;
return DONE;
- } else {
- return INPROGRESS;
}
+ return INPROGRESS;
} else {
// remember that a new request has arrived
l.insert(it, reply_t(xid));
}
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
-// and passes the return value in b and sz.
-// add_reply() should remember b and sz.
-// free_reply_window() and checkduplicate_and_update is responsible for
-// calling free(b).
-void
-rpcs::add_reply(unsigned int clt_nonce, int xid,
- char *b, size_t sz)
-{
+// and passes the return value in b.
+// add_reply() should remember b.
+// free_reply_window() and checkduplicate_and_update are responsible for
+// cleaning up the remembered values.
+void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
lock rwl(reply_window_m_);
// remember the RPC reply value
list<reply_t> &l = reply_window_[clt_nonce];
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
cerr << "Could not find reply struct in add_reply" << endl;
- l.insert(it, reply_t(xid, b, sz));
+ l.insert(it, reply_t(xid, b));
} else {
- *it = reply_t(xid, b, sz);
+ *it = reply_t(xid, b);
}
}
// Discards every client's list of remembered replies while holding
// reply_window_m_. With reply buffers stored as strings, clearing the map
// is all the cleanup that is needed.
void rpcs::free_reply_window(void) {
lock rwl(reply_window_m_);
- for (auto clt : reply_window_) {
- for (auto it : clt.second){
- if (it.cb_present)
- free(it.buf);
- }
- clt.second.clear();
- }
reply_window_.clear();
}
// Handler for rpc_const::bind: stores this server's nonce_ in r and
// returns 0. The second (int) argument is unused.
int rpcs::rpcbind(unsigned int &r, int) {
- IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_);
+ IF_LEVEL(2) LOG("called return nonce " << nonce_);
r = nonce_;
return 0;
}
-bool operator<(const sockaddr_in &a, const sockaddr_in &b){
- return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
- ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
- ((a.sin_port < b.sin_port))));
-}
+static sockaddr_in make_sockaddr(const string &host, const string &port);
-/*---------------auxilary function--------------*/
// Builds a sockaddr_in from either "host:port" or a bare "port" string.
// Without a ':' the whole string is treated as the port and the host
// defaults to 127.0.0.1; otherwise the text is split at the first ':'.
-sockaddr_in make_sockaddr(const string &hostandport) {
+static sockaddr_in make_sockaddr(const string &hostandport) {
auto colon = hostandport.find(':');
if (colon == string::npos)
return make_sockaddr("127.0.0.1", hostandport);
return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
}
-sockaddr_in make_sockaddr(const string &host, const string &port) {
+static sockaddr_in make_sockaddr(const string &host, const string &port) {
sockaddr_in dst;
bzero(&dst, sizeof(dst));
dst.sin_family = AF_INET;
//manages per rpc info
struct caller {
- caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {}
+ caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
int xid;
- unmarshall *un;
+ string *rep;
int intret;
bool done = false;
mutex m;
int islossy() { return lossytest_ > 0; }
- int call1(proc_t proc,
- marshall &req, unmarshall &rep, TO to);
+ int call1(proc_t proc, marshall &req, string &rep, TO to);
- bool got_pdu(connection *c, char *b, size_t sz);
+ bool got_pdu(connection *c, const string & b);
template<class R>
int call_m(proc_t proc, marshall &req, R & r, TO to);
template<class R> int
rpcc::call_m(proc_t proc, marshall &req, R & r, TO to)
{
- unmarshall u;
- int intret = call1(proc, req, u, to);
+ string rep;
+ int intret = call1(proc, req, rep, to);
+ unmarshall u(rep, true);
if (intret < 0) return intret;
u >> r;
if (u.okdone() != true) {
return call_m(proc, m, r, to);
}
-bool operator<(const sockaddr_in &a, const sockaddr_in &b);
-
// rpc server endpoint.
class rpcs : public chanmgr {
// has been sent; in that case buf holds a copy of the reply
// (its length is buf.size()).
// One entry of a client's reply window, identified by xid.
struct reply_t {
- reply_t (int _xid) {
- xid = _xid;
- cb_present = false;
- buf = NULL;
- sz = 0;
- }
- reply_t (int _xid, char *_buf, size_t _sz) {
- xid = _xid;
- cb_present = true;
- buf = _buf;
- sz = _sz;
- }
// entry for a request that has been seen but has no stored reply yet
+ reply_t (int _xid) : xid(_xid), cb_present(false) {}
// entry that carries its own copy of the reply bytes
+ reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
int xid;
bool cb_present; // whether the reply buffer is valid
- char *buf; // the reply buffer
- size_t sz; // the size of reply buffer
+ string buf; // the reply buffer
};
unsigned int port_;
map<unsigned int, list<reply_t> > reply_window_;
void free_reply_window(void);
- void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
+ void add_reply(unsigned int clt_nonce, int xid, const string & b);
rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
- int xid, int rep_xid,
- char **b, size_t *sz);
+ int xid, int rep_xid, string & b);
void updatestat(proc_t proc);
size_t curr_counts_;
map<proc_t, size_t> counts_;
- int lossytest_;
bool reachable_;
// map proc # to function
protected:
// Work item passed from got_pdu to dispatch: the connection the request
// arrived on plus the request bytes.
// Member order matters: got_pdu aggregate-initializes this as djob_t{c, b}.
struct djob_t {
- djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {}
- char *buf;
- size_t sz;
connection *conn;
+ string buf;
};
void dispatch(djob_t *);
void reg1(proc_t proc, handler *);
ThrPool* dispatchpool_;
- tcpsconn* listener_;
+ tcpsconn *listener_;
public:
rpcs(unsigned int port, size_t counts=0);
void set_reachable(bool r) { reachable_ = r; }
- bool got_pdu(connection *c, char *b, size_t sz);
+ bool got_pdu(connection *c, const string & b);
template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
};
reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
}
-sockaddr_in make_sockaddr(const string &hostandport);
-sockaddr_in make_sockaddr(const string &host, const string &port);
-
#endif
{
marshall m;
request_header rh{1,2,3,4,5};
- m.pack_req_header(rh);
- VERIFY(m.size()==RPC_HEADER_SZ);
+ m.pack_header(rh);
+ VERIFY(((string)m).size()==RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
string s = "hallo....";
m << l;
m << s;
- char *b;
- size_t sz;
- m.take_buf(&b,&sz);
- VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+ string b = m;
+ VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
- unmarshall un(b,sz);
+ unmarshall un(b, true);
request_header rh1;
- un.unpack_req_header(&rh1);
+ un.unpack_header(rh1);
VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
int i1;
unsigned long long l1;
int arg = (random() % 1000);
int rep;
- auto start = std::chrono::steady_clock::now();
+ auto start = steady_clock::now();
int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
- auto end = std::chrono::steady_clock::now();
- auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+ auto end = steady_clock::now();
+ auto diff = duration_cast<milliseconds>(end - start).count();
if (ret != 0)
cout << diff << " ms have elapsed!!!" << endl;
VERIFY(ret == 0);
if (isclient) {
// server's address.
- dst = "127.0.0.1:" + std::to_string(port);
+ dst = "127.0.0.1:" + to_string(port);
// start the client. bind it to the server.
// if blocking, then addJob() blocks when queue is full
// otherwise, addJob() simply returns false when queue is full
// Creates a pool of sz worker threads. `blocking` is stored in blockadd_
// and later forwarded to jobq_.enq by addJob.
// NOTE(review): jobq_ is constructed with 100*sz, presumably its capacity;
// confirm against fifo.h.
ThrPool::ThrPool(size_t sz, bool blocking)
-: nthreads_(sz),blockadd_(blocking),jobq_(100*sz)
-{
+: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
// start the workers; each thread runs do_worker on this pool
for (size_t i=0; i<nthreads_; i++)
th_.emplace_back(&ThrPool::do_worker, this);
}
// IMPORTANT: this function can be called only when no external thread
// will ever use this thread pool again or is currently blocking on it
-ThrPool::~ThrPool()
-{
+ThrPool::~ThrPool() {
for (size_t i=0; i<nthreads_; i++)
jobq_.enq(job_t());
th_[i].join();
}
// Enqueues job j on jobq_ and returns whatever enq returns. blockadd_ (set
// from the constructor's `blocking` argument) is forwarded to enq.
-bool
-ThrPool::addJob(const job_t &j)
-{
+bool ThrPool::addJob(const job_t &j) {
return jobq_.enq(j,blockadd_);
}
-void
-ThrPool::do_worker()
-{
+void ThrPool::do_worker() {
job_t j;
while (1) {
jobq_.deq(&j);
#include "types.h"
#include "fifo.h"
-typedef std::function<void()> job_t;
+typedef function<void()> job_t;
class ThrPool {
public:
bool blockadd_;
fifo<job_t> jobq_;
- std::vector<std::thread> th_;
+ vector<thread> th_;
void do_worker();
};
#include "rsm.h"
#include "rsm_client.h"
-rsm::rsm(std::string _first, std::string _me) :
+rsm::rsm(string _first, string _me) :
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = new rpcs((uint32_t)std::stoi(_me) + 1);
+ testsvr = new rpcs((uint32_t)stoi(_me) + 1);
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
{
lock ml(rsm_mutex);
- std::thread(&rsm::recovery, this).detach();
+ thread(&rsm::recovery, this).detach();
}
}
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
- std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
+ this_thread::sleep_for(seconds(30)); // XXX make another node in cfg primary?
ml.lock();
}
}
bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
// Remember the primary of vid_insync
- std::string m = primary;
+ string m = primary;
while (vid_insync == vid_commit) {
if (statetransfer(m, rsm_mutex_lock))
break;
* Call to transfer state from m to the local node.
* Assumes that rsm_mutex is already held.
*/
-bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
+bool rsm::statetransfer(string m, lock & rsm_mutex_lock)
{
rsm_protocol::transferres r;
handle h(m);
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
+ LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
return false;
}
if (stf && last_myvs != r.last) {
return true;
}
-bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
+bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) {
rsm_mutex_lock.unlock();
handle h(m);
rpcc *cl = h.safebind();
}
-bool rsm::join(std::string m, lock & rsm_mutex_lock) {
+bool rsm::join(string m, lock & rsm_mutex_lock) {
handle h(m);
int ret = 0;
string log;
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
+ LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
return false;
}
LOG("rsm::join: succeeded " << log);
}
// Runs the handler registered as procs[procno] on the marshalled arguments
// in req, and stores in r a marshalled (status, reply payload) pair.
-void rsm::execute(int procno, std::string req, std::string &r) {
+void rsm::execute(int procno, string req, string &r) {
LOG("execute");
handler *h = procs[procno];
// a handler must have been registered for procno
VERIFY(h);
- unmarshall args(req);
+ unmarshall args(req, false);
marshall rep;
// NOTE(review): reps is never used in this function
- std::string reps;
+ string reps;
auto ret = (rsm_protocol::status)(*h)(args, rep);
// wrap the handler's status and its reply content into a fresh marshall so
// the caller receives both in a single string
marshall rep1;
rep1 << ret;
- rep1 << rep.str();
- r = rep1.str();
+ rep1 << rep.content();
+ r = rep1.content();
}
//
// number, and invokes it on all members of the replicated state
// machine.
//
-rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) {
- LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
+rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) {
+ LOG("rsm::client_invoke: procno 0x" << hex << procno);
lock ml(invoke_mutex);
- std::vector<std::string> m;
- std::string myaddr;
+ vector<string> m;
+ string myaddr;
viewstamp vs;
{
lock ml2(rsm_mutex);
// the replica must execute requests in order (with no gaps)
// according to requests' seqno
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) {
- LOG("rsm::invoke: procno 0x" << std::hex << proc);
+rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) {
+ LOG("rsm::invoke: procno 0x" << hex << proc);
lock ml(invoke_mutex);
- std::vector<std::string> m;
- std::string myaddr;
+ vector<string> m;
+ string myaddr;
{
lock ml2(rsm_mutex);
// check if !inviewchange
return rsm_protocol::ERR;
myvs++;
}
- std::string r;
+ string r;
execute(proc, req, r);
last_myvs = vs;
breakpoint1();
/**
* RPC handler: Send back the local node's state to the caller
*/
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
* RPC handler: Inform the local node (the primary) that node m has synchronized
* for view vid
*/
-rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
+rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) {
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
// a node that wants to join an RSM as a server sends a
// joinreq to the RSM's current primary; this is the
// handler for that RPC.
-rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
+rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) {
auto ret = rsm_protocol::OK;
lock ml(rsm_mutex);
* so the client can switch to a different primary
* when its existing primary fails
*/
-rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int) {
- std::vector<std::string> m;
+rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+ vector<string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
m.push_back(primary);
// otherwise, the lowest number node of the previous view.
// caller should hold rsm_mutex
void rsm::set_primary(unsigned vid) {
- std::vector<std::string> c, p;
+ vector<string> c, p;
cfg->get_view(vid, c);
cfg->get_view(vid - 1, p);
VERIFY (c.size() > 0);
// assumes caller holds rsm_mutex
void rsm::net_repair(bool heal, lock &) {
- std::vector<std::string> m;
+ vector<string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
class rsm_client {
protected:
- std::string primary;
- std::vector<std::string> known_mems;
- std::mutex rsm_client_mutex;
+ string primary;
+ vector<string> known_mems;
+ mutex rsm_client_mutex;
void primary_failure(lock & rsm_client_mutex_lock);
bool init_members(lock & rsm_client_mutex_lock);
public:
- rsm_client(std::string dst);
- rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req);
+ rsm_client(string dst);
+ rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
template<class R, class ...Args>
int call(unsigned int proc, R & r, const Args & ...a1);
template<class R>
int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
- std::string rep;
- std::string res;
- int intret = invoke(proc, rep, req.cstr());
+ string rep;
+ string res;
+ int intret = invoke(proc, rep, req);
VERIFY( intret == rsm_client_protocol::OK );
- unmarshall u(rep);
+ unmarshall u(rep, false);
u >> intret;
if (intret < 0) return intret;
u >> res;
VERIFY(0);
return rpc_const::unmarshal_reply_failure;
}
- unmarshall u1(res);
+ unmarshall u1(res, false);
u1 >> r;
if(!u1.okdone()) {
cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
#include "rsmtest_client.h"
#include <arpa/inet.h>
// Constructs the member cl from dst and binds it immediately. A negative
// bind result is only reported on cout; construction still completes.
-rsmtest_client::rsmtest_client(std::string dst) : cl(dst) {
+rsmtest_client::rsmtest_client(string dst) : cl(dst) {
if (cl.bind() < 0)
cout << "rsmtest_client: call bind" << endl;
}
protected:
rpcc cl;
public:
- rsmtest_client(std::string d);
+ rsmtest_client(string d);
virtual ~rsmtest_client() {}
virtual rsm_test_protocol::status net_repair(int heal);
virtual rsm_test_protocol::status breakpoint(int b);
extern char log_thread_prefix;
namespace std {
- // This is an awful hack. But sticking this in std:: makes it possible for
- // ostream_iterator to use it.
+ // Sticking this in std:: makes it possible for ostream_iterator to use it.
template <class A, class B>
ostream & operator<<(ostream &o, const pair<A,B> &d) {
return o << "<" << d.first << "," << d.second << ">";
}
#define LOG_PREFIX { \
- auto _thread_ = std::this_thread::get_id(); \
+ auto _thread_ = this_thread::get_id(); \
int _tid_ = thread_name_map[_thread_]; \
if (_tid_==0) \
_tid_ = thread_name_map[_thread_] = ++next_thread_num; \
int _self_ = instance_name_map[this]; \
if (_self_==0) \
_self_ = instance_name_map[this] = ++next_instance_num; \
- cerr << "#" << setw(2) << _self_; \
+ cerr << "#" << setw(2) << " " << _self_; \
}
#define LOG_NONMEMBER(_x_) { \
using std::istream;
using std::ostream_iterator;
using std::istream_iterator;
+using std::ios;
#include <limits>
using std::numeric_limits;
#include <thread>
using std::thread;
+using std::call_once;
+using std::once_flag;
// Re-exports the members of std::this_thread as ::this_thread so that code
// can write this_thread::get_id() or this_thread::sleep_for() without the
// std:: prefix.
+namespace this_thread {
+ using namespace std::this_thread;
+}
#include <tuple>
using std::tuple;
using std::is_same;
using std::underlying_type;
using std::enable_if;
+using std::remove_reference;
#include <utility>
using std::pair;
using std::declval;
+using std::forward;
#include <vector>
using std::vector;
LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \
LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
+// The following implementation of tuple_indices is redistributed under the MIT
+// License as an insubstantial portion of the LLVM compiler infrastructure.
+
// Empty tag type whose template arguments carry a compile-time list of indices.
+template <size_t...> struct tuple_indices {};
// Helper that accumulates indices S, S+1, ... into IntTuple until S reaches E.
+template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
// Recursive step: append S to the list and continue with S+1.
+template <size_t S, size_t... Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
+ typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
+};
// Base case: the start value equals E, so the accumulated list is the result.
+template <size_t E, size_t... Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
+ typedef tuple_indices<Indices...> type;
+};
// make_tuple_indices<E, S>::type is tuple_indices<S, S+1, ..., E-1>; S defaults to 0.
+template <size_t E, size_t S=0> struct make_tuple_indices {
+ typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
+};
+
+#include "endian.h"
+
#endif