From 5fd8cc8409d0efadc07dfe8d6774ad9ff477663d Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Sat, 14 Sep 2013 21:48:20 -0400 Subject: [PATCH] Imported from 6.824 labs --- .gitignore | 12 + Makefile | 39 ++ config.cc | 332 ++++++++++++++ config.h | 54 +++ gettime.cc | 135 ++++++ gettime.h | 15 + handle.cc | 114 +++++ handle.h | 79 ++++ lang/algorithm.h | 18 + lang/verify.h | 15 + lock_client.cc | 63 +++ lock_client.h | 52 +++ lock_client_cache_rsm.cc | 210 +++++++++ lock_client_cache_rsm.h | 82 ++++ lock_demo.cc | 30 ++ lock_protocol.h | 30 ++ lock_server.cc | 46 ++ lock_server.h | 36 ++ lock_server_cache_rsm.cc | 245 +++++++++++ lock_server_cache_rsm.h | 52 +++ lock_smain.cc | 71 +++ lock_tester.cc | 242 +++++++++++ log.cc | 132 ++++++ log.h | 28 ++ mutex.cc | 42 ++ mutex.h | 28 ++ paxos.cc | 372 ++++++++++++++++ paxos.h | 100 +++++ paxos_protocol.h | 133 ++++++ random.cc | 36 ++ random.h | 7 + rpc/connection.cc | 448 +++++++++++++++++++ rpc/connection.h | 101 +++++ rpc/fifo.h | 94 ++++ rpc/jsl_log.cc | 9 + rpc/jsl_log.h | 25 ++ rpc/marshall.h | 261 +++++++++++ rpc/method_thread.h | 164 +++++++ rpc/pollmgr.cc | 360 +++++++++++++++ rpc/pollmgr.h | 107 +++++ rpc/rpc.cc | 1086 ++++++++++++++++++++++++++++++++++++++++++++++ rpc/rpc.h | 633 +++++++++++++++++++++++++++ rpc/rpctest.cc | 479 ++++++++++++++++++++ rpc/slock.h | 28 ++ rpc/thr_pool.cc | 69 +++ rpc/thr_pool.h | 66 +++ rsm.cc | 610 ++++++++++++++++++++++++++ rsm.h | 236 ++++++++++ rsm_client.cc | 94 ++++ rsm_client.h | 131 ++++++ rsm_protocol.h | 116 +++++ rsm_state_transfer.h | 11 + rsm_tester.cc | 40 ++ rsm_tester.pl | 912 ++++++++++++++++++++++++++++++++++++++ rsmtest_client.cc | 39 ++ rsmtest_client.h | 20 + srlock.cc | 16 + srlock.h | 18 + start.sh | 31 ++ stop.sh | 3 + tprintf.cc | 16 + tprintf.h | 90 ++++ 62 files changed, 9163 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 config.cc create mode 100644 config.h create mode 100644 gettime.cc create mode 100644 gettime.h create mode 100644 handle.cc create mode 100644 handle.h create mode 100644 lang/algorithm.h create mode 100644 lang/verify.h create mode 100644 lock_client.cc create mode 100644 lock_client.h create mode 100644 lock_client_cache_rsm.cc create mode 100644 lock_client_cache_rsm.h create mode 100644 lock_demo.cc create mode 100644 lock_protocol.h create mode 100644 lock_server.cc create mode 100644 lock_server.h create mode 100644 lock_server_cache_rsm.cc create mode 100644 lock_server_cache_rsm.h create mode 100644 lock_smain.cc create mode 100644 lock_tester.cc create mode 100644 log.cc create mode 100644 log.h create mode 100644 mutex.cc create mode 100644 mutex.h create mode 100644 paxos.cc create mode 100644 paxos.h create mode 100644 paxos_protocol.h create mode 100644 random.cc create mode 100644 random.h create mode 100644 rpc/connection.cc create mode 100644 rpc/connection.h create mode 100644 rpc/fifo.h create mode 100644 rpc/jsl_log.cc create mode 100644 rpc/jsl_log.h create mode 100644 rpc/marshall.h create mode 100644 rpc/method_thread.h create mode 100644 rpc/pollmgr.cc create mode 100644 rpc/pollmgr.h create mode 100644 rpc/rpc.cc create mode 100644 rpc/rpc.h create mode 100644 rpc/rpctest.cc create mode 100644 rpc/slock.h create mode 100644 rpc/thr_pool.cc create mode 100644 rpc/thr_pool.h create mode 100644 rsm.cc create mode 100644 rsm.h create mode 100644 rsm_client.cc create mode 100644 rsm_client.h create mode 100644 rsm_protocol.h create mode 100644 rsm_state_transfer.h create mode 100644 
rsm_tester.cc create mode 100755 rsm_tester.pl create mode 100644 rsmtest_client.cc create mode 100644 rsmtest_client.h create mode 100644 srlock.cc create mode 100644 srlock.h create mode 100755 start.sh create mode 100755 stop.sh create mode 100644 tprintf.cc create mode 100644 tprintf.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..510791b --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +*.dSYM +*.o +*.d +rpc/rpctest +lock_tester +lock_demo +lock_server +*.swp +*.swo +*.a +*.log +rsm_tester diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..edf9fd0 --- /dev/null +++ b/Makefile @@ -0,0 +1,39 @@ +CXXFLAGS = -g -MMD -Wall -I. -I./rpc +LDFLAGS = -L. -L/usr/local/lib +LDLIBS = -lpthread +LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt) +LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl) +CXX = g++ +CC = g++ + +all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest + +rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o gettime.o + rm -f $@ + ar cq $@ $^ + ranlib rpc/librpc.a + +rpc/rpctest: rpc/rpctest.o rpc/librpc.a + +lock_demo=lock_demo.o lock_client.o +lock_demo : $(lock_demo) rpc/librpc.a + +lock_tester=lock_tester.o lock_client.o mutex.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o +lock_tester : $(lock_tester) rpc/librpc.a + +lock_server=lock_server.o lock_smain.o mutex.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o +lock_server : $(lock_server) rpc/librpc.a + +rsm_tester=rsm_tester.o rsmtest_client.o +rsm_tester: $(rsm_tester) rpc/librpc.a + +%.o: %.cc + $(CXX) $(CXXFLAGS) -c $< -o $@ + +-include *.d +-include rpc/*.d + +clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester +.PHONY: clean +clean: + rm -rf $(clean_files) diff --git a/config.cc b/config.cc new file mode 100644 index 0000000..0f9ab4c --- /dev/null +++ b/config.cc @@ -0,0 +1,332 @@ +#include +#include +#include +#include "config.h" +#include "paxos.h" +#include "handle.h" +#include "tprintf.h" +#include "lang/verify.h" + +// The config module maintains views. As a node joins or leaves a +// view, the next view will be the same as previous view, except with +// the new node added or removed. The first view contains only node +// 1. If node 2 joins after the first node (it will download the views +// from node 1), it will learn about view 1 with the first node as the +// only member. It will then invoke Paxos to create the next view. +// It will tell Paxos to ask the nodes in view 1 to agree on the value +// {1, 2}. If Paxos returns success, then it moves to view 2 with +// {1,2} as the members. When node 3 joins, the config module runs +// Paxos with the nodes in view 2 and the proposed value to be +// {1,2,3}. And so on. When a node discovers that some node of the +// current view is not responding, it kicks off Paxos to propose a new +// value (the current view minus the node that isn't responding). The +// config module uses Paxos to create a total order of views, and it +// is ensured that the majority of the previous view agrees to the +// next view. The Paxos log contains all the values (i.e., views) +// agreed on. +// +// The RSM module informs config to add nodes. The config module +// runs a heartbeater thread that checks in with nodes. If a node +// doesn't respond, the config module will invoke Paxos's proposer to +// remove the node. 
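+// (For example, if the current view is {1,2,3} and node 2 stops
+// answering heartbeats, the heartbeater proposes {1,3} as the value
+// for the next view.)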
Higher layers will learn about this change when a +// Paxos acceptor accepts the new proposed value through +// paxos_commit(). +// +// To be able to bring other nodes up to date to the latest formed +// view, each node will have a complete history of all view numbers +// and their values that it knows about. At any time a node can reboot +// and when it re-joins, it may be many views behind; by remembering +// all views, the other nodes can bring this re-joined node up to +// date. + +static void * +heartbeatthread(void *x) +{ + config *r = (config *) x; + r->heartbeater(); + return 0; +} + +config::config(std::string _first, std::string _me, config_view_change *_vc) + : myvid (0), first (_first), me (_me), vc (_vc) +{ + VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0); + VERIFY(pthread_cond_init(&config_cond, NULL) == 0); + + std::ostringstream ost; + ost << me; + + acc = new acceptor(this, me == _first, me, ost.str()); + pro = new proposer(this, acc, me); + + // XXX hack; maybe should have its own port number + pxsrpc = acc->get_rpcs(); + pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat); + + { + ScopedLock ml(&cfg_mutex); + + reconstruct(); + + pthread_t th; + VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0); + } +} + +void +config::restore(std::string s) +{ + ScopedLock ml(&cfg_mutex); + acc->restore(s); + reconstruct(); +} + +std::vector +config::get_view(unsigned instance) +{ + ScopedLock ml(&cfg_mutex); + return get_view_wo(instance); +} + +// caller should hold cfg_mutex +std::vector +config::get_view_wo(unsigned instance) +{ + std::string value = acc->value(instance); + tprintf("get_view(%d): returns %s\n", instance, value.c_str()); + return members(value); +} + +std::vector +config::members(std::string value) +{ + std::istringstream ist(value); + std::string m; + std::vector view; + while (ist >> m) { + view.push_back(m); + } + return view; +} + +std::string +config::value(std::vector m) +{ + std::ostringstream ost; + for (unsigned i = 0; i < m.size(); i++) { + ost << m[i]; + ost << " "; + } + return ost.str(); +} + +// caller should hold cfg_mutex +void +config::reconstruct() +{ + if (acc->instance() > 0) { + std::string m; + myvid = acc->instance(); + mems = get_view_wo(myvid); + tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str()); + } +} + +// Called by Paxos's acceptor. 
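+// Note that cfg_mutex is dropped around the commit_change() upcall so
+// that the higher layer can call back into config without deadlocking.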
+void +config::paxos_commit(unsigned instance, std::string value) +{ + std::string m; + std::vector newmem; + ScopedLock ml(&cfg_mutex); + + newmem = members(value); + tprintf("config::paxos_commit: %d: %s\n", instance, + print_members(newmem).c_str()); + + for (unsigned i = 0; i < mems.size(); i++) { + tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str()); + if (!isamember(mems[i], newmem) && me != mems[i]) { + tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); + mgr.delete_handle(mems[i]); + } + } + + mems = newmem; + myvid = instance; + if (vc) { + unsigned vid = myvid; + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + vc->commit_change(vid); + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + } +} + +bool +config::ismember(std::string m, unsigned vid) +{ + bool r; + ScopedLock ml(&cfg_mutex); + std::vector v = get_view_wo(vid); + r = isamember(m, v); + return r; +} + +bool +config::add(std::string new_m, unsigned vid) +{ + std::vector m; + std::vector curm; + ScopedLock ml(&cfg_mutex); + if (vid != myvid) + return false; + tprintf("config::add %s\n", new_m.c_str()); + m = mems; + m.push_back(new_m); + curm = mems; + std::string v = value(m); + int nextvid = myvid + 1; + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + bool r = pro->run(nextvid, curm, v); + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + if (r) { + tprintf("config::add: proposer returned success\n"); + } else { + tprintf("config::add: proposer returned failure\n"); + } + return r; +} + +// caller should hold cfg_mutex +bool +config::remove_wo(std::string m) +{ + tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str()); + std::vector n; + for (unsigned i = 0; i < mems.size(); i++) { + if (mems[i] != m) n.push_back(mems[i]); + } + std::string v = value(n); + std::vector cmems = mems; + int nextvid = myvid + 1; + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + bool r = pro->run(nextvid, cmems, v); + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + if (r) { + tprintf("config::remove: proposer returned success\n"); + } else { + tprintf("config::remove: proposer returned failure\n"); + } + return r; +} + +void +config::heartbeater() +{ + struct timeval now; + struct timespec next_timeout; + std::string m; + heartbeat_t h; + bool stable; + unsigned vid; + std::vector cmems; + ScopedLock ml(&cfg_mutex); + + while (1) { + + gettimeofday(&now, NULL); + next_timeout.tv_sec = now.tv_sec + 3; + next_timeout.tv_nsec = 0; + tprintf("heartbeater: go to sleep\n"); + pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout); + + stable = true; + vid = myvid; + cmems = get_view_wo(vid); + tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str()); + + if (!isamember(me, cmems)) { + tprintf("heartbeater: not member yet; skip hearbeat\n"); + continue; + } + + //find the node with the smallest id + m = me; + for (unsigned i = 0; i < cmems.size(); i++) { + if (m > cmems[i]) + m = cmems[i]; + } + + if (m == me) { + //if i am the one with smallest id, ping the rest of the nodes + for (unsigned i = 0; i < cmems.size(); i++) { + if (cmems[i] != me) { + if ((h = doheartbeat(cmems[i])) != OK) { + stable = false; + m = cmems[i]; + break; + } + } + } + } else { + //the rest of the nodes ping the one with smallest id + if ((h = doheartbeat(m)) != OK) + stable = false; + } + + if (!stable && vid == myvid) { + remove_wo(m); + } + } +} + +paxos_protocol::status +config::heartbeat(std::string m, unsigned vid, int &r) +{ + ScopedLock ml(&cfg_mutex); + int ret = paxos_protocol::ERR; + r = (int) myvid; + 
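+  // If the views differ, the mismatch is tolerated (OK) only while a
+  // proposal is in flight, and then only one view apart (see the
+  // VERIFY below); otherwise the caller gets ERR.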
tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid); + if (vid == myvid) { + ret = paxos_protocol::OK; + } else if (pro->isrunning()) { + VERIFY (vid == myvid + 1 || vid + 1 == myvid); + ret = paxos_protocol::OK; + } else { + ret = paxos_protocol::ERR; + } + return ret; +} + +config::heartbeat_t +config::doheartbeat(std::string m) +{ + int ret = rpc_const::timeout_failure; + int r; + unsigned vid = myvid; + heartbeat_t res = OK; + + tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); + handle h(m); + VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); + rpcc *cl = h.safebind(); + if (cl) { + ret = cl->call(paxos_protocol::heartbeat, me, vid, r, + rpcc::to(1000)); + } + VERIFY(pthread_mutex_lock(&cfg_mutex)==0); + if (ret != paxos_protocol::OK) { + if (ret == rpc_const::atmostonce_failure || + ret == rpc_const::oldsrv_failure) { + mgr.delete_handle(m); + } else { + tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", + m.c_str(), ret, vid, r); + if (ret < 0) res = FAILURE; + else res = VIEWERR; + } + } + tprintf("doheartbeat done %d\n", res); + return res; +} + diff --git a/config.h b/config.h new file mode 100644 index 0000000..9d7b023 --- /dev/null +++ b/config.h @@ -0,0 +1,54 @@ +#ifndef config_h +#define config_h + +#include +#include +#include "paxos.h" + +class config_view_change { + public: + virtual void commit_change(unsigned vid) = 0; + virtual ~config_view_change() {}; +}; + +class config : public paxos_change { + private: + acceptor *acc; + proposer *pro; + rpcs *pxsrpc; + unsigned myvid; + std::string first; + std::string me; + config_view_change *vc; + std::vector mems; + pthread_mutex_t cfg_mutex; + pthread_cond_t heartbeat_cond; + pthread_cond_t config_cond; + paxos_protocol::status heartbeat(std::string m, unsigned instance, int &r); + std::string value(std::vector mems); + std::vector members(std::string v); + std::vector get_view_wo(unsigned instance); + bool remove_wo(std::string); + void reconstruct(); + typedef enum { + OK, // response and same view # + VIEWERR, // response but different view # + FAILURE, // no response + } heartbeat_t; + heartbeat_t doheartbeat(std::string m); + public: + config(std::string _first, std::string _me, config_view_change *_vc); + unsigned vid() { return myvid; } + std::string myaddr() { return me; }; + std::string dump() { return acc->dump(); }; + std::vector get_view(unsigned instance); + void restore(std::string s); + bool add(std::string, unsigned vid); + bool ismember(std::string m, unsigned vid); + void heartbeater(void); + void paxos_commit(unsigned instance, std::string v); + rpcs *get_rpcs() { return acc->get_rpcs(); } + void breakpoint(int b) { pro->breakpoint(b); } +}; + +#endif diff --git a/gettime.cc b/gettime.cc new file mode 100644 index 0000000..926c510 --- /dev/null +++ b/gettime.cc @@ -0,0 +1,135 @@ +/* + * Copyright (c), MM Weiss + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. 
Neither the name of the MM Weiss nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * clock_gettime_stub.c + * gcc -Wall -c clock_gettime_stub.c + * posix realtime functions; MacOS user space glue + */ + +/* @comment + * other possible implementation using intel builtin rdtsc + * rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html + * + * we could get the ticks by doing this + * + * __asm __volatile("mov %%ebx, %%esi\n\t" + * "cpuid\n\t" + * "xchg %%esi, %%ebx\n\t" + * "rdtsc" + * : "=a" (a), + * "=d" (d) + * ); + + * we could even replace our tricky sched_yield call by assembly code to get a better accurency, + * anyway the following C stub will satisfy 99% of apps using posix clock_gettime call, + * moreover, the setter version (clock_settime) could be easly written using mach primitives: + * http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time) + * + * hackers don't be crackers, don't you use a flush toilet? + * + * + * @see draft: ./posix-realtime-stub/posix-realtime-stub.c + * + */ + + +#ifdef __APPLE__ + +#pragma weak clock_gettime + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef enum { + CLOCK_REALTIME, + CLOCK_MONOTONIC, + CLOCK_PROCESS_CPUTIME_ID, + CLOCK_THREAD_CPUTIME_ID +} clockid_t; + +static mach_timebase_info_data_t __clock_gettime_inf; + +int clock_gettime(clockid_t clk_id, struct timespec *tp) { + kern_return_t ret; + clock_serv_t clk; + clock_id_t clk_serv_id; + mach_timespec_t tm; + + uint64_t start, end, delta, nano; + + int retval = -1; + switch (clk_id) { + case CLOCK_REALTIME: + case CLOCK_MONOTONIC: + clk_serv_id = clk_id == CLOCK_REALTIME ? 
CALENDAR_CLOCK : SYSTEM_CLOCK; + if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) { + if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) { + tp->tv_sec = tm.tv_sec; + tp->tv_nsec = tm.tv_nsec; + retval = 0; + } + } + if (KERN_SUCCESS != ret) { + errno = EINVAL; + retval = -1; + } + break; + case CLOCK_PROCESS_CPUTIME_ID: + case CLOCK_THREAD_CPUTIME_ID: + start = mach_absolute_time(); + if (clk_id == CLOCK_PROCESS_CPUTIME_ID) { + getpid(); + } else { + sched_yield(); + } + end = mach_absolute_time(); + delta = end - start; + if (0 == __clock_gettime_inf.denom) { + mach_timebase_info(&__clock_gettime_inf); + } + nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom; + tp->tv_sec = nano * 1e-9; + tp->tv_nsec = nano - (tp->tv_sec * 1e9); + retval = 0; + break; + default: + errno = EINVAL; + retval = -1; + } + return retval; +} + +#endif // __APPLE__ diff --git a/gettime.h b/gettime.h new file mode 100644 index 0000000..f10def4 --- /dev/null +++ b/gettime.h @@ -0,0 +1,15 @@ +#ifndef gettime_h +#define gettime_h + +#ifdef __APPLE__ +typedef enum { + CLOCK_REALTIME, + CLOCK_MONOTONIC, + CLOCK_PROCESS_CPUTIME_ID, + CLOCK_THREAD_CPUTIME_ID +} clockid_t; + +int clock_gettime(clockid_t clk_id, struct timespec *tp); +#endif + +#endif diff --git a/handle.cc b/handle.cc new file mode 100644 index 0000000..a233d5b --- /dev/null +++ b/handle.cc @@ -0,0 +1,114 @@ +#include "handle.h" +#include +#include "tprintf.h" + +handle_mgr mgr; + +handle::handle(std::string m) +{ + h = mgr.get_handle(m); +} + +rpcc * +handle::safebind() +{ + if (!h) + return NULL; + ScopedLock ml(&h->cl_mutex); + if (h->del) + return NULL; + if (h->cl) + return h->cl; + sockaddr_in dstsock; + make_sockaddr(h->m.c_str(), &dstsock); + rpcc *cl = new rpcc(dstsock); + tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); + int ret; + // Starting with lab 6, our test script assumes that the failure + // can be detected by paxos and rsm layer within few seconds. We have + // to set the timeout with a small value to support the assumption. + // + // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of + // lab 6 and lab 7 because the rpc layer may delay your RPC request, + // and cause a time out failure. Please make sure RPC_LOSSY is set to 0. + ret = cl->bind(rpcc::to(1000)); + if (ret < 0) { + tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); + delete cl; + h->del = true; + } else { + tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str()); + h->cl = cl; + } + return h->cl; +} + +handle::~handle() +{ + if (h) mgr.done_handle(h); +} + +handle_mgr::handle_mgr() +{ + VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0); +} + +struct hinfo * +handle_mgr::get_handle(std::string m) +{ + ScopedLock ml(&handle_mutex); + struct hinfo *h = 0; + if (hmap.find(m) == hmap.end()) { + h = new hinfo; + h->cl = NULL; + h->del = false; + h->refcnt = 1; + h->m = m; + pthread_mutex_init(&h->cl_mutex, NULL); + hmap[m] = h; + } else if (!hmap[m]->del) { + h = hmap[m]; + h->refcnt ++; + } + return h; +} + +void +handle_mgr::done_handle(struct hinfo *h) +{ + ScopedLock ml(&handle_mutex); + h->refcnt--; + if (h->refcnt == 0 && h->del) + delete_handle_wo(h->m); +} + +void +handle_mgr::delete_handle(std::string m) +{ + ScopedLock ml(&handle_mutex); + delete_handle_wo(m); +} + +// Must be called with handle_mutex locked. 
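+// Deletion is deferred: while the handle is still referenced it is
+// only marked del here, and the final done_handle() really deletes it.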
+void +handle_mgr::delete_handle_wo(std::string m) +{ + if (hmap.find(m) == hmap.end()) { + tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str()); + } else { + tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(), + hmap[m]->refcnt); + struct hinfo *h = hmap[m]; + if (h->refcnt == 0) { + if (h->cl) { + h->cl->cancel(); + delete h->cl; + } + pthread_mutex_destroy(&h->cl_mutex); + hmap.erase(m); + delete h; + } else { + h->del = true; + } + } +} diff --git a/handle.h b/handle.h new file mode 100644 index 0000000..ecd8884 --- /dev/null +++ b/handle.h @@ -0,0 +1,79 @@ +// manage a cache of RPC connections. +// assuming cid is a std::string holding the +// host:port of the RPC server you want +// to talk to: +// +// handle h(cid); +// rpcc *cl = h.safebind(); +// if(cl){ +// ret = cl->call(...); +// } else { +// bind() failed +// } +// +// if the calling program has not contacted +// cid before, safebind() will create a new +// connection, call bind(), and return +// an rpcc*, or 0 if bind() failed. if the +// program has previously contacted cid, +// safebind() just returns the previously +// created rpcc*. best not to hold any +// mutexes while calling safebind(). + +#ifndef handle_h +#define handle_h + +#include +#include +#include "rpc.h" + +struct hinfo { + rpcc *cl; + int refcnt; + bool del; + std::string m; + pthread_mutex_t cl_mutex; +}; + +class handle { + private: + struct hinfo *h; + public: + handle(std::string m); + ~handle(); + /* safebind will try to bind with the rpc server on the first call. + * Since bind may block, the caller probably should not hold a mutex + * when calling safebind. + * + * return: + * if the first safebind succeeded, all later calls would return + * a rpcc object; otherwise, all later calls would return NULL. + * + * Example: + * handle h(dst); + * XXX_protocol::status ret; + * if (h.safebind()) { + * ret = h.safebind()->call(...); + * } + * if (!h.safebind() || ret != XXX_protocol::OK) { + * // handle failure + * } + */ + rpcc *safebind(); +}; + +class handle_mgr { + private: + pthread_mutex_t handle_mutex; + std::map hmap; + public: + handle_mgr(); + struct hinfo *get_handle(std::string m); + void done_handle(struct hinfo *h); + void delete_handle(std::string m); + void delete_handle_wo(std::string m); +}; + +extern class handle_mgr mgr; + +#endif diff --git a/lang/algorithm.h b/lang/algorithm.h new file mode 100644 index 0000000..a487094 --- /dev/null +++ b/lang/algorithm.h @@ -0,0 +1,18 @@ +// compile time version of min and max + +#ifndef algorithm_h +#define algorithm_h + +template +struct static_max +{ + static const int value = A > B ? A : B; +}; + +template +struct static_min +{ + static const int value = A < B ? A : B; +}; + +#endif diff --git a/lang/verify.h b/lang/verify.h new file mode 100644 index 0000000..2b092d2 --- /dev/null +++ b/lang/verify.h @@ -0,0 +1,15 @@ +// safe assertions. 
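+// Unlike plain assert(), VERIFY(expr) always evaluates expr and aborts
+// on failure even when NDEBUG is defined, so it is safe to wrap
+// side-effecting calls, e.g. VERIFY(pthread_mutex_lock(&m) == 0).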
+
+#ifndef verify_client_h
+#define verify_client_h
+
+#include <stdlib.h>
+#include <assert.h>
+
+#ifdef NDEBUG
+#define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
+#else
+#define VERIFY(expr) assert(expr)
+#endif
+
+#endif
diff --git a/lock_client.cc b/lock_client.cc
new file mode 100644
index 0000000..6e9fab1
--- /dev/null
+++ b/lock_client.cc
@@ -0,0 +1,63 @@
+// RPC stubs for clients to talk to lock_server
+
+#include "lock_client.h"
+#include "rpc.h"
+#include
+
+#include
+#include
+#include
+
+lock_client::lock_client(std::string dst)
+{
+  sockaddr_in dstsock;
+  make_sockaddr(dst.c_str(), &dstsock);
+  cl = new rpcc(dstsock);
+  if (cl->bind() < 0) {
+    printf("lock_client: bind failed\n");
+  }
+}
+
+int
+lock_client::stat(lock_protocol::lockid_t lid)
+{
+  int r;
+  lock_protocol::status ret = cl->call(lock_protocol::stat, cl->id(), lid, r);
+  VERIFY (ret == lock_protocol::OK);
+  return r;
+}
+
+lock_protocol::status
+lock_client::acquire(lock_protocol::lockid_t lid)
+{
+  int r;
+  return cl->call(lock_protocol::acquire, cl->id(), lid, r);
+}
+
+lock_protocol::status
+lock_client::release(lock_protocol::lockid_t lid)
+{
+  int r;
+  return cl->call(lock_protocol::release, cl->id(), lid, r);
+}
+
+t4_lock_client *t4_lock_client_new(const char *dst) {
+  return (t4_lock_client *)new lock_client(dst);
+}
+
+void t4_lock_client_delete(t4_lock_client *client) {
+  delete (lock_client *)client;
+}
+
+t4_status t4_lock_client_acquire(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->acquire(lid);
+}
+
+t4_status t4_lock_client_release(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->release(lid);
+}
+
+t4_status t4_lock_client_stat(t4_lock_client *client, t4_lockid_t lid) {
+  return ((lock_client *)client)->stat(lid);
+}
+
diff --git a/lock_client.h b/lock_client.h
new file mode 100644
index 0000000..df22711
--- /dev/null
+++ b/lock_client.h
@@ -0,0 +1,52 @@
+// lock client interface.
+
+#ifndef lock_client_h
+#define lock_client_h
+
+#ifdef __cplusplus
+
+#include
+#include "lock_protocol.h"
+#include "rpc.h"
+#include
+
+// Client interface to the lock server
+class lock_client {
+ protected:
+  rpcc *cl;
+ public:
+  lock_client(std::string d);
+  virtual ~lock_client() {};
+  virtual lock_protocol::status acquire(lock_protocol::lockid_t);
+  virtual lock_protocol::status release(lock_protocol::lockid_t);
+  virtual lock_protocol::status stat(lock_protocol::lockid_t);
+};
+
+#endif
+
+extern "C" {
+
+struct _t4_lock_client;
+typedef struct _t4_lock_client t4_lock_client;
+
+typedef enum {
+  T4_OK,
+  T4_RETRY,
+  T4_RPCERR,
+  T4_NOENT,
+  T4_IOERR
+} t4_xxstatus;
+
+typedef int t4_status;
+
+typedef unsigned long long t4_lockid_t;
+
+t4_lock_client *t4_lock_client_new(const char *dst);
+void t4_lock_client_delete(t4_lock_client *);
+t4_status t4_lock_client_acquire(t4_lock_client *, t4_lockid_t);
+t4_status t4_lock_client_release(t4_lock_client *, t4_lockid_t);
+t4_status t4_lock_client_stat(t4_lock_client *, t4_lockid_t);
+
+}
+
+#endif
diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc
new file mode 100644
index 0000000..9ec802c
--- /dev/null
+++ b/lock_client_cache_rsm.cc
@@ -0,0 +1,210 @@
+// RPC stubs for clients to talk to lock_server, and cache the locks.
+// See lock_client_cache_rsm.h for protocol details.
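+//
+// Each lock moves through the client-side states declared in
+// lock_client_cache_rsm.h: none -> acquiring -> free/locked while
+// cached; a server revoke moves a cached lock to releasing and then
+// back to none, and an acquire answered with RETRY waits in retrying
+// until retry_handler fires.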
+ +#include "lock_client_cache_rsm.h" +#include "rpc.h" +#include +#include +#include +#include "tprintf.h" + +#include "rsm_client.h" + +lock_state::lock_state(): + state(none) +{ +} + +void lock_state::wait() { + pthread_t self = pthread_self(); + c[self].wait(m); + c.erase(self); +} + +void lock_state::signal() { + // signal anyone + if (c.begin() != c.end()) + c.begin()->second.signal(); +} + +void lock_state::signal(pthread_t who) { + if (c.count(who)) + c[who].signal(); +} + +static void * releasethread(void *x) { + lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x; + cc->releaser(); + return 0; +} + +int lock_client_cache_rsm::last_port = 0; + +lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { + ScopedLock sl(lock_table_lock); + // by the semantics of std::map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_user *_lu) : lock_client(xdst), lu(_lu) { + srand(time(NULL)^last_port); + rlock_port = ((rand()%32000) | (0x1 << 10)); + const char *hname; + // VERIFY(gethostname(hname, 100) == 0); + hname = "127.0.0.1"; + ostringstream host; + host << hname << ":" << rlock_port; + id = host.str(); + last_port = rlock_port; + rpcs *rlsrpc = new rpcs(rlock_port); + rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler); + rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler); + { + ScopedLock sl(xid_mutex); + xid = 0; + } + rsmc = new rsm_client(xdst); + int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this); + VERIFY (r == 0); +} + +void lock_client_cache_rsm::releaser() { + while (1) { + lock_protocol::lockid_t lid; + release_fifo.deq(&lid); + LOG("Releaser: " << lid); + + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread); + st.state = lock_state::releasing; + { + ScopedUnlock su(st.m); + int r; + rsmc->call(lock_protocol::release, lid, id, st.xid, r); + } + st.state = lock_state::none; + LOG("Lock " << lid << ": none"); + st.signal(); + } +} + +lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) { + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + pthread_t self = pthread_self(); + + // check for reentrancy + VERIFY(st.state != lock_state::locked || st.held_by != self); + VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end()); + + st.wanted_by.push_back(self); + + while (1) { + if (st.state != lock_state::free) + LOG("Lock " << lid << ": not free"); + + if (st.state == lock_state::none || st.state == lock_state::retrying) { + if (st.state == lock_state::none) { + ScopedLock sl(xid_mutex); + st.xid = xid++; + } + st.state = lock_state::acquiring; + LOG("Lock " << lid << ": acquiring"); + lock_protocol::status result; + { + ScopedUnlock su(st.m); + int r; + result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r); + } + LOG("acquire returned " << result); + if (result == lock_protocol::OK) { + st.state = lock_state::free; + LOG("Lock " << lid << ": free"); + } + } + + VERIFY(st.wanted_by.size() != 0); + if (st.state == lock_state::free) { + // is it for me? 
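+      // The head of wanted_by decides the hand-off: the releaser
+      // thread means the server wants the lock back, this thread may
+      // take it, and any other waiter is simply signaled to proceed.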
+ pthread_t front = st.wanted_by.front(); + if (front == releaser_thread) { + st.wanted_by.pop_front(); + st.state = lock_state::locked; + st.held_by = releaser_thread; + LOG("Queuing " << lid << " for release"); + release_fifo.enq(lid); + } else if (front == self) { + st.wanted_by.pop_front(); + st.state = lock_state::locked; + st.held_by = self; + break; + } else { + st.signal(front); + } + } + + LOG("waiting..."); + st.wait(); + LOG("wait ended"); + } + + LOG("Lock " << lid << ": locked"); + return lock_protocol::OK; +} + +lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) { + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + pthread_t self = pthread_self(); + VERIFY(st.state == lock_state::locked && st.held_by == self); + st.state = lock_state::free; + LOG("Lock " << lid << ": free"); + if (st.wanted_by.size()) { + pthread_t front = st.wanted_by.front(); + if (front == releaser_thread) { + st.state = lock_state::locked; + st.held_by = releaser_thread; + st.wanted_by.pop_front(); + LOG("Queuing " << lid << " for release"); + release_fifo.enq(lid); + } else + st.signal(front); + } + LOG("Finished signaling."); + return lock_protocol::OK; +} + +rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { + LOG("Revoke handler " << lid << " " << xid); + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + + if (st.state == lock_state::releasing || st.state == lock_state::none) + return rlock_protocol::OK; + + if (st.state == lock_state::free && + (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) { + // gimme + st.state = lock_state::locked; + st.held_by = releaser_thread; + if (st.wanted_by.size()) + st.wanted_by.pop_front(); + release_fifo.enq(lid); + } else { + // get in line + st.wanted_by.push_back(releaser_thread); + } + return rlock_protocol::OK; +} + +rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + VERIFY(st.state == lock_state::acquiring); + st.state = lock_state::retrying; + LOG("Lock " << lid << ": none"); + st.signal(); // only one thread needs to wake up + return rlock_protocol::OK; +} diff --git a/lock_client_cache_rsm.h b/lock_client_cache_rsm.h new file mode 100644 index 0000000..28b0323 --- /dev/null +++ b/lock_client_cache_rsm.h @@ -0,0 +1,82 @@ +// lock client interface. + +#ifndef lock_client_cache_rsm_h + +#define lock_client_cache_rsm_h + +#include +#include "lock_protocol.h" +#include "rpc.h" +#include "lock_client.h" +#include "lang/verify.h" +#include "mutex.h" +#include "rpc/fifo.h" +#include "rsm_client.h" + +// Classes that inherit lock_release_user can override dorelease so that +// that they will be called when lock_client releases a lock. +// You will not need to do anything with this class until Lab 5. +class lock_release_user { + public: + virtual void dorelease(lock_protocol::lockid_t) = 0; + virtual ~lock_release_user() {}; +}; + +using namespace std; + +typedef string callback; + +class lock_state { +public: + lock_state(); + enum { + none = 0, + retrying, + free, + locked, + acquiring, + releasing + } state; + pthread_t held_by; + list wanted_by; + mutex m; + map c; + lock_protocol::xid_t xid; + void wait(); + void signal(); + void signal(pthread_t who); +}; + +typedef map lock_map; + +class lock_client_cache_rsm; + +// Clients that caches locks. 
The server can revoke locks using +// lock_revoke_server. +class lock_client_cache_rsm : public lock_client { + private: + pthread_t releaser_thread; + rsm_client *rsmc; + class lock_release_user *lu; + int rlock_port; + string hostname; + string id; + mutex xid_mutex; + lock_protocol::xid_t xid; + fifo release_fifo; + mutex lock_table_lock; + lock_map lock_table; + lock_state &get_lock_state(lock_protocol::lockid_t lid); + public: + static int last_port; + lock_client_cache_rsm(string xdst, class lock_release_user *l = 0); + virtual ~lock_client_cache_rsm() {}; + lock_protocol::status acquire(lock_protocol::lockid_t); + virtual lock_protocol::status release(lock_protocol::lockid_t); + void releaser(); + rlock_protocol::status revoke_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &); + rlock_protocol::status retry_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &); +}; + + +#endif diff --git a/lock_demo.cc b/lock_demo.cc new file mode 100644 index 0000000..4d74acb --- /dev/null +++ b/lock_demo.cc @@ -0,0 +1,30 @@ +// +// Lock demo +// + +#include "lock_protocol.h" +#include "lock_client.h" +#include "rpc.h" +#include +#include +#include +#include + +std::string dst; +lock_client *lc; + +int +main(int argc, char *argv[]) +{ + int r; + + if(argc != 2){ + fprintf(stderr, "Usage: %s [host:]port\n", argv[0]); + exit(1); + } + + dst = argv[1]; + lc = new lock_client(dst); + r = lc->stat(1); + printf ("stat returned %d\n", r); +} diff --git a/lock_protocol.h b/lock_protocol.h new file mode 100644 index 0000000..60df0ef --- /dev/null +++ b/lock_protocol.h @@ -0,0 +1,30 @@ +// lock protocol + +#ifndef lock_protocol_h +#define lock_protocol_h + +#include "rpc.h" + +class lock_protocol { + public: + enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR }; + typedef int status; + typedef unsigned long long lockid_t; + typedef unsigned long long xid_t; + enum rpc_numbers { + acquire = 0x7001, + release, + stat + }; +}; + +class rlock_protocol { + public: + enum xxstatus { OK, RPCERR }; + typedef int status; + enum rpc_numbers { + revoke = 0x8001, + retry = 0x8002 + }; +}; +#endif diff --git a/lock_server.cc b/lock_server.cc new file mode 100644 index 0000000..3801814 --- /dev/null +++ b/lock_server.cc @@ -0,0 +1,46 @@ +// the lock server implementation + +#include "lock_server.h" +#include +#include +#include +#include + +lock_server::lock_server(): + nacquire (0) +{ +} + +// caller must hold lock_lock +mutex & +lock_server::get_lock(lock_protocol::lockid_t lid) { + lock_lock.acquire(); + // by the semantics of std::map, this will create + // the mutex if it doesn't already exist + mutex &l = locks[lid]; + lock_lock.release(); + return l; +} + +lock_protocol::status +lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r) +{ + lock_protocol::status ret = lock_protocol::OK; + printf("stat request from clt %d\n", clt); + r = nacquire; + return ret; +} + +lock_protocol::status +lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r) +{ + get_lock(lid).acquire(); + return lock_protocol::OK; +} + +lock_protocol::status +lock_server::release(int clt, lock_protocol::lockid_t lid, int &r) +{ + get_lock(lid).release(); + return lock_protocol::OK; +} diff --git a/lock_server.h b/lock_server.h new file mode 100644 index 0000000..e24e359 --- /dev/null +++ b/lock_server.h @@ -0,0 +1,36 @@ +// this is the lock server +// the lock client has a similar interface + +#ifndef lock_server_h +#define lock_server_h + +#include +#include "lock_protocol.h" +#include "lock_client.h" 
+#include "rpc.h" +#include +#include +#include +#include "mutex.h" + +using namespace std; + +typedef map lock_map; + +class lock_server { + + protected: + int nacquire; + mutex lock_lock; + lock_map locks; + mutex &get_lock(lock_protocol::lockid_t lid); + + public: + lock_server(); + ~lock_server() {}; + lock_protocol::status stat(int clt, lock_protocol::lockid_t lid, int &); + lock_protocol::status acquire(int clt, lock_protocol::lockid_t lid, int &); + lock_protocol::status release(int clt, lock_protocol::lockid_t lid, int &); +}; + +#endif diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc new file mode 100644 index 0000000..5149049 --- /dev/null +++ b/lock_server_cache_rsm.cc @@ -0,0 +1,245 @@ +// the caching lock server implementation + +#include "lock_server_cache_rsm.h" +#include +#include +#include +#include +#include "lang/verify.h" +#include "handle.h" +#include "tprintf.h" +#include "rpc/marshall.h" + +lock_state::lock_state(): + held(false) +{ +} + +template +ostringstream & operator<<(ostringstream &o, const pair &d) { + o << "<" << d.first << "," << d.second << ">"; + return o; +} + +template +marshall & operator<<(marshall &m, const list &d) { + m << vector(d.begin(), d.end()); + return m; +} + +template +unmarshall & operator>>(unmarshall &u, list &d) { + vector v; + u >> v; + d.assign(v.begin(), v.end()); + return u; +} + + +template +marshall & operator<<(marshall &m, const pair &d) { + m << d.first; + m << d.second; + return m; +} + +template +unmarshall & operator>>(unmarshall &u, pair &d) { + u >> d.first; + u >> d.second; + return u; +} + +marshall & operator<<(marshall &m, const lock_state &d) { + m << d.held; + m << d.held_by; + m << d.wanted_by; + return m; +} + +unmarshall & operator>>(unmarshall &u, lock_state &d) { + u >> d.held; + u >> d.held_by; + u >> d.wanted_by; + return u; +} + + +lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { + ScopedLock sl(lock_table_lock); + // by the semantics of map, this will create + // the lock if it doesn't already exist + return lock_table[lid]; +} + +static void *revokethread(void *x) { + lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x; + sc->revoker(); + return 0; +} + +static void *retrythread(void *x) { + lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x; + sc->retryer(); + return 0; +} + +lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) { + pthread_t th; + VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0); + VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0); + rsm->set_state_transfer(this); +} + +void lock_server_cache_rsm::revoker() { + while (1) { + lock_protocol::lockid_t lid; + revoke_fifo.deq(&lid); + LOG("Revoking " << lid); + if (rsm && !rsm->amiprimary()) + continue; + + lock_state &st = get_lock_state(lid); + holder held_by; + { + ScopedLock sl(st.m); + held_by = st.held_by; + } + + rpcc *proxy = NULL; + // try a few times? 
+ //int t=5; + //while (t-- && !proxy) + proxy = handle(held_by.first).safebind(); + if (proxy) { + int r; + rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r); + LOG("Revoke returned " << ret); + } + } +} + +void lock_server_cache_rsm::retryer() { + while (1) { + lock_protocol::lockid_t lid; + retry_fifo.deq(&lid); + if (rsm && !rsm->amiprimary()) + continue; + + LOG("Sending retry for " << lid); + lock_state &st = get_lock_state(lid); + holder front; + { + ScopedLock sl(st.m); + if (st.wanted_by.empty()) + continue; + front = st.wanted_by.front(); + } + + rlock_protocol::status ret = -1; + + rpcc *proxy = NULL; + // try a few times? + //int t=5; + //while (t-- && !proxy) + proxy = handle(front.first).safebind(); + if (proxy) { + int r; + ret = proxy->call(rlock_protocol::retry, lid, front.second, r); + LOG("Retry returned " << ret); + } + } +} + +int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) { + LOG_FUNC_ENTER_SERVER; + holder h = holder(id, xid); + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + + // deal with duplicated requests + if (st.old_requests.count(id)) { + lock_protocol::xid_t old_xid = st.old_requests[id]; + if (old_xid > xid) + return lock_protocol::RPCERR; + else if (old_xid == xid) { + if (st.held && st.held_by == h) { + LOG("Client " << id << " sent duplicate acquire xid=" << xid); + return lock_protocol::OK; + } + } + } + + // grant the lock if it's available and I'm next in line + if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) { + if (!st.wanted_by.empty()) + st.wanted_by.pop_front(); + st.old_requests[id] = xid; + + st.held = true; + st.held_by = h; + LOG("Lock " << lid << " held by " << h.first); + if (st.wanted_by.size()) + revoke_fifo.enq(lid); + return lock_protocol::OK; + } + + // get in line + bool found = false; + for (list::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) { + if (i->first == id) { + // make sure client is obeying serialization + if (i->second != xid) { + LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second); + return lock_protocol::RPCERR; + } + found = true; + break; + } + } + if (!found) + st.wanted_by.push_back(h); + + LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); + + // send revoke if we're first in line + if (st.wanted_by.front() == h) + revoke_fifo.enq(lid); + + return lock_protocol::RETRY; +} + +int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) { + LOG_FUNC_ENTER_SERVER; + lock_state &st = get_lock_state(lid); + ScopedLock sl(st.m); + if (st.held && st.held_by == holder(id, xid)) { + st.held = false; + LOG("Lock " << lid << " not held"); + } + if (st.wanted_by.size()) + retry_fifo.enq(lid); + return lock_protocol::OK; +} + +string lock_server_cache_rsm::marshal_state() { + ScopedLock sl(lock_table_lock); + marshall rep; + rep << nacquire; + rep << lock_table; + return rep.str(); +} + +void lock_server_cache_rsm::unmarshal_state(string state) { + ScopedLock sl(lock_table_lock); + unmarshall rep(state); + rep >> nacquire; + rep >> lock_table; +} + +lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) { + printf("stat request\n"); + r = nacquire; + return lock_protocol::OK; +} + diff --git a/lock_server_cache_rsm.h b/lock_server_cache_rsm.h new file mode 100644 index 0000000..eb86bd0 --- /dev/null +++ b/lock_server_cache_rsm.h @@ 
-0,0 +1,52 @@ +#ifndef lock_server_cache_rsm_h +#define lock_server_cache_rsm_h + +#include + +#include +#include +#include "lock_protocol.h" +#include "rpc.h" +#include "mutex.h" +#include "rsm_state_transfer.h" +#include "rsm.h" +#include "rpc/fifo.h" + +using namespace std; + +typedef string callback; +typedef pair holder; + +class lock_state { +public: + lock_state(); + bool held; + holder held_by; + list wanted_by; + map old_requests; + mutex m; +}; + +typedef map lock_map; + +class lock_server_cache_rsm : public rsm_state_transfer { + private: + int nacquire; + mutex lock_table_lock; + lock_map lock_table; + lock_state &get_lock_state(lock_protocol::lockid_t lid); + fifo retry_fifo; + fifo revoke_fifo; + class rsm *rsm; + public: + lock_server_cache_rsm(class rsm *rsm = 0); + lock_protocol::status stat(lock_protocol::lockid_t, int &); + void revoker(); + void retryer(); + string marshal_state(); + void unmarshal_state(string state); + int acquire(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &); + int release(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &); +}; + +#endif diff --git a/lock_smain.cc b/lock_smain.cc new file mode 100644 index 0000000..fba7eae --- /dev/null +++ b/lock_smain.cc @@ -0,0 +1,71 @@ +#include "rpc.h" +#include +#include +#include +#include "lock_server_cache_rsm.h" +#include "paxos.h" +#include "rsm.h" + +#include "jsl_log.h" + +// Main loop of lock_server + +char tprintf_thread_prefix = 's'; + +int +main(int argc, char *argv[]) +{ + int count = 0; + + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + + srandom(getpid()); + + if(argc != 3){ + fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); + exit(1); + } + + char *count_env = getenv("RPC_COUNT"); + if(count_env != NULL){ + count = atoi(count_env); + } + + //jsl_set_debug(2); + // Comment out the next line to switch between the ordinary lock + // server and the RSM. In Lab 6, we disable the lock server and + // implement Paxos. In Lab 7, we will make the lock server use your + // RSM layer. +#define RSM +#ifdef RSM +// You must comment out the next line once you are done with Step One. 
+//#define STEP_ONE +#ifdef STEP_ONE + rpcs server(atoi(argv[1])); + lock_server_cache_rsm ls; + server.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); + server.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); + server.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); +#else + rsm rsm(argv[1], argv[2]); + lock_server_cache_rsm ls(&rsm); + rsm.set_state_transfer((rsm_state_transfer *)&ls); + rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); + rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); + rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); +#endif // STEP_ONE +#endif // RSM + +#ifndef RSM + lock_server_cache ls; + rpcs server(atoi(argv[1]), count); + server.reg(lock_protocol::stat, &ls, &lock_server_cache::stat); + server.reg(lock_protocol::acquire, &ls, &lock_server_cache::acquire); + server.reg(lock_protocol::release, &ls, &lock_server_cache::release); +#endif + + + while(1) + sleep(1000); +} diff --git a/lock_tester.cc b/lock_tester.cc new file mode 100644 index 0000000..dd7c07b --- /dev/null +++ b/lock_tester.cc @@ -0,0 +1,242 @@ +// +// Lock server tester +// + +#include "lock_protocol.h" +#include "lock_client.h" +#include "rpc.h" +#include "jsl_log.h" +#include +#include +#include +#include +#include "lang/verify.h" +#include "lock_client_cache_rsm.h" +#include "tprintf.h" + +char tprintf_thread_prefix = 'c'; + +// must be >= 2 +int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. +std::string dst; +lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt]; +lock_protocol::lockid_t a = 1; +lock_protocol::lockid_t b = 2; +lock_protocol::lockid_t c = 3; + +// check_grant() and check_release() check that the lock server +// doesn't grant the same lock to both clients. +// it assumes that lock names are distinct in the first byte. 
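+// ct[x] counts the current holders of the lock whose low byte is x;
+// it must always be 0 or 1, or the server has granted the lock twice.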
+int ct[256]; +pthread_mutex_t count_mutex; + +void +check_grant(lock_protocol::lockid_t lid) +{ + ScopedLock ml(&count_mutex); + int x = lid & 0xff; + if(ct[x] != 0){ + fprintf(stderr, "error: server granted %016llx twice\n", lid); + fprintf(stdout, "error: server granted %016llx twice\n", lid); + exit(1); + } + ct[x] += 1; +} + +void +check_release(lock_protocol::lockid_t lid) +{ + ScopedLock ml(&count_mutex); + int x = lid & 0xff; + if(ct[x] != 1){ + fprintf(stderr, "error: client released un-held lock %016llx\n", lid); + exit(1); + } + ct[x] -= 1; +} + +void +test1(void) +{ + tprintf ("acquire a release a acquire a release a\n"); + lc[0]->acquire(a); + check_grant(a); + lc[0]->release(a); + check_release(a); + lc[0]->acquire(a); + check_grant(a); + lc[0]->release(a); + check_release(a); + + tprintf ("acquire a acquire b release b release a\n"); + lc[0]->acquire(a); + check_grant(a); + lc[0]->acquire(b); + check_grant(b); + lc[0]->release(b); + check_release(b); + lc[0]->release(a); + check_release(a); +} + +void * +test2(void *x) +{ + int i = * (int *) x; + + tprintf ("test2: client %d acquire a release a\n", i); + lc[i]->acquire(a); + tprintf ("test2: client %d acquire done\n", i); + check_grant(a); + sleep(1); + tprintf ("test2: client %d release\n", i); + check_release(a); + lc[i]->release(a); + tprintf ("test2: client %d release done\n", i); + return 0; +} + +void * +test3(void *x) +{ + int i = * (int *) x; + + tprintf ("test3: client %d acquire a release a concurrent\n", i); + for (int j = 0; j < 10; j++) { + lc[i]->acquire(a); + check_grant(a); + tprintf ("test3: client %d got lock\n", i); + check_release(a); + lc[i]->release(a); + } + return 0; +} + +void * +test4(void *x) +{ + int i = * (int *) x; + + tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i); + for (int j = 0; j < 10; j++) { + lc[0]->acquire(a); + check_grant(a); + tprintf ("test4: thread %d on client 0 got lock\n", i); + check_release(a); + lc[0]->release(a); + } + return 0; +} + +void * +test5(void *x) +{ + int i = * (int *) x; + + tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i); + for (int j = 0; j < 10; j++) { + if (i < 5) lc[0]->acquire(a); + else lc[1]->acquire(a); + check_grant(a); + tprintf ("test5: client %d got lock\n", i); + check_release(a); + if (i < 5) lc[0]->release(a); + else lc[1]->release(a); + } + return 0; +} + +int +main(int argc, char *argv[]) +{ + int r; + pthread_t th[nt]; + int test = 0; + + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + srandom(getpid()); + + //jsl_set_debug(2); + + if(argc < 2) { + fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); + exit(1); + } + + dst = argv[1]; + + if (argc > 2) { + test = atoi(argv[2]); + if(test < 1 || test > 5){ + tprintf("Test number must be between 1 and 5\n"); + exit(1); + } + } + + VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0); + tprintf("cache lock client\n"); + for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst); + + if(!test || test == 1){ + test1(); + } + + if(!test || test == 2){ + // test2 + for (int i = 0; i < nt; i++) { + int *a = new int (i); + r = pthread_create(&th[i], NULL, test2, (void *) a); + VERIFY (r == 0); + } + for (int i = 0; i < nt; i++) { + pthread_join(th[i], NULL); + } + } + + if(!test || test == 3){ + tprintf("test 3\n"); + + // test3 + for (int i = 0; i < nt; i++) { + int *a = new int (i); + r = pthread_create(&th[i], NULL, test3, (void *) a); + VERIFY (r == 0); + } + for (int i = 0; i < nt; i++) { + 
pthread_join(th[i], NULL); + } + } + + if(!test || test == 4){ + tprintf("test 4\n"); + + // test 4 + for (int i = 0; i < 2; i++) { + int *a = new int (i); + r = pthread_create(&th[i], NULL, test4, (void *) a); + VERIFY (r == 0); + } + for (int i = 0; i < 2; i++) { + pthread_join(th[i], NULL); + } + } + + if(!test || test == 5){ + tprintf("test 5\n"); + + // test 5 + + for (int i = 0; i < nt; i++) { + int *a = new int (i); + r = pthread_create(&th[i], NULL, test5, (void *) a); + VERIFY (r == 0); + } + for (int i = 0; i < nt; i++) { + pthread_join(th[i], NULL); + } + } + + tprintf ("%s: passed all tests successfully\n", argv[0]); + +} diff --git a/log.cc b/log.cc new file mode 100644 index 0000000..a760815 --- /dev/null +++ b/log.cc @@ -0,0 +1,132 @@ +#include "paxos.h" +#include +#include + +// Paxos must maintain some durable state (i.e., that survives power +// failures) to run Paxos correct. This module implements a log with +// all durable state to run Paxos. Since the values chosen correspond +// to views, the log contains all views since the beginning of time. + +log::log(acceptor *_acc, std::string _me) + : pxs (_acc) +{ + name = "paxos-" + _me + ".log"; + logread(); +} + +void +log::logread(void) +{ + std::ifstream from; + std::string type; + unsigned instance; + + from.open(name.c_str()); + printf ("logread\n"); + while (from >> type) { + if (type == "done") { + std::string v; + from >> instance; + from.get(); + getline(from, v); + pxs->values[instance] = v; + pxs->instance_h = instance; + printf ("logread: instance: %d w. v = %s\n", instance, + pxs->values[instance].c_str()); + pxs->v_a.clear(); + pxs->n_h.n = 0; + pxs->n_a.n = 0; + } else if (type == "propseen") { + from >> pxs->n_h.n; + from >> pxs->n_h.m; + printf("logread: high update: %d(%s)\n", pxs->n_h.n, pxs->n_h.m.c_str()); + } else if (type == "accepted") { + std::string v; + from >> pxs->n_a.n; + from >> pxs->n_a.m; + from.get(); + getline(from, v); + pxs->v_a = v; + printf("logread: prop update %d(%s) with v = %s\n", pxs->n_a.n, + pxs->n_a.m.c_str(), pxs->v_a.c_str()); + } else { + printf("logread: unknown log record\n"); + VERIFY(0); + } + } + from.close(); +} + +std::string +log::dump() +{ + std::ifstream from; + std::string res; + std::string v; + from.open(name.c_str()); + while (getline(from, v)) { + res = res + v + "\n"; + } + from.close(); + return res; +} + +void +log::restore(std::string s) +{ + std::ofstream f; + printf("restore: %s\n", s.c_str()); + f.open(name.c_str(), std::ios::trunc); + f << s; + f.close(); +} + +// XXX should be an atomic operation +void +log::loginstance(unsigned instance, std::string v) +{ + std::ofstream f; + f.open(name.c_str(), std::ios::app); + f << "done"; + f << " "; + f << instance; + f << " "; + f << v; + f << "\n"; + f.close(); +} + +// an acceptor should call logprop(n_h) when it +// receives a prepare to which it responds prepare_ok(). +void +log::logprop(prop_t n_h) +{ + std::ofstream f; + f.open(name.c_str(), std::ios::app); + f << "propseen"; + f << " "; + f << n_h.n; + f << " "; + f << n_h.m; + f << "\n"; + f.close(); +} + +// an acceptor should call logaccept(n_a, v_a) when it +// receives an accept RPC to which it replies accept_ok(). 
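+// Together with loginstance() and logprop(), this produces three
+// record types, one per line, which logread() parses on recovery:
+//   done <instance> <value>
+//   propseen <n> <m>
+//   accepted <n> <m> <value>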
+void
+log::logaccept(prop_t n, std::string v)
+{
+  std::ofstream f;
+  f.open(name.c_str(), std::ios::app);
+  f << "accepted";
+  f << " ";
+  f << n.n;
+  f << " ";
+  f << n.m;
+  f << " ";
+  f << v;
+  f << "\n";
+  f.close();
+}
+
diff --git a/log.h b/log.h
new file mode 100644
index 0000000..5bd2779
--- /dev/null
+++ b/log.h
@@ -0,0 +1,28 @@
+#ifndef log_h
+#define log_h
+
+#include
+#include
+
+
+class acceptor;
+
+class log {
+ private:
+  std::string name;
+  acceptor *pxs;
+ public:
+  log (acceptor*, std::string _me);
+  std::string dump();
+  void restore(std::string s);
+  void logread(void);
+  /* Log a committed paxos instance */
+  void loginstance(unsigned instance, std::string v);
+  /* Log the highest proposal number that the local paxos acceptor has ever seen */
+  void logprop(prop_t n_h);
+  /* Log the proposal (number and value) that the local paxos acceptor
+     most recently accepted */
+  void logaccept(prop_t n_a, std::string v);
+};
+
+#endif /* log_h */
diff --git a/mutex.cc b/mutex.cc
new file mode 100644
index 0000000..4e54cc0
--- /dev/null
+++ b/mutex.cc
@@ -0,0 +1,42 @@
+#include "mutex.h"
+#include "lang/verify.h"
+
+mutex::mutex() {
+  VERIFY(pthread_mutex_init(&m, NULL) == 0);
+}
+
+mutex::~mutex() {
+  VERIFY(pthread_mutex_destroy(&m) == 0);
+}
+
+void mutex::acquire() {
+  VERIFY(pthread_mutex_lock(&m) == 0);
+}
+
+void mutex::release() {
+  VERIFY(pthread_mutex_unlock(&m) == 0);
+}
+
+mutex::operator pthread_mutex_t *() {
+  return &m;
+}
+
+cond::cond() {
+  VERIFY(pthread_cond_init(&c, NULL) == 0);
+}
+
+cond::~cond() {
+  VERIFY(pthread_cond_destroy(&c) == 0);
+}
+
+void cond::wait(mutex &m) {
+  VERIFY(pthread_cond_wait(&c, m) == 0);
+}
+
+void cond::signal() {
+  VERIFY(pthread_cond_signal(&c) == 0);
+}
+
+void cond::broadcast() {
+  VERIFY(pthread_cond_broadcast(&c) == 0);
+}
diff --git a/mutex.h b/mutex.h
new file mode 100644
index 0000000..228a3bb
--- /dev/null
+++ b/mutex.h
@@ -0,0 +1,28 @@
+#ifndef mutex_h
+#define mutex_h
+
+#include <pthread.h>
+
+class mutex {
+ protected:
+  pthread_mutex_t m;
+ public:
+  mutex();
+  ~mutex();
+  void acquire();
+  void release();
+  operator pthread_mutex_t *();
+};
+
+class cond {
+ protected:
+  pthread_cond_t c;
+ public:
+  cond();
+  ~cond();
+  void wait(mutex &m);
+  void signal();
+  void broadcast();
+};
+
+#endif
diff --git a/paxos.cc b/paxos.cc
new file mode 100644
index 0000000..c3a445f
--- /dev/null
+++ b/paxos.cc
@@ -0,0 +1,372 @@
+#include "paxos.h"
+#include "handle.h"
+// #include
+#include
+#include "tprintf.h"
+#include "lang/verify.h"
+
+// This module implements the proposer and acceptor of the Paxos
+// distributed algorithm as described by Lamport's "Paxos Made
+// Simple". To kick off an instance of Paxos, the caller supplies a
+// list of nodes, a proposed value, and invokes the proposer. If the
+// majority of the nodes agree on the proposed value after running
+// this instance of Paxos, the acceptor invokes the upcall
+// paxos_commit to inform higher layers of the agreed value for this
+// instance.
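+//
+// A minimal usage sketch (modeled on config::add): the caller names
+// the instance, the nodes of the current view, and the value it wants
+// chosen:
+//
+//   std::string v = value(newmems);          // serialized next view
+//   bool ok = pro->run(myvid + 1, mems, v);  // one Paxos instance
+//
+// run() returns true once a majority accepts; note that the decided
+// value may be a previously accepted v_a rather than v.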
+
+
+bool
+operator> (const prop_t &a, const prop_t &b)
+{
+  return (a.n > b.n || (a.n == b.n && a.m > b.m));
+}
+
+bool
+operator>= (const prop_t &a, const prop_t &b)
+{
+  return (a.n > b.n || (a.n == b.n && a.m >= b.m));
+}
+
+std::string
+print_members(const std::vector<std::string> &nodes)
+{
+  std::string s;
+  s.clear();
+  for (unsigned i = 0; i < nodes.size(); i++) {
+    s += nodes[i];
+    if (i < (nodes.size()-1))
+      s += ",";
+  }
+  return s;
+}
+
+bool isamember(std::string m, const std::vector<std::string> &nodes)
+{
+  for (unsigned i = 0; i < nodes.size(); i++) {
+    if (nodes[i] == m) return true;
+  }
+  return false;
+}
+
+bool
+proposer::isrunning()
+{
+  bool r;
+  ScopedLock ml(pxs_mutex);
+  r = !stable;
+  return r;
+}
+
+// check whether the servers in l2 contain a majority of the servers in l1
+bool
+proposer::majority(const std::vector<std::string> &l1,
+    const std::vector<std::string> &l2)
+{
+  unsigned n = 0;
+
+  for (unsigned i = 0; i < l1.size(); i++) {
+    if (isamember(l1[i], l2))
+      n++;
+  }
+  return n >= (l1.size() >> 1) + 1;
+}
+
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
+    std::string _me)
+  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
+    stable (true)
+{
+  my_n.n = 0;
+  my_n.m = me;
+}
+
+void
+proposer::setn()
+{
+  my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+}
+
+bool
+proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
+{
+  std::vector<std::string> accepts;
+  std::vector<std::string> nodes;
+  std::string v;
+  bool r = false;
+
+  ScopedLock ml(pxs_mutex);
+  tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
+      print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+  if (!stable) {  // already running proposer?
+    tprintf("proposer::run: already running\n");
+    return false;
+  }
+  stable = false;
+  setn();
+  accepts.clear();
+  v.clear();
+  if (prepare(instance, accepts, cur_nodes, v)) {
+
+    if (majority(cur_nodes, accepts)) {
+      tprintf("paxos::manager: received a majority of prepare responses\n");
+
+      if (v.size() == 0)
+        v = newv;
+
+      breakpoint1();
+
+      nodes = accepts;
+      accepts.clear();
+      accept(instance, accepts, nodes, v);
+
+      if (majority(cur_nodes, accepts)) {
+        tprintf("paxos::manager: received a majority of accept responses\n");
+
+        breakpoint2();
+
+        decide(instance, accepts, v);
+        r = true;
+      } else {
+        tprintf("paxos::manager: no majority of accept responses\n");
+      }
+    } else {
+      tprintf("paxos::manager: no majority of prepare responses\n");
+    }
+  } else {
+    tprintf("paxos::manager: prepare was rejected %d\n", stable);
+  }
+  stable = true;
+  return r;
+}
+
+// proposer::run() calls prepare to send prepare RPCs to nodes
+// and collect responses. if one of those nodes
+// replies with an oldinstance, return false.
+// otherwise fill in accepts with the set of nodes that accepted,
+// set v to the v_a with the highest n_a, and return true.
+bool
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
+    std::vector<std::string> nodes,
+    std::string &v)
+{
+  struct paxos_protocol::preparearg arg = { instance, my_n };
+  struct paxos_protocol::prepareres res;
+  prop_t n_a = { 0, "" };
+  rpcc *r;
+  for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
+    handle h(*i);
+    if (!(r = h.safebind()))
+      continue;
+    int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+    if (status == paxos_protocol::OK) {
+      if (res.oldinstance) {
+        tprintf("committing old instance!\n");
+        acc->commit(instance, res.v_a);
+        return false;
+      }
+      if (res.accept) {
+        accepts.push_back(*i);
+        if (res.n_a >= n_a) {
+          tprintf("found a newer accepted proposal\n");
+          v = res.v_a;
+          n_a = res.n_a;
+        }
+      }
+    }
+  }
+  return true;
+}
+
+// run() calls this to send accept RPCs to nodes;
+// it fills in accepts with the list of nodes that accepted.
+void
+proposer::accept(unsigned instance, std::vector<std::string> &accepts,
+    std::vector<std::string> nodes, std::string v)
+{
+  struct paxos_protocol::acceptarg arg = { instance, my_n, v };
+  rpcc *r;
+  for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
+    handle h(*i);
+    if (!(r = h.safebind()))
+      continue;
+    bool accept = false;
+    int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
+    if (status == paxos_protocol::OK) {
+      if (accept)
+        accepts.push_back(*i);
+    }
+  }
+}
+
+void
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
+    std::string v)
+{
+  struct paxos_protocol::decidearg arg = { instance, v };
+  rpcc *r;
+  for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
+    handle h(*i);
+    if (!(r = h.safebind()))
+      continue;
+    int res = 0;
+    r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
+  }
+}
+
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
+    std::string _value)
+  : cfg(_cfg), me (_me), instance_h(0)
+{
+  n_h.n = 0;
+  n_h.m = me;
+  n_a.n = 0;
+  n_a.m = me;
+  v_a.clear();
+
+  l = new log (this, me);
+
+  if (instance_h == 0 && _first) {
+    values[1] = _value;
+    l->loginstance(1, _value);
+    instance_h = 1;
+  }
+
+  pxs = new rpcs(atoi(_me.c_str()));
+  pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
+  pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
+  pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+}
+
+paxos_protocol::status
+acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
+    paxos_protocol::prepareres &r)
+{
+  ScopedLock ml(pxs_mutex);
+  r.oldinstance = false;
+  r.accept = false;
+  r.n_a = n_a;
+  r.v_a = v_a;
+  if (a.instance <= instance_h) {
+    r.oldinstance = true;
+    r.v_a = values[a.instance];
+  } else if (a.n > n_h) {
+    n_h = a.n;
+    l->logprop(n_h);
+    r.accept = true;
+  } else {
+    tprintf("I totally rejected this request. Ha.\n");
Ha.\n"); + } + return paxos_protocol::OK; +} + +paxos_protocol::status +acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) +{ + ScopedLock ml(pxs_mutex); + r = false; + if (a.n >= n_h) { + n_a = a.n; + v_a = a.v; + l->logaccept(n_a, v_a); + r = true; + } + return paxos_protocol::OK; +} + +// the src argument is only for debug purpose +paxos_protocol::status +acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r) +{ + ScopedLock ml(pxs_mutex); + tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", + a.instance, instance_h, v_a.c_str()); + if (a.instance == instance_h + 1) { + VERIFY(v_a == a.v); + commit_wo(a.instance, v_a); + } else if (a.instance <= instance_h) { + // we are ahead ignore. + } else { + // we are behind + VERIFY(0); + } + return paxos_protocol::OK; +} + +void +acceptor::commit_wo(unsigned instance, std::string value) +{ + //assume pxs_mutex is held + tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); + if (instance > instance_h) { + tprintf("commit: highestaccepteinstance = %d\n", instance); + values[instance] = value; + l->loginstance(instance, value); + instance_h = instance; + n_h.n = 0; + n_h.m = me; + n_a.n = 0; + n_a.m = me; + v_a.clear(); + if (cfg) { + pxs_mutex.release(); + cfg->paxos_commit(instance, value); + pxs_mutex.acquire(); + } + } +} + +void +acceptor::commit(unsigned instance, std::string value) +{ + ScopedLock ml(pxs_mutex); + commit_wo(instance, value); +} + +std::string +acceptor::dump() +{ + return l->dump(); +} + +void +acceptor::restore(std::string s) +{ + l->restore(s); + l->logread(); +} + + + +// For testing purposes + +// Call this from your code between phases prepare and accept of proposer +void +proposer::breakpoint1() +{ + if (break1) { + tprintf("Dying at breakpoint 1!\n"); + exit(1); + } +} + +// Call this from your code between phases accept and decide of proposer +void +proposer::breakpoint2() +{ + if (break2) { + tprintf("Dying at breakpoint 2!\n"); + exit(1); + } +} + +void +proposer::breakpoint(int b) +{ + if (b == 3) { + tprintf("Proposer: breakpoint 1\n"); + break1 = true; + } else if (b == 4) { + tprintf("Proposer: breakpoint 2\n"); + break2 = true; + } +} diff --git a/paxos.h b/paxos.h new file mode 100644 index 0000000..7188edb --- /dev/null +++ b/paxos.h @@ -0,0 +1,100 @@ +#ifndef paxos_h +#define paxos_h + +#include +#include +#include "rpc.h" +#include "paxos_protocol.h" +#include "log.h" +#include "mutex.h" + + +class paxos_change { + public: + virtual void paxos_commit(unsigned instance, std::string v) = 0; + virtual ~paxos_change() {}; +}; + +class acceptor { + private: + log *l; + rpcs *pxs; + paxos_change *cfg; + std::string me; + mutex pxs_mutex; + + // Acceptor state + prop_t n_h; // number of the highest proposal seen in a prepare + prop_t n_a; // number of highest proposal accepted + std::string v_a; // value of highest proposal accepted + unsigned instance_h; // number of the highest instance we have decided + std::map values; // vals of each instance + + void commit_wo(unsigned instance, std::string v); + paxos_protocol::status preparereq(std::string src, + paxos_protocol::preparearg a, + paxos_protocol::prepareres &r); + paxos_protocol::status acceptreq(std::string src, + paxos_protocol::acceptarg a, bool &r); + paxos_protocol::status decidereq(std::string src, + paxos_protocol::decidearg a, int &r); + + friend class log; + + public: + acceptor(class paxos_change *cfg, bool _first, std::string _me, + std::string _value); + 
+  ~acceptor() {};
+  void commit(unsigned instance, std::string v);
+  unsigned instance() { return instance_h; }
+  std::string value(unsigned instance) { return values[instance]; }
+  std::string dump();
+  void restore(std::string);
+  rpcs *get_rpcs() { return pxs; };
+  prop_t get_n_h() { return n_h; };
+  unsigned get_instance_h() { return instance_h; };
+};
+
+extern bool isamember(std::string m, const std::vector<std::string> &nodes);
+extern std::string print_members(const std::vector<std::string> &nodes);
+
+class proposer {
+ private:
+  log *l;
+  paxos_change *cfg;
+  acceptor *acc;
+  std::string me;
+  bool break1;
+  bool break2;
+
+  mutex pxs_mutex;
+
+  // Proposer state
+  bool stable;
+  prop_t my_n;  // number of the last proposal used in this instance
+
+  void setn();
+  bool prepare(unsigned instance, std::vector<std::string> &accepts,
+      std::vector<std::string> nodes,
+      std::string &v);
+  void accept(unsigned instance, std::vector<std::string> &accepts,
+      std::vector<std::string> nodes, std::string v);
+  void decide(unsigned instance, std::vector<std::string> accepts,
+      std::string v);
+
+  void breakpoint1();
+  void breakpoint2();
+  bool majority(const std::vector<std::string> &l1,
+      const std::vector<std::string> &l2);
+
+  friend class log;
+ public:
+  proposer(class paxos_change *cfg, class acceptor *_acceptor, std::string _me);
+  ~proposer() {};
+  bool run(int instance, std::vector<std::string> cnodes, std::string v);
+  bool isrunning();
+  void breakpoint(int b);
+};
+
+
+
+#endif /* paxos_h */
diff --git a/paxos_protocol.h b/paxos_protocol.h
new file mode 100644
index 0000000..9c2703e
--- /dev/null
+++ b/paxos_protocol.h
@@ -0,0 +1,133 @@
+#ifndef paxos_protocol_h
+#define paxos_protocol_h
+
+#include "rpc.h"
+
+struct prop_t {
+  unsigned n;
+  std::string m;
+};
+
+class paxos_protocol {
+ public:
+  enum xxstatus { OK, ERR };
+  typedef int status;
+  enum rpc_numbers {
+    preparereq = 0x11001,
+    acceptreq,
+    decidereq,
+    heartbeat,
+  };
+
+  struct preparearg {
+    unsigned instance;
+    prop_t n;
+  };
+
+  struct prepareres {
+    bool oldinstance;
+    bool accept;
+    prop_t n_a;
+    std::string v_a;
+  };
+
+  struct acceptarg {
+    unsigned instance;
+    prop_t n;
+    std::string v;
+  };
+
+  struct decidearg {
+    unsigned instance;
+    std::string v;
+  };
+
+};
+
+inline unmarshall &
+operator>>(unmarshall &u, prop_t &a)
+{
+  u >> a.n;
+  u >> a.m;
+  return u;
+}
+
+inline marshall &
+operator<<(marshall &m, prop_t a)
+{
+  m << a.n;
+  m << a.m;
+  return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::preparearg &a)
+{
+  u >> a.instance;
+  u >> a.n;
+  return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::preparearg a)
+{
+  m << a.instance;
+  m << a.n;
+  return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::prepareres &r)
+{
+  u >> r.oldinstance;
+  u >> r.accept;
+  u >> r.n_a;
+  u >> r.v_a;
+  return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::prepareres r)
+{
+  m << r.oldinstance;
+  m << r.accept;
+  m << r.n_a;
+  m << r.v_a;
+  return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::acceptarg &a)
+{
+  u >> a.instance;
+  u >> a.n;
+  u >> a.v;
+  return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::acceptarg a)
+{
+  m << a.instance;
+  m << a.n;
+  m << a.v;
+  return m;
+}
+
+inline unmarshall &
+operator>>(unmarshall &u, paxos_protocol::decidearg &a)
+{
+  u >> a.instance;
+  u >> a.v;
+  return u;
+}
+
+inline marshall &
+operator<<(marshall &m, paxos_protocol::decidearg a)
+{
+  m << a.instance;
+  m << a.v;
+  return m;
+}
+
+#endif
diff --git a/random.cc b/random.cc
new file mode 100644
index 0000000..d344958
--- /dev/null
+++ b/random.cc
@@ -0,0 +1,36 @@
+#include "mutex.h"
+#include <stdlib.h>
+#include "rpc/slock.h"
+#include <stdint.h>
+
+static mutex rand_m;
+
+void srand_safe(unsigned int seed) {
+  ScopedLock s(rand_m);
+  srandom(seed);
+}
+
+// RAND_MAX is guaranteed to be at least 32767,
+// but math with rand() is annoying.
+// Here's a canned solution from
+// http://www.azillionmonkeys.com/qed/random.html
+// which gives uniform numbers in [0, r)
+
+#define RS_SCALE (1.0 / (1.0 + RAND_MAX))
+
+double drand (void) {
+  double d;
+  do {
+    d = (((random() * RS_SCALE) + random()) * RS_SCALE + random()) * RS_SCALE;
+  } while (d >= 1); /* Round off */
+  return d;
+}
+
+#define irand(x) ((unsigned int) ((x) * drand()))
+
+// use this to construct a 32-bit thread-safe RNG
+#define RAND32 ((uint32_t)irand(1ul<<32))
+uint32_t rand32_safe() {
+  ScopedLock s(rand_m);
+  return RAND32;
+}
diff --git a/rpc/connection.cc b/rpc/connection.cc
new file mode 100644
index 0000000..c22ad45
--- /dev/null
+++ b/rpc/connection.cc
@@ -0,0 +1,448 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <netinet/tcp.h>
+
+#include "method_thread.h"
+#include "connection.h"
+#include "slock.h"
+#include "pollmgr.h"
+#include "jsl_log.h"
+#include "gettime.h"
+#include "lang/verify.h"
+
+#define MAX_PDU (10<<20) //maximum PDU is 10M
+
+
+connection::connection(chanmgr *m1, int f1, int l1)
+: mgr_(m1), fd_(f1), dead_(false), waiters_(0), refno_(1), lossy_(l1)
+{
+
+  int flags = fcntl(fd_, F_GETFL, NULL);
+  flags |= O_NONBLOCK;
+  fcntl(fd_, F_SETFL, flags);
+
+  signal(SIGPIPE, SIG_IGN);
+  VERIFY(pthread_mutex_init(&m_,0)==0);
+  VERIFY(pthread_mutex_init(&ref_m_,0)==0);
+  VERIFY(pthread_cond_init(&send_wait_,0)==0);
+  VERIFY(pthread_cond_init(&send_complete_,0)==0);
+
+  VERIFY(gettimeofday(&create_time_, NULL) == 0);
+
+  PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+}
+
+connection::~connection()
+{
+  VERIFY(dead_);
+  VERIFY(pthread_mutex_destroy(&m_)== 0);
+  VERIFY(pthread_mutex_destroy(&ref_m_)== 0);
+  VERIFY(pthread_cond_destroy(&send_wait_) == 0);
+  VERIFY(pthread_cond_destroy(&send_complete_) == 0);
+  if (rpdu_.buf)
+    free(rpdu_.buf);
+  VERIFY(!wpdu_.buf);
+  close(fd_);
+}
+
+void
+connection::incref()
+{
+  ScopedLock ml(&ref_m_);
+  refno_++;
+}
+
+bool
+connection::isdead()
+{
+  ScopedLock ml(&m_);
+  return dead_;
+}
+
+void
+connection::closeconn()
+{
+  {
+    ScopedLock ml(&m_);
+    if (!dead_) {
+      dead_ = true;
+      shutdown(fd_,SHUT_RDWR);
+    } else {
+      return;
+    }
+  }
+  //after block_remove_fd, select will never wait on fd_
+  //and no callbacks will be active
+  PollMgr::Instance()->block_remove_fd(fd_);
+}
+
+void
+connection::decref()
+{
+  VERIFY(pthread_mutex_lock(&ref_m_)==0);
+  refno_ --;
+  VERIFY(refno_>=0);
+  if (refno_==0) {
+    VERIFY(pthread_mutex_lock(&m_)==0);
+    if (dead_) {
+      VERIFY(pthread_mutex_unlock(&ref_m_)==0);
+      VERIFY(pthread_mutex_unlock(&m_)==0);
+      delete this;
+      return;
+    }
+    VERIFY(pthread_mutex_unlock(&m_)==0);
+  }
+  pthread_mutex_unlock(&ref_m_);
+}
+
+int
+connection::ref()
+{
+  ScopedLock rl(&ref_m_);
+  return refno_;
+}
+
+int
+connection::compare(connection *another)
+{
+  if (create_time_.tv_sec > another->create_time_.tv_sec)
+    return 1;
+  if (create_time_.tv_sec < another->create_time_.tv_sec)
+    return -1;
+  if (create_time_.tv_usec > another->create_time_.tv_usec)
return 1; + if (create_time_.tv_usec < another->create_time_.tv_usec) + return -1; + return 0; +} + +bool +connection::send(char *b, int sz) +{ + ScopedLock ml(&m_); + waiters_++; + while (!dead_ && wpdu_.buf) { + VERIFY(pthread_cond_wait(&send_wait_, &m_)==0); + } + waiters_--; + if (dead_) { + return false; + } + wpdu_.buf = b; + wpdu_.sz = sz; + wpdu_.solong = 0; + + if (lossy_) { + if ((random()%100) < lossy_) { + jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_); + shutdown(fd_,SHUT_RDWR); + } + } + + if (!writepdu()) { + dead_ = true; + VERIFY(pthread_mutex_unlock(&m_) == 0); + PollMgr::Instance()->block_remove_fd(fd_); + VERIFY(pthread_mutex_lock(&m_) == 0); + }else{ + if (wpdu_.solong == wpdu_.sz) { + }else{ + //should be rare to need to explicitly add write callback + PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { + VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0); + } + } + } + bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); + wpdu_.solong = wpdu_.sz = 0; + wpdu_.buf = NULL; + if (waiters_ > 0) + pthread_cond_broadcast(&send_wait_); + return ret; +} + +//fd_ is ready to be written +void +connection::write_cb(int s) +{ + ScopedLock ml(&m_); + VERIFY(!dead_); + VERIFY(fd_ == s); + if (wpdu_.sz == 0) { + PollMgr::Instance()->del_callback(fd_,CB_WRONLY); + return; + } + if (!writepdu()) { + PollMgr::Instance()->del_callback(fd_, CB_RDWR); + dead_ = true; + }else{ + VERIFY(wpdu_.solong >= 0); + if (wpdu_.solong < wpdu_.sz) { + return; + } + } + pthread_cond_signal(&send_complete_); +} + +//fd_ is ready to be read +void +connection::read_cb(int s) +{ + ScopedLock ml(&m_); + VERIFY(fd_ == s); + if (dead_) { + return; + } + + bool succ = true; + if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { + succ = readpdu(); + } + + if (!succ) { + PollMgr::Instance()->del_callback(fd_,CB_RDWR); + dead_ = true; + pthread_cond_signal(&send_complete_); + } + + if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { + if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { + //chanmgr has successfully consumed the pdu + rpdu_.buf = NULL; + rpdu_.sz = rpdu_.solong = 0; + } + } +} + +bool +connection::writepdu() +{ + VERIFY(wpdu_.solong >= 0); + if (wpdu_.solong == wpdu_.sz) + return true; + + if (wpdu_.solong == 0) { + int sz = htonl(wpdu_.sz); + bcopy(&sz,wpdu_.buf,sizeof(sz)); + } + int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + if (n < 0) { + if (errno != EAGAIN) { + jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno); + wpdu_.solong = -1; + wpdu_.sz = 0; + } + return (errno == EAGAIN); + } + wpdu_.solong += n; + return true; +} + +bool +connection::readpdu() +{ + if (!rpdu_.sz) { + int sz, sz1; + int n = read(fd_, &sz1, sizeof(sz1)); + + if (n == 0) { + return false; + } + + if (n < 0) { + VERIFY(errno!=EAGAIN); + return false; + } + + if (n >0 && n!= sizeof(sz)) { + jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n"); + return false; + } + + sz = ntohl(sz1); + + if (sz > MAX_PDU) { + char *tmpb = (char *)&sz1; + jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, + sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); + return false; + } + + rpdu_.sz = sz; + VERIFY(rpdu_.buf == NULL); + rpdu_.buf = (char *)malloc(sz+sizeof(sz)); + VERIFY(rpdu_.buf); + bcopy(&sz1,rpdu_.buf,sizeof(sz)); + rpdu_.solong = sizeof(sz); + } + + int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + if (n <= 0) { + if (errno == 
EAGAIN) + return true; + if (rpdu_.buf) + free(rpdu_.buf); + rpdu_.buf = NULL; + rpdu_.sz = rpdu_.solong = 0; + return (errno == EAGAIN); + } + rpdu_.solong += n; + return true; +} + +tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) +: mgr_(m1), lossy_(lossytest) +{ + + VERIFY(pthread_mutex_init(&m_,NULL) == 0); + + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + tcp_ = socket(AF_INET, SOCK_STREAM, 0); + if(tcp_ < 0){ + perror("tcpsconn::tcpsconn accept_loop socket:"); + VERIFY(0); + } + + int yes = 1; + setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + + if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){ + perror("accept_loop tcp bind:"); + VERIFY(0); + } + + if(listen(tcp_, 1000) < 0) { + perror("tcpsconn::tcpsconn listen:"); + VERIFY(0); + } + + socklen_t addrlen = sizeof(sin); + VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); + port_ = ntohs(sin.sin_port); + + jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, + sin.sin_port); + + if (pipe(pipe_) < 0) { + perror("accept_loop pipe:"); + VERIFY(0); + } + + int flags = fcntl(pipe_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipe_[0], F_SETFL, flags); + + VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); +} + +tcpsconn::~tcpsconn() +{ + VERIFY(close(pipe_[1]) == 0); + VERIFY(pthread_join(th_, NULL) == 0); + + //close all the active connections + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end(); i++) { + i->second->closeconn(); + i->second->decref(); + } +} + +void +tcpsconn::process_accept() +{ + sockaddr_in sin; + socklen_t slen = sizeof(sin); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + if (s1 < 0) { + perror("tcpsconn::accept_conn error"); + pthread_exit(NULL); + } + + jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", + s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); + connection *ch = new connection(mgr_, s1, lossy_); + + // garbage collect all dead connections with refcount of 1 + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end();) { + if (i->second->isdead() && i->second->ref() == 1) { + jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", + i->second->channo()); + i->second->decref(); + // Careful not to reuse i right after erase. (i++) will + // be evaluated before the erase call because in C++, + // there is a sequence point before a function call. + // See http://en.wikipedia.org/wiki/Sequence_point. + conns_.erase(i++); + } else + ++i; + } + + conns_[ch->channo()] = ch; +} + +void +tcpsconn::accept_conn() +{ + fd_set rfds; + int max_fd = pipe_[0] > tcp_ ? 
pipe_[0] : tcp_; + + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + perror("accept_conn select:"); + jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + VERIFY(0); + } + } + + if (FD_ISSET(pipe_[0], &rfds)) { + close(pipe_[0]); + close(tcp_); + return; + } + else if (FD_ISSET(tcp_, &rfds)) { + process_accept(); + } else { + VERIFY(0); + } + } +} + +connection * +connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) +{ + int s= socket(AF_INET, SOCK_STREAM, 0); + int yes = 1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", + inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); + close(s); + return NULL; + } + jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n", + s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); + return new connection(mgr, s, lossy); +} + + diff --git a/rpc/connection.h b/rpc/connection.h new file mode 100644 index 0000000..da48cf4 --- /dev/null +++ b/rpc/connection.h @@ -0,0 +1,101 @@ +#ifndef connection_h +#define connection_h 1 + +#include +#include +#include +#include +#include + +#include + +#include "pollmgr.h" + +class connection; + +class chanmgr { + public: + virtual bool got_pdu(connection *c, char *b, int sz) = 0; + virtual ~chanmgr() {} +}; + +class connection : public aio_callback { + public: + struct charbuf { + charbuf(): buf(NULL), sz(0), solong(0) {} + charbuf (char *b, int s) : buf(b), sz(s), solong(0){} + char *buf; + int sz; + int solong; //amount of bytes written or read so far + }; + + connection(chanmgr *m1, int f1, int lossytest=0); + ~connection(); + + int channo() { return fd_; } + bool isdead(); + void closeconn(); + + bool send(char *b, int sz); + void write_cb(int s); + void read_cb(int s); + + void incref(); + void decref(); + int ref(); + + int compare(connection *another); + private: + + bool readpdu(); + bool writepdu(); + + chanmgr *mgr_; + const int fd_; + bool dead_; + + charbuf wpdu_; + charbuf rpdu_; + + struct timeval create_time_; + + int waiters_; + int refno_; + const int lossy_; + + pthread_mutex_t m_; + pthread_mutex_t ref_m_; + pthread_cond_t send_complete_; + pthread_cond_t send_wait_; +}; + +class tcpsconn { + public: + tcpsconn(chanmgr *m1, int port, int lossytest=0); + ~tcpsconn(); + inline int port() { return port_; } + void accept_conn(); + private: + int port_; + pthread_mutex_t m_; + pthread_t th_; + int pipe_[2]; + + int tcp_; //file desciptor for accepting connection + chanmgr *mgr_; + int lossy_; + std::map conns_; + + void process_accept(); +}; + +struct bundle { + bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} + chanmgr *mgr; + int tcp; + int lossy; +}; + +void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0); +connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); +#endif diff --git a/rpc/fifo.h b/rpc/fifo.h new file mode 100644 index 0000000..979cf62 --- /dev/null +++ b/rpc/fifo.h @@ -0,0 +1,94 @@ +#ifndef fifo_h +#define fifo_h + +// fifo template +// blocks enq() and deq() when queue is FULL or EMPTY + +#include +#include +#include +#include +#include +#include "slock.h" +#include "lang/verify.h" + +template +class fifo { + public: + fifo(int m=0); + ~fifo(); + bool enq(T, bool blocking=true); + void deq(T *); + bool 
size();
+
+ private:
+  std::list<T> q_;
+  pthread_mutex_t m_;
+  pthread_cond_t non_empty_c_;  // q went non-empty
+  pthread_cond_t has_space_c_;  // q is no longer overfull
+  unsigned int max_;  // maximum capacity of the queue; enq blocks if the queue exceeds this limit
+};
+
+template <class T>
+fifo<T>::fifo(int limit) : max_(limit)
+{
+  VERIFY(pthread_mutex_init(&m_, 0) == 0);
+  VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0);
+  VERIFY(pthread_cond_init(&has_space_c_, 0) == 0);
+}
+
+template <class T>
+fifo<T>::~fifo()
+{
+  //fifo is to be deleted only when no threads are using it!
+  VERIFY(pthread_mutex_destroy(&m_)==0);
+  VERIFY(pthread_cond_destroy(&non_empty_c_) == 0);
+  VERIFY(pthread_cond_destroy(&has_space_c_) == 0);
+}
+
+template <class T> bool
+fifo<T>::size()
+{
+  ScopedLock ml(&m_);
+  return q_.size();
+}
+
+template <class T> bool
+fifo<T>::enq(T e, bool blocking)
+{
+  ScopedLock ml(&m_);
+  while (1) {
+    if (!max_ || q_.size() < max_) {
+      q_.push_back(e);
+      break;
+    }
+    if (blocking)
+      VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0);
+    else
+      return false;
+  }
+  VERIFY(pthread_cond_signal(&non_empty_c_) == 0);
+  return true;
+}
+
+template <class T> void
+fifo<T>::deq(T *e)
+{
+  ScopedLock ml(&m_);
+
+  while(1) {
+    if(q_.empty()){
+      VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0);
+    } else {
+      *e = q_.front();
+      q_.pop_front();
+      if (max_ && q_.size() < max_) {
+        VERIFY(pthread_cond_signal(&has_space_c_)==0);
+      }
+      break;
+    }
+  }
+  return;
+}
+
+#endif
diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc
new file mode 100644
index 0000000..06e5c2c
--- /dev/null
+++ b/rpc/jsl_log.cc
@@ -0,0 +1,9 @@
+#include "jsl_log.h"
+
+int JSL_DEBUG_LEVEL = 0;
+
+void
+jsl_set_debug(int level) {
+  JSL_DEBUG_LEVEL = level;
+}
+
diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h
new file mode 100644
index 0000000..7f92998
--- /dev/null
+++ b/rpc/jsl_log.h
@@ -0,0 +1,25 @@
+#ifndef __JSL_LOG_H__
+#define __JSL_LOG_H__ 1
+
+enum dbcode {
+  JSL_DBG_OFF = 0,
+  JSL_DBG_1 = 1, // Critical
+  JSL_DBG_2 = 2, // Error
+  JSL_DBG_3 = 3, // Info
+  JSL_DBG_4 = 4, // Debugging
+};
+
+extern int JSL_DEBUG_LEVEL;
+
+#define jsl_log(level,...) 
\ + do { \ + if(JSL_DEBUG_LEVEL < abs(level)) \ + {;} \ + else { \ + { printf(__VA_ARGS__);} \ + } \ + } while(0) + +void jsl_set_debug(int level); + +#endif // __JSL_LOG_H__ diff --git a/rpc/marshall.h b/rpc/marshall.h new file mode 100644 index 0000000..e0370d1 --- /dev/null +++ b/rpc/marshall.h @@ -0,0 +1,261 @@ +#ifndef marshall_h +#define marshall_h + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "lang/verify.h" +#include "lang/algorithm.h" + +struct req_header { + req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0): + xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} + int xid; + int proc; + unsigned int clt_nonce; + unsigned int srv_nonce; + int xid_rep; +}; + +struct reply_header { + reply_header(int x=0, int r=0): xid(x), ret(r) {} + int xid; + int ret; +}; + +typedef uint64_t rpc_checksum_t; +typedef int rpc_sz_t; + +enum { + //size of initial buffer allocation + DEFAULT_RPC_SZ = 1024, +#if RPC_CHECKSUMMING + //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum + RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t) +#else + RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) +#endif +}; + +class marshall { + private: + char *_buf; // Base of the raw bytes buffer (dynamically readjusted) + int _capa; // Capacity of the buffer + int _ind; // Read/write head position + + public: + marshall() { + _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); + VERIFY(_buf); + _capa = DEFAULT_RPC_SZ; + _ind = RPC_HEADER_SZ; + } + + ~marshall() { + if (_buf) + free(_buf); + } + + int size() { return _ind;} + char *cstr() { return _buf;} + + void rawbyte(unsigned char); + void rawbytes(const char *, int); + + // Return the current content (excluding header) as a string + std::string get_content() { + return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ); + } + + // Return the current content (excluding header) as a string + std::string str() { + return get_content(); + } + + void pack(int i); + + void pack_req_header(const req_header &h) { + int saved_sz = _ind; + //leave the first 4-byte empty for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + pack(h.xid); + pack(h.proc); + pack((int)h.clt_nonce); + pack((int)h.srv_nonce); + pack(h.xid_rep); + _ind = saved_sz; + } + + void pack_reply_header(const reply_header &h) { + int saved_sz = _ind; + //leave the first 4-byte empty for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + pack(h.xid); + pack(h.ret); + _ind = saved_sz; + } + + void take_buf(char **b, int *s) { + *b = _buf; + *s = _ind; + _buf = NULL; + _ind = 0; + return; + } +}; +marshall& operator<<(marshall &, bool); +marshall& operator<<(marshall &, unsigned int); +marshall& operator<<(marshall &, int); +marshall& operator<<(marshall &, unsigned char); +marshall& operator<<(marshall &, char); +marshall& operator<<(marshall &, unsigned short); +marshall& operator<<(marshall &, short); +marshall& operator<<(marshall &, unsigned long long); +marshall& operator<<(marshall &, const std::string &); + +class unmarshall { + private: + char *_buf; + int _sz; + int _ind; + bool _ok; + public: + unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {} + unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {} + unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false) + { + //take the content which does 
not exclude a RPC header from a string + take_content(s); + } + ~unmarshall() { + if (_buf) free(_buf); + } + + //take contents from another unmarshall object + void take_in(unmarshall &another); + + //take the content which does not exclude a RPC header from a string + void take_content(const std::string &s) { + _sz = s.size()+RPC_HEADER_SZ; + _buf = (char *)realloc(_buf,_sz); + VERIFY(_buf); + _ind = RPC_HEADER_SZ; + memcpy(_buf+_ind, s.data(), s.size()); + _ok = true; + } + + bool ok() { return _ok; } + char *cstr() { return _buf;} + bool okdone(); + unsigned int rawbyte(); + void rawbytes(std::string &s, unsigned int n); + + int ind() { return _ind;} + int size() { return _sz;} + void unpack(int *); //non-const ref + void take_buf(char **b, int *sz) { + *b = _buf; + *sz = _sz; + _sz = _ind = 0; + _buf = NULL; + } + + void unpack_req_header(req_header *h) { + //the first 4-byte is for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + unpack(&h->xid); + unpack(&h->proc); + unpack((int *)&h->clt_nonce); + unpack((int *)&h->srv_nonce); + unpack(&h->xid_rep); + _ind = RPC_HEADER_SZ; + } + + void unpack_reply_header(reply_header *h) { + //the first 4-byte is for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + unpack(&h->xid); + unpack(&h->ret); + _ind = RPC_HEADER_SZ; + } +}; + +unmarshall& operator>>(unmarshall &, bool &); +unmarshall& operator>>(unmarshall &, unsigned char &); +unmarshall& operator>>(unmarshall &, char &); +unmarshall& operator>>(unmarshall &, unsigned short &); +unmarshall& operator>>(unmarshall &, short &); +unmarshall& operator>>(unmarshall &, unsigned int &); +unmarshall& operator>>(unmarshall &, int &); +unmarshall& operator>>(unmarshall &, unsigned long long &); +unmarshall& operator>>(unmarshall &, std::string &); + +template marshall & +operator<<(marshall &m, std::vector v) +{ + m << (unsigned int) v.size(); + for(unsigned i = 0; i < v.size(); i++) + m << v[i]; + return m; +} + +template unmarshall & +operator>>(unmarshall &u, std::vector &v) +{ + unsigned n; + u >> n; + for(unsigned i = 0; i < n; i++){ + C z; + u >> z; + v.push_back(z); + } + return u; +} + +template marshall & +operator<<(marshall &m, const std::map &d) { + typename std::map::const_iterator i; + + m << (unsigned int) d.size(); + + for (i = d.begin(); i != d.end(); i++) { + m << i->first << i->second; + } + return m; +} + +template unmarshall & +operator>>(unmarshall &u, std::map &d) { + unsigned int n; + u >> n; + + d.clear(); + + for (unsigned int lcv = 0; lcv < n; lcv++) { + A a; + B b; + u >> a >> b; + d[a] = b; + } + return u; +} + +#endif diff --git a/rpc/method_thread.h b/rpc/method_thread.h new file mode 100644 index 0000000..bcbc08b --- /dev/null +++ b/rpc/method_thread.h @@ -0,0 +1,164 @@ +#ifndef method_thread_h +#define method_thread_h + +// method_thread(): start a thread that runs an object method. +// returns a pthread_t on success, and zero on error. 
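+//
+// A usage sketch (the class here is hypothetical; the real callers are
+// rpcc, rpcs and PollMgr):
+//
+//   class srv {
+//    public:
+//     void loop();         // long-running service loop
+//     void handle(int fd); // one-shot worker
+//   };
+//   srv s;
+//   // detach==true: thread state is reclaimed automatically on exit
+//   method_thread(&s, true, &srv::loop);
+//   // detach==false: keep the thread joinable and pthread_join() it later
+//   pthread_t th = method_thread(&s, false, &srv::handle, 5);
+//   pthread_join(th, NULL);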
+ +#include +#include +#include +#include +#include "lang/verify.h" + +static pthread_t +method_thread_parent(void *(*fn)(void *), void *arg, bool detach) +{ + pthread_t th; + pthread_attr_t attr; + pthread_attr_init(&attr); + // set stack size to 100K, so we don't run out of memory + pthread_attr_setstacksize(&attr, 100*1024); + int err = pthread_create(&th, &attr, fn, arg); + pthread_attr_destroy(&attr); + if (err != 0) { + fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err)); + exit(1); + } + + if (detach) { + // don't keep thread state around after exit, to avoid + // running out of threads. set detach==false if you plan + // to pthread_join. + VERIFY(pthread_detach(th) == 0); + } + + return th; +} + +static void +method_thread_child() +{ + // defer pthread_cancel() by default. check explicitly by + // enabling then pthread_testcancel(). + int oldstate, oldtype; + VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0); + VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)()) +{ + class XXX { + public: + C *o; + void (C::*m)(); + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)() = x->m; + delete x; + method_thread_child(); + (o->*m)(); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A), A a) +{ + class XXX { + public: + C *o; + void (C::*m)(A a); + A a; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A ) = x->m; + A a = x->a; + delete x; + method_thread_child(); + (o->*m)(a); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a = a; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +namespace { + // ~xavid: this causes a bizzare compile error on OS X.5 when + // it's declared in the function, so I moved it out here. 
+ template + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2); + A1 a1; + A2 a2; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + delete x; + method_thread_child(); + (o->*m)(a1, a2); + return 0; + } + }; +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2) +{ + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3) +{ + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2, A3 a3); + A1 a1; + A2 a2; + A3 a3; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 , A3 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + A3 a3 = x->a3; + delete x; + method_thread_child(); + (o->*m)(a1, a2, a3); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + x->a3 = a3; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +#endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc new file mode 100644 index 0000000..f73a3a5 --- /dev/null +++ b/rpc/pollmgr.cc @@ -0,0 +1,360 @@ +#include +#include +#include +#include + +#include "slock.h" +#include "jsl_log.h" +#include "method_thread.h" +#include "lang/verify.h" +#include "pollmgr.h" + +PollMgr *PollMgr::instance = NULL; +static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT; + +void +PollMgrInit() +{ + PollMgr::instance = new PollMgr(); +} + +PollMgr * +PollMgr::Instance() +{ + pthread_once(&pollmgr_is_initialized, PollMgrInit); + return instance; +} + +PollMgr::PollMgr() : pending_change_(false) +{ + bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); + aio_ = new SelectAIO(); + //aio_ = new EPollAIO(); + + VERIFY(pthread_mutex_init(&m_, NULL) == 0); + VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0); + VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0); +} + +PollMgr::~PollMgr() +{ + //never kill me!!! 
+ VERIFY(0); +} + +void +PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) +{ + VERIFY(fd < MAX_POLL_FDS); + + ScopedLock ml(&m_); + aio_->watch_fd(fd, flag); + + VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); + callbacks_[fd] = ch; +} + +//remove all callbacks related to fd +//the return guarantees that callbacks related to fd +//will never be called again +void +PollMgr::block_remove_fd(int fd) +{ + ScopedLock ml(&m_); + aio_->unwatch_fd(fd, CB_RDWR); + pending_change_ = true; + VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0); + callbacks_[fd] = NULL; +} + +void +PollMgr::del_callback(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (aio_->unwatch_fd(fd, flag)) { + callbacks_[fd] = NULL; + } +} + +bool +PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) +{ + ScopedLock ml(&m_); + if (!callbacks_[fd] || callbacks_[fd]!=c) + return false; + + return aio_->is_watched(fd, flag); +} + +void +PollMgr::wait_loop() +{ + + std::vector readable; + std::vector writable; + + while (1) { + { + ScopedLock ml(&m_); + if (pending_change_) { + pending_change_ = false; + VERIFY(pthread_cond_broadcast(&changedone_c_)==0); + } + } + readable.clear(); + writable.clear(); + aio_->wait_ready(&readable,&writable); + + if (!readable.size() && !writable.size()) { + continue; + } + //no locking of m_ + //because no add_callback() and del_callback should + //modify callbacks_[fd] while the fd is not dead + for (unsigned int i = 0; i < readable.size(); i++) { + int fd = readable[i]; + if (callbacks_[fd]) + callbacks_[fd]->read_cb(fd); + } + + for (unsigned int i = 0; i < writable.size(); i++) { + int fd = writable[i]; + if (callbacks_[fd]) + callbacks_[fd]->write_cb(fd); + } + } +} + +SelectAIO::SelectAIO() : highfds_(0) +{ + FD_ZERO(&rfds_); + FD_ZERO(&wfds_); + + VERIFY(pipe(pipefd_) == 0); + FD_SET(pipefd_[0], &rfds_); + highfds_ = pipefd_[0]; + + int flags = fcntl(pipefd_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipefd_[0], F_SETFL, flags); + + VERIFY(pthread_mutex_init(&m_, NULL) == 0); +} + +SelectAIO::~SelectAIO() +{ + VERIFY(pthread_mutex_destroy(&m_) == 0); +} + +void +SelectAIO::watch_fd(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (highfds_ <= fd) + highfds_ = fd; + + if (flag == CB_RDONLY) { + FD_SET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + FD_SET(fd,&wfds_); + }else { + FD_SET(fd,&rfds_); + FD_SET(fd,&wfds_); + } + + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); +} + +bool +SelectAIO::is_watched(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (flag == CB_RDONLY) { + return FD_ISSET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + return FD_ISSET(fd,&wfds_); + }else{ + return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); + } +} + +bool +SelectAIO::unwatch_fd(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (flag == CB_RDONLY) { + FD_CLR(fd, &rfds_); + }else if (flag == CB_WRONLY) { + FD_CLR(fd, &wfds_); + }else if (flag == CB_RDWR) { + FD_CLR(fd, &wfds_); + FD_CLR(fd, &rfds_); + }else{ + VERIFY(0); + } + + if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { + if (fd == highfds_) { + int newh = pipefd_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_)) { + newh = i; + }else if (FD_ISSET(i, &wfds_)) { + newh = i; + } + } + highfds_ = newh; + } + } + if (flag == CB_RDWR) { + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + } + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); +} + +void +SelectAIO::wait_ready(std::vector *readable, std::vector *writable) +{ + fd_set trfds, 
twfds; + int high; + + { + ScopedLock ml(&m_); + trfds = rfds_; + twfds = wfds_; + high = highfds_; + + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + return; + } else { + perror("select:"); + jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); + VERIFY(0); + } + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(tmp==1); + }else { + if (FD_ISSET(fd, &twfds)) { + writable->push_back(fd); + } + if (FD_ISSET(fd, &trfds)) { + readable->push_back(fd); + } + } + } +} + +#ifdef __linux__ + +EPollAIO::EPollAIO() +{ + pollfd_ = epoll_create(MAX_POLL_FDS); + VERIFY(pollfd_ >= 0); + bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); +} + +EPollAIO::~EPollAIO() +{ + close(pollfd_); +} + +static inline +int poll_flag_to_event(poll_flag flag) +{ + int f; + if (flag == CB_RDONLY) { + f = EPOLLIN; + }else if (flag == CB_WRONLY) { + f = EPOLLOUT; + }else { //flag == CB_RDWR + f = EPOLLIN | EPOLLOUT; + } + return f; +} + +void +EPollAIO::watch_fd(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (int)flag; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + } + + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); +} + +bool +EPollAIO::unwatch_fd(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(int)flag; + + struct epoll_event ev; + int op = fdstatus_[fd]? 
EPOLL_CTL_MOD : EPOLL_CTL_DEL; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(op == EPOLL_CTL_DEL); + } + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + return (op == EPOLL_CTL_DEL); +} + +bool +EPollAIO::is_watched(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + return ((fdstatus_[fd] & CB_MASK) == flag); +} + +void +EPollAIO::wait_ready(std::vector *readable, std::vector *writable) +{ + int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + for (int i = 0; i < nfds; i++) { + if (ready_[i].events & EPOLLIN) { + readable->push_back(ready_[i].data.fd); + } + if (ready_[i].events & EPOLLOUT) { + writable->push_back(ready_[i].data.fd); + } + } +} + +#endif diff --git a/rpc/pollmgr.h b/rpc/pollmgr.h new file mode 100644 index 0000000..c0a5748 --- /dev/null +++ b/rpc/pollmgr.h @@ -0,0 +1,107 @@ +#ifndef pollmgr_h +#define pollmgr_h + +#include +#include + +#ifdef __linux__ +#include +#endif + +#define MAX_POLL_FDS 128 + +typedef enum { + CB_NONE = 0x0, + CB_RDONLY = 0x1, + CB_WRONLY = 0x10, + CB_RDWR = 0x11, + CB_MASK = ~0x11, +} poll_flag; + +class aio_mgr { + public: + virtual void watch_fd(int fd, poll_flag flag) = 0; + virtual bool unwatch_fd(int fd, poll_flag flag) = 0; + virtual bool is_watched(int fd, poll_flag flag) = 0; + virtual void wait_ready(std::vector *readable, std::vector *writable) = 0; + virtual ~aio_mgr() {} +}; + +class aio_callback { + public: + virtual void read_cb(int fd) = 0; + virtual void write_cb(int fd) = 0; + virtual ~aio_callback() {} +}; + +class PollMgr { + public: + PollMgr(); + ~PollMgr(); + + static PollMgr *Instance(); + static PollMgr *CreateInst(); + + void add_callback(int fd, poll_flag flag, aio_callback *ch); + void del_callback(int fd, poll_flag flag); + bool has_callback(int fd, poll_flag flag, aio_callback *ch); + void block_remove_fd(int fd); + void wait_loop(); + + + static PollMgr *instance; + static int useful; + static int useless; + + private: + pthread_mutex_t m_; + pthread_cond_t changedone_c_; + pthread_t th_; + + aio_callback *callbacks_[MAX_POLL_FDS]; + aio_mgr *aio_; + bool pending_change_; + +}; + +class SelectAIO : public aio_mgr { + public : + + SelectAIO(); + ~SelectAIO(); + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + bool is_watched(int fd, poll_flag flag); + void wait_ready(std::vector *readable, std::vector *writable); + + private: + + fd_set rfds_; + fd_set wfds_; + int highfds_; + int pipefd_[2]; + + pthread_mutex_t m_; + +}; + +#ifdef __linux__ +class EPollAIO : public aio_mgr { + public: + EPollAIO(); + ~EPollAIO(); + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + bool is_watched(int fd, poll_flag flag); + void wait_ready(std::vector *readable, std::vector *writable); + + private: + int pollfd_; + struct epoll_event ready_[MAX_POLL_FDS]; + int fdstatus_[MAX_POLL_FDS]; + +}; +#endif /* __linux */ + +#endif /* pollmgr_h */ + diff --git a/rpc/rpc.cc b/rpc/rpc.cc new file mode 100644 index 0000000..e18506a --- /dev/null +++ b/rpc/rpc.cc @@ -0,0 +1,1086 @@ +/* + The rpcc class handles client-side RPC. Each rpcc is bound to a + single RPC server. The jobs of rpcc include maintaining a connection to + server, sending RPC requests and waiting for responses, retransmissions, + at-most-once delivery etc. + + The rpcs class handles the server side of RPC. 
Each rpcs handles multiple + connections from different rpcc objects. The jobs of rpcs include accepting + connections, dispatching requests to registered RPC handlers, at-most-once + delivery etc. + + Both rpcc and rpcs use the connection class as an abstraction for the + underlying communication channel. To send an RPC request/reply, one calls + connection::send() which blocks until data is sent or the connection has failed + (thus the caller can free the buffer when send() returns). When a + request/reply is received, connection makes a callback into the corresponding + rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()). + + Thread organization: + rpcc uses application threads to send RPC requests and blocks to receive the + reply or error. All connections use a single PollMgr object to perform async + socket IO. PollMgr creates a single thread to examine the readiness of socket + file descriptors and informs the corresponding connection whenever a socket is + ready to be read or written. (We use asynchronous socket IO to reduce the + number of threads needed to manage these connections; without async IO, at + least one thread is needed per connection to read data without blocking other + activities.) Each rpcs object creates one thread for listening on the server + port and a pool of threads for executing RPC requests. The + thread pool allows us to control the number of threads spawned at the server + (spawning one thread per request will hurt when the server faces thousands of + requests). + + In order to delete a connection object, we must maintain a reference count. + For rpcc, + multiple client threads might be invoking the rpcc::call() functions and thus + holding multiple references to the underlying connection object. For rpcs, + multiple dispatch threads might be holding references to the same connection + object. A connection object is deleted only when the underlying connection is + dead and the reference count reaches zero. + + The previous version of the RPC library uses pthread_cancel* routines + to implement the deletion of rpcc and rpcs objects. The idea is to cancel + all active threads that might be holding a reference to an object before + deleting that object. However, pthread_cancel is not robust and there are + always bugs where outstanding references to deleted objects persist. + This version of the RPC library does not do pthread_cancel, but explicitly + joins exited threads to make sure no outstanding references exist before + deleting objects. + + To delete a rpcc object safely, the users of the library must ensure that + there are no outstanding calls on the rpcc object. + + To delete a rpcs object safely, we do the following in sequence: 1. stop + accepting new incoming connections. 2. close existing active connections. + 3. delete the dispatch thread pool which involves waiting for current active + RPC handlers to finish. It is interesting how a thread pool can be deleted + without using thread cancellation. The trick is to inject x "poison pills" for + a thread pool of x threads. Upon getting a poison pill instead of a normal + task, a worker thread will exit (and thread pool destructor waits to join all + x exited worker threads). 
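+
+ As a sketch, the teardown order (see rpcs::~rpcs() below) is:
+
+     delete listener_;       // (1) stop accepting; tcpsconn's destructor
+                             //     closes the existing active connections
+     delete dispatchpool_;   // (2) ThrPool's destructor enqueues one poison
+                             //     pill per worker, then joins all workers
+     free_reply_window();    // (3) finally discard the at-most-once state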
+ */ + +#include "rpc.h" +#include "method_thread.h" +#include "slock.h" + +#include +#include +#include +#include +#include + +#include "jsl_log.h" +#include "gettime.h" +#include "lang/verify.h" + +const rpcc::TO rpcc::to_max = { 120000 }; +const rpcc::TO rpcc::to_min = { 1000 }; + +rpcc::caller::caller(unsigned int xxid, unmarshall *xun) +: xid(xxid), un(xun), done(false) +{ + VERIFY(pthread_mutex_init(&m,0) == 0); + VERIFY(pthread_cond_init(&c, 0) == 0); +} + +rpcc::caller::~caller() +{ + VERIFY(pthread_mutex_destroy(&m) == 0); + VERIFY(pthread_cond_destroy(&c) == 0); +} + +inline +void set_rand_seed() +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + srandom((int)ts.tv_nsec^((int)getpid())); +} + +rpcc::rpcc(sockaddr_in d, bool retrans) : + dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), + retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) +{ + VERIFY(pthread_mutex_init(&m_, 0) == 0); + VERIFY(pthread_mutex_init(&chan_m_, 0) == 0); + VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0); + + if(retrans){ + set_rand_seed(); + clt_nonce_ = random(); + } else { + // special client nonce 0 means this client does not + // require at-most-once logic from the server + // because it uses tcp and never retries a failed connection + clt_nonce_ = 0; + } + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + // xid starts with 1 and latest received reply starts with 0 + xid_rep_window_.push_back(0); + + jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", + clt_nonce_, lossytest_); +} + +// IMPORTANT: destruction should happen only when no external threads +// are blocked inside rpcc or will use rpcc in the future +rpcc::~rpcc() +{ + jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", + clt_nonce_, chan_?chan_->channo():-1); + if(chan_){ + chan_->closeconn(); + chan_->decref(); + } + VERIFY(calls_.size() == 0); + VERIFY(pthread_mutex_destroy(&m_) == 0); + VERIFY(pthread_mutex_destroy(&chan_m_) == 0); +} + +int +rpcc::bind(TO to) +{ + int r; + int ret = call(rpc_const::bind, 0, r, to); + if(ret == 0){ + ScopedLock ml(&m_); + bind_done_ = true; + srv_nonce_ = r; + } else { + jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", + inet_ntoa(dst_.sin_addr), ret); + } + return ret; +}; + +// Cancel all outstanding calls +void +rpcc::cancel(void) +{ + ScopedLock ml(&m_); + printf("rpcc::cancel: force callers to fail\n"); + std::map::iterator iter; + for(iter = calls_.begin(); iter != calls_.end(); iter++){ + caller *ca = iter->second; + + jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); + { + ScopedLock cl(&ca->m); + ca->done = true; + ca->intret = rpc_const::cancel_failure; + VERIFY(pthread_cond_signal(&ca->c) == 0); + } + } + + while (calls_.size () > 0){ + destroy_wait_ = true; + VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0); + } + printf("rpcc::cancel: done\n"); +} + +int +rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, + TO to) +{ + + caller ca(0, &rep); + int xid_rep; + { + ScopedLock ml(&m_); + + if((proc != rpc_const::bind && !bind_done_) || + (proc == rpc_const::bind && bind_done_)){ + jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n"); + return rpc_const::bind_failure; + } + + if(destroy_wait_){ + return rpc_const::cancel_failure; + } + + ca.xid = xid_++; + calls_[ca.xid] = &ca; + + req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, + xid_rep_window_.front()); + req.pack_req_header(h); + xid_rep 
= xid_rep_window_.front(); + } + + TO curr_to; + struct timespec now, nextdeadline, finaldeadline; + + clock_gettime(CLOCK_REALTIME, &now); + add_timespec(now, to.to, &finaldeadline); + curr_to.to = to_min.to; + + bool transmit = true; + connection *ch = NULL; + + while (1){ + if(transmit){ + get_refconn(&ch); + if(ch){ + if(reachable_) { + request forgot; + { + ScopedLock ml(&m_); + if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) { + forgot = dup_req_; + dup_req_.clear(); + } + } + if (forgot.isvalid()) + ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); + ch->send(req.cstr(), req.size()); + } + else jsl_log(JSL_DBG_1, "not reachable\n"); + jsl_log(JSL_DBG_2, + "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", + clt_nonce_, proc, ca.xid, clt_nonce_); + } + transmit = false; // only send once on a given channel + } + + if(!finaldeadline.tv_sec) + break; + + clock_gettime(CLOCK_REALTIME, &now); + add_timespec(now, curr_to.to, &nextdeadline); + if(cmp_timespec(nextdeadline,finaldeadline) > 0){ + nextdeadline = finaldeadline; + finaldeadline.tv_sec = 0; + } + + { + ScopedLock cal(&ca.m); + while (!ca.done){ + jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); + if(pthread_cond_timedwait(&ca.c, &ca.m, + &nextdeadline) == ETIMEDOUT){ + jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); + break; + } + } + if(ca.done){ + jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n"); + break; + } + } + + if(retrans_ && (!ch || ch->isdead())){ + // since connection is dead, retransmit + // on the new connection + transmit = true; + } + curr_to.to <<= 1; + } + + { + // no locking of ca.m since only this thread changes ca.xid + ScopedLock ml(&m_); + calls_.erase(ca.xid); + // may need to update the xid again here, in case the + // packet times out before it's even sent by the channel. + // I don't think there's any harm in maybe doing it twice + update_xid_rep(ca.xid); + + if(destroy_wait_){ + VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0); + } + } + + if (ca.done && lossytest_) + { + ScopedLock ml(&m_); + if (!dup_req_.isvalid()) { + dup_req_.buf.assign(req.cstr(), req.size()); + dup_req_.xid = ca.xid; + } + if (xid_rep > xid_rep_done_) + xid_rep_done_ = xid_rep; + } + + ScopedLock cal(&ca.m); + + jsl_log(JSL_DBG_2, + "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", + clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), + ntohs(dst_.sin_port), ca.done, ca.intret); + + if(ch) + ch->decref(); + + // destruction of req automatically frees its buffer + return (ca.done? ca.intret : rpc_const::timeout_failure); +} + +void +rpcc::get_refconn(connection **ch) +{ + ScopedLock ml(&chan_m_); + if(!chan_ || chan_->isdead()){ + if(chan_) + chan_->decref(); + chan_ = connect_to_dst(dst_, this, lossytest_); + } + if(ch && chan_){ + if(*ch){ + (*ch)->decref(); + } + *ch = chan_; + (*ch)->incref(); + } +} + +// PollMgr's thread is being used to +// make this upcall from connection object to rpcc. +// this funtion must not block. 
+// +// this function keeps no reference for connection *c +bool +rpcc::got_pdu(connection *c, char *b, int sz) +{ + unmarshall rep(b, sz); + reply_header h; + rep.unpack_reply_header(&h); + + if(!rep.ok()){ + jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n"); + return true; + } + + ScopedLock ml(&m_); + + update_xid_rep(h.xid); + + if(calls_.find(h.xid) == calls_.end()){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid); + return true; + } + caller *ca = calls_[h.xid]; + + ScopedLock cl(&ca->m); + if(!ca->done){ + ca->un->take_in(rep); + ca->intret = h.ret; + if(ca->intret < 0){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n", + h.xid, ca->intret); + } + ca->done = 1; + } + VERIFY(pthread_cond_broadcast(&ca->c) == 0); + return true; +} + +// assumes thread holds mutex m +void +rpcc::update_xid_rep(unsigned int xid) +{ + std::list::iterator it; + + if(xid <= xid_rep_window_.front()){ + return; + } + + for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ + if(*it > xid){ + xid_rep_window_.insert(it, xid); + goto compress; + } + } + xid_rep_window_.push_back(xid); + +compress: + it = xid_rep_window_.begin(); + for (it++; it != xid_rep_window_.end(); it++){ + while (xid_rep_window_.front() + 1 == *it) + xid_rep_window_.pop_front(); + } +} + + +rpcs::rpcs(unsigned int p1, int count) + : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) +{ + VERIFY(pthread_mutex_init(&procs_m_, 0) == 0); + VERIFY(pthread_mutex_init(&count_m_, 0) == 0); + VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0); + VERIFY(pthread_mutex_init(&conss_m_, 0) == 0); + + set_rand_seed(); + nonce_ = random(); + jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + reg(rpc_const::bind, this, &rpcs::rpcbind); + dispatchpool_ = new ThrPool(6,false); + + listener_ = new tcpsconn(this, port_, lossytest_); +} + +rpcs::~rpcs() +{ + // must delete listener before dispatchpool + delete listener_; + delete dispatchpool_; + free_reply_window(); +} + +bool +rpcs::got_pdu(connection *c, char *b, int sz) +{ + if(!reachable_){ + jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n"); + return true; + } + + djob_t *j = new djob_t(c, b, sz); + c->incref(); + bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j); + if(!succ || !reachable_){ + c->decref(); + delete j; + } + return succ; +} + +void +rpcs::reg1(unsigned int proc, handler *h) +{ + ScopedLock pl(&procs_m_); + VERIFY(procs_.count(proc) == 0); + procs_[proc] = h; + VERIFY(procs_.count(proc) >= 1); +} + +void +rpcs::updatestat(unsigned int proc) +{ + ScopedLock cl(&count_m_); + counts_[proc]++; + curr_counts_--; + if(curr_counts_ == 0){ + std::map::iterator i; + printf("RPC STATS: "); + for (i = counts_.begin(); i != counts_.end(); i++){ + printf("%x:%d ", i->first, i->second); + } + printf("\n"); + + ScopedLock rwl(&reply_window_m_); + std::map >::iterator clt; + + unsigned int totalrep = 0, maxrep = 0; + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + totalrep += clt->second.size(); + if(clt->second.size() > maxrep) + maxrep = clt->second.size(); + } + jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", + (int) reply_window_.size()-1, totalrep, maxrep); + curr_counts_ = counting_; + } +} + +void +rpcs::dispatch(djob_t *j) +{ + connection *c = j->conn; + unmarshall req(j->buf, j->sz); + 
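+    // req now wraps the request bytes, so the job wrapper itself is no
+    // longer needed; the connection reference taken in got_pdu() is
+    // held until the end of dispatch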
delete j; + + req_header h; + req.unpack_req_header(&h); + int proc = h.proc; + + if(!req.ok()){ + jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); + c->decref(); + return; + } + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n", + h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); + + marshall rep; + reply_header rh(h.xid,0); + + // is client sending to an old instance of server? + if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n", + h.srv_nonce, nonce_, h.proc); + rh.ret = rpc_const::oldsrv_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + return; + } + + handler *f; + // is RPC proc a registered procedure? + { + ScopedLock pl(&procs_m_); + if(procs_.count(proc) < 1){ + fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n", + proc); + c->decref(); + VERIFY(0); + return; + } + + f = procs_[proc]; + } + + rpcs::rpcstate_t stat; + char *b1; + int sz1; + + if(h.clt_nonce){ + // have i seen this client before? + { + ScopedLock rwl(&reply_window_m_); + // if we don't know about this clt_nonce, create a cleanup object + if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ + VERIFY (reply_window_[h.clt_nonce].size() == 0); // create + reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid + jsl_log(JSL_DBG_2, + "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", + h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1); + } + } + + // save the latest good connection to the client + { + ScopedLock rwl(&conss_m_); + if(conns_.find(h.clt_nonce) == conns_.end()){ + c->incref(); + conns_[h.clt_nonce] = c; + } else if(conns_[h.clt_nonce]->compare(c) < 0){ + conns_[h.clt_nonce]->decref(); + c->incref(); + conns_[h.clt_nonce] = c; + } + } + + stat = checkduplicate_and_update(h.clt_nonce, h.xid, + h.xid_rep, &b1, &sz1); + } else { + // this client does not require at most once logic + stat = NEW; + } + + switch (stat){ + case NEW: // new request + if(counting_){ + updatestat(proc); + } + + rh.ret = f->fn(req, rep); + if (rh.ret == rpc_const::unmarshal_args_failure) { + fprintf(stderr, "rpcs::dispatch: failed to" + " unmarshall the arguments. 
You are" + " probably calling RPC 0x%x with wrong" + " types of arguments.\n", proc); + VERIFY(0); + } + VERIFY(rh.ret >= 0); + + rep.pack_reply_header(rh); + rep.take_buf(&b1,&sz1); + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n", + sz1, h.xid, proc, rh.ret, h.clt_nonce); + + if(h.clt_nonce > 0){ + // only record replies for clients that require at-most-once logic + add_reply(h.clt_nonce, h.xid, b1, sz1); + } + + // get the latest connection to the client + { + ScopedLock rwl(&conss_m_); + if(c->isdead() && c != conns_[h.clt_nonce]){ + c->decref(); + c = conns_[h.clt_nonce]; + c->incref(); + } + } + + c->send(b1, sz1); + if(h.clt_nonce == 0){ + // reply is not added to at-most-once window, free it + free(b1); + } + break; + case INPROGRESS: // server is working on this request + break; + case DONE: // duplicate and we still have the response + c->send(b1, sz1); + break; + case FORGOTTEN: // very old request and we don't have the response anymore + jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", + h.xid, h.clt_nonce); + rh.ret = rpc_const::atmostonce_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + break; + } + c->decref(); +} + +// rpcs::dispatch calls this when an RPC request arrives. +// +// checks to see if an RPC with xid from clt_nonce has already been received. +// if not, remembers the request in reply_window_. +// +// deletes remembered requests with XIDs <= xid_rep; the client +// says it has received a reply for every RPC up through xid_rep. +// frees the reply_t::buf of each such request. +// +// returns one of: +// NEW: never seen this xid before. +// INPROGRESS: seen this xid, and still processing it. +// DONE: seen this xid, previous reply returned in *b and *sz. +// FORGOTTEN: might have seen this xid, but deleted previous reply. +rpcs::rpcstate_t +rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, + unsigned int xid_rep, char **b, int *sz) +{ + ScopedLock rwl(&reply_window_m_); + + std::list &l = reply_window_[clt_nonce]; + + VERIFY(l.size() > 0); + VERIFY(xid >= xid_rep); + + unsigned int past_xid_rep = l.begin()->xid; + + std::list::iterator start = l.begin(), it; + it = ++start; + + if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) { + // scan for deletion candidates + for (; it != l.end() && it->xid < xid_rep; it++) { + if (it->cb_present) + free(it->buf); + } + l.erase(start, it); + l.begin()->xid = xid_rep; + } + + if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1) + return FORGOTTEN; + + // skip non-deletion candidates + while (it != l.end() && it->xid < xid) + it++; + + // if it's in the list it must be right here + if (it != l.end() && it->xid == xid) { + if (it->cb_present) { + // return information about the remembered reply + *b = it->buf; + *sz = it->sz; + return DONE; + } else { + return INPROGRESS; + } + } else { + // remember that a new request has arrived + l.insert(it, reply_t(xid)); + return NEW; + } +} + +// rpcs::dispatch calls add_reply when it is sending a reply to an RPC, +// and passes the return value in b and sz. +// add_reply() should remember b and sz. +// free_reply_window() and checkduplicate_and_update is responsible for +// calling free(b). 
+void +rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, + char *b, int sz) +{ + ScopedLock rwl(&reply_window_m_); + // remember the RPC reply value + std::list &l = reply_window_[clt_nonce]; + std::list::iterator it = l.begin(); + // skip to our place in the list + for (it++; it != l.end() && it->xid < xid; it++); + // there should already be an entry, so whine if there isn't + if (it == l.end() || it->xid != xid) { + fprintf(stderr, "Could not find reply struct in add_reply"); + l.insert(it, reply_t(xid, b, sz)); + } else { + *it = reply_t(xid, b, sz); + } +} + +void +rpcs::free_reply_window(void) +{ + std::map >::iterator clt; + std::list::iterator it; + + ScopedLock rwl(&reply_window_m_); + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + for (it = clt->second.begin(); it != clt->second.end(); it++){ + if (it->cb_present) + free(it->buf); + } + clt->second.clear(); + } + reply_window_.clear(); +} + +// rpc handler +int +rpcs::rpcbind(int a, int &r) +{ + jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); + r = nonce_; + return 0; +} + +void +marshall::rawbyte(unsigned char x) +{ + if(_ind >= _capa){ + _capa *= 2; + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + _buf[_ind++] = x; +} + +void +marshall::rawbytes(const char *p, int n) +{ + if((_ind+n) > _capa){ + _capa = _capa > n? 2*_capa:(_capa+n); + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + memcpy(_buf+_ind, p, n); + _ind += n; +} + +marshall & +operator<<(marshall &m, bool x) +{ + m.rawbyte(x); + return m; +} + +marshall & +operator<<(marshall &m, unsigned char x) +{ + m.rawbyte(x); + return m; +} + +marshall & +operator<<(marshall &m, char x) +{ + m << (unsigned char) x; + return m; +} + + +marshall & +operator<<(marshall &m, unsigned short x) +{ + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; +} + +marshall & +operator<<(marshall &m, short x) +{ + m << (unsigned short) x; + return m; +} + +marshall & +operator<<(marshall &m, unsigned int x) +{ + // network order is big-endian + m.rawbyte((x >> 24) & 0xff); + m.rawbyte((x >> 16) & 0xff); + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; +} + +marshall & +operator<<(marshall &m, int x) +{ + m << (unsigned int) x; + return m; +} + +marshall & +operator<<(marshall &m, const std::string &s) +{ + m << (unsigned int) s.size(); + m.rawbytes(s.data(), s.size()); + return m; +} + +marshall & +operator<<(marshall &m, unsigned long long x) +{ + m << (unsigned int) (x >> 32); + m << (unsigned int) x; + return m; +} + +void +marshall::pack(int x) +{ + rawbyte((x >> 24) & 0xff); + rawbyte((x >> 16) & 0xff); + rawbyte((x >> 8) & 0xff); + rawbyte(x & 0xff); +} + +void +unmarshall::unpack(int *x) +{ + (*x) = (rawbyte() & 0xff) << 24; + (*x) |= (rawbyte() & 0xff) << 16; + (*x) |= (rawbyte() & 0xff) << 8; + (*x) |= rawbyte() & 0xff; +} + +// take the contents from another unmarshall object +void +unmarshall::take_in(unmarshall &another) +{ + if(_buf) + free(_buf); + another.take_buf(&_buf, &_sz); + _ind = RPC_HEADER_SZ; + _ok = _sz >= RPC_HEADER_SZ?true:false; +} + +bool +unmarshall::okdone() +{ + if(ok() && _ind == _sz){ + return true; + } else { + return false; + } +} + +unsigned int +unmarshall::rawbyte() +{ + char c = 0; + if(_ind >= _sz) + _ok = false; + else + c = _buf[_ind++]; + return c; +} + +unmarshall & +operator>>(unmarshall &u, bool &x) +{ + x = (bool) u.rawbyte() ; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned 
char &x) +{ + x = (unsigned char) u.rawbyte() ; + return u; +} + +unmarshall & +operator>>(unmarshall &u, char &x) +{ + x = (char) u.rawbyte(); + return u; +} + + +unmarshall & +operator>>(unmarshall &u, unsigned short &x) +{ + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, short &x) +{ + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned int &x) +{ + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, int &x) +{ + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned long long &x) +{ + unsigned int h, l; + u >> h; + u >> l; + x = l | ((unsigned long long) h << 32); + return u; +} + +unmarshall & +operator>>(unmarshall &u, std::string &s) +{ + unsigned sz; + u >> sz; + if(u.ok()) + u.rawbytes(s, sz); + return u; +} + +void +unmarshall::rawbytes(std::string &ss, unsigned int n) +{ + if((_ind+n) > (unsigned)_sz){ + _ok = false; + } else { + std::string tmps = std::string(_buf+_ind, n); + swap(ss, tmps); + VERIFY(ss.size() == n); + _ind += n; + } +} + +bool operator<(const sockaddr_in &a, const sockaddr_in &b){ + return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || + ((a.sin_addr.s_addr == b.sin_addr.s_addr) && + ((a.sin_port < b.sin_port)))); +} + +/*---------------auxilary function--------------*/ +void +make_sockaddr(const char *hostandport, struct sockaddr_in *dst){ + + char host[200]; + const char *localhost = "127.0.0.1"; + const char *port = index(hostandport, ':'); + if(port == NULL){ + memcpy(host, localhost, strlen(localhost)+1); + port = hostandport; + } else { + memcpy(host, hostandport, port-hostandport); + host[port-hostandport] = '\0'; + port++; + } + + make_sockaddr(host, port, dst); + +} + +void +make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){ + + in_addr_t a; + + bzero(dst, sizeof(*dst)); + dst->sin_family = AF_INET; + + a = inet_addr(host); + if(a != INADDR_NONE){ + dst->sin_addr.s_addr = a; + } else { + struct hostent *hp = gethostbyname(host); + if(hp == 0 || hp->h_length != 4){ + fprintf(stderr, "cannot find host name %s\n", host); + exit(1); + } + dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; + } + dst->sin_port = htons(atoi(port)); +} + +int +cmp_timespec(const struct timespec &a, const struct timespec &b) +{ + if(a.tv_sec > b.tv_sec) + return 1; + else if(a.tv_sec < b.tv_sec) + return -1; + else { + if(a.tv_nsec > b.tv_nsec) + return 1; + else if(a.tv_nsec < b.tv_nsec) + return -1; + else + return 0; + } +} + +void +add_timespec(const struct timespec &a, int b, struct timespec *result) +{ + // convert to millisec, add timeout, convert back + result->tv_sec = a.tv_sec + b/1000; + result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000; + VERIFY(result->tv_nsec >= 0); + while (result->tv_nsec > 1000000000){ + result->tv_sec++; + result->tv_nsec-=1000000000; + } +} + +int +diff_timespec(const struct timespec &end, const struct timespec &start) +{ + int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0; + VERIFY(diff || end.tv_sec == start.tv_sec); + if(end.tv_nsec > start.tv_nsec){ + diff += (end.tv_nsec-start.tv_nsec)/1000000; + } else { + diff -= (start.tv_nsec-end.tv_nsec)/1000000; + } + return diff; 
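+    // (difference in milliseconds, truncated; callers only use this
+    // for coarse round-trip timing)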
+} diff --git a/rpc/rpc.h b/rpc/rpc.h new file mode 100644 index 0000000..fc61236 --- /dev/null +++ b/rpc/rpc.h @@ -0,0 +1,633 @@ +#ifndef rpc_h +#define rpc_h + +#include +#include +#include +#include +#include + +#include "thr_pool.h" +#include "marshall.h" +#include "connection.h" + +#ifdef DMALLOC +#include "dmalloc.h" +#endif + +class rpc_const { + public: + static const unsigned int bind = 1; // handler number reserved for bind + static const int timeout_failure = -1; + static const int unmarshal_args_failure = -2; + static const int unmarshal_reply_failure = -3; + static const int atmostonce_failure = -4; + static const int oldsrv_failure = -5; + static const int bind_failure = -6; + static const int cancel_failure = -7; +}; + +// rpc client endpoint. +// manages a xid space per destination socket +// threaded: multiple threads can be sending RPCs, +class rpcc : public chanmgr { + + private: + + //manages per rpc info + struct caller { + caller(unsigned int xxid, unmarshall *un); + ~caller(); + + unsigned int xid; + unmarshall *un; + int intret; + bool done; + pthread_mutex_t m; + pthread_cond_t c; + }; + + void get_refconn(connection **ch); + void update_xid_rep(unsigned int xid); + + + sockaddr_in dst_; + unsigned int clt_nonce_; + unsigned int srv_nonce_; + bool bind_done_; + unsigned int xid_; + int lossytest_; + bool retrans_; + bool reachable_; + + connection *chan_; + + pthread_mutex_t m_; // protect insert/delete to calls[] + pthread_mutex_t chan_m_; + + bool destroy_wait_; + pthread_cond_t destroy_wait_c_; + + std::map calls_; + std::list xid_rep_window_; + + struct request { + request() { clear(); } + void clear() { buf.clear(); xid = -1; } + bool isvalid() { return xid != -1; } + std::string buf; + int xid; + }; + struct request dup_req_; + int xid_rep_done_; + public: + + rpcc(sockaddr_in d, bool retrans=true); + ~rpcc(); + + struct TO { + int to; + }; + static const TO to_max; + static const TO to_min; + static TO to(int x) { TO t; t.to = x; return t;} + + unsigned int id() { return clt_nonce_; } + + int bind(TO to = to_max); + + void set_reachable(bool r) { reachable_ = r; } + + void cancel(); + + int islossy() { return lossytest_ > 0; } + + int call1(unsigned int proc, + marshall &req, unmarshall &rep, TO to); + + bool got_pdu(connection *c, char *b, int sz); + + + template + int call_m(unsigned int proc, marshall &req, R & r, TO to); + + template + int call(unsigned int proc, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r, + TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, const A6 & a6, + R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7, + R & r, TO to = to_max); + +}; + +template int +rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) +{ + unmarshall u; + int intret = call1(proc, req, u, to); + if (intret < 0) return intret; + u >> r; + 
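+    // okdone() below checks both that unmarshalling stayed in bounds
+    // and that the reply buffer was consumed exactly; leftover or
+    // missing bytes mean R does not match what the handler marshalled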
if(u.okdone() != true) { + fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." + "You are probably calling RPC 0x%x with wrong return " + "type.\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; +} + +template int +rpcc::call(unsigned int proc, R & r, TO to) +{ + marshall m; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to) +{ + marshall m; + m << a1; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, + const A6 & a6, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + m << a6; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, + const A6 & a6, const A7 & a7, + R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + m << a6; + m << a7; + return call_m(proc, m, r, to); +} + +bool operator<(const sockaddr_in &a, const sockaddr_in &b); + +class handler { + public: + handler() { } + virtual ~handler() { } + virtual int fn(unmarshall &, marshall &) = 0; +}; + + +// rpc server endpoint. +class rpcs : public chanmgr { + + typedef enum { + NEW, // new RPC, not a duplicate + INPROGRESS, // duplicate of an RPC we're still processing + DONE, // duplicate of an RPC we already replied to (have reply) + FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten + } rpcstate_t; + + private: + + // state about an in-progress or completed RPC, for at-most-once. + // if cb_present is true, then the RPC is complete and a reply + // has been sent; in that case buf points to a copy of the reply, + // and sz holds the size of the reply. + struct reply_t { + reply_t (unsigned int _xid) { + xid = _xid; + cb_present = false; + buf = NULL; + sz = 0; + } + reply_t (unsigned int _xid, char *_buf, int _sz) { + xid = _xid; + cb_present = true; + buf = _buf; + sz = _sz; + } + unsigned int xid; + bool cb_present; // whether the reply buffer is valid + char *buf; // the reply buffer + int sz; // the size of reply buffer + }; + + int port_; + unsigned int nonce_; + + // provide at most once semantics by maintaining a window of replies + // per client that that client hasn't acknowledged receiving yet. + // indexed by client nonce. 
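+    // (per checkduplicate_and_update in rpc.cc, the first entry of each
+    // list is a sentinel whose xid records the newest trimmed xid_rep)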
+ std::map > reply_window_; + + void free_reply_window(void); + void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); + + rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, + unsigned int xid, unsigned int rep_xid, + char **b, int *sz); + + void updatestat(unsigned int proc); + + // latest connection to the client + std::map conns_; + + // counting + const int counting_; + int curr_counts_; + std::map counts_; + + int lossytest_; + bool reachable_; + + // map proc # to function + std::map procs_; + + pthread_mutex_t procs_m_; // protect insert/delete to procs[] + pthread_mutex_t count_m_; //protect modification of counts + pthread_mutex_t reply_window_m_; // protect reply window et al + pthread_mutex_t conss_m_; // protect conns_ + + + protected: + + struct djob_t { + djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} + char *buf; + int sz; + connection *conn; + }; + void dispatch(djob_t *); + + // internal handler registration + void reg1(unsigned int proc, handler *); + + ThrPool* dispatchpool_; + tcpsconn* listener_; + + public: + rpcs(unsigned int port, int counts=0); + ~rpcs(); + inline int port() { return listener_->port(); } + //RPC handler for clients binding + int rpcbind(int a, int &r); + + void set_reachable(bool r) { reachable_ = r; } + + bool got_pdu(connection *c, char *b, int sz); + + // register a handler + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, + R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + const A6, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + const A6, const A7, + R & r)); +}; + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + R r; + args >> a1; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + R r; + args >> a1; + args >> a2; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r); + public: + h1(S *xsob, int 
(S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + R r; + args >> a1; + args >> a2; + args >> a3; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, const A6 a6, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, const A6 a6, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, const A6 a6, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + A6 a6; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + args >> a6; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, const A6 a6, + const A7 a7, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, const A6 a6, const A7 a7, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, const A6 a6, + const A7 a7, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + A6 a6; + A7 a7; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + args >> a6; + args >> a7; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = 
(sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + + +void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); +void make_sockaddr(const char *host, const char *port, + struct sockaddr_in *dst); + +int cmp_timespec(const struct timespec &a, const struct timespec &b); +void add_timespec(const struct timespec &a, int b, struct timespec *result); +int diff_timespec(const struct timespec &a, const struct timespec &b); + +#endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc new file mode 100644 index 0000000..74c61d1 --- /dev/null +++ b/rpc/rpctest.cc @@ -0,0 +1,479 @@ +// RPC test and pseudo-documentation. +// generates print statements on failures, but eventually says "rpctest OK" + +#include "rpc.h" +#include +#include +#include +#include +#include +#include "jsl_log.h" +#include "gettime.h" +#include "lang/verify.h" + +#define NUM_CL 2 + +rpcs *server; // server rpc object +rpcc *clients[NUM_CL]; // client rpc object +struct sockaddr_in dst; //server's ip address +int port; +pthread_attr_t attr; + +// server-side handlers. they must be methods of some class +// to simplify rpcs::reg(). a server process can have handlers +// from multiple classes. +class srv { + public: + int handle_22(const std::string a, const std::string b, std::string & r); + int handle_fast(const int a, int &r); + int handle_slow(const int a, int &r); + int handle_bigrep(const int a, std::string &r); +}; + +// a handler. a and b are arguments, r is the result. +// there can be multiple arguments but only one result. +// the caller also gets to see the int return value +// as the return value from rpcc::call(). +// rpcs::reg() decides how to unmarshall by looking +// at these argument types, so this function definition +// does what a .x file does in SunRPC. +int +srv::handle_22(const std::string a, std::string b, std::string &r) +{ + r = a + b; + return 0; +} + +int +srv::handle_fast(const int a, int &r) +{ + r = a + 1; + return 0; +} + +int +srv::handle_slow(const int a, int &r) +{ + usleep(random() % 5000); + r = a + 2; + return 0; +} + +int +srv::handle_bigrep(const int len, std::string &r) +{ + r = std::string(len, 'x'); + return 0; +} + +srv service; + +void startserver() +{ + server = new rpcs(port); + server->reg(22, &service, &srv::handle_22); + server->reg(23, &service, &srv::handle_fast); + server->reg(24, &service, &srv::handle_slow); + server->reg(25, &service, &srv::handle_bigrep); +} + +void +testmarshall() +{ + marshall m; + req_header rh(1,2,3,4,5); + m.pack_req_header(rh); + VERIFY(m.size()==RPC_HEADER_SZ); + int i = 12345; + unsigned long long l = 1223344455L; + std::string s = std::string("hallo...."); + m << i; + m << l; + m << s; + + char *b; + int sz; + m.take_buf(&b,&sz); + VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); + + unmarshall un(b,sz); + req_header rh1; + un.unpack_req_header(&rh1); + VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); + int i1; + unsigned long long l1; + std::string s1; + un >> i1; + un >> l1; + un >> s1; + VERIFY(un.okdone()); + VERIFY(i1==i && l1==l && s1==s); +} + +void * +client1(void *xx) +{ + + // test concurrency. 
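+    // (the NUM_CL rpcc objects are shared by all test threads, so this
+    // also exercises rpcc's per-xid reply matching under contention)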
+ int which_cl = ((unsigned long) xx ) % NUM_CL; + + for(int i = 0; i < 100; i++){ + int arg = (random() % 2000); + std::string rep; + int ret = clients[which_cl]->call(25, arg, rep); + VERIFY(ret == 0); + if ((int)rep.size()!=arg) { + printf("repsize wrong %d!=%d\n", (int)rep.size(), arg); + } + VERIFY((int)rep.size() == arg); + } + + // test rpc replies coming back not in the order of + // the original calls -- i.e. does xid reply dispatch work. + for(int i = 0; i < 100; i++){ + int which = (random() % 2); + int arg = (random() % 1000); + int rep; + + struct timespec start,end; + clock_gettime(CLOCK_REALTIME, &start); + + int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep); + clock_gettime(CLOCK_REALTIME, &end); + int diff = diff_timespec(end, start); + if (ret != 0) + printf("%d ms have elapsed!!!\n", diff); + VERIFY(ret == 0); + VERIFY(rep == (which ? arg+1 : arg+2)); + } + + return 0; +} + +void * +client2(void *xx) +{ + int which_cl = ((unsigned long) xx ) % NUM_CL; + + time_t t1; + time(&t1); + + while(time(0) - t1 < 10){ + int arg = (random() % 2000); + std::string rep; + int ret = clients[which_cl]->call(25, arg, rep); + if ((int)rep.size()!=arg) { + printf("ask for %d reply got %d ret %d\n", + arg, (int)rep.size(), ret); + } + VERIFY((int)rep.size() == arg); + } + return 0; +} + +void * +client3(void *xx) +{ + rpcc *c = (rpcc *) xx; + + for(int i = 0; i < 4; i++){ + int rep; + int ret = c->call(24, i, rep, rpcc::to(3000)); + VERIFY(ret == rpc_const::timeout_failure || rep == i+2); + } + return 0; +} + + +void +simple_tests(rpcc *c) +{ + printf("simple_tests\n"); + // an RPC call to procedure #22. + // rpcc::call() looks at the argument types to decide how + // to marshall the RPC call packet, and how to unmarshall + // the reply packet. + std::string rep; + int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep); + VERIFY(intret == 0); // this is what handle_22 returns + VERIFY(rep == "hello goodbye"); + printf(" -- string concat RPC .. ok\n"); + + // small request, big reply (perhaps req via UDP, reply via TCP) + intret = c->call(25, 70000, rep, rpcc::to(200000)); + VERIFY(intret == 0); + VERIFY(rep.size() == 70000); + printf(" -- small request, big reply .. ok\n"); + +#if 0 + // too few arguments + intret = c->call(22, (std::string)"just one", rep); + VERIFY(intret < 0); + printf(" -- too few arguments .. failed ok\n"); + + // too many arguments; proc #23 expects just one. + intret = c->call(23, 1001, 1002, rep); + VERIFY(intret < 0); + printf(" -- too many arguments .. failed ok\n"); + + // wrong return value size + int wrongrep; + intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep); + VERIFY(intret < 0); + printf(" -- wrong ret value size .. failed ok\n"); +#endif + + // specify a timeout value to an RPC that should succeed (udp) + int xx = 0; + intret = c->call(23, 77, xx, rpcc::to(3000)); + VERIFY(intret == 0 && xx == 78); + printf(" -- no suprious timeout .. ok\n"); + + // specify a timeout value to an RPC that should succeed (tcp) + { + std::string arg(1000, 'x'); + std::string rep; + c->call(22, arg, (std::string)"x", rep, rpcc::to(3000)); + VERIFY(rep.size() == 1001); + printf(" -- no suprious timeout .. ok\n"); + } + + // huge RPC + std::string big(1000000, 'x'); + intret = c->call(22, big, (std::string)"z", rep); + VERIFY(rep.size() == 1000001); + printf(" -- huge 1M rpc request .. 
ok\n"); + + // specify a timeout value to an RPC that should timeout (udp) + struct sockaddr_in non_existent; + memset(&non_existent, 0, sizeof(non_existent)); + non_existent.sin_family = AF_INET; + non_existent.sin_addr.s_addr = inet_addr("127.0.0.1"); + non_existent.sin_port = htons(7661); + rpcc *c1 = new rpcc(non_existent); + time_t t0 = time(0); + intret = c1->bind(rpcc::to(3000)); + time_t t1 = time(0); + VERIFY(intret < 0 && (t1 - t0) <= 4); + printf(" -- rpc timeout .. ok\n"); + printf("simple_tests OK\n"); +} + +void +concurrent_test(int nt) +{ + // create threads that make lots of calls in parallel, + // to test thread synchronization for concurrent calls + // and dispatches. + int ret; + + printf("start concurrent_test (%d threads) ...", nt); + + pthread_t th[nt]; + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i); + VERIFY(ret == 0); + } + + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf(" OK\n"); +} + +void +lossy_test() +{ + int ret; + + printf("start lossy_test ..."); + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + + if (server) { + delete server; + startserver(); + } + + for (int i = 0; i < NUM_CL; i++) { + delete clients[i]; + clients[i] = new rpcc(dst); + VERIFY(clients[i]->bind()==0); + } + + int nt = 1; + pthread_t th[nt]; + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i); + VERIFY(ret == 0); + } + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf(".. OK\n"); + VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); +} + +void +failure_test() +{ + rpcc *client1; + rpcc *client = clients[0]; + + printf("failure_test\n"); + + delete server; + + client1 = new rpcc(dst); + VERIFY (client1->bind(rpcc::to(3000)) < 0); + printf(" -- create new client and try to bind to failed server .. failed ok\n"); + + delete client1; + + startserver(); + + std::string rep; + int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + VERIFY(intret == rpc_const::oldsrv_failure); + printf(" -- call recovered server with old client .. failed ok\n"); + + delete client; + + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + VERIFY (client->bind() < 0); + + intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + VERIFY(intret == 0); + VERIFY(rep == "hello goodbye"); + + printf(" -- delete existing rpc client, create replacement rpc client .. ok\n"); + + + int nt = 10; + int ret; + printf(" -- concurrent test on new rpc client w/ %d threads ..", nt); + + pthread_t th[nt]; + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client3, (void *) client); + VERIFY(ret == 0); + } + + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf("ok\n"); + + delete server; + delete client; + + startserver(); + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + printf(" -- delete existing rpc client and server, create replacements.. 
ok\n"); + + printf(" -- concurrent test on new client and server w/ %d threads ..", nt); + for(int i = 0; i < nt; i++){ + ret = pthread_create(&th[i], &attr, client3, (void *)client); + VERIFY(ret == 0); + } + + for(int i = 0; i < nt; i++){ + VERIFY(pthread_join(th[i], NULL) == 0); + } + printf("ok\n"); + + printf("failure_test OK\n"); +} + +int +main(int argc, char *argv[]) +{ + + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + int debug_level = 0; + + bool isclient = false; + bool isserver = false; + + srandom(getpid()); + port = 20000 + (getpid() % 10000); + + char ch = 0; + while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { + switch (ch) { + case 'c': + isclient = true; + break; + case 's': + isserver = true; + break; + case 'd': + debug_level = atoi(optarg); + break; + case 'p': + port = atoi(optarg); + break; + case 'l': + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + default: + break; + } + } + + if (!isserver && !isclient) { + isserver = isclient = true; + } + + if (debug_level > 0) { + //__loginit.initNow(); + jsl_set_debug(debug_level); + jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level); + } + + testmarshall(); + + pthread_attr_init(&attr); + // set stack size to 32K, so we don't run out of memory + pthread_attr_setstacksize(&attr, 32*1024); + + if (isserver) { + printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ); + startserver(); + } + + if (isclient) { + // server's address. + memset(&dst, 0, sizeof(dst)); + dst.sin_family = AF_INET; + dst.sin_addr.s_addr = inet_addr("127.0.0.1"); + dst.sin_port = htons(port); + + + // start the client. bind it to the server. + // starts a thread to listen for replies and hand them to + // the correct waiting caller thread. there should probably + // be only one rpcc per process. you probably need one + // rpcc per server. 
+ for (int i = 0; i < NUM_CL; i++) { + clients[i] = new rpcc(dst); + VERIFY (clients[i]->bind() == 0); + } + + simple_tests(clients[0]); + concurrent_test(10); + lossy_test(); + if (isserver) { + failure_test(); + } + + printf("rpctest OK\n"); + + exit(0); + } + + while (1) { + sleep(1); + } +} diff --git a/rpc/slock.h b/rpc/slock.h new file mode 100644 index 0000000..7f419c4 --- /dev/null +++ b/rpc/slock.h @@ -0,0 +1,28 @@ +#ifndef __SCOPED_LOCK__ +#define __SCOPED_LOCK__ + +#include +#include "lang/verify.h" +struct ScopedLock { + private: + pthread_mutex_t *m_; + public: + ScopedLock(pthread_mutex_t *m): m_(m) { + VERIFY(pthread_mutex_lock(m_)==0); + } + ~ScopedLock() { + VERIFY(pthread_mutex_unlock(m_)==0); + } +}; +struct ScopedUnlock { + private: + pthread_mutex_t *m_; + public: + ScopedUnlock(pthread_mutex_t *m): m_(m) { + VERIFY(pthread_mutex_unlock(m_)==0); + } + ~ScopedUnlock() { + VERIFY(pthread_mutex_lock(m_)==0); + } +}; +#endif /*__SCOPED_LOCK__*/ diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc new file mode 100644 index 0000000..f9f32fa --- /dev/null +++ b/rpc/thr_pool.cc @@ -0,0 +1,69 @@ +#include "slock.h" +#include "thr_pool.h" +#include +#include +#include "lang/verify.h" + +static void * +do_worker(void *arg) +{ + ThrPool *tp = (ThrPool *)arg; + while (1) { + ThrPool::job_t j; + if (!tp->takeJob(&j)) + break; //die + + (void)(j.f)(j.a); + } + pthread_exit(NULL); +} + +//if blocking, then addJob() blocks when queue is full +//otherwise, addJob() simply returns false when queue is full +ThrPool::ThrPool(int sz, bool blocking) +: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) +{ + pthread_attr_init(&attr_); + pthread_attr_setstacksize(&attr_, 128<<10); + + for (int i = 0; i < sz; i++) { + pthread_t t; + VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0); + th_.push_back(t); + } +} + +//IMPORTANT: this function can be called only when no external thread +//will ever use this thread pool again or is currently blocking on it +ThrPool::~ThrPool() +{ + for (int i = 0; i < nthreads_; i++) { + job_t j; + j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit + jobq_.enq(j); + } + + for (int i = 0; i < nthreads_; i++) { + VERIFY(pthread_join(th_[i], NULL)==0); + } + + VERIFY(pthread_attr_destroy(&attr_)==0); +} + +bool +ThrPool::addJob(void *(*f)(void *), void *a) +{ + job_t j; + j.f = f; + j.a = a; + + return jobq_.enq(j,blockadd_); +} + +bool +ThrPool::takeJob(job_t *j) +{ + jobq_.deq(j); + return (j->f!=NULL); +} + diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h new file mode 100644 index 0000000..5095961 --- /dev/null +++ b/rpc/thr_pool.h @@ -0,0 +1,66 @@ +#ifndef __THR_POOL__ +#define __THR_POOL__ + +#include +#include + +#include "fifo.h" + +class ThrPool { + + + public: + struct job_t { + void *(*f)(void *); //function point + void *a; //function arguments + }; + + ThrPool(int sz, bool blocking=true); + ~ThrPool(); + template bool addObjJob(C *o, void (C::*m)(A), A a); + void waitDone(); + + bool takeJob(job_t *j); + + private: + pthread_attr_t attr_; + int nthreads_; + bool blockadd_; + + + fifo jobq_; + std::vector th_; + + bool addJob(void *(*f)(void *), void *a); +}; + + template bool +ThrPool::addObjJob(C *o, void (C::*m)(A), A a) +{ + + class objfunc_wrapper { + public: + C *o; + void (C::*m)(A a); + A a; + static void *func(void *vvv) { + objfunc_wrapper *x = (objfunc_wrapper*)vvv; + C *o = x->o; + void (C::*m)(A ) = x->m; + A a = x->a; + (o->*m)(a); + delete x; + return 0; + } + }; + + objfunc_wrapper *x = new objfunc_wrapper; + x->o = 
o;
+    x->m = m;
+    x->a = a;
+    return addJob(&objfunc_wrapper::func, (void *)x);
+}
+
+
+#endif
+
diff --git a/rsm.cc b/rsm.cc
new file mode 100644
index 0000000..7664aa1
--- /dev/null
+++ b/rsm.cc
@@ -0,0 +1,610 @@
+//
+// Replicated state machine implementation with a primary and several
+// backups. The primary receives requests, assigns each a viewstamp (a
+// vid and a sequence number) in the order of reception, and forwards
+// them to all backups. A backup executes requests in the order that
+// the primary stamps them and replies with an OK to the primary. The
+// primary executes the request after it receives OKs from all backups,
+// and sends the reply back to the client.
+//
+// The config module will tell the RSM about a new view. If the
+// primary in the previous view is a member of the new view, then it
+// will stay the primary. Otherwise, the smallest numbered node of
+// the previous view will be the new primary. In either case, the new
+// primary will be a node from the previous view. The configuration
+// module constructs the sequence of views for the RSM, and the RSM
+// ensures there will always be exactly one primary, who was a member
+// of the last view.
+//
+// When a new node starts, the recovery thread is in charge of joining
+// the RSM. It will collect the internal RSM state from the primary;
+// the primary asks the config module to add the new node and returns
+// the internal RSM state (e.g., the paxos log) to the joining node.
+// Since there is only one primary, all joins happen in a well-defined
+// total order.
+//
+// The recovery thread also runs during a view change (e.g., when a node
+// has failed). After a failure some of the backups could have
+// processed a request that the primary has not, but those results are
+// not visible to clients (since only the primary responds to clients).
+// If the primary of the previous view is in the current view, then it
+// will be the primary and its state is authoritative: the backups
+// download the current state from the primary. A primary waits until
+// all backups have downloaded the state. Once the RSM is in sync, the
+// primary accepts requests again from clients. If one of the backups
+// is the new primary, then its state is authoritative. In either
+// scenario, the next view uses a node as primary that has the state
+// resulting from processing all acknowledged client requests.
+// Therefore, if the nodes sync up before processing the next request,
+// the next view will have the correct state.
+//
+// While the RSM is in a view change (i.e., a node has failed, a new
+// view has been formed, but the sync hasn't completed), another failure
+// could happen, which complicates the view change. During syncing, the
+// primary or backups can time out and initiate another Paxos round.
+// The RSM uses two variables to keep track of what state it is in:
+// - inviewchange: a node has failed and the RSM is performing a view change
+// - insync: this node is syncing its state
+//
+// If inviewchange is false and a node is the primary, then it can
+// process client requests. If it is true, clients are told to retry
+// again later. While inviewchange is true, the RSM may go through
+// several member list changes, one by one. After a member list
+// change completes, the node tries to sync. If the sync completes,
+// the view change completes (and inviewchange is set to false). If
+// the sync fails, the node may start another member list change
+// (inviewchange = true and insync = false).
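+//
+// As a rough sketch of the flag combinations (inferred from the
+// description above, not an exhaustive table):
+//   inviewchange == false                  : normal operation; the
+//                                            primary may execute requests
+//   inviewchange == true,  insync == false : waiting for the next member
+//                                            list change to commit
+//   inviewchange == true,  insync == true  : transferring state for view
+//                                            vid_insync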
+//
+// The implementation should be used only with servers that run all
+// requests to completion; in particular, a request shouldn't
+// block. If a request blocks, the backup won't respond to the
+// primary, and the primary won't execute the request. A request may
+// send an RPC to another host, but the RPC should be a one-way
+// message to that host; the backup shouldn't do anything based on the
+// response or execute after the response, because it is not
+// guaranteed that all backups will receive the same response and
+// execute in the same order.
+//
+// The implementation can be viewed as a layered system:
+// RSM module ---- in charge of replication
+// config module ---- in charge of view management
+// Paxos module ---- in charge of running Paxos to agree on a value
+//
+// Each module has threads and internal locks. Furthermore, a thread
+// may call down through the layers (e.g., to run Paxos's proposer).
+// When Paxos's acceptor accepts a new value for an instance, a thread
+// will invoke an upcall to inform higher layers of the new value.
+// The rule is that a module releases its internal locks before it
+// upcalls, but can keep its locks when calling down.
+
+#include <fstream>
+#include <iostream>
+
+#include "handle.h"
+#include "rsm.h"
+#include "tprintf.h"
+#include "lang/verify.h"
+#include "rsm_client.h"
+
+static void *recoverythread(void *x) {
+    rsm *r = (rsm *) x;
+    r->recovery();
+    return 0;
+}
+
+rsm::rsm(std::string _first, std::string _me) :
+    stf(0), primary(_first), insync(false), inviewchange(true), vid_commit(0),
+    partitioned(false), dopartition(false), break1(false), break2(false)
+{
+    pthread_t th;
+
+    last_myvs.vid = 0;
+    last_myvs.seqno = 0;
+    myvs = last_myvs;
+    myvs.seqno = 1;
+
+    cfg = new config(_first, _me, this);
+
+    if (_first == _me) {
+        // Commit the first view here. We cannot have acceptor::acceptor
+        // do the commit, since at that time this->cfg is not initialized
+        commit_change(1);
+    }
+    rsmrpc = cfg->get_rpcs();
+    rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke);
+    rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members);
+    rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke);
+    rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq);
+    rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq);
+    rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq);
+
+    // the tester must be on a different port, otherwise it may partition itself
+    testsvr = new rpcs(atoi(_me.c_str()) + 1);
+    testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq);
+    testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
+
+    {
+        ScopedLock ml(rsm_mutex);
+        VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0);
+    }
+}
+
+void rsm::reg1(int proc, handler *h) {
+    ScopedLock ml(rsm_mutex);
+    procs[proc] = h;
+}
+
+// The recovery thread runs this function
+void rsm::recovery() {
+    bool r = true;
+    ScopedLock ml(rsm_mutex);
+
+    while (1) {
+        while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
+            if (join(primary)) {
+                tprintf("recovery: joined\n");
+                commit_change_wo(cfg->vid());
+            } else {
+                ScopedUnlock su(rsm_mutex);
+                sleep(30); // XXX make another node in cfg primary?
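+                // back off, then retry the join (the primary may have
+                // changed in the meantime)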
+ } + } + vid_insync = vid_commit; + tprintf("recovery: sync vid_insync %d\n", vid_insync); + if (primary == cfg->myaddr()) { + r = sync_with_backups(); + } else { + r = sync_with_primary(); + } + tprintf("recovery: sync done\n"); + + // If there was a commited viewchange during the synchronization, restart + // the recovery + if (vid_insync != vid_commit) + continue; + + if (r) { + myvs.vid = vid_commit; + myvs.seqno = 1; + inviewchange = false; + } + tprintf("recovery: go to sleep %d %d\n", insync, inviewchange); + recovery_cond.wait(rsm_mutex); + } +} + +template +std::ostream & operator<<(std::ostream &o, const std::vector &d) { + o << "["; + for (typename std::vector::const_iterator i=d.begin(); i!=d.end(); i++) { + o << *i; + if (i+1 != d.end()) + o << ", "; + } + o << "]"; + return o; +} + +bool rsm::sync_with_backups() { + { + ScopedUnlock su(rsm_mutex); + // Make sure that the state of lock_server_cache_rsm is stable during + // synchronization; otherwise, the primary's state may be more recent + // than replicas after the synchronization. + ScopedLock ml(invoke_mutex); + // By acquiring and releasing the invoke_mutex once, we make sure that + // the state of lock_server_cache_rsm will not be changed until all + // replicas are synchronized. The reason is that client_invoke arrives + // after this point of time will see inviewchange == true, and returns + // BUSY. + } + // Start accepting synchronization request (statetransferreq) now! + insync = true; + backups = std::vector(cfg->get_view(vid_insync)); + backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); + LOG("rsm::sync_with_backups " << backups); + sync_cond.wait(rsm_mutex); + insync = false; + return true; +} + + +bool rsm::sync_with_primary() { + // Remember the primary of vid_insync + std::string m = primary; + while (vid_insync == vid_commit) { + if (statetransfer(m)) + break; + } + return statetransferdone(m); +} + + +/** + * Call to transfer state from m to the local node. + * Assumes that rsm_mutex is already held. + */ +bool rsm::statetransfer(std::string m) +{ + rsm_protocol::transferres r; + handle h(m); + int ret; + tprintf("rsm::statetransfer: contact %s w. 
my last_myvs(%d,%d)\n", + m.c_str(), last_myvs.vid, last_myvs.seqno); + rpcc *cl; + { + ScopedUnlock su(rsm_mutex); + cl = h.safebind(); + if (cl) { + ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(), + last_myvs, vid_insync, r, rpcc::to(1000)); + } + } + if (cl == 0 || ret != rsm_protocol::OK) { + tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(), + (long unsigned) cl, ret); + return false; + } + if (stf && last_myvs != r.last) { + stf->unmarshal_state(r.state); + } + last_myvs = r.last; + tprintf("rsm::statetransfer transfer from %s success, vs(%d,%d)\n", + m.c_str(), last_myvs.vid, last_myvs.seqno); + return true; +} + +bool rsm::statetransferdone(std::string m) { + ScopedUnlock su(rsm_mutex); + handle h(m); + rpcc *cl = h.safebind(); + if (!cl) + return false; + int r; + rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); + if (ret != rsm_protocol::OK) + return false; + return true; +} + + +bool rsm::join(std::string m) { + handle h(m); + int ret; + rsm_protocol::joinres r; + + tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid, + last_myvs.seqno); + rpcc *cl; + { + ScopedUnlock su(rsm_mutex); + cl = h.safebind(); + if (cl != 0) { + ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs, + r, rpcc::to(120000)); + } + } + + if (cl == 0 || ret != rsm_protocol::OK) { + tprintf("rsm::join: couldn't reach %s %p %d\n", m.c_str(), + cl, ret); + return false; + } + tprintf("rsm::join: succeeded %s\n", r.log.c_str()); + cfg->restore(r.log); + return true; +} + +/* + * Config informs rsm whenever it has successfully + * completed a view change + */ +void rsm::commit_change(unsigned vid) { + ScopedLock ml(rsm_mutex); + commit_change_wo(vid); + if (cfg->ismember(cfg->myaddr(), vid_commit)) + breakpoint2(); +} + +void rsm::commit_change_wo(unsigned vid) { + if (vid <= vid_commit) + return; + tprintf("commit_change: new view (%d) last vs (%d,%d) %s insync %d\n", + vid, last_myvs.vid, last_myvs.seqno, primary.c_str(), insync); + vid_commit = vid; + inviewchange = true; + set_primary(vid); + recovery_cond.signal(); + sync_cond.signal(); + if (cfg->ismember(cfg->myaddr(), vid_commit)) + breakpoint2(); +} + + +void rsm::execute(int procno, std::string req, std::string &r) { + tprintf("execute\n"); + handler *h = procs[procno]; + VERIFY(h); + unmarshall args(req); + marshall rep; + std::string reps; + rsm_protocol::status ret = h->fn(args, rep); + marshall rep1; + rep1 << ret; + rep1 << rep.str(); + r = rep1.str(); +} + +// +// Clients call client_invoke to invoke a procedure on the replicated state +// machine: the primary receives the request, assigns it a sequence +// number, and invokes it on all members of the replicated state +// machine. 
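+//
+// In outline: refuse with BUSY while in a view change and with
+// NOTPRIMARY on a backup; assign vs = myvs and advance myvs under
+// rsm_mutex; send rsm_protocol::invoke(procno, vs, req) to each backup
+// in the view (any failure is reported to the client as BUSY); then
+// execute the request locally and return the result.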
+// +rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) { + LOG("rsm::client_invoke: procno 0x" << std::hex << procno); + ScopedLock ml(invoke_mutex); + std::vector m; + std::string myaddr; + viewstamp vs; + { + ScopedLock ml(rsm_mutex); + LOG("Checking for inviewchange"); + if (inviewchange) + return rsm_client_protocol::BUSY; + LOG("Checking for primacy"); + myaddr = cfg->myaddr(); + if (primary != myaddr) + return rsm_client_protocol::NOTPRIMARY; + LOG("Assigning a viewstamp"); + m = cfg->get_view(vid_commit); + // assign the RPC the next viewstamp number + vs = myvs; + myvs++; + } + + // send an invoke RPC to all slaves in the current view with a timeout of 1 second + LOG("Invoking slaves"); + for (unsigned i = 0; i < m.size(); i++) { + if (m[i] != myaddr) { + // if invoke on slave fails, return rsm_client_protocol::BUSY + handle h(m[i]); + LOG("Sending invoke to " << m[i]); + rpcc *cl = h.safebind(); + if (!cl) + return rsm_client_protocol::BUSY; + rsm_protocol::status ret; + int r; + ret = cl->call(rsm_protocol::invoke, procno, vs, req, r, rpcc::to(1000)); + LOG("Invoke returned " << ret); + if (ret != rsm_protocol::OK) + return rsm_client_protocol::BUSY; + breakpoint1(); + partition1(); + } + } + execute(procno, req, r); + last_myvs = vs; + return rsm_client_protocol::OK; +} + +// +// The primary calls the internal invoke at each member of the +// replicated state machine +// +// the replica must execute requests in order (with no gaps) +// according to requests' seqno + +rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) { + LOG("rsm::invoke: procno 0x" << std::hex << proc); + ScopedLock ml(invoke_mutex); + std::vector m; + std::string myaddr; + { + ScopedLock ml(rsm_mutex); + // check if !inviewchange + LOG("Checking for view change"); + if (inviewchange) + return rsm_protocol::ERR; + // check if slave + LOG("Checking for slave status"); + myaddr = cfg->myaddr(); + if (primary == myaddr) + return rsm_protocol::ERR; + m = cfg->get_view(vid_commit); + if (find(m.begin(), m.end(), myaddr) == m.end()) + return rsm_protocol::ERR; + // check sequence number + LOG("Checking sequence number"); + if (vs != myvs) + return rsm_protocol::ERR; + myvs++; + } + std::string r; + execute(proc, req, r); + last_myvs = vs; + breakpoint1(); + return rsm_protocol::OK; +} + +/** + * RPC handler: Send back the local node's state to the caller + */ +rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid, + rsm_protocol::transferres &r) { + ScopedLock ml(rsm_mutex); + int ret = rsm_protocol::OK; + tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), + last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); + if (!insync || vid != vid_insync) { + return rsm_protocol::BUSY; + } + if (stf && last != last_myvs) + r.state = stf->marshal_state(); + r.last = last_myvs; + return ret; +} + +/** + * RPC handler: Inform the local node (the primary) that node m has synchronized + * for view vid + */ +rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { + ScopedLock ml(rsm_mutex); + if (!insync || vid != vid_insync) + return rsm_protocol::BUSY; + backups.erase(find(backups.begin(), backups.end(), m)); + if (backups.empty()) + sync_cond.signal(); + return rsm_protocol::OK; +} + +// a node that wants to join an RSM as a server sends a +// joinreq to the RSM's current primary; this is the +// handler for that RPC. 
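Note: before the join handler below, a sketch of how a service is wired into the invoke path above. This is hypothetical: lock_server_cache_rsm, its constructor, the acquire signature, and the lock_protocol typedefs are assumptions inferred from the Makefile targets, not code shown in this section.

    #include "rsm.h"
    #include "rsm_client.h"
    #include "lock_server_cache_rsm.h"   // assumed header name

    void example_wiring(std::string first, std::string me, std::string prim) {
        // Replica side: register a handler, mirroring rpcs::reg; rsm::execute()
        // looks it up in procs[] when invoke() runs the request.
        rsm *r = new rsm(first, me);
        lock_server_cache_rsm ls(r);                       // assumed ctor
        r->reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);

        // Client side: rsm_client::call marshals the arguments and hands the
        // blob to client_invoke on the current primary (see rsm_client.h).
        rsm_client rc(prim);
        int ret;
        rc.call(lock_protocol::acquire, (lock_protocol::lockid_t)1,
                std::string("client-1"), (lock_protocol::xid_t)0, ret);
    }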
+rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
+    int ret = rsm_protocol::OK;
+
+    ScopedLock ml(rsm_mutex);
+    tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
+            last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
+    if (cfg->ismember(m, vid_commit)) {
+        tprintf("joinreq: is still a member\n");
+        r.log = cfg->dump();
+    } else if (cfg->myaddr() != primary) {
+        tprintf("joinreq: busy\n");
+        ret = rsm_protocol::BUSY;
+    } else {
+        // We cache vid_commit to avoid adding m to a view which already
+        // contains m due to a race condition
+        unsigned vid_cache = vid_commit;
+        bool succ;
+        {
+            ScopedUnlock su(rsm_mutex);
+            succ = cfg->add(m, vid_cache);
+        }
+        if (cfg->ismember(m, cfg->vid())) {
+            r.log = cfg->dump();
+            tprintf("joinreq: ret %d log %s\n", ret, r.log.c_str());
+        } else {
+            tprintf("joinreq: failed; proposer couldn't add %d\n", succ);
+            ret = rsm_protocol::BUSY;
+        }
+    }
+    return ret;
+}
+
+/*
+ * RPC handler: Send back all the nodes this node knows about to the client,
+ * so the client can switch to a different primary
+ * when its existing primary fails
+ */
+rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
+    std::vector<std::string> m;
+    ScopedLock ml(rsm_mutex);
+    m = cfg->get_view(vid_commit);
+    m.push_back(primary);
+    r = m;
+    tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
+            primary.c_str());
+    return rsm_client_protocol::OK;
+}
+
+// If the primary is a member of the new view, that node stays primary;
+// otherwise, the lowest-numbered node of the previous view becomes primary.
+// Caller should hold rsm_mutex.
+void rsm::set_primary(unsigned vid) {
+    std::vector<std::string> c = cfg->get_view(vid);
+    std::vector<std::string> p = cfg->get_view(vid - 1);
+    VERIFY (c.size() > 0);
+
+    if (isamember(primary,c)) {
+        tprintf("set_primary: primary stays %s\n", primary.c_str());
+        return;
+    }
+
+    VERIFY(p.size() > 0);
+    for (unsigned i = 0; i < p.size(); i++) {
+        if (isamember(p[i], c)) {
+            primary = p[i];
+            tprintf("set_primary: primary is %s\n", primary.c_str());
+            return;
+        }
+    }
+    VERIFY(0);
+}
+
+bool rsm::amiprimary() {
+    ScopedLock ml(rsm_mutex);
+    return primary == cfg->myaddr() && !inviewchange;
+}
+
+
+// Testing server
+
+// Simulate partitions
+
+// Assumes caller holds rsm_mutex
+void rsm::net_repair_wo(bool heal) {
+    std::vector<std::string> m;
+    m = cfg->get_view(vid_commit);
+    for (unsigned i = 0; i < m.size(); i++) {
+        if (m[i] != cfg->myaddr()) {
+            handle h(m[i]);
+            tprintf("rsm::net_repair_wo: %s %d\n", m[i].c_str(), heal);
+            if (h.safebind()) h.safebind()->set_reachable(heal);
+        }
+    }
+    rsmrpc->set_reachable(heal);
+}
+
+rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
+    ScopedLock ml(rsm_mutex);
+    tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
+            heal, dopartition, partitioned);
+    if (heal) {
+        net_repair_wo(heal);
+        partitioned = false;
+    } else {
+        dopartition = true;
+        partitioned = false;
+    }
+    r = rsm_test_protocol::OK;
+    return r;
+}
+
+// Simulate failure at breakpoints 1 and 2
+
+void rsm::breakpoint1() {
+    if (break1) {
+        tprintf("Dying at breakpoint 1 in rsm!\n");
+        exit(1);
+    }
+}
+
+void rsm::breakpoint2() {
+    if (break2) {
+        tprintf("Dying at breakpoint 2 in rsm!\n");
+        exit(1);
+    }
+}
+
+void rsm::partition1() {
+    if (dopartition) {
+        net_repair_wo(false);
+        dopartition = false;
+        partitioned = true;
+    }
+}
+
+rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
+    r = rsm_test_protocol::OK;
+    ScopedLock ml(rsm_mutex);
+    tprintf("rsm::breakpointreq: %d\n", b);
+    if (b == 1) 
break1 = true; + else if (b == 2) break2 = true; + else if (b == 3 || b == 4) cfg->breakpoint(b); + else r = rsm_test_protocol::ERR; + return r; +} diff --git a/rsm.h b/rsm.h new file mode 100644 index 0000000..c5bf4fc --- /dev/null +++ b/rsm.h @@ -0,0 +1,236 @@ +// replicated state machine interface. + +#ifndef rsm_h +#define rsm_h + +#include +#include +#include "rsm_protocol.h" +#include "rsm_state_transfer.h" +#include "rpc.h" +#include +#include "config.h" + + +class rsm : public config_view_change { + private: + void reg1(int proc, handler *); + protected: + std::map procs; + config *cfg; + class rsm_state_transfer *stf; + rpcs *rsmrpc; + // On slave: expected viewstamp of next invoke request + // On primary: viewstamp for the next request from rsm_client + viewstamp myvs; + viewstamp last_myvs; // Viewstamp of the last executed request + std::string primary; + bool insync; + bool inviewchange; + unsigned vid_commit; // Latest view id that is known to rsm layer + unsigned vid_insync; // The view id that this node is synchronizing for + std::vector backups; // A list of unsynchronized backups + + // For testing purposes + rpcs *testsvr; + bool partitioned; + bool dopartition; + bool break1; + bool break2; + + + rsm_client_protocol::status client_members(int i, + std::vector &r); + rsm_protocol::status invoke(int proc, viewstamp vs, std::string mreq, + int &dummy); + rsm_protocol::status transferreq(std::string src, viewstamp last, unsigned vid, + rsm_protocol::transferres &r); + rsm_protocol::status transferdonereq(std::string m, unsigned vid, int &); + rsm_protocol::status joinreq(std::string src, viewstamp last, + rsm_protocol::joinres &r); + rsm_test_protocol::status test_net_repairreq(int heal, int &r); + rsm_test_protocol::status breakpointreq(int b, int &r); + + mutex rsm_mutex; + mutex invoke_mutex; + cond recovery_cond; + cond sync_cond; + + void execute(int procno, std::string req, std::string &r); + rsm_client_protocol::status client_invoke(int procno, std::string req, + std::string &r); + bool statetransfer(std::string m); + bool statetransferdone(std::string m); + bool join(std::string m); + void set_primary(unsigned vid); + std::string find_highest(viewstamp &vs, std::string &m, unsigned &vid); + bool sync_with_backups(); + bool sync_with_primary(); + void net_repair_wo(bool heal); + void breakpoint1(); + void breakpoint2(); + void partition1(); + void commit_change_wo(unsigned vid); + public: + rsm (std::string _first, std::string _me); + ~rsm() {}; + + bool amiprimary(); + void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }; + void recovery(); + void commit_change(unsigned vid); + + template + void reg(int proc, S*, int (S::*meth)(const A1 a1, R &)); + template + void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, R &)); + template + void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, R &)); + template + void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, R &)); + template + void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, const A5 a5, R &)); +}; + +template +void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) { + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + R r; + args >> a1; + VERIFY(args.okdone()); + int b = (sob->*meth)(a1,r); + 
ret << r;
+            return b;
+        }
+    };
+    reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, R & r)) {
+    class h1 : public handler {
+        private:
+            S * sob;
+            int (S::*meth)(const A1 a1, const A2 a2, R & r);
+        public:
+            h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
+                : sob(xsob), meth(xmeth) { }
+            int fn(unmarshall &args, marshall &ret) {
+                A1 a1;
+                A2 a2;
+                R r;
+                args >> a1;
+                args >> a2;
+                VERIFY(args.okdone());
+                int b = (sob->*meth)(a1,a2,r);
+                ret << r;
+                return b;
+            }
+    };
+    reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+        const A3 a3, R & r)) {
+    class h1 : public handler {
+        private:
+            S * sob;
+            int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
+        public:
+            h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
+                : sob(xsob), meth(xmeth) { }
+            int fn(unmarshall &args, marshall &ret) {
+                A1 a1;
+                A2 a2;
+                A3 a3;
+                R r;
+                args >> a1;
+                args >> a2;
+                args >> a3;
+                VERIFY(args.okdone());
+                int b = (sob->*meth)(a1,a2,a3,r);
+                ret << r;
+                return b;
+            }
+    };
+    reg1(proc, new h1(sob, meth));
+}
+
+template<class S, class A1, class A2, class A3, class A4, class R>
+void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+        const A3 a3, const A4 a4, R & r)) {
+    class h1 : public handler {
+        private:
+            S * sob;
+            int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
+        public:
+            h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+                    const A4 a4, R & r))
+                : sob(xsob), meth(xmeth) { }
+            int fn(unmarshall &args, marshall &ret) {
+                A1 a1;
+                A2 a2;
+                A3 a3;
+                A4 a4;
+                R r;
+                args >> a1;
+                args >> a2;
+                args >> a3;
+                args >> a4;
+                VERIFY(args.okdone());
+                int b = (sob->*meth)(a1,a2,a3,a4,r);
+                ret << r;
+                return b;
+            }
+    };
+    reg1(proc, new h1(sob, meth));
+}
+
+
+template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
+    rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
+            const A3 a3, const A4 a4,
+            const A5 a5, R & r))
+{
+    class h1 : public handler {
+        private:
+            S * sob;
+            int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
+                    const A5 a5, R & r);
+        public:
+            h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
+                    const A4 a4, const A5 a5, R & r))
+                : sob(xsob), meth(xmeth) { }
+            int fn(unmarshall &args, marshall &ret) {
+                A1 a1;
+                A2 a2;
+                A3 a3;
+                A4 a4;
+                A5 a5;
+                R r;
+                args >> a1;
+                args >> a2;
+                args >> a3;
+                args >> a4;
+                args >> a5;
+                VERIFY(args.okdone());
+                int b = (sob->*meth)(a1,a2,a3,a4,a5,r);
+                ret << r;
+                return b;
+            }
+    };
+    reg1(proc, new h1(sob, meth));
+}
+
+#endif /* rsm_h */
diff --git a/rsm_client.cc b/rsm_client.cc
new file mode 100644
index 0000000..ee25564
--- /dev/null
+++ b/rsm_client.cc
@@ -0,0 +1,94 @@
+#include "rsm_client.h"
+#include <vector>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include "lang/verify.h"
+
+
+rsm_client::rsm_client(std::string dst) {
+    printf("create rsm_client\n");
+    std::vector<std::string> mems;
+
+    pthread_mutex_init(&rsm_client_mutex, NULL);
+    sockaddr_in dstsock;
+    make_sockaddr(dst.c_str(), &dstsock);
+    primary = dst;
+
+    {
+        ScopedLock ml(&rsm_client_mutex);
+        VERIFY (init_members());
+    }
+    printf("rsm_client: done\n");
+}
+
+// Assumes caller holds rsm_client_mutex
+void rsm_client::primary_failure() {
+    primary = known_mems.back();
+    known_mems.pop_back();
+}
+
+rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
+    int ret;
+    ScopedLock ml(&rsm_client_mutex);
+    while (1) {
+        printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
+        handle h(primary);
+
+ 
VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0); + rpcc *cl = h.safebind(); + if (cl) + ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000)); + VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0); + + if (!cl) + goto prim_fail; + + printf("rsm_client::invoke proc %x primary %s ret %d\n", proc, + primary.c_str(), ret); + if (ret == rsm_client_protocol::OK) + break; + if (ret == rsm_client_protocol::BUSY) { + printf("rsm is busy %s\n", primary.c_str()); + sleep(3); + continue; + } + if (ret == rsm_client_protocol::NOTPRIMARY) { + printf("primary %s isn't the primary--let's get a complete list of mems\n", + primary.c_str()); + if (init_members()) + continue; + } +prim_fail: + printf("primary %s failed ret %d\n", primary.c_str(), ret); + primary_failure(); + printf ("rsm_client::invoke: retry new primary %s\n", primary.c_str()); + } + return ret; +} + +bool rsm_client::init_members() { + printf("rsm_client::init_members get members!\n"); + handle h(primary); + VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0); + int ret; + rpcc *cl = h.safebind(); + if (cl) { + ret = cl->call(rsm_client_protocol::members, 0, known_mems, + rpcc::to(1000)); + } + VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0); + if (cl == 0 || ret != rsm_protocol::OK) + return false; + if (known_mems.size() < 1) { + printf("rsm_client::init_members do not know any members!\n"); + VERIFY(0); + } + + primary = known_mems.back(); + known_mems.pop_back(); + + printf("rsm_client::init_members: primary %s\n", primary.c_str()); + + return true; +} diff --git a/rsm_client.h b/rsm_client.h new file mode 100644 index 0000000..3219179 --- /dev/null +++ b/rsm_client.h @@ -0,0 +1,131 @@ +#ifndef rsm_client_h +#define rsm_client_h + +#include "rpc.h" +#include "rsm_protocol.h" +#include +#include + + +// +// rsm client interface. +// +// The client stubs package up an rpc, and then call the invoke procedure +// on the replicated state machine passing the RPC as an argument. This way +// the replicated state machine isn't service specific; any server can use it. 
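Note: replies travel back double-wrapped; rsm::execute marshals the handler's status followed by the handler's own marshalled reply, and rsm_client::call_m below unwraps it in two steps. A minimal sketch of that framing, using only marshall/unmarshall operations that already appear in this patch:

    #include "rpc.h"
    #include <cassert>
    #include <string>

    int main() {
        marshall inner;
        inner << 42;                  // the handler's reply value R
        marshall outer;
        outer << 7;                   // the handler's return status
        outer << inner.str();         // the nested reply blob (as in rsm::execute)
        unmarshall u(outer.str());    // what rsm_client::call_m receives
        int status;
        std::string blob;
        u >> status;
        u >> blob;
        unmarshall u1(blob);
        int reply;
        u1 >> reply;
        assert(status == 7 && reply == 42);
        return 0;
    }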
+// + +class rsm_client { + protected: + std::string primary; + std::vector known_mems; + pthread_mutex_t rsm_client_mutex; + void primary_failure(); + bool init_members(); + public: + rsm_client(std::string dst); + rsm_protocol::status invoke(int proc, std::string req, std::string &rep); + + template + int call(unsigned int proc, const A1 & a1, R &r); + + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, R &r); + + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + R &r); + + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, R &r); + + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, R &r); + private: + template int call_m(unsigned int proc, marshall &req, R &r); +}; + +template +int rsm_client::call_m(unsigned int proc, marshall &req, R &r) { + std::string rep; + std::string res; + int intret = invoke(proc, req.str(), rep); + VERIFY( intret == rsm_client_protocol::OK ); + unmarshall u(rep); + u >> intret; + if (intret < 0) return intret; + u >> res; + if (!u.okdone()) { + fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" + "You probably forgot to set the reply string in " + "rsm::client_invoke, or you may call RPC 0x%x with wrong return " + "type\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + unmarshall u1(res); + u1 >> r; + if(!u1.okdone()) { + fprintf(stderr, "rsm_client::call_m: failed to unmarshall the reply.\n" + "You are probably calling RPC 0x%x with wrong return " + "type.\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; +} + +template +int rsm_client::call(unsigned int proc, const A1 & a1, R & r) { + marshall m; + m << a1; + return call_m(proc, m, r); +} + +template +int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, R & r) { + marshall m; + m << a1; + m << a2; + return call_m(proc, m, r); +} + +template +int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, R & r) { + marshall m; + std::string rep; + std::string res; + m << a1; + m << a2; + m << a3; + return call_m(proc, m, r); +} + +template +int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, R & r) { + marshall m; + std::string rep; + std::string res; + m << a1; + m << a2; + m << a3; + m << a4; + return call_m(proc, m, r); +} + +template +int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, const A5 & a5, R & r) { + marshall m; + std::string rep; + std::string res; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + return call_m(proc, m, r); +} + +#endif diff --git a/rsm_protocol.h b/rsm_protocol.h new file mode 100644 index 0000000..a27ef83 --- /dev/null +++ b/rsm_protocol.h @@ -0,0 +1,116 @@ +#ifndef rsm_protocol_h +#define rsm_protocol_h + +#include "rpc.h" + + +class rsm_client_protocol { + public: + enum xxstatus { OK, ERR, NOTPRIMARY, BUSY}; + typedef int status; + enum rpc_numbers { + invoke = 0x9001, + members, + }; +}; + + +struct viewstamp { + viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) { + vid = _vid; + seqno = _seqno; + }; + unsigned int vid; + unsigned int seqno; + inline void operator++(int) { + seqno++; + }; +}; + +class rsm_protocol { + public: + enum xxstatus { OK, ERR, BUSY}; + typedef int status; + enum rpc_numbers { + invoke = 0x10001, + transferreq, + transferdonereq, + joinreq, + }; 
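Note: a round-trip sketch for the viewstamp wire format, using the marshall/unmarshall operators defined later in this header (illustrative only):

    void viewstamp_roundtrip() {
        marshall m;
        m << viewstamp(3, 9);    // writes vid, then seqno
        unmarshall u(m.str());
        viewstamp v;
        u >> v;                  // now v.vid == 3 and v.seqno == 9
    }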
+ + struct transferres { + std::string state; + viewstamp last; + }; + + struct joinres { + std::string log; + }; +}; + +inline bool operator==(viewstamp a, viewstamp b) { + return a.vid == b.vid && a.seqno == b.seqno; +} + +inline bool operator>(viewstamp a, viewstamp b) { + return (a.vid > b.vid) || ((a.vid == b.vid) && a.seqno > b.seqno); +} + +inline bool operator!=(viewstamp a, viewstamp b) { + return a.vid != b.vid || a.seqno != b.seqno; +} + +inline marshall& operator<<(marshall &m, viewstamp v) +{ + m << v.vid; + m << v.seqno; + return m; +} + +inline unmarshall& operator>>(unmarshall &u, viewstamp &v) { + u >> v.vid; + u >> v.seqno; + return u; +} + +inline marshall & +operator<<(marshall &m, rsm_protocol::transferres r) +{ + m << r.state; + m << r.last; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, rsm_protocol::transferres &r) +{ + u >> r.state; + u >> r.last; + return u; +} + +inline marshall & +operator<<(marshall &m, rsm_protocol::joinres r) +{ + m << r.log; + return m; +} + +inline unmarshall & +operator>>(unmarshall &u, rsm_protocol::joinres &r) +{ + u >> r.log; + return u; +} + +class rsm_test_protocol { + public: + enum xxstatus { OK, ERR}; + typedef int status; + enum rpc_numbers { + net_repair = 0x12001, + breakpoint = 0x12002, + }; +}; + +#endif diff --git a/rsm_state_transfer.h b/rsm_state_transfer.h new file mode 100644 index 0000000..6c7e0e4 --- /dev/null +++ b/rsm_state_transfer.h @@ -0,0 +1,11 @@ +#ifndef rsm_state_transfer_h +#define rsm_state_transfer_h + +class rsm_state_transfer { + public: + virtual std::string marshal_state() = 0; + virtual void unmarshal_state(std::string) = 0; + virtual ~rsm_state_transfer() {}; +}; + +#endif diff --git a/rsm_tester.cc b/rsm_tester.cc new file mode 100644 index 0000000..08034e2 --- /dev/null +++ b/rsm_tester.cc @@ -0,0 +1,40 @@ +// +// RSM test client +// + +#include "rsm_protocol.h" +#include "rsmtest_client.h" +#include "rpc.h" +#include +#include +#include +#include +#include +using namespace std; + +rsmtest_client *lc; + +int +main(int argc, char *argv[]) +{ + int r; + + if(argc != 4){ + fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]); + exit(1); + } + + lc = new rsmtest_client(argv[1]); + string command(argv[2]); + if (command == "partition") { + r = lc->net_repair(atoi(argv[3])); + printf ("net_repair returned %d\n", r); + } else if (command == "breakpoint") { + int b = atoi(argv[3]); + r = lc->breakpoint(b); + printf ("breakpoint %d returned %d\n", b, r); + } else { + fprintf(stderr, "Unknown command %s\n", argv[2]); + } + exit(0); +} diff --git a/rsm_tester.pl b/rsm_tester.pl new file mode 100755 index 0000000..7018164 --- /dev/null +++ b/rsm_tester.pl @@ -0,0 +1,912 @@ +#!/usr/bin/perl -w + +use POSIX ":sys_wait_h"; +use Getopt::Std; +use strict; + + +my @pid; +my @logs = (); +my @views = (); #expected views +my %in_views; #the number of views a node is expected to be present +my @p; +my $t; +my $always_kill = 0; + +use sigtrap 'handler' => \&killprocess, 'HUP', 'INT', 'ABRT', 'QUIT', 'TERM'; + +sub paxos_log { + my $port = shift; + return "paxos-$port.log"; +} + +sub mydie { + my ($s) = @_; + killprocess() if ($always_kill); + die $s; +} + +sub killprocess { + print "killprocess: forcestop all spawned processes...@pid \n"; + kill 9, @pid; +} + +sub cleanup { + kill 9, @pid; + unlink(@logs); + sleep 2; +} + +sub spawn { + my ($p, @a) = @_; + my $aa = join("-", @a); + if (my $pid = fork) { +# parent + push( @logs, "$p-$aa.log" ); + if( $p =~ /config_server/ ) { + push( @logs, 
paxos_log($a[1]) );
+        }
+        if( $p =~ /lock_server/ ) {
+            push( @logs, paxos_log($a[1]) );
+        }
+        return $pid;
+    } elsif (defined $pid) {
+# child
+        open(STDOUT, ">>$p-$aa.log")
+            or mydie "Couldn't redirect stdout\n";
+        open(STDERR, ">&STDOUT")
+            or mydie "Couldn't redirect stderr\n";
+        $| = 1;
+        print "$p @a\n";
+        exec "$p @a"
+            or mydie "Cannot start new $p @a $!\n";
+    } else {
+        mydie "Cannot fork: $!\n";
+    }
+}
+
+sub randports {
+
+    my $num = shift;
+    my @p = ();
+    for( my $i = 0; $i < $num; $i++ ) {
+        push( @p, int(rand(54000))+10000 );
+    }
+    my @sp = sort { $a <=> $b } @p;
+    return @sp;
+}
+
+sub print_config {
+    my @ports = @_;
+    open( CONFIG, ">config" ) or mydie( "Couldn't open config for writing" );
+    foreach my $p (@ports) {
+        printf CONFIG "%05d\n", $p;
+    }
+    close( CONFIG );
+}
+
+sub spawn_ls {
+    my $master = shift;
+    my $port = shift;
+    return spawn( "./lock_server", $master, $port );
+}
+
+sub spawn_config {
+    my $master = shift;
+    my $port = shift;
+    return spawn( "./config_server", $master, $port );
+}
+
+sub check_views {
+
+    my $l = shift;
+    my $v = shift;
+    my $last_v = shift;
+
+    open( LOG, "<$l" )
+        or mydie( "Failed: couldn't read $l" );
+    my @log = <LOG>;
+    close(LOG);
+
+    my @vs = @{$v};
+
+    my $i = 0;
+    my @last_view;
+    foreach my $line (@log) {
+        if( $line =~ /^done (\d+) ([\d\s]+)$/ ) {
+
+            my $num = $1;
+            my @view = split( /\s+/, $2 );
+            @last_view = @view;
+
+            if( $i > $#vs ) {
+# let there be extra views
+                next;
+            }
+
+            my $e = $vs[$i];
+            my @expected = @{$e};
+
+            if( @expected != @view ) {
+                mydie( "Failed: In log $l at view $num is (@view), but expected $i (@expected)" );
+            }
+
+            $i++;
+        }
+    }
+
+    if( $i <= $#vs ) {
+        mydie( "Failed: In log $l, not enough views seen!" );
+    }
+
+    if( defined $last_v ) {
+        my @last_exp_v = @{$last_v};
+        if( @last_exp_v != @last_view ) {
+            mydie( "Failed: In log $l last view didn't match, got view @last_view, but expected @last_exp_v" );
+        }
+    }
+
+}
+
+sub get_num_views {
+
+    my $log = shift;
+    my $including = shift;
+    my $nv = `grep "done " $log | grep "$including" | wc -l`;
+    chomp $nv;
+    return $nv;
+
+}
+
+sub wait_for_view_change {
+
+    my $log = shift;
+    my $num_views = shift;
+    my $including = shift;
+    my $timeout = shift;
+
+    my $start = time();
+    while( (get_num_views( $log, $including ) < $num_views) and
+           ($start + $timeout > time()) ) {
+        my $lastv = `grep done $log | tail -n 1`;
+        chomp $lastv;
+        print " Waiting for $including to be present in >=$num_views views in $log (Last view: $lastv)\n";
+        sleep 1;
+    }
+
+    if( get_num_views( $log, $including ) < $num_views) {
+        mydie( "Failed: Timed out waiting for $including to be in >=$num_views in log $log" );
+    } else {
+        print " Done: $including is in >=$num_views views in $log\n";
+    }
+}
+
+sub waitpid_to {
+    my $pid = shift;
+    my $to = shift;
+
+    my $start = time();
+    my $done_pid;
+    do {
+        sleep 1;
+        $done_pid = waitpid($pid, POSIX::WNOHANG);
+    } while( $done_pid <= 0 and (time() - $start) < $to );
+
+    if( $done_pid <= 0 ) {
+        kill 9,$pid;
+        mydie( "Failed: Timed out waiting for process $pid\n" );
+    } else {
+        return 1;
+    }
+
+}
+
+sub wait_and_check_expected_view($) {
+    my $v = shift;
+    push @views, $v;
+    for (my $i = 0; $i <=$#$v; $i++) {
+        $in_views{$v->[$i]}++;
+    }
+    foreach my $port (@$v) {
+        wait_for_view_change(paxos_log($port), $in_views{$port}, $port, 20);
+    }
+    foreach my $port (@$v) {
+        my $log = paxos_log($port);
+        check_views( $log, \@views );
+    }
+}
+
+sub start_nodes ($$){
+
+    @pid = ();
+    @logs = ();
+    @views = ();
+    for (my $i = 0; $i <= $#p; $i++) {
+ 
$in_views{$p[$i]} = 0; + } + + my $n = shift; + my $command = shift; + + for (my $i = 0; $i < $n; $i++) { + if ($command eq "ls") { + @pid = (@pid, spawn_ls($p[0],$p[$i])); + print "Start lock_server on $p[$i]\n"; + }elsif ($command eq "config_server"){ + @pid = (@pid, spawn_config($p[0],$p[$i])); + print "Start config on $p[$i]\n"; + } + sleep 1; + + my @vv = @p[0..$i]; + wait_and_check_expected_view(\@vv); + } + +} + +my %options; +getopts("s:k",\%options); +if (defined($options{s})) { + srand($options{s}); +} +if (defined($options{k})) { + $always_kill = 1; +} + +#get a sorted list of random ports +@p = randports(5); +print_config( @p[0..4] ); + +my @do_run = (); +my $NUM_TESTS = 17; + +# see which tests are set +if( $#ARGV > -1 ) { + foreach my $t (@ARGV) { + if( $t < $NUM_TESTS && $t >= 0 ) { + $do_run[$t] = 1; + } + } +} else { +# turn on all tests + for( my $i = 0; $i < $NUM_TESTS; $i++ ) { + $do_run[$i] = 1; + } +} + +if ($do_run[0]) { + print "test0: start 3-process lock server\n"; + start_nodes(3,"ls"); + cleanup(); + sleep 2; +} + +if ($do_run[1]) { + print "test1: start 3-process lock server, kill third server\n"; + start_nodes(3,"ls"); + + print "Kill third server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + cleanup(); + sleep 2; +} + +if ($do_run[2]) { + print "test2: start 3-process lock server, kill first server\n"; + start_nodes(3,"ls"); + + print "Kill first (PID: $pid[0]) on port $p[0]\n"; + kill "TERM", $pid[0]; + + sleep 5; + + # it should go through 4 views + my @v4 = ($p[1], $p[2]); + wait_and_check_expected_view(\@v4); + + cleanup(); + sleep 2; +} + + +if ($do_run[3]) { + + print "test3: start 3-process lock_server, kill a server, restart a server\n"; + start_nodes(3,"ls"); + + print "Kill server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print "Restart killed server on port $p[2]\n"; + $pid[2] = spawn_ls ($p[0], $p[2]); + + sleep 5; + + my @v5 = ($p[0], $p[1], $p[2]); + wait_and_check_expected_view(\@v5); + + cleanup(); + sleep 2; +} + +if ($do_run[4]) { + print "test4: 3-process lock_server, kill third server, kill second server, restart third server, kill third server again, restart second server, re-restart third server, check logs\n"; + start_nodes(3,"ls"); + + print "Kill server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print "Kill server (PID: $pid[1]) on port $p[1]\n"; + kill "TERM", $pid[1]; + + sleep 5; + #no view change can happen because of a lack of majority + + print "Restarting server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + sleep 5; + + #no view change can happen because of a lack of majority + foreach my $port (@p[0..2]) { + my $num_v = get_num_views(paxos_log($port), $port); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + # kill node 3 again, + print "Kill server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + + + print "Restarting server on port $p[1]\n"; + $pid[1] = spawn_ls($p[0], $p[1]); + + sleep 7; + + foreach my $port (@p[0..1]) { + $in_views{$port} = get_num_views( paxos_log($port), $port ); + print " Node $port is present in ", $in_views{$port}, " views in ", paxos_log($port), 
"\n"; + } + + print "Restarting server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + my @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + +# now check the paxos logs and make sure the logs go through the right +# views + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + cleanup(); + +} + +if ($do_run[5]) { + print "test5: 3-process lock_server, send signal 1 to first server, kill third server, restart third server, check logs\n"; + start_nodes(3,"ls"); + + print "Sending paxos breakpoint 1 to first server on port $p[0]\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 3); + + sleep 1; + + print "Kill third server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 5; + foreach my $port (@p[0..2]) { + my $num_v = get_num_views( paxos_log($port), $port ); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + print "Restarting third server on port $p[2]\n"; + $pid[2]= spawn_ls($p[0], $p[2]); + my @lastv = ($p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + sleep 10; + +# now check the paxos logs and make sure the logs go through the right +# views + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + cleanup(); + +} + +if ($do_run[6]) { + print "test6: 4-process lock_server, send signal 2 to first server, kill fourth server, restart fourth server, check logs\n"; + start_nodes(4,"ls"); + print "Sending paxos breakpoint 2 to first server on port $p[0]\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 4); + + sleep 1; + + print "Kill fourth server (PID: $pid[3]) on port $p[3]\n"; + kill "TERM", $pid[3]; + + sleep 5; + + foreach my $port ($p[1],$p[2]) { + my $num_v = get_num_views( paxos_log($port), $port ); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + sleep 5; + + print "Restarting fourth server on port $p[3]\n"; + $pid[3] = spawn_ls($p[1], $p[3]); + + sleep 5; + + my @v5 = ($p[0],$p[1],$p[2]); + foreach my $port (@v5) { + $in_views{$port}++; + } + push @views, \@v5; + + sleep 10; + + # the 6th view will be (2,3) or (1,2,3,4) + my @v6 = ($p[1],$p[2]); + foreach my $port (@v6) { + $in_views{$port}++; + } + foreach my $port (@v6) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 30); + } + + # final will be (2,3,4) + my @lastv = ($p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv ); + } + cleanup(); + +} + +if ($do_run[7]) { + print "test7: 4-process lock_server, send signal 2 to first server, kill fourth server, kill other servers, restart other servers, restart fourth server, check logs\n"; + start_nodes(4,"ls"); + print "Sending paxos breakpoint 2 to first server on port $p[0]\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 4); + sleep 3; + + print "Kill fourth server (PID: $pid[3]) on port $p[3]\n"; + kill "TERM", $pid[3]; + + sleep 5; + + print "Kill third server (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + print "Kill second server (PID: $pid[1]) on port $p[1]\n"; + kill "TERM", $pid[1]; + + sleep 5; + + print "Restarting second server on 
port $p[1]\n"; + $pid[1] = spawn_ls($p[0], $p[1]); + + sleep 5; + + print "Restarting third server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + sleep 5; + +#no view change is possible by now because there is no majority + foreach my $port ($p[1],$p[2]) { + my $num_v = get_num_views( paxos_log($port), $port ); + die "$num_v views in ", paxos_log($port), " : no new views should be formed due to the lack of majority\n" if ($num_v != $in_views{$port}); + } + + print "Restarting fourth server on port $p[3]\n"; + $pid[3] = spawn_ls($p[1], $p[3]); + + sleep 5; + + my @v5 = ($p[0], $p[1], $p[2]); + push @views, \@v5; + foreach my $port (@v5) { + $in_views{$port}++; + } + + sleep 15; + my @lastv = ($p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + cleanup(); + +} + +if ($do_run[8]) { + print "test8: start 3-process lock service\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 8" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[9]) { + + print "test9: start 3-process rsm, kill second slave while lock_tester is running\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep int(rand(10)+1); + + print "Kill slave (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 3; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 9" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[10]) { + + print "test10: start 3-process rsm, kill second slave and restarts it later while lock_tester is running\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep int(rand(10)+1); + + print "Kill slave (PID: $pid[2]) on port $p[2]\n"; + kill "TERM", $pid[2]; + + sleep 3; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + sleep 3; + + print "Restarting killed lock_server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + my @v5 = ($p[0],$p[1],$p[2]); + wait_and_check_expected_view(\@v5); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 10" ); + } + + cleanup(); + sleep 2; +} + + +if ($do_run[11]) { + + print "test11: start 3-process rsm, kill primary while lock_tester is running\n"; + start_nodes(3,"ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep int(rand(10)+1); + + print "Kill primary (PID: $pid[0]) on port $p[0]\n"; + kill "TERM", $pid[0]; + + sleep 3; + + # it should go through 4 views + my @v4 = ($p[1], $p[2]); + wait_and_check_expected_view(\@v4); + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 
11" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[12]) { + + print "test12: start 3-process rsm, kill master at break1 and restart it while lock_tester is running\n"; + + start_nodes(3, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill master (PID: $pid[0]) on port $p[0] at breakpoint 1\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 1); + + + sleep 1; + + # it should go through 5 views + my @v4 = ($p[1], $p[2]); + wait_and_check_expected_view(\@v4); + + print "Restarting killed lock_server on port $p[0]\n"; + $pid[0] = spawn_ls($p[1], $p[0]); + + sleep 3; + + # the last view should include all nodes + my @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 12" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[13]) { + + print "test13: start 3-process rsm, kill slave at break1 and restart it while lock_tester is running\n"; + + start_nodes(3, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill slave (PID: $pid[2]) on port $p[2] at breakpoint 1\n"; + spawn("./rsm_tester", $p[2]+1, "breakpoint", 1); + + sleep 1; + + # it should go through 4 views + my @v4 = ($p[0], $p[1]); + wait_and_check_expected_view(\@v4); + + print "Restarting killed lock_server on port $p[2]\n"; + $pid[2] = spawn_ls($p[0], $p[2]); + + sleep 3; + + # the last view should include all nodes + my @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + foreach my $port (@lastv) { + check_views( paxos_log($port), \@views, \@lastv); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 13" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[14]) { + + print "test14: start 5-process rsm, kill slave break1, kill slave break2\n"; + + start_nodes(5, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1\n"; + spawn("./rsm_tester", $p[4]+1, "breakpoint", 1); + + + print "Kill slave (PID: $pid[3]) on port $p[3] at breakpoint 2\n"; + spawn("./rsm_tester", $p[3]+1, "breakpoint", 2); + + + sleep 1; + + # two view changes: + + print "first view change wait\n"; + my @lastv = ($p[0],$p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print "second view change wait\n"; + + @lastv = ($p[0],$p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 14" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[15]) { + + print "test15: start 5-process rsm, kill slave break1, kill primary break2\n"; + + start_nodes(5, "ls"); + + print "Start lock_tester $p[0]\n"; + 
$t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Kill slave (PID: $pid[4]) on port $p[4] at breakpoint 1\n"; + spawn("./rsm_tester", $p[4]+1, "breakpoint", 1); + + + print "Kill primary (PID: $pid[0]) on port $p[0] at breakpoint 2\n"; + spawn("./rsm_tester", $p[0]+1, "breakpoint", 2); + + sleep 1; + + # two view changes: + + print "first view change wait\n"; + my @lastv = ($p[0],$p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print "second view change wait\n"; + + @lastv = ($p[1],$p[2],$p[3]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 15" ); + } + + cleanup(); + sleep 2; +} + +if ($do_run[16]) { + + print "test16: start 3-process rsm, partition primary, heal it\n"; + + start_nodes(3, "ls"); + + print "Start lock_tester $p[0]\n"; + $t = spawn("./lock_tester", $p[0]); + + sleep 1; + + print "Partition primary (PID: $pid[0]) on port $p[0] at breakpoint\n"; + + spawn("./rsm_tester", $p[0]+1, "partition", 0); + + sleep 3; + + print "first view change wait\n"; + my @lastv = ($p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + sleep 1; + + print "Heal partition primary (PID: $pid[0]) on port $p[0] at breakpoint\n"; + spawn("./rsm_tester", $p[0]+1, "partition", 1); + + sleep 1; + + # xxx it should test that this is the 5th view! + print "second view change wait\n"; + @lastv = ($p[0], $p[1],$p[2]); + foreach my $port (@lastv) { + wait_for_view_change(paxos_log($port), $in_views{$port}+1, $port, 20); + } + + print " Wait for lock_tester to finish (waitpid $t)\n"; + waitpid_to($t, 600); + + if( system( "grep \"passed all tests successfully\" lock_tester-$p[0].log" ) ) { + mydie( "Failed lock tester for test 16" ); + } + + cleanup(); + sleep 2; +} + +print "tests done OK\n"; + +unlink("config"); diff --git a/rsmtest_client.cc b/rsmtest_client.cc new file mode 100644 index 0000000..f008228 --- /dev/null +++ b/rsmtest_client.cc @@ -0,0 +1,39 @@ +// RPC stubs for clients to talk to rsmtest_server + +#include "rsmtest_client.h" +#include "rpc.h" +#include + +#include +#include +#include + +rsmtest_client::rsmtest_client(std::string dst) +{ + sockaddr_in dstsock; + make_sockaddr(dst.c_str(), &dstsock); + cl = new rpcc(dstsock); + if (cl->bind() < 0) { + printf("rsmtest_client: call bind\n"); + } +} + +int +rsmtest_client::net_repair(int heal) +{ + int r; + int ret = cl->call(rsm_test_protocol::net_repair, heal, r); + VERIFY (ret == rsm_test_protocol::OK); + return r; +} + +int +rsmtest_client::breakpoint(int b) +{ + int r; + int ret = cl->call(rsm_test_protocol::breakpoint, b, r); + VERIFY (ret == rsm_test_protocol::OK); + return r; +} + + diff --git a/rsmtest_client.h b/rsmtest_client.h new file mode 100644 index 0000000..a175d9e --- /dev/null +++ b/rsmtest_client.h @@ -0,0 +1,20 @@ +// rsmtest client interface. 
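Note: this is the interface the test scripts drive; rsm_tester.pl above runs "./rsm_tester <port+1> breakpoint <n>", since each node's testing RPC port is its server port plus one (see the $p[0]+1 spawn calls above). A direct-use sketch, with a placeholder address:

    void poke_test_server() {
        rsmtest_client lc("127.0.0.1:34001");  // hypothetical testing port
        lc.breakpoint(1);    // ask the node to exit at breakpoint 1 (rsm::breakpointreq)
        lc.net_repair(0);    // 0 schedules a partition, 1 heals (rsm::test_net_repairreq)
    }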
+ +#ifndef rsmtest_client_h +#define rsmtest_client_h + +#include +#include "rsm_protocol.h" +#include "rpc.h" + +// Client interface to the rsmtest server +class rsmtest_client { + protected: + rpcc *cl; + public: + rsmtest_client(std::string d); + virtual ~rsmtest_client() {}; + virtual rsm_test_protocol::status net_repair(int heal); + virtual rsm_test_protocol::status breakpoint(int b); +}; +#endif diff --git a/srlock.cc b/srlock.cc new file mode 100644 index 0000000..8966527 --- /dev/null +++ b/srlock.cc @@ -0,0 +1,16 @@ +#include "srlock.h" + +ScopedRemoteLock::ScopedRemoteLock(lock_client *lc, lock_protocol::lockid_t lid) : + lc_(lc), lid_(lid) { + lc_->acquire(lid_); + releaseOnFree = true; +} + +void ScopedRemoteLock::retain() { + releaseOnFree = false; +} + +ScopedRemoteLock::~ScopedRemoteLock() { + if (releaseOnFree) + lc_->release(lid_); +} diff --git a/srlock.h b/srlock.h new file mode 100644 index 0000000..40bd8bd --- /dev/null +++ b/srlock.h @@ -0,0 +1,18 @@ +#ifndef srlock_h +#define srlock_h + +#include "lock_protocol.h" +#include "lock_client.h" + +class ScopedRemoteLock { + protected: + lock_client *lc_; + lock_protocol::lockid_t lid_; + bool releaseOnFree; + public: + ScopedRemoteLock(lock_client *, lock_protocol::lockid_t); + void retain(); + ~ScopedRemoteLock(); +}; + +#endif diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..d3cf93b --- /dev/null +++ b/start.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +ulimit -c unlimited + +NUM_LS=${1:-0} + +BASE_PORT=$RANDOM +BASE_PORT=$[BASE_PORT+2000] +LOCK_PORT=$[BASE_PORT+6] + +if [ $NUM_LS -gt 1 ]; then + x=0 + rm config + while [ $x -lt $NUM_LS ]; do + port=$[LOCK_PORT+2*x] + x=$[x+1] + echo $port >> config + done + x=0 + while [ $x -lt $NUM_LS ]; do + port=$[LOCK_PORT+2*x] + x=$[x+1] + echo "starting ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 &" + ./lock_server $LOCK_PORT $port > lock_server$x.log 2>&1 & + sleep 1 + done +else + echo "starting ./lock_server $LOCK_PORT > lock_server.log 2>&1 &" + ./lock_server $LOCK_PORT > lock_server.log 2>&1 & + sleep 1 +fi diff --git a/stop.sh b/stop.sh new file mode 100755 index 0000000..2d0a1f6 --- /dev/null +++ b/stop.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +pkill -u $USER lock_server diff --git a/tprintf.cc b/tprintf.cc new file mode 100644 index 0000000..8f6c8a9 --- /dev/null +++ b/tprintf.cc @@ -0,0 +1,16 @@ +#include "mutex.h" +#include +#include +#include "tprintf.h" + +uint64_t utime() { + struct timeval tp; + gettimeofday(&tp, NULL); + return (tp.tv_usec + (uint64_t)tp.tv_sec * 1000000) % 1000000000; +} + +mutex cerr_mutex; +std::map thread_name_map; +int next_thread_num = 0; +std::map instance_name_map; +int next_instance_num = 0; diff --git a/tprintf.h b/tprintf.h new file mode 100644 index 0000000..6a0cb2a --- /dev/null +++ b/tprintf.h @@ -0,0 +1,90 @@ +#ifndef TPRINTF_H +#define TPRINTF_H + +#include +#include +#include "mutex.h" +#include +#include +#include + +extern mutex cerr_mutex; +extern std::map thread_name_map; +extern int next_thread_num; +extern std::map instance_name_map; +extern int next_instance_num; +extern char tprintf_thread_prefix; + +#define LOG_PREFIX { \ + cerr_mutex.acquire(); \ + pthread_t self = pthread_self(); \ + int tid = thread_name_map[self]; \ + if (tid==0) \ + tid = thread_name_map[self] = ++next_thread_num; \ + std::cerr << std::left << std::setw(9) << utime() << " "; \ + std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \ + std::cerr << " " << std::setw(24) << __FILE__ << " " << 
std::setw(18) << __func__; \ +} +#define LOG_THIS_POINTER { \ + int self = instance_name_map[this]; \ + if (self==0) \ + self = instance_name_map[this] = ++next_instance_num; \ + std::cerr << "#" << std::setw(2) << self; \ +} +#define LOG_SUFFIX { \ + cerr_mutex.release(); \ +} + +#define LOG_NONMEMBER(x) { \ + LOG_PREFIX; \ + std::cerr << x << std::endl; \ + LOG_SUFFIX; \ +} +#define LOG(x) { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << x << std::endl; \ + LOG_SUFFIX; \ +} +#define JOIN(from,to,sep) ({ \ + ostringstream oss; \ + for(typeof(from) i=from;i!=to;i++) \ + oss << *i << sep; \ + oss.str(); \ +}) +#define LOG_FUNC_ENTER { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << "lid=" << lid; \ + std::cerr << std::endl; \ + LOG_SUFFIX; \ +} +#define LOG_FUNC_ENTER_SERVER { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << "lid=" << lid; \ + std::cerr << " client=" << id << "," << xid; \ + std::cerr << std::endl; \ + LOG_SUFFIX; \ +} +#define LOG_FUNC_EXIT { \ + LOG_PREFIX; \ + LOG_THIS_POINTER; \ + std::cerr << "return" << lid; \ + std::cerr << std::endl; \ + LOG_SUFFIX; \ +} + +#define tprintf(args...) { \ + int len = snprintf(NULL, 0, args); \ + char buf[len+1]; \ + buf[len] = '\0'; \ + snprintf(buf, len+1, args); \ + if (buf[len-1]=='\n') \ + buf[len-1] = '\0'; \ + LOG_NONMEMBER(buf); \ +} + +uint64_t utime(); + +#endif -- 1.7.9.5
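Note: a usage sketch for the tprintf macro above. It sizes the message with snprintf, strips a trailing newline, and routes the text through LOG_NONMEMBER, so every line gets a timestamp and a per-thread tag; linking needs tprintf.cc (for cerr_mutex, the name maps, and utime) and mutex.cc. tprintf.h declares tprintf_thread_prefix but nothing shown in this section defines it, so each binary presumably supplies its own; the definition below is an assumption for illustration:

    #include "tprintf.h"

    char tprintf_thread_prefix = 'c';    // assumed per-binary definition

    int main() {
        tprintf("node %d starting\n", 42);
        // stderr gets a line like: "123456   c1  main.cc  main  node 42 starting"
        return 0;
    }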