From: Peter Iannucci Date: Sun, 15 Sep 2013 01:48:20 +0000 (-0400) Subject: Imported from 6.824 labs X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d Imported from 6.824 labs --- 5fd8cc8409d0efadc07dfe8d6774ad9ff477663d diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..510791b --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +*.dSYM +*.o +*.d +rpc/rpctest +lock_tester +lock_demo +lock_server +*.swp +*.swo +*.a +*.log +rsm_tester diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..edf9fd0 --- /dev/null +++ b/Makefile @@ -0,0 +1,39 @@ +CXXFLAGS = -g -MMD -Wall -I. -I./rpc +LDFLAGS = -L. -L/usr/local/lib +LDLIBS = -lpthread +LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt) +LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl) +CXX = g++ +CC = g++ + +all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest + +rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o gettime.o + rm -f $@ + ar cq $@ $^ + ranlib rpc/librpc.a + +rpc/rpctest: rpc/rpctest.o rpc/librpc.a + +lock_demo=lock_demo.o lock_client.o +lock_demo : $(lock_demo) rpc/librpc.a + +lock_tester=lock_tester.o lock_client.o mutex.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o +lock_tester : $(lock_tester) rpc/librpc.a + +lock_server=lock_server.o lock_smain.o mutex.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o +lock_server : $(lock_server) rpc/librpc.a + +rsm_tester=rsm_tester.o rsmtest_client.o +rsm_tester: $(rsm_tester) rpc/librpc.a + +%.o: %.cc + $(CXX) $(CXXFLAGS) -c $< -o $@ + +-include *.d +-include rpc/*.d + +clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester +.PHONY: clean +clean: + rm -rf $(clean_files) diff --git a/config.cc b/config.cc new file mode 100644 index 0000000..0f9ab4c --- /dev/null +++ b/config.cc @@ -0,0 +1,332 @@ +#include +#include +#include +#include "config.h" +#include "paxos.h" +#include "handle.h" +#include "tprintf.h" +#include "lang/verify.h" + +// The config module maintains views. As a node joins or leaves a +// view, the next view will be the same as previous view, except with +// the new node added or removed. The first view contains only node +// 1. If node 2 joins after the first node (it will download the views +// from node 1), it will learn about view 1 with the first node as the +// only member. It will then invoke Paxos to create the next view. +// It will tell Paxos to ask the nodes in view 1 to agree on the value +// {1, 2}. If Paxos returns success, then it moves to view 2 with +// {1,2} as the members. When node 3 joins, the config module runs +// Paxos with the nodes in view 2 and the proposed value to be +// {1,2,3}. And so on. When a node discovers that some node of the +// current view is not responding, it kicks off Paxos to propose a new +// value (the current view minus the node that isn't responding). The +// config module uses Paxos to create a total order of views, and it +// is ensured that the majority of the previous view agrees to the +// next view. The Paxos log contains all the values (i.e., views) +// agreed on. +// +// The RSM module informs config to add nodes. The config module +// runs a heartbeater thread that checks in with nodes. If a node +// doesn't respond, the config module will invoke Paxos's proposer to +// remove the node. 
Higher layers will learn about this change when a +// Paxos acceptor accepts the new proposed value through +// paxos_commit(). +// +// To be able to bring other nodes up to date to the latest formed +// view, each node will have a complete history of all view numbers +// and their values that it knows about. At any time a node can reboot +// and when it re-joins, it may be many views behind; by remembering +// all views, the other nodes can bring this re-joined node up to +// date. + +static void * +heartbeatthread(void *x) +{ + config *r = (config *) x; + r->heartbeater(); + return 0; +} + +config::config(std::string _first, std::string _me, config_view_change *_vc) + : myvid (0), first (_first), me (_me), vc (_vc) +{ + VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0); + VERIFY(pthread_cond_init(&config_cond, NULL) == 0); + + std::ostringstream ost; + ost << me; + + acc = new acceptor(this, me == _first, me, ost.str()); + pro = new proposer(this, acc, me); + + // XXX hack; maybe should have its own port number + pxsrpc = acc->get_rpcs(); + pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat); + + { + ScopedLock ml(&cfg_mutex); + + reconstruct(); + + pthread_t th; + VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0); + } +} + +void +config::restore(std::string s) +{ + ScopedLock ml(&cfg_mutex); + acc->restore(s); + reconstruct(); +} + +std::vector +config::get_view(unsigned instance) +{ + ScopedLock ml(&cfg_mutex); + return get_view_wo(instance); +} + +// caller should hold cfg_mutex +std::vector +config::get_view_wo(unsigned instance) +{ + std::string value = acc->value(instance); + tprintf("get_view(%d): returns %s\n", instance, value.c_str()); + return members(value); +} + +std::vector +config::members(std::string value) +{ + std::istringstream ist(value); + std::string m; + std::vector view; + while (ist >> m) { + view.push_back(m); + } + return view; +} + +std::string +config::value(std::vector m) +{ + std::ostringstream ost; + for (unsigned i = 0; i < m.size(); i++) { + ost << m[i]; + ost << " "; + } + return ost.str(); +} + +// caller should hold cfg_mutex +void +config::reconstruct() +{ + if (acc->instance() > 0) { + std::string m; + myvid = acc->instance(); + mems = get_view_wo(myvid); + tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str()); + } +} + +// Called by Paxos's acceptor. 
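+// Each call delivers one newly agreed-upon view. For illustration (a
+// sketch following the module comment above; values are the
+// whitespace-joined member lists built by config::value()):
+//   paxos_commit(1, "1")       first view: node 1 alone
+//   paxos_commit(2, "1 2")     node 2 joined
+//   paxos_commit(3, "1 2 3")   node 3 joined
+//   paxos_commit(4, "1 3")     node 2 stopped responding and was removed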
+void +config::paxos_commit(unsigned instance, std::string value) +{ + std::string m; + std::vector newmem; + ScopedLock ml(&cfg_mutex); + + newmem = members(value); + tprintf("config::paxos_commit: %d: %s\n", instance, + print_members(newmem).c_str()); + + for (unsigned i = 0; i < mems.size(); i++) { + tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str()); + if (!isamember(mems[i], newmem) && me != mems[i]) { + tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); + mgr.delete_handle(mems[i]); + } + } + + mems = newmem; + myvid = instance; + if (vc) { + unsigned vid = myvid; + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + vc->commit_change(vid); + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + } +} + +bool +config::ismember(std::string m, unsigned vid) +{ + bool r; + ScopedLock ml(&cfg_mutex); + std::vector v = get_view_wo(vid); + r = isamember(m, v); + return r; +} + +bool +config::add(std::string new_m, unsigned vid) +{ + std::vector m; + std::vector curm; + ScopedLock ml(&cfg_mutex); + if (vid != myvid) + return false; + tprintf("config::add %s\n", new_m.c_str()); + m = mems; + m.push_back(new_m); + curm = mems; + std::string v = value(m); + int nextvid = myvid + 1; + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + bool r = pro->run(nextvid, curm, v); + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + if (r) { + tprintf("config::add: proposer returned success\n"); + } else { + tprintf("config::add: proposer returned failure\n"); + } + return r; +} + +// caller should hold cfg_mutex +bool +config::remove_wo(std::string m) +{ + tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str()); + std::vector n; + for (unsigned i = 0; i < mems.size(); i++) { + if (mems[i] != m) n.push_back(mems[i]); + } + std::string v = value(n); + std::vector cmems = mems; + int nextvid = myvid + 1; + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + bool r = pro->run(nextvid, cmems, v); + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + if (r) { + tprintf("config::remove: proposer returned success\n"); + } else { + tprintf("config::remove: proposer returned failure\n"); + } + return r; +} + +void +config::heartbeater() +{ + struct timeval now; + struct timespec next_timeout; + std::string m; + heartbeat_t h; + bool stable; + unsigned vid; + std::vector cmems; + ScopedLock ml(&cfg_mutex); + + while (1) { + + gettimeofday(&now, NULL); + next_timeout.tv_sec = now.tv_sec + 3; + next_timeout.tv_nsec = 0; + tprintf("heartbeater: go to sleep\n"); + pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout); + + stable = true; + vid = myvid; + cmems = get_view_wo(vid); + tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str()); + + if (!isamember(me, cmems)) { + tprintf("heartbeater: not member yet; skip hearbeat\n"); + continue; + } + + //find the node with the smallest id + m = me; + for (unsigned i = 0; i < cmems.size(); i++) { + if (m > cmems[i]) + m = cmems[i]; + } + + if (m == me) { + //if i am the one with smallest id, ping the rest of the nodes + for (unsigned i = 0; i < cmems.size(); i++) { + if (cmems[i] != me) { + if ((h = doheartbeat(cmems[i])) != OK) { + stable = false; + m = cmems[i]; + break; + } + } + } + } else { + //the rest of the nodes ping the one with smallest id + if ((h = doheartbeat(m)) != OK) + stable = false; + } + + if (!stable && vid == myvid) { + remove_wo(m); + } + } +} + +paxos_protocol::status +config::heartbeat(std::string m, unsigned vid, int &r) +{ + ScopedLock ml(&cfg_mutex); + int ret = paxos_protocol::ERR; + r = (int) myvid; + 
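+  // A peer's vid may legitimately be one ahead of or behind myvid while
+  // a Paxos round is in flight (pro->isrunning() below); any larger gap
+  // means the views have diverged, which the VERIFY below rules out.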
tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid); + if (vid == myvid) { + ret = paxos_protocol::OK; + } else if (pro->isrunning()) { + VERIFY (vid == myvid + 1 || vid + 1 == myvid); + ret = paxos_protocol::OK; + } else { + ret = paxos_protocol::ERR; + } + return ret; +} + +config::heartbeat_t +config::doheartbeat(std::string m) +{ + int ret = rpc_const::timeout_failure; + int r; + unsigned vid = myvid; + heartbeat_t res = OK; + + tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); + handle h(m); + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + rpcc *cl = h.safebind(); + if (cl) { + ret = cl->call(paxos_protocol::heartbeat, me, vid, r, + rpcc::to(1000)); + } + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + if (ret != paxos_protocol::OK) { + if (ret == rpc_const::atmostonce_failure || + ret == rpc_const::oldsrv_failure) { + mgr.delete_handle(m); + } else { + tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", + m.c_str(), ret, vid, r); + if (ret < 0) res = FAILURE; + else res = VIEWERR; + } + } + tprintf("doheartbeat done %d\n", res); + return res; +} + diff --git a/config.h b/config.h new file mode 100644 index 0000000..9d7b023 --- /dev/null +++ b/config.h @@ -0,0 +1,54 @@ +#ifndef config_h +#define config_h + +#include +#include +#include "paxos.h" + +class config_view_change { + public: + virtual void commit_change(unsigned vid) = 0; + virtual ~config_view_change() {}; +}; + +class config : public paxos_change { + private: + acceptor *acc; + proposer *pro; + rpcs *pxsrpc; + unsigned myvid; + std::string first; + std::string me; + config_view_change *vc; + std::vector mems; + pthread_mutex_t cfg_mutex; + pthread_cond_t heartbeat_cond; + pthread_cond_t config_cond; + paxos_protocol::status heartbeat(std::string m, unsigned instance, int &r); + std::string value(std::vector mems); + std::vector members(std::string v); + std::vector get_view_wo(unsigned instance); + bool remove_wo(std::string); + void reconstruct(); + typedef enum { + OK, // response and same view # + VIEWERR, // response but different view # + FAILURE, // no response + } heartbeat_t; + heartbeat_t doheartbeat(std::string m); + public: + config(std::string _first, std::string _me, config_view_change *_vc); + unsigned vid() { return myvid; } + std::string myaddr() { return me; }; + std::string dump() { return acc->dump(); }; + std::vector get_view(unsigned instance); + void restore(std::string s); + bool add(std::string, unsigned vid); + bool ismember(std::string m, unsigned vid); + void heartbeater(void); + void paxos_commit(unsigned instance, std::string v); + rpcs *get_rpcs() { return acc->get_rpcs(); } + void breakpoint(int b) { pro->breakpoint(b); } +}; + +#endif diff --git a/gettime.cc b/gettime.cc new file mode 100644 index 0000000..926c510 --- /dev/null +++ b/gettime.cc @@ -0,0 +1,135 @@ +/* + * Copyright (c), MM Weiss + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. 
Neither the name of the MM Weiss nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * clock_gettime_stub.c + * gcc -Wall -c clock_gettime_stub.c + * posix realtime functions; MacOS user space glue + */ + +/* @comment + * other possible implementation using intel builtin rdtsc + * rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html + * + * we could get the ticks by doing this + * + * __asm __volatile("mov %%ebx, %%esi\n\t" + * "cpuid\n\t" + * "xchg %%esi, %%ebx\n\t" + * "rdtsc" + * : "=a" (a), + * "=d" (d) + * ); + + * we could even replace our tricky sched_yield call by assembly code to get a better accurency, + * anyway the following C stub will satisfy 99% of apps using posix clock_gettime call, + * moreover, the setter version (clock_settime) could be easly written using mach primitives: + * http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time) + * + * hackers don't be crackers, don't you use a flush toilet? + * + * + * @see draft: ./posix-realtime-stub/posix-realtime-stub.c + * + */ + + +#ifdef __APPLE__ + +#pragma weak clock_gettime + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef enum { + CLOCK_REALTIME, + CLOCK_MONOTONIC, + CLOCK_PROCESS_CPUTIME_ID, + CLOCK_THREAD_CPUTIME_ID +} clockid_t; + +static mach_timebase_info_data_t __clock_gettime_inf; + +int clock_gettime(clockid_t clk_id, struct timespec *tp) { + kern_return_t ret; + clock_serv_t clk; + clock_id_t clk_serv_id; + mach_timespec_t tm; + + uint64_t start, end, delta, nano; + + int retval = -1; + switch (clk_id) { + case CLOCK_REALTIME: + case CLOCK_MONOTONIC: + clk_serv_id = clk_id == CLOCK_REALTIME ? 
CALENDAR_CLOCK : SYSTEM_CLOCK; + if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) { + if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) { + tp->tv_sec = tm.tv_sec; + tp->tv_nsec = tm.tv_nsec; + retval = 0; + } + } + if (KERN_SUCCESS != ret) { + errno = EINVAL; + retval = -1; + } + break; + case CLOCK_PROCESS_CPUTIME_ID: + case CLOCK_THREAD_CPUTIME_ID: + start = mach_absolute_time(); + if (clk_id == CLOCK_PROCESS_CPUTIME_ID) { + getpid(); + } else { + sched_yield(); + } + end = mach_absolute_time(); + delta = end - start; + if (0 == __clock_gettime_inf.denom) { + mach_timebase_info(&__clock_gettime_inf); + } + nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom; + tp->tv_sec = nano * 1e-9; + tp->tv_nsec = nano - (tp->tv_sec * 1e9); + retval = 0; + break; + default: + errno = EINVAL; + retval = -1; + } + return retval; +} + +#endif // __APPLE__ diff --git a/gettime.h b/gettime.h new file mode 100644 index 0000000..f10def4 --- /dev/null +++ b/gettime.h @@ -0,0 +1,15 @@ +#ifndef gettime_h +#define gettime_h + +#ifdef __APPLE__ +typedef enum { + CLOCK_REALTIME, + CLOCK_MONOTONIC, + CLOCK_PROCESS_CPUTIME_ID, + CLOCK_THREAD_CPUTIME_ID +} clockid_t; + +int clock_gettime(clockid_t clk_id, struct timespec *tp); +#endif + +#endif diff --git a/handle.cc b/handle.cc new file mode 100644 index 0000000..a233d5b --- /dev/null +++ b/handle.cc @@ -0,0 +1,114 @@ +#include "handle.h" +#include +#include "tprintf.h" + +handle_mgr mgr; + +handle::handle(std::string m) +{ + h = mgr.get_handle(m); +} + +rpcc * +handle::safebind() +{ + if (!h) + return NULL; + ScopedLock ml(&h->cl_mutex); + if (h->del) + return NULL; + if (h->cl) + return h->cl; + sockaddr_in dstsock; + make_sockaddr(h->m.c_str(), &dstsock); + rpcc *cl = new rpcc(dstsock); + tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); + int ret; + // Starting with lab 6, our test script assumes that the failure + // can be detected by paxos and rsm layer within few seconds. We have + // to set the timeout with a small value to support the assumption. + // + // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of + // lab 6 and lab 7 because the rpc layer may delay your RPC request, + // and cause a time out failure. Please make sure RPC_LOSSY is set to 0. + ret = cl->bind(rpcc::to(1000)); + if (ret < 0) { + tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); + delete cl; + h->del = true; + } else { + tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str()); + h->cl = cl; + } + return h->cl; +} + +handle::~handle() +{ + if (h) mgr.done_handle(h); +} + +handle_mgr::handle_mgr() +{ + VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0); +} + +struct hinfo * +handle_mgr::get_handle(std::string m) +{ + ScopedLock ml(&handle_mutex); + struct hinfo *h = 0; + if (hmap.find(m) == hmap.end()) { + h = new hinfo; + h->cl = NULL; + h->del = false; + h->refcnt = 1; + h->m = m; + pthread_mutex_init(&h->cl_mutex, NULL); + hmap[m] = h; + } else if (!hmap[m]->del) { + h = hmap[m]; + h->refcnt ++; + } + return h; +} + +void +handle_mgr::done_handle(struct hinfo *h) +{ + ScopedLock ml(&handle_mutex); + h->refcnt--; + if (h->refcnt == 0 && h->del) + delete_handle_wo(h->m); +} + +void +handle_mgr::delete_handle(std::string m) +{ + ScopedLock ml(&handle_mutex); + delete_handle_wo(m); +} + +// Must be called with handle_mutex locked. 
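+// Deletion is deferred by refcount: with refcnt == 0 the cached rpcc is
+// cancelled and the entry erased immediately; otherwise the entry is
+// only marked del, and the last done_handle() call does the cleanup.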
+void +handle_mgr::delete_handle_wo(std::string m) +{ + if (hmap.find(m) == hmap.end()) { + tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str()); + } else { + tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(), + hmap[m]->refcnt); + struct hinfo *h = hmap[m]; + if (h->refcnt == 0) { + if (h->cl) { + h->cl->cancel(); + delete h->cl; + } + pthread_mutex_destroy(&h->cl_mutex); + hmap.erase(m); + delete h; + } else { + h->del = true; + } + } +} diff --git a/handle.h b/handle.h new file mode 100644 index 0000000..ecd8884 --- /dev/null +++ b/handle.h @@ -0,0 +1,79 @@ +// manage a cache of RPC connections. +// assuming cid is a std::string holding the +// host:port of the RPC server you want +// to talk to: +// +// handle h(cid); +// rpcc *cl = h.safebind(); +// if(cl){ +// ret = cl->call(...); +// } else { +// bind() failed +// } +// +// if the calling program has not contacted +// cid before, safebind() will create a new +// connection, call bind(), and return +// an rpcc*, or 0 if bind() failed. if the +// program has previously contacted cid, +// safebind() just returns the previously +// created rpcc*. best not to hold any +// mutexes while calling safebind(). + +#ifndef handle_h +#define handle_h + +#include +#include +#include "rpc.h" + +struct hinfo { + rpcc *cl; + int refcnt; + bool del; + std::string m; + pthread_mutex_t cl_mutex; +}; + +class handle { + private: + struct hinfo *h; + public: + handle(std::string m); + ~handle(); + /* safebind will try to bind with the rpc server on the first call. + * Since bind may block, the caller probably should not hold a mutex + * when calling safebind. + * + * return: + * if the first safebind succeeded, all later calls would return + * a rpcc object; otherwise, all later calls would return NULL. + * + * Example: + * handle h(dst); + * XXX_protocol::status ret; + * if (h.safebind()) { + * ret = h.safebind()->call(...); + * } + * if (!h.safebind() || ret != XXX_protocol::OK) { + * // handle failure + * } + */ + rpcc *safebind(); +}; + +class handle_mgr { + private: + pthread_mutex_t handle_mutex; + std::map hmap; + public: + handle_mgr(); + struct hinfo *get_handle(std::string m); + void done_handle(struct hinfo *h); + void delete_handle(std::string m); + void delete_handle_wo(std::string m); +}; + +extern class handle_mgr mgr; + +#endif diff --git a/lang/algorithm.h b/lang/algorithm.h new file mode 100644 index 0000000..a487094 --- /dev/null +++ b/lang/algorithm.h @@ -0,0 +1,18 @@ +// compile time version of min and max + +#ifndef algorithm_h +#define algorithm_h + +template +struct static_max +{ + static const int value = A > B ? A : B; +}; + +template +struct static_min +{ + static const int value = A < B ? A : B; +}; + +#endif diff --git a/lang/verify.h b/lang/verify.h new file mode 100644 index 0000000..2b092d2 --- /dev/null +++ b/lang/verify.h @@ -0,0 +1,15 @@ +// safe assertions. 
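+// Unlike a bare assert(), VERIFY(expr) still evaluates expr and aborts
+// on failure even when NDEBUG is defined, so it is safe to wrap calls
+// that have side effects, e.g. VERIFY(pthread_mutex_lock(&m) == 0);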
+
+#ifndef verify_client_h
+#define verify_client_h
+
+#include <stdlib.h>
+#include <assert.h>
+
+#ifdef NDEBUG
+#define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
+#else
+#define VERIFY(expr) assert(expr)
+#endif
+
+#endif
diff --git a/lock_client.cc b/lock_client.cc
new file mode 100644
index 0000000..6e9fab1
--- /dev/null
+++ b/lock_client.cc
@@ -0,0 +1,63 @@
+// RPC stubs for clients to talk to lock_server
+
+#include "lock_client.h"
+#include "rpc.h"
+#include <arpa/inet.h>
+
+#include <sstream>
+#include <iostream>
+#include <stdio.h>
+
+lock_client::lock_client(std::string dst)
+{
+  sockaddr_in dstsock;
+  make_sockaddr(dst.c_str(), &dstsock);
+  cl = new rpcc(dstsock);
+  if (cl->bind() < 0) {
+    printf("lock_client: bind failed\n");
+  }
+}
+
+int
+lock_client::stat(lock_protocol::lockid_t lid)
+{
+  int r;
+  lock_protocol::status ret = cl->call(lock_protocol::stat, cl->id(), lid, r);
+  VERIFY (ret == lock_protocol::OK);
+  return r;
+}
+
+lock_protocol::status
+lock_client::acquire(lock_protocol::lockid_t lid)
+{
+  int r;
+  return cl->call(lock_protocol::acquire, cl->id(), lid, r);
+}
+
+lock_protocol::status
+lock_client::release(lock_protocol::lockid_t lid)
+{
+  int r;
+  return cl->call(lock_protocol::release, cl->id(), lid, r);
+}
+
+t4_lock_client *t4_lock_client_new(const char *dst) {
+  return (t4_lock_client *)new lock_client(dst);
+}
+
+void t4_lock_client_delete(t4_lock_client *client) {
+  delete (lock_client *)client;
+}
+
+t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->acquire(lid);
+}
+
+t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->release(lid);
+}
+
+t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->stat(lid);
+}
+
diff --git a/lock_client.h b/lock_client.h
new file mode 100644
index 0000000..df22711
--- /dev/null
+++ b/lock_client.h
@@ -0,0 +1,52 @@
+// lock client interface.
+
+#ifndef lock_client_h
+#define lock_client_h
+
+#ifdef __cplusplus
+
+#include <string>
+#include "lock_protocol.h"
+#include "rpc.h"
+#include <vector>
+
+// Client interface to the lock server
+class lock_client {
+ protected:
+  rpcc *cl;
+ public:
+  lock_client(std::string d);
+  virtual ~lock_client() {};
+  virtual lock_protocol::status acquire(lock_protocol::lockid_t);
+  virtual lock_protocol::status release(lock_protocol::lockid_t);
+  virtual lock_protocol::status stat(lock_protocol::lockid_t);
+};
+
+#endif
+
+extern "C" {
+
+struct _t4_lock_client;
+typedef struct _t4_lock_client t4_lock_client;
+
+typedef enum {
+  T4_OK,
+  T4_RETRY,
+  T4_RPCERR,
+  T4_NOENT,
+  T4_IOERR
+} t4_xxstatus;
+
+typedef int t4_status;
+
+typedef unsigned long long t4_lockid_t;
+
+t4_lock_client *t4_lock_client_new(const char *dst);
+void t4_lock_client_delete(t4_lock_client *);
+t4_status t4_lock_client_acquire(t4_lock_client *, t4_lockid_t);
+t4_status t4_lock_client_release(t4_lock_client *, t4_lockid_t);
+t4_status t4_lock_client_stat(t4_lock_client *, t4_lockid_t);
+
+}
+
+#endif
diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc
new file mode 100644
index 0000000..9ec802c
--- /dev/null
+++ b/lock_client_cache_rsm.cc
@@ -0,0 +1,210 @@
+// RPC stubs for clients to talk to lock_server, and cache the locks
+// see lock_client_cache_rsm.h for protocol details.
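+//
+// Message flow, sketched for one lock L and two caching clients (an
+// illustration of the handlers below, not additional protocol):
+//   A: acquire(L) -> server replies OK; A caches L across local
+//      acquire/release pairs
+//   B: acquire(L) -> server replies RETRY, queues B, revokes L at A
+//   A: revoke_handler queues L; the releaser thread releases it
+//   server: sends retry to B; B's retry_handler wakes its acquire()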
+ +#include "lock_client_cache_rsm.h" +#include "rpc.h" +#include +#include +#include +#include "tprintf.h" + +#include "rsm_client.h" + +lock_state::lock_state(): + state(none) +{ +} + +void lock_state::wait() { + pthread_t self = pthread_self(); + c[self].wait(m); + c.erase(self); +} + +void lock_state::signal() { + // signal anyone + if (c.begin() != c.end()) + c.begin()->second.signal(); +} + +void lock_state::signal(pthread_t who) { + if (c.count(who)) + c[who].signal(); +} + +static void * releasethread(void *x) { + lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x; + cc->releaser(); + return 0; +} + +int lock_client_cache_rsm::last_port = 0; + +lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { + ScopedLock sl(lock_table_lock); + // by the semantics of std::map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) { + srand(time(NULL)^last_port); + rlock_port = ((rand()%32000) | (0x1 << 10)); + const char *hname; + // VERIFY(gethostname(hname, 100) == 0); + hname = "127.0.0.1"; + ostringstream host; + host << hname << ":" << rlock_port; + id = host.str(); + last_port = rlock_port; + rpcs *rlsrpc = new rpcs(rlock_port); + rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler); + rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler); + { + ScopedLock sl(xid_mutex); + xid = 0; + } + rsmc = new rsm_client(xdst); + int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this); + VERIFY (r == 0); +} + +void lock_client_cache_rsm::releaser() { + while (1) { + lock_protocol::lockid_t lid; + release_fifo.deq(&lid); + LOG("Releaser: " << lid); + + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread); + st.state = lock_state::releasing; + { + ScopedUnlock su(st.m); + int r; + rsmc->call(lock_protocol::release, lid, id, st.xid, r); + } + st.state = lock_state::none; + LOG("Lock " << lid << ": none"); + st.signal(); + } +} + +lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) { + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + pthread_t self = pthread_self(); + + // check for reentrancy + VERIFY(st.state != lock_state::locked || st.held_by != self); + VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end()); + + st.wanted_by.push_back(self); + + while (1) { + if (st.state != lock_state::free) + LOG("Lock " << lid << ": not free"); + + if (st.state == lock_state::none || st.state == lock_state::retrying) { + if (st.state == lock_state::none) { + ScopedLock sl(xid_mutex); + st.xid = xid++; + } + st.state = lock_state::acquiring; + LOG("Lock " << lid << ": acquiring"); + lock_protocol::status result; + { + ScopedUnlock su(st.m); + int r; + result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r); + } + LOG("acquire returned " << result); + if (result == lock_protocol::OK) { + st.state = lock_state::free; + LOG("Lock " << lid << ": free"); + } + } + + VERIFY(st.wanted_by.size() != 0); + if (st.state == lock_state::free) { + // is it for me? 
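+      // wanted_by is FIFO: whoever is at its front gets the lock next.
+      // The releaser thread at the front means the lock goes back to
+      // the server; any other waiting thread is woken instead.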
+ pthread_t front = st.wanted_by.front(); + if (front == releaser_thread) { + st.wanted_by.pop_front(); + st.state = lock_state::locked; + st.held_by = releaser_thread; + LOG("Queuing " << lid << " for release"); + release_fifo.enq(lid); + } else if (front == self) { + st.wanted_by.pop_front(); + st.state = lock_state::locked; + st.held_by = self; + break; + } else { + st.signal(front); + } + } + + LOG("waiting..."); + st.wait(); + LOG("wait ended"); + } + + LOG("Lock " << lid << ": locked"); + return lock_protocol::OK; +} + +lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) { + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + pthread_t self = pthread_self(); + VERIFY(st.state == lock_state::locked && st.held_by == self); + st.state = lock_state::free; + LOG("Lock " << lid << ": free"); + if (st.wanted_by.size()) { + pthread_t front = st.wanted_by.front(); + if (front == releaser_thread) { + st.state = lock_state::locked; + st.held_by = releaser_thread; + st.wanted_by.pop_front(); + LOG("Queuing " << lid << " for release"); + release_fifo.enq(lid); + } else + st.signal(front); + } + LOG("Finished signaling."); + return lock_protocol::OK; +} + +rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { + LOG("Revoke handler " << lid << " " << xid); + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + + if (st.state == lock_state::releasing || st.state == lock_state::none) + return rlock_protocol::OK; + + if (st.state == lock_state::free && + (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) { + // gimme + st.state = lock_state::locked; + st.held_by = releaser_thread; + if (st.wanted_by.size()) + st.wanted_by.pop_front(); + release_fifo.enq(lid); + } else { + // get in line + st.wanted_by.push_back(releaser_thread); + } + return rlock_protocol::OK; +} + +rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + VERIFY(st.state == lock_state::acquiring); + st.state = lock_state::retrying; + LOG("Lock " << lid << ": none"); + st.signal(); // only one thread needs to wake up + return rlock_protocol::OK; +} diff --git a/lock_client_cache_rsm.h b/lock_client_cache_rsm.h new file mode 100644 index 0000000..28b0323 --- /dev/null +++ b/lock_client_cache_rsm.h @@ -0,0 +1,82 @@ +// lock client interface. + +#ifndef lock_client_cache_rsm_h + +#define lock_client_cache_rsm_h + +#include +#include "lock_protocol.h" +#include "rpc.h" +#include "lock_client.h" +#include "lang/verify.h" +#include "mutex.h" +#include "rpc/fifo.h" +#include "rsm_client.h" + +// Classes that inherit lock_release_user can override dorelease so that +// that they will be called when lock_client releases a lock. +// You will not need to do anything with this class until Lab 5. +class lock_release_user { + public: + virtual void dorelease(lock_protocol::lockid_t) = 0; + virtual ~lock_release_user() {}; +}; + +using namespace std; + +typedef string callback; + +class lock_state { +public: + lock_state(); + enum { + none = 0, + retrying, + free, + locked, + acquiring, + releasing + } state; + pthread_t held_by; + list wanted_by; + mutex m; + map c; + lock_protocol::xid_t xid; + void wait(); + void signal(); + void signal(pthread_t who); +}; + +typedef map lock_map; + +class lock_client_cache_rsm; + +// Clients that caches locks. 
The server can revoke locks using +// lock_revoke_server. +class lock_client_cache_rsm : public lock_client { + private: + pthread_t releaser_thread; + rsm_client *rsmc; + class lock_release_user *lu; + int rlock_port; + string hostname; + string id; + mutex xid_mutex; + lock_protocol::xid_t xid; + fifo release_fifo; + mutex lock_table_lock; + lock_map lock_table; + lock_state &get_lock_state(lock_protocol::lockid_t lid); + public: + static int last_port; + lock_client_cache_rsm(string xdst, class lock_release_user *l = 0); + virtual ~lock_client_cache_rsm() {}; + lock_protocol::status acquire(lock_protocol::lockid_t); + virtual lock_protocol::status release(lock_protocol::lockid_t); + void releaser(); + rlock_protocol::status revoke_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &); + rlock_protocol::status retry_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &); +}; + + +#endif diff --git a/lock_demo.cc b/lock_demo.cc new file mode 100644 index 0000000..4d74acb --- /dev/null +++ b/lock_demo.cc @@ -0,0 +1,30 @@ +// +// Lock demo +// + +#include "lock_protocol.h" +#include "lock_client.h" +#include "rpc.h" +#include +#include +#include +#include + +std::string dst; +lock_client *lc; + +int +main(int argc, char *argv[]) +{ + int r; + + if(argc != 2){ + fprintf(stderr, "Usage: %s [host:]port\n", argv[0]); + exit(1); + } + + dst = argv[1]; + lc = new lock_client(dst); + r = lc->stat(1); + printf ("stat returned %d\n", r); +} diff --git a/lock_protocol.h b/lock_protocol.h new file mode 100644 index 0000000..60df0ef --- /dev/null +++ b/lock_protocol.h @@ -0,0 +1,30 @@ +// lock protocol + +#ifndef lock_protocol_h +#define lock_protocol_h + +#include "rpc.h" + +class lock_protocol { + public: + enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR }; + typedef int status; + typedef unsigned long long lockid_t; + typedef unsigned long long xid_t; + enum rpc_numbers { + acquire = 0x7001, + release, + stat + }; +}; + +class rlock_protocol { + public: + enum xxstatus { OK, RPCERR }; + typedef int status; + enum rpc_numbers { + revoke = 0x8001, + retry = 0x8002 + }; +}; +#endif diff --git a/lock_server.cc b/lock_server.cc new file mode 100644 index 0000000..3801814 --- /dev/null +++ b/lock_server.cc @@ -0,0 +1,46 @@ +// the lock server implementation + +#include "lock_server.h" +#include +#include +#include +#include + +lock_server::lock_server(): + nacquire (0) +{ +} + +// caller must hold lock_lock +mutex & +lock_server::get_lock(lock_protocol::lockid_t lid) { + lock_lock.acquire(); + // by the semantics of std::map, this will create + // the mutex if it doesn't already exist + mutex &l = locks[lid]; + lock_lock.release(); + return l; +} + +lock_protocol::status +lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r) +{ + lock_protocol::status ret = lock_protocol::OK; + printf("stat request from clt %d\n", clt); + r = nacquire; + return ret; +} + +lock_protocol::status +lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r) +{ + get_lock(lid).acquire(); + return lock_protocol::OK; +} + +lock_protocol::status +lock_server::release(int clt, lock_protocol::lockid_t lid, int &r) +{ + get_lock(lid).release(); + return lock_protocol::OK; +} diff --git a/lock_server.h b/lock_server.h new file mode 100644 index 0000000..e24e359 --- /dev/null +++ b/lock_server.h @@ -0,0 +1,36 @@ +// this is the lock server +// the lock client has a similar interface + +#ifndef lock_server_h +#define lock_server_h + +#include +#include "lock_protocol.h" +#include "lock_client.h" 
+#include "rpc.h" +#include +#include +#include +#include "mutex.h" + +using namespace std; + +typedef map lock_map; + +class lock_server { + + protected: + int nacquire; + mutex lock_lock; + lock_map locks; + mutex &get_lock(lock_protocol::lockid_t lid); + + public: + lock_server(); + ~lock_server() {}; + lock_protocol::status stat(int clt, lock_protocol::lockid_t lid, int &); + lock_protocol::status acquire(int clt, lock_protocol::lockid_t lid, int &); + lock_protocol::status release(int clt, lock_protocol::lockid_t lid, int &); +}; + +#endif diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc new file mode 100644 index 0000000..5149049 --- /dev/null +++ b/lock_server_cache_rsm.cc @@ -0,0 +1,245 @@ +// the caching lock server implementation + +#include "lock_server_cache_rsm.h" +#include +#include +#include +#include +#include "lang/verify.h" +#include "handle.h" +#include "tprintf.h" +#include "rpc/marshall.h" + +lock_state::lock_state(): + held(false) +{ +} + +template +ostringstream & operator<<(ostringstream &o, const pair &d) { + o << "<" << d.first << "," << d.second << ">"; + return o; +} + +template +marshall & operator<<(marshall &m, const list &d) { + m << vector(d.begin(), d.end()); + return m; +} + +template +unmarshall & operator>>(unmarshall &u, list &d) { + vector v; + u >> v; + d.assign(v.begin(), v.end()); + return u; +} + + +template +marshall & operator<<(marshall &m, const pair &d) { + m << d.first; + m << d.second; + return m; +} + +template +unmarshall & operator>>(unmarshall &u, pair &d) { + u >> d.first; + u >> d.second; + return u; +} + +marshall & operator<<(marshall &m, const lock_state &d) { + m << d.held; + m << d.held_by; + m << d.wanted_by; + return m; +} + +unmarshall & operator>>(unmarshall &u, lock_state &d) { + u >> d.held; + u >> d.held_by; + u >> d.wanted_by; + return u; +} + + +lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { + ScopedLock sl(lock_table_lock); + // by the semantics of map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +static void *revokethread(void *x) { + lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x; + sc->revoker(); + return 0; +} + +static void *retrythread(void *x) { + lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x; + sc->retryer(); + return 0; +} + +lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) { + pthread_t th; + VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0); + VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0); + rsm->set_state_transfer(this); +} + +void lock_server_cache_rsm::revoker() { + while (1) { + lock_protocol::lockid_t lid; + revoke_fifo.deq(&lid); + LOG("Revoking " << lid); + if (rsm && !rsm->amiprimary()) + continue; + + lock_state &st = get_lock_state(lid); + holder held_by; + { + ScopedLock sl(st.m); + held_by = st.held_by; + } + + rpcc *proxy = NULL; + // try a few times? 
+ //int t=5; + //while (t-- && !proxy) + proxy = handle(held_by.first).safebind(); + if (proxy) { + int r; + rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r); + LOG("Revoke returned " << ret); + } + } +} + +void lock_server_cache_rsm::retryer() { + while (1) { + lock_protocol::lockid_t lid; + retry_fifo.deq(&lid); + if (rsm && !rsm->amiprimary()) + continue; + + LOG("Sending retry for " << lid); + lock_state &st = get_lock_state(lid); + holder front; + { + ScopedLock sl(st.m); + if (st.wanted_by.empty()) + continue; + front = st.wanted_by.front(); + } + + rlock_protocol::status ret = -1; + + rpcc *proxy = NULL; + // try a few times? + //int t=5; + //while (t-- && !proxy) + proxy = handle(front.first).safebind(); + if (proxy) { + int r; + ret = proxy->call(rlock_protocol::retry, lid, front.second, r); + LOG("Retry returned " << ret); + } + } +} + +int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) { + LOG_FUNC_ENTER_SERVER; + holder h = holder(id, xid); + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + + // deal with duplicated requests + if (st.old_requests.count(id)) { + lock_protocol::xid_t old_xid = st.old_requests[id]; + if (old_xid > xid) + return lock_protocol::RPCERR; + else if (old_xid == xid) { + if (st.held && st.held_by == h) { + LOG("Client " << id << " sent duplicate acquire xid=" << xid); + return lock_protocol::OK; + } + } + } + + // grant the lock if it's available and I'm next in line + if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) { + if (!st.wanted_by.empty()) + st.wanted_by.pop_front(); + st.old_requests[id] = xid; + + st.held = true; + st.held_by = h; + LOG("Lock " << lid << " held by " << h.first); + if (st.wanted_by.size()) + revoke_fifo.enq(lid); + return lock_protocol::OK; + } + + // get in line + bool found = false; + for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { + if (i->first == id) { + // make sure client is obeying serialization + if (i->second != xid) { + LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); + return lock_protocol::RPCERR; + } + found = true; + break; + } + } + if (!found) + st.wanted_by.push_back(h); + + LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); + + // send revoke if we're first in line + if (st.wanted_by.front() == h) + revoke_fifo.enq(lid); + + return lock_protocol::RETRY; +} + +int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) { + LOG_FUNC_ENTER_SERVER; + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + if (st.held && st.held_by == holder(id, xid)) { + st.held = false; + LOG("Lock " << lid << " not held"); + } + if (st.wanted_by.size()) + retry_fifo.enq(lid); + return lock_protocol::OK; +} + +string lock_server_cache_rsm::marshal_state() { + ScopedLock sl(lock_table_lock); + marshall rep; + rep << nacquire; + rep << lock_table; + return rep.str(); +} + +void lock_server_cache_rsm::unmarshal_state(string state) { + ScopedLock sl(lock_table_lock); + unmarshall rep(state); + rep >> nacquire; + rep >> lock_table; +} + +lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) { + printf("stat request\n"); + r = nacquire; + return lock_protocol::OK; +} + diff --git a/lock_server_cache_rsm.h b/lock_server_cache_rsm.h new file mode 100644 index 0000000..eb86bd0 --- /dev/null +++ b/lock_server_cache_rsm.h @@ 
-0,0 +1,52 @@ +#ifndef lock_server_cache_rsm_h +#define lock_server_cache_rsm_h + +#include + +#include +#include +#include "lock_protocol.h" +#include "rpc.h" +#include "mutex.h" +#include "rsm_state_transfer.h" +#include "rsm.h" +#include "rpc/fifo.h" + +using namespace std; + +typedef string callback; +typedef pair holder; + +class lock_state { +public: + lock_state(); + bool held; + holder held_by; + list wanted_by; + map old_requests; + mutex m; +}; + +typedef map lock_map; + +class lock_server_cache_rsm : public rsm_state_transfer { + private: + int nacquire; + mutex lock_table_lock; + lock_map lock_table; + lock_state &get_lock_state(lock_protocol::lockid_t lid); + fifo retry_fifo; + fifo revoke_fifo; + class rsm *rsm; + public: + lock_server_cache_rsm(class rsm *rsm = 0); + lock_protocol::status stat(lock_protocol::lockid_t, int &); + void revoker(); + void retryer(); + string marshal_state(); + void unmarshal_state(string state); + int acquire(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &); + int release(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &); +}; + +#endif diff --git a/lock_smain.cc b/lock_smain.cc new file mode 100644 index 0000000..fba7eae --- /dev/null +++ b/lock_smain.cc @@ -0,0 +1,71 @@ +#include "rpc.h" +#include +#include +#include +#include "lock_server_cache_rsm.h" +#include "paxos.h" +#include "rsm.h" + +#include "jsl_log.h" + +// Main loop of lock_server + +char tprintf_thread_prefix = 's'; + +int +main(int argc, char *argv[]) +{ + int count = 0; + + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + + srandom(getpid()); + + if(argc != 3){ + fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); + exit(1); + } + + char *count_env = getenv("RPC_COUNT"); + if(count_env != NULL){ + count = atoi(count_env); + } + + //jsl_set_debug(2); + // Comment out the next line to switch between the ordinary lock + // server and the RSM. In Lab 6, we disable the lock server and + // implement Paxos. In Lab 7, we will make the lock server use your + // RSM layer. +#define RSM +#ifdef RSM +// You must comment out the next line once you are done with Step One. 
+//#define STEP_ONE +#ifdef STEP_ONE + rpcs server(atoi(argv[1])); + lock_server_cache_rsm ls; + server.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); + server.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); + server.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); +#else + rsm rsm(argv[1], argv[2]); + lock_server_cache_rsm ls(&rsm); + rsm.set_state_transfer((rsm_state_transfer *)&ls); + rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); + rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); + rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); +#endif // STEP_ONE +#endif // RSM + +#ifndef RSM + lock_server_cache ls; + rpcs server(atoi(argv[1]), count); + server.reg(lock_protocol::stat, &ls, &lock_server_cache::stat); + server.reg(lock_protocol::acquire, &ls, &lock_server_cache::acquire); + server.reg(lock_protocol::release, &ls, &lock_server_cache::release); +#endif + + + while(1) + sleep(1000); +} diff --git a/lock_tester.cc b/lock_tester.cc new file mode 100644 index 0000000..dd7c07b --- /dev/null +++ b/lock_tester.cc @@ -0,0 +1,242 @@ +// +// Lock server tester +// + +#include "lock_protocol.h" +#include "lock_client.h" +#include "rpc.h" +#include "jsl_log.h" +#include +#include +#include +#include +#include "lang/verify.h" +#include "lock_client_cache_rsm.h" +#include "tprintf.h" + +char tprintf_thread_prefix = 'c'; + +// must be >= 2 +int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. +std::string dst; +lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt]; +lock_protocol::lockid_t a = 1; +lock_protocol::lockid_t b = 2; +lock_protocol::lockid_t c = 3; + +// check_grant() and check_release() check that the lock server +// doesn't grant the same lock to both clients. +// it assumes that lock names are distinct in the first byte. 
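+// ct[lid & 0xff] counts how many clients currently hold the lock whose
+// id has that low byte; check_grant()/check_release() verify that the
+// count stays within {0, 1}.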
+int ct[256]; +pthread_mutex_t count_mutex; + +void +check_grant(lock_protocol::lockid_t lid) +{ + ScopedLock ml(&count_mutex); + int x = lid & 0xff; + if(ct[x] != 0){ + fprintf(stderr, "error: server granted %016llx twice\n", lid); + fprintf(stdout, "error: server granted %016llx twice\n", lid); + exit(1); + } + ct[x] += 1; +} + +void +check_release(lock_protocol::lockid_t lid) +{ + ScopedLock ml(&count_mutex); + int x = lid & 0xff; + if(ct[x] != 1){ + fprintf(stderr, "error: client released un-held lock %016llx\n", lid); + exit(1); + } + ct[x] -= 1; +} + +void +test1(void) +{ + tprintf ("acquire a release a acquire a release a\n"); + lc[0]->acquire(a); + check_grant(a); + lc[0]->release(a); + check_release(a); + lc[0]->acquire(a); + check_grant(a); + lc[0]->release(a); + check_release(a); + + tprintf ("acquire a acquire b release b release a\n"); + lc[0]->acquire(a); + check_grant(a); + lc[0]->acquire(b); + check_grant(b); + lc[0]->release(b); + check_release(b); + lc[0]->release(a); + check_release(a); +} + +void * +test2(void *x) +{ + int i = * (int *) x; + + tprintf ("test2: client %d acquire a release a\n", i); + lc[i]->acquire(a); + tprintf ("test2: client %d acquire done\n", i); + check_grant(a); + sleep(1); + tprintf ("test2: client %d release\n", i); + check_release(a); + lc[i]->release(a); + tprintf ("test2: client %d release done\n", i); + return 0; +} + +void * +test3(void *x) +{ + int i = * (int *) x; + + tprintf ("test3: client %d acquire a release a concurrent\n", i); + for (int j = 0; j < 10; j++) { + lc[i]->acquire(a); + check_grant(a); + tprintf ("test3: client %d got lock\n", i); + check_release(a); + lc[i]->release(a); + } + return 0; +} + +void * +test4(void *x) +{ + int i = * (int *) x; + + tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i); + for (int j = 0; j < 10; j++) { + lc[0]->acquire(a); + check_grant(a); + tprintf ("test4: thread %d on client 0 got lock\n", i); + check_release(a); + lc[0]->release(a); + } + return 0; +} + +void * +test5(void *x) +{ + int i = * (int *) x; + + tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i); + for (int j = 0; j < 10; j++) { + if (i < 5) lc[0]->acquire(a); + else lc[1]->acquire(a); + check_grant(a); + tprintf ("test5: client %d got lock\n", i); + check_release(a); + if (i < 5) lc[0]->release(a); + else lc[1]->release(a); + } + return 0; +} + +int +main(int argc, char *argv[]) +{ + int r; + pthread_t th[nt]; + int test = 0; + + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + srandom(getpid()); + + //jsl_set_debug(2); + + if(argc < 2) { + fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); + exit(1); + } + + dst = argv[1]; + + if (argc > 2) { + test = atoi(argv[2]); + if(test < 1 || test > 5){ + tprintf("Test number must be between 1 and 5\n"); + exit(1); + } + } + + VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0); + tprintf("cache lock client\n"); + for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst); + + if(!test || test == 1){ + test1(); + } + + if(!test || test == 2){ + // test2 + for (int i = 0; i < nt; i++) { + int *a = new int (i); + r = pthread_create(&th[i], NULL, test2, (void *) a); + VERIFY (r == 0); + } + for (int i = 0; i < nt; i++) { + pthread_join(th[i], NULL); + } + } + + if(!test || test == 3){ + tprintf("test 3\n"); + + // test3 + for (int i = 0; i < nt; i++) { + int *a = new int (i); + r = pthread_create(&th[i], NULL, test3, (void *) a); + VERIFY (r == 0); + } + for (int i = 0; i < nt; i++) { + 
pthread_join(th[i], NULL);
+    }
+  }
+
+  if(!test || test == 4){
+    tprintf("test 4\n");
+
+    // test 4
+    for (int i = 0; i < 2; i++) {
+      int *a = new int (i);
+      r = pthread_create(&th[i], NULL, test4, (void *) a);
+      VERIFY (r == 0);
+    }
+    for (int i = 0; i < 2; i++) {
+      pthread_join(th[i], NULL);
+    }
+  }
+
+  if(!test || test == 5){
+    tprintf("test 5\n");
+
+    // test 5
+
+    for (int i = 0; i < nt; i++) {
+      int *a = new int (i);
+      r = pthread_create(&th[i], NULL, test5, (void *) a);
+      VERIFY (r == 0);
+    }
+    for (int i = 0; i < nt; i++) {
+      pthread_join(th[i], NULL);
+    }
+  }
+
+  tprintf ("%s: passed all tests successfully\n", argv[0]);
+
+}
diff --git a/log.cc b/log.cc
new file mode 100644
index 0000000..a760815
--- /dev/null
+++ b/log.cc
@@ -0,0 +1,132 @@
+#include "paxos.h"
+#include <fstream>
+#include <iostream>
+
+// Paxos must maintain some durable state (i.e., state that survives
+// power failures) to run correctly. This module implements a log
+// holding all of that durable state. Since the values chosen
+// correspond to views, the log contains all views since the beginning of time.
+
+log::log(acceptor *_acc, std::string _me)
+  : pxs (_acc)
+{
+  name = "paxos-" + _me + ".log";
+  logread();
+}
+
+void
+log::logread(void)
+{
+  std::ifstream from;
+  std::string type;
+  unsigned instance;
+
+  from.open(name.c_str());
+  printf ("logread\n");
+  while (from >> type) {
+    if (type == "done") {
+      std::string v;
+      from >> instance;
+      from.get();
+      getline(from, v);
+      pxs->values[instance] = v;
+      pxs->instance_h = instance;
+      printf ("logread: instance: %d w. v = %s\n", instance,
+              pxs->values[instance].c_str());
+      pxs->v_a.clear();
+      pxs->n_h.n = 0;
+      pxs->n_a.n = 0;
+    } else if (type == "propseen") {
+      from >> pxs->n_h.n;
+      from >> pxs->n_h.m;
+      printf("logread: high update: %d(%s)\n", pxs->n_h.n, pxs->n_h.m.c_str());
+    } else if (type == "accepted") {
+      std::string v;
+      from >> pxs->n_a.n;
+      from >> pxs->n_a.m;
+      from.get();
+      getline(from, v);
+      pxs->v_a = v;
+      printf("logread: prop update %d(%s) with v = %s\n", pxs->n_a.n,
+             pxs->n_a.m.c_str(), pxs->v_a.c_str());
+    } else {
+      printf("logread: unknown log record\n");
+      VERIFY(0);
+    }
+  }
+  from.close();
+}
+
+std::string
+log::dump()
+{
+  std::ifstream from;
+  std::string res;
+  std::string v;
+  from.open(name.c_str());
+  while (getline(from, v)) {
+    res = res + v + "\n";
+  }
+  from.close();
+  return res;
+}
+
+void
+log::restore(std::string s)
+{
+  std::ofstream f;
+  printf("restore: %s\n", s.c_str());
+  f.open(name.c_str(), std::ios::trunc);
+  f << s;
+  f.close();
+}
+
+// XXX should be an atomic operation
+void
+log::loginstance(unsigned instance, std::string v)
+{
+  std::ofstream f;
+  f.open(name.c_str(), std::ios::app);
+  f << "done";
+  f << " ";
+  f << instance;
+  f << " ";
+  f << v;
+  f << "\n";
+  f.close();
+}
+
+// an acceptor should call logprop(n_h) when it
+// receives a prepare to which it responds prepare_ok().
+void
+log::logprop(prop_t n_h)
+{
+  std::ofstream f;
+  f.open(name.c_str(), std::ios::app);
+  f << "propseen";
+  f << " ";
+  f << n_h.n;
+  f << " ";
+  f << n_h.m;
+  f << "\n";
+  f.close();
+}
+
+// an acceptor should call logaccept(n_a, v_a) when it
+// receives an accept RPC to which it replies accept_ok().
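+//
+// The resulting on-disk format is one record per line, as parsed back
+// by logread() above:
+//   done <instance> <value>
+//   propseen <n_h.n> <n_h.m>
+//   accepted <n_a.n> <n_a.m> <value>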
+void +log::logaccept(prop_t n, std::string v) +{ + std::ofstream f; + f.open(name.c_str(), std::ios::app); + f << "accepted"; + f << " "; + f << n.n; + f << " "; + f << n.m; + f << " "; + f << v; + f << "\n"; + f.close(); +} + diff --git a/log.h b/log.h new file mode 100644 index 0000000..5bd2779 --- /dev/null +++ b/log.h @@ -0,0 +1,28 @@ +#ifndef log_h +#define log_h + +#include +#include + + +class acceptor; + +class log { + private: + std::string name; + acceptor *pxs; + public: + log (acceptor*, std::string _me); + std::string dump(); + void restore(std::string s); + void logread(void); + /* Log a committed paxos instance*/ + void loginstance(unsigned instance, std::string v); + /* Log the highest proposal number that the local paxos acceptor has ever seen */ + void logprop(prop_t n_h); + /* Log the proposal (proposal number and value) that the local paxos acceptor + accept has ever accepted */ + void logaccept(prop_t n_a, std::string v); +}; + +#endif /* log_h */ diff --git a/mutex.cc b/mutex.cc new file mode 100644 index 0000000..4e54cc0 --- /dev/null +++ b/mutex.cc @@ -0,0 +1,42 @@ +#include "mutex.h" +#include "lang/verify.h" + +mutex::mutex() { + VERIFY(pthread_mutex_init(&m, NULL) == 0); +} + +mutex::~mutex() { + VERIFY(pthread_mutex_destroy(&m) == 0); +} + +void mutex::acquire() { + VERIFY(pthread_mutex_lock(&m) == 0); +} + +void mutex::release() { + VERIFY(pthread_mutex_unlock(&m) == 0); +} + +mutex::operator pthread_mutex_t *() { + return &m; +} + +cond::cond() { + VERIFY(pthread_cond_init(&c, NULL) == 0); +} + +cond::~cond() { + VERIFY(pthread_cond_destroy(&c) == 0); +} + +void cond::wait(mutex &m) { + VERIFY(pthread_cond_wait(&c, m) == 0); +} + +void cond::signal() { + VERIFY(pthread_cond_signal(&c) == 0); +} + +void cond::broadcast() { + VERIFY(pthread_cond_broadcast(&c) == 0); +} diff --git a/mutex.h b/mutex.h new file mode 100644 index 0000000..228a3bb --- /dev/null +++ b/mutex.h @@ -0,0 +1,28 @@ +#ifndef mutex_h +#define mutex_h + +#include + +class mutex { + protected: + pthread_mutex_t m; + public: + mutex(); + ~mutex(); + void acquire(); + void release(); + operator pthread_mutex_t *(); +}; + +class cond { + protected: + pthread_cond_t c; + public: + cond(); + ~cond(); + void wait(mutex &m); + void signal(); + void broadcast(); +}; + +#endif diff --git a/paxos.cc b/paxos.cc new file mode 100644 index 0000000..c3a445f --- /dev/null +++ b/paxos.cc @@ -0,0 +1,372 @@ +#include "paxos.h" +#include "handle.h" +// #include +#include +#include "tprintf.h" +#include "lang/verify.h" + +// This module implements the proposer and acceptor of the Paxos +// distributed algorithm as described by Lamport's "Paxos Made +// Simple". To kick off an instance of Paxos, the caller supplies a +// list of nodes, a proposed value, and invokes the proposer. If the +// majority of the nodes agree on the proposed value after running +// this instance of Paxos, the acceptor invokes the upcall +// paxos_commit to inform higher layers of the agreed value for this +// instance. 
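+// Proposal numbers prop_t are ordered lexicographically: first by the
+// round number n, then by the node id m as a tie-break (see operator>
+// below). Illustrative values: {n=2, m="25001"} > {n=1, m="25003"}, and
+// {n=1, m="25003"} > {n=1, m="25001"}. Since m is unique per node, two
+// proposers can never issue equal but distinct proposal numbers.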
+ + +bool +operator> (const prop_t &a, const prop_t &b) +{ + return (a.n > b.n || (a.n == b.n && a.m > b.m)); +} + +bool +operator>= (const prop_t &a, const prop_t &b) +{ + return (a.n > b.n || (a.n == b.n && a.m >= b.m)); +} + +std::string +print_members(const std::vector &nodes) +{ + std::string s; + s.clear(); + for (unsigned i = 0; i < nodes.size(); i++) { + s += nodes[i]; + if (i < (nodes.size()-1)) + s += ","; + } + return s; +} + +bool isamember(std::string m, const std::vector &nodes) +{ + for (unsigned i = 0; i < nodes.size(); i++) { + if (nodes[i] == m) return 1; + } + return 0; +} + +bool +proposer::isrunning() +{ + bool r; + ScopedLock ml(pxs_mutex); + r = !stable; + return r; +} + +// check if the servers in l2 contains a majority of servers in l1 +bool +proposer::majority(const std::vector &l1, + const std::vector &l2) +{ + unsigned n = 0; + + for (unsigned i = 0; i < l1.size(); i++) { + if (isamember(l1[i], l2)) + n++; + } + return n >= (l1.size() >> 1) + 1; +} + +proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, + std::string _me) + : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), + stable (true) +{ + my_n.n = 0; + my_n.m = me; +} + +void +proposer::setn() +{ + my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; +} + +bool +proposer::run(int instance, std::vector cur_nodes, std::string newv) +{ + std::vector accepts; + std::vector nodes; + std::string v; + bool r = false; + + ScopedLock ml(pxs_mutex); + tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", + print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); + if (!stable) { // already running proposer? + tprintf("proposer::run: already running\n"); + return false; + } + stable = false; + setn(); + accepts.clear(); + v.clear(); + if (prepare(instance, accepts, cur_nodes, v)) { + + if (majority(cur_nodes, accepts)) { + tprintf("paxos::manager: received a majority of prepare responses\n"); + + if (v.size() == 0) + v = newv; + + breakpoint1(); + + nodes = accepts; + accepts.clear(); + accept(instance, accepts, nodes, v); + + if (majority(cur_nodes, accepts)) { + tprintf("paxos::manager: received a majority of accept responses\n"); + + breakpoint2(); + + decide(instance, accepts, v); + r = true; + } else { + tprintf("paxos::manager: no majority of accept responses\n"); + } + } else { + tprintf("paxos::manager: no majority of prepare responses\n"); + } + } else { + tprintf("paxos::manager: prepare is rejected %d\n", stable); + } + stable = true; + return r; +} + +// proposer::run() calls prepare to send prepare RPCs to nodes +// and collect responses. if one of those nodes +// replies with an oldinstance, return false. +// otherwise fill in accepts with set of nodes that accepted, +// set v to the v_a with the highest n_a, and return true. 
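+// (If no acceptor has accepted anything yet, every returned v_a is
+// empty, so v stays empty and run() falls back to the caller's newv.)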
+
+// proposer::run() calls prepare to send prepare RPCs to nodes
+// and collect responses. if one of those nodes
+// replies with an oldinstance, return false.
+// otherwise fill in accepts with the set of nodes that accepted,
+// set v to the v_a with the highest n_a, and return true.
+bool
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
+                  std::vector<std::string> nodes,
+                  std::string &v)
+{
+  struct paxos_protocol::preparearg arg = { instance, my_n };
+  struct paxos_protocol::prepareres res;
+  prop_t n_a = { 0, "" };
+  rpcc *r;
+  for (std::vector<std::string>::iterator i = nodes.begin(); i != nodes.end(); i++) {
+    handle h(*i);
+    if (!(r = h.safebind()))
+      continue;
+    int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+    if (status == paxos_protocol::OK) {
+      if (res.oldinstance) {
+        tprintf("committing old instance!\n");
+        acc->commit(instance, res.v_a);
+        return false;
+      }
+      if (res.accept) {
+        accepts.push_back(*i);
+        if (res.n_a >= n_a) {
+          tprintf("found a newer accepted proposal\n");
+          v = res.v_a;
+          n_a = res.n_a;
+        }
+      }
+    }
+  }
+  return true;
+}
+
+// run() calls this to send accept RPCs to nodes (the nodes that answered
+// the prepare), and fills in accepts with the list of nodes that accepted.
+void
+proposer::accept(unsigned instance, std::vector<std::string> &accepts,
+                 std::vector<std::string> nodes, std::string v)
+{
+  struct paxos_protocol::acceptarg arg = { instance, my_n, v };
+  rpcc *r;
+  for (std::vector<std::string>::iterator i = nodes.begin(); i != nodes.end(); i++) {
+    handle h(*i);
+    if (!(r = h.safebind()))
+      continue;
+    bool accept = false;
+    int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
+    if (status == paxos_protocol::OK) {
+      if (accept)
+        accepts.push_back(*i);
+    }
+  }
+}
+
+void
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
+                 std::string v)
+{
+  struct paxos_protocol::decidearg arg = { instance, v };
+  rpcc *r;
+  for (std::vector<std::string>::iterator i = accepts.begin(); i != accepts.end(); i++) {
+    handle h(*i);
+    if (!(r = h.safebind()))
+      continue;
+    int res = 0;
+    r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
+  }
+}
+
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
+                   std::string _value)
+  : cfg(_cfg), me (_me), instance_h(0)
+{
+  n_h.n = 0;
+  n_h.m = me;
+  n_a.n = 0;
+  n_a.m = me;
+  v_a.clear();
+
+  l = new log (this, me);
+
+  if (instance_h == 0 && _first) {
+    values[1] = _value;
+    l->loginstance(1, _value);
+    instance_h = 1;
+  }
+
+  pxs = new rpcs(atoi(_me.c_str()));
+  pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
+  pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
+  pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+}
+
+paxos_protocol::status
+acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
+                     paxos_protocol::prepareres &r)
+{
+  ScopedLock ml(pxs_mutex);
+  r.oldinstance = false;
+  r.accept = false;
+  r.n_a = n_a;
+  r.v_a = v_a;
+  if (a.instance <= instance_h) {
+    r.oldinstance = true;
+    r.v_a = values[a.instance];
+  } else if (a.n > n_h) {
+    n_h = a.n;
+    l->logprop(n_h);
+    r.accept = true;
+  } else {
+    tprintf("I totally rejected this request. Ha.\n");
+  }
+  return paxos_protocol::OK;
+}
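// Note (editorial sketch, not part of the lab code): a worked example of the
// preparereq() decision above. Suppose this acceptor has instance_h = 2 and
// n_h = {1, "25001"}:
//   preparereq(instance=2, n=anything)    -> oldinstance = true; the reply
//                                            carries values[2], the value
//                                            already decided for instance 2
//   preparereq(instance=3, n={2,"25000"}) -> {2,"25000"} > {1,"25001"}, so
//                                            n_h becomes {2,"25000"} (logged
//                                            via logprop) and accept = true
//   preparereq(instance=3, n={1,"25000"}) -> rejected: ties on n are broken
//                                            by m, and "25000" < "25001"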
+
+paxos_protocol::status
+acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
+{
+  ScopedLock ml(pxs_mutex);
+  r = false;
+  if (a.n >= n_h) {
+    n_a = a.n;
+    v_a = a.v;
+    l->logaccept(n_a, v_a);
+    r = true;
+  }
+  return paxos_protocol::OK;
+}
+
+// the src argument is only for debugging purposes
+paxos_protocol::status
+acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
+{
+  ScopedLock ml(pxs_mutex);
+  tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
+          a.instance, instance_h, v_a.c_str());
+  if (a.instance == instance_h + 1) {
+    VERIFY(v_a == a.v);
+    commit_wo(a.instance, v_a);
+  } else if (a.instance <= instance_h) {
+    // we are ahead; ignore.
+  } else {
+    // we are behind
+    VERIFY(0);
+  }
+  return paxos_protocol::OK;
+}
+
+void
+acceptor::commit_wo(unsigned instance, std::string value)
+{
+  // assume pxs_mutex is held
+  tprintf("acceptor::commit: instance=%d has v=%s\n", instance, value.c_str());
+  if (instance > instance_h) {
+    tprintf("commit: highest accepted instance = %d\n", instance);
+    values[instance] = value;
+    l->loginstance(instance, value);
+    instance_h = instance;
+    n_h.n = 0;
+    n_h.m = me;
+    n_a.n = 0;
+    n_a.m = me;
+    v_a.clear();
+    if (cfg) {
+      pxs_mutex.release();
+      cfg->paxos_commit(instance, value);
+      pxs_mutex.acquire();
+    }
+  }
+}
+
+void
+acceptor::commit(unsigned instance, std::string value)
+{
+  ScopedLock ml(pxs_mutex);
+  commit_wo(instance, value);
+}
+
+std::string
+acceptor::dump()
+{
+  return l->dump();
+}
+
+void
+acceptor::restore(std::string s)
+{
+  l->restore(s);
+  l->logread();
+}
+
+
+
+// For testing purposes
+
+// Call this from your code between the prepare and accept phases of the proposer
+void
+proposer::breakpoint1()
+{
+  if (break1) {
+    tprintf("Dying at breakpoint 1!\n");
+    exit(1);
+  }
+}
+
+// Call this from your code between the accept and decide phases of the proposer
+void
+proposer::breakpoint2()
+{
+  if (break2) {
+    tprintf("Dying at breakpoint 2!\n");
+    exit(1);
+  }
+}
+
+void
+proposer::breakpoint(int b)
+{
+  if (b == 3) {
+    tprintf("Proposer: breakpoint 1\n");
+    break1 = true;
+  } else if (b == 4) {
+    tprintf("Proposer: breakpoint 2\n");
+    break2 = true;
+  }
+}
diff --git a/paxos.h b/paxos.h
new file mode 100644
index 0000000..7188edb
--- /dev/null
+++ b/paxos.h
@@ -0,0 +1,100 @@
+#ifndef paxos_h
+#define paxos_h
+
+#include <string>
+#include <vector>
+#include "rpc.h"
+#include "paxos_protocol.h"
+#include "log.h"
+#include "mutex.h"
+
+
+class paxos_change {
+ public:
+  virtual void paxos_commit(unsigned instance, std::string v) = 0;
+  virtual ~paxos_change() {};
+};
+
+class acceptor {
+ private:
+  log *l;
+  rpcs *pxs;
+  paxos_change *cfg;
+  std::string me;
+  mutex pxs_mutex;
+
+  // Acceptor state
+  prop_t n_h;           // number of the highest proposal seen in a prepare
+  prop_t n_a;           // number of the highest proposal accepted
+  std::string v_a;      // value of the highest proposal accepted
+  unsigned instance_h;  // number of the highest instance we have decided
+  std::map<unsigned, std::string> values;  // vals of each instance
+
+  void commit_wo(unsigned instance, std::string v);
+  paxos_protocol::status preparereq(std::string src,
+                                    paxos_protocol::preparearg a,
+                                    paxos_protocol::prepareres &r);
+  paxos_protocol::status acceptreq(std::string src,
+                                   paxos_protocol::acceptarg a, bool &r);
+  paxos_protocol::status decidereq(std::string src,
+                                   paxos_protocol::decidearg a, int &r);
+
+  friend class log;
+
+ public:
+  acceptor(class paxos_change *cfg, bool _first, std::string _me,
+           std::string _value);
+
~acceptor() {}; + void commit(unsigned instance, std::string v); + unsigned instance() { return instance_h; } + std::string value(unsigned instance) { return values[instance]; } + std::string dump(); + void restore(std::string); + rpcs *get_rpcs() { return pxs; }; + prop_t get_n_h() { return n_h; }; + unsigned get_instance_h() { return instance_h; }; +}; + +extern bool isamember(std::string m, const std::vector &nodes); +extern std::string print_members(const std::vector &nodes); + +class proposer { + private: + log *l; + paxos_change *cfg; + acceptor *acc; + std::string me; + bool break1; + bool break2; + + mutex pxs_mutex; + + // Proposer state + bool stable; + prop_t my_n; // number of the last proposal used in this instance + + void setn(); + bool prepare(unsigned instance, std::vector &accepts, + std::vector nodes, + std::string &v); + void accept(unsigned instance, std::vector &accepts, + std::vector nodes, std::string v); + void decide(unsigned instance, std::vector accepts, + std::string v); + + void breakpoint1(); + void breakpoint2(); + bool majority(const std::vector &l1, const std::vector &l2); + + friend class log; + public: + proposer(class paxos_change *cfg, class acceptor *_acceptor, std::string _me); + ~proposer() {}; + bool run(int instance, std::vector cnodes, std::string v); + bool isrunning(); + void breakpoint(int b); +}; + + + +#endif /* paxos_h */ diff --git a/paxos_protocol.h b/paxos_protocol.h new file mode 100644 index 0000000..9c2703e --- /dev/null +++ b/paxos_protocol.h @@ -0,0 +1,133 @@ +#ifndef paxos_protocol_h +#define paxos_protocol_h + +#include "rpc.h" + +struct prop_t { + unsigned n; + std::string m; +}; + +class paxos_protocol { + public: + enum xxstatus { OK, ERR }; + typedef int status; + enum rpc_numbers { + preparereq = 0x11001, + acceptreq, + decidereq, + heartbeat, + }; + + struct preparearg { + unsigned instance; + prop_t n; + }; + + struct prepareres { + bool oldinstance; + bool accept; + prop_t n_a; + std::string v_a; + }; + + struct acceptarg { + unsigned instance; + prop_t n; + std::string v; + }; + + struct decidearg { + unsigned instance; + std::string v; + }; + +}; + +inline unmarshall & +operator>>(unmarshall &u, prop_t &a) +{ + u >> a.n; + u >> a.m; + return u; +} + +inline marshall & +operator<<(marshall &m, prop_t a) +{ + m << a.n; + m << a.m; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, paxos_protocol::preparearg &a) +{ + u >> a.instance; + u >> a.n; + return u; +} + +inline marshall & +operator<<(marshall &m, paxos_protocol::preparearg a) +{ + m << a.instance; + m << a.n; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, paxos_protocol::prepareres &r) +{ + u >> r.oldinstance; + u >> r.accept; + u >> r.n_a; + u >> r.v_a; + return u; +} + +inline marshall & +operator<<(marshall &m, paxos_protocol::prepareres r) +{ + m << r.oldinstance; + m << r.accept; + m << r.n_a; + m << r.v_a; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, paxos_protocol::acceptarg &a) +{ + u >> a.instance; + u >> a.n; + u >> a.v; + return u; +} + +inline marshall & +operator<<(marshall &m, paxos_protocol::acceptarg a) +{ + m << a.instance; + m << a.n; + m << a.v; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, paxos_protocol::decidearg &a) +{ + u >> a.instance; + u >> a.v; + return u; +} + +inline marshall & +operator<<(marshall &m, paxos_protocol::decidearg a) +{ + m << a.instance; + m << a.v; + return m; +} + +#endif diff --git a/random.cc b/random.cc new file mode 100644 index 0000000..d344958 
--- /dev/null +++ b/random.cc @@ -0,0 +1,36 @@ +#include "mutex.h" +#include +#include "rpc/slock.h" +#include + +static mutex rand_m; + +void srand_safe(unsigned int seed) { + ScopedLock s(rand_m); + srandom(seed); +} + +// RAND_MAX is guaranteed to be at least 32767 +// but math with rand() is annoying. +// Here's a canned solution from +// http://www.azillionmonkeys.com/qed/random.html +// which gives uniform numbers in [0, r) + +#define RS_SCALE (1.0 / (1.0 + RAND_MAX)) + +double drand (void) { + double d; + do { + d = (((random() * RS_SCALE) + random()) * RS_SCALE + random()) * RS_SCALE; + } while (d >= 1); /* Round off */ + return d; +} + +#define irand(x) ((unsigned int) ((x) * drand())) + +// use this to construct a 32-bit thread-safe RNG +#define RAND32 ((uint32_t)irand(1ul<<32)) +uint32_t rand32_safe() { + ScopedLock s(rand_m); + return RAND32; +} diff --git a/random.h b/random.h new file mode 100644 index 0000000..fce86ef --- /dev/null +++ b/random.h @@ -0,0 +1,7 @@ +#ifndef random_h +#define random_h + +void srand_safe(unsigned int seed); +uint32_t rand32_safe(); + +#endif diff --git a/rpc/connection.cc b/rpc/connection.cc new file mode 100644 index 0000000..c22ad45 --- /dev/null +++ b/rpc/connection.cc @@ -0,0 +1,448 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "method_thread.h" +#include "connection.h" +#include "slock.h" +#include "pollmgr.h" +#include "jsl_log.h" +#include "gettime.h" +#include "lang/verify.h" + +#define MAX_PDU (10<<20) //maximum PDF is 10M + + +connection::connection(chanmgr *m1, int f1, int l1) +: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1) +{ + + int flags = fcntl(fd_, F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(fd_, F_SETFL, flags); + + signal(SIGPIPE, SIG_IGN); + VERIFY(pthread_mutex_init(&m_,0)==0); + VERIFY(pthread_mutex_init(&ref_m_,0)==0); + VERIFY(pthread_cond_init(&send_wait_,0)==0); + VERIFY(pthread_cond_init(&send_complete_,0)==0); + + VERIFY(gettimeofday(&create_time_, NULL) == 0); + + PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); +} + +connection::~connection() +{ + VERIFY(dead_); + VERIFY(pthread_mutex_destroy(&m_)== 0); + VERIFY(pthread_mutex_destroy(&ref_m_)== 0); + VERIFY(pthread_cond_destroy(&send_wait_) == 0); + VERIFY(pthread_cond_destroy(&send_complete_) == 0); + if (rpdu_.buf) + free(rpdu_.buf); + VERIFY(!wpdu_.buf); + close(fd_); +} + +void +connection::incref() +{ + ScopedLock ml(&ref_m_); + refno_++; +} + +bool +connection::isdead() +{ + ScopedLock ml(&m_); + return dead_; +} + +void +connection::closeconn() +{ + { + ScopedLock ml(&m_); + if (!dead_) { + dead_ = true; + shutdown(fd_,SHUT_RDWR); + }else{ + return; + } + } + //after block_remove_fd, select will never wait on fd_ + //and no callbacks will be active + PollMgr::Instance()->block_remove_fd(fd_); +} + +void +connection::decref() +{ + VERIFY(pthread_mutex_lock(&ref_m_)==0); + refno_ --; + VERIFY(refno_>=0); + if (refno_==0) { + VERIFY(pthread_mutex_lock(&m_)==0); + if (dead_) { + VERIFY(pthread_mutex_unlock(&ref_m_)==0); + VERIFY(pthread_mutex_unlock(&m_)==0); + delete this; + return; + } + VERIFY(pthread_mutex_unlock(&m_)==0); + } + pthread_mutex_unlock(&ref_m_); +} + +int +connection::ref() +{ + ScopedLock rl(&ref_m_); + return refno_; +} + +int +connection::compare(connection *another) +{ + if (create_time_.tv_sec > another->create_time_.tv_sec) + return 1; + if (create_time_.tv_sec < another->create_time_.tv_sec) + return -1; + if (create_time_.tv_usec > another->create_time_.tv_usec) + 
return 1; + if (create_time_.tv_usec < another->create_time_.tv_usec) + return -1; + return 0; +} + +bool +connection::send(char *b, int sz) +{ + ScopedLock ml(&m_); + waiters_++; + while (!dead_ && wpdu_.buf) { + VERIFY(pthread_cond_wait(&send_wait_, &m_)==0); + } + waiters_--; + if (dead_) { + return false; + } + wpdu_.buf = b; + wpdu_.sz = sz; + wpdu_.solong = 0; + + if (lossy_) { + if ((random()%100) < lossy_) { + jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_); + shutdown(fd_,SHUT_RDWR); + } + } + + if (!writepdu()) { + dead_ = true; + VERIFY(pthread_mutex_unlock(&m_) == 0); + PollMgr::Instance()->block_remove_fd(fd_); + VERIFY(pthread_mutex_lock(&m_) == 0); + }else{ + if (wpdu_.solong == wpdu_.sz) { + }else{ + //should be rare to need to explicitly add write callback + PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { + VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0); + } + } + } + bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); + wpdu_.solong = wpdu_.sz = 0; + wpdu_.buf = NULL; + if (waiters_ > 0) + pthread_cond_broadcast(&send_wait_); + return ret; +} + +//fd_ is ready to be written +void +connection::write_cb(int s) +{ + ScopedLock ml(&m_); + VERIFY(!dead_); + VERIFY(fd_ == s); + if (wpdu_.sz == 0) { + PollMgr::Instance()->del_callback(fd_,CB_WRONLY); + return; + } + if (!writepdu()) { + PollMgr::Instance()->del_callback(fd_, CB_RDWR); + dead_ = true; + }else{ + VERIFY(wpdu_.solong >= 0); + if (wpdu_.solong < wpdu_.sz) { + return; + } + } + pthread_cond_signal(&send_complete_); +} + +//fd_ is ready to be read +void +connection::read_cb(int s) +{ + ScopedLock ml(&m_); + VERIFY(fd_ == s); + if (dead_) { + return; + } + + bool succ = true; + if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { + succ = readpdu(); + } + + if (!succ) { + PollMgr::Instance()->del_callback(fd_,CB_RDWR); + dead_ = true; + pthread_cond_signal(&send_complete_); + } + + if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { + if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { + //chanmgr has successfully consumed the pdu + rpdu_.buf = NULL; + rpdu_.sz = rpdu_.solong = 0; + } + } +} + +bool +connection::writepdu() +{ + VERIFY(wpdu_.solong >= 0); + if (wpdu_.solong == wpdu_.sz) + return true; + + if (wpdu_.solong == 0) { + int sz = htonl(wpdu_.sz); + bcopy(&sz,wpdu_.buf,sizeof(sz)); + } + int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + if (n < 0) { + if (errno != EAGAIN) { + jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno); + wpdu_.solong = -1; + wpdu_.sz = 0; + } + return (errno == EAGAIN); + } + wpdu_.solong += n; + return true; +} + +bool +connection::readpdu() +{ + if (!rpdu_.sz) { + int sz, sz1; + int n = read(fd_, &sz1, sizeof(sz1)); + + if (n == 0) { + return false; + } + + if (n < 0) { + VERIFY(errno!=EAGAIN); + return false; + } + + if (n >0 && n!= sizeof(sz)) { + jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n"); + return false; + } + + sz = ntohl(sz1); + + if (sz > MAX_PDU) { + char *tmpb = (char *)&sz1; + jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, + sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); + return false; + } + + rpdu_.sz = sz; + VERIFY(rpdu_.buf == NULL); + rpdu_.buf = (char *)malloc(sz+sizeof(sz)); + VERIFY(rpdu_.buf); + bcopy(&sz1,rpdu_.buf,sizeof(sz)); + rpdu_.solong = sizeof(sz); + } + + int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + if (n <= 0) { + if (errno == 
EAGAIN) + return true; + if (rpdu_.buf) + free(rpdu_.buf); + rpdu_.buf = NULL; + rpdu_.sz = rpdu_.solong = 0; + return (errno == EAGAIN); + } + rpdu_.solong += n; + return true; +} + +tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) +: mgr_(m1), lossy_(lossytest) +{ + + VERIFY(pthread_mutex_init(&m_,NULL) == 0); + + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + tcp_ = socket(AF_INET, SOCK_STREAM, 0); + if(tcp_ < 0){ + perror("tcpsconn::tcpsconn accept_loop socket:"); + VERIFY(0); + } + + int yes = 1; + setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + + if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){ + perror("accept_loop tcp bind:"); + VERIFY(0); + } + + if(listen(tcp_, 1000) < 0) { + perror("tcpsconn::tcpsconn listen:"); + VERIFY(0); + } + + socklen_t addrlen = sizeof(sin); + VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); + port_ = ntohs(sin.sin_port); + + jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, + sin.sin_port); + + if (pipe(pipe_) < 0) { + perror("accept_loop pipe:"); + VERIFY(0); + } + + int flags = fcntl(pipe_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipe_[0], F_SETFL, flags); + + VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); +} + +tcpsconn::~tcpsconn() +{ + VERIFY(close(pipe_[1]) == 0); + VERIFY(pthread_join(th_, NULL) == 0); + + //close all the active connections + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end(); i++) { + i->second->closeconn(); + i->second->decref(); + } +} + +void +tcpsconn::process_accept() +{ + sockaddr_in sin; + socklen_t slen = sizeof(sin); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + if (s1 < 0) { + perror("tcpsconn::accept_conn error"); + pthread_exit(NULL); + } + + jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", + s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); + connection *ch = new connection(mgr_, s1, lossy_); + + // garbage collect all dead connections with refcount of 1 + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end();) { + if (i->second->isdead() && i->second->ref() == 1) { + jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", + i->second->channo()); + i->second->decref(); + // Careful not to reuse i right after erase. (i++) will + // be evaluated before the erase call because in C++, + // there is a sequence point before a function call. + // See http://en.wikipedia.org/wiki/Sequence_point. + conns_.erase(i++); + } else + ++i; + } + + conns_[ch->channo()] = ch; +} + +void +tcpsconn::accept_conn() +{ + fd_set rfds; + int max_fd = pipe_[0] > tcp_ ? 
pipe_[0] : tcp_; + + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + perror("accept_conn select:"); + jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + VERIFY(0); + } + } + + if (FD_ISSET(pipe_[0], &rfds)) { + close(pipe_[0]); + close(tcp_); + return; + } + else if (FD_ISSET(tcp_, &rfds)) { + process_accept(); + } else { + VERIFY(0); + } + } +} + +connection * +connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) +{ + int s= socket(AF_INET, SOCK_STREAM, 0); + int yes = 1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", + inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); + close(s); + return NULL; + } + jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n", + s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); + return new connection(mgr, s, lossy); +} + + diff --git a/rpc/connection.h b/rpc/connection.h new file mode 100644 index 0000000..da48cf4 --- /dev/null +++ b/rpc/connection.h @@ -0,0 +1,101 @@ +#ifndef connection_h +#define connection_h 1 + +#include +#include +#include +#include +#include + +#include + +#include "pollmgr.h" + +class connection; + +class chanmgr { + public: + virtual bool got_pdu(connection *c, char *b, int sz) = 0; + virtual ~chanmgr() {} +}; + +class connection : public aio_callback { + public: + struct charbuf { + charbuf(): buf(NULL), sz(0), solong(0) {} + charbuf (char *b, int s) : buf(b), sz(s), solong(0){} + char *buf; + int sz; + int solong; //amount of bytes written or read so far + }; + + connection(chanmgr *m1, int f1, int lossytest=0); + ~connection(); + + int channo() { return fd_; } + bool isdead(); + void closeconn(); + + bool send(char *b, int sz); + void write_cb(int s); + void read_cb(int s); + + void incref(); + void decref(); + int ref(); + + int compare(connection *another); + private: + + bool readpdu(); + bool writepdu(); + + chanmgr *mgr_; + const int fd_; + bool dead_; + + charbuf wpdu_; + charbuf rpdu_; + + struct timeval create_time_; + + int waiters_; + int refno_; + const int lossy_; + + pthread_mutex_t m_; + pthread_mutex_t ref_m_; + pthread_cond_t send_complete_; + pthread_cond_t send_wait_; +}; + +class tcpsconn { + public: + tcpsconn(chanmgr *m1, int port, int lossytest=0); + ~tcpsconn(); + inline int port() { return port_; } + void accept_conn(); + private: + int port_; + pthread_mutex_t m_; + pthread_t th_; + int pipe_[2]; + + int tcp_; //file desciptor for accepting connection + chanmgr *mgr_; + int lossy_; + std::map conns_; + + void process_accept(); +}; + +struct bundle { + bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} + chanmgr *mgr; + int tcp; + int lossy; +}; + +void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0); +connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); +#endif diff --git a/rpc/fifo.h b/rpc/fifo.h new file mode 100644 index 0000000..979cf62 --- /dev/null +++ b/rpc/fifo.h @@ -0,0 +1,94 @@ +#ifndef fifo_h +#define fifo_h + +// fifo template +// blocks enq() and deq() when queue is FULL or EMPTY + +#include +#include +#include +#include +#include +#include "slock.h" +#include "lang/verify.h" + +template +class fifo { + public: + fifo(int m=0); + ~fifo(); + bool enq(T, bool blocking=true); + void deq(T *); + bool 
size(); + + private: + std::list q_; + pthread_mutex_t m_; + pthread_cond_t non_empty_c_; // q went non-empty + pthread_cond_t has_space_c_; // q is not longer overfull + unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit +}; + +template +fifo::fifo(int limit) : max_(limit) +{ + VERIFY(pthread_mutex_init(&m_, 0) == 0); + VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0); + VERIFY(pthread_cond_init(&has_space_c_, 0) == 0); +} + +template +fifo::~fifo() +{ + //fifo is to be deleted only when no threads are using it! + VERIFY(pthread_mutex_destroy(&m_)==0); + VERIFY(pthread_cond_destroy(&non_empty_c_) == 0); + VERIFY(pthread_cond_destroy(&has_space_c_) == 0); +} + +template bool +fifo::size() +{ + ScopedLock ml(&m_); + return q_.size(); +} + +template bool +fifo::enq(T e, bool blocking) +{ + ScopedLock ml(&m_); + while (1) { + if (!max_ || q_.size() < max_) { + q_.push_back(e); + break; + } + if (blocking) + VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0); + else + return false; + } + VERIFY(pthread_cond_signal(&non_empty_c_) == 0); + return true; +} + +template void +fifo::deq(T *e) +{ + ScopedLock ml(&m_); + + while(1) { + if(q_.empty()){ + VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0); + } else { + *e = q_.front(); + q_.pop_front(); + if (max_ && q_.size() < max_) { + VERIFY(pthread_cond_signal(&has_space_c_)==0); + } + break; + } + } + return; +} + +#endif diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc new file mode 100644 index 0000000..06e5c2c --- /dev/null +++ b/rpc/jsl_log.cc @@ -0,0 +1,9 @@ +#include "jsl_log.h" + +int JSL_DEBUG_LEVEL = 0; +void +jsl_set_debug(int level) { + JSL_DEBUG_LEVEL = level; +} + + diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h new file mode 100644 index 0000000..7f92998 --- /dev/null +++ b/rpc/jsl_log.h @@ -0,0 +1,25 @@ +#ifndef __JSL_LOG_H__ +#define __JSL_LOG_H__ 1 + +enum dbcode { + JSL_DBG_OFF = 0, + JSL_DBG_1 = 1, // Critical + JSL_DBG_2 = 2, // Error + JSL_DBG_3 = 3, // Info + JSL_DBG_4 = 4, // Debugging +}; + +extern int JSL_DEBUG_LEVEL; + +#define jsl_log(level,...) 
\ + do { \ + if(JSL_DEBUG_LEVEL < abs(level)) \ + {;} \ + else { \ + { printf(__VA_ARGS__);} \ + } \ + } while(0) + +void jsl_set_debug(int level); + +#endif // __JSL_LOG_H__ diff --git a/rpc/marshall.h b/rpc/marshall.h new file mode 100644 index 0000000..e0370d1 --- /dev/null +++ b/rpc/marshall.h @@ -0,0 +1,261 @@ +#ifndef marshall_h +#define marshall_h + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "lang/verify.h" +#include "lang/algorithm.h" + +struct req_header { + req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0): + xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} + int xid; + int proc; + unsigned int clt_nonce; + unsigned int srv_nonce; + int xid_rep; +}; + +struct reply_header { + reply_header(int x=0, int r=0): xid(x), ret(r) {} + int xid; + int ret; +}; + +typedef uint64_t rpc_checksum_t; +typedef int rpc_sz_t; + +enum { + //size of initial buffer allocation + DEFAULT_RPC_SZ = 1024, +#if RPC_CHECKSUMMING + //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum + RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t) +#else + RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) +#endif +}; + +class marshall { + private: + char *_buf; // Base of the raw bytes buffer (dynamically readjusted) + int _capa; // Capacity of the buffer + int _ind; // Read/write head position + + public: + marshall() { + _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); + VERIFY(_buf); + _capa = DEFAULT_RPC_SZ; + _ind = RPC_HEADER_SZ; + } + + ~marshall() { + if (_buf) + free(_buf); + } + + int size() { return _ind;} + char *cstr() { return _buf;} + + void rawbyte(unsigned char); + void rawbytes(const char *, int); + + // Return the current content (excluding header) as a string + std::string get_content() { + return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ); + } + + // Return the current content (excluding header) as a string + std::string str() { + return get_content(); + } + + void pack(int i); + + void pack_req_header(const req_header &h) { + int saved_sz = _ind; + //leave the first 4-byte empty for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + pack(h.xid); + pack(h.proc); + pack((int)h.clt_nonce); + pack((int)h.srv_nonce); + pack(h.xid_rep); + _ind = saved_sz; + } + + void pack_reply_header(const reply_header &h) { + int saved_sz = _ind; + //leave the first 4-byte empty for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + pack(h.xid); + pack(h.ret); + _ind = saved_sz; + } + + void take_buf(char **b, int *s) { + *b = _buf; + *s = _ind; + _buf = NULL; + _ind = 0; + return; + } +}; +marshall& operator<<(marshall &, bool); +marshall& operator<<(marshall &, unsigned int); +marshall& operator<<(marshall &, int); +marshall& operator<<(marshall &, unsigned char); +marshall& operator<<(marshall &, char); +marshall& operator<<(marshall &, unsigned short); +marshall& operator<<(marshall &, short); +marshall& operator<<(marshall &, unsigned long long); +marshall& operator<<(marshall &, const std::string &); + +class unmarshall { + private: + char *_buf; + int _sz; + int _ind; + bool _ok; + public: + unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {} + unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {} + unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false) + { + //take the content which does 
not exclude a RPC header from a string + take_content(s); + } + ~unmarshall() { + if (_buf) free(_buf); + } + + //take contents from another unmarshall object + void take_in(unmarshall &another); + + //take the content which does not exclude a RPC header from a string + void take_content(const std::string &s) { + _sz = s.size()+RPC_HEADER_SZ; + _buf = (char *)realloc(_buf,_sz); + VERIFY(_buf); + _ind = RPC_HEADER_SZ; + memcpy(_buf+_ind, s.data(), s.size()); + _ok = true; + } + + bool ok() { return _ok; } + char *cstr() { return _buf;} + bool okdone(); + unsigned int rawbyte(); + void rawbytes(std::string &s, unsigned int n); + + int ind() { return _ind;} + int size() { return _sz;} + void unpack(int *); //non-const ref + void take_buf(char **b, int *sz) { + *b = _buf; + *sz = _sz; + _sz = _ind = 0; + _buf = NULL; + } + + void unpack_req_header(req_header *h) { + //the first 4-byte is for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + unpack(&h->xid); + unpack(&h->proc); + unpack((int *)&h->clt_nonce); + unpack((int *)&h->srv_nonce); + unpack(&h->xid_rep); + _ind = RPC_HEADER_SZ; + } + + void unpack_reply_header(reply_header *h) { + //the first 4-byte is for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + unpack(&h->xid); + unpack(&h->ret); + _ind = RPC_HEADER_SZ; + } +}; + +unmarshall& operator>>(unmarshall &, bool &); +unmarshall& operator>>(unmarshall &, unsigned char &); +unmarshall& operator>>(unmarshall &, char &); +unmarshall& operator>>(unmarshall &, unsigned short &); +unmarshall& operator>>(unmarshall &, short &); +unmarshall& operator>>(unmarshall &, unsigned int &); +unmarshall& operator>>(unmarshall &, int &); +unmarshall& operator>>(unmarshall &, unsigned long long &); +unmarshall& operator>>(unmarshall &, std::string &); + +template marshall & +operator<<(marshall &m, std::vector v) +{ + m << (unsigned int) v.size(); + for(unsigned i = 0; i < v.size(); i++) + m << v[i]; + return m; +} + +template unmarshall & +operator>>(unmarshall &u, std::vector &v) +{ + unsigned n; + u >> n; + for(unsigned i = 0; i < n; i++){ + C z; + u >> z; + v.push_back(z); + } + return u; +} + +template marshall & +operator<<(marshall &m, const std::map &d) { + typename std::map::const_iterator i; + + m << (unsigned int) d.size(); + + for (i = d.begin(); i != d.end(); i++) { + m << i->first << i->second; + } + return m; +} + +template unmarshall & +operator>>(unmarshall &u, std::map &d) { + unsigned int n; + u >> n; + + d.clear(); + + for (unsigned int lcv = 0; lcv < n; lcv++) { + A a; + B b; + u >> a >> b; + d[a] = b; + } + return u; +} + +#endif diff --git a/rpc/method_thread.h b/rpc/method_thread.h new file mode 100644 index 0000000..bcbc08b --- /dev/null +++ b/rpc/method_thread.h @@ -0,0 +1,164 @@ +#ifndef method_thread_h +#define method_thread_h + +// method_thread(): start a thread that runs an object method. +// returns a pthread_t on success, and zero on error. 
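A brief usage sketch may help here (worker, loop, and serve are illustrative
names, not part of the lab code; the templates below accept member functions
with zero to three arguments):

#include "method_thread.h"

class worker {
 public:
  void loop() { /* ... */ }           // zero-argument variant
  void serve(int port) { /* ... */ }  // one-argument variant
};

void example(worker *w)
{
  // detach == true: thread state is reclaimed on exit; never pthread_join it.
  pthread_t th1 = method_thread(w, true, &worker::loop);

  // detach == false: the caller is expected to pthread_join later.
  pthread_t th2 = method_thread(w, false, &worker::serve, 8080);
  if (th1 && th2)
    pthread_join(th2, NULL);
}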
+ +#include +#include +#include +#include +#include "lang/verify.h" + +static pthread_t +method_thread_parent(void *(*fn)(void *), void *arg, bool detach) +{ + pthread_t th; + pthread_attr_t attr; + pthread_attr_init(&attr); + // set stack size to 100K, so we don't run out of memory + pthread_attr_setstacksize(&attr, 100*1024); + int err = pthread_create(&th, &attr, fn, arg); + pthread_attr_destroy(&attr); + if (err != 0) { + fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err)); + exit(1); + } + + if (detach) { + // don't keep thread state around after exit, to avoid + // running out of threads. set detach==false if you plan + // to pthread_join. + VERIFY(pthread_detach(th) == 0); + } + + return th; +} + +static void +method_thread_child() +{ + // defer pthread_cancel() by default. check explicitly by + // enabling then pthread_testcancel(). + int oldstate, oldtype; + VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0); + VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)()) +{ + class XXX { + public: + C *o; + void (C::*m)(); + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)() = x->m; + delete x; + method_thread_child(); + (o->*m)(); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A), A a) +{ + class XXX { + public: + C *o; + void (C::*m)(A a); + A a; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A ) = x->m; + A a = x->a; + delete x; + method_thread_child(); + (o->*m)(a); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a = a; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +namespace { + // ~xavid: this causes a bizzare compile error on OS X.5 when + // it's declared in the function, so I moved it out here. 
+ template + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2); + A1 a1; + A2 a2; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + delete x; + method_thread_child(); + (o->*m)(a1, a2); + return 0; + } + }; +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2) +{ + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3) +{ + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2, A3 a3); + A1 a1; + A2 a2; + A3 a3; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 , A3 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + A3 a3 = x->a3; + delete x; + method_thread_child(); + (o->*m)(a1, a2, a3); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + x->a3 = a3; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +#endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc new file mode 100644 index 0000000..f73a3a5 --- /dev/null +++ b/rpc/pollmgr.cc @@ -0,0 +1,360 @@ +#include +#include +#include +#include + +#include "slock.h" +#include "jsl_log.h" +#include "method_thread.h" +#include "lang/verify.h" +#include "pollmgr.h" + +PollMgr *PollMgr::instance = NULL; +static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT; + +void +PollMgrInit() +{ + PollMgr::instance = new PollMgr(); +} + +PollMgr * +PollMgr::Instance() +{ + pthread_once(&pollmgr_is_initialized, PollMgrInit); + return instance; +} + +PollMgr::PollMgr() : pending_change_(false) +{ + bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); + aio_ = new SelectAIO(); + //aio_ = new EPollAIO(); + + VERIFY(pthread_mutex_init(&m_, NULL) == 0); + VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0); + VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0); +} + +PollMgr::~PollMgr() +{ + //never kill me!!! 
+ VERIFY(0); +} + +void +PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) +{ + VERIFY(fd < MAX_POLL_FDS); + + ScopedLock ml(&m_); + aio_->watch_fd(fd, flag); + + VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); + callbacks_[fd] = ch; +} + +//remove all callbacks related to fd +//the return guarantees that callbacks related to fd +//will never be called again +void +PollMgr::block_remove_fd(int fd) +{ + ScopedLock ml(&m_); + aio_->unwatch_fd(fd, CB_RDWR); + pending_change_ = true; + VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0); + callbacks_[fd] = NULL; +} + +void +PollMgr::del_callback(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (aio_->unwatch_fd(fd, flag)) { + callbacks_[fd] = NULL; + } +} + +bool +PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) +{ + ScopedLock ml(&m_); + if (!callbacks_[fd] || callbacks_[fd]!=c) + return false; + + return aio_->is_watched(fd, flag); +} + +void +PollMgr::wait_loop() +{ + + std::vector readable; + std::vector writable; + + while (1) { + { + ScopedLock ml(&m_); + if (pending_change_) { + pending_change_ = false; + VERIFY(pthread_cond_broadcast(&changedone_c_)==0); + } + } + readable.clear(); + writable.clear(); + aio_->wait_ready(&readable,&writable); + + if (!readable.size() && !writable.size()) { + continue; + } + //no locking of m_ + //because no add_callback() and del_callback should + //modify callbacks_[fd] while the fd is not dead + for (unsigned int i = 0; i < readable.size(); i++) { + int fd = readable[i]; + if (callbacks_[fd]) + callbacks_[fd]->read_cb(fd); + } + + for (unsigned int i = 0; i < writable.size(); i++) { + int fd = writable[i]; + if (callbacks_[fd]) + callbacks_[fd]->write_cb(fd); + } + } +} + +SelectAIO::SelectAIO() : highfds_(0) +{ + FD_ZERO(&rfds_); + FD_ZERO(&wfds_); + + VERIFY(pipe(pipefd_) == 0); + FD_SET(pipefd_[0], &rfds_); + highfds_ = pipefd_[0]; + + int flags = fcntl(pipefd_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipefd_[0], F_SETFL, flags); + + VERIFY(pthread_mutex_init(&m_, NULL) == 0); +} + +SelectAIO::~SelectAIO() +{ + VERIFY(pthread_mutex_destroy(&m_) == 0); +} + +void +SelectAIO::watch_fd(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (highfds_ <= fd) + highfds_ = fd; + + if (flag == CB_RDONLY) { + FD_SET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + FD_SET(fd,&wfds_); + }else { + FD_SET(fd,&rfds_); + FD_SET(fd,&wfds_); + } + + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); +} + +bool +SelectAIO::is_watched(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (flag == CB_RDONLY) { + return FD_ISSET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + return FD_ISSET(fd,&wfds_); + }else{ + return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); + } +} + +bool +SelectAIO::unwatch_fd(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (flag == CB_RDONLY) { + FD_CLR(fd, &rfds_); + }else if (flag == CB_WRONLY) { + FD_CLR(fd, &wfds_); + }else if (flag == CB_RDWR) { + FD_CLR(fd, &wfds_); + FD_CLR(fd, &rfds_); + }else{ + VERIFY(0); + } + + if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { + if (fd == highfds_) { + int newh = pipefd_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_)) { + newh = i; + }else if (FD_ISSET(i, &wfds_)) { + newh = i; + } + } + highfds_ = newh; + } + } + if (flag == CB_RDWR) { + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + } + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); +} + +void +SelectAIO::wait_ready(std::vector *readable, std::vector *writable) +{ + fd_set trfds, 
twfds; + int high; + + { + ScopedLock ml(&m_); + trfds = rfds_; + twfds = wfds_; + high = highfds_; + + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + return; + } else { + perror("select:"); + jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); + VERIFY(0); + } + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(tmp==1); + }else { + if (FD_ISSET(fd, &twfds)) { + writable->push_back(fd); + } + if (FD_ISSET(fd, &trfds)) { + readable->push_back(fd); + } + } + } +} + +#ifdef __linux__ + +EPollAIO::EPollAIO() +{ + pollfd_ = epoll_create(MAX_POLL_FDS); + VERIFY(pollfd_ >= 0); + bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); +} + +EPollAIO::~EPollAIO() +{ + close(pollfd_); +} + +static inline +int poll_flag_to_event(poll_flag flag) +{ + int f; + if (flag == CB_RDONLY) { + f = EPOLLIN; + }else if (flag == CB_WRONLY) { + f = EPOLLOUT; + }else { //flag == CB_RDWR + f = EPOLLIN | EPOLLOUT; + } + return f; +} + +void +EPollAIO::watch_fd(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (int)flag; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + } + + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); +} + +bool +EPollAIO::unwatch_fd(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(int)flag; + + struct epoll_event ev; + int op = fdstatus_[fd]? 
EPOLL_CTL_MOD : EPOLL_CTL_DEL; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(op == EPOLL_CTL_DEL); + } + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + return (op == EPOLL_CTL_DEL); +} + +bool +EPollAIO::is_watched(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + return ((fdstatus_[fd] & CB_MASK) == flag); +} + +void +EPollAIO::wait_ready(std::vector *readable, std::vector *writable) +{ + int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + for (int i = 0; i < nfds; i++) { + if (ready_[i].events & EPOLLIN) { + readable->push_back(ready_[i].data.fd); + } + if (ready_[i].events & EPOLLOUT) { + writable->push_back(ready_[i].data.fd); + } + } +} + +#endif diff --git a/rpc/pollmgr.h b/rpc/pollmgr.h new file mode 100644 index 0000000..c0a5748 --- /dev/null +++ b/rpc/pollmgr.h @@ -0,0 +1,107 @@ +#ifndef pollmgr_h +#define pollmgr_h + +#include +#include + +#ifdef __linux__ +#include +#endif + +#define MAX_POLL_FDS 128 + +typedef enum { + CB_NONE = 0x0, + CB_RDONLY = 0x1, + CB_WRONLY = 0x10, + CB_RDWR = 0x11, + CB_MASK = ~0x11, +} poll_flag; + +class aio_mgr { + public: + virtual void watch_fd(int fd, poll_flag flag) = 0; + virtual bool unwatch_fd(int fd, poll_flag flag) = 0; + virtual bool is_watched(int fd, poll_flag flag) = 0; + virtual void wait_ready(std::vector *readable, std::vector *writable) = 0; + virtual ~aio_mgr() {} +}; + +class aio_callback { + public: + virtual void read_cb(int fd) = 0; + virtual void write_cb(int fd) = 0; + virtual ~aio_callback() {} +}; + +class PollMgr { + public: + PollMgr(); + ~PollMgr(); + + static PollMgr *Instance(); + static PollMgr *CreateInst(); + + void add_callback(int fd, poll_flag flag, aio_callback *ch); + void del_callback(int fd, poll_flag flag); + bool has_callback(int fd, poll_flag flag, aio_callback *ch); + void block_remove_fd(int fd); + void wait_loop(); + + + static PollMgr *instance; + static int useful; + static int useless; + + private: + pthread_mutex_t m_; + pthread_cond_t changedone_c_; + pthread_t th_; + + aio_callback *callbacks_[MAX_POLL_FDS]; + aio_mgr *aio_; + bool pending_change_; + +}; + +class SelectAIO : public aio_mgr { + public : + + SelectAIO(); + ~SelectAIO(); + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + bool is_watched(int fd, poll_flag flag); + void wait_ready(std::vector *readable, std::vector *writable); + + private: + + fd_set rfds_; + fd_set wfds_; + int highfds_; + int pipefd_[2]; + + pthread_mutex_t m_; + +}; + +#ifdef __linux__ +class EPollAIO : public aio_mgr { + public: + EPollAIO(); + ~EPollAIO(); + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + bool is_watched(int fd, poll_flag flag); + void wait_ready(std::vector *readable, std::vector *writable); + + private: + int pollfd_; + struct epoll_event ready_[MAX_POLL_FDS]; + int fdstatus_[MAX_POLL_FDS]; + +}; +#endif /* __linux */ + +#endif /* pollmgr_h */ + diff --git a/rpc/rpc.cc b/rpc/rpc.cc new file mode 100644 index 0000000..e18506a --- /dev/null +++ b/rpc/rpc.cc @@ -0,0 +1,1086 @@ +/* + The rpcc class handles client-side RPC. Each rpcc is bound to a + single RPC server. The jobs of rpcc include maintaining a connection to + server, sending RPC requests and waiting for responses, retransmissions, + at-most-once delivery etc. + + The rpcs class handles the server side of RPC. 
Each rpcs handles multiple + connections from different rpcc objects. The jobs of rpcs include accepting + connections, dispatching requests to registered RPC handlers, at-most-once + delivery etc. + + Both rpcc and rpcs use the connection class as an abstraction for the + underlying communication channel. To send an RPC request/reply, one calls + connection::send() which blocks until data is sent or the connection has failed + (thus the caller can free the buffer when send() returns). When a + request/reply is received, connection makes a callback into the corresponding + rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()). + + Thread organization: + rpcc uses application threads to send RPC requests and blocks to receive the + reply or error. All connections use a single PollMgr object to perform async + socket IO. PollMgr creates a single thread to examine the readiness of socket + file descriptors and informs the corresponding connection whenever a socket is + ready to be read or written. (We use asynchronous socket IO to reduce the + number of threads needed to manage these connections; without async IO, at + least one thread is needed per connection to read data without blocking other + activities.) Each rpcs object creates one thread for listening on the server + port and a pool of threads for executing RPC requests. The + thread pool allows us to control the number of threads spawned at the server + (spawning one thread per request will hurt when the server faces thousands of + requests). + + In order to delete a connection object, we must maintain a reference count. + For rpcc, + multiple client threads might be invoking the rpcc::call() functions and thus + holding multiple references to the underlying connection object. For rpcs, + multiple dispatch threads might be holding references to the same connection + object. A connection object is deleted only when the underlying connection is + dead and the reference count reaches zero. + + The previous version of the RPC library uses pthread_cancel* routines + to implement the deletion of rpcc and rpcs objects. The idea is to cancel + all active threads that might be holding a reference to an object before + deleting that object. However, pthread_cancel is not robust and there are + always bugs where outstanding references to deleted objects persist. + This version of the RPC library does not do pthread_cancel, but explicitly + joins exited threads to make sure no outstanding references exist before + deleting objects. + + To delete a rpcc object safely, the users of the library must ensure that + there are no outstanding calls on the rpcc object. + + To delete a rpcs object safely, we do the following in sequence: 1. stop + accepting new incoming connections. 2. close existing active connections. + 3. delete the dispatch thread pool which involves waiting for current active + RPC handlers to finish. It is interesting how a thread pool can be deleted + without using thread cancellation. The trick is to inject x "poison pills" for + a thread pool of x threads. Upon getting a poison pill instead of a normal + task, a worker thread will exit (and thread pool destructor waits to join all + x exited worker threads). 
+ */ + +#include "rpc.h" +#include "method_thread.h" +#include "slock.h" + +#include +#include +#include +#include +#include + +#include "jsl_log.h" +#include "gettime.h" +#include "lang/verify.h" + +const rpcc::TO rpcc::to_max = { 120000 }; +const rpcc::TO rpcc::to_min = { 1000 }; + +rpcc::caller::caller(unsigned int xxid, unmarshall *xun) +: xid(xxid), un(xun), done(false) +{ + VERIFY(pthread_mutex_init(&m,0) == 0); + VERIFY(pthread_cond_init(&c, 0) == 0); +} + +rpcc::caller::~caller() +{ + VERIFY(pthread_mutex_destroy(&m) == 0); + VERIFY(pthread_cond_destroy(&c) == 0); +} + +inline +void set_rand_seed() +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + srandom((int)ts.tv_nsec^((int)getpid())); +} + +rpcc::rpcc(sockaddr_in d, bool retrans) : + dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), + retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) +{ + VERIFY(pthread_mutex_init(&m_, 0) == 0); + VERIFY(pthread_mutex_init(&chan_m_, 0) == 0); + VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0); + + if(retrans){ + set_rand_seed(); + clt_nonce_ = random(); + } else { + // special client nonce 0 means this client does not + // require at-most-once logic from the server + // because it uses tcp and never retries a failed connection + clt_nonce_ = 0; + } + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + // xid starts with 1 and latest received reply starts with 0 + xid_rep_window_.push_back(0); + + jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", + clt_nonce_, lossytest_); +} + +// IMPORTANT: destruction should happen only when no external threads +// are blocked inside rpcc or will use rpcc in the future +rpcc::~rpcc() +{ + jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", + clt_nonce_, chan_?chan_->channo():-1); + if(chan_){ + chan_->closeconn(); + chan_->decref(); + } + VERIFY(calls_.size() == 0); + VERIFY(pthread_mutex_destroy(&m_) == 0); + VERIFY(pthread_mutex_destroy(&chan_m_) == 0); +} + +int +rpcc::bind(TO to) +{ + int r; + int ret = call(rpc_const::bind, 0, r, to); + if(ret == 0){ + ScopedLock ml(&m_); + bind_done_ = true; + srv_nonce_ = r; + } else { + jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", + inet_ntoa(dst_.sin_addr), ret); + } + return ret; +}; + +// Cancel all outstanding calls +void +rpcc::cancel(void) +{ + ScopedLock ml(&m_); + printf("rpcc::cancel: force callers to fail\n"); + std::map::iterator iter; + for(iter = calls_.begin(); iter != calls_.end(); iter++){ + caller *ca = iter->second; + + jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); + { + ScopedLock cl(&ca->m); + ca->done = true; + ca->intret = rpc_const::cancel_failure; + VERIFY(pthread_cond_signal(&ca->c) == 0); + } + } + + while (calls_.size () > 0){ + destroy_wait_ = true; + VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0); + } + printf("rpcc::cancel: done\n"); +} + +int +rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, + TO to) +{ + + caller ca(0, &rep); + int xid_rep; + { + ScopedLock ml(&m_); + + if((proc != rpc_const::bind && !bind_done_) || + (proc == rpc_const::bind && bind_done_)){ + jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n"); + return rpc_const::bind_failure; + } + + if(destroy_wait_){ + return rpc_const::cancel_failure; + } + + ca.xid = xid_++; + calls_[ca.xid] = &ca; + + req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, + xid_rep_window_.front()); + req.pack_req_header(h); + xid_rep 
= xid_rep_window_.front(); + } + + TO curr_to; + struct timespec now, nextdeadline, finaldeadline; + + clock_gettime(CLOCK_REALTIME, &now); + add_timespec(now, to.to, &finaldeadline); + curr_to.to = to_min.to; + + bool transmit = true; + connection *ch = NULL; + + while (1){ + if(transmit){ + get_refconn(&ch); + if(ch){ + if(reachable_) { + request forgot; + { + ScopedLock ml(&m_); + if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) { + forgot = dup_req_; + dup_req_.clear(); + } + } + if (forgot.isvalid()) + ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); + ch->send(req.cstr(), req.size()); + } + else jsl_log(JSL_DBG_1, "not reachable\n"); + jsl_log(JSL_DBG_2, + "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", + clt_nonce_, proc, ca.xid, clt_nonce_); + } + transmit = false; // only send once on a given channel + } + + if(!finaldeadline.tv_sec) + break; + + clock_gettime(CLOCK_REALTIME, &now); + add_timespec(now, curr_to.to, &nextdeadline); + if(cmp_timespec(nextdeadline,finaldeadline) > 0){ + nextdeadline = finaldeadline; + finaldeadline.tv_sec = 0; + } + + { + ScopedLock cal(&ca.m); + while (!ca.done){ + jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); + if(pthread_cond_timedwait(&ca.c, &ca.m, + &nextdeadline) == ETIMEDOUT){ + jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); + break; + } + } + if(ca.done){ + jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n"); + break; + } + } + + if(retrans_ && (!ch || ch->isdead())){ + // since connection is dead, retransmit + // on the new connection + transmit = true; + } + curr_to.to <<= 1; + } + + { + // no locking of ca.m since only this thread changes ca.xid + ScopedLock ml(&m_); + calls_.erase(ca.xid); + // may need to update the xid again here, in case the + // packet times out before it's even sent by the channel. + // I don't think there's any harm in maybe doing it twice + update_xid_rep(ca.xid); + + if(destroy_wait_){ + VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0); + } + } + + if (ca.done && lossytest_) + { + ScopedLock ml(&m_); + if (!dup_req_.isvalid()) { + dup_req_.buf.assign(req.cstr(), req.size()); + dup_req_.xid = ca.xid; + } + if (xid_rep > xid_rep_done_) + xid_rep_done_ = xid_rep; + } + + ScopedLock cal(&ca.m); + + jsl_log(JSL_DBG_2, + "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", + clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), + ntohs(dst_.sin_port), ca.done, ca.intret); + + if(ch) + ch->decref(); + + // destruction of req automatically frees its buffer + return (ca.done? ca.intret : rpc_const::timeout_failure); +} + +void +rpcc::get_refconn(connection **ch) +{ + ScopedLock ml(&chan_m_); + if(!chan_ || chan_->isdead()){ + if(chan_) + chan_->decref(); + chan_ = connect_to_dst(dst_, this, lossytest_); + } + if(ch && chan_){ + if(*ch){ + (*ch)->decref(); + } + *ch = chan_; + (*ch)->incref(); + } +} + +// PollMgr's thread is being used to +// make this upcall from connection object to rpcc. +// this funtion must not block. 
+// +// this function keeps no reference for connection *c +bool +rpcc::got_pdu(connection *c, char *b, int sz) +{ + unmarshall rep(b, sz); + reply_header h; + rep.unpack_reply_header(&h); + + if(!rep.ok()){ + jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n"); + return true; + } + + ScopedLock ml(&m_); + + update_xid_rep(h.xid); + + if(calls_.find(h.xid) == calls_.end()){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid); + return true; + } + caller *ca = calls_[h.xid]; + + ScopedLock cl(&ca->m); + if(!ca->done){ + ca->un->take_in(rep); + ca->intret = h.ret; + if(ca->intret < 0){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n", + h.xid, ca->intret); + } + ca->done = 1; + } + VERIFY(pthread_cond_broadcast(&ca->c) == 0); + return true; +} + +// assumes thread holds mutex m +void +rpcc::update_xid_rep(unsigned int xid) +{ + std::list::iterator it; + + if(xid <= xid_rep_window_.front()){ + return; + } + + for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ + if(*it > xid){ + xid_rep_window_.insert(it, xid); + goto compress; + } + } + xid_rep_window_.push_back(xid); + +compress: + it = xid_rep_window_.begin(); + for (it++; it != xid_rep_window_.end(); it++){ + while (xid_rep_window_.front() + 1 == *it) + xid_rep_window_.pop_front(); + } +} + + +rpcs::rpcs(unsigned int p1, int count) + : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) +{ + VERIFY(pthread_mutex_init(&procs_m_, 0) == 0); + VERIFY(pthread_mutex_init(&count_m_, 0) == 0); + VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0); + VERIFY(pthread_mutex_init(&conss_m_, 0) == 0); + + set_rand_seed(); + nonce_ = random(); + jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + reg(rpc_const::bind, this, &rpcs::rpcbind); + dispatchpool_ = new ThrPool(6,false); + + listener_ = new tcpsconn(this, port_, lossytest_); +} + +rpcs::~rpcs() +{ + // must delete listener before dispatchpool + delete listener_; + delete dispatchpool_; + free_reply_window(); +} + +bool +rpcs::got_pdu(connection *c, char *b, int sz) +{ + if(!reachable_){ + jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n"); + return true; + } + + djob_t *j = new djob_t(c, b, sz); + c->incref(); + bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j); + if(!succ || !reachable_){ + c->decref(); + delete j; + } + return succ; +} + +void +rpcs::reg1(unsigned int proc, handler *h) +{ + ScopedLock pl(&procs_m_); + VERIFY(procs_.count(proc) == 0); + procs_[proc] = h; + VERIFY(procs_.count(proc) >= 1); +} + +void +rpcs::updatestat(unsigned int proc) +{ + ScopedLock cl(&count_m_); + counts_[proc]++; + curr_counts_--; + if(curr_counts_ == 0){ + std::map::iterator i; + printf("RPC STATS: "); + for (i = counts_.begin(); i != counts_.end(); i++){ + printf("%x:%d ", i->first, i->second); + } + printf("\n"); + + ScopedLock rwl(&reply_window_m_); + std::map >::iterator clt; + + unsigned int totalrep = 0, maxrep = 0; + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + totalrep += clt->second.size(); + if(clt->second.size() > maxrep) + maxrep = clt->second.size(); + } + jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", + (int) reply_window_.size()-1, totalrep, maxrep); + curr_counts_ = counting_; + } +} + +void +rpcs::dispatch(djob_t *j) +{ + connection *c = j->conn; + unmarshall req(j->buf, j->sz); + 
delete j; + + req_header h; + req.unpack_req_header(&h); + int proc = h.proc; + + if(!req.ok()){ + jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); + c->decref(); + return; + } + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n", + h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); + + marshall rep; + reply_header rh(h.xid,0); + + // is client sending to an old instance of server? + if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n", + h.srv_nonce, nonce_, h.proc); + rh.ret = rpc_const::oldsrv_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + return; + } + + handler *f; + // is RPC proc a registered procedure? + { + ScopedLock pl(&procs_m_); + if(procs_.count(proc) < 1){ + fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n", + proc); + c->decref(); + VERIFY(0); + return; + } + + f = procs_[proc]; + } + + rpcs::rpcstate_t stat; + char *b1; + int sz1; + + if(h.clt_nonce){ + // have i seen this client before? + { + ScopedLock rwl(&reply_window_m_); + // if we don't know about this clt_nonce, create a cleanup object + if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ + VERIFY (reply_window_[h.clt_nonce].size() == 0); // create + reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid + jsl_log(JSL_DBG_2, + "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", + h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1); + } + } + + // save the latest good connection to the client + { + ScopedLock rwl(&conss_m_); + if(conns_.find(h.clt_nonce) == conns_.end()){ + c->incref(); + conns_[h.clt_nonce] = c; + } else if(conns_[h.clt_nonce]->compare(c) < 0){ + conns_[h.clt_nonce]->decref(); + c->incref(); + conns_[h.clt_nonce] = c; + } + } + + stat = checkduplicate_and_update(h.clt_nonce, h.xid, + h.xid_rep, &b1, &sz1); + } else { + // this client does not require at most once logic + stat = NEW; + } + + switch (stat){ + case NEW: // new request + if(counting_){ + updatestat(proc); + } + + rh.ret = f->fn(req, rep); + if (rh.ret == rpc_const::unmarshal_args_failure) { + fprintf(stderr, "rpcs::dispatch: failed to" + " unmarshall the arguments. 
You are" + " probably calling RPC 0x%x with wrong" + " types of arguments.\n", proc); + VERIFY(0); + } + VERIFY(rh.ret >= 0); + + rep.pack_reply_header(rh); + rep.take_buf(&b1,&sz1); + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n", + sz1, h.xid, proc, rh.ret, h.clt_nonce); + + if(h.clt_nonce > 0){ + // only record replies for clients that require at-most-once logic + add_reply(h.clt_nonce, h.xid, b1, sz1); + } + + // get the latest connection to the client + { + ScopedLock rwl(&conss_m_); + if(c->isdead() && c != conns_[h.clt_nonce]){ + c->decref(); + c = conns_[h.clt_nonce]; + c->incref(); + } + } + + c->send(b1, sz1); + if(h.clt_nonce == 0){ + // reply is not added to at-most-once window, free it + free(b1); + } + break; + case INPROGRESS: // server is working on this request + break; + case DONE: // duplicate and we still have the response + c->send(b1, sz1); + break; + case FORGOTTEN: // very old request and we don't have the response anymore + jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", + h.xid, h.clt_nonce); + rh.ret = rpc_const::atmostonce_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + break; + } + c->decref(); +} + +// rpcs::dispatch calls this when an RPC request arrives. +// +// checks to see if an RPC with xid from clt_nonce has already been received. +// if not, remembers the request in reply_window_. +// +// deletes remembered requests with XIDs <= xid_rep; the client +// says it has received a reply for every RPC up through xid_rep. +// frees the reply_t::buf of each such request. +// +// returns one of: +// NEW: never seen this xid before. +// INPROGRESS: seen this xid, and still processing it. +// DONE: seen this xid, previous reply returned in *b and *sz. +// FORGOTTEN: might have seen this xid, but deleted previous reply. +rpcs::rpcstate_t +rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, + unsigned int xid_rep, char **b, int *sz) +{ + ScopedLock rwl(&reply_window_m_); + + std::list &l = reply_window_[clt_nonce]; + + VERIFY(l.size() > 0); + VERIFY(xid >= xid_rep); + + unsigned int past_xid_rep = l.begin()->xid; + + std::list::iterator start = l.begin(), it; + it = ++start; + + if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) { + // scan for deletion candidates + for (; it != l.end() && it->xid < xid_rep; it++) { + if (it->cb_present) + free(it->buf); + } + l.erase(start, it); + l.begin()->xid = xid_rep; + } + + if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1) + return FORGOTTEN; + + // skip non-deletion candidates + while (it != l.end() && it->xid < xid) + it++; + + // if it's in the list it must be right here + if (it != l.end() && it->xid == xid) { + if (it->cb_present) { + // return information about the remembered reply + *b = it->buf; + *sz = it->sz; + return DONE; + } else { + return INPROGRESS; + } + } else { + // remember that a new request has arrived + l.insert(it, reply_t(xid)); + return NEW; + } +} + +// rpcs::dispatch calls add_reply when it is sending a reply to an RPC, +// and passes the return value in b and sz. +// add_reply() should remember b and sz. +// free_reply_window() and checkduplicate_and_update is responsible for +// calling free(b). 
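+//
+// a worked example (all xids hypothetical) of one client's window, where
+// the first entry only remembers the largest xid_rep seen so far:
+//
+//   window: [xid_rep=4][5: reply saved][7: in progress][8: reply saved]
+//   duplicate xid 7 arrives        -> INPROGRESS (still executing, drop it)
+//   duplicate xid 8 arrives        -> DONE (resend the saved reply)
+//   xid 3 arrives                  -> FORGOTTEN (3 < 4; reply discarded)
+//   xid 9 with xid_rep 7 arrives   -> NEW; the xid-5 reply is freed and
+//                                     the sentinel advances to 7
+//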
+void +rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, + char *b, int sz) +{ + ScopedLock rwl(&reply_window_m_); + // remember the RPC reply value + std::list &l = reply_window_[clt_nonce]; + std::list::iterator it = l.begin(); + // skip to our place in the list + for (it++; it != l.end() && it->xid < xid; it++); + // there should already be an entry, so whine if there isn't + if (it == l.end() || it->xid != xid) { + fprintf(stderr, "Could not find reply struct in add_reply"); + l.insert(it, reply_t(xid, b, sz)); + } else { + *it = reply_t(xid, b, sz); + } +} + +void +rpcs::free_reply_window(void) +{ + std::map >::iterator clt; + std::list::iterator it; + + ScopedLock rwl(&reply_window_m_); + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + for (it = clt->second.begin(); it != clt->second.end(); it++){ + if (it->cb_present) + free(it->buf); + } + clt->second.clear(); + } + reply_window_.clear(); +} + +// rpc handler +int +rpcs::rpcbind(int a, int &r) +{ + jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); + r = nonce_; + return 0; +} + +void +marshall::rawbyte(unsigned char x) +{ + if(_ind >= _capa){ + _capa *= 2; + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + _buf[_ind++] = x; +} + +void +marshall::rawbytes(const char *p, int n) +{ + if((_ind+n) > _capa){ + _capa = _capa > n? 2*_capa:(_capa+n); + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + memcpy(_buf+_ind, p, n); + _ind += n; +} + +marshall & +operator<<(marshall &m, bool x) +{ + m.rawbyte(x); + return m; +} + +marshall & +operator<<(marshall &m, unsigned char x) +{ + m.rawbyte(x); + return m; +} + +marshall & +operator<<(marshall &m, char x) +{ + m << (unsigned char) x; + return m; +} + + +marshall & +operator<<(marshall &m, unsigned short x) +{ + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; +} + +marshall & +operator<<(marshall &m, short x) +{ + m << (unsigned short) x; + return m; +} + +marshall & +operator<<(marshall &m, unsigned int x) +{ + // network order is big-endian + m.rawbyte((x >> 24) & 0xff); + m.rawbyte((x >> 16) & 0xff); + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; +} + +marshall & +operator<<(marshall &m, int x) +{ + m << (unsigned int) x; + return m; +} + +marshall & +operator<<(marshall &m, const std::string &s) +{ + m << (unsigned int) s.size(); + m.rawbytes(s.data(), s.size()); + return m; +} + +marshall & +operator<<(marshall &m, unsigned long long x) +{ + m << (unsigned int) (x >> 32); + m << (unsigned int) x; + return m; +} + +void +marshall::pack(int x) +{ + rawbyte((x >> 24) & 0xff); + rawbyte((x >> 16) & 0xff); + rawbyte((x >> 8) & 0xff); + rawbyte(x & 0xff); +} + +void +unmarshall::unpack(int *x) +{ + (*x) = (rawbyte() & 0xff) << 24; + (*x) |= (rawbyte() & 0xff) << 16; + (*x) |= (rawbyte() & 0xff) << 8; + (*x) |= rawbyte() & 0xff; +} + +// take the contents from another unmarshall object +void +unmarshall::take_in(unmarshall &another) +{ + if(_buf) + free(_buf); + another.take_buf(&_buf, &_sz); + _ind = RPC_HEADER_SZ; + _ok = _sz >= RPC_HEADER_SZ?true:false; +} + +bool +unmarshall::okdone() +{ + if(ok() && _ind == _sz){ + return true; + } else { + return false; + } +} + +unsigned int +unmarshall::rawbyte() +{ + char c = 0; + if(_ind >= _sz) + _ok = false; + else + c = _buf[_ind++]; + return c; +} + +unmarshall & +operator>>(unmarshall &u, bool &x) +{ + x = (bool) u.rawbyte() ; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned 
char &x) +{ + x = (unsigned char) u.rawbyte() ; + return u; +} + +unmarshall & +operator>>(unmarshall &u, char &x) +{ + x = (char) u.rawbyte(); + return u; +} + + +unmarshall & +operator>>(unmarshall &u, unsigned short &x) +{ + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, short &x) +{ + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned int &x) +{ + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, int &x) +{ + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned long long &x) +{ + unsigned int h, l; + u >> h; + u >> l; + x = l | ((unsigned long long) h << 32); + return u; +} + +unmarshall & +operator>>(unmarshall &u, std::string &s) +{ + unsigned sz; + u >> sz; + if(u.ok()) + u.rawbytes(s, sz); + return u; +} + +void +unmarshall::rawbytes(std::string &ss, unsigned int n) +{ + if((_ind+n) > (unsigned)_sz){ + _ok = false; + } else { + std::string tmps = std::string(_buf+_ind, n); + swap(ss, tmps); + VERIFY(ss.size() == n); + _ind += n; + } +} + +bool operator<(const sockaddr_in &a, const sockaddr_in &b){ + return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || + ((a.sin_addr.s_addr == b.sin_addr.s_addr) && + ((a.sin_port < b.sin_port)))); +} + +/*---------------auxilary function--------------*/ +void +make_sockaddr(const char *hostandport, struct sockaddr_in *dst){ + + char host[200]; + const char *localhost = "127.0.0.1"; + const char *port = index(hostandport, ':'); + if(port == NULL){ + memcpy(host, localhost, strlen(localhost)+1); + port = hostandport; + } else { + memcpy(host, hostandport, port-hostandport); + host[port-hostandport] = '\0'; + port++; + } + + make_sockaddr(host, port, dst); + +} + +void +make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){ + + in_addr_t a; + + bzero(dst, sizeof(*dst)); + dst->sin_family = AF_INET; + + a = inet_addr(host); + if(a != INADDR_NONE){ + dst->sin_addr.s_addr = a; + } else { + struct hostent *hp = gethostbyname(host); + if(hp == 0 || hp->h_length != 4){ + fprintf(stderr, "cannot find host name %s\n", host); + exit(1); + } + dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; + } + dst->sin_port = htons(atoi(port)); +} + +int +cmp_timespec(const struct timespec &a, const struct timespec &b) +{ + if(a.tv_sec > b.tv_sec) + return 1; + else if(a.tv_sec < b.tv_sec) + return -1; + else { + if(a.tv_nsec > b.tv_nsec) + return 1; + else if(a.tv_nsec < b.tv_nsec) + return -1; + else + return 0; + } +} + +void +add_timespec(const struct timespec &a, int b, struct timespec *result) +{ + // convert to millisec, add timeout, convert back + result->tv_sec = a.tv_sec + b/1000; + result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000; + VERIFY(result->tv_nsec >= 0); + while (result->tv_nsec > 1000000000){ + result->tv_sec++; + result->tv_nsec-=1000000000; + } +} + +int +diff_timespec(const struct timespec &end, const struct timespec &start) +{ + int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0; + VERIFY(diff || end.tv_sec == start.tv_sec); + if(end.tv_nsec > start.tv_nsec){ + diff += (end.tv_nsec-start.tv_nsec)/1000000; + } else { + diff -= (start.tv_nsec-end.tv_nsec)/1000000; + } + return diff; 
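+    // worked example of the computation above: end = {12 s, 300,000,000 ns}
+    // and start = {10 s, 900,000,000 ns} gives diff = 2*1000 = 2000 ms,
+    // then 2000 - (900 - 300) = 1400 ms, i.e. 12.3 s - 10.9 s.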
+} diff --git a/rpc/rpc.h b/rpc/rpc.h new file mode 100644 index 0000000..fc61236 --- /dev/null +++ b/rpc/rpc.h @@ -0,0 +1,633 @@ +#ifndef rpc_h +#define rpc_h + +#include +#include +#include +#include +#include + +#include "thr_pool.h" +#include "marshall.h" +#include "connection.h" + +#ifdef DMALLOC +#include "dmalloc.h" +#endif + +class rpc_const { + public: + static const unsigned int bind = 1; // handler number reserved for bind + static const int timeout_failure = -1; + static const int unmarshal_args_failure = -2; + static const int unmarshal_reply_failure = -3; + static const int atmostonce_failure = -4; + static const int oldsrv_failure = -5; + static const int bind_failure = -6; + static const int cancel_failure = -7; +}; + +// rpc client endpoint. +// manages a xid space per destination socket +// threaded: multiple threads can be sending RPCs, +class rpcc : public chanmgr { + + private: + + //manages per rpc info + struct caller { + caller(unsigned int xxid, unmarshall *un); + ~caller(); + + unsigned int xid; + unmarshall *un; + int intret; + bool done; + pthread_mutex_t m; + pthread_cond_t c; + }; + + void get_refconn(connection **ch); + void update_xid_rep(unsigned int xid); + + + sockaddr_in dst_; + unsigned int clt_nonce_; + unsigned int srv_nonce_; + bool bind_done_; + unsigned int xid_; + int lossytest_; + bool retrans_; + bool reachable_; + + connection *chan_; + + pthread_mutex_t m_; // protect insert/delete to calls[] + pthread_mutex_t chan_m_; + + bool destroy_wait_; + pthread_cond_t destroy_wait_c_; + + std::map calls_; + std::list xid_rep_window_; + + struct request { + request() { clear(); } + void clear() { buf.clear(); xid = -1; } + bool isvalid() { return xid != -1; } + std::string buf; + int xid; + }; + struct request dup_req_; + int xid_rep_done_; + public: + + rpcc(sockaddr_in d, bool retrans=true); + ~rpcc(); + + struct TO { + int to; + }; + static const TO to_max; + static const TO to_min; + static TO to(int x) { TO t; t.to = x; return t;} + + unsigned int id() { return clt_nonce_; } + + int bind(TO to = to_max); + + void set_reachable(bool r) { reachable_ = r; } + + void cancel(); + + int islossy() { return lossytest_ > 0; } + + int call1(unsigned int proc, + marshall &req, unmarshall &rep, TO to); + + bool got_pdu(connection *c, char *b, int sz); + + + template + int call_m(unsigned int proc, marshall &req, R & r, TO to); + + template + int call(unsigned int proc, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r, + TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, const A6 & a6, + R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7, + R & r, TO to = to_max); + +}; + +template int +rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) +{ + unmarshall u; + int intret = call1(proc, req, u, to); + if (intret < 0) return intret; + u >> r; + 
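+    // the reply header was already consumed: take_in() leaves the cursor
+    // at RPC_HEADER_SZ, so what remains should be exactly the marshalled
+    // R; okdone() below also verifies that every byte was consumed, which
+    // is how return-type mismatches are caught.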
if(u.okdone() != true) { + fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." + "You are probably calling RPC 0x%x with wrong return " + "type.\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; +} + +template int +rpcc::call(unsigned int proc, R & r, TO to) +{ + marshall m; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to) +{ + marshall m; + m << a1; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, + const A6 & a6, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + m << a6; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, + const A6 & a6, const A7 & a7, + R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + m << a6; + m << a7; + return call_m(proc, m, r, to); +} + +bool operator<(const sockaddr_in &a, const sockaddr_in &b); + +class handler { + public: + handler() { } + virtual ~handler() { } + virtual int fn(unmarshall &, marshall &) = 0; +}; + + +// rpc server endpoint. +class rpcs : public chanmgr { + + typedef enum { + NEW, // new RPC, not a duplicate + INPROGRESS, // duplicate of an RPC we're still processing + DONE, // duplicate of an RPC we already replied to (have reply) + FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten + } rpcstate_t; + + private: + + // state about an in-progress or completed RPC, for at-most-once. + // if cb_present is true, then the RPC is complete and a reply + // has been sent; in that case buf points to a copy of the reply, + // and sz holds the size of the reply. + struct reply_t { + reply_t (unsigned int _xid) { + xid = _xid; + cb_present = false; + buf = NULL; + sz = 0; + } + reply_t (unsigned int _xid, char *_buf, int _sz) { + xid = _xid; + cb_present = true; + buf = _buf; + sz = _sz; + } + unsigned int xid; + bool cb_present; // whether the reply buffer is valid + char *buf; // the reply buffer + int sz; // the size of reply buffer + }; + + int port_; + unsigned int nonce_; + + // provide at most once semantics by maintaining a window of replies + // per client that that client hasn't acknowledged receiving yet. + // indexed by client nonce. 
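+    // the client piggybacks xid_rep on every request (rpcc keeps it in
+    // xid_rep_window_): all xids at or below it have had their replies
+    // delivered, so the server may discard those saved replies.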
+ std::map > reply_window_; + + void free_reply_window(void); + void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); + + rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, + unsigned int xid, unsigned int rep_xid, + char **b, int *sz); + + void updatestat(unsigned int proc); + + // latest connection to the client + std::map conns_; + + // counting + const int counting_; + int curr_counts_; + std::map counts_; + + int lossytest_; + bool reachable_; + + // map proc # to function + std::map procs_; + + pthread_mutex_t procs_m_; // protect insert/delete to procs[] + pthread_mutex_t count_m_; //protect modification of counts + pthread_mutex_t reply_window_m_; // protect reply window et al + pthread_mutex_t conss_m_; // protect conns_ + + + protected: + + struct djob_t { + djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} + char *buf; + int sz; + connection *conn; + }; + void dispatch(djob_t *); + + // internal handler registration + void reg1(unsigned int proc, handler *); + + ThrPool* dispatchpool_; + tcpsconn* listener_; + + public: + rpcs(unsigned int port, int counts=0); + ~rpcs(); + inline int port() { return listener_->port(); } + //RPC handler for clients binding + int rpcbind(int a, int &r); + + void set_reachable(bool r) { reachable_ = r; } + + bool got_pdu(connection *c, char *b, int sz); + + // register a handler + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, + R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + const A6, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + const A6, const A7, + R & r)); +}; + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + R r; + args >> a1; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + R r; + args >> a1; + args >> a2; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r); + public: + h1(S *xsob, int 
(S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + R r; + args >> a1; + args >> a2; + args >> a3; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, const A6 a6, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, const A6 a6, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, const A6 a6, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + A6 a6; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + args >> a6; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, const A6 a6, + const A7 a7, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, const A6 a6, const A7 a7, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, const A6 a6, + const A7 a7, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + A6 a6; + A7 a7; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + args >> a6; + args >> a7; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = 
(sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + + +void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); +void make_sockaddr(const char *host, const char *port, + struct sockaddr_in *dst); + +int cmp_timespec(const struct timespec &a, const struct timespec &b); +void add_timespec(const struct timespec &a, int b, struct timespec *result); +int diff_timespec(const struct timespec &a, const struct timespec &b); + +#endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc new file mode 100644 index 0000000..74c61d1 --- /dev/null +++ b/rpc/rpctest.cc @@ -0,0 +1,479 @@ +// RPC test and pseudo-documentation. +// generates print statements on failures, but eventually says "rpctest OK" + +#include "rpc.h" +#include +#include +#include +#include +#include +#include "jsl_log.h" +#include "gettime.h" +#include "lang/verify.h" + +#define NUM_CL 2 + +rpcs *server; // server rpc object +rpcc *clients[NUM_CL]; // client rpc object +struct sockaddr_in dst; //server's ip address +int port; +pthread_attr_t attr; + +// server-side handlers. they must be methods of some class +// to simplify rpcs::reg(). a server process can have handlers +// from multiple classes. +class srv { + public: + int handle_22(const std::string a, const std::string b, std::string & r); + int handle_fast(const int a, int &r); + int handle_slow(const int a, int &r); + int handle_bigrep(const int a, std::string &r); +}; + +// a handler. a and b are arguments, r is the result. +// there can be multiple arguments but only one result. +// the caller also gets to see the int return value +// as the return value from rpcc::call(). +// rpcs::reg() decides how to unmarshall by looking +// at these argument types, so this function definition +// does what a .x file does in SunRPC. +int +srv::handle_22(const std::string a, std::string b, std::string &r) +{ + r = a + b; + return 0; +} + +int +srv::handle_fast(const int a, int &r) +{ + r = a + 1; + return 0; +} + +int +srv::handle_slow(const int a, int &r) +{ + usleep(random() % 5000); + r = a + 2; + return 0; +} + +int +srv::handle_bigrep(const int len, std::string &r) +{ + r = std::string(len, 'x'); + return 0; +} + +srv service; + +void startserver() +{ + server = new rpcs(port); + server->reg(22, &service, &srv::handle_22); + server->reg(23, &service, &srv::handle_fast); + server->reg(24, &service, &srv::handle_slow); + server->reg(25, &service, &srv::handle_bigrep); +} + +void +testmarshall() +{ + marshall m; + req_header rh(1,2,3,4,5); + m.pack_req_header(rh); + VERIFY(m.size()==RPC_HEADER_SZ); + int i = 12345; + unsigned long long l = 1223344455L; + std::string s = std::string("hallo...."); + m << i; + m << l; + m << s; + + char *b; + int sz; + m.take_buf(&b,&sz); + VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); + + unmarshall un(b,sz); + req_header rh1; + un.unpack_req_header(&rh1); + VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); + int i1; + unsigned long long l1; + std::string s1; + un >> i1; + un >> l1; + un >> s1; + VERIFY(un.okdone()); + VERIFY(i1==i && l1==l && s1==s); +} + +void * +client1(void *xx) +{ + + // test concurrency. 
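+    // each thread pins itself to one of the NUM_CL shared rpcc objects,
+    // so several threads issue calls over the same client concurrently;
+    // handle_bigrep's reply size must equal the argument, checked below.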
+ int which_cl = ((unsigned long) xx ) % NUM_CL; + + for(int i = 0; i < 100; i++){ + int arg = (random() % 2000); + std::string rep; + int ret = clients[which_cl]->call(25, arg, rep); + VERIFY(ret == 0); + if ((int)rep.size()!=arg) { + printf("repsize wrong %d!=%d\n", (int)rep.size(), arg); + } + VERIFY((int)rep.size() == arg); + } + + // test rpc replies coming back not in the order of + // the original calls -- i.e. does xid reply dispatch work. + for(int i = 0; i < 100; i++){ + int which = (random() % 2); + int arg = (random() % 1000); + int rep; + + struct timespec start,end; + clock_gettime(CLOCK_REALTIME, &start); + + int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep); + clock_gettime(CLOCK_REALTIME, &end); + int diff = diff_timespec(end, start); + if (ret != 0) + printf("%d ms have elapsed!!!\n", diff); + VERIFY(ret == 0); + VERIFY(rep == (which ? arg+1 : arg+2)); + } + + return 0; +} + +void * +client2(void *xx) +{ + int which_cl = ((unsigned long) xx ) % NUM_CL; + + time_t t1; + time(&t1); + + while(time(0) - t1 < 10){ + int arg = (random() % 2000); + std::string rep; + int ret = clients[which_cl]->call(25, arg, rep); + if ((int)rep.size()!=arg) { + printf("ask for %d reply got %d ret %d\n", + arg, (int)rep.size(), ret); + } + VERIFY((int)rep.size() == arg); + } + return 0; +} + +void * +client3(void *xx) +{ + rpcc *c = (rpcc *) xx; + + for(int i = 0; i < 4; i++){ + int rep; + int ret = c->call(24, i, rep, rpcc::to(3000)); + VERIFY(ret == rpc_const::timeout_failure || rep == i+2); + } + return 0; +} + + +void +simple_tests(rpcc *c) +{ + printf("simple_tests\n"); + // an RPC call to procedure #22. + // rpcc::call() looks at the argument types to decide how + // to marshall the RPC call packet, and how to unmarshall + // the reply packet. + std::string rep; + int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep); + VERIFY(intret == 0); // this is what handle_22 returns + VERIFY(rep == "hello goodbye"); + printf(" -- string concat RPC .. ok\n"); + + // small request, big reply (perhaps req via UDP, reply via TCP) + intret = c->call(25, 70000, rep, rpcc::to(200000)); + VERIFY(intret == 0); + VERIFY(rep.size() == 70000); + printf(" -- small request, big reply .. ok\n"); + +#if 0 + // too few arguments + intret = c->call(22, (std::string)"just one", rep); + VERIFY(intret < 0); + printf(" -- too few arguments .. failed ok\n"); + + // too many arguments; proc #23 expects just one. + intret = c->call(23, 1001, 1002, rep); + VERIFY(intret < 0); + printf(" -- too many arguments .. failed ok\n"); + + // wrong return value size + int wrongrep; + intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep); + VERIFY(intret < 0); + printf(" -- wrong ret value size .. failed ok\n"); +#endif + + // specify a timeout value to an RPC that should succeed (udp) + int xx = 0; + intret = c->call(23, 77, xx, rpcc::to(3000)); + VERIFY(intret == 0 && xx == 78); + printf(" -- no suprious timeout .. ok\n"); + + // specify a timeout value to an RPC that should succeed (tcp) + { + std::string arg(1000, 'x'); + std::string rep; + c->call(22, arg, (std::string)"x", rep, rpcc::to(3000)); + VERIFY(rep.size() == 1001); + printf(" -- no suprious timeout .. ok\n"); + } + + // huge RPC + std::string big(1000000, 'x'); + intret = c->call(22, big, (std::string)"z", rep); + VERIFY(rep.size() == 1000001); + printf(" -- huge 1M rpc request .. 
ok\n"); + + // specify a timeout value to an RPC that should timeout (udp) + struct sockaddr_in non_existent; + memset(&non_existent, 0, sizeof(non_existent)); + non_existent.sin_family = AF_INET; + non_existent.sin_addr.s_addr = inet_addr("127.0.0.1"); + non_existent.sin_port = htons(7661); + rpcc *c1 = new rpcc(non_existent); + time_t t0 = time(0); + intret = c1->bind(rpcc::to(3000)); + time_t t1 = time(0); + VERIFY(intret < 0 && (t1 - t0) <= 4); + printf(" -- rpc timeout .. ok\n"); + printf("simple_tests OK\n"); +} + +void +concurrent_test(int nt) +{ + // create threads that make lots of calls in parallel, + // to test thread synchronization for concurrent calls + // and dispatches. + int ret; + + printf("start concurrent_test (%d threads) ...", nt); + + pthread_t th[nt]; + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i); + VERIFY(ret == 0); + } + + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf(" OK\n"); +} + +void +lossy_test() +{ + int ret; + + printf("start lossy_test ..."); + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + + if (server) { + delete server; + startserver(); + } + + for (int i = 0; i < NUM_CL; i++) { + delete clients[i]; + clients[i] = new rpcc(dst); + VERIFY(clients[i]->bind()==0); + } + + int nt = 1; + pthread_t th[nt]; + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i); + VERIFY(ret == 0); + } + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf(".. OK\n"); + VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); +} + +void +failure_test() +{ + rpcc *client1; + rpcc *client = clients[0]; + + printf("failure_test\n"); + + delete server; + + client1 = new rpcc(dst); + VERIFY (client1->bind(rpcc::to(3000)) < 0); + printf(" -- create new client and try to bind to failed server .. failed ok\n"); + + delete client1; + + startserver(); + + std::string rep; + int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + VERIFY(intret == rpc_const::oldsrv_failure); + printf(" -- call recovered server with old client .. failed ok\n"); + + delete client; + + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + VERIFY (client->bind() < 0); + + intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + VERIFY(intret == 0); + VERIFY(rep == "hello goodbye"); + + printf(" -- delete existing rpc client, create replacement rpc client .. ok\n"); + + + int nt = 10; + int ret; + printf(" -- concurrent test on new rpc client w/ %d threads ..", nt); + + pthread_t th[nt]; + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client3, (void *) client); + VERIFY(ret == 0); + } + + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf("ok\n"); + + delete server; + delete client; + + startserver(); + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + printf(" -- delete existing rpc client and server, create replacements.. 
ok\n"); + + printf(" -- concurrent test on new client and server w/ %d threads ..", nt); + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client3, (void *)client); + VERIFY(ret == 0); + } + + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf("ok\n"); + + printf("failure_test OK\n"); +} + +int +main(int argc, char *argv[]) +{ + + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + int debug_level = 0; + + bool isclient = false; + bool isserver = false; + + srandom(getpid()); + port = 20000 + (getpid() % 10000); + + char ch = 0; + while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { + switch (ch) { + case 'c': + isclient = true; + break; + case 's': + isserver = true; + break; + case 'd': + debug_level = atoi(optarg); + break; + case 'p': + port = atoi(optarg); + break; + case 'l': + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + default: + break; + } + } + + if (!isserver && !isclient) { + isserver = isclient = true; + } + + if (debug_level > 0) { + //__loginit.initNow(); + jsl_set_debug(debug_level); + jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level); + } + + testmarshall(); + + pthread_attr_init(&attr); + // set stack size to 32K, so we don't run out of memory + pthread_attr_setstacksize(&attr, 32*1024); + + if (isserver) { + printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ); + startserver(); + } + + if (isclient) { + // server's address. + memset(&dst, 0, sizeof(dst)); + dst.sin_family = AF_INET; + dst.sin_addr.s_addr = inet_addr("127.0.0.1"); + dst.sin_port = htons(port); + + + // start the client. bind it to the server. + // starts a thread to listen for replies and hand them to + // the correct waiting caller thread. there should probably + // be only one rpcc per process. you probably need one + // rpcc per server. 
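+    // a minimal single-client sketch (dst as set up above; the argument
+    // value 7 is arbitrary):
+    //
+    //   rpcc *cl = new rpcc(dst);
+    //   if (cl->bind() == 0) {            // required once; learns server nonce
+    //       int r;
+    //       int ret = cl->call(23, 7, r); // proc 23 is srv::handle_fast
+    //       VERIFY(ret == 0 && r == 8);   // handle_fast returns a + 1
+    //   }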
+ for (int i = 0; i < NUM_CL; i++) { + clients[i] = new rpcc(dst); + VERIFY (clients[i]->bind() == 0); + } + + simple_tests(clients[0]); + concurrent_test(10); + lossy_test(); + if (isserver) { + failure_test(); + } + + printf("rpctest OK\n"); + + exit(0); + } + + while (1) { + sleep(1); + } +} diff --git a/rpc/slock.h b/rpc/slock.h new file mode 100644 index 0000000..7f419c4 --- /dev/null +++ b/rpc/slock.h @@ -0,0 +1,28 @@ +#ifndef __SCOPED_LOCK__ +#define __SCOPED_LOCK__ + +#include +#include "lang/verify.h" +struct ScopedLock { + private: + pthread_mutex_t *m_; + public: + ScopedLock(pthread_mutex_t *m): m_(m) { + VERIFY(pthread_mutex_lock(m_)==0); + } + ~ScopedLock() { + VERIFY(pthread_mutex_unlock(m_)==0); + } +}; +struct ScopedUnlock { + private: + pthread_mutex_t *m_; + public: + ScopedUnlock(pthread_mutex_t *m): m_(m) { + VERIFY(pthread_mutex_unlock(m_)==0); + } + ~ScopedUnlock() { + VERIFY(pthread_mutex_lock(m_)==0); + } +}; +#endif /*__SCOPED_LOCK__*/ diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc new file mode 100644 index 0000000..f9f32fa --- /dev/null +++ b/rpc/thr_pool.cc @@ -0,0 +1,69 @@ +#include "slock.h" +#include "thr_pool.h" +#include +#include +#include "lang/verify.h" + +static void * +do_worker(void *arg) +{ + ThrPool *tp = (ThrPool *)arg; + while (1) { + ThrPool::job_t j; + if (!tp->takeJob(&j)) + break; //die + + (void)(j.f)(j.a); + } + pthread_exit(NULL); +} + +//if blocking, then addJob() blocks when queue is full +//otherwise, addJob() simply returns false when queue is full +ThrPool::ThrPool(int sz, bool blocking) +: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) +{ + pthread_attr_init(&attr_); + pthread_attr_setstacksize(&attr_, 128<<10); + + for (int i = 0; i < sz; i++) { + pthread_t t; + VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0); + th_.push_back(t); + } +} + +//IMPORTANT: this function can be called only when no external thread +//will ever use this thread pool again or is currently blocking on it +ThrPool::~ThrPool() +{ + for (int i = 0; i < nthreads_; i++) { + job_t j; + j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit + jobq_.enq(j); + } + + for (int i = 0; i < nthreads_; i++) { + VERIFY(pthread_join(th_[i], NULL)==0); + } + + VERIFY(pthread_attr_destroy(&attr_)==0); +} + +bool +ThrPool::addJob(void *(*f)(void *), void *a) +{ + job_t j; + j.f = f; + j.a = a; + + return jobq_.enq(j,blockadd_); +} + +bool +ThrPool::takeJob(job_t *j) +{ + jobq_.deq(j); + return (j->f!=NULL); +} + diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h new file mode 100644 index 0000000..5095961 --- /dev/null +++ b/rpc/thr_pool.h @@ -0,0 +1,66 @@ +#ifndef __THR_POOL__ +#define __THR_POOL__ + +#include +#include + +#include "fifo.h" + +class ThrPool { + + + public: + struct job_t { + void *(*f)(void *); //function point + void *a; //function arguments + }; + + ThrPool(int sz, bool blocking=true); + ~ThrPool(); + template bool addObjJob(C *o, void (C::*m)(A), A a); + void waitDone(); + + bool takeJob(job_t *j); + + private: + pthread_attr_t attr_; + int nthreads_; + bool blockadd_; + + + fifo jobq_; + std::vector th_; + + bool addJob(void *(*f)(void *), void *a); +}; + + template bool +ThrPool::addObjJob(C *o, void (C::*m)(A), A a) +{ + + class objfunc_wrapper { + public: + C *o; + void (C::*m)(A a); + A a; + static void *func(void *vvv) { + objfunc_wrapper *x = (objfunc_wrapper*)vvv; + C *o = x->o; + void (C::*m)(A ) = x->m; + A a = x->a; + (o->*m)(a); + delete x; + return 0; + } + }; + + objfunc_wrapper *x = new objfunc_wrapper; + x->o = 
o;
+    x->m = m;
+    x->a = a;
+    return addJob(&objfunc_wrapper::func, (void *)x);
+}
+
+
+#endif
+
diff --git a/rsm.cc b/rsm.cc
new file mode 100644
index 0000000..7664aa1
--- /dev/null
+++ b/rsm.cc
@@ -0,0 +1,610 @@
+//
+// Replicated state machine implementation with a primary and several
+// backups. The primary receives requests, assigns each a view stamp (a
+// vid and a sequence number) in the order of reception, and forwards
+// them to all backups. A backup executes requests in the order that
+// the primary stamps them and replies with an OK to the primary. The
+// primary executes the request after it receives OKs from all backups,
+// and sends the reply back to the client.
+//
+// The config module will tell the RSM about a new view. If the
+// primary in the previous view is a member of the new view, then it
+// will stay the primary. Otherwise, the smallest numbered node of
+// the previous view will be the new primary. In either case, the new
+// primary will be a node from the previous view. The configuration
+// module constructs the sequence of views for the RSM, and the RSM
+// ensures there will always be one primary, which was a member of the
+// last view.
+//
+// When a new node starts, the recovery thread is in charge of joining
+// the RSM. It will collect the internal RSM state from the primary;
+// the primary asks the config module to add the new node and returns
+// the internal RSM state (e.g., the Paxos log) to the joining node.
+// Since there is only one primary, all joins happen in a well-defined
+// total order.
+//
+// The recovery thread also runs during a view change (e.g., when a node
+// has failed). After a failure some of the backups could have
+// processed a request that the primary has not, but those results are
+// not visible to clients (since only the primary responds to clients).
+// If the primary of the previous view is in the current view, then it
+// will be the primary and its state is authoritative: the backups
+// download the current state from the primary. A primary waits until
+// all backups have downloaded the state. Once the RSM is in sync, the
+// primary accepts requests again from clients. If one of the backups
+// is the new primary, then its state is authoritative. In either
+// scenario, the next view uses as primary a node that has the state
+// resulting from processing all acknowledged client requests.
+// Therefore, if the nodes sync up before processing the next request,
+// the next view will have the correct state.
+//
+// While the RSM is in a view change (i.e., a node has failed, a new
+// view has been formed, but the sync hasn't completed), another failure
+// could happen, which complicates the view change. During syncing the
+// primary or backups can time out and initiate another Paxos round.
+// There are two variables that the RSM uses to keep track of what
+// state it is in:
+// - inviewchange: a node has failed and the RSM is performing a view change
+// - insync: this node is syncing its state
+//
+// If inviewchange is false and a node is the primary, then it can
+// process client requests. If it is true, clients are told to retry
+// again later. While inviewchange is true, the RSM may go through
+// several member list changes, one by one. After a member list
+// change completes, the node tries to sync. If the sync completes,
+// the view change completes (and inviewchange is set to false). If
+// the sync fails, the node may start another member list change
+// (inviewchange = true and insync = false).
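+//
+// summarizing the two flags (a sketch; the last combination should not
+// occur):
+//
+//   inviewchange  insync   meaning
+//   ------------  ------   ---------------------------------------------
+//   false         false    normal operation; the primary serves requests
+//   true          false    view committed, sync not started (or failed)
+//   true          true     this node is transferring state to sync
+//   false         true     (not used)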
+
+//
+// The implementation should be used only with servers that run all
+// requests to completion; in particular, a request shouldn't
+// block. If a request blocks, the backup won't respond to the
+// primary, and the primary won't execute the request. A request may
+// send an RPC to another host, but the RPC should be a one-way
+// message to that host; the backup shouldn't do anything based on the
+// response or execute after the response, because it is not
+// guaranteed that all backups will receive the same response and
+// execute in the same order.
+//
+// The implementation can be viewed as a layered system:
+//   RSM module    ---- in charge of replication
+//   config module ---- in charge of view management
+//   Paxos module  ---- in charge of running Paxos to agree on a value
+//
+// Each module has threads and internal locks. Furthermore, a thread
+// may call down through the layers (e.g., to run Paxos's proposer).
+// When Paxos's acceptor accepts a new value for an instance, a thread
+// will invoke an upcall to inform higher layers of the new value.
+// The rule is that a module releases its internal locks before it
+// upcalls, but can keep its locks when calling down.
+
+#include <fstream>
+#include <iostream>
+
+#include "handle.h"
+#include "rsm.h"
+#include "tprintf.h"
+#include "lang/verify.h"
+#include "rsm_client.h"
+
+static void *recoverythread(void *x) {
+    rsm *r = (rsm *) x;
+    r->recovery();
+    return 0;
+}
+
+rsm::rsm(std::string _first, std::string _me) :
+    stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
+    partitioned (false), dopartition(false), break1(false), break2(false)
+{
+    pthread_t th;
+
+    last_myvs.vid = 0;
+    last_myvs.seqno = 0;
+    myvs = last_myvs;
+    myvs.seqno = 1;
+
+    cfg = new config(_first, _me, this);
+
+    if (_first == _me) {
+        // Commit the first view here. We cannot have acceptor::acceptor
+        // do the commit, since at that time this->cfg is not initialized
+        commit_change(1);
+    }
+    rsmrpc = cfg->get_rpcs();
+    rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke);
+    rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members);
+    rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke);
+    rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq);
+    rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq);
+    rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq);
+
+    // the tester must be on a different port, otherwise it may partition itself
+    testsvr = new rpcs(atoi(_me.c_str()) + 1);
+    testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq);
+    testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
+
+    {
+        ScopedLock ml(rsm_mutex);
+        VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0);
+    }
+}
+
+void rsm::reg1(int proc, handler *h) {
+    ScopedLock ml(rsm_mutex);
+    procs[proc] = h;
+}
+
+// The recovery thread runs this function
+void rsm::recovery() {
+    bool r = true;
+    ScopedLock ml(rsm_mutex);
+
+    while (1) {
+        while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
+            if (join(primary)) {
+                tprintf("recovery: joined\n");
+                commit_change_wo(cfg->vid());
+            } else {
+                ScopedUnlock su(rsm_mutex);
+                sleep (30); // XXX make another node in cfg primary?
+ } + } + vid_insync = vid_commit; + tprintf("recovery: sync vid_insync %d\n", vid_insync); + if (primary == cfg->myaddr()) { + r = sync_with_backups(); + } else { + r = sync_with_primary(); + } + tprintf("recovery: sync done\n"); + + // If there was a commited viewchange during the synchronization, restart + // the recovery + if (vid_insync != vid_commit) + continue; + + if (r) { + myvs.vid = vid_commit; + myvs.seqno = 1; + inviewchange = false; + } + tprintf("recovery: go to sleep %d %d\n", insync, inviewchange); + recovery_cond.wait(rsm_mutex); + } +} + +template +std::ostream & operator<<(std::ostream &o, const std::vector &d) { + o << "["; + for (typename std::vector::const_iterator i=d.begin(); i!=d.end(); i++) { + o << *i; + if (i+1 != d.end()) + o << ", "; + } + o << "]"; + return o; +} + +bool rsm::sync_with_backups() { + { + ScopedUnlock su(rsm_mutex); + // Make sure that the state of lock_server_cache_rsm is stable during + // synchronization; otherwise, the primary's state may be more recent + // than replicas after the synchronization. + ScopedLock ml(invoke_mutex); + // By acquiring and releasing the invoke_mutex once, we make sure that + // the state of lock_server_cache_rsm will not be changed until all + // replicas are synchronized. The reason is that client_invoke arrives + // after this point of time will see inviewchange == true, and returns + // BUSY. + } + // Start accepting synchronization request (statetransferreq) now! + insync = true; + backups = std::vector(cfg->get_view(vid_insync)); + backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); + LOG("rsm::sync_with_backups " << backups); + sync_cond.wait(rsm_mutex); + insync = false; + return true; +} + + +bool rsm::sync_with_primary() { + // Remember the primary of vid_insync + std::string m = primary; + while (vid_insync == vid_commit) { + if (statetransfer(m)) + break; + } + return statetransferdone(m); +} + + +/** + * Call to transfer state from m to the local node. + * Assumes that rsm_mutex is already held. + */ +bool rsm::statetransfer(std::string m) +{ + rsm_protocol::transferres r; + handle h(m); + int ret; + tprintf("rsm::statetransfer: contact %s w. 
my last_myvs(%d,%d)\n", + m.c_str(), last_myvs.vid, last_myvs.seqno); + rpcc *cl; + { + ScopedUnlock su(rsm_mutex); + cl = h.safebind(); + if (cl) { + ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(), + last_myvs, vid_insync, r, rpcc::to(1000)); + } + } + if (cl == 0 || ret != rsm_protocol::OK) { + tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(), + (long unsigned) cl, ret); + return false; + } + if (stf && last_myvs != r.last) { + stf->unmarshal_state(r.state); + } + last_myvs = r.last; + tprintf("rsm::statetransfer transfer from %s success, vs(%d,%d)\n", + m.c_str(), last_myvs.vid, last_myvs.seqno); + return true; +} + +bool rsm::statetransferdone(std::string m) { + ScopedUnlock su(rsm_mutex); + handle h(m); + rpcc *cl = h.safebind(); + if (!cl) + return false; + int r; + rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); + if (ret != rsm_protocol::OK) + return false; + return true; +} + + +bool rsm::join(std::string m) { + handle h(m); + int ret; + rsm_protocol::joinres r; + + tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid, + last_myvs.seqno); + rpcc *cl; + { + ScopedUnlock su(rsm_mutex); + cl = h.safebind(); + if (cl != 0) { + ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs, + r, rpcc::to(120000)); + } + } + + if (cl == 0 || ret != rsm_protocol::OK) { + tprintf("rsm::join: couldn't reach %s %p %d\n", m.c_str(), + cl, ret); + return false; + } + tprintf("rsm::join: succeeded %s\n", r.log.c_str()); + cfg->restore(r.log); + return true; +} + +/* + * Config informs rsm whenever it has successfully + * completed a view change + */ +void rsm::commit_change(unsigned vid) { + ScopedLock ml(rsm_mutex); + commit_change_wo(vid); + if (cfg->ismember(cfg->myaddr(), vid_commit)) + breakpoint2(); +} + +void rsm::commit_change_wo(unsigned vid) { + if (vid <= vid_commit) + return; + tprintf("commit_change: new view (%d) last vs (%d,%d) %s insync %d\n", + vid, last_myvs.vid, last_myvs.seqno, primary.c_str(), insync); + vid_commit = vid; + inviewchange = true; + set_primary(vid); + recovery_cond.signal(); + sync_cond.signal(); + if (cfg->ismember(cfg->myaddr(), vid_commit)) + breakpoint2(); +} + + +void rsm::execute(int procno, std::string req, std::string &r) { + tprintf("execute\n"); + handler *h = procs[procno]; + VERIFY(h); + unmarshall args(req); + marshall rep; + std::string reps; + rsm_protocol::status ret = h->fn(args, rep); + marshall rep1; + rep1 << ret; + rep1 << rep.str(); + r = rep1.str(); +} + +// +// Clients call client_invoke to invoke a procedure on the replicated state +// machine: the primary receives the request, assigns it a sequence +// number, and invokes it on all members of the replicated state +// machine. 
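+//
+// e.g. (hypothetical numbers): in view 3 the primary holds myvs = (3,42);
+// it stamps the request with (3,42), advances myvs to (3,43), and sends
+// rsm_protocol::invoke to every other member of view 3. A backup executes
+// the request only if the stamp equals its own myvs, so all replicas
+// execute the same requests in the same order, with no gaps.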
+// +rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) { + LOG("rsm::client_invoke: procno 0x" << std::hex << procno); + ScopedLock ml(invoke_mutex); + std::vector m; + std::string myaddr; + viewstamp vs; + { + ScopedLock ml(rsm_mutex); + LOG("Checking for inviewchange"); + if (inviewchange) + return rsm_client_protocol::BUSY; + LOG("Checking for primacy"); + myaddr = cfg->myaddr(); + if (primary != myaddr) + return rsm_client_protocol::NOTPRIMARY; + LOG("Assigning a viewstamp"); + m = cfg->get_view(vid_commit); + // assign the RPC the next viewstamp number + vs = myvs; + myvs++; + } + + // send an invoke RPC to all slaves in the current view with a timeout of 1 second + LOG("Invoking slaves"); + for (unsigned i = 0; i < m.size(); i++) { + if (m[i] != myaddr) { + // if invoke on slave fails, return rsm_client_protocol::BUSY + handle h(m[i]); + LOG("Sending invoke to " << m[i]); + rpcc *cl = h.safebind(); + if (!cl) + return rsm_client_protocol::BUSY; + rsm_protocol::status ret; + int r; + ret = cl->call(rsm_protocol::invoke, procno, vs, req, r, rpcc::to(1000)); + LOG("Invoke returned " << ret); + if (ret != rsm_protocol::OK) + return rsm_client_protocol::BUSY; + breakpoint1(); + partition1(); + } + } + execute(procno, req, r); + last_myvs = vs; + return rsm_client_protocol::OK; +} + +// +// The primary calls the internal invoke at each member of the +// replicated state machine +// +// the replica must execute requests in order (with no gaps) +// according to requests' seqno + +rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) { + LOG("rsm::invoke: procno 0x" << std::hex << proc); + ScopedLock ml(invoke_mutex); + std::vector m; + std::string myaddr; + { + ScopedLock ml(rsm_mutex); + // check if !inviewchange + LOG("Checking for view change"); + if (inviewchange) + return rsm_protocol::ERR; + // check if slave + LOG("Checking for slave status"); + myaddr = cfg->myaddr(); + if (primary == myaddr) + return rsm_protocol::ERR; + m = cfg->get_view(vid_commit); + if (find(m.begin(), m.end(), myaddr) == m.end()) + return rsm_protocol::ERR; + // check sequence number + LOG("Checking sequence number"); + if (vs != myvs) + return rsm_protocol::ERR; + myvs++; + } + std::string r; + execute(proc, req, r); + last_myvs = vs; + breakpoint1(); + return rsm_protocol::OK; +} + +/** + * RPC handler: Send back the local node's state to the caller + */ +rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid, + rsm_protocol::transferres &r) { + ScopedLock ml(rsm_mutex); + int ret = rsm_protocol::OK; + tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), + last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); + if (!insync || vid != vid_insync) { + return rsm_protocol::BUSY; + } + if (stf && last != last_myvs) + r.state = stf->marshal_state(); + r.last = last_myvs; + return ret; +} + +/** + * RPC handler: Inform the local node (the primary) that node m has synchronized + * for view vid + */ +rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { + ScopedLock ml(rsm_mutex); + if (!insync || vid != vid_insync) + return rsm_protocol::BUSY; + backups.erase(find(backups.begin(), backups.end(), m)); + if (backups.empty()) + sync_cond.signal(); + return rsm_protocol::OK; +} + +// a node that wants to join an RSM as a server sends a +// joinreq to the RSM's current primary; this is the +// handler for that RPC. 
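+//
+// outcomes, in brief: if m is already a member of the committed view the
+// primary just returns its dumped log; if this node is not the primary it
+// answers BUSY so the joiner retries later; otherwise it asks the config
+// module to run Paxos to add m, and returns the log on success.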
+rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
+ int ret = rsm_protocol::OK;
+
+ ScopedLock ml(rsm_mutex);
+ tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
+ last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
+ if (cfg->ismember(m, vid_commit)) {
+ tprintf("joinreq: is still a member\n");
+ r.log = cfg->dump();
+ } else if (cfg->myaddr() != primary) {
+ tprintf("joinreq: busy\n");
+ ret = rsm_protocol::BUSY;
+ } else {
+ // We cache vid_commit to avoid adding m to a view which already
+ // contains m, due to a race condition
+ unsigned vid_cache = vid_commit;
+ bool succ;
+ {
+ ScopedUnlock su(rsm_mutex);
+ succ = cfg->add(m, vid_cache);
+ }
+ if (cfg->ismember(m, cfg->vid())) {
+ r.log = cfg->dump();
+ tprintf("joinreq: ret %d log %s\n", ret, r.log.c_str());
+ } else {
+ tprintf("joinreq: failed; proposer couldn't add %d\n", succ);
+ ret = rsm_protocol::BUSY;
+ }
+ }
+ return ret;
+}
+
+/*
+ * RPC handler: Send back all the nodes this local node knows about, so
+ * the client can switch to a different primary when its existing
+ * primary fails
+ */
+rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
+ std::vector<std::string> m;
+ ScopedLock ml(rsm_mutex);
+ m = cfg->get_view(vid_commit);
+ m.push_back(primary);
+ r = m;
+ tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
+ primary.c_str());
+ return rsm_client_protocol::OK;
+}
+
+// If the old primary is a member of the new view, it stays primary;
+// otherwise the primary is the first node of the previous view that
+// survives into the new one.
+// caller should hold rsm_mutex
+void rsm::set_primary(unsigned vid) {
+ std::vector<std::string> c = cfg->get_view(vid);
+ std::vector<std::string> p = cfg->get_view(vid - 1);
+ VERIFY (c.size() > 0);
+
+ if (isamember(primary,c)) {
+ tprintf("set_primary: primary stays %s\n", primary.c_str());
+ return;
+ }
+
+ VERIFY(p.size() > 0);
+ for (unsigned i = 0; i < p.size(); i++) {
+ if (isamember(p[i], c)) {
+ primary = p[i];
+ tprintf("set_primary: primary is %s\n", primary.c_str());
+ return;
+ }
+ }
+ VERIFY(0);
+}
+
+bool rsm::amiprimary() {
+ ScopedLock ml(rsm_mutex);
+ return primary == cfg->myaddr() && !inviewchange;
+}
+
+
+// Testing server
+
+// Simulate partitions
+
+// assumes caller holds rsm_mutex
+void rsm::net_repair_wo(bool heal) {
+ std::vector<std::string> m;
+ m = cfg->get_view(vid_commit);
+ for (unsigned i = 0; i < m.size(); i++) {
+ if (m[i] != cfg->myaddr()) {
+ handle h(m[i]);
+ tprintf("rsm::net_repair_wo: %s %d\n", m[i].c_str(), heal);
+ if (h.safebind()) h.safebind()->set_reachable(heal);
+ }
+ }
+ rsmrpc->set_reachable(heal);
+}
+
+rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
+ ScopedLock ml(rsm_mutex);
+ tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
+ heal, dopartition, partitioned);
+ if (heal) {
+ net_repair_wo(heal);
+ partitioned = false;
+ } else {
+ dopartition = true;
+ partitioned = false;
+ }
+ r = rsm_test_protocol::OK;
+ return r;
+}
+
+// Simulate failure at breakpoints 1 and 2
+
+void rsm::breakpoint1() {
+ if (break1) {
+ tprintf("Dying at breakpoint 1 in rsm!\n");
+ exit(1);
+ }
+}
+
+void rsm::breakpoint2() {
+ if (break2) {
+ tprintf("Dying at breakpoint 2 in rsm!\n");
+ exit(1);
+ }
+}
+
+void rsm::partition1() {
+ if (dopartition) {
+ net_repair_wo(false);
+ dopartition = false;
+ partitioned = true;
+ }
+}
+
+rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
+ r = rsm_test_protocol::OK;
+ ScopedLock ml(rsm_mutex);
+ tprintf("rsm::breakpointreq: %d\n", b);
+ if (b == 1) break1 = true;
+ else if (b == 2) break2 = true;
+ else if (b == 3 || b == 4) cfg->breakpoint(b);
+ else r = rsm_test_protocol::ERR;
+ return r;
+}
diff --git a/rsm.h b/rsm.h
new file mode 100644
index 0000000..c5bf4fc
--- /dev/null
+++ b/rsm.h
@@ -0,0 +1,236 @@
+// replicated state machine interface.
+
+#ifndef rsm_h
+#define rsm_h
+
+#include <string>
+#include <vector>
+#include "rsm_protocol.h"
+#include "rsm_state_transfer.h"
+#include "rpc.h"
+#include <map>
+#include "config.h"
+
+
+class rsm : public config_view_change {
+ private:
+ void reg1(int proc, handler *);
+ protected:
+ std::map<int, handler *> procs;
+ config *cfg;
+ class rsm_state_transfer *stf;
+ rpcs *rsmrpc;
+ // On slave: expected viewstamp of next invoke request
+ // On primary: viewstamp for the next request from rsm_client
+ viewstamp myvs;
+ viewstamp last_myvs; // Viewstamp of the last executed request
+ std::string primary;
+ bool insync;
+ bool inviewchange;
+ unsigned vid_commit; // Latest view id that is known to rsm layer
+ unsigned vid_insync; // The view id that this node is synchronizing for
+ std::vector<std::string> backups; // A list of unsynchronized backups
+
+ // For testing purposes
+ rpcs *testsvr;
+ bool partitioned;
+ bool dopartition;
+ bool break1;
+ bool break2;
+
+
+ rsm_client_protocol::status client_members(int i,
+ std::vector<std::string> &r);
+ rsm_protocol::status invoke(int proc, viewstamp vs, std::string mreq,
+ int &dummy);
+ rsm_protocol::status transferreq(std::string src, viewstamp last, unsigned vid,
+ rsm_protocol::transferres &r);
+ rsm_protocol::status transferdonereq(std::string m, unsigned vid, int &);
+ rsm_protocol::status joinreq(std::string src, viewstamp last,
+ rsm_protocol::joinres &r);
+ rsm_test_protocol::status test_net_repairreq(int heal, int &r);
+ rsm_test_protocol::status breakpointreq(int b, int &r);
+
+ mutex rsm_mutex;
+ mutex invoke_mutex;
+ cond recovery_cond;
+ cond sync_cond;
+
+ void execute(int procno, std::string req, std::string &r);
+ rsm_client_protocol::status client_invoke(int procno, std::string req,
+ std::string &r);
+ bool statetransfer(std::string m);
+ bool statetransferdone(std::string m);
+ bool join(std::string m);
+ void set_primary(unsigned vid);
+ std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid);
+ bool sync_with_backups();
+ bool sync_with_primary();
+ void net_repair_wo(bool heal);
+ void breakpoint1();
+ void breakpoint2();
+ void partition1();
+ void commit_change_wo(unsigned vid);
+ public:
+ rsm (std::string _first, std::string _me);
+ ~rsm() {};
+
+ bool amiprimary();
+ void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; };
+ void recovery();
+ void commit_change(unsigned vid);
+
+ template<class S, class A1, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, R &));
+ template<class S, class A1, class A2, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, R &));
+ template<class S, class A1, class A2, class A3, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, R &));
+ template<class S, class A1, class A2, class A3, class A4, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4, R &));
+ template<class S, class A1, class A2, class A3, class A4, class A5, class R>
+ void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4, const A5 a5, R &));
+};
+
+template<class S, class A1, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ R r;
+ args >> a1;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ R r;
+ args >> a1;
+ args >> a2;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,a3,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4, R & r)) {
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,a3,a4,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+
+template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
+ rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+ const A3 a3, const A4 a4,
+ const A5 a5, R & r))
+{
+ class h1 : public handler {
+ private:
+ S * sob;
+ int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
+ const A5 a5, R & r);
+ public:
+ h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+ const A4 a4, const A5 a5, R & r))
+ : sob(xsob), meth(xmeth) { }
+ int fn(unmarshall &args, marshall &ret) {
+ A1 a1;
+ A2 a2;
+ A3 a3;
+ A4 a4;
+ A5 a5;
+ R r;
+ args >> a1;
+ args >> a2;
+ args >> a3;
+ args >> a4;
+ args >> a5;
+ VERIFY(args.okdone());
+ int b = (sob->*meth)(a1,a2,a3,a4,a5,r);
+ ret << r;
+ return b;
+ }
+ };
+ reg1(proc, new h1(sob, meth));
+}
+
+#endif /* rsm_h */
diff --git a/rsm_client.cc b/rsm_client.cc
new file mode 100644
index 0000000..ee25564
--- /dev/null
+++ b/rsm_client.cc
@@ -0,0 +1,94 @@
+#include "rsm_client.h"
+#include <vector>
+#include <string>
+#include <stdio.h>
+#include <unistd.h>
+#include "lang/verify.h"
+
+
+rsm_client::rsm_client(std::string dst) {
+ printf("create rsm_client\n");
+ std::vector<std::string> mems;
+
+ pthread_mutex_init(&rsm_client_mutex, NULL);
+ sockaddr_in dstsock;
+ make_sockaddr(dst.c_str(), &dstsock);
+ primary = dst;
+
+ {
+ ScopedLock ml(&rsm_client_mutex);
+ VERIFY (init_members());
+ }
+ printf("rsm_client: done\n");
+}
+
+// Assumes caller holds rsm_client_mutex
+void rsm_client::primary_failure() {
+ primary = known_mems.back();
+ known_mems.pop_back();
+}
+
+rsm_client_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
+ int ret;
+ ScopedLock ml(&rsm_client_mutex);
+ while (1) {
+ printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
+ handle h(primary);
+
+ VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
+ rpcc *cl = h.safebind();
+ if (cl)
+ ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
+ VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+
+ if (!cl)
+ goto prim_fail;
+
+ printf("rsm_client::invoke proc %x primary %s ret %d\n", proc,
+ primary.c_str(), ret);
+ if (ret == rsm_client_protocol::OK)
+ break;
+ if (ret == rsm_client_protocol::BUSY) {
+ printf("rsm is busy %s\n", primary.c_str());
+ sleep(3);
+ continue;
+ }
+ if (ret == rsm_client_protocol::NOTPRIMARY) {
+ printf("primary %s isn't the primary--let's get a complete list of mems\n",
+ primary.c_str());
+ if (init_members())
+ continue;
+ }
+prim_fail:
+ printf("primary %s failed ret %d\n", primary.c_str(), ret);
+ primary_failure();
+ printf("rsm_client::invoke: retry new primary %s\n", primary.c_str());
+ }
+ return ret;
+}
+
+bool rsm_client::init_members() {
+ printf("rsm_client::init_members get members!\n");
+ handle h(primary);
+ VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
+ int ret;
+ rpcc *cl = h.safebind();
+ if (cl) {
+ ret = cl->call(rsm_client_protocol::members, 0, known_mems,
+ rpcc::to(1000));
+ }
+ VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+ if (cl == 0 || ret != rsm_client_protocol::OK)
+ return false;
+ if (known_mems.size() < 1) {
+ printf("rsm_client::init_members do not know any members!\n");
+ VERIFY(0);
+ }
+
+ primary = known_mems.back();
+ known_mems.pop_back();
+
+ printf("rsm_client::init_members: primary %s\n", primary.c_str());
+
+ return true;
+}
diff --git a/rsm_client.h b/rsm_client.h
new file mode 100644
index 0000000..3219179
--- /dev/null
+++ b/rsm_client.h
@@ -0,0 +1,131 @@
+#ifndef rsm_client_h
+#define rsm_client_h
+
+#include "rpc.h"
+#include "rsm_protocol.h"
+#include <string>
+#include <vector>
+
+
+//
+// rsm client interface.
+//
+// The client stubs package up an rpc, and then call the invoke procedure
+// on the replicated state machine passing the RPC as an argument. This way
+// the replicated state machine isn't service specific; any server can use it.
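+//
+// Minimal usage sketch (the RPC number, argument, and address below are
+// illustrative, not part of this header):
+//
+//   rsm_client rc("127.0.0.1:36000");   // address of some live replica
+//   std::string val;
+//   rc.call(0x7001, std::string("arg"), val);
+//
+// call() marshalls its arguments into a byte string and hands it to
+// invoke(), which retries around BUSY and NOTPRIMARY replies by
+// re-fetching the member list and rotating to a new primary.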
+//
+
+class rsm_client {
+ protected:
+ std::string primary;
+ std::vector<std::string> known_mems;
+ pthread_mutex_t rsm_client_mutex;
+ void primary_failure();
+ bool init_members();
+ public:
+ rsm_client(std::string dst);
+ rsm_client_protocol::status invoke(int proc, std::string req, std::string &rep);
+
+ template<class A1, class R>
+ int call(unsigned int proc, const A1 & a1, R &r);
+
+ template<class A1, class A2, class R>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, R &r);
+
+ template<class A1, class A2, class A3, class R>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ R &r);
+
+ template<class A1, class A2, class A3, class A4, class R>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, R &r);
+
+ template<class A1, class A2, class A3, class A4, class A5, class R>
+ int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
+ const A4 & a4, const A5 & a5, R &r);
+ private:
+ template<class R> int call_m(unsigned int proc, marshall &req, R &r);
+};
+
+template<class R>
+int rsm_client::call_m(unsigned int proc, marshall &req, R &r) {
+ std::string rep;
+ std::string res;
+ int intret = invoke(proc, req.str(), rep);
+ VERIFY( intret == rsm_client_protocol::OK );
+ unmarshall u(rep);
+ u >> intret;
+ if (intret < 0) return intret;
+ u >> res;
+ if (!u.okdone()) {
+ fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+ "You probably forgot to set the reply string in "
+ "rsm::client_invoke, or you may call RPC 0x%x with wrong return "
+ "type\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ unmarshall u1(res);
+ u1 >> r;
+ if(!u1.okdone()) {
+ fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n"
+ "You are probably calling RPC 0x%x with wrong return "
+ "type.\n", proc);
+ VERIFY(0);
+ return rpc_const::unmarshal_reply_failure;
+ }
+ return intret;
+}
+
+template<class A1, class R>
+int rsm_client::call(unsigned int proc, const A1 & a1, R & r) {
+ marshall m;
+ m << a1;
+ return call_m(proc, m, r);
+}
+
+template<class A1, class A2, class R>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ return call_m(proc, m, r);
+}
+
+template<class A1, class A2, class A3, class R>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ return call_m(proc, m, r);
+}
+
+template<class A1, class A2, class A3, class A4, class R>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ return call_m(proc, m, r);
+}
+
+template<class A1, class A2, class A3, class A4, class A5, class R>
+int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, const A5 & a5, R & r) {
+ marshall m;
+ m << a1;
+ m << a2;
+ m << a3;
+ m << a4;
+ m << a5;
+ return call_m(proc, m, r);
+}
+
+#endif
diff --git a/rsm_protocol.h b/rsm_protocol.h
new file mode 100644
index 0000000..a27ef83
--- /dev/null
+++ b/rsm_protocol.h
@@ -0,0 +1,116 @@
+#ifndef rsm_protocol_h
+#define rsm_protocol_h
+
+#include "rpc.h"
+
+
+class rsm_client_protocol {
+ public:
+ enum xxstatus { OK, ERR, NOTPRIMARY, BUSY};
+ typedef int status;
+ enum rpc_numbers {
+ invoke = 0x9001,
+ members,
+ };
+};
+
+
+struct viewstamp {
+ viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) {
+ vid = _vid;
+ seqno = _seqno;
+ };
+ unsigned int vid;
+ unsigned int seqno;
+ inline void operator++(int) {
+ seqno++;
+ };
+};
+
+class rsm_protocol {
+ public:
+ enum xxstatus { OK, ERR, BUSY};
+ typedef int status;
+ enum rpc_numbers {
+ invoke = 0x10001,
+ transferreq,
+ transferdonereq,
+ joinreq,
+ };
+ + struct transferres { + std::string state; + viewstamp last; + }; + + struct joinres { + std::string log; + }; +}; + +inline bool operator==(viewstamp a, viewstamp b) { + return a.vid == b.vid && a.seqno == b.seqno; +} + +inline bool operator>(viewstamp a, viewstamp b) { + return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno); +} + +inline bool operator!=(viewstamp a, viewstamp b) { + return a.vid != b.vid || a.seqno != b.seqno; +} + +inline marshall& operator<<(marshall &m, viewstamp v) +{ + m << v.vid; + m << v.seqno; + return m; +} + +inline unmarshall& operator>>(unmarshall &u, viewstamp &v) { + u >> v.vid; + u >> v.seqno; + return u; +} + +inline marshall & +operator<<(marshall &m, rsm_protocol::transferres r) +{ + m << r.state; + m << r.last; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, rsm_protocol::transferres &r) +{ + u >> r.state; + u >> r.last; + return u; +} + +inline marshall & +operator<<(marshall &m, rsm_protocol::joinres r) +{ + m << r.log; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, rsm_protocol::joinres &r) +{ + u >> r.log; + return u; +} + +class rsm_test_protocol { + public: + enum xxstatus { OK, ERR}; + typedef int status; + enum rpc_numbers { + net_repair = 0x12001, + breakpoint = 0x12002, + }; +}; + +#endif diff --git a/rsm_state_transfer.h b/rsm_state_transfer.h new file mode 100644 index 0000000..6c7e0e4 --- /dev/null +++ b/rsm_state_transfer.h @@ -0,0 +1,11 @@ +#ifndef rsm_state_transfer_h +#define rsm_state_transfer_h + +class rsm_state_transfer { + public: + virtual std::string marshal_state() = 0; + virtual void unmarshal_state(std::string) = 0; + virtual ~rsm_state_transfer() {}; +}; + +#endif diff --git a/rsm_tester.cc b/rsm_tester.cc new file mode 100644 index 0000000..08034e2 --- /dev/null +++ b/rsm_tester.cc @@ -0,0 +1,40 @@ +// +// RSM test client +// + +#include "rsm_protocol.h" +#include "rsmtest_client.h" +#include "rpc.h" +#include +#include +#include +#include +#include +using namespace std; + +rsmtest_client *lc; + +int +main(int argc, char *argv[]) +{ + int r; + + if(argc != 4){ + fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]); + exit(1); + } + + lc = new rsmtest_client(argv[1]); + string command(argv[2]); + if (command == "partition") { + r = lc->net_repair(atoi(argv[3])); + printf ("net_repair returned %d\n", r); + } else if (command == "breakpoint") { + int b = atoi(argv[3]); + r = lc->breakpoint(b); + printf ("breakpoint %d returned %d\n", b, r); + } else { + fprintf(stderr, "Unknown command %s\n", argv[2]); + } + exit(0); +} diff --git a/rsm_tester.pl b/rsm_tester.pl new file mode 100755 index 0000000..7018164 --- /dev/null +++ b/rsm_tester.pl @@ -0,0 +1,912 @@ +#!/usr/bin/perl -w + +use POSIX ":sys_wait_h"; +use Getopt::Std; +use strict; + + +my @pid; +my @logs = (); +my @views = (); #expected views +my %in_views; #the number of views a node is expected to be present +my @p; +my $t; +my $always_kill = 0; + +use sigtrap 'handler' => \&killprocess, 'HUP', 'INT', 'ABRT', 'QUIT', 'TERM'; + +sub paxos_log { + my $port = shift; + return "paxos-$port.log"; +} + +sub mydie { + my ($s) = @_; + killprocess() if ($always_kill); + die $s; +} + +sub killprocess { + print "killprocess: forcestop all spawned processes...@pid \n"; + kill 9, @pid; +} + +sub cleanup { + kill 9, @pid; + unlink(@logs); + sleep 2; +} + +sub spawn { + my ($p, @a) = @_; + my $aa = join("-", @a); + if (my $pid = fork) { +# parent + push( @logs, "$p-$aa.log" ); + if( $p =~ /config_server/ ) { + push( @logs, 
paxos_log($a[1]) );
+ }
+ if( $p =~ /lock_server/ ) {
+ push( @logs, paxos_log($a[1]) );
+ }
+ return $pid;
+ } elsif (defined $pid) {
+# child
+ open(STDOUT, ">>$p-$aa.log")
+ or mydie "Couldn't redirect stdout\n";
+ open(STDERR, ">&STDOUT")
+ or mydie "Couldn't redirect stderr\n";
+ $| = 1;
+ print "$p @a\n";
+ exec "$p @a"
+ or mydie "Cannot start new $p @a $!\n";
+ } else {
+ mydie "Cannot fork: $!\n";
+ }
+}
+
+sub randports {
+
+ my $num = shift;
+ my @p = ();
+ for( my $i = 0; $i < $num; $i++ ) {
+ push( @p, int(rand(54000))+10000 );
+ }
+ my @sp = sort { $a <=> $b } @p;
+ return @sp;
+}
+
+sub print_config {
+ my @ports = @_;
+ open( CONFIG, ">config" ) or mydie( "Couldn't open config for writing" );
+ foreach my $p (@ports) {
+ printf CONFIG "%05d\n", $p;
+ }
+ close( CONFIG );
+}
+
+sub spawn_ls {
+ my $master = shift;
+ my $port = shift;
+ return spawn( "./lock_server", $master, $port );
+}
+
+sub spawn_config {
+ my $master = shift;
+ my $port = shift;
+ return spawn( "./config_server", $master, $port );
+}
+
+sub check_views {
+
+ my $l = shift;
+ my $v = shift;
+ my $last_v = shift;
+
+ open( LOG, "<$l" )
+ or mydie( "Failed: couldn't read $l" );
+ my @log = <LOG>;
+ close(LOG);
+
+ my @vs = @{$v};
+
+ my $i = 0;
+ my @last_view;
+ foreach my $line (@log) {
+ if( $line =~ /^done (\d+) ([\d\s]+)$/ ) {
+
+ my $num = $1;
+ my @view = split( /\s+/, $2 );
+ @last_view = @view;
+
+ if( $i > $#vs ) {
+# let there be extra views
+ next;
+ }
+
+ my $e = $vs[$i];
+ my @expected = @{$e};
+
+ if( @expected != @view ) {
+ mydie( "Failed: In log $l, view $num is (@view), but expected $i (@expected)" );
+ }
+
+ $i++;
+ }
+ }
+
+ if( $i <= $#vs ) {
+ mydie( "Failed: In log $l, not enough views seen!" );
+ }
+
+ if( defined $last_v ) {
+ my @last_exp_v = @{$last_v};
+ if( @last_exp_v != @last_view ) {
+ mydie( "Failed: In log $l last view didn't match, got view @last_view, but expected @last_exp_v" );
+ }
+ }
+
+}
+
+sub get_num_views {
+
+ my $log = shift;
+ my $including = shift;
+ my $nv = `grep "done " $log | grep "$including" | wc -l`;
+ chomp $nv;
+ return $nv;
+
+}
+
+sub wait_for_view_change {
+
+ my $log = shift;
+ my $num_views = shift;
+ my $including = shift;
+ my $timeout = shift;
+
+ my $start = time();
+ while( (get_num_views( $log, $including ) < $num_views) and
+ ($start + $timeout > time()) ) {
+ my $lastv = `grep done $log | tail -n 1`;
+ chomp $lastv;
+ print " Waiting for $including to be present in >=$num_views views in $log (Last view: $lastv)\n";
+ sleep 1;
+ }
+
+ if( get_num_views( $log, $including ) < $num_views) {
+ mydie( "Failed: Timed out waiting for $including to be in >=$num_views in log $log" );
+ }else{
+ print " Done: $including is in >=$num_views views in $log\n";
+ }
+}
+
+sub waitpid_to {
+ my $pid = shift;
+ my $to = shift;
+
+ my $start = time();
+ my $done_pid;
+ do {
+ sleep 1;
+ $done_pid = waitpid($pid, POSIX::WNOHANG);
+ } while( $done_pid <= 0 and (time() - $start) < $to );
+
+ if( $done_pid <= 0 ) {
+ kill 9,$pid;
+ mydie( "Failed: Timed out waiting for process $pid\n" );
+ } else {
+ return 1;
+ }
+
+}
+
+sub wait_and_check_expected_view($) {
+ my $v = shift;
+ push @views, $v;
+ for (my $i = 0; $i <=$#$v; $i++) {
+ $in_views{$v->[$i]}++;
+ }
+ foreach my $port (@$v) {
+ wait_for_view_change(paxos_log($port), $in_views{$port}, $port, 20);
+ }
+ foreach my $port (@$v) {
+ my $log = paxos_log($port);
+ check_views( $log, \@views );
+ }
+}
+
+sub start_nodes ($$){
+
+ @pid = ();
+ @logs = ();
+ @views = ();
+ for (my $i = 0; $i <= $#p; $i++) {
$in_views{$p[$i]} = 0; + } + + my $n = shift; + my $command = shift; + + for (my $i = 0; $i < $n; $i++) { + if ($command eq "ls") { + @pid = (@pid, spawn_ls($p[0],$p[$i])); + print "Start lock_server on $p[$i]\n"; + }elsif ($command eq "config_server"){ + @pid = (@pid, spawn_config($p[0],$p[$i])); + print "Start config on $p[$i]\n"; + } + sleep 1; + + my @vv = @p[0..$i]; + wait_and_check_expected_view(\@vv); + } + +} + +my %options; +getopts("s:k",\%options); +if (defined($options{s})) { + srand($options{s}); +} +if (defined($options{k})) { + $always_kill = 1; +} + +#get a sorted list of random ports +@p = randports(5); +print_config( @p[0..4] ); + +my @do_run = (); +my $NUM_TESTS = 17; + +# see which tests are set +if( $#ARGV > -1 ) { + foreach my $t (@ARGV) { + if( $t < $NUM_TESTS && $t >= 0 ) { + $do_run[$t] = 1; + } + } +} else { +# turn on all tests + for( my $i = 0; $i < $NUM_TESTS; $i++ ) { + $do_run[$i] = 1; + } +} + +if ($do_run[0]) { + print "test0: start 3-process lock server\n"; + start_nodes(3,"ls"); + cleanup(); + sleep 2; +} + +if ($do_run[1]) { + print "test1: start 3-process lock server, kill third server\n"; + start_nodes(3,"ls"); + + print "Kill third server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + cleanup(); + sleep 2; +} + +if ($do_run[2]) { + print "test2: start 3-process lock server, kill first server\n"; + start_nodes(3,"ls"); + + print "Kill first (PID: $pid[0]) on port $p[0]\n"; + kill "TERM", $pid[0]; + + sleep 5; + + # it should go through 4 views + my @v4 = ($p[1], $p[2]); + wait_and_check_expected_view(\@v4); + + cleanup(); + sleep 2; +} + + +if ($do_run[3]) { + + print "test3: start 3-process lock_server, kill a server, restart a server\n"; + start_nodes(3,"ls"); + + print "Kill server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print "Restart killed server on port $p[2]\n"; + $pid[2] = spawn_ls ($p[0], $p[2]); + + sleep 5; + + my @v5 = ($p[0], $p[1], $p[2]); + wait_and_check_expected_view(\@v5); + + cleanup(); + sleep 2; +} + +if ($do_run[4]) { + print "test4: 3-process lock_server, kill third server, kill second server, restart third server, kill third server again, restart second server, re-restart third server, check logs\n"; + start_nodes(3,"ls"); + + print "Kill server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print "Kill server (PID: $pid[1]) on port $p[1]\n"; + kill "TERM", $pid[1]; + + sleep 5; + #no view change can happen because of a lack of majority + + print "Restarting server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + sleep 5; + + #no view change can happen because of a lack of majority + foreach my $port (@p[0..2]) { + my $num_v = get_num_views(paxos_log($port), $port); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + # kill node 3 again, + print "Kill server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + + + print "Restarting server on port $p[1]\n"; + $pid[1] = spawn_ls($p[0], $p[1]); + + sleep 7; + + foreach my $port (@p[0..1]) { + $in_views{$port} = get_num_views( paxos_log($port), $port ); + print " Node $port is present in ", $in_views{$port}, " views in ", paxos_log($port), 
"\n"; + } + + print "Restarting server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + my @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + +# now check the paxos logs and make sure the logs go through the right +# views + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + cleanup(); + +} + +if ($do_run[5]) { + print "test5: 3-process lock_server, send signal 1 to first server, kill third server, restart third server, check logs\n"; + start_nodes(3,"ls"); + + print "Sending paxos breakpoint 1 to first server on port $p[0]\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 3); + + sleep 1; + + print "Kill third server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + foreach my $port (@p[0..2]) { + my $num_v = get_num_views( paxos_log($port), $port ); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + print "Restarting third server on port $p[2]\n"; + $pid[2]= spawn_ls($p[0], $p[2]); + my @lastv = ($p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + sleep 10; + +# now check the paxos logs and make sure the logs go through the right +# views + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + cleanup(); + +} + +if ($do_run[6]) { + print "test6: 4-process lock_server, send signal 2 to first server, kill fourth server, restart fourth server, check logs\n"; + start_nodes(4,"ls"); + print "Sending paxos breakpoint 2 to first server on port $p[0]\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 4); + + sleep 1; + + print "Kill fourth server (PID: $pid[3]) on port $p[3]\n"; + kill "TERM", $pid[3]; + + sleep 5; + + foreach my $port ($p[1],$p[2]) { + my $num_v = get_num_views( paxos_log($port), $port ); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + sleep 5; + + print "Restarting fourth server on port $p[3]\n"; + $pid[3] = spawn_ls($p[1], $p[3]); + + sleep 5; + + my @v5 = ($p[0],$p[1],$p[2]); + foreach my $port (@v5) { + $in_views{$port}++; + } + push @views, \@v5; + + sleep 10; + + # the 6th view will be (2,3) or (1,2,3,4) + my @v6 = ($p[1],$p[2]); + foreach my $port (@v6) { + $in_views{$port}++; + } + foreach my $port (@v6) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 30); + } + + # final will be (2,3,4) + my @lastv = ($p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv ); + } + cleanup(); + +} + +if ($do_run[7]) { + print "test7: 4-process lock_server, send signal 2 to first server, kill fourth server, kill other servers, restart other servers, restart fourth server, check logs\n"; + start_nodes(4,"ls"); + print "Sending paxos breakpoint 2 to first server on port $p[0]\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 4); + sleep 3; + + print "Kill fourth server (PID: $pid[3]) on port $p[3]\n"; + kill "TERM", $pid[3]; + + sleep 5; + + print "Kill third server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + print "Kill second server (PID: $pid[1]) on port $p[1]\n"; + kill "TERM", $pid[1]; + + sleep 5; + + print "Restarting second server on 
port $p[1]\n"; + $pid[1] = spawn_ls($p[0], $p[1]); + + sleep 5; + + print "Restarting third server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + sleep 5; + +#no view change is possible by now because there is no majority + foreach my $port ($p[1],$p[2]) { + my $num_v = get_num_views( paxos_log($port), $port ); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + print "Restarting fourth server on port $p[3]\n"; + $pid[3] = spawn_ls($p[1], $p[3]); + + sleep 5; + + my @v5 = ($p[0], $p[1], $p[2]); + push @views, \@v5; + foreach my $port (@v5) { + $in_views{$port}++; + } + + sleep 15; + my @lastv = ($p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + cleanup(); + +} + +if ($do_run[8]) { + print "test8: start 3-process lock service\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 8" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[9]) { + + print "test9: start 3-process rsm, kill second slave while lock_tester is running\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep int(rand(10)+1); + + print "Kill slave (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 3; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 9" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[10]) { + + print "test10: start 3-process rsm, kill second slave and restarts it later while lock_tester is running\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep int(rand(10)+1); + + print "Kill slave (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 3; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + sleep 3; + + print "Restarting killed lock_server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + my @v5 = ($p[0],$p[1],$p[2]); + wait_and_check_expected_view(\@v5); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 10" ); + } + + cleanup(); + sleep 2; +} + + +if ($do_run[11]) { + + print "test11: start 3-process rsm, kill primary while lock_tester is running\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep int(rand(10)+1); + + print "Kill primary (PID: $pid[0]) on port $p[0]\n"; + kill "TERM", $pid[0]; + + sleep 3; + + # it should go through 4 views + my @v4 = ($p[1], $p[2]); + wait_and_check_expected_view(\@v4); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 
11" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[12]) { + + print "test12: start 3-process rsm, kill master at break1 and restart it while lock_tester is running\n"; + + start_nodes(3, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill master (PID: $pid[0]) on port $p[0] at breakpoint 1\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 1); + + + sleep 1; + + # it should go through 5 views + my @v4 = ($p[1], $p[2]); + wait_and_check_expected_view(\@v4); + + print "Restarting killed lock_server on port $p[0]\n"; + $pid[0] = spawn_ls($p[1], $p[0]); + + sleep 3; + + # the last view should include all nodes + my @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 12" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[13]) { + + print "test13: start 3-process rsm, kill slave at break1 and restart it while lock_tester is running\n"; + + start_nodes(3, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill slave (PID: $pid[2]) on port $p[2] at breakpoint 1\n"; + spawn("./rsm_tester", $p[2]+1, "breakpoint", 1); + + sleep 1; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print "Restarting killed lock_server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + sleep 3; + + # the last view should include all nodes + my @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 13" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[14]) { + + print "test14: start 5-process rsm, kill slave break1, kill slave break2\n"; + + start_nodes(5, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1\n"; + spawn("./rsm_tester", $p[4]+1, "breakpoint", 1); + + + print "Kill slave (PID: $pid[3]) on port $p[3] at breakpoint 2\n"; + spawn("./rsm_tester", $p[3]+1, "breakpoint", 2); + + + sleep 1; + + # two view changes: + + print "first view change wait\n"; + my @lastv = ($p[0],$p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print "second view change wait\n"; + + @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 14" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[15]) { + + print "test15: start 5-process rsm, kill slave break1, kill primary break2\n"; + + start_nodes(5, "ls"); + + print "Start lock_tester $p[0]\n"; + 
$t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1\n"; + spawn("./rsm_tester", $p[4]+1, "breakpoint", 1); + + + print "Kill primary (PID: $pid[0]) on port $p[0] at breakpoint 2\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 2); + + sleep 1; + + # two view changes: + + print "first view change wait\n"; + my @lastv = ($p[0],$p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print "second view change wait\n"; + + @lastv = ($p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 15" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[16]) { + + print "test16: start 3-process rsm, partition primary, heal it\n"; + + start_nodes(3, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Partition primary (PID: $pid[0]) on port $p[0] at breakpoint\n"; + + spawn("./rsm_tester", $p[0]+1, "partition", 0); + + sleep 3; + + print "first view change wait\n"; + my @lastv = ($p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + sleep 1; + + print "Heal partition primary (PID: $pid[0]) on port $p[0] at breakpoint\n"; + spawn("./rsm_tester", $p[0]+1, "partition", 1); + + sleep 1; + + # xxx it should test that this is the 5th view! + print "second view change wait\n"; + @lastv = ($p[0], $p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 16" ); + } + + cleanup(); + sleep 2; +} + +print "tests done OK\n"; + +unlink("config"); diff --git a/rsmtest_client.cc b/rsmtest_client.cc new file mode 100644 index 0000000..f008228 --- /dev/null +++ b/rsmtest_client.cc @@ -0,0 +1,39 @@ +// RPC stubs for clients to talk to rsmtest_server + +#include "rsmtest_client.h" +#include "rpc.h" +#include + +#include +#include +#include + +rsmtest_client::rsmtest_client(std::string dst) +{ + sockaddr_in dstsock; + make_sockaddr(dst.c_str(), &dstsock); + cl = new rpcc(dstsock); + if (cl->bind() < 0) { + printf("rsmtest_client: call bind\n"); + } +} + +int +rsmtest_client::net_repair(int heal) +{ + int r; + int ret = cl->call(rsm_test_protocol::net_repair, heal, r); + VERIFY (ret == rsm_test_protocol::OK); + return r; +} + +int +rsmtest_client::breakpoint(int b) +{ + int r; + int ret = cl->call(rsm_test_protocol::breakpoint, b, r); + VERIFY (ret == rsm_test_protocol::OK); + return r; +} + + diff --git a/rsmtest_client.h b/rsmtest_client.h new file mode 100644 index 0000000..a175d9e --- /dev/null +++ b/rsmtest_client.h @@ -0,0 +1,20 @@ +// rsmtest client interface. 
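+//
+// Usage sketch: rsm_tester.pl points this client at a server's testing
+// port (its main port + 1), so with a hypothetical port 36001:
+//
+//   rsmtest_client rt("127.0.0.1:36001");
+//   rt.breakpoint(1);     // ask the node to exit at breakpoint 1
+//   rt.net_repair(0);     // or schedule a network partition
+//
+// Equivalently, from the shell: ./rsm_tester 36001 breakpoint 1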
+ +#ifndef rsmtest_client_h +#define rsmtest_client_h + +#include +#include "rsm_protocol.h" +#include "rpc.h" + +// Client interface to the rsmtest server +class rsmtest_client { + protected: + rpcc *cl; + public: + rsmtest_client(std::string d); + virtual ~rsmtest_client() {}; + virtual rsm_test_protocol::status net_repair(int heal); + virtual rsm_test_protocol::status breakpoint(int b); +}; +#endif diff --git a/srlock.cc b/srlock.cc new file mode 100644 index 0000000..8966527 --- /dev/null +++ b/srlock.cc @@ -0,0 +1,16 @@ +#include "srlock.h" + +ScopedRemoteLock::ScopedRemoteLock(lock_client *lc, lock_protocol::lockid_t lid) : + lc_(lc), lid_(lid) { + lc_->acquire(lid_); + releaseOnFree = true; +} + +void ScopedRemoteLock::retain() { + releaseOnFree = false; +} + +ScopedRemoteLock::~ScopedRemoteLock() { + if (releaseOnFree) + lc_->release(lid_); +} diff --git a/srlock.h b/srlock.h new file mode 100644 index 0000000..40bd8bd --- /dev/null +++ b/srlock.h @@ -0,0 +1,18 @@ +#ifndef srlock_h +#define srlock_h + +#include "lock_protocol.h" +#include "lock_client.h" + +class ScopedRemoteLock { + protected: + lock_client *lc_; + lock_protocol::lockid_t lid_; + bool releaseOnFree; + public: + ScopedRemoteLock(lock_client *, lock_protocol::lockid_t); + void retain(); + ~ScopedRemoteLock(); +}; + +#endif diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..d3cf93b --- /dev/null +++ b/start.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +ulimit -c unlimited + +NUM_LS=${1:-0} + +BASE_PORT=$RANDOM +BASE_PORT=$[BASE_PORT+2000] +LOCK_PORT=$[BASE_PORT+6] + +if [ $NUM_LS -gt 1 ]; then + x=0 + rm config + while [ $x -lt $NUM_LS ]; do + port=$[LOCK_PORT+2*x] + x=$[x+1] + echo $port >> config + done + x=0 + while [ $x -lt $NUM_LS ]; do + port=$[LOCK_PORT+2*x] + x=$[x+1] + echo "starting ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &" + ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 & + sleep 1 + done +else + echo "starting ./lock_server $LOCK_PORT > lock_server.log 2>&1 &" + ./lock_server $LOCK_PORT > lock_server.log 2>&1 & + sleep 1 +fi diff --git a/stop.sh b/stop.sh new file mode 100755 index 0000000..2d0a1f6 --- /dev/null +++ b/stop.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +pkill -u $USER lock_server diff --git a/tprintf.cc b/tprintf.cc new file mode 100644 index 0000000..8f6c8a9 --- /dev/null +++ b/tprintf.cc @@ -0,0 +1,16 @@ +#include "mutex.h" +#include +#include +#include "tprintf.h" + +uint64_t utime() { + struct timeval tp; + gettimeofday(&tp, NULL); + return (tp.tv_usec + (uint64_t)tp.tv_sec * 1000000) % 1000000000; +} + +mutex cerr_mutex; +std::map thread_name_map; +int next_thread_num = 0; +std::map instance_name_map; +int next_instance_num = 0; diff --git a/tprintf.h b/tprintf.h new file mode 100644 index 0000000..6a0cb2a --- /dev/null +++ b/tprintf.h @@ -0,0 +1,90 @@ +#ifndef TPRINTF_H +#define TPRINTF_H + +#include +#include +#include "mutex.h" +#include +#include +#include + +extern mutex cerr_mutex; +extern std::map thread_name_map; +extern int next_thread_num; +extern std::map instance_name_map; +extern int next_instance_num; +extern char tprintf_thread_prefix; + +#define LOG_PREFIX { \ + cerr_mutex.acquire(); \ + pthread_t self = pthread_self(); \ + int tid = thread_name_map[self]; \ + if (tid==0) \ + tid = thread_name_map[self] = ++next_thread_num; \ + std::cerr << std::left << std::setw(9) << utime() << " "; \ + std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \ + std::cerr << " " << std::setw(24) << __FILE__ << " " << 
std::setw(18) << __func__; \ +} +#define LOG_THIS_POINTER { \ + int self = instance_name_map[this]; \ + if (self==0) \ + self = instance_name_map[this] = ++next_instance_num; \ + std::cerr << "#" << std::setw(2) << self; \ +} +#define LOG_SUFFIX { \ + cerr_mutex.release(); \ +} + +#define LOG_NONMEMBER(x) { \ + LOG_PREFIX; \ + std::cerr << x << std::endl; \ + LOG_SUFFIX; \ +} +#define LOG(x) { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << x << std::endl; \ + LOG_SUFFIX; \ +} +#define JOIN(from,to,sep) ({ \ + ostringstream oss; \ + for(typeof(from) i=from;i!=to;i++) \ + oss << *i << sep; \ + oss.str(); \ +}) +#define LOG_FUNC_ENTER { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << "lid=" << lid; \ + std::cerr << std::endl; \ + LOG_SUFFIX; \ +} +#define LOG_FUNC_ENTER_SERVER { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << "lid=" << lid; \ + std::cerr << " client=" << id << "," << xid; \ + std::cerr << std::endl; \ + LOG_SUFFIX; \ +} +#define LOG_FUNC_EXIT { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << "return" << lid; \ + std::cerr << std::endl; \ + LOG_SUFFIX; \ +} + +#define tprintf(args...) { \ + int len = snprintf(NULL, 0, args); \ + char buf[len+1]; \ + buf[len] = '\0'; \ + snprintf(buf, len+1, args); \ + if (buf[len-1]=='\n') \ + buf[len-1] = '\0'; \ + LOG_NONMEMBER(buf); \ +} + +uint64_t utime(); + +#endif