--- /dev/null
+*.dSYM
+*.o
+*.d
+rpc/rpctest
+lock_tester
+lock_demo
+lock_server
+*.swp
+*.swo
+*.a
+*.log
+rsm_tester
--- /dev/null
+CXXFLAGS = -g -MMD -Wall -I. -I./rpc
+LDFLAGS = -L. -L/usr/local/lib
+LDLIBS = -lpthread
+LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt)
+LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl)
+CXX = g++
+CC = g++
+
+all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest
+
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o gettime.o
+ rm -f $@
+ ar cq $@ $^
+ ranlib rpc/librpc.a
+
+rpc/rpctest: rpc/rpctest.o rpc/librpc.a
+
+lock_demo=lock_demo.o lock_client.o
+lock_demo : $(lock_demo) rpc/librpc.a
+
+lock_tester=lock_tester.o lock_client.o mutex.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
+lock_tester : $(lock_tester) rpc/librpc.a
+
+lock_server=lock_server.o lock_smain.o mutex.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server : $(lock_server) rpc/librpc.a
+
+rsm_tester=rsm_tester.o rsmtest_client.o
+rsm_tester: $(rsm_tester) rpc/librpc.a
+
+%.o: %.cc
+ $(CXX) $(CXXFLAGS) -c $< -o $@
+
+-include *.d
+-include rpc/*.d
+
+clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester
+.PHONY: clean
+clean:
+ rm -rf $(clean_files)
--- /dev/null
+#include <sstream>
+#include <iostream>
+#include <stdio.h>
+#include "config.h"
+#include "paxos.h"
+#include "handle.h"
+#include "tprintf.h"
+#include "lang/verify.h"
+
+// The config module maintains views. As a node joins or leaves, the
+// next view will be the same as the previous view, except with the
+// new node added or removed. The first view contains only node
+// 1. If node 2 joins after the first node (it will download the views
+// from node 1), it will learn about view 1 with the first node as the
+// only member. It will then invoke Paxos to create the next view.
+// It will tell Paxos to ask the nodes in view 1 to agree on the value
+// {1, 2}. If Paxos returns success, then it moves to view 2 with
+// {1,2} as the members. When node 3 joins, the config module runs
+// Paxos with the nodes in view 2 and the proposed value
+// {1,2,3}. And so on. When a node discovers that some node of the
+// current view is not responding, it kicks off Paxos to propose a new
+// value (the current view minus the node that isn't responding). The
+// config module uses Paxos to create a total order of views, and
+// Paxos guarantees that a majority of the previous view agrees to
+// each next view. The Paxos log contains all the values (i.e., views)
+// agreed on.
+//
+// The RSM module informs config to add nodes. The config module
+// runs a heartbeater thread that checks in with nodes. If a node
+// doesn't respond, the config module will invoke Paxos's proposer to
+// remove the node. Higher layers will learn about this change when a
+// Paxos acceptor accepts the new proposed value through
+// paxos_commit().
+//
+// To be able to bring other nodes up to date with the latest formed
+// view, each node keeps a complete history of all view numbers
+// and their values that it knows about. At any time a node can reboot
+// and when it re-joins, it may be many views behind; by remembering
+// all views, the other nodes can bring this re-joined node up to
+// date.
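+//
+// For example (hypothetical node names), the view history might
+// evolve like this:
+//   view 1: {1}       first node boots
+//   view 2: {1,2}     node 2 joins
+//   view 3: {1,2,3}   node 3 joins
+//   view 4: {1,3}     node 2 stops responding and is removed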
+
+static void *
+heartbeatthread(void *x)
+{
+ config *r = (config *) x;
+ r->heartbeater();
+ return 0;
+}
+
+config::config(std::string _first, std::string _me, config_view_change *_vc)
+ : myvid (0), first (_first), me (_me), vc (_vc)
+{
+ VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
+ VERIFY(pthread_cond_init(&config_cond, NULL) == 0);
+
+ std::ostringstream ost;
+ ost << me;
+
+ acc = new acceptor(this, me == _first, me, ost.str());
+ pro = new proposer(this, acc, me);
+
+ // XXX hack; maybe should have its own port number
+ pxsrpc = acc->get_rpcs();
+ pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+
+ {
+ ScopedLock ml(&cfg_mutex);
+
+ reconstruct();
+
+ pthread_t th;
+ VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
+ }
+}
+
+void
+config::restore(std::string s)
+{
+ ScopedLock ml(&cfg_mutex);
+ acc->restore(s);
+ reconstruct();
+}
+
+std::vector<std::string>
+config::get_view(unsigned instance)
+{
+ ScopedLock ml(&cfg_mutex);
+ return get_view_wo(instance);
+}
+
+// caller should hold cfg_mutex
+std::vector<std::string>
+config::get_view_wo(unsigned instance)
+{
+ std::string value = acc->value(instance);
+ tprintf("get_view(%d): returns %s\n", instance, value.c_str());
+ return members(value);
+}
+
+std::vector<std::string>
+config::members(std::string value)
+{
+ std::istringstream ist(value);
+ std::string m;
+ std::vector<std::string> view;
+ while (ist >> m) {
+ view.push_back(m);
+ }
+ return view;
+}
+
+std::string
+config::value(std::vector<std::string> m)
+{
+ std::ostringstream ost;
+ for (unsigned i = 0; i < m.size(); i++) {
+ ost << m[i];
+ ost << " ";
+ }
+ return ost.str();
+}
+
+// caller should hold cfg_mutex
+void
+config::reconstruct()
+{
+ if (acc->instance() > 0) {
+ myvid = acc->instance();
+ mems = get_view_wo(myvid);
+ tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
+ }
+}
+
+// Called by Paxos's acceptor.
+void
+config::paxos_commit(unsigned instance, std::string value)
+{
+ std::vector<std::string> newmem;
+ ScopedLock ml(&cfg_mutex);
+
+ newmem = members(value);
+ tprintf("config::paxos_commit: %d: %s\n", instance,
+ print_members(newmem).c_str());
+
+ for (unsigned i = 0; i < mems.size(); i++) {
+ tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
+ if (!isamember(mems[i], newmem) && me != mems[i]) {
+ tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
+ mgr.delete_handle(mems[i]);
+ }
+ }
+
+ mems = newmem;
+ myvid = instance;
+ if (vc) {
+ unsigned vid = myvid;
+ VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
+ vc->commit_change(vid);
+ VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
+ }
+}
+
+bool
+config::ismember(std::string m, unsigned vid)
+{
+ bool r;
+ ScopedLock ml(&cfg_mutex);
+ std::vector<std::string> v = get_view_wo(vid);
+ r = isamember(m, v);
+ return r;
+}
+
+bool
+config::add(std::string new_m, unsigned vid)
+{
+ std::vector<std::string> m;
+ std::vector<std::string> curm;
+ ScopedLock ml(&cfg_mutex);
+ if (vid != myvid)
+ return false;
+ tprintf("config::add %s\n", new_m.c_str());
+ m = mems;
+ m.push_back(new_m);
+ curm = mems;
+ std::string v = value(m);
+ int nextvid = myvid + 1;
+ VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
+ bool r = pro->run(nextvid, curm, v);
+ VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
+ if (r) {
+ tprintf("config::add: proposer returned success\n");
+ } else {
+ tprintf("config::add: proposer returned failure\n");
+ }
+ return r;
+}
+
+// caller should hold cfg_mutex
+bool
+config::remove_wo(std::string m)
+{
+ tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
+ std::vector<std::string> n;
+ for (unsigned i = 0; i < mems.size(); i++) {
+ if (mems[i] != m) n.push_back(mems[i]);
+ }
+ std::string v = value(n);
+ std::vector<std::string> cmems = mems;
+ int nextvid = myvid + 1;
+ VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
+ bool r = pro->run(nextvid, cmems, v);
+ VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
+ if (r) {
+ tprintf("config::remove: proposer returned success\n");
+ } else {
+ tprintf("config::remove: proposer returned failure\n");
+ }
+ return r;
+}
+
+void
+config::heartbeater()
+{
+ struct timeval now;
+ struct timespec next_timeout;
+ std::string m;
+ heartbeat_t h;
+ bool stable;
+ unsigned vid;
+ std::vector<std::string> cmems;
+ ScopedLock ml(&cfg_mutex);
+
+ while (1) {
+
+ gettimeofday(&now, NULL);
+ next_timeout.tv_sec = now.tv_sec + 3;
+ next_timeout.tv_nsec = 0;
+ tprintf("heartbeater: go to sleep\n");
+ pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
+
+ stable = true;
+ vid = myvid;
+ cmems = get_view_wo(vid);
+ tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
+
+ if (!isamember(me, cmems)) {
+      tprintf("heartbeater: not member yet; skip heartbeat\n");
+ continue;
+ }
+
+ //find the node with the smallest id
+ m = me;
+ for (unsigned i = 0; i < cmems.size(); i++) {
+ if (m > cmems[i])
+ m = cmems[i];
+ }
+
+ if (m == me) {
+ //if i am the one with smallest id, ping the rest of the nodes
+ for (unsigned i = 0; i < cmems.size(); i++) {
+ if (cmems[i] != me) {
+ if ((h = doheartbeat(cmems[i])) != OK) {
+ stable = false;
+ m = cmems[i];
+ break;
+ }
+ }
+ }
+ } else {
+ //the rest of the nodes ping the one with smallest id
+ if ((h = doheartbeat(m)) != OK)
+ stable = false;
+ }
+
+ if (!stable && vid == myvid) {
+ remove_wo(m);
+ }
+ }
+}
+
+paxos_protocol::status
+config::heartbeat(std::string m, unsigned vid, int &r)
+{
+ ScopedLock ml(&cfg_mutex);
+ int ret = paxos_protocol::ERR;
+ r = (int) myvid;
+ tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
+ if (vid == myvid) {
+ ret = paxos_protocol::OK;
+ } else if (pro->isrunning()) {
+ VERIFY (vid == myvid + 1 || vid + 1 == myvid);
+ ret = paxos_protocol::OK;
+ } else {
+ ret = paxos_protocol::ERR;
+ }
+ return ret;
+}
+
+config::heartbeat_t
+config::doheartbeat(std::string m)
+{
+ int ret = rpc_const::timeout_failure;
+ int r;
+ unsigned vid = myvid;
+ heartbeat_t res = OK;
+
+  tprintf("doheartbeat to %s (%d)\n", m.c_str(), vid);
+ handle h(m);
+ VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
+ rpcc *cl = h.safebind();
+ if (cl) {
+ ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
+ rpcc::to(1000));
+ }
+ VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
+ if (ret != paxos_protocol::OK) {
+ if (ret == rpc_const::atmostonce_failure ||
+ ret == rpc_const::oldsrv_failure) {
+ mgr.delete_handle(m);
+ } else {
+ tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
+ m.c_str(), ret, vid, r);
+ if (ret < 0) res = FAILURE;
+ else res = VIEWERR;
+ }
+ }
+ tprintf("doheartbeat done %d\n", res);
+ return res;
+}
+
--- /dev/null
+#ifndef config_h
+#define config_h
+
+#include <string>
+#include <vector>
+#include "paxos.h"
+
+class config_view_change {
+ public:
+ virtual void commit_change(unsigned vid) = 0;
+ virtual ~config_view_change() {};
+};
+
+class config : public paxos_change {
+ private:
+ acceptor *acc;
+ proposer *pro;
+ rpcs *pxsrpc;
+ unsigned myvid;
+ std::string first;
+ std::string me;
+ config_view_change *vc;
+ std::vector<std::string> mems;
+ pthread_mutex_t cfg_mutex;
+ pthread_cond_t heartbeat_cond;
+ pthread_cond_t config_cond;
+ paxos_protocol::status heartbeat(std::string m, unsigned instance, int &r);
+ std::string value(std::vector<std::string> mems);
+ std::vector<std::string> members(std::string v);
+ std::vector<std::string> get_view_wo(unsigned instance);
+ bool remove_wo(std::string);
+ void reconstruct();
+ typedef enum {
+ OK, // response and same view #
+ VIEWERR, // response but different view #
+ FAILURE, // no response
+ } heartbeat_t;
+ heartbeat_t doheartbeat(std::string m);
+ public:
+ config(std::string _first, std::string _me, config_view_change *_vc);
+ unsigned vid() { return myvid; }
+ std::string myaddr() { return me; };
+ std::string dump() { return acc->dump(); };
+ std::vector<std::string> get_view(unsigned instance);
+ void restore(std::string s);
+ bool add(std::string, unsigned vid);
+ bool ismember(std::string m, unsigned vid);
+ void heartbeater(void);
+ void paxos_commit(unsigned instance, std::string v);
+ rpcs *get_rpcs() { return acc->get_rpcs(); }
+ void breakpoint(int b) { pro->breakpoint(b); }
+};
+
+#endif
--- /dev/null
+/*
+ * Copyright (c), MM Weiss
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. Neither the name of the MM Weiss nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+ * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * clock_gettime_stub.c
+ * gcc -Wall -c clock_gettime_stub.c
+ * posix realtime functions; MacOS user space glue
+ */
+
+/* @comment
+ * other possible implementation using intel builtin rdtsc
+ * rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html
+ *
+ * we could get the ticks by doing this
+ *
+ * __asm __volatile("mov %%ebx, %%esi\n\t"
+ * "cpuid\n\t"
+ * "xchg %%esi, %%ebx\n\t"
+ * "rdtsc"
+ * : "=a" (a),
+ * "=d" (d)
+ * );
+
+ * we could even replace our tricky sched_yield call with assembly code to get better accuracy;
+ * anyway, the following C stub will satisfy 99% of apps using the posix clock_gettime call.
+ * moreover, the setter version (clock_settime) could be easily written using mach primitives:
+ * http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time)
+ *
+ * hackers don't be crackers, don't you use a flush toilet?
+ *
+ *
+ * @see draft: ./posix-realtime-stub/posix-realtime-stub.c
+ *
+ */
+
+
+#ifdef __APPLE__
+
+#pragma weak clock_gettime
+
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <mach/mach.h>
+#include <mach/clock.h>
+#include <mach/mach_time.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sched.h>
+
+typedef enum {
+ CLOCK_REALTIME,
+ CLOCK_MONOTONIC,
+ CLOCK_PROCESS_CPUTIME_ID,
+ CLOCK_THREAD_CPUTIME_ID
+} clockid_t;
+
+static mach_timebase_info_data_t __clock_gettime_inf;
+
+int clock_gettime(clockid_t clk_id, struct timespec *tp) {
+ kern_return_t ret;
+ clock_serv_t clk;
+ clock_id_t clk_serv_id;
+ mach_timespec_t tm;
+
+ uint64_t start, end, delta, nano;
+
+ int retval = -1;
+ switch (clk_id) {
+ case CLOCK_REALTIME:
+ case CLOCK_MONOTONIC:
+ clk_serv_id = clk_id == CLOCK_REALTIME ? CALENDAR_CLOCK : SYSTEM_CLOCK;
+ if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) {
+ if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) {
+ tp->tv_sec = tm.tv_sec;
+ tp->tv_nsec = tm.tv_nsec;
+ retval = 0;
+ }
+ }
+ if (KERN_SUCCESS != ret) {
+ errno = EINVAL;
+ retval = -1;
+ }
+ break;
+ case CLOCK_PROCESS_CPUTIME_ID:
+ case CLOCK_THREAD_CPUTIME_ID:
+ start = mach_absolute_time();
+ if (clk_id == CLOCK_PROCESS_CPUTIME_ID) {
+ getpid();
+ } else {
+ sched_yield();
+ }
+ end = mach_absolute_time();
+ delta = end - start;
+ if (0 == __clock_gettime_inf.denom) {
+ mach_timebase_info(&__clock_gettime_inf);
+ }
+ nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom;
+ tp->tv_sec = nano * 1e-9;
+ tp->tv_nsec = nano - (tp->tv_sec * 1e9);
+ retval = 0;
+ break;
+ default:
+ errno = EINVAL;
+ retval = -1;
+ }
+ return retval;
+}
+
+#endif // __APPLE__
--- /dev/null
+#ifndef gettime_h
+#define gettime_h
+
+#ifdef __APPLE__
+typedef enum {
+ CLOCK_REALTIME,
+ CLOCK_MONOTONIC,
+ CLOCK_PROCESS_CPUTIME_ID,
+ CLOCK_THREAD_CPUTIME_ID
+} clockid_t;
+
+int clock_gettime(clockid_t clk_id, struct timespec *tp);
+#endif
+
+#endif
--- /dev/null
+#include "handle.h"
+#include <stdio.h>
+#include "tprintf.h"
+
+handle_mgr mgr;
+
+handle::handle(std::string m)
+{
+ h = mgr.get_handle(m);
+}
+
+rpcc *
+handle::safebind()
+{
+ if (!h)
+ return NULL;
+ ScopedLock ml(&h->cl_mutex);
+ if (h->del)
+ return NULL;
+ if (h->cl)
+ return h->cl;
+ sockaddr_in dstsock;
+ make_sockaddr(h->m.c_str(), &dstsock);
+ rpcc *cl = new rpcc(dstsock);
+ tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
+ int ret;
+  // Starting with lab 6, our test script assumes that a failure
+  // can be detected by the paxos and rsm layers within a few seconds.
+  // We have to set a small timeout value to support that assumption.
+  //
+  // Note: with RPC_LOSSY=5, your code would fail to pass the tests of
+  // lab 6 and lab 7 because the rpc layer may delay your RPC request
+  // and cause a timeout failure. Please make sure RPC_LOSSY is set to 0.
+ ret = cl->bind(rpcc::to(1000));
+ if (ret < 0) {
+ tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
+ delete cl;
+ h->del = true;
+ } else {
+ tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
+ h->cl = cl;
+ }
+ return h->cl;
+}
+
+handle::~handle()
+{
+ if (h) mgr.done_handle(h);
+}
+
+handle_mgr::handle_mgr()
+{
+ VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0);
+}
+
+struct hinfo *
+handle_mgr::get_handle(std::string m)
+{
+ ScopedLock ml(&handle_mutex);
+ struct hinfo *h = 0;
+ if (hmap.find(m) == hmap.end()) {
+ h = new hinfo;
+ h->cl = NULL;
+ h->del = false;
+ h->refcnt = 1;
+ h->m = m;
+ pthread_mutex_init(&h->cl_mutex, NULL);
+ hmap[m] = h;
+ } else if (!hmap[m]->del) {
+ h = hmap[m];
+ h->refcnt ++;
+ }
+ return h;
+}
+
+void
+handle_mgr::done_handle(struct hinfo *h)
+{
+ ScopedLock ml(&handle_mutex);
+ h->refcnt--;
+ if (h->refcnt == 0 && h->del)
+ delete_handle_wo(h->m);
+}
+
+void
+handle_mgr::delete_handle(std::string m)
+{
+ ScopedLock ml(&handle_mutex);
+ delete_handle_wo(m);
+}
+
+// Must be called with handle_mutex locked.
+void
+handle_mgr::delete_handle_wo(std::string m)
+{
+ if (hmap.find(m) == hmap.end()) {
+ tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
+ } else {
+ tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
+ hmap[m]->refcnt);
+ struct hinfo *h = hmap[m];
+ if (h->refcnt == 0) {
+ if (h->cl) {
+ h->cl->cancel();
+ delete h->cl;
+ }
+ pthread_mutex_destroy(&h->cl_mutex);
+ hmap.erase(m);
+ delete h;
+ } else {
+ h->del = true;
+ }
+ }
+}
--- /dev/null
+// manage a cache of RPC connections.
+// assuming cid is a std::string holding the
+// host:port of the RPC server you want
+// to talk to:
+//
+// handle h(cid);
+// rpcc *cl = h.safebind();
+// if(cl){
+// ret = cl->call(...);
+// } else {
+// bind() failed
+// }
+//
+// if the calling program has not contacted
+// cid before, safebind() will create a new
+// connection, call bind(), and return
+// an rpcc*, or 0 if bind() failed. if the
+// program has previously contacted cid,
+// safebind() just returns the previously
+// created rpcc*. best not to hold any
+// mutexes while calling safebind().
+
+#ifndef handle_h
+#define handle_h
+
+#include <string>
+#include <vector>
+#include "rpc.h"
+
+struct hinfo {
+ rpcc *cl;
+ int refcnt;
+ bool del;
+ std::string m;
+ pthread_mutex_t cl_mutex;
+};
+
+class handle {
+ private:
+ struct hinfo *h;
+ public:
+ handle(std::string m);
+ ~handle();
+ /* safebind will try to bind with the rpc server on the first call.
+ * Since bind may block, the caller probably should not hold a mutex
+ * when calling safebind.
+ *
+ * return:
+   *   if the first safebind succeeded, all later calls will return
+   *   an rpcc object; otherwise, all later calls will return NULL.
+ *
+ * Example:
+ * handle h(dst);
+ * XXX_protocol::status ret;
+ * if (h.safebind()) {
+ * ret = h.safebind()->call(...);
+ * }
+ * if (!h.safebind() || ret != XXX_protocol::OK) {
+ * // handle failure
+ * }
+ */
+ rpcc *safebind();
+};
+
+class handle_mgr {
+ private:
+ pthread_mutex_t handle_mutex;
+ std::map<std::string, struct hinfo *> hmap;
+ public:
+ handle_mgr();
+ struct hinfo *get_handle(std::string m);
+ void done_handle(struct hinfo *h);
+ void delete_handle(std::string m);
+ void delete_handle_wo(std::string m);
+};
+
+extern class handle_mgr mgr;
+
+#endif
--- /dev/null
+// compile time version of min and max
+
+#ifndef algorithm_h
+#define algorithm_h
+
+template <int A, int B>
+struct static_max
+{
+ static const int value = A > B ? A : B;
+};
+
+template <int A, int B>
+struct static_min
+{
+ static const int value = A < B ? A : B;
+};
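+
+// For example, static_max<3, 5>::value is 5 and static_min<3, 5>::value
+// is 3, both computed at compile time.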
+
+#endif
--- /dev/null
+// Safe assertions: VERIFY(expr) always evaluates expr, even when
+// NDEBUG disables assert(), so expressions with side effects are safe.
+
+#ifndef verify_client_h
+#define verify_client_h
+
+#include <stdlib.h>
+#include <assert.h>
+
+#ifdef NDEBUG
+#define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
+#else
+#define VERIFY(expr) assert(expr)
+#endif
+
+#endif
--- /dev/null
+// RPC stubs for clients to talk to lock_server
+
+#include "lock_client.h"
+#include "rpc.h"
+#include <arpa/inet.h>
+
+#include <sstream>
+#include <iostream>
+#include <stdio.h>
+
+lock_client::lock_client(std::string dst)
+{
+ sockaddr_in dstsock;
+ make_sockaddr(dst.c_str(), &dstsock);
+ cl = new rpcc(dstsock);
+ if (cl->bind() < 0) {
+ printf("lock_client: call bind\n");
+ }
+}
+
+int
+lock_client::stat(lock_protocol::lockid_t lid)
+{
+ int r;
+ lock_protocol::status ret = cl->call(lock_protocol::stat, cl->id(), lid, r);
+ VERIFY (ret == lock_protocol::OK);
+ return r;
+}
+
+lock_protocol::status
+lock_client::acquire(lock_protocol::lockid_t lid)
+{
+ int r;
+ return cl->call(lock_protocol::acquire, cl->id(), lid, r);
+}
+
+lock_protocol::status
+lock_client::release(lock_protocol::lockid_t lid)
+{
+ int r;
+ return cl->call(lock_protocol::release, cl->id(), lid, r);
+}
+
+t4_lock_client *t4_lock_client_new(const char *dst) {
+ return (t4_lock_client *)new lock_client(dst);
+}
+
+void t4_lock_client_delete(t4_lock_client *client) {
+ delete (lock_client *)client;
+}
+
+t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
+ return ((lock_client *)client)->acquire(lid);
+}
+
+t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->release(lid);
+}
+
+t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
+ return ((lock_client *)client)->stat(lid);
+}
+
--- /dev/null
+// lock client interface.
+
+#ifndef lock_client_h
+#define lock_client_h
+
+#ifdef __cplusplus
+
+#include <string>
+#include "lock_protocol.h"
+#include "rpc.h"
+#include <vector>
+
+// Client interface to the lock server
+class lock_client {
+ protected:
+ rpcc *cl;
+ public:
+ lock_client(std::string d);
+ virtual ~lock_client() {};
+ virtual lock_protocol::status acquire(lock_protocol::lockid_t);
+ virtual lock_protocol::status release(lock_protocol::lockid_t);
+ virtual lock_protocol::status stat(lock_protocol::lockid_t);
+};
+
+#endif
+
+extern "C" {
+
+struct _t4_lock_client;
+typedef struct _t4_lock_client t4_lock_client;
+
+typedef enum {
+ T4_OK,
+ T4_RETRY,
+ T4_RPCERR,
+ T4_NOENT,
+ T4_IOERR
+} t4_xxstatus;
+
+typedef int t4_status;
+
+typedef unsigned long long t4_lockid_t;
+
+t4_lock_client *t4_lock_client_new(const char *dst);
+void t4_lock_client_delete(t4_lock_client *);
+t4_status t4_lock_client_acquire(t4_lock_client *, t4_lockid_t);
+t4_status t4_lock_client_release(t4_lock_client *, t4_lockid_t);
+t4_status t4_lock_client_stat(t4_lock_client *, t4_lockid_t);
+
+}
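+
+/* A minimal usage sketch of the C interface (hypothetical address,
+ * error handling omitted):
+ *
+ *   t4_lock_client *c = t4_lock_client_new("127.0.0.1:3772");
+ *   t4_status s = t4_lock_client_acquire(c, 1);
+ *   if (s == T4_OK)
+ *     t4_lock_client_release(c, 1);
+ *   t4_lock_client_delete(c);
+ */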
+
+#endif
--- /dev/null
+// RPC stubs for clients to talk to lock_server, and cache the locks;
+// see lock_client_cache_rsm.h for protocol details.
+
+#include "lock_client_cache_rsm.h"
+#include "rpc.h"
+#include <sstream>
+#include <iostream>
+#include <stdio.h>
+#include "tprintf.h"
+
+#include "rsm_client.h"
+
+lock_state::lock_state():
+ state(none)
+{
+}
+
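+// Each waiting thread gets its own condition variable in the map c,
+// keyed by its pthread id, so signal(pthread_t) can wake one specific
+// thread instead of broadcasting to all waiters.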
+void lock_state::wait() {
+ pthread_t self = pthread_self();
+ c[self].wait(m);
+ c.erase(self);
+}
+
+void lock_state::signal() {
+ // signal anyone
+ if (c.begin() != c.end())
+ c.begin()->second.signal();
+}
+
+void lock_state::signal(pthread_t who) {
+ if (c.count(who))
+ c[who].signal();
+}
+
+static void * releasethread(void *x) {
+ lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
+ cc->releaser();
+ return 0;
+}
+
+int lock_client_cache_rsm::last_port = 0;
+
+lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
+ ScopedLock sl(lock_table_lock);
+ // by the semantics of std::map, this will create
+ // the lock if it doesn't already exist
+ return lock_table[lid];
+}
+
+lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) {
+ srand(time(NULL)^last_port);
+ rlock_port = ((rand()%32000) | (0x1 << 10));
+ const char *hname;
+ // VERIFY(gethostname(hname, 100) == 0);
+ hname = "127.0.0.1";
+ ostringstream host;
+ host << hname << ":" << rlock_port;
+ id = host.str();
+ last_port = rlock_port;
+ rpcs *rlsrpc = new rpcs(rlock_port);
+ rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
+ rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
+ {
+ ScopedLock sl(xid_mutex);
+ xid = 0;
+ }
+ rsmc = new rsm_client(xdst);
+ int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
+ VERIFY (r == 0);
+}
+
+void lock_client_cache_rsm::releaser() {
+ while (1) {
+ lock_protocol::lockid_t lid;
+ release_fifo.deq(&lid);
+ LOG("Releaser: " << lid);
+
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+ VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
+ st.state = lock_state::releasing;
+ {
+ ScopedUnlock su(st.m);
+ int r;
+ rsmc->call(lock_protocol::release, lid, id, st.xid, r);
+ }
+ st.state = lock_state::none;
+ LOG("Lock " << lid << ": none");
+ st.signal();
+ }
+}
+
+lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+ pthread_t self = pthread_self();
+
+ // check for reentrancy
+ VERIFY(st.state != lock_state::locked || st.held_by != self);
+ VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
+
+ st.wanted_by.push_back(self);
+
+ while (1) {
+ if (st.state != lock_state::free)
+ LOG("Lock " << lid << ": not free");
+
+ if (st.state == lock_state::none || st.state == lock_state::retrying) {
+ if (st.state == lock_state::none) {
+ ScopedLock sl(xid_mutex);
+ st.xid = xid++;
+ }
+ st.state = lock_state::acquiring;
+ LOG("Lock " << lid << ": acquiring");
+ lock_protocol::status result;
+ {
+ ScopedUnlock su(st.m);
+ int r;
+ result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
+ }
+ LOG("acquire returned " << result);
+ if (result == lock_protocol::OK) {
+ st.state = lock_state::free;
+ LOG("Lock " << lid << ": free");
+ }
+ }
+
+ VERIFY(st.wanted_by.size() != 0);
+ if (st.state == lock_state::free) {
+ // is it for me?
+ pthread_t front = st.wanted_by.front();
+ if (front == releaser_thread) {
+ st.wanted_by.pop_front();
+ st.state = lock_state::locked;
+ st.held_by = releaser_thread;
+ LOG("Queuing " << lid << " for release");
+ release_fifo.enq(lid);
+ } else if (front == self) {
+ st.wanted_by.pop_front();
+ st.state = lock_state::locked;
+ st.held_by = self;
+ break;
+ } else {
+ st.signal(front);
+ }
+ }
+
+ LOG("waiting...");
+ st.wait();
+ LOG("wait ended");
+ }
+
+ LOG("Lock " << lid << ": locked");
+ return lock_protocol::OK;
+}
+
+lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+ pthread_t self = pthread_self();
+ VERIFY(st.state == lock_state::locked && st.held_by == self);
+ st.state = lock_state::free;
+ LOG("Lock " << lid << ": free");
+ if (st.wanted_by.size()) {
+ pthread_t front = st.wanted_by.front();
+ if (front == releaser_thread) {
+ st.state = lock_state::locked;
+ st.held_by = releaser_thread;
+ st.wanted_by.pop_front();
+ LOG("Queuing " << lid << " for release");
+ release_fifo.enq(lid);
+ } else
+ st.signal(front);
+ }
+ LOG("Finished signaling.");
+ return lock_protocol::OK;
+}
+
+rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
+ LOG("Revoke handler " << lid << " " << xid);
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+
+ if (st.state == lock_state::releasing || st.state == lock_state::none)
+ return rlock_protocol::OK;
+
+ if (st.state == lock_state::free &&
+ (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
+ // gimme
+ st.state = lock_state::locked;
+ st.held_by = releaser_thread;
+ if (st.wanted_by.size())
+ st.wanted_by.pop_front();
+ release_fifo.enq(lid);
+ } else {
+ // get in line
+ st.wanted_by.push_back(releaser_thread);
+ }
+ return rlock_protocol::OK;
+}
+
+rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+ VERIFY(st.state == lock_state::acquiring);
+ st.state = lock_state::retrying;
+ LOG("Lock " << lid << ": none");
+ st.signal(); // only one thread needs to wake up
+ return rlock_protocol::OK;
+}
--- /dev/null
+// lock client interface.
+
+#ifndef lock_client_cache_rsm_h
+#define lock_client_cache_rsm_h
+
+#include <string>
+#include "lock_protocol.h"
+#include "rpc.h"
+#include "lock_client.h"
+#include "lang/verify.h"
+#include "mutex.h"
+#include "rpc/fifo.h"
+#include "rsm_client.h"
+
+// Classes that inherit lock_release_user can override dorelease so
+// that they will be called when lock_client releases a lock.
+// You will not need to do anything with this class until Lab 5.
+class lock_release_user {
+ public:
+ virtual void dorelease(lock_protocol::lockid_t) = 0;
+ virtual ~lock_release_user() {};
+};
+
+using namespace std;
+
+typedef string callback;
+
+class lock_state {
+public:
+ lock_state();
+ enum {
+ none = 0,
+ retrying,
+ free,
+ locked,
+ acquiring,
+ releasing
+ } state;
+ pthread_t held_by;
+ list<pthread_t> wanted_by;
+ mutex m;
+ map<pthread_t, cond> c;
+ lock_protocol::xid_t xid;
+ void wait();
+ void signal();
+ void signal(pthread_t who);
+};
+
+typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+
+class lock_client_cache_rsm;
+
+// A client that caches locks. The server can revoke cached locks
+// by sending revoke RPCs (see revoke_handler below).
+class lock_client_cache_rsm : public lock_client {
+ private:
+ pthread_t releaser_thread;
+ rsm_client *rsmc;
+ class lock_release_user *lu;
+ int rlock_port;
+ string hostname;
+ string id;
+ mutex xid_mutex;
+ lock_protocol::xid_t xid;
+ fifo<lock_protocol::lockid_t> release_fifo;
+ mutex lock_table_lock;
+ lock_map lock_table;
+ lock_state &get_lock_state(lock_protocol::lockid_t lid);
+ public:
+ static int last_port;
+ lock_client_cache_rsm(string xdst, class lock_release_user *l = 0);
+ virtual ~lock_client_cache_rsm() {};
+ lock_protocol::status acquire(lock_protocol::lockid_t);
+ virtual lock_protocol::status release(lock_protocol::lockid_t);
+ void releaser();
+ rlock_protocol::status revoke_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &);
+ rlock_protocol::status retry_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &);
+};
+
+
+#endif
--- /dev/null
+//
+// Lock demo
+//
+
+#include "lock_protocol.h"
+#include "lock_client.h"
+#include "rpc.h"
+#include <arpa/inet.h>
+#include <vector>
+#include <stdlib.h>
+#include <stdio.h>
+
+std::string dst;
+lock_client *lc;
+
+int
+main(int argc, char *argv[])
+{
+ int r;
+
+ if(argc != 2){
+ fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
+ exit(1);
+ }
+
+ dst = argv[1];
+ lc = new lock_client(dst);
+ r = lc->stat(1);
+ printf ("stat returned %d\n", r);
+}
--- /dev/null
+// lock protocol
+
+#ifndef lock_protocol_h
+#define lock_protocol_h
+
+#include "rpc.h"
+
+class lock_protocol {
+ public:
+ enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR };
+ typedef int status;
+ typedef unsigned long long lockid_t;
+ typedef unsigned long long xid_t;
+ enum rpc_numbers {
+ acquire = 0x7001,
+ release,
+ stat
+ };
+};
+
+class rlock_protocol {
+ public:
+ enum xxstatus { OK, RPCERR };
+ typedef int status;
+ enum rpc_numbers {
+ revoke = 0x8001,
+ retry = 0x8002
+ };
+};
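+
+// A sketch of how these numbers are used (assuming an rpcc *cl already
+// bound to the lock server, as in lock_client.cc):
+//   int r;
+//   lock_protocol::status ret =
+//     cl->call(lock_protocol::acquire, cl->id(), lid, r);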
+#endif
--- /dev/null
+// the lock server implementation
+
+#include "lock_server.h"
+#include <sstream>
+#include <stdio.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+
+lock_server::lock_server():
+ nacquire (0)
+{
+}
+
+// get_lock acquires lock_lock internally to protect the lock table;
+// callers must not already hold it.
+mutex &
+lock_server::get_lock(lock_protocol::lockid_t lid) {
+ lock_lock.acquire();
+ // by the semantics of std::map, this will create
+ // the mutex if it doesn't already exist
+ mutex &l = locks[lid];
+ lock_lock.release();
+ return l;
+}
+
+lock_protocol::status
+lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r)
+{
+ lock_protocol::status ret = lock_protocol::OK;
+ printf("stat request from clt %d\n", clt);
+ r = nacquire;
+ return ret;
+}
+
+lock_protocol::status
+lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r)
+{
+ get_lock(lid).acquire();
+ return lock_protocol::OK;
+}
+
+lock_protocol::status
+lock_server::release(int clt, lock_protocol::lockid_t lid, int &r)
+{
+ get_lock(lid).release();
+ return lock_protocol::OK;
+}
--- /dev/null
+// this is the lock server
+// the lock client has a similar interface
+
+#ifndef lock_server_h
+#define lock_server_h
+
+#include <string>
+#include "lock_protocol.h"
+#include "lock_client.h"
+#include "rpc.h"
+#include <pthread.h>
+#include <list>
+#include <map>
+#include "mutex.h"
+
+using namespace std;
+
+typedef map<lock_protocol::lockid_t, mutex> lock_map;
+
+class lock_server {
+
+ protected:
+ int nacquire;
+ mutex lock_lock;
+ lock_map locks;
+ mutex &get_lock(lock_protocol::lockid_t lid);
+
+ public:
+ lock_server();
+ ~lock_server() {};
+ lock_protocol::status stat(int clt, lock_protocol::lockid_t lid, int &);
+ lock_protocol::status acquire(int clt, lock_protocol::lockid_t lid, int &);
+ lock_protocol::status release(int clt, lock_protocol::lockid_t lid, int &);
+};
+
+#endif
--- /dev/null
+// the caching lock server implementation
+
+#include "lock_server_cache_rsm.h"
+#include <sstream>
+#include <stdio.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include "lang/verify.h"
+#include "handle.h"
+#include "tprintf.h"
+#include "rpc/marshall.h"
+
+lock_state::lock_state():
+ held(false)
+{
+}
+
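+// Marshalling helpers: marshal_state()/unmarshal_state() below
+// serialize the whole lock_table for RSM state transfer, so the
+// container types inside lock_state need << and >> operators.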
+template <class A, class B>
+ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
+ o << "<" << d.first << "," << d.second << ">";
+ return o;
+}
+
+template <class A>
+marshall & operator<<(marshall &m, const list<A> &d) {
+ m << vector<A>(d.begin(), d.end());
+ return m;
+}
+
+template <class A>
+unmarshall & operator>>(unmarshall &u, list<A> &d) {
+ vector<A> v;
+ u >> v;
+ d.assign(v.begin(), v.end());
+ return u;
+}
+
+
+template <class A, class B>
+marshall & operator<<(marshall &m, const pair<A,B> &d) {
+ m << d.first;
+ m << d.second;
+ return m;
+}
+
+template <class A, class B>
+unmarshall & operator>>(unmarshall &u, pair<A,B> &d) {
+ u >> d.first;
+ u >> d.second;
+ return u;
+}
+
+marshall & operator<<(marshall &m, const lock_state &d) {
+ m << d.held;
+ m << d.held_by;
+ m << d.wanted_by;
+ return m;
+}
+
+unmarshall & operator>>(unmarshall &u, lock_state &d) {
+ u >> d.held;
+ u >> d.held_by;
+ u >> d.wanted_by;
+ return u;
+}
+
+
+lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
+ ScopedLock sl(lock_table_lock);
+ // by the semantics of map, this will create
+ // the lock if it doesn't already exist
+ return lock_table[lid];
+}
+
+static void *revokethread(void *x) {
+ lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
+ sc->revoker();
+ return 0;
+}
+
+static void *retrythread(void *x) {
+ lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
+ sc->retryer();
+ return 0;
+}
+
+lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
+ pthread_t th;
+ VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0);
+ VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0);
+ rsm->set_state_transfer(this);
+}
+
+void lock_server_cache_rsm::revoker() {
+ while (1) {
+ lock_protocol::lockid_t lid;
+ revoke_fifo.deq(&lid);
+ LOG("Revoking " << lid);
+ if (rsm && !rsm->amiprimary())
+ continue;
+
+ lock_state &st = get_lock_state(lid);
+ holder held_by;
+ {
+ ScopedLock sl(st.m);
+ held_by = st.held_by;
+ }
+
+ rpcc *proxy = NULL;
+ // try a few times?
+ //int t=5;
+ //while (t-- && !proxy)
+ proxy = handle(held_by.first).safebind();
+ if (proxy) {
+ int r;
+ rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r);
+ LOG("Revoke returned " << ret);
+ }
+ }
+}
+
+void lock_server_cache_rsm::retryer() {
+ while (1) {
+ lock_protocol::lockid_t lid;
+ retry_fifo.deq(&lid);
+ if (rsm && !rsm->amiprimary())
+ continue;
+
+ LOG("Sending retry for " << lid);
+ lock_state &st = get_lock_state(lid);
+ holder front;
+ {
+ ScopedLock sl(st.m);
+ if (st.wanted_by.empty())
+ continue;
+ front = st.wanted_by.front();
+ }
+
+ rlock_protocol::status ret = -1;
+
+ rpcc *proxy = NULL;
+ // try a few times?
+ //int t=5;
+ //while (t-- && !proxy)
+ proxy = handle(front.first).safebind();
+ if (proxy) {
+ int r;
+ ret = proxy->call(rlock_protocol::retry, lid, front.second, r);
+ LOG("Retry returned " << ret);
+ }
+ }
+}
+
+int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) {
+ LOG_FUNC_ENTER_SERVER;
+ holder h = holder(id, xid);
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+
+ // deal with duplicated requests
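+  // (each client numbers its acquires with an increasing xid: an older
+  // xid is a stale request, and an equal xid is a retransmission of a
+  // request we may already have granted)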
+ if (st.old_requests.count(id)) {
+ lock_protocol::xid_t old_xid = st.old_requests[id];
+ if (old_xid > xid)
+ return lock_protocol::RPCERR;
+ else if (old_xid == xid) {
+ if (st.held && st.held_by == h) {
+ LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+ return lock_protocol::OK;
+ }
+ }
+ }
+
+ // grant the lock if it's available and I'm next in line
+ if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
+ if (!st.wanted_by.empty())
+ st.wanted_by.pop_front();
+ st.old_requests[id] = xid;
+
+ st.held = true;
+ st.held_by = h;
+ LOG("Lock " << lid << " held by " << h.first);
+ if (st.wanted_by.size())
+ revoke_fifo.enq(lid);
+ return lock_protocol::OK;
+ }
+
+ // get in line
+ bool found = false;
+ for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
+ if (i->first == id) {
+ // make sure client is obeying serialization
+ if (i->second != xid) {
+ LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
+ return lock_protocol::RPCERR;
+ }
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ st.wanted_by.push_back(h);
+
+ LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+
+ // send revoke if we're first in line
+ if (st.wanted_by.front() == h)
+ revoke_fifo.enq(lid);
+
+ return lock_protocol::RETRY;
+}
+
+int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
+ LOG_FUNC_ENTER_SERVER;
+ lock_state &st = get_lock_state(lid);
+ ScopedLock sl(st.m);
+ if (st.held && st.held_by == holder(id, xid)) {
+ st.held = false;
+ LOG("Lock " << lid << " not held");
+ }
+ if (st.wanted_by.size())
+ retry_fifo.enq(lid);
+ return lock_protocol::OK;
+}
+
+string lock_server_cache_rsm::marshal_state() {
+ ScopedLock sl(lock_table_lock);
+ marshall rep;
+ rep << nacquire;
+ rep << lock_table;
+ return rep.str();
+}
+
+void lock_server_cache_rsm::unmarshal_state(string state) {
+ ScopedLock sl(lock_table_lock);
+ unmarshall rep(state);
+ rep >> nacquire;
+ rep >> lock_table;
+}
+
+lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) {
+ printf("stat request\n");
+ r = nacquire;
+ return lock_protocol::OK;
+}
+
--- /dev/null
+#ifndef lock_server_cache_rsm_h
+#define lock_server_cache_rsm_h
+
+#include <string>
+
+#include <map>
+#include <vector>
+#include "lock_protocol.h"
+#include "rpc.h"
+#include "mutex.h"
+#include "rsm_state_transfer.h"
+#include "rsm.h"
+#include "rpc/fifo.h"
+
+using namespace std;
+
+typedef string callback;
+typedef pair<callback, lock_protocol::xid_t> holder;
+
+class lock_state {
+public:
+ lock_state();
+ bool held;
+ holder held_by;
+ list<holder> wanted_by;
+ map<callback, lock_protocol::xid_t> old_requests;
+ mutex m;
+};
+
+typedef map<lock_protocol::lockid_t, lock_state> lock_map;
+
+class lock_server_cache_rsm : public rsm_state_transfer {
+ private:
+ int nacquire;
+ mutex lock_table_lock;
+ lock_map lock_table;
+ lock_state &get_lock_state(lock_protocol::lockid_t lid);
+ fifo<lock_protocol::lockid_t> retry_fifo;
+ fifo<lock_protocol::lockid_t> revoke_fifo;
+ class rsm *rsm;
+ public:
+ lock_server_cache_rsm(class rsm *rsm = 0);
+ lock_protocol::status stat(lock_protocol::lockid_t, int &);
+ void revoker();
+ void retryer();
+ string marshal_state();
+ void unmarshal_state(string state);
+ int acquire(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &);
+ int release(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &);
+};
+
+#endif
--- /dev/null
+#include "rpc.h"
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include "lock_server_cache_rsm.h"
+#include "paxos.h"
+#include "rsm.h"
+
+#include "jsl_log.h"
+
+// Main loop of lock_server
+
+char tprintf_thread_prefix = 's';
+
+int
+main(int argc, char *argv[])
+{
+ int count = 0;
+
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
+
+ srandom(getpid());
+
+ if(argc != 3){
+ fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+ exit(1);
+ }
+
+ char *count_env = getenv("RPC_COUNT");
+ if(count_env != NULL){
+ count = atoi(count_env);
+ }
+
+ //jsl_set_debug(2);
+  // Comment out the next line to switch from the RSM-based lock
+  // server back to the ordinary lock server. In Lab 6, we disable the
+  // lock server and implement Paxos. In Lab 7, we will make the lock
+  // server use your RSM layer.
+#define RSM
+#ifdef RSM
+// You must comment out the next line once you are done with Step One.
+//#define STEP_ONE
+#ifdef STEP_ONE
+ rpcs server(atoi(argv[1]));
+ lock_server_cache_rsm ls;
+ server.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
+ server.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
+ server.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
+#else
+ rsm rsm(argv[1], argv[2]);
+ lock_server_cache_rsm ls(&rsm);
+ rsm.set_state_transfer((rsm_state_transfer *)&ls);
+ rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
+ rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
+ rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
+#endif // STEP_ONE
+#endif // RSM
+
+#ifndef RSM
+ lock_server_cache ls;
+ rpcs server(atoi(argv[1]), count);
+ server.reg(lock_protocol::stat, &ls, &lock_server_cache::stat);
+ server.reg(lock_protocol::acquire, &ls, &lock_server_cache::acquire);
+ server.reg(lock_protocol::release, &ls, &lock_server_cache::release);
+#endif
+
+
+ while(1)
+ sleep(1000);
+}
--- /dev/null
+//
+// Lock server tester
+//
+
+#include "lock_protocol.h"
+#include "lock_client.h"
+#include "rpc.h"
+#include "jsl_log.h"
+#include <arpa/inet.h>
+#include <vector>
+#include <stdlib.h>
+#include <stdio.h>
+#include "lang/verify.h"
+#include "lock_client_cache_rsm.h"
+#include "tprintf.h"
+
+char tprintf_thread_prefix = 'c';
+
+// must be >= 2
+int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpcs.
+std::string dst;
+lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt];
+lock_protocol::lockid_t a = 1;
+lock_protocol::lockid_t b = 2;
+lock_protocol::lockid_t c = 3;
+
+// check_grant() and check_release() check that the lock server
+// doesn't grant the same lock to both clients.
+// it assumes that lock names are distinct in the first byte.
+int ct[256];
+pthread_mutex_t count_mutex;
+
+void
+check_grant(lock_protocol::lockid_t lid)
+{
+ ScopedLock ml(&count_mutex);
+ int x = lid & 0xff;
+ if(ct[x] != 0){
+ fprintf(stderr, "error: server granted %016llx twice\n", lid);
+ fprintf(stdout, "error: server granted %016llx twice\n", lid);
+ exit(1);
+ }
+ ct[x] += 1;
+}
+
+void
+check_release(lock_protocol::lockid_t lid)
+{
+ ScopedLock ml(&count_mutex);
+ int x = lid & 0xff;
+ if(ct[x] != 1){
+ fprintf(stderr, "error: client released un-held lock %016llx\n", lid);
+ exit(1);
+ }
+ ct[x] -= 1;
+}
+
+void
+test1(void)
+{
+ tprintf ("acquire a release a acquire a release a\n");
+ lc[0]->acquire(a);
+ check_grant(a);
+ lc[0]->release(a);
+ check_release(a);
+ lc[0]->acquire(a);
+ check_grant(a);
+ lc[0]->release(a);
+ check_release(a);
+
+ tprintf ("acquire a acquire b release b release a\n");
+ lc[0]->acquire(a);
+ check_grant(a);
+ lc[0]->acquire(b);
+ check_grant(b);
+ lc[0]->release(b);
+ check_release(b);
+ lc[0]->release(a);
+ check_release(a);
+}
+
+void *
+test2(void *x)
+{
+ int i = * (int *) x;
+
+ tprintf ("test2: client %d acquire a release a\n", i);
+ lc[i]->acquire(a);
+ tprintf ("test2: client %d acquire done\n", i);
+ check_grant(a);
+ sleep(1);
+ tprintf ("test2: client %d release\n", i);
+ check_release(a);
+ lc[i]->release(a);
+ tprintf ("test2: client %d release done\n", i);
+ return 0;
+}
+
+void *
+test3(void *x)
+{
+ int i = * (int *) x;
+
+ tprintf ("test3: client %d acquire a release a concurrent\n", i);
+ for (int j = 0; j < 10; j++) {
+ lc[i]->acquire(a);
+ check_grant(a);
+ tprintf ("test3: client %d got lock\n", i);
+ check_release(a);
+ lc[i]->release(a);
+ }
+ return 0;
+}
+
+void *
+test4(void *x)
+{
+ int i = * (int *) x;
+
+ tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
+ for (int j = 0; j < 10; j++) {
+ lc[0]->acquire(a);
+ check_grant(a);
+ tprintf ("test4: thread %d on client 0 got lock\n", i);
+ check_release(a);
+ lc[0]->release(a);
+ }
+ return 0;
+}
+
+void *
+test5(void *x)
+{
+ int i = * (int *) x;
+
+ tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
+ for (int j = 0; j < 10; j++) {
+ if (i < 5) lc[0]->acquire(a);
+ else lc[1]->acquire(a);
+ check_grant(a);
+ tprintf ("test5: client %d got lock\n", i);
+ check_release(a);
+ if (i < 5) lc[0]->release(a);
+ else lc[1]->release(a);
+ }
+ return 0;
+}
+
+int
+main(int argc, char *argv[])
+{
+ int r;
+ pthread_t th[nt];
+ int test = 0;
+
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
+ srandom(getpid());
+
+ //jsl_set_debug(2);
+
+ if(argc < 2) {
+ fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
+ exit(1);
+ }
+
+ dst = argv[1];
+
+ if (argc > 2) {
+ test = atoi(argv[2]);
+ if(test < 1 || test > 5){
+ tprintf("Test number must be between 1 and 5\n");
+ exit(1);
+ }
+ }
+
+ VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0);
+ tprintf("cache lock client\n");
+ for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst);
+
+ if(!test || test == 1){
+ test1();
+ }
+
+ if(!test || test == 2){
+ // test2
+ for (int i = 0; i < nt; i++) {
+ int *a = new int (i);
+ r = pthread_create(&th[i], NULL, test2, (void *) a);
+ VERIFY (r == 0);
+ }
+ for (int i = 0; i < nt; i++) {
+ pthread_join(th[i], NULL);
+ }
+ }
+
+ if(!test || test == 3){
+ tprintf("test 3\n");
+
+ // test3
+ for (int i = 0; i < nt; i++) {
+ int *a = new int (i);
+ r = pthread_create(&th[i], NULL, test3, (void *) a);
+ VERIFY (r == 0);
+ }
+ for (int i = 0; i < nt; i++) {
+ pthread_join(th[i], NULL);
+ }
+ }
+
+ if(!test || test == 4){
+ tprintf("test 4\n");
+
+ // test 4
+ for (int i = 0; i < 2; i++) {
+ int *a = new int (i);
+ r = pthread_create(&th[i], NULL, test4, (void *) a);
+ VERIFY (r == 0);
+ }
+ for (int i = 0; i < 2; i++) {
+ pthread_join(th[i], NULL);
+ }
+ }
+
+ if(!test || test == 5){
+ tprintf("test 5\n");
+
+ // test 5
+
+ for (int i = 0; i < nt; i++) {
+ int *a = new int (i);
+ r = pthread_create(&th[i], NULL, test5, (void *) a);
+ VERIFY (r == 0);
+ }
+ for (int i = 0; i < nt; i++) {
+ pthread_join(th[i], NULL);
+ }
+ }
+
+ tprintf ("%s: passed all tests successfully\n", argv[0]);
+
+}
--- /dev/null
+#include "paxos.h"
+#include <fstream>
+#include <iostream>
+
+// Paxos must maintain some durable state (i.e., state that survives
+// power failures) to run correctly. This module implements a log with
+// all durable state to run Paxos. Since the values chosen correspond
+// to views, the log contains all views since the beginning of time.
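+//
+// The on-disk format is one record per line; for example (hypothetical
+// addresses), a log might contain:
+//   done 1 127.0.0.1:3772
+//   propseen 2 127.0.0.1:3772
+//   accepted 2 127.0.0.1:3772 127.0.0.1:3772 127.0.0.1:3773
+// matching what loginstance(), logprop(), and logaccept() write below.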
+
+log::log(acceptor *_acc, std::string _me)
+ : pxs (_acc)
+{
+ name = "paxos-" + _me + ".log";
+ logread();
+}
+
+void
+log::logread(void)
+{
+ std::ifstream from;
+ std::string type;
+ unsigned instance;
+
+ from.open(name.c_str());
+ printf ("logread\n");
+ while (from >> type) {
+ if (type == "done") {
+ std::string v;
+ from >> instance;
+ from.get();
+ getline(from, v);
+ pxs->values[instance] = v;
+ pxs->instance_h = instance;
+ printf ("logread: instance: %d w. v = %s\n", instance,
+ pxs->values[instance].c_str());
+ pxs->v_a.clear();
+ pxs->n_h.n = 0;
+ pxs->n_a.n = 0;
+ } else if (type == "propseen") {
+ from >> pxs->n_h.n;
+ from >> pxs->n_h.m;
+ printf("logread: high update: %d(%s)\n", pxs->n_h.n, pxs->n_h.m.c_str());
+ } else if (type == "accepted") {
+ std::string v;
+ from >> pxs->n_a.n;
+ from >> pxs->n_a.m;
+ from.get();
+ getline(from, v);
+ pxs->v_a = v;
+ printf("logread: prop update %d(%s) with v = %s\n", pxs->n_a.n,
+ pxs->n_a.m.c_str(), pxs->v_a.c_str());
+ } else {
+ printf("logread: unknown log record\n");
+ VERIFY(0);
+ }
+ }
+ from.close();
+}
+
+std::string
+log::dump()
+{
+ std::ifstream from;
+ std::string res;
+ std::string v;
+ from.open(name.c_str());
+ while (getline(from, v)) {
+ res = res + v + "\n";
+ }
+ from.close();
+ return res;
+}
+
+void
+log::restore(std::string s)
+{
+ std::ofstream f;
+ printf("restore: %s\n", s.c_str());
+ f.open(name.c_str(), std::ios::trunc);
+ f << s;
+ f.close();
+}
+
+// XXX should be an atomic operation
+void
+log::loginstance(unsigned instance, std::string v)
+{
+ std::ofstream f;
+ f.open(name.c_str(), std::ios::app);
+ f << "done";
+ f << " ";
+ f << instance;
+ f << " ";
+ f << v;
+ f << "\n";
+ f.close();
+}
+
+// an acceptor should call logprop(n_h) when it
+// receives a prepare to which it responds prepare_ok().
+void
+log::logprop(prop_t n_h)
+{
+ std::ofstream f;
+ f.open(name.c_str(), std::ios::app);
+ f << "propseen";
+ f << " ";
+ f << n_h.n;
+ f << " ";
+ f << n_h.m;
+ f << "\n";
+ f.close();
+}
+
+// an acceptor should call logaccept(n_a, v_a) when it
+// receives an accept RPC to which it replies accept_ok().
+void
+log::logaccept(prop_t n, std::string v)
+{
+ std::ofstream f;
+ f.open(name.c_str(), std::ios::app);
+ f << "accepted";
+ f << " ";
+ f << n.n;
+ f << " ";
+ f << n.m;
+ f << " ";
+ f << v;
+ f << "\n";
+ f.close();
+}
+
--- /dev/null
+#ifndef log_h
+#define log_h
+
+#include <string>
+#include <vector>
+
+
+class acceptor;
+
+class log {
+ private:
+ std::string name;
+ acceptor *pxs;
+ public:
+ log (acceptor*, std::string _me);
+ std::string dump();
+ void restore(std::string s);
+ void logread(void);
+ /* Log a committed paxos instance*/
+ void loginstance(unsigned instance, std::string v);
+ /* Log the highest proposal number that the local paxos acceptor has ever seen */
+ void logprop(prop_t n_h);
+  /* Log the most recent proposal (proposal number and value) that the
+     local paxos acceptor has accepted */
+ void logaccept(prop_t n_a, std::string v);
+};
+
+#endif /* log_h */
--- /dev/null
+#include "mutex.h"
+#include "lang/verify.h"
+
+mutex::mutex() {
+ VERIFY(pthread_mutex_init(&m, NULL) == 0);
+}
+
+mutex::~mutex() {
+ VERIFY(pthread_mutex_destroy(&m) == 0);
+}
+
+void mutex::acquire() {
+ VERIFY(pthread_mutex_lock(&m) == 0);
+}
+
+void mutex::release() {
+ VERIFY(pthread_mutex_unlock(&m) == 0);
+}
+
+mutex::operator pthread_mutex_t *() {
+ return &m;
+}
+
+cond::cond() {
+ VERIFY(pthread_cond_init(&c, NULL) == 0);
+}
+
+cond::~cond() {
+ VERIFY(pthread_cond_destroy(&c) == 0);
+}
+
+void cond::wait(mutex &m) {
+ VERIFY(pthread_cond_wait(&c, m) == 0);
+}
+
+void cond::signal() {
+ VERIFY(pthread_cond_signal(&c) == 0);
+}
+
+void cond::broadcast() {
+ VERIFY(pthread_cond_broadcast(&c) == 0);
+}
--- /dev/null
+#ifndef mutex_h
+#define mutex_h
+
+#include <pthread.h>
+
+class mutex {
+ protected:
+ pthread_mutex_t m;
+ public:
+ mutex();
+ ~mutex();
+ void acquire();
+ void release();
+ operator pthread_mutex_t *();
+};
+
+class cond {
+ protected:
+ pthread_cond_t c;
+ public:
+ cond();
+ ~cond();
+ void wait(mutex &m);
+ void signal();
+ void broadcast();
+};
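+
+// A minimal usage sketch (hypothetical flag `ready`):
+//   mutex m;
+//   cond c;
+//   m.acquire();
+//   while (!ready)
+//     c.wait(m);   // wait() atomically releases m while blocked
+//   m.release();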
+
+#endif
--- /dev/null
+#include "paxos.h"
+#include "handle.h"
+// #include <signal.h>
+#include <stdio.h>
+#include "tprintf.h"
+#include "lang/verify.h"
+
+// This module implements the proposer and acceptor of the Paxos
+// distributed algorithm as described by Lamport's "Paxos Made
+// Simple". To kick off an instance of Paxos, the caller supplies a
+// list of nodes, a proposed value, and invokes the proposer. If the
+// majority of the nodes agree on the proposed value after running
+// this instance of Paxos, the acceptor invokes the upcall
+// paxos_commit to inform higher layers of the agreed value for this
+// instance.
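+//
+// In the failure-free case, one instance roughly exchanges:
+//   proposer                       acceptors
+//   preparereq(instance, n)   -->
+//                             <--  prepare_ok(n_a, v_a)
+//   acceptreq(instance, n, v) -->
+//                             <--  accept_ok
+//   decidereq(instance, v)    -->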
+
+
+bool
+operator> (const prop_t &a, const prop_t &b)
+{
+ return (a.n > b.n || (a.n == b.n && a.m > b.m));
+}
+
+bool
+operator>= (const prop_t &a, const prop_t &b)
+{
+ return (a.n > b.n || (a.n == b.n && a.m >= b.m));
+}
+
+std::string
+print_members(const std::vector<std::string> &nodes)
+{
+ std::string s;
+ s.clear();
+ for (unsigned i = 0; i < nodes.size(); i++) {
+ s += nodes[i];
+ if (i < (nodes.size()-1))
+ s += ",";
+ }
+ return s;
+}
+
+bool isamember(std::string m, const std::vector<std::string> &nodes)
+{
+ for (unsigned i = 0; i < nodes.size(); i++) {
+ if (nodes[i] == m) return 1;
+ }
+ return 0;
+}
+
+bool
+proposer::isrunning()
+{
+ bool r;
+ ScopedLock ml(pxs_mutex);
+ r = !stable;
+ return r;
+}
+
+// check if the servers in l2 contain a majority of the servers in l1
+bool
+proposer::majority(const std::vector<std::string> &l1,
+ const std::vector<std::string> &l2)
+{
+ unsigned n = 0;
+
+ for (unsigned i = 0; i < l1.size(); i++) {
+ if (isamember(l1[i], l2))
+ n++;
+ }
+ return n >= (l1.size() >> 1) + 1;
+}
+
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
+ std::string _me)
+ : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
+ stable (true)
+{
+ my_n.n = 0;
+ my_n.m = me;
+}
+
+void
+proposer::setn()
+{
+ my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+}
+
+bool
+proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
+{
+ std::vector<std::string> accepts;
+ std::vector<std::string> nodes;
+ std::string v;
+ bool r = false;
+
+ ScopedLock ml(pxs_mutex);
+ tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
+ print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+ if (!stable) { // already running proposer?
+ tprintf("proposer::run: already running\n");
+ return false;
+ }
+ stable = false;
+ setn();
+ accepts.clear();
+ v.clear();
+ if (prepare(instance, accepts, cur_nodes, v)) {
+
+ if (majority(cur_nodes, accepts)) {
+ tprintf("paxos::manager: received a majority of prepare responses\n");
+
+ if (v.size() == 0)
+ v = newv;
+
+ breakpoint1();
+
+ nodes = accepts;
+ accepts.clear();
+ accept(instance, accepts, nodes, v);
+
+ if (majority(cur_nodes, accepts)) {
+ tprintf("paxos::manager: received a majority of accept responses\n");
+
+ breakpoint2();
+
+ decide(instance, accepts, v);
+ r = true;
+ } else {
+ tprintf("paxos::manager: no majority of accept responses\n");
+ }
+ } else {
+ tprintf("paxos::manager: no majority of prepare responses\n");
+ }
+ } else {
+ tprintf("paxos::manager: prepare is rejected %d\n", stable);
+ }
+ stable = true;
+ return r;
+}
+
+// proposer::run() calls prepare to send prepare RPCs to nodes
+// and collect responses. if one of those nodes
+// replies with an oldinstance, return false.
+// otherwise fill in accepts with set of nodes that accepted,
+// set v to the v_a with the highest n_a, and return true.
+bool
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
+ std::vector<std::string> nodes,
+ std::string &v)
+{
+ struct paxos_protocol::preparearg arg = { instance, my_n };
+ struct paxos_protocol::prepareres res;
+ prop_t n_a = { 0, "" };
+ rpcc *r;
+ for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
+ handle h(*i);
+ if (!(r = h.safebind()))
+ continue;
+ int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+ if (status == paxos_protocol::OK) {
+ if (res.oldinstance) {
+ tprintf("commiting old instance!\n");
+ acc->commit(instance, res.v_a);
+ return false;
+ }
+ if (res.accept) {
+ accepts.push_back(*i);
+ if (res.n_a >= n_a) {
+ tprintf("found a newer accepted proposal\n");
+ v = res.v_a;
+ n_a = res.n_a;
+ }
+ }
+ }
+ }
+ return true;
+}
+
+// run() calls this to send out accept RPCs to nodes.
+// fill in accepts with the list of nodes that accepted.
+void
+proposer::accept(unsigned instance, std::vector<std::string> &accepts,
+ std::vector<std::string> nodes, std::string v)
+{
+ struct paxos_protocol::acceptarg arg = { instance, my_n, v };
+ rpcc *r;
+ for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
+ handle h(*i);
+ if (!(r = h.safebind()))
+ continue;
+ bool accept = false;
+ int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
+ if (status == paxos_protocol::OK) {
+ if (accept)
+ accepts.push_back(*i);
+ }
+ }
+}
+
+void
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
+ std::string v)
+{
+ struct paxos_protocol::decidearg arg = { instance, v };
+ rpcc *r;
+ for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
+ handle h(*i);
+ if (!(r = h.safebind()))
+ continue;
+ int res = 0;
+ r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
+ }
+}
+
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
+ std::string _value)
+ : cfg(_cfg), me (_me), instance_h(0)
+{
+ n_h.n = 0;
+ n_h.m = me;
+ n_a.n = 0;
+ n_a.m = me;
+ v_a.clear();
+
+ l = new log (this, me);
+
+ if (instance_h == 0 && _first) {
+ values[1] = _value;
+ l->loginstance(1, _value);
+ instance_h = 1;
+ }
+
+ pxs = new rpcs(atoi(_me.c_str()));
+ pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
+ pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
+ pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+}
+
+paxos_protocol::status
+acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
+ paxos_protocol::prepareres &r)
+{
+ ScopedLock ml(pxs_mutex);
+ r.oldinstance = false;
+ r.accept = false;
+ r.n_a = n_a;
+ r.v_a = v_a;
+ if (a.instance <= instance_h) {
+ r.oldinstance = true;
+ r.v_a = values[a.instance];
+ } else if (a.n > n_h) {
+ n_h = a.n;
+ l->logprop(n_h);
+ r.accept = true;
+ } else {
+ tprintf("I totally rejected this request. Ha.\n");
+ }
+ return paxos_protocol::OK;
+}
+
+paxos_protocol::status
+acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
+{
+ ScopedLock ml(pxs_mutex);
+ r = false;
+ if (a.n >= n_h) {
+ n_a = a.n;
+ v_a = a.v;
+ l->logaccept(n_a, v_a);
+ r = true;
+ }
+ return paxos_protocol::OK;
+}
+
+// the src argument is only for debugging purposes
+paxos_protocol::status
+acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
+{
+ ScopedLock ml(pxs_mutex);
+ tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
+ a.instance, instance_h, v_a.c_str());
+ if (a.instance == instance_h + 1) {
+ VERIFY(v_a == a.v);
+ commit_wo(a.instance, v_a);
+ } else if (a.instance <= instance_h) {
+ // we are ahead; ignore.
+ } else {
+ // we are behind
+ VERIFY(0);
+ }
+ return paxos_protocol::OK;
+}
+
+void
+acceptor::commit_wo(unsigned instance, std::string value)
+{
+ // assumes pxs_mutex is held
+ tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+ if (instance > instance_h) {
+ tprintf("commit: highestaccepteinstance = %d\n", instance);
+ values[instance] = value;
+ l->loginstance(instance, value);
+ instance_h = instance;
+ n_h.n = 0;
+ n_h.m = me;
+ n_a.n = 0;
+ n_a.m = me;
+ v_a.clear();
+ if (cfg) {
+ pxs_mutex.release();
+ cfg->paxos_commit(instance, value);
+ pxs_mutex.acquire();
+ }
+ }
+}
+
+void
+acceptor::commit(unsigned instance, std::string value)
+{
+ ScopedLock ml(pxs_mutex);
+ commit_wo(instance, value);
+}
+
+std::string
+acceptor::dump()
+{
+ return l->dump();
+}
+
+void
+acceptor::restore(std::string s)
+{
+ l->restore(s);
+ l->logread();
+}
+
+
+
+// For testing purposes
+
+// Call this from your code between phases prepare and accept of proposer
+void
+proposer::breakpoint1()
+{
+ if (break1) {
+ tprintf("Dying at breakpoint 1!\n");
+ exit(1);
+ }
+}
+
+// Call this from your code between phases accept and decide of proposer
+void
+proposer::breakpoint2()
+{
+ if (break2) {
+ tprintf("Dying at breakpoint 2!\n");
+ exit(1);
+ }
+}
+
+void
+proposer::breakpoint(int b)
+{
+ if (b == 3) {
+ tprintf("Proposer: breakpoint 1\n");
+ break1 = true;
+ } else if (b == 4) {
+ tprintf("Proposer: breakpoint 2\n");
+ break2 = true;
+ }
+}
--- /dev/null
+#ifndef paxos_h
+#define paxos_h
+
+#include <string>
+#include <vector>
+#include "rpc.h"
+#include "paxos_protocol.h"
+#include "log.h"
+#include "mutex.h"
+
+
+class paxos_change {
+ public:
+ virtual void paxos_commit(unsigned instance, std::string v) = 0;
+ virtual ~paxos_change() {};
+};
+
+class acceptor {
+ private:
+ log *l;
+ rpcs *pxs;
+ paxos_change *cfg;
+ std::string me;
+ mutex pxs_mutex;
+
+ // Acceptor state
+ prop_t n_h; // number of the highest proposal seen in a prepare
+ prop_t n_a; // number of highest proposal accepted
+ std::string v_a; // value of highest proposal accepted
+ unsigned instance_h; // number of the highest instance we have decided
+ std::map<unsigned,std::string> values; // vals of each instance
+
+ void commit_wo(unsigned instance, std::string v);
+ paxos_protocol::status preparereq(std::string src,
+ paxos_protocol::preparearg a,
+ paxos_protocol::prepareres &r);
+ paxos_protocol::status acceptreq(std::string src,
+ paxos_protocol::acceptarg a, bool &r);
+ paxos_protocol::status decidereq(std::string src,
+ paxos_protocol::decidearg a, int &r);
+
+ friend class log;
+
+ public:
+ acceptor(class paxos_change *cfg, bool _first, std::string _me,
+ std::string _value);
+ ~acceptor() {};
+ void commit(unsigned instance, std::string v);
+ unsigned instance() { return instance_h; }
+ std::string value(unsigned instance) { return values[instance]; }
+ std::string dump();
+ void restore(std::string);
+ rpcs *get_rpcs() { return pxs; };
+ prop_t get_n_h() { return n_h; };
+ unsigned get_instance_h() { return instance_h; };
+};
+
+extern bool isamember(std::string m, const std::vector<std::string> &nodes);
+extern std::string print_members(const std::vector<std::string> &nodes);
+
+class proposer {
+ private:
+ log *l;
+ paxos_change *cfg;
+ acceptor *acc;
+ std::string me;
+ bool break1;
+ bool break2;
+
+ mutex pxs_mutex;
+
+ // Proposer state
+ bool stable;
+ prop_t my_n; // number of the last proposal used in this instance
+
+ void setn();
+ bool prepare(unsigned instance, std::vector<std::string> &accepts,
+ std::vector<std::string> nodes,
+ std::string &v);
+ void accept(unsigned instance, std::vector<std::string> &accepts,
+ std::vector<std::string> nodes, std::string v);
+ void decide(unsigned instance, std::vector<std::string> accepts,
+ std::string v);
+
+ void breakpoint1();
+ void breakpoint2();
+ bool majority(const std::vector<std::string> &l1, const std::vector<std::string> &l2);
+
+ friend class log;
+ public:
+ proposer(class paxos_change *cfg, class acceptor *_acceptor, std::string _me);
+ ~proposer() {};
+ bool run(int instance, std::vector<std::string> cnodes, std::string v);
+ bool isrunning();
+ void breakpoint(int b);
+};
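+
+// Typical wiring (a sketch; the config module owns the real objects):
+//
+//   acceptor *acc = new acceptor(cfg, me == first, me, me);
+//   proposer *pro = new proposer(cfg, acc, me);
+//   ...
+//   pro->run(acc->instance() + 1, cur_view, proposed_view);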
+
+
+
+#endif /* paxos_h */
--- /dev/null
+#ifndef paxos_protocol_h
+#define paxos_protocol_h
+
+#include "rpc.h"
+
+struct prop_t {
+ unsigned n;
+ std::string m;
+};
+
+class paxos_protocol {
+ public:
+ enum xxstatus { OK, ERR };
+ typedef int status;
+ enum rpc_numbers {
+ preparereq = 0x11001,
+ acceptreq,
+ decidereq,
+ heartbeat,
+ };
+
+ struct preparearg {
+ unsigned instance;
+ prop_t n;
+ };
+
+ struct prepareres {
+ bool oldinstance;
+ bool accept;
+ prop_t n_a;
+ std::string v_a;
+ };
+
+ struct acceptarg {
+ unsigned instance;
+ prop_t n;
+ std::string v;
+ };
+
+ struct decidearg {
+ unsigned instance;
+ std::string v;
+ };
+
+};
+
+inline unmarshall &
+operator>>(unmarshall &u, prop_t &a)
+{
+ u >> a.n;
+ u >> a.m;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, prop_t a)
+{
+ m << a.n;
+ m << a.m;
+ return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::preparearg &a)
+{
+ u >> a.instance;
+ u >> a.n;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::preparearg a)
+{
+ m << a.instance;
+ m << a.n;
+ return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::prepareres &r)
+{
+ u >> r.oldinstance;
+ u >> r.accept;
+ u >> r.n_a;
+ u >> r.v_a;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::prepareres r)
+{
+ m << r.oldinstance;
+ m << r.accept;
+ m << r.n_a;
+ m << r.v_a;
+ return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::acceptarg &a)
+{
+ u >> a.instance;
+ u >> a.n;
+ u >> a.v;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::acceptarg a)
+{
+ m << a.instance;
+ m << a.n;
+ m << a.v;
+ return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::decidearg &a)
+{
+ u >> a.instance;
+ u >> a.v;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::decidearg a)
+{
+ m << a.instance;
+ m << a.v;
+ return m;
+}
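+
+// Round-trip sketch for the overloads above:
+//
+//   marshall m;
+//   prop_t p = { 7, "25000" };
+//   m << p;                  // packs n, then m
+//   unmarshall u(m.str());   // wrap the content for reading
+//   prop_t q;
+//   u >> q;                  // q.n == 7, q.m == "25000"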
+
+#endif
--- /dev/null
+#include "mutex.h"
+#include <stdlib.h>
+#include "rpc/slock.h"
+#include <stdint.h>
+
+static mutex rand_m;
+
+void srand_safe(unsigned int seed) {
+ ScopedLock s(rand_m);
+ srandom(seed);
+}
+
+// RAND_MAX is guaranteed to be at least 32767
+// but math with rand() is annoying.
+// Here's a canned solution from
+// http://www.azillionmonkeys.com/qed/random.html
+// which gives uniform numbers in [0, r)
+
+#define RS_SCALE (1.0 / (1.0 + RAND_MAX))
+
+double drand (void) {
+ double d;
+ do {
+ d = (((random() * RS_SCALE) + random()) * RS_SCALE + random()) * RS_SCALE;
+ } while (d >= 1); /* Round off */
+ return d;
+}
+
+#define irand(x) ((unsigned int) ((x) * drand()))
+
+// use this to draw a uniformly distributed 32-bit random number
+#define RAND32 ((uint32_t)irand(1ULL<<32))
+uint32_t rand32_safe() {
+ ScopedLock s(rand_m);
+ return RAND32;
+}
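+
+// Usage sketch: seed once at startup, then draw from any thread.
+//
+//   srand_safe(getpid() ^ time(NULL));
+//   uint32_t r = rand32_safe();   // uniform in [0, 2^32)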
--- /dev/null
+#ifndef random_h
+#define random_h
+
+#include <stdint.h> // uint32_t
+
+void srand_safe(unsigned int seed);
+uint32_t rand32_safe();
+
+#endif
--- /dev/null
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <netinet/tcp.h>
+#include <errno.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "method_thread.h"
+#include "connection.h"
+#include "slock.h"
+#include "pollmgr.h"
+#include "jsl_log.h"
+#include "gettime.h"
+#include "lang/verify.h"
+
+#define MAX_PDU (10<<20) //maximum PDU size is 10MB
+
+
+connection::connection(chanmgr *m1, int f1, int l1)
+: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
+{
+
+ int flags = fcntl(fd_, F_GETFL, NULL);
+ flags |= O_NONBLOCK;
+ fcntl(fd_, F_SETFL, flags);
+
+ signal(SIGPIPE, SIG_IGN);
+ VERIFY(pthread_mutex_init(&m_,0)==0);
+ VERIFY(pthread_mutex_init(&ref_m_,0)==0);
+ VERIFY(pthread_cond_init(&send_wait_,0)==0);
+ VERIFY(pthread_cond_init(&send_complete_,0)==0);
+
+ VERIFY(gettimeofday(&create_time_, NULL) == 0);
+
+ PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+}
+
+connection::~connection()
+{
+ VERIFY(dead_);
+ VERIFY(pthread_mutex_destroy(&m_)== 0);
+ VERIFY(pthread_mutex_destroy(&ref_m_)== 0);
+ VERIFY(pthread_cond_destroy(&send_wait_) == 0);
+ VERIFY(pthread_cond_destroy(&send_complete_) == 0);
+ if (rpdu_.buf)
+ free(rpdu_.buf);
+ VERIFY(!wpdu_.buf);
+ close(fd_);
+}
+
+void
+connection::incref()
+{
+ ScopedLock ml(&ref_m_);
+ refno_++;
+}
+
+bool
+connection::isdead()
+{
+ ScopedLock ml(&m_);
+ return dead_;
+}
+
+void
+connection::closeconn()
+{
+ {
+ ScopedLock ml(&m_);
+ if (!dead_) {
+ dead_ = true;
+ shutdown(fd_,SHUT_RDWR);
+ }else{
+ return;
+ }
+ }
+ //after block_remove_fd, select will never wait on fd_
+ //and no callbacks will be active
+ PollMgr::Instance()->block_remove_fd(fd_);
+}
+
+void
+connection::decref()
+{
+ VERIFY(pthread_mutex_lock(&ref_m_)==0);
+ refno_ --;
+ VERIFY(refno_>=0);
+ if (refno_==0) {
+ VERIFY(pthread_mutex_lock(&m_)==0);
+ if (dead_) {
+ VERIFY(pthread_mutex_unlock(&ref_m_)==0);
+ VERIFY(pthread_mutex_unlock(&m_)==0);
+ delete this;
+ return;
+ }
+ VERIFY(pthread_mutex_unlock(&m_)==0);
+ }
+ pthread_mutex_unlock(&ref_m_);
+}
+
+int
+connection::ref()
+{
+ ScopedLock rl(&ref_m_);
+ return refno_;
+}
+
+int
+connection::compare(connection *another)
+{
+ if (create_time_.tv_sec > another->create_time_.tv_sec)
+ return 1;
+ if (create_time_.tv_sec < another->create_time_.tv_sec)
+ return -1;
+ if (create_time_.tv_usec > another->create_time_.tv_usec)
+ return 1;
+ if (create_time_.tv_usec < another->create_time_.tv_usec)
+ return -1;
+ return 0;
+}
+
+bool
+connection::send(char *b, int sz)
+{
+ ScopedLock ml(&m_);
+ waiters_++;
+ while (!dead_ && wpdu_.buf) {
+ VERIFY(pthread_cond_wait(&send_wait_, &m_)==0);
+ }
+ waiters_--;
+ if (dead_) {
+ return false;
+ }
+ wpdu_.buf = b;
+ wpdu_.sz = sz;
+ wpdu_.solong = 0;
+
+ if (lossy_) {
+ if ((random()%100) < lossy_) {
+ jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
+ shutdown(fd_,SHUT_RDWR);
+ }
+ }
+
+ if (!writepdu()) {
+ dead_ = true;
+ VERIFY(pthread_mutex_unlock(&m_) == 0);
+ PollMgr::Instance()->block_remove_fd(fd_);
+ VERIFY(pthread_mutex_lock(&m_) == 0);
+ }else if (wpdu_.solong != wpdu_.sz) {
+ //it should be rare to need to explicitly add a write callback
+ PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+ while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
+ VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
+ }
+ }
+ bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
+ wpdu_.solong = wpdu_.sz = 0;
+ wpdu_.buf = NULL;
+ if (waiters_ > 0)
+ pthread_cond_broadcast(&send_wait_);
+ return ret;
+}
+
+//fd_ is ready to be written
+void
+connection::write_cb(int s)
+{
+ ScopedLock ml(&m_);
+ VERIFY(!dead_);
+ VERIFY(fd_ == s);
+ if (wpdu_.sz == 0) {
+ PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+ return;
+ }
+ if (!writepdu()) {
+ PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+ dead_ = true;
+ }else{
+ VERIFY(wpdu_.solong >= 0);
+ if (wpdu_.solong < wpdu_.sz) {
+ return;
+ }
+ }
+ pthread_cond_signal(&send_complete_);
+}
+
+//fd_ is ready to be read
+void
+connection::read_cb(int s)
+{
+ ScopedLock ml(&m_);
+ VERIFY(fd_ == s);
+ if (dead_) {
+ return;
+ }
+
+ bool succ = true;
+ if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+ succ = readpdu();
+ }
+
+ if (!succ) {
+ PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+ dead_ = true;
+ pthread_cond_signal(&send_complete_);
+ }
+
+ if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
+ if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+ //chanmgr has successfully consumed the pdu
+ rpdu_.buf = NULL;
+ rpdu_.sz = rpdu_.solong = 0;
+ }
+ }
+}
+
+bool
+connection::writepdu()
+{
+ VERIFY(wpdu_.solong >= 0);
+ if (wpdu_.solong == wpdu_.sz)
+ return true;
+
+ if (wpdu_.solong == 0) {
+ int sz = htonl(wpdu_.sz);
+ bcopy(&sz,wpdu_.buf,sizeof(sz));
+ }
+ int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
+ wpdu_.solong = -1;
+ wpdu_.sz = 0;
+ }
+ return (errno == EAGAIN);
+ }
+ wpdu_.solong += n;
+ return true;
+}
+
+bool
+connection::readpdu()
+{
+ if (!rpdu_.sz) {
+ int sz, sz1;
+ int n = read(fd_, &sz1, sizeof(sz1));
+
+ if (n == 0) {
+ return false;
+ }
+
+ if (n < 0) {
+ VERIFY(errno!=EAGAIN);
+ return false;
+ }
+
+ if (n > 0 && n != sizeof(sz)) {
+ jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
+ return false;
+ }
+
+ sz = ntohl(sz1);
+
+ if (sz > MAX_PDU) {
+ char *tmpb = (char *)&sz1;
+ jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
+ sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
+ return false;
+ }
+
+ rpdu_.sz = sz;
+ VERIFY(rpdu_.buf == NULL);
+ rpdu_.buf = (char *)malloc(sz+sizeof(sz));
+ VERIFY(rpdu_.buf);
+ bcopy(&sz1,rpdu_.buf,sizeof(sz));
+ rpdu_.solong = sizeof(sz);
+ }
+
+ int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+ if (n <= 0) {
+ if (errno == EAGAIN)
+ return true;
+ if (rpdu_.buf)
+ free(rpdu_.buf);
+ rpdu_.buf = NULL;
+ rpdu_.sz = rpdu_.solong = 0;
+ return (errno == EAGAIN);
+ }
+ rpdu_.solong += n;
+ return true;
+}
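+
+// Wire format shared by writepdu() and readpdu(): every PDU begins with
+// a 4-byte length in network byte order, and the length counts the whole
+// PDU including the length field itself; e.g. a 100-byte PDU is sent as
+// htonl(100) followed by the remaining 96 bytes.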
+
+tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
+: mgr_(m1), lossy_(lossytest)
+{
+
+ VERIFY(pthread_mutex_init(&m_,NULL) == 0);
+
+ struct sockaddr_in sin;
+ memset(&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(port);
+
+ tcp_ = socket(AF_INET, SOCK_STREAM, 0);
+ if(tcp_ < 0){
+ perror("tcpsconn::tcpsconn accept_loop socket:");
+ VERIFY(0);
+ }
+
+ int yes = 1;
+ setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
+ setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+
+ if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
+ perror("accept_loop tcp bind:");
+ VERIFY(0);
+ }
+
+ if(listen(tcp_, 1000) < 0) {
+ perror("tcpsconn::tcpsconn listen:");
+ VERIFY(0);
+ }
+
+ socklen_t addrlen = sizeof(sin);
+ VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
+ port_ = ntohs(sin.sin_port);
+
+ jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
+ sin.sin_port);
+
+ if (pipe(pipe_) < 0) {
+ perror("accept_loop pipe:");
+ VERIFY(0);
+ }
+
+ int flags = fcntl(pipe_[0], F_GETFL, NULL);
+ flags |= O_NONBLOCK;
+ fcntl(pipe_[0], F_SETFL, flags);
+
+ VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
+}
+
+tcpsconn::~tcpsconn()
+{
+ VERIFY(close(pipe_[1]) == 0);
+ VERIFY(pthread_join(th_, NULL) == 0);
+
+ //close all the active connections
+ std::map<int, connection *>::iterator i;
+ for (i = conns_.begin(); i != conns_.end(); i++) {
+ i->second->closeconn();
+ i->second->decref();
+ }
+}
+
+void
+tcpsconn::process_accept()
+{
+ sockaddr_in sin;
+ socklen_t slen = sizeof(sin);
+ int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
+ if (s1 < 0) {
+ perror("tcpsconn::accept_conn error");
+ pthread_exit(NULL);
+ }
+
+ jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
+ s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
+ connection *ch = new connection(mgr_, s1, lossy_);
+
+ // garbage collect all dead connections with refcount of 1
+ std::map<int, connection *>::iterator i;
+ for (i = conns_.begin(); i != conns_.end();) {
+ if (i->second->isdead() && i->second->ref() == 1) {
+ jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
+ i->second->channo());
+ i->second->decref();
+ // Careful not to reuse i right after erase. (i++) will
+ // be evaluated before the erase call because in C++,
+ // there is a sequence point before a function call.
+ // See http://en.wikipedia.org/wiki/Sequence_point.
+ conns_.erase(i++);
+ } else
+ ++i;
+ }
+
+ conns_[ch->channo()] = ch;
+}
+
+void
+tcpsconn::accept_conn()
+{
+ fd_set rfds;
+ int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
+
+ while (1) {
+ FD_ZERO(&rfds);
+ FD_SET(pipe_[0], &rfds);
+ FD_SET(tcp_, &rfds);
+
+ int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+ if (ret < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ perror("accept_conn select:");
+ jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
+ VERIFY(0);
+ }
+ }
+
+ if (FD_ISSET(pipe_[0], &rfds)) {
+ close(pipe_[0]);
+ close(tcp_);
+ return;
+ }
+ else if (FD_ISSET(tcp_, &rfds)) {
+ process_accept();
+ } else {
+ VERIFY(0);
+ }
+ }
+}
+
+connection *
+connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
+{
+ int s= socket(AF_INET, SOCK_STREAM, 0);
+ int yes = 1;
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+ if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
+ jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
+ inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
+ close(s);
+ return NULL;
+ }
+ jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n",
+ s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
+ return new connection(mgr, s, lossy);
+}
+
+
--- /dev/null
+#ifndef connection_h
+#define connection_h 1
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <cstddef>
+
+#include <map>
+
+#include "pollmgr.h"
+
+class connection;
+
+class chanmgr {
+ public:
+ virtual bool got_pdu(connection *c, char *b, int sz) = 0;
+ virtual ~chanmgr() {}
+};
+
+class connection : public aio_callback {
+ public:
+ struct charbuf {
+ charbuf(): buf(NULL), sz(0), solong(0) {}
+ charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
+ char *buf;
+ int sz;
+ int solong; //number of bytes written or read so far
+ };
+
+ connection(chanmgr *m1, int f1, int lossytest=0);
+ ~connection();
+
+ int channo() { return fd_; }
+ bool isdead();
+ void closeconn();
+
+ bool send(char *b, int sz);
+ void write_cb(int s);
+ void read_cb(int s);
+
+ void incref();
+ void decref();
+ int ref();
+
+ int compare(connection *another);
+ private:
+
+ bool readpdu();
+ bool writepdu();
+
+ chanmgr *mgr_;
+ const int fd_;
+ bool dead_;
+
+ charbuf wpdu_;
+ charbuf rpdu_;
+
+ struct timeval create_time_;
+
+ int waiters_;
+ int refno_;
+ const int lossy_;
+
+ pthread_mutex_t m_;
+ pthread_mutex_t ref_m_;
+ pthread_cond_t send_complete_;
+ pthread_cond_t send_wait_;
+};
+
+class tcpsconn {
+ public:
+ tcpsconn(chanmgr *m1, int port, int lossytest=0);
+ ~tcpsconn();
+ inline int port() { return port_; }
+ void accept_conn();
+ private:
+ int port_;
+ pthread_mutex_t m_;
+ pthread_t th_;
+ int pipe_[2];
+
+ int tcp_; //file descriptor for accepting connections
+ chanmgr *mgr_;
+ int lossy_;
+ std::map<int, connection *> conns_;
+
+ void process_accept();
+};
+
+struct bundle {
+ bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
+ chanmgr *mgr;
+ int tcp;
+ int lossy;
+};
+
+void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0);
+connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
+#endif
--- /dev/null
+#ifndef fifo_h
+#define fifo_h
+
+// fifo template
+// blocks enq() and deq() when queue is FULL or EMPTY
+
+#include <errno.h>
+#include <list>
+#include <sys/time.h>
+#include <time.h>
+#include "slock.h"
+#include "lang/verify.h"
+
+template<class T>
+class fifo {
+ public:
+ fifo(int m=0);
+ ~fifo();
+ bool enq(T, bool blocking=true);
+ void deq(T *);
+ unsigned int size();
+
+ private:
+ std::list<T> q_;
+ pthread_mutex_t m_;
+ pthread_cond_t non_empty_c_; // q went non-empty
+ pthread_cond_t has_space_c_; // q is no longer overfull
+ unsigned int max_; //maximum capacity of the queue; enq() blocks while size is at this limit (0 means unbounded)
+};
+
+template<class T>
+fifo<T>::fifo(int limit) : max_(limit)
+{
+ VERIFY(pthread_mutex_init(&m_, 0) == 0);
+ VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0);
+ VERIFY(pthread_cond_init(&has_space_c_, 0) == 0);
+}
+
+template<class T>
+fifo<T>::~fifo()
+{
+ //fifo is to be deleted only when no threads are using it!
+ VERIFY(pthread_mutex_destroy(&m_)==0);
+ VERIFY(pthread_cond_destroy(&non_empty_c_) == 0);
+ VERIFY(pthread_cond_destroy(&has_space_c_) == 0);
+}
+
+template<class T> unsigned int
+fifo<T>::size()
+{
+ ScopedLock ml(&m_);
+ return q_.size();
+}
+
+template<class T> bool
+fifo<T>::enq(T e, bool blocking)
+{
+ ScopedLock ml(&m_);
+ while (1) {
+ if (!max_ || q_.size() < max_) {
+ q_.push_back(e);
+ break;
+ }
+ if (blocking)
+ VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0);
+ else
+ return false;
+ }
+ VERIFY(pthread_cond_signal(&non_empty_c_) == 0);
+ return true;
+}
+
+template<class T> void
+fifo<T>::deq(T *e)
+{
+ ScopedLock ml(&m_);
+
+ while(1) {
+ if(q_.empty()){
+ VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0);
+ } else {
+ *e = q_.front();
+ q_.pop_front();
+ if (max_ && q_.size() < max_) {
+ VERIFY(pthread_cond_signal(&has_space_c_)==0);
+ }
+ break;
+ }
+ }
+ return;
+}
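+
+// Usage sketch: a bounded queue shared by producer and consumer threads.
+//
+//   fifo<int> q(16);   // enq() blocks once 16 items are queued
+//   q.enq(42);
+//   int v;
+//   q.deq(&v);         // blocks until an item is available; v == 42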
+
+#endif
--- /dev/null
+#include "jsl_log.h"
+
+int JSL_DEBUG_LEVEL = 0;
+void
+jsl_set_debug(int level) {
+ JSL_DEBUG_LEVEL = level;
+}
+
+
--- /dev/null
+#ifndef __JSL_LOG_H__
+#define __JSL_LOG_H__ 1
+
+#include <stdlib.h> // abs(), used by the jsl_log macro below
+
+enum dbcode {
+ JSL_DBG_OFF = 0,
+ JSL_DBG_1 = 1, // Critical
+ JSL_DBG_2 = 2, // Error
+ JSL_DBG_3 = 3, // Info
+ JSL_DBG_4 = 4, // Debugging
+};
+
+extern int JSL_DEBUG_LEVEL;
+
+#define jsl_log(level,...) \
+ do { \
+ if(JSL_DEBUG_LEVEL < abs(level)) \
+ {;} \
+ else { \
+ { printf(__VA_ARGS__);} \
+ } \
+ } while(0)
+
+void jsl_set_debug(int level);
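+
+// Usage sketch:
+//
+//   jsl_set_debug(2);                        // show levels 1 and 2
+//   jsl_log(JSL_DBG_2, "fd %d ready\n", fd); // printed
+//   jsl_log(JSL_DBG_4, "very verbose\n");    // suppressed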
+
+#endif // __JSL_LOG_H__
--- /dev/null
+#ifndef marshall_h
+#define marshall_h
+
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <map>
+#include <stdlib.h>
+#include <string.h>
+#include <cstddef>
+#include <inttypes.h>
+#include "lang/verify.h"
+#include "lang/algorithm.h"
+
+struct req_header {
+ req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0):
+ xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
+ int xid;
+ int proc;
+ unsigned int clt_nonce;
+ unsigned int srv_nonce;
+ int xid_rep;
+};
+
+struct reply_header {
+ reply_header(int x=0, int r=0): xid(x), ret(r) {}
+ int xid;
+ int ret;
+};
+
+typedef uint64_t rpc_checksum_t;
+typedef int rpc_sz_t;
+
+enum {
+ //size of initial buffer allocation
+ DEFAULT_RPC_SZ = 1024,
+#if RPC_CHECKSUMMING
+ //the rpc header includes a 4-byte size (filled in by the channel) and a uint64_t checksum
+ RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t)
+#else
+ RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t)
+#endif
+};
+
+class marshall {
+ private:
+ char *_buf; // Base of the raw bytes buffer (dynamically readjusted)
+ int _capa; // Capacity of the buffer
+ int _ind; // Read/write head position
+
+ public:
+ marshall() {
+ _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
+ VERIFY(_buf);
+ _capa = DEFAULT_RPC_SZ;
+ _ind = RPC_HEADER_SZ;
+ }
+
+ ~marshall() {
+ if (_buf)
+ free(_buf);
+ }
+
+ int size() { return _ind;}
+ char *cstr() { return _buf;}
+
+ void rawbyte(unsigned char);
+ void rawbytes(const char *, int);
+
+ // Return the current content (excluding header) as a string
+ std::string get_content() {
+ return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ);
+ }
+
+ // Return the current content (excluding header) as a string
+ std::string str() {
+ return get_content();
+ }
+
+ void pack(int i);
+
+ void pack_req_header(const req_header &h) {
+ int saved_sz = _ind;
+ //leave the first 4 bytes empty for the channel to fill in the pdu size
+ _ind = sizeof(rpc_sz_t);
+#if RPC_CHECKSUMMING
+ _ind += sizeof(rpc_checksum_t);
+#endif
+ pack(h.xid);
+ pack(h.proc);
+ pack((int)h.clt_nonce);
+ pack((int)h.srv_nonce);
+ pack(h.xid_rep);
+ _ind = saved_sz;
+ }
+
+ void pack_reply_header(const reply_header &h) {
+ int saved_sz = _ind;
+ //leave the first 4 bytes empty for the channel to fill in the pdu size
+ _ind = sizeof(rpc_sz_t);
+#if RPC_CHECKSUMMING
+ _ind += sizeof(rpc_checksum_t);
+#endif
+ pack(h.xid);
+ pack(h.ret);
+ _ind = saved_sz;
+ }
+
+ void take_buf(char **b, int *s) {
+ *b = _buf;
+ *s = _ind;
+ _buf = NULL;
+ _ind = 0;
+ return;
+ }
+};
+marshall& operator<<(marshall &, bool);
+marshall& operator<<(marshall &, unsigned int);
+marshall& operator<<(marshall &, int);
+marshall& operator<<(marshall &, unsigned char);
+marshall& operator<<(marshall &, char);
+marshall& operator<<(marshall &, unsigned short);
+marshall& operator<<(marshall &, short);
+marshall& operator<<(marshall &, unsigned long long);
+marshall& operator<<(marshall &, const std::string &);
+
+class unmarshall {
+ private:
+ char *_buf;
+ int _sz;
+ int _ind;
+ bool _ok;
+ public:
+ unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {}
+ unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {}
+ unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false)
+ {
+ //take content (carrying no RPC header) from a string, leaving room for a header
+ take_content(s);
+ }
+ ~unmarshall() {
+ if (_buf) free(_buf);
+ }
+
+ //take contents from another unmarshall object
+ void take_in(unmarshall &another);
+
+ //take content (carrying no RPC header) from a string, leaving room for a header
+ void take_content(const std::string &s) {
+ _sz = s.size()+RPC_HEADER_SZ;
+ _buf = (char *)realloc(_buf,_sz);
+ VERIFY(_buf);
+ _ind = RPC_HEADER_SZ;
+ memcpy(_buf+_ind, s.data(), s.size());
+ _ok = true;
+ }
+
+ bool ok() { return _ok; }
+ char *cstr() { return _buf;}
+ bool okdone();
+ unsigned int rawbyte();
+ void rawbytes(std::string &s, unsigned int n);
+
+ int ind() { return _ind;}
+ int size() { return _sz;}
+ void unpack(int *); //result is returned through the pointer
+ void take_buf(char **b, int *sz) {
+ *b = _buf;
+ *sz = _sz;
+ _sz = _ind = 0;
+ _buf = NULL;
+ }
+
+ void unpack_req_header(req_header *h) {
+ //the first 4 bytes are for the channel to fill in the pdu size
+ _ind = sizeof(rpc_sz_t);
+#if RPC_CHECKSUMMING
+ _ind += sizeof(rpc_checksum_t);
+#endif
+ unpack(&h->xid);
+ unpack(&h->proc);
+ unpack((int *)&h->clt_nonce);
+ unpack((int *)&h->srv_nonce);
+ unpack(&h->xid_rep);
+ _ind = RPC_HEADER_SZ;
+ }
+
+ void unpack_reply_header(reply_header *h) {
+ //the first 4 bytes are for the channel to fill in the pdu size
+ _ind = sizeof(rpc_sz_t);
+#if RPC_CHECKSUMMING
+ _ind += sizeof(rpc_checksum_t);
+#endif
+ unpack(&h->xid);
+ unpack(&h->ret);
+ _ind = RPC_HEADER_SZ;
+ }
+};
+
+unmarshall& operator>>(unmarshall &, bool &);
+unmarshall& operator>>(unmarshall &, unsigned char &);
+unmarshall& operator>>(unmarshall &, char &);
+unmarshall& operator>>(unmarshall &, unsigned short &);
+unmarshall& operator>>(unmarshall &, short &);
+unmarshall& operator>>(unmarshall &, unsigned int &);
+unmarshall& operator>>(unmarshall &, int &);
+unmarshall& operator>>(unmarshall &, unsigned long long &);
+unmarshall& operator>>(unmarshall &, std::string &);
+
+template <class C> marshall &
+operator<<(marshall &m, std::vector<C> v)
+{
+ m << (unsigned int) v.size();
+ for(unsigned i = 0; i < v.size(); i++)
+ m << v[i];
+ return m;
+}
+
+template <class C> unmarshall &
+operator>>(unmarshall &u, std::vector<C> &v)
+{
+ unsigned n;
+ u >> n;
+ for(unsigned i = 0; i < n; i++){
+ C z;
+ u >> z;
+ v.push_back(z);
+ }
+ return u;
+}
+
+template <class A, class B> marshall &
+operator<<(marshall &m, const std::map<A,B> &d) {
+ typename std::map<A,B>::const_iterator i;
+
+ m << (unsigned int) d.size();
+
+ for (i = d.begin(); i != d.end(); i++) {
+ m << i->first << i->second;
+ }
+ return m;
+}
+
+template <class A, class B> unmarshall &
+operator>>(unmarshall &u, std::map<A,B> &d) {
+ unsigned int n;
+ u >> n;
+
+ d.clear();
+
+ for (unsigned int lcv = 0; lcv < n; lcv++) {
+ A a;
+ B b;
+ u >> a >> b;
+ d[a] = b;
+ }
+ return u;
+}
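+
+// Round-trip sketch for the container overloads above:
+//
+//   marshall m;
+//   std::map<std::string,int> d;
+//   d["x"] = 1;
+//   m << d;                    // size first, then each key/value pair
+//   unmarshall u(m.str());
+//   std::map<std::string,int> d2;
+//   u >> d2;                   // d2 == d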
+
+#endif
--- /dev/null
+#ifndef method_thread_h
+#define method_thread_h
+
+// method_thread(): start a thread that runs an object method.
+// returns the pthread_t; exits the process if thread creation fails.
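+//
+// Usage sketch:
+//
+//   class server { public: void loop(int port); };
+//   server srv;
+//   pthread_t th = method_thread(&srv, true /*detach*/, &server::loop, 8888);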
+
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include "lang/verify.h"
+
+static pthread_t
+method_thread_parent(void *(*fn)(void *), void *arg, bool detach)
+{
+ pthread_t th;
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ // set stack size to 100K, so we don't run out of memory
+ pthread_attr_setstacksize(&attr, 100*1024);
+ int err = pthread_create(&th, &attr, fn, arg);
+ pthread_attr_destroy(&attr);
+ if (err != 0) {
+ fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err));
+ exit(1);
+ }
+
+ if (detach) {
+ // don't keep thread state around after exit, to avoid
+ // running out of threads. set detach==false if you plan
+ // to pthread_join.
+ VERIFY(pthread_detach(th) == 0);
+ }
+
+ return th;
+}
+
+static void
+method_thread_child()
+{
+ // defer pthread_cancel() by default. check explicitly by
+ // enabling then pthread_testcancel().
+ int oldstate, oldtype;
+ VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0);
+ VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0);
+}
+
+template <class C> pthread_t
+method_thread(C *o, bool detach, void (C::*m)())
+{
+ class XXX {
+ public:
+ C *o;
+ void (C::*m)();
+ static void *yyy(void *vvv) {
+ XXX *x = (XXX*)vvv;
+ C *o = x->o;
+ void (C::*m)() = x->m;
+ delete x;
+ method_thread_child();
+ (o->*m)();
+ return 0;
+ }
+ };
+ XXX *x = new XXX;
+ x->o = o;
+ x->m = m;
+ return method_thread_parent(&XXX::yyy, (void *) x, detach);
+}
+
+template <class C, class A> pthread_t
+method_thread(C *o, bool detach, void (C::*m)(A), A a)
+{
+ class XXX {
+ public:
+ C *o;
+ void (C::*m)(A a);
+ A a;
+ static void *yyy(void *vvv) {
+ XXX *x = (XXX*)vvv;
+ C *o = x->o;
+ void (C::*m)(A ) = x->m;
+ A a = x->a;
+ delete x;
+ method_thread_child();
+ (o->*m)(a);
+ return 0;
+ }
+ };
+ XXX *x = new XXX;
+ x->o = o;
+ x->m = m;
+ x->a = a;
+ return method_thread_parent(&XXX::yyy, (void *) x, detach);
+}
+
+namespace {
+ // ~xavid: this causes a bizarre compile error on OS X.5 when
+ // it's declared in the function, so I moved it out here.
+ template <class C, class A1, class A2>
+ class XXX {
+ public:
+ C *o;
+ void (C::*m)(A1 a1, A2 a2);
+ A1 a1;
+ A2 a2;
+ static void *yyy(void *vvv) {
+ XXX *x = (XXX*)vvv;
+ C *o = x->o;
+ void (C::*m)(A1 , A2 ) = x->m;
+ A1 a1 = x->a1;
+ A2 a2 = x->a2;
+ delete x;
+ method_thread_child();
+ (o->*m)(a1, a2);
+ return 0;
+ }
+ };
+}
+
+template <class C, class A1, class A2> pthread_t
+method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2)
+{
+ XXX<C,A1,A2> *x = new XXX<C,A1,A2>;
+ x->o = o;
+ x->m = m;
+ x->a1 = a1;
+ x->a2 = a2;
+ return method_thread_parent(&XXX<C,A1,A2>::yyy, (void *) x, detach);
+}
+
+template <class C, class A1, class A2, class A3> pthread_t
+method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3)
+{
+ class XXX {
+ public:
+ C *o;
+ void (C::*m)(A1 a1, A2 a2, A3 a3);
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ static void *yyy(void *vvv) {
+ XXX *x = (XXX*)vvv;
+ C *o = x->o;
+ void (C::*m)(A1 , A2 , A3 ) = x->m;
+ A1 a1 = x->a1;
+ A2 a2 = x->a2;
+ A3 a3 = x->a3;
+ delete x;
+ method_thread_child();
+ (o->*m)(a1, a2, a3);
+ return 0;
+ }
+ };
+ XXX *x = new XXX;
+ x->o = o;
+ x->m = m;
+ x->a1 = a1;
+ x->a2 = a2;
+ x->a3 = a3;
+ return method_thread_parent(&XXX::yyy, (void *) x, detach);
+}
+
+#endif
--- /dev/null
+#include <sys/time.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "slock.h"
+#include "jsl_log.h"
+#include "method_thread.h"
+#include "lang/verify.h"
+#include "pollmgr.h"
+
+PollMgr *PollMgr::instance = NULL;
+static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
+
+void
+PollMgrInit()
+{
+ PollMgr::instance = new PollMgr();
+}
+
+PollMgr *
+PollMgr::Instance()
+{
+ pthread_once(&pollmgr_is_initialized, PollMgrInit);
+ return instance;
+}
+
+PollMgr::PollMgr() : pending_change_(false)
+{
+ bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
+ aio_ = new SelectAIO();
+ //aio_ = new EPollAIO();
+
+ VERIFY(pthread_mutex_init(&m_, NULL) == 0);
+ VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
+ VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
+}
+
+PollMgr::~PollMgr()
+{
+ //never kill me!!!
+ VERIFY(0);
+}
+
+void
+PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
+{
+ VERIFY(fd < MAX_POLL_FDS);
+
+ ScopedLock ml(&m_);
+ aio_->watch_fd(fd, flag);
+
+ VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
+ callbacks_[fd] = ch;
+}
+
+//remove all callbacks related to fd;
+//once this returns, no callback for fd
+//will ever be invoked again
+void
+PollMgr::block_remove_fd(int fd)
+{
+ ScopedLock ml(&m_);
+ aio_->unwatch_fd(fd, CB_RDWR);
+ pending_change_ = true;
+ VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
+ callbacks_[fd] = NULL;
+}
+
+void
+PollMgr::del_callback(int fd, poll_flag flag)
+{
+ ScopedLock ml(&m_);
+ if (aio_->unwatch_fd(fd, flag)) {
+ callbacks_[fd] = NULL;
+ }
+}
+
+bool
+PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
+{
+ ScopedLock ml(&m_);
+ if (!callbacks_[fd] || callbacks_[fd]!=c)
+ return false;
+
+ return aio_->is_watched(fd, flag);
+}
+
+void
+PollMgr::wait_loop()
+{
+
+ std::vector<int> readable;
+ std::vector<int> writable;
+
+ while (1) {
+ {
+ ScopedLock ml(&m_);
+ if (pending_change_) {
+ pending_change_ = false;
+ VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
+ }
+ }
+ readable.clear();
+ writable.clear();
+ aio_->wait_ready(&readable,&writable);
+
+ if (!readable.size() && !writable.size()) {
+ continue;
+ }
+ //no locking of m_:
+ //add_callback() and del_callback() should not
+ //modify callbacks_[fd] while the fd is live
+ for (unsigned int i = 0; i < readable.size(); i++) {
+ int fd = readable[i];
+ if (callbacks_[fd])
+ callbacks_[fd]->read_cb(fd);
+ }
+
+ for (unsigned int i = 0; i < writable.size(); i++) {
+ int fd = writable[i];
+ if (callbacks_[fd])
+ callbacks_[fd]->write_cb(fd);
+ }
+ }
+}
+
+SelectAIO::SelectAIO() : highfds_(0)
+{
+ FD_ZERO(&rfds_);
+ FD_ZERO(&wfds_);
+
+ VERIFY(pipe(pipefd_) == 0);
+ FD_SET(pipefd_[0], &rfds_);
+ highfds_ = pipefd_[0];
+
+ int flags = fcntl(pipefd_[0], F_GETFL, NULL);
+ flags |= O_NONBLOCK;
+ fcntl(pipefd_[0], F_SETFL, flags);
+
+ VERIFY(pthread_mutex_init(&m_, NULL) == 0);
+}
+
+SelectAIO::~SelectAIO()
+{
+ VERIFY(pthread_mutex_destroy(&m_) == 0);
+}
+
+void
+SelectAIO::watch_fd(int fd, poll_flag flag)
+{
+ ScopedLock ml(&m_);
+ if (highfds_ <= fd)
+ highfds_ = fd;
+
+ if (flag == CB_RDONLY) {
+ FD_SET(fd,&rfds_);
+ }else if (flag == CB_WRONLY) {
+ FD_SET(fd,&wfds_);
+ }else {
+ FD_SET(fd,&rfds_);
+ FD_SET(fd,&wfds_);
+ }
+
+ char tmp = 1;
+ VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+}
+
+bool
+SelectAIO::is_watched(int fd, poll_flag flag)
+{
+ ScopedLock ml(&m_);
+ if (flag == CB_RDONLY) {
+ return FD_ISSET(fd,&rfds_);
+ }else if (flag == CB_WRONLY) {
+ return FD_ISSET(fd,&wfds_);
+ }else{
+ return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
+ }
+}
+
+bool
+SelectAIO::unwatch_fd(int fd, poll_flag flag)
+{
+ ScopedLock ml(&m_);
+ if (flag == CB_RDONLY) {
+ FD_CLR(fd, &rfds_);
+ }else if (flag == CB_WRONLY) {
+ FD_CLR(fd, &wfds_);
+ }else if (flag == CB_RDWR) {
+ FD_CLR(fd, &wfds_);
+ FD_CLR(fd, &rfds_);
+ }else{
+ VERIFY(0);
+ }
+
+ if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
+ if (fd == highfds_) {
+ int newh = pipefd_[0];
+ for (int i = 0; i <= highfds_; i++) {
+ if (FD_ISSET(i, &rfds_)) {
+ newh = i;
+ }else if (FD_ISSET(i, &wfds_)) {
+ newh = i;
+ }
+ }
+ highfds_ = newh;
+ }
+ }
+ if (flag == CB_RDWR) {
+ char tmp = 1;
+ VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ }
+ return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
+}
+
+void
+SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+{
+ fd_set trfds, twfds;
+ int high;
+
+ {
+ ScopedLock ml(&m_);
+ trfds = rfds_;
+ twfds = wfds_;
+ high = highfds_;
+
+ }
+
+ int ret = select(high+1, &trfds, &twfds, NULL, NULL);
+
+ if (ret < 0) {
+ if (errno == EINTR) {
+ return;
+ } else {
+ perror("select:");
+ jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
+ VERIFY(0);
+ }
+ }
+
+ for (int fd = 0; fd <= high; fd++) {
+ if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+ char tmp;
+ VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+ VERIFY(tmp==1);
+ }else {
+ if (FD_ISSET(fd, &twfds)) {
+ writable->push_back(fd);
+ }
+ if (FD_ISSET(fd, &trfds)) {
+ readable->push_back(fd);
+ }
+ }
+ }
+}
+
+#ifdef __linux__
+
+EPollAIO::EPollAIO()
+{
+ pollfd_ = epoll_create(MAX_POLL_FDS);
+ VERIFY(pollfd_ >= 0);
+ bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
+}
+
+EPollAIO::~EPollAIO()
+{
+ close(pollfd_);
+}
+
+static inline
+int poll_flag_to_event(poll_flag flag)
+{
+ int f;
+ if (flag == CB_RDONLY) {
+ f = EPOLLIN;
+ }else if (flag == CB_WRONLY) {
+ f = EPOLLOUT;
+ }else { //flag == CB_RDWR
+ f = EPOLLIN | EPOLLOUT;
+ }
+ return f;
+}
+
+void
+EPollAIO::watch_fd(int fd, poll_flag flag)
+{
+ VERIFY(fd < MAX_POLL_FDS);
+
+ struct epoll_event ev;
+ int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+ fdstatus_[fd] |= (int)flag;
+
+ ev.events = EPOLLET;
+ ev.data.fd = fd;
+
+ if (fdstatus_[fd] & CB_RDONLY) {
+ ev.events |= EPOLLIN;
+ }
+ if (fdstatus_[fd] & CB_WRONLY) {
+ ev.events |= EPOLLOUT;
+ }
+
+ if (flag == CB_RDWR) {
+ VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
+ }
+
+ VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+}
+
+bool
+EPollAIO::unwatch_fd(int fd, poll_flag flag)
+{
+ VERIFY(fd < MAX_POLL_FDS);
+ fdstatus_[fd] &= ~(int)flag;
+
+ struct epoll_event ev;
+ int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+
+ ev.events = EPOLLET;
+ ev.data.fd = fd;
+
+ if (fdstatus_[fd] & CB_RDONLY) {
+ ev.events |= EPOLLIN;
+ }
+ if (fdstatus_[fd] & CB_WRONLY) {
+ ev.events |= EPOLLOUT;
+ }
+
+ if (flag == CB_RDWR) {
+ VERIFY(op == EPOLL_CTL_DEL);
+ }
+ VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+ return (op == EPOLL_CTL_DEL);
+}
+
+bool
+EPollAIO::is_watched(int fd, poll_flag flag)
+{
+ VERIFY(fd < MAX_POLL_FDS);
+ return ((fdstatus_[fd] & (int)flag) == (int)flag);
+}
+
+void
+EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+{
+ int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+ for (int i = 0; i < nfds; i++) {
+ if (ready_[i].events & EPOLLIN) {
+ readable->push_back(ready_[i].data.fd);
+ }
+ if (ready_[i].events & EPOLLOUT) {
+ writable->push_back(ready_[i].data.fd);
+ }
+ }
+}
+
+#endif
--- /dev/null
+#ifndef pollmgr_h
+#define pollmgr_h
+
+#include <sys/select.h>
+#include <vector>
+
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
+
+#define MAX_POLL_FDS 128
+
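+// poll_flag values are bit flags (note the hex constants):
+// CB_RDWR == (CB_RDONLY | CB_WRONLY).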
+typedef enum {
+ CB_NONE = 0x0,
+ CB_RDONLY = 0x1,
+ CB_WRONLY = 0x10,
+ CB_RDWR = 0x11,
+ CB_MASK = ~0x11,
+} poll_flag;
+
+class aio_mgr {
+ public:
+ virtual void watch_fd(int fd, poll_flag flag) = 0;
+ virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+ virtual bool is_watched(int fd, poll_flag flag) = 0;
+ virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
+ virtual ~aio_mgr() {}
+};
+
+class aio_callback {
+ public:
+ virtual void read_cb(int fd) = 0;
+ virtual void write_cb(int fd) = 0;
+ virtual ~aio_callback() {}
+};
+
+class PollMgr {
+ public:
+ PollMgr();
+ ~PollMgr();
+
+ static PollMgr *Instance();
+ static PollMgr *CreateInst();
+
+ void add_callback(int fd, poll_flag flag, aio_callback *ch);
+ void del_callback(int fd, poll_flag flag);
+ bool has_callback(int fd, poll_flag flag, aio_callback *ch);
+ void block_remove_fd(int fd);
+ void wait_loop();
+
+
+ static PollMgr *instance;
+ static int useful;
+ static int useless;
+
+ private:
+ pthread_mutex_t m_;
+ pthread_cond_t changedone_c_;
+ pthread_t th_;
+
+ aio_callback *callbacks_[MAX_POLL_FDS];
+ aio_mgr *aio_;
+ bool pending_change_;
+
+};
+
+class SelectAIO : public aio_mgr {
+ public :
+
+ SelectAIO();
+ ~SelectAIO();
+ void watch_fd(int fd, poll_flag flag);
+ bool unwatch_fd(int fd, poll_flag flag);
+ bool is_watched(int fd, poll_flag flag);
+ void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
+
+ private:
+
+ fd_set rfds_;
+ fd_set wfds_;
+ int highfds_;
+ int pipefd_[2];
+
+ pthread_mutex_t m_;
+
+};
+
+#ifdef __linux__
+class EPollAIO : public aio_mgr {
+ public:
+ EPollAIO();
+ ~EPollAIO();
+ void watch_fd(int fd, poll_flag flag);
+ bool unwatch_fd(int fd, poll_flag flag);
+ bool is_watched(int fd, poll_flag flag);
+ void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
+
+ private:
+ int pollfd_;
+ struct epoll_event ready_[MAX_POLL_FDS];
+ int fdstatus_[MAX_POLL_FDS];
+
+};
+#endif /* __linux */
+
+#endif /* pollmgr_h */
+
--- /dev/null
+/*
+ The rpcc class handles client-side RPC. Each rpcc is bound to a
+ single RPC server. The jobs of rpcc include maintaining a connection to the
+ server, sending RPC requests and waiting for responses, retransmissions, and
+ at-most-once delivery.
+
+ The rpcs class handles the server side of RPC. Each rpcs handles multiple
+ connections from different rpcc objects. The jobs of rpcs include accepting
+ connections, dispatching requests to registered RPC handlers, at-most-once
+ delivery etc.
+
+ Both rpcc and rpcs use the connection class as an abstraction for the
+ underlying communication channel. To send an RPC request/reply, one calls
+ connection::send() which blocks until data is sent or the connection has failed
+ (thus the caller can free the buffer when send() returns). When a
+ request/reply is received, connection makes a callback into the corresponding
+ rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
+
+ Thread organization:
+ rpcc uses application threads to send RPC requests and blocks to receive the
+ reply or error. All connections use a single PollMgr object to perform async
+ socket IO. PollMgr creates a single thread to examine the readiness of socket
+ file descriptors and informs the corresponding connection whenever a socket is
+ ready to be read or written. (We use asynchronous socket IO to reduce the
+ number of threads needed to manage these connections; without async IO, at
+ least one thread is needed per connection to read data without blocking other
+ activities.) Each rpcs object creates one thread for listening on the server
+ port and a pool of threads for executing RPC requests. The
+ thread pool allows us to control the number of threads spawned at the server
+ (spawning one thread per request will hurt when the server faces thousands of
+ requests).
+
+ In order to delete a connection object, we must maintain a reference count.
+ For rpcc,
+ multiple client threads might be invoking the rpcc::call() functions and thus
+ holding multiple references to the underlying connection object. For rpcs,
+ multiple dispatch threads might be holding references to the same connection
+ object. A connection object is deleted only when the underlying connection is
+ dead and the reference count reaches zero.
+
+ The previous version of the RPC library used pthread_cancel* routines
+ to implement the deletion of rpcc and rpcs objects. The idea is to cancel
+ all active threads that might be holding a reference to an object before
+ deleting that object. However, pthread_cancel is not robust and there are
+ always bugs where outstanding references to deleted objects persist.
+ This version of the RPC library does not do pthread_cancel, but explicitly
+ joins exited threads to make sure no outstanding references exist before
+ deleting objects.
+
+ To delete a rpcc object safely, the users of the library must ensure that
+ there are no outstanding calls on the rpcc object.
+
+ To delete a rpcs object safely, we do the following in sequence: 1. stop
+ accepting new incoming connections. 2. close existing active connections.
+ 3. delete the dispatch thread pool which involves waiting for current active
+ RPC handlers to finish. It is interesting how a thread pool can be deleted
+ without using thread cancellation. The trick is to inject x "poison pills" for
+ a thread pool of x threads. Upon getting a poison pill instead of a normal
+ task, a worker thread will exit (and thread pool destructor waits to join all
+ x exited worker threads).
+ */
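+
+// Usage sketch (illustrative names; the templated call()/reg() wrappers
+// live in rpc.h):
+//
+//   // server side
+//   rpcs server(port);
+//   server.reg(22, &srv, &srv_t::handler22);
+//
+//   // client side
+//   rpcc client(dst);                     // dst is a sockaddr_in
+//   if (client.bind(rpcc::to(1000)) == 0) {
+//     int rep;
+//     client.call(22, arg, rep, rpcc::to(1000));
+//   }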
+
+#include "rpc.h"
+#include "method_thread.h"
+#include "slock.h"
+
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <time.h>
+#include <netdb.h>
+
+#include "jsl_log.h"
+#include "gettime.h"
+#include "lang/verify.h"
+
+const rpcc::TO rpcc::to_max = { 120000 };
+const rpcc::TO rpcc::to_min = { 1000 };
+
+rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
+: xid(xxid), un(xun), done(false)
+{
+ VERIFY(pthread_mutex_init(&m,0) == 0);
+ VERIFY(pthread_cond_init(&c, 0) == 0);
+}
+
+rpcc::caller::~caller()
+{
+ VERIFY(pthread_mutex_destroy(&m) == 0);
+ VERIFY(pthread_cond_destroy(&c) == 0);
+}
+
+inline
+void set_rand_seed()
+{
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ srandom((int)ts.tv_nsec^((int)getpid()));
+}
+
+rpcc::rpcc(sockaddr_in d, bool retrans) :
+ dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+ retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+{
+ VERIFY(pthread_mutex_init(&m_, 0) == 0);
+ VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
+ VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
+
+ if(retrans){
+ set_rand_seed();
+ clt_nonce_ = random();
+ } else {
+ // special client nonce 0 means this client does not
+ // require at-most-once logic from the server
+ // because it uses tcp and never retries a failed connection
+ clt_nonce_ = 0;
+ }
+
+ char *loss_env = getenv("RPC_LOSSY");
+ if(loss_env != NULL){
+ lossytest_ = atoi(loss_env);
+ }
+
+ // xid starts with 1 and latest received reply starts with 0
+ xid_rep_window_.push_back(0);
+
+ jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
+ clt_nonce_, lossytest_);
+}
+
+// IMPORTANT: destruction should happen only when no external threads
+// are blocked inside rpcc or will use rpcc in the future
+rpcc::~rpcc()
+{
+ jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
+ clt_nonce_, chan_?chan_->channo():-1);
+ if(chan_){
+ chan_->closeconn();
+ chan_->decref();
+ }
+ VERIFY(calls_.size() == 0);
+ VERIFY(pthread_mutex_destroy(&m_) == 0);
+ VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
+}
+
+int
+rpcc::bind(TO to)
+{
+ int r;
+ int ret = call(rpc_const::bind, 0, r, to);
+ if(ret == 0){
+ ScopedLock ml(&m_);
+ bind_done_ = true;
+ srv_nonce_ = r;
+ } else {
+ jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
+ inet_ntoa(dst_.sin_addr), ret);
+ }
+ return ret;
+}
+
+// Cancel all outstanding calls
+void
+rpcc::cancel(void)
+{
+ ScopedLock ml(&m_);
+ printf("rpcc::cancel: force callers to fail\n");
+ std::map<int,caller*>::iterator iter;
+ for(iter = calls_.begin(); iter != calls_.end(); iter++){
+ caller *ca = iter->second;
+
+ jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
+ {
+ ScopedLock cl(&ca->m);
+ ca->done = true;
+ ca->intret = rpc_const::cancel_failure;
+ VERIFY(pthread_cond_signal(&ca->c) == 0);
+ }
+ }
+
+ while (calls_.size () > 0){
+ destroy_wait_ = true;
+ VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
+ }
+ printf("rpcc::cancel: done\n");
+}
+
+int
+rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
+ TO to)
+{
+
+ caller ca(0, &rep);
+ int xid_rep;
+ {
+ ScopedLock ml(&m_);
+
+ if((proc != rpc_const::bind && !bind_done_) ||
+ (proc == rpc_const::bind && bind_done_)){
+ jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
+ return rpc_const::bind_failure;
+ }
+
+ if(destroy_wait_){
+ return rpc_const::cancel_failure;
+ }
+
+ ca.xid = xid_++;
+ calls_[ca.xid] = &ca;
+
+ req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
+ xid_rep_window_.front());
+ req.pack_req_header(h);
+ xid_rep = xid_rep_window_.front();
+ }
+
+ TO curr_to;
+ struct timespec now, nextdeadline, finaldeadline;
+
+ clock_gettime(CLOCK_REALTIME, &now);
+ add_timespec(now, to.to, &finaldeadline);
+ curr_to.to = to_min.to;
+
+ bool transmit = true;
+ connection *ch = NULL;
+
+ while (1){
+ if(transmit){
+ get_refconn(&ch);
+ if(ch){
+ if(reachable_) {
+ request forgot;
+ {
+ ScopedLock ml(&m_);
+ if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
+ forgot = dup_req_;
+ dup_req_.clear();
+ }
+ }
+ if (forgot.isvalid())
+ ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
+ ch->send(req.cstr(), req.size());
+ }
+ else jsl_log(JSL_DBG_1, "not reachable\n");
+ jsl_log(JSL_DBG_2,
+ "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
+ clt_nonce_, proc, ca.xid, clt_nonce_);
+ }
+ transmit = false; // only send once on a given channel
+ }
+
+ if(!finaldeadline.tv_sec)
+ break;
+
+ clock_gettime(CLOCK_REALTIME, &now);
+ add_timespec(now, curr_to.to, &nextdeadline);
+ if(cmp_timespec(nextdeadline,finaldeadline) > 0){
+ nextdeadline = finaldeadline;
+ finaldeadline.tv_sec = 0;
+ }
+
+ {
+ ScopedLock cal(&ca.m);
+ while (!ca.done){
+ jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
+ if(pthread_cond_timedwait(&ca.c, &ca.m,
+ &nextdeadline) == ETIMEDOUT){
+ jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
+ break;
+ }
+ }
+ if(ca.done){
+ jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
+ break;
+ }
+ }
+
+ if(retrans_ && (!ch || ch->isdead())){
+ // since connection is dead, retransmit
+ // on the new connection
+ transmit = true;
+ }
+ curr_to.to <<= 1;
+ }
+
+ {
+ // no locking of ca.m since only this thread changes ca.xid
+ ScopedLock ml(&m_);
+ calls_.erase(ca.xid);
+ // may need to update the xid again here, in case the
+ // packet times out before it's even sent by the channel.
+ // I don't think there's any harm in maybe doing it twice
+ update_xid_rep(ca.xid);
+
+ if(destroy_wait_){
+ VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
+ }
+ }
+
+ if (ca.done && lossytest_)
+ {
+ ScopedLock ml(&m_);
+ if (!dup_req_.isvalid()) {
+ dup_req_.buf.assign(req.cstr(), req.size());
+ dup_req_.xid = ca.xid;
+ }
+ if (xid_rep > xid_rep_done_)
+ xid_rep_done_ = xid_rep;
+ }
+
+ ScopedLock cal(&ca.m);
+
+ jsl_log(JSL_DBG_2,
+ "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
+ clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
+ ntohs(dst_.sin_port), ca.done, ca.intret);
+
+ if(ch)
+ ch->decref();
+
+ // destruction of req automatically frees its buffer
+ return (ca.done? ca.intret : rpc_const::timeout_failure);
+}
+
+void
+rpcc::get_refconn(connection **ch)
+{
+ ScopedLock ml(&chan_m_);
+ if(!chan_ || chan_->isdead()){
+ if(chan_)
+ chan_->decref();
+ chan_ = connect_to_dst(dst_, this, lossytest_);
+ }
+ if(ch && chan_){
+ if(*ch){
+ (*ch)->decref();
+ }
+ *ch = chan_;
+ (*ch)->incref();
+ }
+}
+
+// PollMgr's thread is being used to
+// make this upcall from connection object to rpcc.
+// this function must not block.
+//
+// this function keeps no reference for connection *c
+bool
+rpcc::got_pdu(connection *c, char *b, int sz)
+{
+ unmarshall rep(b, sz);
+ reply_header h;
+ rep.unpack_reply_header(&h);
+
+ if(!rep.ok()){
+ jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
+ return true;
+ }
+
+ ScopedLock ml(&m_);
+
+ update_xid_rep(h.xid);
+
+ if(calls_.find(h.xid) == calls_.end()){
+ jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
+ return true;
+ }
+ caller *ca = calls_[h.xid];
+
+ ScopedLock cl(&ca->m);
+ if(!ca->done){
+ ca->un->take_in(rep);
+ ca->intret = h.ret;
+ if(ca->intret < 0){
+ jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
+ h.xid, ca->intret);
+ }
+ ca->done = 1;
+ }
+ VERIFY(pthread_cond_broadcast(&ca->c) == 0);
+ return true;
+}
+
+// assumes thread holds mutex m
+void
+rpcc::update_xid_rep(unsigned int xid)
+{
+ std::list<unsigned int>::iterator it;
+
+ if(xid <= xid_rep_window_.front()){
+ return;
+ }
+
+ for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
+ if(*it > xid){
+ xid_rep_window_.insert(it, xid);
+ goto compress;
+ }
+ }
+ xid_rep_window_.push_back(xid);
+
+compress:
+ it = xid_rep_window_.begin();
+ for (it++; it != xid_rep_window_.end(); it++){
+ while (xid_rep_window_.front() + 1 == *it)
+ xid_rep_window_.pop_front();
+ }
+}
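+
+// Example: starting from the window [0], replies for xids 2 and then 1
+// yield [0,2] and then, after compression, [2]; the front of the window
+// is always the highest xid x such that replies for every xid <= x have
+// been received.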
+
+
+rpcs::rpcs(unsigned int p1, int count)
+ : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
+{
+ VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
+ VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
+ VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
+ VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
+
+ set_rand_seed();
+ nonce_ = random();
+ jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
+
+ char *loss_env = getenv("RPC_LOSSY");
+ if(loss_env != NULL){
+ lossytest_ = atoi(loss_env);
+ }
+
+ reg(rpc_const::bind, this, &rpcs::rpcbind);
+ dispatchpool_ = new ThrPool(6,false);
+
+ listener_ = new tcpsconn(this, port_, lossytest_);
+}
+
+rpcs::~rpcs()
+{
+ // must delete listener before dispatchpool
+ delete listener_;
+ delete dispatchpool_;
+ free_reply_window();
+}
+
+bool
+rpcs::got_pdu(connection *c, char *b, int sz)
+{
+ if(!reachable_){
+ jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
+ return true;
+ }
+
+ djob_t *j = new djob_t(c, b, sz);
+ c->incref();
+ bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
+ if(!succ || !reachable_){
+ c->decref();
+ delete j;
+ }
+ return succ;
+}
+
+void
+rpcs::reg1(unsigned int proc, handler *h)
+{
+ ScopedLock pl(&procs_m_);
+ VERIFY(procs_.count(proc) == 0);
+ procs_[proc] = h;
+ VERIFY(procs_.count(proc) >= 1);
+}
+
+void
+rpcs::updatestat(unsigned int proc)
+{
+ ScopedLock cl(&count_m_);
+ counts_[proc]++;
+ curr_counts_--;
+ if(curr_counts_ == 0){
+ std::map<int, int>::iterator i;
+ printf("RPC STATS: ");
+ for (i = counts_.begin(); i != counts_.end(); i++){
+ printf("%x:%d ", i->first, i->second);
+ }
+ printf("\n");
+
+ ScopedLock rwl(&reply_window_m_);
+ std::map<unsigned int,std::list<reply_t> >::iterator clt;
+
+ unsigned int totalrep = 0, maxrep = 0;
+ for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+ totalrep += clt->second.size();
+ if(clt->second.size() > maxrep)
+ maxrep = clt->second.size();
+ }
+ jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
+ (int) reply_window_.size()-1, totalrep, maxrep);
+ curr_counts_ = counting_;
+ }
+}
+
+void
+rpcs::dispatch(djob_t *j)
+{
+ connection *c = j->conn;
+ unmarshall req(j->buf, j->sz);
+ delete j;
+
+ req_header h;
+ req.unpack_req_header(&h);
+ int proc = h.proc;
+
+ if(!req.ok()){
+ jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
+ c->decref();
+ return;
+ }
+
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
+ h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
+
+ marshall rep;
+ reply_header rh(h.xid,0);
+
+ // is client sending to an old instance of server?
+ if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
+ h.srv_nonce, nonce_, h.proc);
+ rh.ret = rpc_const::oldsrv_failure;
+ rep.pack_reply_header(rh);
+ c->send(rep.cstr(),rep.size());
+ return;
+ }
+
+ handler *f;
+ // is RPC proc a registered procedure?
+ {
+ ScopedLock pl(&procs_m_);
+ if(procs_.count(proc) < 1){
+ fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
+ proc);
+ c->decref();
+ VERIFY(0);
+ return;
+ }
+
+ f = procs_[proc];
+ }
+
+ rpcs::rpcstate_t stat;
+ char *b1;
+ int sz1;
+
+ if(h.clt_nonce){
+ // have i seen this client before?
+ {
+ ScopedLock rwl(&reply_window_m_);
+      // if we don't know about this clt_nonce, create a reply window entry for it
+ if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
+ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+ reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
+ h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
+ }
+ }
+
+ // save the latest good connection to the client
+ {
+ ScopedLock rwl(&conss_m_);
+ if(conns_.find(h.clt_nonce) == conns_.end()){
+ c->incref();
+ conns_[h.clt_nonce] = c;
+ } else if(conns_[h.clt_nonce]->compare(c) < 0){
+ conns_[h.clt_nonce]->decref();
+ c->incref();
+ conns_[h.clt_nonce] = c;
+ }
+ }
+
+ stat = checkduplicate_and_update(h.clt_nonce, h.xid,
+ h.xid_rep, &b1, &sz1);
+ } else {
+    // this client does not require at-most-once logic
+ stat = NEW;
+ }
+
+ switch (stat){
+ case NEW: // new request
+ if(counting_){
+ updatestat(proc);
+ }
+
+ rh.ret = f->fn(req, rep);
+ if (rh.ret == rpc_const::unmarshal_args_failure) {
+ fprintf(stderr, "rpcs::dispatch: failed to"
+ " unmarshall the arguments. You are"
+ " probably calling RPC 0x%x with wrong"
+ " types of arguments.\n", proc);
+ VERIFY(0);
+ }
+ VERIFY(rh.ret >= 0);
+
+ rep.pack_reply_header(rh);
+ rep.take_buf(&b1,&sz1);
+
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
+ sz1, h.xid, proc, rh.ret, h.clt_nonce);
+
+ if(h.clt_nonce > 0){
+ // only record replies for clients that require at-most-once logic
+ add_reply(h.clt_nonce, h.xid, b1, sz1);
+ }
+
+ // get the latest connection to the client
+ {
+ ScopedLock rwl(&conss_m_);
+ if(c->isdead() && c != conns_[h.clt_nonce]){
+ c->decref();
+ c = conns_[h.clt_nonce];
+ c->incref();
+ }
+ }
+
+ c->send(b1, sz1);
+ if(h.clt_nonce == 0){
+ // reply is not added to at-most-once window, free it
+ free(b1);
+ }
+ break;
+ case INPROGRESS: // server is working on this request
+ break;
+ case DONE: // duplicate and we still have the response
+ c->send(b1, sz1);
+ break;
+ case FORGOTTEN: // very old request and we don't have the response anymore
+ jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
+ h.xid, h.clt_nonce);
+ rh.ret = rpc_const::atmostonce_failure;
+ rep.pack_reply_header(rh);
+ c->send(rep.cstr(),rep.size());
+ break;
+ }
+ c->decref();
+}
+
+// rpcs::dispatch calls this when an RPC request arrives.
+//
+// checks to see if an RPC with xid from clt_nonce has already been received.
+// if not, remembers the request in reply_window_.
+//
+// deletes remembered requests with XIDs <= xid_rep; the client
+// says it has received a reply for every RPC up through xid_rep.
+// frees the reply_t::buf of each such request.
+//
+// returns one of:
+// NEW: never seen this xid before.
+// INPROGRESS: seen this xid, and still processing it.
+// DONE: seen this xid, previous reply returned in *b and *sz.
+// FORGOTTEN: might have seen this xid, but deleted previous reply.
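+//
+// an illustrative trace of one client's window (values made up):
+//   fresh client                 window [-1]       (sentinel entry)
+//   xid 1 arrives, xid_rep 0  -> window [0, 1]     returns NEW
+//   xid 1 retransmitted          (no reply saved)  returns INPROGRESS
+//   reply saved; xid 1 again                       returns DONE
+//   xid 5 arrives, xid_rep 2  -> entry 1 freed, window [2, 5], returns NEW
+//   a late xid 1 retransmission  (1 < front xid 2) returns FORGOTTEN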
+rpcs::rpcstate_t
+rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
+ unsigned int xid_rep, char **b, int *sz)
+{
+ ScopedLock rwl(&reply_window_m_);
+
+ std::list<reply_t> &l = reply_window_[clt_nonce];
+
+ VERIFY(l.size() > 0);
+ VERIFY(xid >= xid_rep);
+
+ unsigned int past_xid_rep = l.begin()->xid;
+
+ std::list<reply_t>::iterator start = l.begin(), it;
+ it = ++start;
+
+ if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
+ // scan for deletion candidates
+ for (; it != l.end() && it->xid < xid_rep; it++) {
+ if (it->cb_present)
+ free(it->buf);
+ }
+ l.erase(start, it);
+ l.begin()->xid = xid_rep;
+ }
+
+ if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
+ return FORGOTTEN;
+
+ // skip non-deletion candidates
+ while (it != l.end() && it->xid < xid)
+ it++;
+
+ // if it's in the list it must be right here
+ if (it != l.end() && it->xid == xid) {
+ if (it->cb_present) {
+ // return information about the remembered reply
+ *b = it->buf;
+ *sz = it->sz;
+ return DONE;
+ } else {
+ return INPROGRESS;
+ }
+ } else {
+ // remember that a new request has arrived
+ l.insert(it, reply_t(xid));
+ return NEW;
+ }
+}
+
+// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
+// and passes the return value in b and sz.
+// add_reply() should remember b and sz.
+// free_reply_window() and checkduplicate_and_update() are responsible for
+// calling free(b).
+void
+rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
+ char *b, int sz)
+{
+ ScopedLock rwl(&reply_window_m_);
+ // remember the RPC reply value
+ std::list<reply_t> &l = reply_window_[clt_nonce];
+ std::list<reply_t>::iterator it = l.begin();
+ // skip to our place in the list
+ for (it++; it != l.end() && it->xid < xid; it++);
+ // there should already be an entry, so whine if there isn't
+ if (it == l.end() || it->xid != xid) {
+    fprintf(stderr, "Could not find reply struct in add_reply\n");
+ l.insert(it, reply_t(xid, b, sz));
+ } else {
+ *it = reply_t(xid, b, sz);
+ }
+}
+
+void
+rpcs::free_reply_window(void)
+{
+ std::map<unsigned int,std::list<reply_t> >::iterator clt;
+ std::list<reply_t>::iterator it;
+
+ ScopedLock rwl(&reply_window_m_);
+ for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+ for (it = clt->second.begin(); it != clt->second.end(); it++){
+ if (it->cb_present)
+ free(it->buf);
+ }
+ clt->second.clear();
+ }
+ reply_window_.clear();
+}
+
+// rpc handler
+int
+rpcs::rpcbind(int a, int &r)
+{
+ jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
+ r = nonce_;
+ return 0;
+}
+
+void
+marshall::rawbyte(unsigned char x)
+{
+ if(_ind >= _capa){
+ _capa *= 2;
+ VERIFY (_buf != NULL);
+ _buf = (char *)realloc(_buf, _capa);
+ VERIFY(_buf);
+ }
+ _buf[_ind++] = x;
+}
+
+void
+marshall::rawbytes(const char *p, int n)
+{
+ if((_ind+n) > _capa){
+ _capa = _capa > n? 2*_capa:(_capa+n);
+ VERIFY (_buf != NULL);
+ _buf = (char *)realloc(_buf, _capa);
+ VERIFY(_buf);
+ }
+ memcpy(_buf+_ind, p, n);
+ _ind += n;
+}
+
+marshall &
+operator<<(marshall &m, bool x)
+{
+ m.rawbyte(x);
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, unsigned char x)
+{
+ m.rawbyte(x);
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, char x)
+{
+ m << (unsigned char) x;
+ return m;
+}
+
+
+marshall &
+operator<<(marshall &m, unsigned short x)
+{
+ m.rawbyte((x >> 8) & 0xff);
+ m.rawbyte(x & 0xff);
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, short x)
+{
+ m << (unsigned short) x;
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, unsigned int x)
+{
+ // network order is big-endian
+ m.rawbyte((x >> 24) & 0xff);
+ m.rawbyte((x >> 16) & 0xff);
+ m.rawbyte((x >> 8) & 0xff);
+ m.rawbyte(x & 0xff);
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, int x)
+{
+ m << (unsigned int) x;
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, const std::string &s)
+{
+ m << (unsigned int) s.size();
+ m.rawbytes(s.data(), s.size());
+ return m;
+}
+
+marshall &
+operator<<(marshall &m, unsigned long long x)
+{
+ m << (unsigned int) (x >> 32);
+ m << (unsigned int) x;
+ return m;
+}
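+
+// wire-format sketch (illustrative): the operator<< overloads above append
+// big-endian bytes, and a string is encoded as its 4-byte length followed by
+// its raw bytes, e.g.
+//   m << (unsigned int)0x01020304;  // appends bytes 0x01 0x02 0x03 0x04
+//   m << std::string("hi");         // appends 0x00 0x00 0x00 0x02 'h' 'i'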
+
+void
+marshall::pack(int x)
+{
+ rawbyte((x >> 24) & 0xff);
+ rawbyte((x >> 16) & 0xff);
+ rawbyte((x >> 8) & 0xff);
+ rawbyte(x & 0xff);
+}
+
+void
+unmarshall::unpack(int *x)
+{
+ (*x) = (rawbyte() & 0xff) << 24;
+ (*x) |= (rawbyte() & 0xff) << 16;
+ (*x) |= (rawbyte() & 0xff) << 8;
+ (*x) |= rawbyte() & 0xff;
+}
+
+// take the contents from another unmarshall object
+void
+unmarshall::take_in(unmarshall &another)
+{
+ if(_buf)
+ free(_buf);
+ another.take_buf(&_buf, &_sz);
+ _ind = RPC_HEADER_SZ;
+ _ok = _sz >= RPC_HEADER_SZ?true:false;
+}
+
+bool
+unmarshall::okdone()
+{
+ if(ok() && _ind == _sz){
+ return true;
+ } else {
+ return false;
+ }
+}
+
+unsigned int
+unmarshall::rawbyte()
+{
+ char c = 0;
+ if(_ind >= _sz)
+ _ok = false;
+ else
+ c = _buf[_ind++];
+ return c;
+}
+
+unmarshall &
+operator>>(unmarshall &u, bool &x)
+{
+ x = (bool) u.rawbyte() ;
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, unsigned char &x)
+{
+ x = (unsigned char) u.rawbyte() ;
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, char &x)
+{
+ x = (char) u.rawbyte();
+ return u;
+}
+
+
+unmarshall &
+operator>>(unmarshall &u, unsigned short &x)
+{
+ x = (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, short &x)
+{
+ x = (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, unsigned int &x)
+{
+ x = (u.rawbyte() & 0xff) << 24;
+ x |= (u.rawbyte() & 0xff) << 16;
+ x |= (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, int &x)
+{
+ x = (u.rawbyte() & 0xff) << 24;
+ x |= (u.rawbyte() & 0xff) << 16;
+ x |= (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, unsigned long long &x)
+{
+ unsigned int h, l;
+ u >> h;
+ u >> l;
+ x = l | ((unsigned long long) h << 32);
+ return u;
+}
+
+unmarshall &
+operator>>(unmarshall &u, std::string &s)
+{
+ unsigned sz;
+ u >> sz;
+ if(u.ok())
+ u.rawbytes(s, sz);
+ return u;
+}
+
+void
+unmarshall::rawbytes(std::string &ss, unsigned int n)
+{
+ if((_ind+n) > (unsigned)_sz){
+ _ok = false;
+ } else {
+ std::string tmps = std::string(_buf+_ind, n);
+ swap(ss, tmps);
+ VERIFY(ss.size() == n);
+ _ind += n;
+ }
+}
+
+bool operator<(const sockaddr_in &a, const sockaddr_in &b){
+ return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
+ ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
+ ((a.sin_port < b.sin_port))));
+}
+
+/*---------------auxiliary functions--------------*/
+void
+make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
+
+ char host[200];
+ const char *localhost = "127.0.0.1";
+  const char *port = strchr(hostandport, ':');
+ if(port == NULL){
+ memcpy(host, localhost, strlen(localhost)+1);
+ port = hostandport;
+ } else {
+ memcpy(host, hostandport, port-hostandport);
+ host[port-hostandport] = '\0';
+ port++;
+ }
+
+ make_sockaddr(host, port, dst);
+
+}
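+
+// usage sketch: both calls below produce 127.0.0.1:3772
+//   struct sockaddr_in dst;
+//   make_sockaddr("127.0.0.1:3772", &dst);
+//   make_sockaddr("3772", &dst);   // no "host:" part, host defaults to 127.0.0.1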
+
+void
+make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
+
+ in_addr_t a;
+
+ bzero(dst, sizeof(*dst));
+ dst->sin_family = AF_INET;
+
+ a = inet_addr(host);
+ if(a != INADDR_NONE){
+ dst->sin_addr.s_addr = a;
+ } else {
+ struct hostent *hp = gethostbyname(host);
+ if(hp == 0 || hp->h_length != 4){
+ fprintf(stderr, "cannot find host name %s\n", host);
+ exit(1);
+ }
+ dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
+ }
+ dst->sin_port = htons(atoi(port));
+}
+
+int
+cmp_timespec(const struct timespec &a, const struct timespec &b)
+{
+ if(a.tv_sec > b.tv_sec)
+ return 1;
+ else if(a.tv_sec < b.tv_sec)
+ return -1;
+ else {
+ if(a.tv_nsec > b.tv_nsec)
+ return 1;
+ else if(a.tv_nsec < b.tv_nsec)
+ return -1;
+ else
+ return 0;
+ }
+}
+
+void
+add_timespec(const struct timespec &a, int b, struct timespec *result)
+{
+  // add b milliseconds to a, normalizing tv_nsec into [0, 1e9)
+  result->tv_sec = a.tv_sec + b/1000;
+  result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000;
+  VERIFY(result->tv_nsec >= 0);
+  while (result->tv_nsec >= 1000000000){
+ result->tv_sec++;
+ result->tv_nsec-=1000000000;
+ }
+}
+
+int
+diff_timespec(const struct timespec &end, const struct timespec &start)
+{
+ int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
+ VERIFY(diff || end.tv_sec == start.tv_sec);
+ if(end.tv_nsec > start.tv_nsec){
+ diff += (end.tv_nsec-start.tv_nsec)/1000000;
+ } else {
+ diff -= (start.tv_nsec-end.tv_nsec)/1000000;
+ }
+ return diff;
+}
--- /dev/null
+#ifndef rpc_h
+#define rpc_h
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <list>
+#include <map>
+#include <stdio.h>
+
+#include "thr_pool.h"
+#include "marshall.h"
+#include "connection.h"
+
+#ifdef DMALLOC
+#include "dmalloc.h"
+#endif
+
+class rpc_const {
+ public:
+ static const unsigned int bind = 1; // handler number reserved for bind
+ static const int timeout_failure = -1;
+ static const int unmarshal_args_failure = -2;
+ static const int unmarshal_reply_failure = -3;
+ static const int atmostonce_failure = -4;
+ static const int oldsrv_failure = -5;
+ static const int bind_failure = -6;
+ static const int cancel_failure = -7;
+};
+
+// rpc client endpoint.
+// manages an xid space per destination socket.
+// threaded: multiple threads can be sending RPCs concurrently.
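+//
+// a minimal usage sketch (the procedure number and argument types are
+// illustrative; see rpctest.cc for real examples):
+//   sockaddr_in dst;
+//   make_sockaddr("127.0.0.1:3772", &dst);
+//   rpcc *cl = new rpcc(dst);
+//   if (cl->bind() == 0) {
+//     int rep;
+//     cl->call(23, 77, rep, rpcc::to(1000));  // RPC #23, int arg, int reply
+//   }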
+class rpcc : public chanmgr {
+
+ private:
+
+ //manages per rpc info
+ struct caller {
+ caller(unsigned int xxid, unmarshall *un);
+ ~caller();
+
+ unsigned int xid;
+ unmarshall *un;
+ int intret;
+ bool done;
+ pthread_mutex_t m;
+ pthread_cond_t c;
+ };
+
+ void get_refconn(connection **ch);
+ void update_xid_rep(unsigned int xid);
+
+
+ sockaddr_in dst_;
+ unsigned int clt_nonce_;
+ unsigned int srv_nonce_;
+ bool bind_done_;
+ unsigned int xid_;
+ int lossytest_;
+ bool retrans_;
+ bool reachable_;
+
+ connection *chan_;
+
+ pthread_mutex_t m_; // protect insert/delete to calls[]
+ pthread_mutex_t chan_m_;
+
+ bool destroy_wait_;
+ pthread_cond_t destroy_wait_c_;
+
+ std::map<int, caller *> calls_;
+ std::list<unsigned int> xid_rep_window_;
+
+ struct request {
+ request() { clear(); }
+ void clear() { buf.clear(); xid = -1; }
+ bool isvalid() { return xid != -1; }
+ std::string buf;
+ int xid;
+ };
+ struct request dup_req_;
+ int xid_rep_done_;
+ public:
+
+ rpcc(sockaddr_in d, bool retrans=true);
+ ~rpcc();
+
+ struct TO {
+ int to;
+ };
+ static const TO to_max;
+ static const TO to_min;
+ static TO to(int x) { TO t; t.to = x; return t;}
+
+ unsigned int id() { return clt_nonce_; }
+
+ int bind(TO to = to_max);
+
+ void set_reachable(bool r) { reachable_ = r; }
+
+ void cancel();
+
+ int islossy() { return lossytest_ > 0; }
+
+ int call1(unsigned int proc,
+ marshall &req, unmarshall &rep, TO to);
+
+ bool got_pdu(connection *c, char *b, int sz);
+
+
+ template<class R>
+ int call_m(unsigned int proc, marshall &req, R & r, TO to);
+
+ template<class R>
+ int call(unsigned int proc, R & r, TO to = to_max);
+ template<class R, class A1>
+ int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max);
+ template<class R, class A1, class A2>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r,
+ TO to = to_max);
+ template<class R, class A1, class A2, class A3>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ R & r, TO to = to_max);
+ template<class R, class A1, class A2, class A3, class A4>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, R & r, TO to = to_max);
+ template<class R, class A1, class A2, class A3, class A4, class A5>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, const A5 & a5, R & r, TO to = to_max);
+ template<class R, class A1, class A2, class A3, class A4, class A5,
+ class A6>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, const A5 & a5, const A6 & a6,
+ R & r, TO to = to_max);
+ template<class R, class A1, class A2, class A3, class A4, class A5,
+ class A6, class A7>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7,
+ R & r, TO to = to_max);
+
+};
+
+template<class R> int
+rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to)
+{
+ unmarshall u;
+ int intret = call1(proc, req, u, to);
+ if (intret < 0) return intret;
+ u >> r;
+ if(u.okdone() != true) {
+ fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
+ "You are probably calling RPC 0x%x with wrong return "
+ "type.\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ return intret;
+}
+
+template<class R> int
+rpcc::call(unsigned int proc, R & r, TO to)
+{
+ marshall m;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1> int
+rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1, class A2> int
+rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
+ R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ m << a2;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1, class A2, class A3> int
+rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
+ const A3 & a3, R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1, class A2, class A3, class A4> int
+rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
+ const A3 & a3, const A4 & a4, R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1, class A2, class A3, class A4, class A5> int
+rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
+ const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ m << a5;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1, class A2, class A3, class A4, class A5,
+ class A6> int
+rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
+ const A3 & a3, const A4 & a4, const A5 & a5,
+ const A6 & a6, R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ m << a5;
+ m << a6;
+ return call_m(proc, m, r, to);
+}
+
+template<class R, class A1, class A2, class A3, class A4, class A5,
+ class A6, class A7> int
+rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
+ const A3 & a3, const A4 & a4, const A5 & a5,
+ const A6 & a6, const A7 & a7,
+ R & r, TO to)
+{
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ m << a5;
+ m << a6;
+ m << a7;
+ return call_m(proc, m, r, to);
+}
+
+bool operator<(const sockaddr_in &a, const sockaddr_in &b);
+
+class handler {
+ public:
+ handler() { }
+ virtual ~handler() { }
+ virtual int fn(unmarshall &, marshall &) = 0;
+};
+
+
+// rpc server endpoint.
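+//
+// a minimal usage sketch (handler numbers and names are illustrative; see
+// rpctest.cc for real examples):
+//   rpcs *server = new rpcs(3772);                 // listen on port 3772
+//   server->reg(23, &service, &srv::handle_fast);  // map RPC #23 to a method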
+class rpcs : public chanmgr {
+
+ typedef enum {
+ NEW, // new RPC, not a duplicate
+ INPROGRESS, // duplicate of an RPC we're still processing
+ DONE, // duplicate of an RPC we already replied to (have reply)
+ FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten
+ } rpcstate_t;
+
+ private:
+
+ // state about an in-progress or completed RPC, for at-most-once.
+ // if cb_present is true, then the RPC is complete and a reply
+ // has been sent; in that case buf points to a copy of the reply,
+ // and sz holds the size of the reply.
+ struct reply_t {
+ reply_t (unsigned int _xid) {
+ xid = _xid;
+ cb_present = false;
+ buf = NULL;
+ sz = 0;
+ }
+ reply_t (unsigned int _xid, char *_buf, int _sz) {
+ xid = _xid;
+ cb_present = true;
+ buf = _buf;
+ sz = _sz;
+ }
+ unsigned int xid;
+ bool cb_present; // whether the reply buffer is valid
+ char *buf; // the reply buffer
+ int sz; // the size of reply buffer
+ };
+
+ int port_;
+ unsigned int nonce_;
+
+  // provide at-most-once semantics by maintaining, for each client, a
+  // window of replies that the client hasn't yet acknowledged receiving.
+  // indexed by client nonce.
+ std::map<unsigned int, std::list<reply_t> > reply_window_;
+
+ void free_reply_window(void);
+ void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
+
+ rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
+ unsigned int xid, unsigned int rep_xid,
+ char **b, int *sz);
+
+ void updatestat(unsigned int proc);
+
+ // latest connection to the client
+ std::map<unsigned int, connection *> conns_;
+
+ // counting
+ const int counting_;
+ int curr_counts_;
+ std::map<int, int> counts_;
+
+ int lossytest_;
+ bool reachable_;
+
+ // map proc # to function
+ std::map<int, handler *> procs_;
+
+ pthread_mutex_t procs_m_; // protect insert/delete to procs[]
+ pthread_mutex_t count_m_; //protect modification of counts
+ pthread_mutex_t reply_window_m_; // protect reply window et al
+ pthread_mutex_t conss_m_; // protect conns_
+
+
+ protected:
+
+ struct djob_t {
+ djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
+ char *buf;
+ int sz;
+ connection *conn;
+ };
+ void dispatch(djob_t *);
+
+ // internal handler registration
+ void reg1(unsigned int proc, handler *);
+
+ ThrPool* dispatchpool_;
+ tcpsconn* listener_;
+
+ public:
+ rpcs(unsigned int port, int counts=0);
+ ~rpcs();
+ inline int port() { return listener_->port(); }
+  //RPC handler for client binding
+ int rpcbind(int a, int &r);
+
+ void set_reachable(bool r) { reachable_ = r; }
+
+ bool got_pdu(connection *c, char *b, int sz);
+
+ // register a handler
+ template<class S, class A1, class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
+ template<class S, class A1, class A2, class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2,
+ R & r));
+ template<class S, class A1, class A2, class A3, class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
+ const A3, R & r));
+ template<class S, class A1, class A2, class A3, class A4, class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
+ const A3, const A4, R & r));
+ template<class S, class A1, class A2, class A3, class A4, class A5, class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
+ const A3, const A4, const A5,
+ R & r));
+ template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
+ class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
+ const A3, const A4, const A5,
+ const A6, R & r));
+ template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
+ class A7, class R>
+ void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
+ const A3, const A4, const A5,
+ const A6, const A7,
+ R & r));
+};
+
+template<class S, class A1, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ R r;
+ args >> a1;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ R r;
+ args >> a1;
+ args >> a2;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, a2, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, a2, a3, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4,
+ R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, a2, a3, a4, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4,
+ const A5 a5, R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
+ const A5 a5, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, const A5 a5, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ A5 a5;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ args >> a5;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4,
+ const A5 a5, const A6 a6,
+ R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
+ const A5 a5, const A6 a6, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, const A5 a5, const A6 a6, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ A5 a5;
+ A6 a6;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ args >> a5;
+ args >> a6;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class A5,
+ class A6, class A7, class R> void
+rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4,
+ const A5 a5, const A6 a6,
+ const A7 a7, R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
+ const A5 a5, const A6 a6, const A7 a7, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, const A5 a5, const A6 a6,
+ const A7 a7, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ A5 a5;
+ A6 a6;
+ A7 a7;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ args >> a5;
+ args >> a6;
+ args >> a7;
+ if(!args.okdone())
+ return rpc_const::unmarshal_args_failure;
+ int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+
+void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
+void make_sockaddr(const char *host, const char *port,
+ struct sockaddr_in *dst);
+
+int cmp_timespec(const struct timespec &a, const struct timespec &b);
+void add_timespec(const struct timespec &a, int b, struct timespec *result);
+int diff_timespec(const struct timespec &a, const struct timespec &b);
+
+#endif
--- /dev/null
+// RPC test and pseudo-documentation.
+// generates print statements on failures, but eventually says "rpctest OK"
+
+#include "rpc.h"
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <getopt.h>
+#include "jsl_log.h"
+#include "gettime.h"
+#include "lang/verify.h"
+
+#define NUM_CL 2
+
+rpcs *server; // server rpc object
+rpcc *clients[NUM_CL]; // client rpc object
+struct sockaddr_in dst; //server's ip address
+int port;
+pthread_attr_t attr;
+
+// server-side handlers. they must be methods of some class
+// to simplify rpcs::reg(). a server process can have handlers
+// from multiple classes.
+class srv {
+ public:
+ int handle_22(const std::string a, const std::string b, std::string & r);
+ int handle_fast(const int a, int &r);
+ int handle_slow(const int a, int &r);
+ int handle_bigrep(const int a, std::string &r);
+};
+
+// a handler. a and b are arguments, r is the result.
+// there can be multiple arguments but only one result.
+// the caller also gets to see the int return value
+// as the return value from rpcc::call().
+// rpcs::reg() decides how to unmarshall by looking
+// at these argument types, so this function definition
+// does what a .x file does in SunRPC.
+int
+srv::handle_22(const std::string a, const std::string b, std::string &r)
+{
+ r = a + b;
+ return 0;
+}
+
+int
+srv::handle_fast(const int a, int &r)
+{
+ r = a + 1;
+ return 0;
+}
+
+int
+srv::handle_slow(const int a, int &r)
+{
+ usleep(random() % 5000);
+ r = a + 2;
+ return 0;
+}
+
+int
+srv::handle_bigrep(const int len, std::string &r)
+{
+ r = std::string(len, 'x');
+ return 0;
+}
+
+srv service;
+
+void startserver()
+{
+ server = new rpcs(port);
+ server->reg(22, &service, &srv::handle_22);
+ server->reg(23, &service, &srv::handle_fast);
+ server->reg(24, &service, &srv::handle_slow);
+ server->reg(25, &service, &srv::handle_bigrep);
+}
+
+void
+testmarshall()
+{
+ marshall m;
+ req_header rh(1,2,3,4,5);
+ m.pack_req_header(rh);
+ VERIFY(m.size()==RPC_HEADER_SZ);
+ int i = 12345;
+ unsigned long long l = 1223344455L;
+ std::string s = std::string("hallo....");
+ m << i;
+ m << l;
+ m << s;
+
+ char *b;
+ int sz;
+ m.take_buf(&b,&sz);
+ VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
+
+ unmarshall un(b,sz);
+ req_header rh1;
+ un.unpack_req_header(&rh1);
+ VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
+ int i1;
+ unsigned long long l1;
+ std::string s1;
+ un >> i1;
+ un >> l1;
+ un >> s1;
+ VERIFY(un.okdone());
+ VERIFY(i1==i && l1==l && s1==s);
+}
+
+void *
+client1(void *xx)
+{
+
+ // test concurrency.
+ int which_cl = ((unsigned long) xx ) % NUM_CL;
+
+ for(int i = 0; i < 100; i++){
+ int arg = (random() % 2000);
+ std::string rep;
+ int ret = clients[which_cl]->call(25, arg, rep);
+ VERIFY(ret == 0);
+ if ((int)rep.size()!=arg) {
+ printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
+ }
+ VERIFY((int)rep.size() == arg);
+ }
+
+ // test rpc replies coming back not in the order of
+ // the original calls -- i.e. does xid reply dispatch work.
+ for(int i = 0; i < 100; i++){
+ int which = (random() % 2);
+ int arg = (random() % 1000);
+ int rep;
+
+ struct timespec start,end;
+ clock_gettime(CLOCK_REALTIME, &start);
+
+ int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
+ clock_gettime(CLOCK_REALTIME, &end);
+ int diff = diff_timespec(end, start);
+ if (ret != 0)
+ printf("%d ms have elapsed!!!\n", diff);
+ VERIFY(ret == 0);
+ VERIFY(rep == (which ? arg+1 : arg+2));
+ }
+
+ return 0;
+}
+
+void *
+client2(void *xx)
+{
+ int which_cl = ((unsigned long) xx ) % NUM_CL;
+
+ time_t t1;
+ time(&t1);
+
+ while(time(0) - t1 < 10){
+ int arg = (random() % 2000);
+ std::string rep;
+ int ret = clients[which_cl]->call(25, arg, rep);
+ if ((int)rep.size()!=arg) {
+ printf("ask for %d reply got %d ret %d\n",
+ arg, (int)rep.size(), ret);
+ }
+ VERIFY((int)rep.size() == arg);
+ }
+ return 0;
+}
+
+void *
+client3(void *xx)
+{
+ rpcc *c = (rpcc *) xx;
+
+ for(int i = 0; i < 4; i++){
+ int rep;
+ int ret = c->call(24, i, rep, rpcc::to(3000));
+ VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+ }
+ return 0;
+}
+
+
+void
+simple_tests(rpcc *c)
+{
+ printf("simple_tests\n");
+ // an RPC call to procedure #22.
+ // rpcc::call() looks at the argument types to decide how
+ // to marshall the RPC call packet, and how to unmarshall
+ // the reply packet.
+ std::string rep;
+ int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+ VERIFY(intret == 0); // this is what handle_22 returns
+ VERIFY(rep == "hello goodbye");
+ printf(" -- string concat RPC .. ok\n");
+
+ // small request, big reply (perhaps req via UDP, reply via TCP)
+ intret = c->call(25, 70000, rep, rpcc::to(200000));
+ VERIFY(intret == 0);
+ VERIFY(rep.size() == 70000);
+ printf(" -- small request, big reply .. ok\n");
+
+#if 0
+ // too few arguments
+ intret = c->call(22, (std::string)"just one", rep);
+ VERIFY(intret < 0);
+ printf(" -- too few arguments .. failed ok\n");
+
+ // too many arguments; proc #23 expects just one.
+ intret = c->call(23, 1001, 1002, rep);
+ VERIFY(intret < 0);
+ printf(" -- too many arguments .. failed ok\n");
+
+ // wrong return value size
+ int wrongrep;
+ intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
+ VERIFY(intret < 0);
+ printf(" -- wrong ret value size .. failed ok\n");
+#endif
+
+ // specify a timeout value to an RPC that should succeed (udp)
+ int xx = 0;
+ intret = c->call(23, 77, xx, rpcc::to(3000));
+ VERIFY(intret == 0 && xx == 78);
+  printf(" -- no spurious timeout .. ok\n");
+
+ // specify a timeout value to an RPC that should succeed (tcp)
+ {
+ std::string arg(1000, 'x');
+ std::string rep;
+ c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
+ VERIFY(rep.size() == 1001);
+    printf(" -- no spurious timeout .. ok\n");
+ }
+
+ // huge RPC
+ std::string big(1000000, 'x');
+ intret = c->call(22, big, (std::string)"z", rep);
+ VERIFY(rep.size() == 1000001);
+ printf(" -- huge 1M rpc request .. ok\n");
+
+ // specify a timeout value to an RPC that should timeout (udp)
+ struct sockaddr_in non_existent;
+ memset(&non_existent, 0, sizeof(non_existent));
+ non_existent.sin_family = AF_INET;
+ non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
+ non_existent.sin_port = htons(7661);
+ rpcc *c1 = new rpcc(non_existent);
+ time_t t0 = time(0);
+ intret = c1->bind(rpcc::to(3000));
+ time_t t1 = time(0);
+ VERIFY(intret < 0 && (t1 - t0) <= 4);
+ printf(" -- rpc timeout .. ok\n");
+ printf("simple_tests OK\n");
+}
+
+void
+concurrent_test(int nt)
+{
+ // create threads that make lots of calls in parallel,
+ // to test thread synchronization for concurrent calls
+ // and dispatches.
+ int ret;
+
+ printf("start concurrent_test (%d threads) ...", nt);
+
+ pthread_t th[nt];
+ for(int i = 0; i < nt; i++){
+ ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
+ VERIFY(ret == 0);
+ }
+
+ for(int i = 0; i < nt; i++){
+ VERIFY(pthread_join(th[i], NULL) == 0);
+ }
+ printf(" OK\n");
+}
+
+void
+lossy_test()
+{
+ int ret;
+
+ printf("start lossy_test ...");
+ VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+
+ if (server) {
+ delete server;
+ startserver();
+ }
+
+ for (int i = 0; i < NUM_CL; i++) {
+ delete clients[i];
+ clients[i] = new rpcc(dst);
+ VERIFY(clients[i]->bind()==0);
+ }
+
+ int nt = 1;
+ pthread_t th[nt];
+ for(int i = 0; i < nt; i++){
+ ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
+ VERIFY(ret == 0);
+ }
+ for(int i = 0; i < nt; i++){
+ VERIFY(pthread_join(th[i], NULL) == 0);
+ }
+ printf(".. OK\n");
+ VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
+}
+
+void
+failure_test()
+{
+ rpcc *client1;
+ rpcc *client = clients[0];
+
+ printf("failure_test\n");
+
+ delete server;
+
+ client1 = new rpcc(dst);
+ VERIFY (client1->bind(rpcc::to(3000)) < 0);
+ printf(" -- create new client and try to bind to failed server .. failed ok\n");
+
+ delete client1;
+
+ startserver();
+
+ std::string rep;
+ int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+ VERIFY(intret == rpc_const::oldsrv_failure);
+ printf(" -- call recovered server with old client .. failed ok\n");
+
+ delete client;
+
+ clients[0] = client = new rpcc(dst);
+ VERIFY (client->bind() >= 0);
+ VERIFY (client->bind() < 0);
+
+ intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+ VERIFY(intret == 0);
+ VERIFY(rep == "hello goodbye");
+
+ printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
+
+
+ int nt = 10;
+ int ret;
+ printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
+
+ pthread_t th[nt];
+ for(int i = 0; i < nt; i++){
+ ret = pthread_create(&th[i], &attr, client3, (void *) client);
+ VERIFY(ret == 0);
+ }
+
+ for(int i = 0; i < nt; i++){
+ VERIFY(pthread_join(th[i], NULL) == 0);
+ }
+ printf("ok\n");
+
+ delete server;
+ delete client;
+
+ startserver();
+ clients[0] = client = new rpcc(dst);
+ VERIFY (client->bind() >= 0);
+ printf(" -- delete existing rpc client and server, create replacements.. ok\n");
+
+ printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
+ for(int i = 0; i < nt; i++){
+ ret = pthread_create(&th[i], &attr, client3, (void *)client);
+ VERIFY(ret == 0);
+ }
+
+ for(int i = 0; i < nt; i++){
+ VERIFY(pthread_join(th[i], NULL) == 0);
+ }
+ printf("ok\n");
+
+ printf("failure_test OK\n");
+}
+
+int
+main(int argc, char *argv[])
+{
+
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
+ int debug_level = 0;
+
+ bool isclient = false;
+ bool isserver = false;
+
+ srandom(getpid());
+ port = 20000 + (getpid() % 10000);
+
+  int ch = 0;
+ while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
+ switch (ch) {
+ case 'c':
+ isclient = true;
+ break;
+ case 's':
+ isserver = true;
+ break;
+ case 'd':
+ debug_level = atoi(optarg);
+ break;
+ case 'p':
+ port = atoi(optarg);
+ break;
+ case 'l':
+      VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+      break;
+    default:
+ break;
+ }
+ }
+
+ if (!isserver && !isclient) {
+ isserver = isclient = true;
+ }
+
+ if (debug_level > 0) {
+ //__loginit.initNow();
+ jsl_set_debug(debug_level);
+ jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
+ }
+
+ testmarshall();
+
+ pthread_attr_init(&attr);
+ // set stack size to 32K, so we don't run out of memory
+ pthread_attr_setstacksize(&attr, 32*1024);
+
+ if (isserver) {
+ printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
+ startserver();
+ }
+
+ if (isclient) {
+ // server's address.
+ memset(&dst, 0, sizeof(dst));
+ dst.sin_family = AF_INET;
+ dst.sin_addr.s_addr = inet_addr("127.0.0.1");
+ dst.sin_port = htons(port);
+
+
+ // start the client. bind it to the server.
+ // starts a thread to listen for replies and hand them to
+ // the correct waiting caller thread. there should probably
+ // be only one rpcc per process. you probably need one
+ // rpcc per server.
+ for (int i = 0; i < NUM_CL; i++) {
+ clients[i] = new rpcc(dst);
+ VERIFY (clients[i]->bind() == 0);
+ }
+
+ simple_tests(clients[0]);
+ concurrent_test(10);
+ lossy_test();
+ if (isserver) {
+ failure_test();
+ }
+
+ printf("rpctest OK\n");
+
+ exit(0);
+ }
+
+ while (1) {
+ sleep(1);
+ }
+}
--- /dev/null
+#ifndef __SCOPED_LOCK__
+#define __SCOPED_LOCK__
+
+#include <pthread.h>
+#include "lang/verify.h"
+struct ScopedLock {
+ private:
+ pthread_mutex_t *m_;
+ public:
+ ScopedLock(pthread_mutex_t *m): m_(m) {
+ VERIFY(pthread_mutex_lock(m_)==0);
+ }
+ ~ScopedLock() {
+ VERIFY(pthread_mutex_unlock(m_)==0);
+ }
+};
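+// usage sketch: the constructor locks m and the destructor unlocks it, so the
+// mutex is held for exactly the enclosing scope:
+//   {
+//     ScopedLock sl(&m);
+//     // ... critical section ...
+//   }  // unlocked here, on any exit from the scope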
+struct ScopedUnlock {
+ private:
+ pthread_mutex_t *m_;
+ public:
+ ScopedUnlock(pthread_mutex_t *m): m_(m) {
+ VERIFY(pthread_mutex_unlock(m_)==0);
+ }
+ ~ScopedUnlock() {
+ VERIFY(pthread_mutex_lock(m_)==0);
+ }
+};
+#endif /*__SCOPED_LOCK__*/
--- /dev/null
+#include "slock.h"
+#include "thr_pool.h"
+#include <stdlib.h>
+#include <errno.h>
+#include "lang/verify.h"
+
+static void *
+do_worker(void *arg)
+{
+ ThrPool *tp = (ThrPool *)arg;
+ while (1) {
+ ThrPool::job_t j;
+ if (!tp->takeJob(&j))
+ break; //die
+
+ (void)(j.f)(j.a);
+ }
+ pthread_exit(NULL);
+}
+
+//if blocking, then addJob() blocks when queue is full
+//otherwise, addJob() simply returns false when queue is full
+ThrPool::ThrPool(int sz, bool blocking)
+: nthreads_(sz),blockadd_(blocking),jobq_(100*sz)
+{
+ pthread_attr_init(&attr_);
+ pthread_attr_setstacksize(&attr_, 128<<10);
+
+ for (int i = 0; i < sz; i++) {
+ pthread_t t;
+ VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0);
+ th_.push_back(t);
+ }
+}
+
+//IMPORTANT: this destructor may be called only when no external thread
+//will ever use this thread pool again and no thread is still blocked on it
+ThrPool::~ThrPool()
+{
+ for (int i = 0; i < nthreads_; i++) {
+ job_t j;
+ j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit
+ jobq_.enq(j);
+ }
+
+ for (int i = 0; i < nthreads_; i++) {
+ VERIFY(pthread_join(th_[i], NULL)==0);
+ }
+
+ VERIFY(pthread_attr_destroy(&attr_)==0);
+}
+
+bool
+ThrPool::addJob(void *(*f)(void *), void *a)
+{
+ job_t j;
+ j.f = f;
+ j.a = a;
+
+ return jobq_.enq(j,blockadd_);
+}
+
+bool
+ThrPool::takeJob(job_t *j)
+{
+ jobq_.deq(j);
+ return (j->f!=NULL);
+}
+
--- /dev/null
+#ifndef __THR_POOL__
+#define __THR_POOL__
+
+#include <pthread.h>
+#include <vector>
+
+#include "fifo.h"
+
+class ThrPool {
+
+
+ public:
+ struct job_t {
+    void *(*f)(void *); //function pointer
+ void *a; //function arguments
+ };
+
+ ThrPool(int sz, bool blocking=true);
+ ~ThrPool();
+ template<class C, class A> bool addObjJob(C *o, void (C::*m)(A), A a);
+ void waitDone();
+
+ bool takeJob(job_t *j);
+
+ private:
+ pthread_attr_t attr_;
+ int nthreads_;
+ bool blockadd_;
+
+
+ fifo<job_t> jobq_;
+ std::vector<pthread_t> th_;
+
+ bool addJob(void *(*f)(void *), void *a);
+};
+
+ template <class C, class A> bool
+ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
+{
+
+ class objfunc_wrapper {
+ public:
+ C *o;
+ void (C::*m)(A a);
+ A a;
+ static void *func(void *vvv) {
+ objfunc_wrapper *x = (objfunc_wrapper*)vvv;
+ C *o = x->o;
+ void (C::*m)(A ) = x->m;
+ A a = x->a;
+ (o->*m)(a);
+ delete x;
+ return 0;
+ }
+ };
+
+ objfunc_wrapper *x = new objfunc_wrapper;
+ x->o = o;
+ x->m = m;
+ x->a = a;
+ return addJob(&objfunc_wrapper::func, (void *)x);
+}
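+
+// usage sketch (class and method names are illustrative): run obj->work(42)
+// on one of four pool threads:
+//   ThrPool tp(4);
+//   tp.addObjJob(&obj, &MyClass::work, 42);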
+
+
+#endif
+
--- /dev/null
+//
+// Replicated state machine implementation with a primary and several
+// backups. The primary receives requests, assigns each a view stamp (a
+// vid, and a sequence number) in the order of reception, and forwards
+// them to all backups. A backup executes requests in the order that
+// the primary stamps them and replies with an OK to the primary. The
+// primary executes the request after it receives OKs from all backups,
+// and sends the reply back to the client.
+//
+// The config module will tell the RSM about a new view. If the
+// primary in the previous view is a member of the new view, then it
+// will stay the primary. Otherwise, the smallest numbered node of
+// the previous view will be the new primary. In either case, the new
+// primary will be a node from the previous view. The configuration
+// module constructs the sequence of views for the RSM and the RSM
+// ensures there will always be one primary, which was a member of the
+// previous view.
+//
+// When a new node starts, the recovery thread is in charge of joining
+// the RSM. It will collect the internal RSM state from the primary;
+// the primary asks the config module to add the new node and returns
+// the internal RSM state (e.g., the paxos log) to the joining node. Since
+// there is only one primary, all joins happen in a well-defined total
+// order.
+//
+// The recovery thread also runs during a view change (e.g., when a node
+// has failed). After a failure some of the backups could have
+// processed a request that the primary has not, but those results are
+// not visible to clients (since only the primary responds to clients). If
+// the primary of the previous view is in the current view, then it will
+// be the primary and its state is authoritative: the backups download
+// from the primary the current state. A primary waits until all
+// backups have downloaded the state. Once the RSM is in sync, the
+// primary accepts requests again from clients. If one of the backups
+// is the new primary, then its state is authoritative. In either
+// scenario, the next view uses a node as primary that has the state
+// resulting from processing all acknowledged client requests.
+// Therefore, if the nodes sync up before processing the next request,
+// the next view will have the correct state.
+//
+// While the RSM is in a view change (i.e., a node has failed, a new view
+// has been formed, but the sync hasn't completed), another failure
+// could happen, which complicates the view change. During syncing the
+// primary or backups can time out and initiate another Paxos round.
+// There are two variables that the RSM uses to keep track of what state
+// it is in:
+// - inviewchange: a node has failed and the RSM is performing a view change
+// - insync: this node is syncing its state
+//
+// If inviewchange is false and a node is the primary, then it can
+// process client requests. If it is true, clients are told to retry
+// again later. While inviewchange is true, the RSM may go through several
+// member list changes, one by one. After a member list
+// change completes, the node tries to sync. If the sync completes,
+// the view change completes (and inviewchange is set to false). If
+// the sync fails, the node may start another member list change
+// (inviewchange = true and insync = false).
+//
+// The implementation should be used only with servers that run all
+// requests to completion; in particular, a request shouldn't
+// block. If a request blocks, the backup won't respond to the
+// primary, and the primary won't execute the request. A request may
+// send an RPC to another host, but the RPC should be a one-way
+// message to that host; the backup shouldn't do anything based on the
+// response or execute after the response, because it is not
+// guaranteed that all backups will receive the same response and
+// execute in the same order.
+//
+// The implementation can be viewed as a layered system:
+// RSM module ---- in charge of replication
+// config module ---- in charge of view management
+// Paxos module ---- in charge of running Paxos to agree on a value
+//
+// Each module has threads and internal locks. Furthermore, a thread
+// may call down through the layers (e.g., to run Paxos's proposer).
+// When Paxos's acceptor accepts a new value for an instance, a thread
+// will invoke an upcall to inform higher layers of the new value.
+// The rule is that a module releases its internal locks before it
+// upcalls, but can keep its locks when calling down.
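+//
+// a sketch of the upcall rule (illustrative shape, not actual code):
+//   void some_module::on_new_value(...) {
+//     { ScopedLock ml(&internal_mutex); /* update local state */ }
+//     upper_layer->paxos_commit(...);  // upcall with no internal locks held
+//   }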
+
+#include <fstream>
+#include <iostream>
+
+#include "handle.h"
+#include "rsm.h"
+#include "tprintf.h"
+#include "lang/verify.h"
+#include "rsm_client.h"
+
+static void *recoverythread(void *x) {
+ rsm *r = (rsm *) x;
+ r->recovery();
+ return 0;
+}
+
+rsm::rsm(std::string _first, std::string _me) :
+ stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
+ partitioned (false), dopartition(false), break1(false), break2(false)
+{
+ pthread_t th;
+
+ last_myvs.vid = 0;
+ last_myvs.seqno = 0;
+ myvs = last_myvs;
+ myvs.seqno = 1;
+
+ cfg = new config(_first, _me, this);
+
+ if (_first == _me) {
+    // Commit the first view here. We cannot have acceptor::acceptor
+    // do the commit, since at that time this->cfg is not initialized
+ commit_change(1);
+ }
+ rsmrpc = cfg->get_rpcs();
+ rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke);
+ rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members);
+ rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke);
+ rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq);
+ rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq);
+ rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq);
+
+  // tester must be on a different port, otherwise it may partition itself
+ testsvr = new rpcs(atoi(_me.c_str()) + 1);
+ testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq);
+ testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
+
+ {
+ ScopedLock ml(rsm_mutex);
+ VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0);
+ }
+}
+
+void rsm::reg1(int proc, handler *h) {
+ ScopedLock ml(rsm_mutex);
+ procs[proc] = h;
+}
+
+// The recovery thread runs this function
+void rsm::recovery() {
+ bool r = true;
+ ScopedLock ml(rsm_mutex);
+
+ while (1) {
+ while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
+ if (join(primary)) {
+ tprintf("recovery: joined\n");
+ commit_change_wo(cfg->vid());
+ } else {
+ ScopedUnlock su(rsm_mutex);
+ sleep (30); // XXX make another node in cfg primary?
+ }
+ }
+ vid_insync = vid_commit;
+ tprintf("recovery: sync vid_insync %d\n", vid_insync);
+ if (primary == cfg->myaddr()) {
+ r = sync_with_backups();
+ } else {
+ r = sync_with_primary();
+ }
+ tprintf("recovery: sync done\n");
+
+    // If there was a committed view change during the synchronization,
+    // restart the recovery
+ if (vid_insync != vid_commit)
+ continue;
+
+ if (r) {
+ myvs.vid = vid_commit;
+ myvs.seqno = 1;
+ inviewchange = false;
+ }
+ tprintf("recovery: go to sleep %d %d\n", insync, inviewchange);
+ recovery_cond.wait(rsm_mutex);
+ }
+}
+
+template <class A>
+std::ostream & operator<<(std::ostream &o, const std::vector<A> &d) {
+ o << "[";
+ for (typename std::vector<A>::const_iterator i=d.begin(); i!=d.end(); i++) {
+ o << *i;
+ if (i+1 != d.end())
+ o << ", ";
+ }
+ o << "]";
+ return o;
+}
+
+bool rsm::sync_with_backups() {
+ {
+ ScopedUnlock su(rsm_mutex);
+ // Make sure that the state of lock_server_cache_rsm is stable during
+ // synchronization; otherwise, the primary's state may be more recent
+ // than replicas after the synchronization.
+ ScopedLock ml(invoke_mutex);
+ // By acquiring and releasing the invoke_mutex once, we make sure that
+ // the state of lock_server_cache_rsm will not be changed until all
+    // replicas are synchronized. The reason is that any client_invoke that
+    // arrives after this point in time will see inviewchange == true, and
+    // will return BUSY.
+ }
+  // Start accepting synchronization requests (transferreq) now!
+ insync = true;
+ backups = std::vector<std::string>(cfg->get_view(vid_insync));
+ backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
+ LOG("rsm::sync_with_backups " << backups);
+ sync_cond.wait(rsm_mutex);
+ insync = false;
+ return true;
+}
+
+
+bool rsm::sync_with_primary() {
+ // Remember the primary of vid_insync
+ std::string m = primary;
+ while (vid_insync == vid_commit) {
+ if (statetransfer(m))
+ break;
+ }
+ return statetransferdone(m);
+}
+
+
+/**
+ * Call to transfer state from m to the local node.
+ * Assumes that rsm_mutex is already held.
+ */
+bool rsm::statetransfer(std::string m)
+{
+ rsm_protocol::transferres r;
+ handle h(m);
+ int ret;
+ tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n",
+ m.c_str(), last_myvs.vid, last_myvs.seqno);
+ rpcc *cl;
+ {
+ ScopedUnlock su(rsm_mutex);
+ cl = h.safebind();
+ if (cl) {
+ ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(),
+ last_myvs, vid_insync, r, rpcc::to(1000));
+ }
+ }
+ if (cl == 0 || ret != rsm_protocol::OK) {
+ tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(),
+ (long unsigned) cl, ret);
+ return false;
+ }
+ if (stf && last_myvs != r.last) {
+ stf->unmarshal_state(r.state);
+ }
+ last_myvs = r.last;
+ tprintf("rsm::statetransfer transfer from %s success, vs(%d,%d)\n",
+ m.c_str(), last_myvs.vid, last_myvs.seqno);
+ return true;
+}
+
+bool rsm::statetransferdone(std::string m) {
+ ScopedUnlock su(rsm_mutex);
+ handle h(m);
+ rpcc *cl = h.safebind();
+ if (!cl)
+ return false;
+ int r;
+ rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r);
+ if (ret != rsm_protocol::OK)
+ return false;
+ return true;
+}
+
+
+bool rsm::join(std::string m) {
+ handle h(m);
+ int ret;
+ rsm_protocol::joinres r;
+
+ tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid,
+ last_myvs.seqno);
+ rpcc *cl;
+ {
+ ScopedUnlock su(rsm_mutex);
+ cl = h.safebind();
+ if (cl != 0) {
+ ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs,
+ r, rpcc::to(120000));
+ }
+ }
+
+ if (cl == 0 || ret != rsm_protocol::OK) {
+ tprintf("rsm::join: couldn't reach %s %p %d\n", m.c_str(),
+ cl, ret);
+ return false;
+ }
+ tprintf("rsm::join: succeeded %s\n", r.log.c_str());
+ cfg->restore(r.log);
+ return true;
+}
+
+/*
+ * Config informs rsm whenever it has successfully
+ * completed a view change
+ */
+void rsm::commit_change(unsigned vid) {
+ ScopedLock ml(rsm_mutex);
+ commit_change_wo(vid);
+ if (cfg->ismember(cfg->myaddr(), vid_commit))
+ breakpoint2();
+}
+
+void rsm::commit_change_wo(unsigned vid) {
+ if (vid <= vid_commit)
+ return;
+ tprintf("commit_change: new view (%d) last vs (%d,%d) %s insync %d\n",
+ vid, last_myvs.vid, last_myvs.seqno, primary.c_str(), insync);
+ vid_commit = vid;
+ inviewchange = true;
+ set_primary(vid);
+ recovery_cond.signal();
+ sync_cond.signal();
+ if (cfg->ismember(cfg->myaddr(), vid_commit))
+ breakpoint2();
+}
+
+
+void rsm::execute(int procno, std::string req, std::string &r) {
+ tprintf("execute\n");
+ handler *h = procs[procno];
+ VERIFY(h);
+ unmarshall args(req);
+ marshall rep;
+ std::string reps;
+ rsm_protocol::status ret = h->fn(args, rep);
+ marshall rep1;
+ rep1 << ret;
+ rep1 << rep.str();
+ r = rep1.str();
+}
+
+//
+// Clients call client_invoke to invoke a procedure on the replicated state
+// machine: the primary receives the request, assigns it a sequence
+// number, and invokes it on all members of the replicated state
+// machine.
+//
+rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) {
+ LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
+ ScopedLock ml(invoke_mutex);
+ std::vector<std::string> m;
+ std::string myaddr;
+ viewstamp vs;
+ {
+ ScopedLock ml(rsm_mutex);
+ LOG("Checking for inviewchange");
+ if (inviewchange)
+ return rsm_client_protocol::BUSY;
+ LOG("Checking for primacy");
+ myaddr = cfg->myaddr();
+ if (primary != myaddr)
+ return rsm_client_protocol::NOTPRIMARY;
+ LOG("Assigning a viewstamp");
+ m = cfg->get_view(vid_commit);
+ // assign the RPC the next viewstamp number
+ vs = myvs;
+ myvs++;
+ }
+
+ // send an invoke RPC to all slaves in the current view with a timeout of 1 second
+ LOG("Invoking slaves");
+ for (unsigned i = 0; i < m.size(); i++) {
+ if (m[i] != myaddr) {
+ // if invoke on slave fails, return rsm_client_protocol::BUSY
+ handle h(m[i]);
+ LOG("Sending invoke to " << m[i]);
+ rpcc *cl = h.safebind();
+ if (!cl)
+ return rsm_client_protocol::BUSY;
+ rsm_protocol::status ret;
+ int r;
+ ret = cl->call(rsm_protocol::invoke, procno, vs, req, r, rpcc::to(1000));
+ LOG("Invoke returned " << ret);
+ if (ret != rsm_protocol::OK)
+ return rsm_client_protocol::BUSY;
+ breakpoint1();
+ partition1();
+ }
+ }
+ execute(procno, req, r);
+ last_myvs = vs;
+ return rsm_client_protocol::OK;
+}
+
+//
+// The primary calls the internal invoke at each member of the
+// replicated state machine
+//
+// The replica must execute requests in order (with no gaps)
+// according to the requests' seqno.
+
+rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) {
+ LOG("rsm::invoke: procno 0x" << std::hex << proc);
+ ScopedLock ml(invoke_mutex);
+ std::vector<std::string> m;
+ std::string myaddr;
+ {
+ ScopedLock ml(rsm_mutex);
+ // check if !inviewchange
+ LOG("Checking for view change");
+ if (inviewchange)
+ return rsm_protocol::ERR;
+ // check if slave
+ LOG("Checking for slave status");
+ myaddr = cfg->myaddr();
+ if (primary == myaddr)
+ return rsm_protocol::ERR;
+ m = cfg->get_view(vid_commit);
+ if (find(m.begin(), m.end(), myaddr) == m.end())
+ return rsm_protocol::ERR;
+ // check sequence number
+ LOG("Checking sequence number");
+ if (vs != myvs)
+ return rsm_protocol::ERR;
+ myvs++;
+ }
+ std::string r;
+ execute(proc, req, r);
+ last_myvs = vs;
+ breakpoint1();
+ return rsm_protocol::OK;
+}
+
+/**
+ * RPC handler: Send back the local node's state to the caller
+ */
+rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid,
+ rsm_protocol::transferres &r) {
+ ScopedLock ml(rsm_mutex);
+ int ret = rsm_protocol::OK;
+ tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
+ last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
+ if (!insync || vid != vid_insync) {
+ return rsm_protocol::BUSY;
+ }
+ if (stf && last != last_myvs)
+ r.state = stf->marshal_state();
+ r.last = last_myvs;
+ return ret;
+}
+
+/**
+ * RPC handler: Inform the local node (the primary) that node m has synchronized
+ * for view vid
+ */
+rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
+ ScopedLock ml(rsm_mutex);
+ if (!insync || vid != vid_insync)
+ return rsm_protocol::BUSY;
+ std::vector<std::string>::iterator it = find(backups.begin(), backups.end(), m);
+ if (it != backups.end())
+ backups.erase(it);
+ if (backups.empty())
+ sync_cond.signal();
+ return rsm_protocol::OK;
+}
+
+// a node that wants to join an RSM as a server sends a
+// joinreq to the RSM's current primary; this is the
+// handler for that RPC.
+rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
+ int ret = rsm_protocol::OK;
+
+ ScopedLock ml(rsm_mutex);
+ tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
+ last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
+ if (cfg->ismember(m, vid_commit)) {
+ tprintf("joinreq: is still a member\n");
+ r.log = cfg->dump();
+ } else if (cfg->myaddr() != primary) {
+ tprintf("joinreq: busy\n");
+ ret = rsm_protocol::BUSY;
+ } else {
+ // We cache vid_commit to avoid adding m to a view that already
+ // contains m, due to a race condition
+ unsigned vid_cache = vid_commit;
+ bool succ;
+ {
+ ScopedUnlock su(rsm_mutex);
+ succ = cfg->add(m, vid_cache);
+ }
+ if (cfg->ismember(m, cfg->vid())) {
+ r.log = cfg->dump();
+ tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str());
+ } else {
+ tprintf("joinreq: failed; proposer couldn't add %d\n", succ);
+ ret = rsm_protocol::BUSY;
+ }
+ }
+ return ret;
+}
+
+/*
+ * RPC handler: Send back all the nodes this local node knows about
+ * so the client can switch to a different primary when its current
+ * primary fails
+ */
+rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
+ std::vector<std::string> m;
+ ScopedLock ml(rsm_mutex);
+ m = cfg->get_view(vid_commit);
+ m.push_back(primary);
+ r = m;
+ tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
+ primary.c_str());
+ return rsm_client_protocol::OK;
+}
+
+// If the current primary is a member of the new view, it stays primary;
+// otherwise the first node of the previous view that survives into the
+// new view becomes primary. Caller should hold rsm_mutex.
+void rsm::set_primary(unsigned vid) {
+ std::vector<std::string> c = cfg->get_view(vid);
+ std::vector<std::string> p = cfg->get_view(vid - 1);
+ VERIFY (c.size() > 0);
+
+ if (isamember(primary,c)) {
+ tprintf("set_primary: primary stays %s\n", primary.c_str());
+ return;
+ }
+
+ VERIFY(p.size() > 0);
+ for (unsigned i = 0; i < p.size(); i++) {
+ if (isamember(p[i], c)) {
+ primary = p[i];
+ tprintf("set_primary: primary is %s\n", primary.c_str());
+ return;
+ }
+ }
+ VERIFY(0);
+}
+
+bool rsm::amiprimary() {
+ ScopedLock ml(rsm_mutex);
+ return primary == cfg->myaddr() && !inviewchange;
+}
+
+
+// Testing server
+
+// Simulate partitions
+
+// assumes caller holds rsm_mutex
+void rsm::net_repair_wo(bool heal) {
+ std::vector<std::string> m;
+ m = cfg->get_view(vid_commit);
+ for (unsigned i = 0; i < m.size(); i++) {
+ if (m[i] != cfg->myaddr()) {
+ handle h(m[i]);
+ tprintf("rsm::net_repair_wo: %s %d\n", m[i].c_str(), heal);
+ if (h.safebind()) h.safebind()->set_reachable(heal);
+ }
+ }
+ rsmrpc->set_reachable(heal);
+}
+
+rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
+ ScopedLock ml(rsm_mutex);
+ tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
+ heal, dopartition, partitioned);
+ if (heal) {
+ net_repair_wo(heal);
+ partitioned = false;
+ } else {
+ dopartition = true;
+ partitioned = false;
+ }
+ r = rsm_test_protocol::OK;
+ return r;
+}
+
+// simulate failure at breakpoint 1 and 2
+
+void rsm::breakpoint1() {
+ if (break1) {
+ tprintf("Dying at breakpoint 1 in rsm!\n");
+ exit(1);
+ }
+}
+
+void rsm::breakpoint2() {
+ if (break2) {
+ tprintf("Dying at breakpoint 2 in rsm!\n");
+ exit(1);
+ }
+}
+
+void rsm::partition1() {
+ if (dopartition) {
+ net_repair_wo(false);
+ dopartition = false;
+ partitioned = true;
+ }
+}
+
+rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
+ r = rsm_test_protocol::OK;
+ ScopedLock ml(rsm_mutex);
+ tprintf("rsm::breakpointreq: %d\n", b);
+ if (b == 1) break1 = true;
+ else if (b == 2) break2 = true;
+ else if (b == 3 || b == 4) cfg->breakpoint(b);
+ else r = rsm_test_protocol::ERR;
+ return r;
+}
--- /dev/null
+// replicated state machine interface.
+
+#ifndef rsm_h
+#define rsm_h
+
+#include <string>
+#include <vector>
+#include "rsm_protocol.h"
+#include "rsm_state_transfer.h"
+#include "rpc.h"
+#include <arpa/inet.h>
+#include "config.h"
+
+
+class rsm : public config_view_change {
+ private:
+ void reg1(int proc, handler *);
+ protected:
+ std::map<int, handler *> procs;
+ config *cfg;
+ class rsm_state_transfer *stf;
+ rpcs *rsmrpc;
+ // On slave: expected viewstamp of next invoke request
+ // On primary: viewstamp for the next request from rsm_client
+ viewstamp myvs;
+ viewstamp last_myvs; // Viewstamp of the last executed request
+ std::string primary;
+ bool insync;
+ bool inviewchange;
+ unsigned vid_commit; // Latest view id that is known to rsm layer
+ unsigned vid_insync; // The view id that this node is synchronizing for
+ std::vector<std::string> backups; // A list of unsynchronized backups
+
+ // For testing purposes
+ rpcs *testsvr;
+ bool partitioned;
+ bool dopartition;
+ bool break1;
+ bool break2;
+
+
+ rsm_client_protocol::status client_members(int i,
+ std::vector<std::string> &r);
+ rsm_protocol::status invoke(int proc, viewstamp vs, std::string mreq,
+ int &dummy);
+ rsm_protocol::status transferreq(std::string src, viewstamp last, unsigned vid,
+ rsm_protocol::transferres &r);
+ rsm_protocol::status transferdonereq(std::string m, unsigned vid, int &);
+ rsm_protocol::status joinreq(std::string src, viewstamp last,
+ rsm_protocol::joinres &r);
+ rsm_test_protocol::status test_net_repairreq(int heal, int &r);
+ rsm_test_protocol::status breakpointreq(int b, int &r);
+
+ mutex rsm_mutex;
+ mutex invoke_mutex;
+ cond recovery_cond;
+ cond sync_cond;
+
+ void execute(int procno, std::string req, std::string &r);
+ rsm_client_protocol::status client_invoke(int procno, std::string req,
+ std::string &r);
+ bool statetransfer(std::string m);
+ bool statetransferdone(std::string m);
+ bool join(std::string m);
+ void set_primary(unsigned vid);
+ std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid);
+ bool sync_with_backups();
+ bool sync_with_primary();
+ void net_repair_wo(bool heal);
+ void breakpoint1();
+ void breakpoint2();
+ void partition1();
+ void commit_change_wo(unsigned vid);
+ public:
+ rsm (std::string _first, std::string _me);
+ ~rsm() {};
+
+ bool amiprimary();
+ void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; };
+ void recovery();
+ void commit_change(unsigned vid);
+
+ template<class S, class A1, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, R &));
+ template<class S, class A1, class A2, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, R &));
+ template<class S, class A1, class A2, class A3, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, R &));
+ template<class S, class A1, class A2, class A3, class A4, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4, R &));
+ template<class S, class A1, class A2, class A3, class A4, class A5, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4, const A5 a5, R &));
+};
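+
+// A registration sketch (the service class and RPC number below are
+// hypothetical, for illustration only):
+//
+//   class kv_service {
+//    public:
+//     int get(const std::string key, std::string &val);
+//   };
+//
+//   rsm *r = new rsm(first, me);
+//   kv_service kv;
+//   r->reg(0x7001, &kv, &kv_service::get);
+//
+// Each reg overload below wraps the method in a handler that unmarshalls
+// the arguments, invokes the method, and marshalls the reply.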
+
+template<class S, class A1, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ R r;
+ args >> a1;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ R r;
+ args >> a1;
+ args >> a2;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,a3,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,a3,a4,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+
+template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
+ rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4,
+ const A5 a5, R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
+ const A5 a5, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, const A5 a5, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ A5 a5;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,a3,a4,a5,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+#endif /* rsm_h */
--- /dev/null
+#include "rsm_client.h"
+#include <vector>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <handle.h>
+#include "lang/verify.h"
+
+
+rsm_client::rsm_client(std::string dst) {
+ printf("create rsm_client\n");
+ std::vector<std::string> mems;
+
+ pthread_mutex_init(&rsm_client_mutex, NULL);
+ sockaddr_in dstsock;
+ make_sockaddr(dst.c_str(), &dstsock);
+ primary = dst;
+
+ {
+ ScopedLock ml(&rsm_client_mutex);
+ VERIFY (init_members());
+ }
+ printf("rsm_client: done\n");
+}
+
+// Assumes caller holds rsm_client_mutex
+void rsm_client::primary_failure() {
+ VERIFY(!known_mems.empty());
+ primary = known_mems.back();
+ known_mems.pop_back();
+}
+
+rsm_client_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
+ int ret = rsm_client_protocol::ERR; // in case the first safebind() fails before any RPC
+ ScopedLock ml(&rsm_client_mutex);
+ while (1) {
+ printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
+ handle h(primary);
+
+ VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
+ rpcc *cl = h.safebind();
+ if (cl)
+ ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
+ VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+
+ if (!cl)
+ goto prim_fail;
+
+ printf("rsm_client::invoke proc %x primary %s ret %d\n", proc,
+ primary.c_str(), ret);
+ if (ret == rsm_client_protocol::OK)
+ break;
+ if (ret == rsm_client_protocol::BUSY) {
+ printf("rsm is busy %s\n", primary.c_str());
+ sleep(3);
+ continue;
+ }
+ if (ret == rsm_client_protocol::NOTPRIMARY) {
+ printf("primary %s isn't the primary--let's get a complete list of mems\n",
+ primary.c_str());
+ if (init_members())
+ continue;
+ }
+prim_fail:
+ printf("primary %s failed ret %d\n", primary.c_str(), ret);
+ primary_failure();
+ printf ("rsm_client::invoke: retry new primary %s\n", primary.c_str());
+ }
+ return ret;
+}
+
+bool rsm_client::init_members() {
+ printf("rsm_client::init_members get members!\n");
+ handle h(primary);
+ VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
+ int ret = rsm_client_protocol::ERR;
+ rpcc *cl = h.safebind();
+ if (cl) {
+ ret = cl->call(rsm_client_protocol::members, 0, known_mems,
+ rpcc::to(1000));
+ }
+ VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+ if (cl == 0 || ret != rsm_client_protocol::OK)
+ return false;
+ if (known_mems.size() < 1) {
+ printf("rsm_client::init_members do not know any members!\n");
+ VERIFY(0);
+ }
+
+ primary = known_mems.back();
+ known_mems.pop_back();
+
+ printf("rsm_client::init_members: primary %s\n", primary.c_str());
+
+ return true;
+}
--- /dev/null
+#ifndef rsm_client_h
+#define rsm_client_h
+
+#include "rpc.h"
+#include "rsm_protocol.h"
+#include <string>
+#include <vector>
+
+
+//
+// rsm client interface.
+//
+// The client stubs marshal an RPC request and then call the invoke
+// procedure on the replicated state machine, passing the marshalled
+// request as an argument. This way the replicated state machine isn't
+// service specific; any service can use it.
+//
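+// A usage sketch (the RPC number and address are hypothetical):
+//
+//   rsm_client rc("127.0.0.1:37000");
+//   std::string val;
+//   int ret = rc.call(0x7001, std::string("mykey"), val);
+//
+// The stub marshalls the arguments and call_m() routes them through
+// invoke(), which retries against a new primary if the current one fails.
+//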
+
+class rsm_client {
+ protected:
+ std::string primary;
+ std::vector<std::string> known_mems;
+ pthread_mutex_t rsm_client_mutex;
+ void primary_failure();
+ bool init_members();
+ public:
+ rsm_client(std::string dst);
+ rsm_client_protocol::status invoke(int proc, std::string req, std::string &rep);
+
+ template<class R, class A1>
+ int call(unsigned int proc, const A1 & a1, R &r);
+
+ template<class R, class A1, class A2>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, R &r);
+
+ template<class R, class A1, class A2, class A3>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ R &r);
+
+ template<class R, class A1, class A2, class A3, class A4>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, R &r);
+
+ template<class R, class A1, class A2, class A3, class A4, class A5>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, const A5 & a5, R &r);
+ private:
+ template<class R> int call_m(unsigned int proc, marshall &req, R &r);
+};
+
+template<class R>
+int rsm_client::call_m(unsigned int proc, marshall &req, R &r) {
+ std::string rep;
+ std::string res;
+ int intret = invoke(proc, req.str(), rep);
+ VERIFY( intret == rsm_client_protocol::OK );
+ unmarshall u(rep);
+ u >> intret;
+ if (intret < 0) return intret;
+ u >> res;
+ if (!u.okdone()) {
+ fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+ "You probably forgot to set the reply string in "
+ "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
+ "type\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ unmarshall u1(res);
+ u1 >> r;
+ if(!u1.okdone()) {
+ fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+ "You are probably calling RPC 0x%x with wrong return "
+ "type.\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ return intret;
+}
+
+template<class R, class A1>
+int rsm_client::call(unsigned int proc, const A1 & a1, R & r) {
+ marshall m;
+ m << a1;
+ return call_m(proc, m, r);
+}
+
+template<class R, class A1, class A2>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ return call_m(proc, m, r);
+}
+
+template<class R, class A1, class A2, class A3>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ return call_m(proc, m, r);
+}
+
+template<class R, class A1, class A2, class A3, class A4>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ return call_m(proc, m, r);
+}
+
+template<class R, class A1, class A2, class A3, class A4, class A5>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, const A5 & a5, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ m << a5;
+ return call_m(proc, m, r);
+}
+
+#endif
--- /dev/null
+#ifndef rsm_protocol_h
+#define rsm_protocol_h
+
+#include "rpc.h"
+
+
+class rsm_client_protocol {
+ public:
+ enum xxstatus { OK, ERR, NOTPRIMARY, BUSY};
+ typedef int status;
+ enum rpc_numbers {
+ invoke = 0x9001,
+ members,
+ };
+};
+
+
+struct viewstamp {
+ viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) {
+ vid = _vid;
+ seqno = _seqno;
+ };
+ unsigned int vid;
+ unsigned int seqno;
+ inline void operator++(int) {
+ seqno++;
+ };
+};
+
+class rsm_protocol {
+ public:
+ enum xxstatus { OK, ERR, BUSY};
+ typedef int status;
+ enum rpc_numbers {
+ invoke = 0x10001,
+ transferreq,
+ transferdonereq,
+ joinreq,
+ };
+
+ struct transferres {
+ std::string state;
+ viewstamp last;
+ };
+
+ struct joinres {
+ std::string log;
+ };
+};
+
+inline bool operator==(viewstamp a, viewstamp b) {
+ return a.vid == b.vid && a.seqno == b.seqno;
+}
+
+inline bool operator>(viewstamp a, viewstamp b) {
+ return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno);
+}
+
+inline bool operator!=(viewstamp a, viewstamp b) {
+ return a.vid != b.vid || a.seqno != b.seqno;
+}
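+
+// Viewstamps order first by view id, then by seqno within a view: for
+// example, (2,4) < (2,5) and (2,9) < (3,0). The postfix ++ above only
+// advances seqno; vid changes only on a view change.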
+
+inline marshall& operator<<(marshall &m, viewstamp v)
+{
+ m << v.vid;
+ m << v.seqno;
+ return m;
+}
+
+inline unmarshall& operator>>(unmarshall &u, viewstamp &v) {
+ u >> v.vid;
+ u >> v.seqno;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, rsm_protocol::transferres r)
+{
+ m << r.state;
+ m << r.last;
+ return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, rsm_protocol::transferres &r)
+{
+ u >> r.state;
+ u >> r.last;
+ return u;
+}
+
+inline marshall &
+operator<<(marshall &m, rsm_protocol::joinres r)
+{
+ m << r.log;
+ return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, rsm_protocol::joinres &r)
+{
+ u >> r.log;
+ return u;
+}
+
+class rsm_test_protocol {
+ public:
+ enum xxstatus { OK, ERR};
+ typedef int status;
+ enum rpc_numbers {
+ net_repair = 0x12001,
+ breakpoint = 0x12002,
+ };
+};
+
+#endif
--- /dev/null
+#ifndef rsm_state_transfer_h
+#define rsm_state_transfer_h
+
+class rsm_state_transfer {
+ public:
+ virtual std::string marshal_state() = 0;
+ virtual void unmarshal_state(std::string) = 0;
+ virtual ~rsm_state_transfer() {};
+};
+
+#endif
--- /dev/null
+//
+// RSM test client
+//
+
+#include "rsm_protocol.h"
+#include "rsmtest_client.h"
+#include "rpc.h"
+#include <arpa/inet.h>
+#include <vector>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string>
+using namespace std;
+
+rsmtest_client *lc;
+
+int
+main(int argc, char *argv[])
+{
+ int r;
+
+ if(argc != 4){
+ fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
+ exit(1);
+ }
+
+ lc = new rsmtest_client(argv[1]);
+ string command(argv[2]);
+ if (command == "partition") {
+ r = lc->net_repair(atoi(argv[3]));
+ printf ("net_repair returned %d\n", r);
+ } else if (command == "breakpoint") {
+ int b = atoi(argv[3]);
+ r = lc->breakpoint(b);
+ printf ("breakpoint %d returned %d\n", b, r);
+ } else {
+ fprintf(stderr, "Unknown command %s\n", argv[2]);
+ }
+ exit(0);
+}
--- /dev/null
+#!/usr/bin/perl -w
+
+use POSIX ":sys_wait_h";
+use Getopt::Std;
+use strict;
+
+
+my @pid;
+my @logs = ();
+my @views = (); # expected views
+my %in_views; # number of views each node is expected to appear in
+my @p;
+my $t;
+my $always_kill = 0;
+
+use sigtrap 'handler' => \&killprocess, 'HUP', 'INT', 'ABRT', 'QUIT', 'TERM';
+
+sub paxos_log {
+ my $port = shift;
+ return "paxos-$port.log";
+}
+
+sub mydie {
+ my ($s) = @_;
+ killprocess() if ($always_kill);
+ die $s;
+}
+
+sub killprocess {
+ print "killprocess: forcestop all spawned processes...@pid \n";
+ kill 9, @pid;
+}
+
+sub cleanup {
+ kill 9, @pid;
+ unlink(@logs);
+ sleep 2;
+}
+
+sub spawn {
+ my ($p, @a) = @_;
+ my $aa = join("-", @a);
+ if (my $pid = fork) {
+# parent
+ push( @logs, "$p-$aa.log" );
+ if( $p =~ /config_server/ ) {
+ push( @logs, paxos_log($a[1]) );
+ }
+ if( $p =~ /lock_server/ ) {
+ push( @logs, paxos_log($a[1]) );
+ }
+ return $pid;
+ } elsif (defined $pid) {
+# child
+ open(STDOUT, ">>$p-$aa.log")
+ or mydie "Couln't redirect stout\n";
+ open(STDERR, ">&STDOUT")
+ or mydie "Couln't redirect stderr\n";
+ $| = 1;
+ print "$p @a\n";
+ exec "$p @a"
+ or mydie "Cannot start new $p @a $!\n";
+ } else {
+ mydie "Cannot fork: $!\n";
+ }
+}
+
+sub randports {
+
+ my $num = shift;
+ my @p = ();
+ for( my $i = 0; $i < $num; $i++ ) {
+ push( @p, int(rand(54000))+10000 );
+ }
+ my @sp = sort { $a <=> $b } @p;
+ return @sp;
+}
+
+sub print_config {
+ my @ports = @_;
+ open( CONFIG, ">config" ) or mydie( "Couldn't open config for writing" );
+ foreach my $p (@ports) {
+ printf CONFIG "%05d\n", $p;
+ }
+ close( CONFIG );
+}
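+
+# print_config writes one zero-padded five-digit port per line, so the
+# resulting config file looks like (ports here are made up):
+#   12345
+#   23456
+#   54321
+# The spawned servers are presumably pointed at this file to learn the
+# candidate node ports.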
+
+sub spawn_ls {
+ my $master = shift;
+ my $port = shift;
+ return spawn( "./lock_server", $master, $port );
+}
+
+sub spawn_config {
+ my $master = shift;
+ my $port = shift;
+ return spawn( "./config_server", $master, $port );
+}
+
+sub check_views {
+
+ my $l = shift;
+ my $v = shift;
+ my $last_v = shift;
+
+ open( LOG, "<$l" )
+ or mydie( "Failed: couldn't read $l" );
+ my @log = <LOG>;
+ close(LOG);
+
+ my @vs = @{$v};
+
+ my $i = 0;
+ my @last_view;
+ foreach my $line (@log) {
+ if( $line =~ /^done (\d+) ([\d\s]+)$/ ) {
+
+ my $num = $1;
+ my @view = split( /\s+/, $2 );
+ @last_view = @view;
+
+ if( $i > $#vs ) {
+# let there be extra views
+ next;
+ }
+
+ my $e = $vs[$i];
+ my @expected = @{$e};
+
+ if( @expected != @view ) {
+ mydie( "Failed: In log $l at view $num is (@view), but expected $i (@expected)" );
+ }
+
+ $i++;
+ }
+ }
+
+ if( $i <= $#vs ) {
+ mydie( "Failed: In log $l, not enough views seen!" );
+ }
+
+ if( defined $last_v ) {
+ my @last_exp_v = @{$last_v};
+ if( @last_exp_v != @last_view ) {
+ mydie( "Failed: In log $l last view didn't match, got view @last_view, but expected @last_exp_v" );
+ }
+ }
+
+}
+
+sub get_num_views {
+
+ my $log = shift;
+ my $including = shift;
+ my $nv = `grep "done " $log | grep "$including" | wc -l`;
+ chomp $nv;
+ return $nv;
+
+}
+
+sub wait_for_view_change {
+
+ my $log = shift;
+ my $num_views = shift;
+ my $including = shift;
+ my $timeout = shift;
+
+ my $start = time();
+ while( (get_num_views( $log, $including ) < $num_views) and
+ ($start + $timeout > time()) ) {
+ my $lastv = `grep done $log | tail -n 1`;
+ chomp $lastv;
+ print " Waiting for $including to be present in >=$num_views views in $log (Last view: $lastv)\n";
+ sleep 1;
+ }
+
+ if( get_num_views( $log, $including ) < $num_views) {
+ mydie( "Failed: Timed out waiting for $including to be in >=$num_views in log $log" );
+ }else{
+ print " Done: $including is in >=$num_views views in $log\n";
+ }
+}
+
+sub waitpid_to {
+ my $pid = shift;
+ my $to = shift;
+
+ my $start = time();
+ my $done_pid;
+ do {
+ sleep 1;
+ $done_pid = waitpid($pid, POSIX::WNOHANG);
+ } while( $done_pid <= 0 and (time() - $start) < $to );
+
+ if( $done_pid <= 0 ) {
+ kill 9,$pid;
+ mydie( "Failed: Timed out waiting for process $pid\n" );
+ } else {
+ return 1;
+ }
+
+}
+
+sub wait_and_check_expected_view($) {
+ my $v = shift;
+ push @views, $v;
+ for (my $i = 0; $i <=$#$v; $i++) {
+ $in_views{$v->[$i]}++;
+ }
+ foreach my $port (@$v) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}, $port, 20);
+ }
+ foreach my $port (@$v) {
+ my $log = paxos_log($port);
+ check_views( $log, \@views );
+ }
+}
+
+sub start_nodes ($$){
+
+ @pid = ();
+ @logs = ();
+ @views = ();
+ for (my $i = 0; $i <= $#p; $i++) {
+ $in_views{$p[$i]} = 0;
+ }
+
+ my $n = shift;
+ my $command = shift;
+
+ for (my $i = 0; $i < $n; $i++) {
+ if ($command eq "ls") {
+ @pid = (@pid, spawn_ls($p[0],$p[$i]));
+ print "Start lock_server on $p[$i]\n";
+ }elsif ($command eq "config_server"){
+ @pid = (@pid, spawn_config($p[0],$p[$i]));
+ print "Start config on $p[$i]\n";
+ }
+ sleep 1;
+
+ my @vv = @p[0..$i];
+ wait_and_check_expected_view(\@vv);
+ }
+
+}
+
+my %options;
+getopts("s:k",\%options);
+if (defined($options{s})) {
+ srand($options{s});
+}
+if (defined($options{k})) {
+ $always_kill = 1;
+}
+
+#get a sorted list of random ports
+@p = randports(5);
+print_config( @p[0..4] );
+
+my @do_run = ();
+my $NUM_TESTS = 17;
+
+# see which tests are set
+if( $#ARGV > -1 ) {
+ foreach my $t (@ARGV) {
+ if( $t < $NUM_TESTS && $t >= 0 ) {
+ $do_run[$t] = 1;
+ }
+ }
+} else {
+# turn on all tests
+ for( my $i = 0; $i < $NUM_TESTS; $i++ ) {
+ $do_run[$i] = 1;
+ }
+}
+
+if ($do_run[0]) {
+ print "test0: start 3-process lock server\n";
+ start_nodes(3,"ls");
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[1]) {
+ print "test1: start 3-process lock server, kill third server\n";
+ start_nodes(3,"ls");
+
+ print "Kill third server (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 5;
+
+ # it should go through 4 views
+ my @v4 = ($p[0], $p[1]);
+ wait_and_check_expected_view(\@v4);
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[2]) {
+ print "test2: start 3-process lock server, kill first server\n";
+ start_nodes(3,"ls");
+
+ print "Kill first (PID: $pid[0]) on port $p[0]\n";
+ kill "TERM", $pid[0];
+
+ sleep 5;
+
+ # it should go through 4 views
+ my @v4 = ($p[1], $p[2]);
+ wait_and_check_expected_view(\@v4);
+
+ cleanup();
+ sleep 2;
+}
+
+
+if ($do_run[3]) {
+
+ print "test3: start 3-process lock_server, kill a server, restart a server\n";
+ start_nodes(3,"ls");
+
+ print "Kill server (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 5;
+
+ my @v4 = ($p[0], $p[1]);
+ wait_and_check_expected_view(\@v4);
+
+ print "Restart killed server on port $p[2]\n";
+ $pid[2] = spawn_ls ($p[0], $p[2]);
+
+ sleep 5;
+
+ my @v5 = ($p[0], $p[1], $p[2]);
+ wait_and_check_expected_view(\@v5);
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[4]) {
+ print "test4: 3-process lock_server, kill third server, kill second server, restart third server, kill third server again, restart second server, re-restart third server, check logs\n";
+ start_nodes(3,"ls");
+
+ print "Kill server (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 5;
+ my @v4 = ($p[0], $p[1]);
+ wait_and_check_expected_view(\@v4);
+
+ print "Kill server (PID: $pid[1]) on port $p[1]\n";
+ kill "TERM", $pid[1];
+
+ sleep 5;
+ #no view change can happen because of a lack of majority
+
+ print "Restarting server on port $p[2]\n";
+ $pid[2] = spawn_ls($p[0], $p[2]);
+
+ sleep 5;
+
+ #no view change can happen because of a lack of majority
+ foreach my $port (@p[0..2]) {
+ my $num_v = get_num_views(paxos_log($port), $port);
+ die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port});
+ }
+
+ # kill node 3 again,
+ print "Kill server (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 5;
+
+
+ print "Restarting server on port $p[1]\n";
+ $pid[1] = spawn_ls($p[0], $p[1]);
+
+ sleep 7;
+
+ foreach my $port (@p[0..1]) {
+ $in_views{$port} = get_num_views( paxos_log($port), $port );
+ print " Node $port is present in ", $in_views{$port}, " views in ", paxos_log($port), "\n";
+ }
+
+ print "Restarting server on port $p[2]\n";
+ $pid[2] = spawn_ls($p[0], $p[2]);
+
+ my @lastv = ($p[0],$p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+# now check the paxos logs and make sure the logs go through the right
+# views
+
+ foreach my $port (@lastv) {
+ check_views( paxos_log($port), \@views, \@lastv);
+ }
+
+ cleanup();
+
+}
+
+if ($do_run[5]) {
+ print "test5: 3-process lock_server, send signal 1 to first server, kill third server, restart third server, check logs\n";
+ start_nodes(3,"ls");
+
+ print "Sending paxos breakpoint 1 to first server on port $p[0]\n";
+ spawn("./rsm_tester", $p[0]+1, "breakpoint", 3);
+
+ sleep 1;
+
+ print "Kill third server (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 5;
+ foreach my $port (@p[0..2]) {
+ my $num_v = get_num_views( paxos_log($port), $port );
+ die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port});
+ }
+
+ print "Restarting third server on port $p[2]\n";
+ $pid[2]= spawn_ls($p[0], $p[2]);
+ my @lastv = ($p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+ sleep 10;
+
+# now check the paxos logs and make sure the logs go through the right
+# views
+
+ foreach my $port (@lastv) {
+ check_views( paxos_log($port), \@views, \@lastv);
+ }
+
+ cleanup();
+
+}
+
+if ($do_run[6]) {
+ print "test6: 4-process lock_server, send signal 2 to first server, kill fourth server, restart fourth server, check logs\n";
+ start_nodes(4,"ls");
+ print "Sending paxos breakpoint 2 to first server on port $p[0]\n";
+ spawn("./rsm_tester", $p[0]+1, "breakpoint", 4);
+
+ sleep 1;
+
+ print "Kill fourth server (PID: $pid[3]) on port $p[3]\n";
+ kill "TERM", $pid[3];
+
+ sleep 5;
+
+ foreach my $port ($p[1],$p[2]) {
+ my $num_v = get_num_views( paxos_log($port), $port );
+ die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port});
+ }
+
+ sleep 5;
+
+ print "Restarting fourth server on port $p[3]\n";
+ $pid[3] = spawn_ls($p[1], $p[3]);
+
+ sleep 5;
+
+ my @v5 = ($p[0],$p[1],$p[2]);
+ foreach my $port (@v5) {
+ $in_views{$port}++;
+ }
+ push @views, \@v5;
+
+ sleep 10;
+
+ # the 6th view will be (2,3) or (1,2,3,4)
+ my @v6 = ($p[1],$p[2]);
+ foreach my $port (@v6) {
+ $in_views{$port}++;
+ }
+ foreach my $port (@v6) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 30);
+ }
+
+ # final will be (2,3,4)
+ my @lastv = ($p[1],$p[2],$p[3]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+ foreach my $port (@lastv) {
+ check_views( paxos_log($port), \@views, \@lastv );
+ }
+ cleanup();
+
+}
+
+if ($do_run[7]) {
+ print "test7: 4-process lock_server, send signal 2 to first server, kill fourth server, kill other servers, restart other servers, restart fourth server, check logs\n";
+ start_nodes(4,"ls");
+ print "Sending paxos breakpoint 2 to first server on port $p[0]\n";
+ spawn("./rsm_tester", $p[0]+1, "breakpoint", 4);
+ sleep 3;
+
+ print "Kill fourth server (PID: $pid[3]) on port $p[3]\n";
+ kill "TERM", $pid[3];
+
+ sleep 5;
+
+ print "Kill third server (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ print "Kill second server (PID: $pid[1]) on port $p[1]\n";
+ kill "TERM", $pid[1];
+
+ sleep 5;
+
+ print "Restarting second server on port $p[1]\n";
+ $pid[1] = spawn_ls($p[0], $p[1]);
+
+ sleep 5;
+
+ print "Restarting third server on port $p[2]\n";
+ $pid[2] = spawn_ls($p[0], $p[2]);
+
+ sleep 5;
+
+#no view change is possible by now because there is no majority
+ foreach my $port ($p[1],$p[2]) {
+ my $num_v = get_num_views( paxos_log($port), $port );
+ die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port});
+ }
+
+ print "Restarting fourth server on port $p[3]\n";
+ $pid[3] = spawn_ls($p[1], $p[3]);
+
+ sleep 5;
+
+ my @v5 = ($p[0], $p[1], $p[2]);
+ push @views, \@v5;
+ foreach my $port (@v5) {
+ $in_views{$port}++;
+ }
+
+ sleep 15;
+ my @lastv = ($p[1],$p[2],$p[3]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ foreach my $port (@lastv) {
+ check_views( paxos_log($port), \@views, \@lastv);
+ }
+
+ cleanup();
+
+}
+
+if ($do_run[8]) {
+ print "test8: start 3-process lock service\n";
+ start_nodes(3,"ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 8" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[9]) {
+
+ print "test9: start 3-process rsm, kill second slave while lock_tester is running\n";
+ start_nodes(3,"ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep int(rand(10)+1);
+
+ print "Kill slave (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 3;
+
+ # it should go through 4 views
+ my @v4 = ($p[0], $p[1]);
+ wait_and_check_expected_view(\@v4);
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 9" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[10]) {
+
+ print "test10: start 3-process rsm, kill second slave and restarts it later while lock_tester is running\n";
+ start_nodes(3,"ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep int(rand(10)+1);
+
+ print "Kill slave (PID: $pid[2]) on port $p[2]\n";
+ kill "TERM", $pid[2];
+
+ sleep 3;
+
+ # it should go through 4 views
+ my @v4 = ($p[0], $p[1]);
+ wait_and_check_expected_view(\@v4);
+
+ sleep 3;
+
+ print "Restarting killed lock_server on port $p[2]\n";
+ $pid[2] = spawn_ls($p[0], $p[2]);
+ my @v5 = ($p[0],$p[1],$p[2]);
+ wait_and_check_expected_view(\@v5);
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 10" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+
+if ($do_run[11]) {
+
+ print "test11: start 3-process rsm, kill primary while lock_tester is running\n";
+ start_nodes(3,"ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep int(rand(10)+1);
+
+ print "Kill primary (PID: $pid[0]) on port $p[0]\n";
+ kill "TERM", $pid[0];
+
+ sleep 3;
+
+ # it should go through 4 views
+ my @v4 = ($p[1], $p[2]);
+ wait_and_check_expected_view(\@v4);
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 11" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[12]) {
+
+ print "test12: start 3-process rsm, kill master at break1 and restart it while lock_tester is running\n";
+
+ start_nodes(3, "ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep 1;
+
+ print "Kill master (PID: $pid[0]) on port $p[0] at breakpoint 1\n";
+ spawn("./rsm_tester", $p[0]+1, "breakpoint", 1);
+
+
+ sleep 1;
+
+ # it should go through 5 views
+ my @v4 = ($p[1], $p[2]);
+ wait_and_check_expected_view(\@v4);
+
+ print "Restarting killed lock_server on port $p[0]\n";
+ $pid[0] = spawn_ls($p[1], $p[0]);
+
+ sleep 3;
+
+ # the last view should include all nodes
+ my @lastv = ($p[0],$p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ foreach my $port (@lastv) {
+ check_views( paxos_log($port), \@views, \@lastv);
+ }
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 12" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[13]) {
+
+ print "test13: start 3-process rsm, kill slave at break1 and restart it while lock_tester is running\n";
+
+ start_nodes(3, "ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep 1;
+
+ print "Kill slave (PID: $pid[2]) on port $p[2] at breakpoint 1\n";
+ spawn("./rsm_tester", $p[2]+1, "breakpoint", 1);
+
+ sleep 1;
+
+ # it should go through 4 views
+ my @v4 = ($p[0], $p[1]);
+ wait_and_check_expected_view(\@v4);
+
+ print "Restarting killed lock_server on port $p[2]\n";
+ $pid[2] = spawn_ls($p[0], $p[2]);
+
+ sleep 3;
+
+ # the last view should include all nodes
+ my @lastv = ($p[0],$p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ foreach my $port (@lastv) {
+ check_views( paxos_log($port), \@views, \@lastv);
+ }
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 13" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[14]) {
+
+ print "test14: start 5-process rsm, kill slave break1, kill slave break2\n";
+
+ start_nodes(5, "ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep 1;
+
+ print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1\n";
+ spawn("./rsm_tester", $p[4]+1, "breakpoint", 1);
+
+
+ print "Kill slave (PID: $pid[3]) on port $p[3] at breakpoint 2\n";
+ spawn("./rsm_tester", $p[3]+1, "breakpoint", 2);
+
+
+ sleep 1;
+
+ # two view changes:
+
+ print "first view change wait\n";
+ my @lastv = ($p[0],$p[1],$p[2],$p[3]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ print "second view change wait\n";
+
+ @lastv = ($p[0],$p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 14" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[15]) {
+
+ print "test15: start 5-process rsm, kill slave break1, kill primary break2\n";
+
+ start_nodes(5, "ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep 1;
+
+ print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1\n";
+ spawn("./rsm_tester", $p[4]+1, "breakpoint", 1);
+
+
+ print "Kill primary (PID: $pid[0]) on port $p[0] at breakpoint 2\n";
+ spawn("./rsm_tester", $p[0]+1, "breakpoint", 2);
+
+ sleep 1;
+
+ # two view changes:
+
+ print "first view change wait\n";
+ my @lastv = ($p[0],$p[1],$p[2],$p[3]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ print "second view change wait\n";
+
+ @lastv = ($p[1],$p[2],$p[3]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 15" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+if ($do_run[16]) {
+
+ print "test16: start 3-process rsm, partition primary, heal it\n";
+
+ start_nodes(3, "ls");
+
+ print "Start lock_tester $p[0]\n";
+ $t = spawn("./lock_tester", $p[0]);
+
+ sleep 1;
+
+ print "Partition primary (PID: $pid[0]) on port $p[0] at breakpoint\n";
+
+ spawn("./rsm_tester", $p[0]+1, "partition", 0);
+
+ sleep 3;
+
+ print "first view change wait\n";
+ my @lastv = ($p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ sleep 1;
+
+ print "Heal partition primary (PID: $pid[0]) on port $p[0] at breakpoint\n";
+ spawn("./rsm_tester", $p[0]+1, "partition", 1);
+
+ sleep 1;
+
+ # xxx it should test that this is the 5th view!
+ print "second view change wait\n";
+ @lastv = ($p[0], $p[1],$p[2]);
+ foreach my $port (@lastv) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20);
+ }
+
+ print " Wait for lock_tester to finish (waitpid $t)\n";
+ waitpid_to($t, 600);
+
+ if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) {
+ mydie( "Failed lock tester for test 16" );
+ }
+
+ cleanup();
+ sleep 2;
+}
+
+print "tests done OK\n";
+
+unlink("config");
--- /dev/null
+// RPC stubs for clients to talk to rsmtest_server
+
+#include "rsmtest_client.h"
+#include "rpc.h"
+#include <arpa/inet.h>
+
+#include <sstream>
+#include <iostream>
+#include <stdio.h>
+
+rsmtest_client::rsmtest_client(std::string dst)
+{
+ sockaddr_in dstsock;
+ make_sockaddr(dst.c_str(), &dstsock);
+ cl = new rpcc(dstsock);
+ if (cl->bind() < 0) {
+ printf("rsmtest_client: call bind\n");
+ }
+}
+
+int
+rsmtest_client::net_repair(int heal)
+{
+ int r;
+ int ret = cl->call(rsm_test_protocol::net_repair, heal, r);
+ VERIFY (ret == rsm_test_protocol::OK);
+ return r;
+}
+
+int
+rsmtest_client::breakpoint(int b)
+{
+ int r;
+ int ret = cl->call(rsm_test_protocol::breakpoint, b, r);
+ VERIFY (ret == rsm_test_protocol::OK);
+ return r;
+}
+
+
--- /dev/null
+// rsmtest client interface.
+
+#ifndef rsmtest_client_h
+#define rsmtest_client_h
+
+#include <string>
+#include "rsm_protocol.h"
+#include "rpc.h"
+
+// Client interface to the rsmtest server
+class rsmtest_client {
+ protected:
+ rpcc *cl;
+ public:
+ rsmtest_client(std::string d);
+ virtual ~rsmtest_client() {};
+ virtual rsm_test_protocol::status net_repair(int heal);
+ virtual rsm_test_protocol::status breakpoint(int b);
+};
+#endif
--- /dev/null
+#include "srlock.h"
+
+ScopedRemoteLock::ScopedRemoteLock(lock_client *lc, lock_protocol::lockid_t lid) :
+ lc_(lc), lid_(lid) {
+ lc_->acquire(lid_);
+ releaseOnFree = true;
+}
+
+void ScopedRemoteLock::retain() {
+ releaseOnFree = false;
+}
+
+ScopedRemoteLock::~ScopedRemoteLock() {
+ if (releaseOnFree)
+ lc_->release(lid_);
+}
--- /dev/null
+#ifndef srlock_h
+#define srlock_h
+
+#include "lock_protocol.h"
+#include "lock_client.h"
+
+class ScopedRemoteLock {
+ protected:
+ lock_client *lc_;
+ lock_protocol::lockid_t lid_;
+ bool releaseOnFree;
+ public:
+ ScopedRemoteLock(lock_client *, lock_protocol::lockid_t);
+ void retain();
+ ~ScopedRemoteLock();
+};
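+
+// A usage sketch (lc is an existing lock_client*, lid names a lock):
+//
+//   {
+//     ScopedRemoteLock sl(lc, lid); // acquire(lid) in the constructor
+//     // ... critical section ...
+//   } // release(lid) in the destructor, unless sl.retain() was called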
+
+#endif
--- /dev/null
+#!/usr/bin/env bash
+
+ulimit -c unlimited
+
+NUM_LS=${1:-0}
+
+BASE_PORT=$RANDOM
+BASE_PORT=$((BASE_PORT+2000))
+LOCK_PORT=$((BASE_PORT+6))
+
+if [ $NUM_LS -gt 1 ]; then
+ x=0
+ rm -f config
+ while [ $x -lt $NUM_LS ]; do
+ port=$((LOCK_PORT+2*x))
+ x=$((x+1))
+ echo $port >> config
+ done
+ x=0
+ while [ $x -lt $NUM_LS ]; do
+ port=$((LOCK_PORT+2*x))
+ x=$((x+1))
+ echo "starting ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &"
+ ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &
+ sleep 1
+ done
+else
+ echo "starting ./lock_server $LOCK_PORT > lock_server.log 2>&1 &"
+ ./lock_server $LOCK_PORT > lock_server.log 2>&1 &
+ sleep 1
+fi
--- /dev/null
+#!/usr/bin/env bash
+
+pkill -u $USER lock_server
--- /dev/null
+#include "mutex.h"
+#include <sys/time.h>
+#include <stdint.h>
+#include "tprintf.h"
+
+uint64_t utime() {
+ struct timeval tp;
+ gettimeofday(&tp, NULL);
+ return (tp.tv_usec + (uint64_t)tp.tv_sec * 1000000) % 1000000000;
+}
+
+mutex cerr_mutex;
+std::map<pthread_t, int> thread_name_map;
+int next_thread_num = 0;
+std::map<void *, int> instance_name_map;
+int next_instance_num = 0;
--- /dev/null
+#ifndef TPRINTF_H
+#define TPRINTF_H
+
+#include <iomanip>
+#include <iostream>
+#include "mutex.h"
+#include <time.h>
+#include <stdio.h>
+#include <map>
+
+extern mutex cerr_mutex;
+extern std::map<pthread_t, int> thread_name_map;
+extern int next_thread_num;
+extern std::map<void *, int> instance_name_map;
+extern int next_instance_num;
+extern char tprintf_thread_prefix;
+
+#define LOG_PREFIX { \
+ cerr_mutex.acquire(); \
+ pthread_t self = pthread_self(); \
+ int tid = thread_name_map[self]; \
+ if (tid==0) \
+ tid = thread_name_map[self] = ++next_thread_num; \
+ std::cerr << std::left << std::setw(9) << utime() << " "; \
+ std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \
+ std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \
+}
+#define LOG_THIS_POINTER { \
+ int self = instance_name_map[this]; \
+ if (self==0) \
+ self = instance_name_map[this] = ++next_instance_num; \
+ std::cerr << "#" << std::setw(2) << self; \
+}
+#define LOG_SUFFIX { \
+ cerr_mutex.release(); \
+}
+
+#define LOG_NONMEMBER(x) { \
+ LOG_PREFIX; \
+ std::cerr << x << std::endl; \
+ LOG_SUFFIX; \
+}
+#define LOG(x) { \
+ LOG_PREFIX; \
+ LOG_THIS_POINTER; \
+ std::cerr << x << std::endl; \
+ LOG_SUFFIX; \
+}
+#define JOIN(from,to,sep) ({ \
+ std::ostringstream oss; \
+ for(typeof(from) i=from;i!=to;i++) \
+ oss << *i << sep; \
+ oss.str(); \
+})
+#define LOG_FUNC_ENTER { \
+ LOG_PREFIX; \
+ LOG_THIS_POINTER; \
+ std::cerr << "lid=" << lid; \
+ std::cerr << std::endl; \
+ LOG_SUFFIX; \
+}
+#define LOG_FUNC_ENTER_SERVER { \
+ LOG_PREFIX; \
+ LOG_THIS_POINTER; \
+ std::cerr << "lid=" << lid; \
+ std::cerr << " client=" << id << "," << xid; \
+ std::cerr << std::endl; \
+ LOG_SUFFIX; \
+}
+#define LOG_FUNC_EXIT { \
+ LOG_PREFIX; \
+ LOG_THIS_POINTER; \
+ std::cerr << "return" << lid; \
+ std::cerr << std::endl; \
+ LOG_SUFFIX; \
+}
+
+#define tprintf(args...) { \
+ int len = snprintf(NULL, 0, args); \
+ char buf[len+1]; \
+ buf[len] = '\0'; \
+ snprintf(buf, len+1, args); \
+ if (len > 0 && buf[len-1]=='\n') \
+ buf[len-1] = '\0'; \
+ LOG_NONMEMBER(buf); \
+}
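+
+// A usage sketch: tprintf behaves like printf, but each line is routed
+// through LOG_NONMEMBER so it carries a timestamp and a stable per-thread
+// id, e.g.:
+//
+//   tprintf("commit_change: new view (%d)\n", vid);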
+
+uint64_t utime();
+
+#endif