From: Peter Iannucci Date: Mon, 16 Sep 2013 01:01:51 +0000 (-0400) Subject: Major clean-ups. Migrating to C++11. X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/a4175b2e216a20b86cc872dea8a08005c60617a5?ds=sidebyside Major clean-ups. Migrating to C++11. --- diff --git a/Makefile b/Makefile index edf9fd0..4a239bc 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,11 @@ -CXXFLAGS = -g -MMD -Wall -I. -I./rpc -LDFLAGS = -L. -L/usr/local/lib -LDLIBS = -lpthread -LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt) -LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl) +CXXFLAGS = -g -MMD -Werror -I. -std=c++11 +LDFLAGS = CXX = g++ CC = g++ all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest -rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o gettime.o +rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o rm -f $@ ar cq $@ $^ ranlib rpc/librpc.a @@ -18,10 +15,10 @@ rpc/rpctest: rpc/rpctest.o rpc/librpc.a lock_demo=lock_demo.o lock_client.o lock_demo : $(lock_demo) rpc/librpc.a -lock_tester=lock_tester.o lock_client.o mutex.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o +lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o lock_tester : $(lock_tester) rpc/librpc.a -lock_server=lock_server.o lock_smain.o mutex.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o +lock_server=lock_server.o lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o lock_server : $(lock_server) rpc/librpc.a rsm_tester=rsm_tester.o rsmtest_client.o diff --git a/config.cc b/config.cc index 0f9ab4c..5127cb2 100644 --- a/config.cc +++ b/config.cc @@ -1,3 +1,4 @@ +#include #include #include #include @@ -39,294 +40,281 @@ // all views, the other nodes can bring this re-joined node up to // date. -static void * -heartbeatthread(void *x) +config::config( + const std::string &_first, + const std::string &_me, + config_view_change *_vc) + : my_view_id(0), first(_first), me(_me), vc(_vc) { - config *r = (config *) x; - r->heartbeater(); - return 0; -} - -config::config(std::string _first, std::string _me, config_view_change *_vc) - : myvid (0), first (_first), me (_me), vc (_vc) -{ - VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0); - VERIFY(pthread_cond_init(&config_cond, NULL) == 0); - - std::ostringstream ost; - ost << me; - - acc = new acceptor(this, me == _first, me, ost.str()); - pro = new proposer(this, acc, me); + paxos_acceptor = new acceptor(this, me == _first, me, me); + paxos_proposer = new proposer(this, paxos_acceptor, me); - // XXX hack; maybe should have its own port number - pxsrpc = acc->get_rpcs(); - pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat); + // XXX hack; maybe should have its own port number + paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat); - { - ScopedLock ml(&cfg_mutex); - - reconstruct(); - - pthread_t th; - VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0); - } + { + lock ml(cfg_mutex); + reconstruct(); + std::thread(&config::heartbeater, this).detach(); + } } void -config::restore(std::string s) +config::restore(const std::string &s) { - ScopedLock ml(&cfg_mutex); - acc->restore(s); - reconstruct(); + lock ml(cfg_mutex); + paxos_acceptor->restore(s); + reconstruct(); } -std::vector -config::get_view(unsigned instance) +void +config::get_view(unsigned instance, std::vector &m) { - ScopedLock ml(&cfg_mutex); - return get_view_wo(instance); + lock ml(cfg_mutex); + get_view_wo(instance, m); } // caller should hold cfg_mutex -std::vector -config::get_view_wo(unsigned instance) +void +config::get_view_wo(unsigned instance, std::vector &m) { - std::string value = acc->value(instance); - tprintf("get_view(%d): returns %s\n", instance, value.c_str()); - return members(value); + std::string value = paxos_acceptor->value(instance); + tprintf("get_view(%d): returns %s\n", instance, value.c_str()); + members(value, m); } -std::vector -config::members(std::string value) +void +config::members(const std::string &value, std::vector &view) const { - std::istringstream ist(value); - std::string m; - std::vector view; - while (ist >> m) { - view.push_back(m); - } - return view; + std::istringstream ist(value); + std::string m; + view.clear(); + while (ist >> m) { + view.push_back(m); + } } std::string -config::value(std::vector m) +config::value(const std::vector &m) const { - std::ostringstream ost; - for (unsigned i = 0; i < m.size(); i++) { - ost << m[i]; - ost << " "; - } - return ost.str(); + std::ostringstream ost; + for (unsigned i = 0; i < m.size(); i++) { + ost << m[i]; + ost << " "; + } + return ost.str(); } // caller should hold cfg_mutex void config::reconstruct() { - if (acc->instance() > 0) { - std::string m; - myvid = acc->instance(); - mems = get_view_wo(myvid); - tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str()); - } + if (paxos_acceptor->instance() > 0) { + std::string m; + my_view_id = paxos_acceptor->instance(); + get_view_wo(my_view_id, mems); + tprintf("config::reconstruct: %d %s\n", + my_view_id, print_members(mems).c_str()); + } } // Called by Paxos's acceptor. void -config::paxos_commit(unsigned instance, std::string value) +config::paxos_commit(unsigned instance, const std::string &value) { - std::string m; - std::vector newmem; - ScopedLock ml(&cfg_mutex); - - newmem = members(value); - tprintf("config::paxos_commit: %d: %s\n", instance, - print_members(newmem).c_str()); - - for (unsigned i = 0; i < mems.size(); i++) { - tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str()); - if (!isamember(mems[i], newmem) && me != mems[i]) { - tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); - mgr.delete_handle(mems[i]); + std::string m; + std::vector newmem; + lock ml(cfg_mutex); + + members(value, newmem); + tprintf("config::paxos_commit: %d: %s\n", instance, + print_members(newmem).c_str()); + + for (unsigned i = 0; i < mems.size(); i++) { + tprintf("config::paxos_commit: is %s still a member?\n", + mems[i].c_str()); + if (!isamember(mems[i], newmem) && me != mems[i]) { + tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); + mgr.delete_handle(mems[i]); + } } - } - mems = newmem; - myvid = instance; - if (vc) { - unsigned vid = myvid; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - vc->commit_change(vid); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - } + mems = newmem; + my_view_id = instance; + if (vc) { + ml.unlock(); + vc->commit_change(instance); + ml.lock(); + } } bool -config::ismember(std::string m, unsigned vid) +config::ismember(const std::string &m, unsigned vid) { - bool r; - ScopedLock ml(&cfg_mutex); - std::vector v = get_view_wo(vid); - r = isamember(m, v); - return r; + lock ml(cfg_mutex); + std::vector v; + get_view_wo(vid, v); + return isamember(m, v); } bool -config::add(std::string new_m, unsigned vid) +config::add(const std::string &new_m, unsigned vid) { - std::vector m; - std::vector curm; - ScopedLock ml(&cfg_mutex); - if (vid != myvid) - return false; - tprintf("config::add %s\n", new_m.c_str()); - m = mems; - m.push_back(new_m); - curm = mems; - std::string v = value(m); - int nextvid = myvid + 1; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - bool r = pro->run(nextvid, curm, v); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (r) { - tprintf("config::add: proposer returned success\n"); - } else { - tprintf("config::add: proposer returned failure\n"); - } - return r; + std::vector m; + std::vector curm; + lock ml(cfg_mutex); + if (vid != my_view_id) + return false; + tprintf("config::add %s\n", new_m.c_str()); + m = mems; + m.push_back(new_m); + curm = mems; + std::string v = value(m); + int nextvid = my_view_id + 1; + bool r; + { + ml.unlock(); + r = paxos_proposer->run(nextvid, curm, v); + ml.lock(); + } + tprintf("config::add: proposer returned %s\n", + r ? "success" : "failure"); + return r; } // caller should hold cfg_mutex bool -config::remove_wo(std::string m) +config::remove(const std::string &m) { - tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str()); - std::vector n; - for (unsigned i = 0; i < mems.size(); i++) { - if (mems[i] != m) n.push_back(mems[i]); - } - std::string v = value(n); - std::vector cmems = mems; - int nextvid = myvid + 1; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - bool r = pro->run(nextvid, cmems, v); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (r) { - tprintf("config::remove: proposer returned success\n"); - } else { - tprintf("config::remove: proposer returned failure\n"); - } - return r; + adopt_lock ml(cfg_mutex); + tprintf("config::remove: my_view_id %d remove? %s\n", + my_view_id, m.c_str()); + std::vector n; + for (unsigned i = 0; i < mems.size(); i++) { + if (mems[i] != m) + n.push_back(mems[i]); + } + std::string v = value(n); + std::vector cmems = mems; + int nextvid = my_view_id + 1; + bool r; + { + ml.unlock(); + r = paxos_proposer->run(nextvid, cmems, v); + ml.lock(); + } + tprintf("config::remove: proposer returned %s\n", + r ? "success" : "failure"); + return r; } void config::heartbeater() { - struct timeval now; - struct timespec next_timeout; - std::string m; - heartbeat_t h; - bool stable; - unsigned vid; - std::vector cmems; - ScopedLock ml(&cfg_mutex); - - while (1) { - - gettimeofday(&now, NULL); - next_timeout.tv_sec = now.tv_sec + 3; - next_timeout.tv_nsec = 0; - tprintf("heartbeater: go to sleep\n"); - pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout); - - stable = true; - vid = myvid; - cmems = get_view_wo(vid); - tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str()); - - if (!isamember(me, cmems)) { - tprintf("heartbeater: not member yet; skip hearbeat\n"); - continue; - } - - //find the node with the smallest id - m = me; - for (unsigned i = 0; i < cmems.size(); i++) { - if (m > cmems[i]) - m = cmems[i]; - } - - if (m == me) { - //if i am the one with smallest id, ping the rest of the nodes - for (unsigned i = 0; i < cmems.size(); i++) { - if (cmems[i] != me) { - if ((h = doheartbeat(cmems[i])) != OK) { - stable = false; - m = cmems[i]; - break; - } - } - } - } else { - //the rest of the nodes ping the one with smallest id - if ((h = doheartbeat(m)) != OK) - stable = false; - } - - if (!stable && vid == myvid) { - remove_wo(m); + std::string m; + heartbeat_t h; + bool stable; + unsigned vid; + std::vector cmems; + lock ml(cfg_mutex); + + while (1) { + auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3); + tprintf("heartbeater: go to sleep\n"); + config_cond.wait_until(ml, next_timeout); + + stable = true; + vid = my_view_id; + get_view_wo(vid, cmems); + tprintf("heartbeater: current membership %s\n", + print_members(cmems).c_str()); + + if (!isamember(me, cmems)) { + tprintf("heartbeater: not member yet; skip hearbeat\n"); + continue; + } + + // who has the smallest ID? + m = me; + for (unsigned i = 0; i < cmems.size(); i++) { + if (m > cmems[i]) + m = cmems[i]; + } + + if (m == me) { + // ping the other nodes + for (unsigned i = 0; i < cmems.size(); i++) { + if (cmems[i] != me) { + if ((h = doheartbeat(cmems[i])) != OK) { + stable = false; + m = cmems[i]; + break; + } + } + } + } else { + // ping the node with the smallest ID + if ((h = doheartbeat(m)) != OK) + stable = false; + } + + if (!stable && vid == my_view_id) { + remove(m); + } } - } } paxos_protocol::status config::heartbeat(std::string m, unsigned vid, int &r) { - ScopedLock ml(&cfg_mutex); - int ret = paxos_protocol::ERR; - r = (int) myvid; - tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid); - if (vid == myvid) { - ret = paxos_protocol::OK; - } else if (pro->isrunning()) { - VERIFY (vid == myvid + 1 || vid + 1 == myvid); - ret = paxos_protocol::OK; - } else { - ret = paxos_protocol::ERR; - } - return ret; + lock ml(cfg_mutex); + int ret = paxos_protocol::ERR; + r = (int) my_view_id; + tprintf("heartbeat from %s(%d) my_view_id %d\n", + m.c_str(), vid, my_view_id); + if (vid == my_view_id) { + ret = paxos_protocol::OK; + } else if (paxos_proposer->isrunning()) { + VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); + ret = paxos_protocol::OK; + } else { + ret = paxos_protocol::ERR; + } + return ret; } config::heartbeat_t -config::doheartbeat(std::string m) +config::doheartbeat(const std::string &m) { - int ret = rpc_const::timeout_failure; - int r; - unsigned vid = myvid; - heartbeat_t res = OK; - - tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); - handle h(m); - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - rpcc *cl = h.safebind(); - if (cl) { - ret = cl->call(paxos_protocol::heartbeat, me, vid, r, - rpcc::to(1000)); - } - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (ret != paxos_protocol::OK) { - if (ret == rpc_const::atmostonce_failure || - ret == rpc_const::oldsrv_failure) { - mgr.delete_handle(m); - } else { - tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", - m.c_str(), ret, vid, r); - if (ret < 0) res = FAILURE; - else res = VIEWERR; + adopt_lock ml(cfg_mutex); + int ret = rpc_const::timeout_failure; + int r; + unsigned vid = my_view_id; + heartbeat_t res = OK; + + tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); + handle h(m); + { + ml.unlock(); + rpcc *cl = h.safebind(); + if (cl) { + ret = cl->call(paxos_protocol::heartbeat, me, vid, r, + rpcc::to(1000)); + } + ml.lock(); + } + if (ret != paxos_protocol::OK) { + if (ret == rpc_const::atmostonce_failure || + ret == rpc_const::oldsrv_failure) { + mgr.delete_handle(m); + } else { + tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", + m.c_str(), ret, vid, r); + if (ret < 0) res = FAILURE; + else res = VIEWERR; + } } - } - tprintf("doheartbeat done %d\n", res); - return res; + tprintf("doheartbeat done %d\n", res); + return res; } diff --git a/config.h b/config.h index 9d7b023..cd276fb 100644 --- a/config.h +++ b/config.h @@ -4,51 +4,55 @@ #include #include #include "paxos.h" +#include "lock.h" class config_view_change { - public: - virtual void commit_change(unsigned vid) = 0; - virtual ~config_view_change() {}; + public: + virtual void commit_change(unsigned view_id) = 0; + virtual ~config_view_change() {}; }; class config : public paxos_change { - private: - acceptor *acc; - proposer *pro; - rpcs *pxsrpc; - unsigned myvid; - std::string first; - std::string me; - config_view_change *vc; - std::vector mems; - pthread_mutex_t cfg_mutex; - pthread_cond_t heartbeat_cond; - pthread_cond_t config_cond; - paxos_protocol::status heartbeat(std::string m, unsigned instance, int &r); - std::string value(std::vector mems); - std::vector members(std::string v); - std::vector get_view_wo(unsigned instance); - bool remove_wo(std::string); - void reconstruct(); - typedef enum { - OK, // response and same view # - VIEWERR, // response but different view # - FAILURE, // no response - } heartbeat_t; - heartbeat_t doheartbeat(std::string m); - public: - config(std::string _first, std::string _me, config_view_change *_vc); - unsigned vid() { return myvid; } - std::string myaddr() { return me; }; - std::string dump() { return acc->dump(); }; - std::vector get_view(unsigned instance); - void restore(std::string s); - bool add(std::string, unsigned vid); - bool ismember(std::string m, unsigned vid); - void heartbeater(void); - void paxos_commit(unsigned instance, std::string v); - rpcs *get_rpcs() { return acc->get_rpcs(); } - void breakpoint(int b) { pro->breakpoint(b); } + private: + acceptor *paxos_acceptor; + proposer *paxos_proposer; + unsigned my_view_id; + std::string first; + std::string me; + config_view_change *vc; + std::vector mems; + mutex cfg_mutex; + std::condition_variable config_cond; + paxos_protocol::status heartbeat( + std::string m, + unsigned instance, + int &r); + std::string value(const std::vector &mems) const; + void members(const std::string &v, std::vector &m) const; + void get_view_wo(unsigned instance, std::vector &m); + bool remove(const std::string &); + void reconstruct(); + typedef enum { + OK, // response and same view # + VIEWERR, // response but different view # + FAILURE, // no response + } heartbeat_t; + heartbeat_t doheartbeat(const std::string &m); + public: + config(const std::string &_first, + const std::string &_me, + config_view_change *_vc); + unsigned view_id() { return my_view_id; } + const std::string &myaddr() const { return me; }; + std::string dump() { return paxos_acceptor->dump(); }; + void get_view(unsigned instance, std::vector &m); + void restore(const std::string &s); + bool add(const std::string &, unsigned view_id); + bool ismember(const std::string &m, unsigned view_id); + void heartbeater(void); + void paxos_commit(unsigned instance, const std::string &v); + rpcs *get_rpcs() { return paxos_acceptor->get_rpcs(); } + void breakpoint(int b) { paxos_proposer->breakpoint(b); } }; #endif diff --git a/gettime.cc b/gettime.cc deleted file mode 100644 index 926c510..0000000 --- a/gettime.cc +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c), MM Weiss - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without modification, - * are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * 3. Neither the name of the MM Weiss nor the names of its contributors - * may be used to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES - * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT - * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR - * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, - * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/* - * clock_gettime_stub.c - * gcc -Wall -c clock_gettime_stub.c - * posix realtime functions; MacOS user space glue - */ - -/* @comment - * other possible implementation using intel builtin rdtsc - * rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html - * - * we could get the ticks by doing this - * - * __asm __volatile("mov %%ebx, %%esi\n\t" - * "cpuid\n\t" - * "xchg %%esi, %%ebx\n\t" - * "rdtsc" - * : "=a" (a), - * "=d" (d) - * ); - - * we could even replace our tricky sched_yield call by assembly code to get a better accurency, - * anyway the following C stub will satisfy 99% of apps using posix clock_gettime call, - * moreover, the setter version (clock_settime) could be easly written using mach primitives: - * http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time) - * - * hackers don't be crackers, don't you use a flush toilet? - * - * - * @see draft: ./posix-realtime-stub/posix-realtime-stub.c - * - */ - - -#ifdef __APPLE__ - -#pragma weak clock_gettime - -#include -#include -#include -#include -#include -#include -#include -#include - -typedef enum { - CLOCK_REALTIME, - CLOCK_MONOTONIC, - CLOCK_PROCESS_CPUTIME_ID, - CLOCK_THREAD_CPUTIME_ID -} clockid_t; - -static mach_timebase_info_data_t __clock_gettime_inf; - -int clock_gettime(clockid_t clk_id, struct timespec *tp) { - kern_return_t ret; - clock_serv_t clk; - clock_id_t clk_serv_id; - mach_timespec_t tm; - - uint64_t start, end, delta, nano; - - int retval = -1; - switch (clk_id) { - case CLOCK_REALTIME: - case CLOCK_MONOTONIC: - clk_serv_id = clk_id == CLOCK_REALTIME ? CALENDAR_CLOCK : SYSTEM_CLOCK; - if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) { - if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) { - tp->tv_sec = tm.tv_sec; - tp->tv_nsec = tm.tv_nsec; - retval = 0; - } - } - if (KERN_SUCCESS != ret) { - errno = EINVAL; - retval = -1; - } - break; - case CLOCK_PROCESS_CPUTIME_ID: - case CLOCK_THREAD_CPUTIME_ID: - start = mach_absolute_time(); - if (clk_id == CLOCK_PROCESS_CPUTIME_ID) { - getpid(); - } else { - sched_yield(); - } - end = mach_absolute_time(); - delta = end - start; - if (0 == __clock_gettime_inf.denom) { - mach_timebase_info(&__clock_gettime_inf); - } - nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom; - tp->tv_sec = nano * 1e-9; - tp->tv_nsec = nano - (tp->tv_sec * 1e9); - retval = 0; - break; - default: - errno = EINVAL; - retval = -1; - } - return retval; -} - -#endif // __APPLE__ diff --git a/gettime.h b/gettime.h deleted file mode 100644 index f10def4..0000000 --- a/gettime.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef gettime_h -#define gettime_h - -#ifdef __APPLE__ -typedef enum { - CLOCK_REALTIME, - CLOCK_MONOTONIC, - CLOCK_PROCESS_CPUTIME_ID, - CLOCK_THREAD_CPUTIME_ID -} clockid_t; - -int clock_gettime(clockid_t clk_id, struct timespec *tp); -#endif - -#endif diff --git a/handle.cc b/handle.cc index a233d5b..e998b3c 100644 --- a/handle.cc +++ b/handle.cc @@ -1,114 +1,112 @@ #include "handle.h" #include #include "tprintf.h" +#include "lock.h" handle_mgr mgr; handle::handle(std::string m) { - h = mgr.get_handle(m); + h = mgr.get_handle(m); } rpcc * handle::safebind() { - if (!h) - return NULL; - ScopedLock ml(&h->cl_mutex); - if (h->del) - return NULL; - if (h->cl) + if (!h) + return NULL; + lock ml(h->cl_mutex); + if (h->del) + return NULL; + if (h->cl) + return h->cl; + sockaddr_in dstsock; + make_sockaddr(h->m.c_str(), &dstsock); + rpcc *cl = new rpcc(dstsock); + tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); + int ret; + // Starting with lab 6, our test script assumes that the failure + // can be detected by paxos and rsm layer within few seconds. We have + // to set the timeout with a small value to support the assumption. + // + // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of + // lab 6 and lab 7 because the rpc layer may delay your RPC request, + // and cause a time out failure. Please make sure RPC_LOSSY is set to 0. + ret = cl->bind(rpcc::to(1000)); + if (ret < 0) { + tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); + delete cl; + h->del = true; + } else { + tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str()); + h->cl = cl; + } return h->cl; - sockaddr_in dstsock; - make_sockaddr(h->m.c_str(), &dstsock); - rpcc *cl = new rpcc(dstsock); - tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str()); - int ret; - // Starting with lab 6, our test script assumes that the failure - // can be detected by paxos and rsm layer within few seconds. We have - // to set the timeout with a small value to support the assumption. - // - // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of - // lab 6 and lab 7 because the rpc layer may delay your RPC request, - // and cause a time out failure. Please make sure RPC_LOSSY is set to 0. - ret = cl->bind(rpcc::to(1000)); - if (ret < 0) { - tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret); - delete cl; - h->del = true; - } else { - tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str()); - h->cl = cl; - } - return h->cl; } handle::~handle() { - if (h) mgr.done_handle(h); + if (h) mgr.done_handle(h); } handle_mgr::handle_mgr() { - VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0); } struct hinfo * handle_mgr::get_handle(std::string m) { - ScopedLock ml(&handle_mutex); - struct hinfo *h = 0; - if (hmap.find(m) == hmap.end()) { - h = new hinfo; - h->cl = NULL; - h->del = false; - h->refcnt = 1; - h->m = m; - pthread_mutex_init(&h->cl_mutex, NULL); - hmap[m] = h; - } else if (!hmap[m]->del) { - h = hmap[m]; - h->refcnt ++; - } - return h; + lock ml(handle_mutex); + struct hinfo *h = 0; + if (hmap.find(m) == hmap.end()) { + h = new hinfo; + h->cl = NULL; + h->del = false; + h->refcnt = 1; + h->m = m; + hmap[m] = h; + } else if (!hmap[m]->del) { + h = hmap[m]; + h->refcnt ++; + } + return h; } void handle_mgr::done_handle(struct hinfo *h) { - ScopedLock ml(&handle_mutex); - h->refcnt--; - if (h->refcnt == 0 && h->del) - delete_handle_wo(h->m); + lock ml(handle_mutex); + h->refcnt--; + if (h->refcnt == 0 && h->del) + delete_handle_wo(h->m); } void handle_mgr::delete_handle(std::string m) { - ScopedLock ml(&handle_mutex); - delete_handle_wo(m); + lock ml(handle_mutex); + delete_handle_wo(m); } // Must be called with handle_mutex locked. void handle_mgr::delete_handle_wo(std::string m) { - if (hmap.find(m) == hmap.end()) { - tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str()); - } else { - tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(), - hmap[m]->refcnt); - struct hinfo *h = hmap[m]; - if (h->refcnt == 0) { - if (h->cl) { - h->cl->cancel(); - delete h->cl; - } - pthread_mutex_destroy(&h->cl_mutex); - hmap.erase(m); - delete h; + if (hmap.find(m) == hmap.end()) { + tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str()); } else { - h->del = true; + tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(), + hmap[m]->refcnt); + struct hinfo *h = hmap[m]; + if (h->refcnt == 0) { + if (h->cl) { + h->cl->cancel(); + delete h->cl; + } + hmap.erase(m); + delete h; + } else { + h->del = true; + } } - } } diff --git a/handle.h b/handle.h index ecd8884..6b042fb 100644 --- a/handle.h +++ b/handle.h @@ -25,53 +25,53 @@ #include #include -#include "rpc.h" +#include "rpc/rpc.h" struct hinfo { rpcc *cl; int refcnt; bool del; std::string m; - pthread_mutex_t cl_mutex; + std::mutex cl_mutex; }; class handle { - private: - struct hinfo *h; - public: - handle(std::string m); - ~handle(); - /* safebind will try to bind with the rpc server on the first call. - * Since bind may block, the caller probably should not hold a mutex - * when calling safebind. - * - * return: - * if the first safebind succeeded, all later calls would return - * a rpcc object; otherwise, all later calls would return NULL. - * - * Example: - * handle h(dst); - * XXX_protocol::status ret; - * if (h.safebind()) { - * ret = h.safebind()->call(...); - * } - * if (!h.safebind() || ret != XXX_protocol::OK) { - * // handle failure - * } - */ - rpcc *safebind(); + private: + struct hinfo *h; + public: + handle(std::string m); + ~handle(); + /* safebind will try to bind with the rpc server on the first call. + * Since bind may block, the caller probably should not hold a mutex + * when calling safebind. + * + * return: + * if the first safebind succeeded, all later calls would return + * a rpcc object; otherwise, all later calls would return NULL. + * + * Example: + * handle h(dst); + * XXX_protocol::status ret; + * if (h.safebind()) { + * ret = h.safebind()->call(...); + * } + * if (!h.safebind() || ret != XXX_protocol::OK) { + * // handle failure + * } + */ + rpcc *safebind(); }; class handle_mgr { - private: - pthread_mutex_t handle_mutex; - std::map hmap; - public: - handle_mgr(); - struct hinfo *get_handle(std::string m); - void done_handle(struct hinfo *h); - void delete_handle(std::string m); - void delete_handle_wo(std::string m); + private: + std::mutex handle_mutex; + std::map hmap; + public: + handle_mgr(); + struct hinfo *get_handle(std::string m); + void done_handle(struct hinfo *h); + void delete_handle(std::string m); + void delete_handle_wo(std::string m); }; extern class handle_mgr mgr; diff --git a/lang/algorithm.h b/lang/algorithm.h deleted file mode 100644 index a487094..0000000 --- a/lang/algorithm.h +++ /dev/null @@ -1,18 +0,0 @@ -// compile time version of min and max - -#ifndef algorithm_h -#define algorithm_h - -template -struct static_max -{ - static const int value = A > B ? A : B; -}; - -template -struct static_min -{ - static const int value = A < B ? A : B; -}; - -#endif diff --git a/lock.h b/lock.h new file mode 100644 index 0000000..00d4374 --- /dev/null +++ b/lock.h @@ -0,0 +1,19 @@ +#ifndef lock_h +#define lock_h + +#include +#include + +using std::mutex; +using lock = std::unique_lock; + +class adopt_lock : public lock { +public: + inline adopt_lock(class mutex &m) : std::unique_lock(m, std::adopt_lock) { + } + inline ~adopt_lock() { + release(); + } +}; + +#endif diff --git a/lock_client.cc b/lock_client.cc index 6e9fab1..11bc476 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -1,7 +1,7 @@ // RPC stubs for clients to talk to lock_server #include "lock_client.h" -#include "rpc.h" +#include "rpc/rpc.h" #include #include diff --git a/lock_client.h b/lock_client.h index df22711..b1176c4 100644 --- a/lock_client.h +++ b/lock_client.h @@ -7,7 +7,7 @@ #include #include "lock_protocol.h" -#include "rpc.h" +#include "rpc/rpc.h" #include // Client interface to the lock server @@ -39,7 +39,7 @@ typedef enum { typedef int t4_status; -typedef unsigned long long t4_lockid_t; +typedef const char * t4_lockid_t; t4_lock_client *t4_lock_client_new(const char *dst); void t4_lock_client_delete(t4_lock_client *); diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc index 8d64e50..80bc87f 100644 --- a/lock_client_cache_rsm.cc +++ b/lock_client_cache_rsm.cc @@ -2,7 +2,7 @@ // see lock_client.cache.h for protocol details. #include "lock_client_cache_rsm.h" -#include "rpc.h" +#include "rpc/rpc.h" #include #include #include @@ -10,6 +10,9 @@ #include "tprintf.h" #include "rsm_client.h" +#include "lock.h" + +using std::ostringstream; lock_state::lock_state(): state(none) @@ -17,32 +20,29 @@ lock_state::lock_state(): } void lock_state::wait() { - pthread_t self = pthread_self(); - c[self].wait(m); + auto self = std::this_thread::get_id(); + { + adopt_lock ml(m); + c[self].wait(ml); + } c.erase(self); } void lock_state::signal() { // signal anyone if (c.begin() != c.end()) - c.begin()->second.signal(); + c.begin()->second.notify_one(); } -void lock_state::signal(pthread_t who) { +void lock_state::signal(std::thread::id who) { if (c.count(who)) - c[who].signal(); -} - -static void * releasethread(void *x) { - lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x; - cc->releaser(); - return 0; + c[who].notify_one(); } int lock_client_cache_rsm::last_port = 0; lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { - ScopedLock sl(lock_table_lock); + lock sl(lock_table_lock); // by the semantics of std::map, this will create // the lock if it doesn't already exist return lock_table[lid]; @@ -62,12 +62,11 @@ lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_use rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler); rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler); { - ScopedLock sl(xid_mutex); + lock sl(xid_mutex); xid = 0; } rsmc = new rsm_client(xdst); - int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this); - VERIFY (r == 0); + releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this); } void lock_client_cache_rsm::releaser() { @@ -77,13 +76,14 @@ void lock_client_cache_rsm::releaser() { LOG("Releaser: " << lid); lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); - VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread); + lock sl(st.m); + VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id()); st.state = lock_state::releasing; { - ScopedUnlock su(st.m); + sl.unlock(); int r; rsmc->call(lock_protocol::release, lid, id, st.xid, r); + sl.lock(); } st.state = lock_state::none; LOG("Lock " << lid << ": none"); @@ -93,8 +93,8 @@ void lock_client_cache_rsm::releaser() { lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) { lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); - pthread_t self = pthread_self(); + lock sl(st.m); + auto self = std::this_thread::get_id(); // check for reentrancy VERIFY(st.state != lock_state::locked || st.held_by != self); @@ -108,16 +108,17 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid if (st.state == lock_state::none || st.state == lock_state::retrying) { if (st.state == lock_state::none) { - ScopedLock sl(xid_mutex); + lock sl(xid_mutex); st.xid = xid++; } st.state = lock_state::acquiring; LOG("Lock " << lid << ": acquiring"); lock_protocol::status result; { - ScopedUnlock su(st.m); + sl.unlock(); int r; result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r); + sl.lock(); } LOG("acquire returned " << result); if (result == lock_protocol::OK) { @@ -129,11 +130,11 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid VERIFY(st.wanted_by.size() != 0); if (st.state == lock_state::free) { // is it for me? - pthread_t front = st.wanted_by.front(); - if (front == releaser_thread) { + auto front = st.wanted_by.front(); + if (front == releaser_thread.get_id()) { st.wanted_by.pop_front(); st.state = lock_state::locked; - st.held_by = releaser_thread; + st.held_by = releaser_thread.get_id(); LOG("Queuing " << lid << " for release"); release_fifo.enq(lid); } else if (front == self) { @@ -157,16 +158,16 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) { lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); - pthread_t self = pthread_self(); + lock sl(st.m); + auto self = std::this_thread::get_id(); VERIFY(st.state == lock_state::locked && st.held_by == self); st.state = lock_state::free; LOG("Lock " << lid << ": free"); if (st.wanted_by.size()) { - pthread_t front = st.wanted_by.front(); - if (front == releaser_thread) { + auto front = st.wanted_by.front(); + if (front == releaser_thread.get_id()) { st.state = lock_state::locked; - st.held_by = releaser_thread; + st.held_by = releaser_thread.get_id(); st.wanted_by.pop_front(); LOG("Queuing " << lid << " for release"); release_fifo.enq(lid); @@ -180,29 +181,29 @@ lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { LOG("Revoke handler " << lid << " " << xid); lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); + lock sl(st.m); if (st.state == lock_state::releasing || st.state == lock_state::none) return rlock_protocol::OK; if (st.state == lock_state::free && - (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) { + (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) { // gimme st.state = lock_state::locked; - st.held_by = releaser_thread; + st.held_by = releaser_thread.get_id(); if (st.wanted_by.size()) st.wanted_by.pop_front(); release_fifo.enq(lid); } else { // get in line - st.wanted_by.push_back(releaser_thread); + st.wanted_by.push_back(releaser_thread.get_id()); } return rlock_protocol::OK; } rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); + lock sl(st.m); VERIFY(st.state == lock_state::acquiring); st.state = lock_state::retrying; LOG("Lock " << lid << ": none"); diff --git a/lock_client_cache_rsm.h b/lock_client_cache_rsm.h index 28b0323..049d18a 100644 --- a/lock_client_cache_rsm.h +++ b/lock_client_cache_rsm.h @@ -6,23 +6,22 @@ #include #include "lock_protocol.h" -#include "rpc.h" +#include "rpc/rpc.h" #include "lock_client.h" #include "lang/verify.h" -#include "mutex.h" #include "rpc/fifo.h" #include "rsm_client.h" -// Classes that inherit lock_release_user can override dorelease so that -// that they will be called when lock_client releases a lock. -// You will not need to do anything with this class until Lab 5. class lock_release_user { public: virtual void dorelease(lock_protocol::lockid_t) = 0; virtual ~lock_release_user() {}; }; -using namespace std; +using std::string; +using std::thread; +using std::list; +using std::map; typedef string callback; @@ -37,14 +36,14 @@ public: acquiring, releasing } state; - pthread_t held_by; - list wanted_by; + std::thread::id held_by; + list wanted_by; mutex m; - map c; + map c; lock_protocol::xid_t xid; void wait(); void signal(); - void signal(pthread_t who); + void signal(std::thread::id who); }; typedef map lock_map; @@ -55,7 +54,7 @@ class lock_client_cache_rsm; // lock_revoke_server. class lock_client_cache_rsm : public lock_client { private: - pthread_t releaser_thread; + std::thread releaser_thread; rsm_client *rsmc; class lock_release_user *lu; int rlock_port; diff --git a/lock_demo.cc b/lock_demo.cc index 4d74acb..4285269 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -4,27 +4,20 @@ #include "lock_protocol.h" #include "lock_client.h" -#include "rpc.h" +#include "rpc/rpc.h" #include #include #include #include -std::string dst; -lock_client *lc; - int main(int argc, char *argv[]) { - int r; - - if(argc != 2){ - fprintf(stderr, "Usage: %s [host:]port\n", argv[0]); - exit(1); - } + if(argc != 2) { + fprintf(stderr, "Usage: %s [host:]port\n", argv[0]); + return 1; + } - dst = argv[1]; - lc = new lock_client(dst); - r = lc->stat(1); - printf ("stat returned %d\n", r); + lock_client *lc = new lock_client(argv[1]); + printf ("stat returned %d\n", lc->stat("1")); } diff --git a/lock_protocol.h b/lock_protocol.h index 60df0ef..61f0998 100644 --- a/lock_protocol.h +++ b/lock_protocol.h @@ -3,13 +3,14 @@ #ifndef lock_protocol_h #define lock_protocol_h -#include "rpc.h" +#include "rpc/rpc.h" +#include class lock_protocol { public: enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR }; typedef int status; - typedef unsigned long long lockid_t; + typedef std::string lockid_t; typedef unsigned long long xid_t; enum rpc_numbers { acquire = 0x7001, diff --git a/lock_server.cc b/lock_server.cc index 3801814..be0c1c9 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -5,6 +5,7 @@ #include #include #include +#include "lock.h" lock_server::lock_server(): nacquire (0) @@ -14,11 +15,10 @@ lock_server::lock_server(): // caller must hold lock_lock mutex & lock_server::get_lock(lock_protocol::lockid_t lid) { - lock_lock.acquire(); + lock ml(lock_lock); // by the semantics of std::map, this will create // the mutex if it doesn't already exist mutex &l = locks[lid]; - lock_lock.release(); return l; } @@ -34,13 +34,13 @@ lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r) lock_protocol::status lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r) { - get_lock(lid).acquire(); + get_lock(lid).lock(); return lock_protocol::OK; } lock_protocol::status lock_server::release(int clt, lock_protocol::lockid_t lid, int &r) { - get_lock(lid).release(); + get_lock(lid).unlock(); return lock_protocol::OK; } diff --git a/lock_server.h b/lock_server.h index e24e359..f03a717 100644 --- a/lock_server.h +++ b/lock_server.h @@ -7,13 +7,11 @@ #include #include "lock_protocol.h" #include "lock_client.h" -#include "rpc.h" -#include +#include "rpc/rpc.h" #include #include -#include "mutex.h" -using namespace std; +using std::map; typedef map lock_map; diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc index 5149049..c3f75e8 100644 --- a/lock_server_cache_rsm.cc +++ b/lock_server_cache_rsm.cc @@ -9,12 +9,25 @@ #include "handle.h" #include "tprintf.h" #include "rpc/marshall.h" +#include "lock.h" + +using std::ostringstream; +using std::istringstream; +using std::vector; lock_state::lock_state(): held(false) { } +lock_state& lock_state::operator=(const lock_state& o) { + held = o.held; + held_by = o.held_by; + wanted_by = o.wanted_by; + old_requests = o.old_requests; + return *this; +} + template ostringstream & operator<<(ostringstream &o, const pair &d) { o << "<" << d.first << "," << d.second << ">"; @@ -66,28 +79,15 @@ unmarshall & operator>>(unmarshall &u, lock_state &d) { lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) { - ScopedLock sl(lock_table_lock); + lock sl(lock_table_lock); // by the semantics of map, this will create // the lock if it doesn't already exist return lock_table[lid]; } -static void *revokethread(void *x) { - lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x; - sc->revoker(); - return 0; -} - -static void *retrythread(void *x) { - lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x; - sc->retryer(); - return 0; -} - lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) { - pthread_t th; - VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0); - VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0); + std::thread(&lock_server_cache_rsm::revoker, this).detach(); + std::thread(&lock_server_cache_rsm::retryer, this).detach(); rsm->set_state_transfer(this); } @@ -102,7 +102,7 @@ void lock_server_cache_rsm::revoker() { lock_state &st = get_lock_state(lid); holder held_by; { - ScopedLock sl(st.m); + lock sl(st.m); held_by = st.held_by; } @@ -130,7 +130,7 @@ void lock_server_cache_rsm::retryer() { lock_state &st = get_lock_state(lid); holder front; { - ScopedLock sl(st.m); + lock sl(st.m); if (st.wanted_by.empty()) continue; front = st.wanted_by.front(); @@ -155,7 +155,7 @@ int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_ LOG_FUNC_ENTER_SERVER; holder h = holder(id, xid); lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); + lock sl(st.m); // deal with duplicated requests if (st.old_requests.count(id)) { @@ -212,7 +212,7 @@ int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_ int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) { LOG_FUNC_ENTER_SERVER; lock_state &st = get_lock_state(lid); - ScopedLock sl(st.m); + lock sl(st.m); if (st.held && st.held_by == holder(id, xid)) { st.held = false; LOG("Lock " << lid << " not held"); @@ -223,7 +223,7 @@ int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, loc } string lock_server_cache_rsm::marshal_state() { - ScopedLock sl(lock_table_lock); + lock sl(lock_table_lock); marshall rep; rep << nacquire; rep << lock_table; @@ -231,7 +231,7 @@ string lock_server_cache_rsm::marshal_state() { } void lock_server_cache_rsm::unmarshal_state(string state) { - ScopedLock sl(lock_table_lock); + lock sl(lock_table_lock); unmarshall rep(state); rep >> nacquire; rep >> lock_table; diff --git a/lock_server_cache_rsm.h b/lock_server_cache_rsm.h index eb86bd0..4a33361 100644 --- a/lock_server_cache_rsm.h +++ b/lock_server_cache_rsm.h @@ -6,13 +6,16 @@ #include #include #include "lock_protocol.h" -#include "rpc.h" -#include "mutex.h" +#include "rpc/rpc.h" #include "rsm_state_transfer.h" #include "rsm.h" #include "rpc/fifo.h" +#include "lock.h" -using namespace std; +using std::string; +using std::pair; +using std::list; +using std::map; typedef string callback; typedef pair holder; @@ -25,6 +28,7 @@ public: list wanted_by; map old_requests; mutex m; + lock_state& operator=(const lock_state&); }; typedef map lock_map; diff --git a/lock_smain.cc b/lock_smain.cc index 4cc8136..c256cbe 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -1,4 +1,4 @@ -#include "rpc.h" +#include "rpc/rpc.h" #include #include #include @@ -7,8 +7,6 @@ #include "paxos.h" #include "rsm.h" -#include "jsl_log.h" - // Main loop of lock_server char tprintf_thread_prefix = 's'; @@ -16,57 +14,30 @@ char tprintf_thread_prefix = 's'; int main(int argc, char *argv[]) { - int count = 0; - - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); - - srandom(getpid()); + int count = 0; - if(argc != 3){ - fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); - exit(1); - } + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); - char *count_env = getenv("RPC_COUNT"); - if(count_env != NULL){ - count = atoi(count_env); - } + srandom(getpid()); - //jsl_set_debug(2); - // Comment out the next line to switch between the ordinary lock - // server and the RSM. In Lab 6, we disable the lock server and - // implement Paxos. In Lab 7, we will make the lock server use your - // RSM layer. -#define RSM -#ifdef RSM -// You must comment out the next line once you are done with Step One. -//#define STEP_ONE -#ifdef STEP_ONE - rpcs server(atoi(argv[1])); - lock_server_cache_rsm ls; - server.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); - server.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); - server.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); -#else - rsm rsm(argv[1], argv[2]); - lock_server_cache_rsm ls(&rsm); - rsm.set_state_transfer((rsm_state_transfer *)&ls); - rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); - rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); - rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); -#endif // STEP_ONE -#endif // RSM + if(argc != 3){ + fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); + exit(1); + } -#ifndef RSM - lock_server_cache ls; - rpcs server(atoi(argv[1]), count); - server.reg(lock_protocol::stat, &ls, &lock_server_cache::stat); - server.reg(lock_protocol::acquire, &ls, &lock_server_cache::acquire); - server.reg(lock_protocol::release, &ls, &lock_server_cache::release); -#endif + char *count_env = getenv("RPC_COUNT"); + if(count_env != NULL){ + count = atoi(count_env); + } + rsm rsm(argv[1], argv[2]); + lock_server_cache_rsm ls(&rsm); + rsm.set_state_transfer((rsm_state_transfer *)&ls); + rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); + rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); + rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); - while(1) - sleep(1000); + while(1) + sleep(1000); } diff --git a/lock_tester.cc b/lock_tester.cc index 8e736ce..d063cdc 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -4,8 +4,7 @@ #include "lock_protocol.h" #include "lock_client.h" -#include "rpc.h" -#include "jsl_log.h" +#include "rpc/rpc.h" #include #include #include @@ -15,46 +14,47 @@ #include "tprintf.h" #include #include +#include "lock.h" char tprintf_thread_prefix = 'c'; // must be >= 2 -int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. +const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. std::string dst; lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt]; -lock_protocol::lockid_t a = 1; -lock_protocol::lockid_t b = 2; -lock_protocol::lockid_t c = 3; +lock_protocol::lockid_t a = "1"; +lock_protocol::lockid_t b = "2"; +lock_protocol::lockid_t c = "3"; // check_grant() and check_release() check that the lock server // doesn't grant the same lock to both clients. // it assumes that lock names are distinct in the first byte. int ct[256]; -pthread_mutex_t count_mutex; +std::mutex count_mutex; void check_grant(lock_protocol::lockid_t lid) { - ScopedLock ml(&count_mutex); - int x = lid & 0xff; - if(ct[x] != 0){ - fprintf(stderr, "error: server granted %016llx twice\n", lid); - fprintf(stdout, "error: server granted %016llx twice\n", lid); - exit(1); - } - ct[x] += 1; + lock ml(count_mutex); + int x = lid[0] & 0x0f; + if(ct[x] != 0){ + fprintf(stderr, "error: server granted %s twice\n", lid.c_str()); + fprintf(stdout, "error: server granted %s twice\n", lid.c_str()); + exit(1); + } + ct[x] += 1; } void check_release(lock_protocol::lockid_t lid) { - ScopedLock ml(&count_mutex); - int x = lid & 0xff; - if(ct[x] != 1){ - fprintf(stderr, "error: client released un-held lock %016llx\n", lid); - exit(1); - } - ct[x] -= 1; + lock ml(count_mutex); + int x = lid[0] & 0x0f; + if(ct[x] != 1){ + fprintf(stderr, "error: client released un-held lock %s\n", lid.c_str()); + exit(1); + } + ct[x] -= 1; } void @@ -84,159 +84,151 @@ test1(void) void * test2(void *x) { - int i = * (int *) x; - - tprintf ("test2: client %d acquire a release a\n", i); - lc[i]->acquire(a); - tprintf ("test2: client %d acquire done\n", i); - check_grant(a); - sleep(1); - tprintf ("test2: client %d release\n", i); - check_release(a); - lc[i]->release(a); - tprintf ("test2: client %d release done\n", i); - return 0; -} + int i = * (int *) x; -void * -test3(void *x) -{ - int i = * (int *) x; - - tprintf ("test3: client %d acquire a release a concurrent\n", i); - for (int j = 0; j < 10; j++) { + tprintf ("test2: client %d acquire a release a\n", i); lc[i]->acquire(a); + tprintf ("test2: client %d acquire done\n", i); check_grant(a); - tprintf ("test3: client %d got lock\n", i); + sleep(1); + tprintf ("test2: client %d release\n", i); check_release(a); lc[i]->release(a); - } - return 0; + tprintf ("test2: client %d release done\n", i); + return 0; } void * -test4(void *x) +test3(void *x) { - int i = * (int *) x; + int i = * (int *) x; + + tprintf ("test3: client %d acquire a release a concurrent\n", i); + for (int j = 0; j < 10; j++) { + lc[i]->acquire(a); + check_grant(a); + tprintf ("test3: client %d got lock\n", i); + check_release(a); + lc[i]->release(a); + } + return 0; +} - tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i); - for (int j = 0; j < 10; j++) { - lc[0]->acquire(a); - check_grant(a); - tprintf ("test4: thread %d on client 0 got lock\n", i); - check_release(a); - lc[0]->release(a); - } - return 0; +void * +test4(void *x) +{ + int i = * (int *) x; + + tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i); + for (int j = 0; j < 10; j++) { + lc[0]->acquire(a); + check_grant(a); + tprintf ("test4: thread %d on client 0 got lock\n", i); + check_release(a); + lc[0]->release(a); + } + return 0; } void * test5(void *x) { - int i = * (int *) x; - - tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i); - for (int j = 0; j < 10; j++) { - if (i < 5) lc[0]->acquire(a); - else lc[1]->acquire(a); - check_grant(a); - tprintf ("test5: client %d got lock\n", i); - check_release(a); - if (i < 5) lc[0]->release(a); - else lc[1]->release(a); - } - return 0; + int i = * (int *) x; + + tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i); + for (int j = 0; j < 10; j++) { + if (i < 5) lc[0]->acquire(a); + else lc[1]->acquire(a); + check_grant(a); + tprintf ("test5: client %d got lock\n", i); + check_release(a); + if (i < 5) lc[0]->release(a); + else lc[1]->release(a); + } + return 0; } int main(int argc, char *argv[]) { - int r; - pthread_t th[nt]; + std::thread th[nt]; int test = 0; setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); srandom(getpid()); - //jsl_set_debug(2); - if(argc < 2) { - fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); - exit(1); + fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); + exit(1); } dst = argv[1]; if (argc > 2) { - test = atoi(argv[2]); - if(test < 1 || test > 5){ - tprintf("Test number must be between 1 and 5\n"); - exit(1); - } + test = atoi(argv[2]); + if(test < 1 || test > 5){ + tprintf("Test number must be between 1 and 5\n"); + exit(1); + } } - VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0); tprintf("cache lock client\n"); for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst); if(!test || test == 1){ - test1(); + test1(); } if(!test || test == 2){ - // test2 - for (int i = 0; i < nt; i++) { - int *a = new int (i); - r = pthread_create(&th[i], NULL, test2, (void *) a); - VERIFY (r == 0); - } - for (int i = 0; i < nt; i++) { - pthread_join(th[i], NULL); - } + // test2 + for (int i = 0; i < nt; i++) { + int *a = new int (i); + th[i] = std::thread(test2, a); + } + for (int i = 0; i < nt; i++) { + th[i].join(); + } } if(!test || test == 3){ - tprintf("test 3\n"); - - // test3 - for (int i = 0; i < nt; i++) { - int *a = new int (i); - r = pthread_create(&th[i], NULL, test3, (void *) a); - VERIFY (r == 0); - } - for (int i = 0; i < nt; i++) { - pthread_join(th[i], NULL); - } + tprintf("test 3\n"); + + // test3 + for (int i = 0; i < nt; i++) { + int *a = new int (i); + th[i] = std::thread(test3, a); + } + for (int i = 0; i < nt; i++) { + th[i].join(); + } } if(!test || test == 4){ - tprintf("test 4\n"); - - // test 4 - for (int i = 0; i < 2; i++) { - int *a = new int (i); - r = pthread_create(&th[i], NULL, test4, (void *) a); - VERIFY (r == 0); - } - for (int i = 0; i < 2; i++) { - pthread_join(th[i], NULL); - } + tprintf("test 4\n"); + + // test 4 + for (int i = 0; i < 2; i++) { + int *a = new int (i); + th[i] = std::thread(test4, a); + } + for (int i = 0; i < 2; i++) { + th[i].join(); + } } if(!test || test == 5){ - tprintf("test 5\n"); - - // test 5 - - for (int i = 0; i < nt; i++) { - int *a = new int (i); - r = pthread_create(&th[i], NULL, test5, (void *) a); - VERIFY (r == 0); - } - for (int i = 0; i < nt; i++) { - pthread_join(th[i], NULL); - } + tprintf("test 5\n"); + + // test 5 + + for (int i = 0; i < nt; i++) { + int *a = new int (i); + th[i] = std::thread(test5, a); + } + for (int i = 0; i < nt; i++) { + th[i].join(); + } } tprintf ("%s: passed all tests successfully\n", argv[0]); diff --git a/mutex.cc b/mutex.cc deleted file mode 100644 index 4e54cc0..0000000 --- a/mutex.cc +++ /dev/null @@ -1,42 +0,0 @@ -#include "mutex.h" -#include "lang/verify.h" - -mutex::mutex() { - VERIFY(pthread_mutex_init(&m, NULL) == 0); -} - -mutex::~mutex() { - VERIFY(pthread_mutex_destroy(&m) == 0); -} - -void mutex::acquire() { - VERIFY(pthread_mutex_lock(&m) == 0); -} - -void mutex::release() { - VERIFY(pthread_mutex_unlock(&m) == 0); -} - -mutex::operator pthread_mutex_t *() { - return &m; -} - -cond::cond() { - VERIFY(pthread_cond_init(&c, NULL) == 0); -} - -cond::~cond() { - VERIFY(pthread_cond_destroy(&c) == 0); -} - -void cond::wait(mutex &m) { - VERIFY(pthread_cond_wait(&c, m) == 0); -} - -void cond::signal() { - VERIFY(pthread_cond_signal(&c) == 0); -} - -void cond::broadcast() { - VERIFY(pthread_cond_broadcast(&c) == 0); -} diff --git a/mutex.h b/mutex.h deleted file mode 100644 index 228a3bb..0000000 --- a/mutex.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef mutex_h -#define mutex_h - -#include - -class mutex { - protected: - pthread_mutex_t m; - public: - mutex(); - ~mutex(); - void acquire(); - void release(); - operator pthread_mutex_t *(); -}; - -class cond { - protected: - pthread_cond_t c; - public: - cond(); - ~cond(); - void wait(mutex &m); - void signal(); - void broadcast(); -}; - -#endif diff --git a/paxos.cc b/paxos.cc index c3a445f..89f1714 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,9 +1,9 @@ #include "paxos.h" #include "handle.h" -// #include #include #include "tprintf.h" #include "lang/verify.h" +#include "lock.h" // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made @@ -52,7 +52,7 @@ bool proposer::isrunning() { bool r; - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r = !stable; return r; } @@ -94,7 +94,7 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv std::string v; bool r = false; - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); if (!stable) { // already running proposer? @@ -241,7 +241,7 @@ paxos_protocol::status acceptor::preparereq(std::string src, paxos_protocol::preparearg a, paxos_protocol::prepareres &r) { - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r.oldinstance = false; r.accept = false; r.n_a = n_a; @@ -262,7 +262,7 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a, paxos_protocol::status acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) { - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r = false; if (a.n >= n_h) { n_a = a.n; @@ -274,52 +274,53 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) } // the src argument is only for debug purpose -paxos_protocol::status + paxos_protocol::status acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r) { - ScopedLock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); - if (a.instance == instance_h + 1) { - VERIFY(v_a == a.v); - commit_wo(a.instance, v_a); - } else if (a.instance <= instance_h) { - // we are ahead ignore. - } else { - // we are behind - VERIFY(0); - } - return paxos_protocol::OK; + lock ml(pxs_mutex); + tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", + a.instance, instance_h, v_a.c_str()); + if (a.instance == instance_h + 1) { + VERIFY(v_a == a.v); + commit_wo(a.instance, v_a); + } else if (a.instance <= instance_h) { + // we are ahead ignore. + } else { + // we are behind + VERIFY(0); + } + return paxos_protocol::OK; } void acceptor::commit_wo(unsigned instance, std::string value) { - //assume pxs_mutex is held - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); - if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); - values[instance] = value; - l->loginstance(instance, value); - instance_h = instance; - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - if (cfg) { - pxs_mutex.release(); - cfg->paxos_commit(instance, value); - pxs_mutex.acquire(); + //assume pxs_mutex is held + adopt_lock ml(pxs_mutex); + tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); + if (instance > instance_h) { + tprintf("commit: highestaccepteinstance = %d\n", instance); + values[instance] = value; + l->loginstance(instance, value); + instance_h = instance; + n_h.n = 0; + n_h.m = me; + n_a.n = 0; + n_a.m = me; + v_a.clear(); + if (cfg) { + ml.unlock(); + cfg->paxos_commit(instance, value); + ml.lock(); + } } - } } void acceptor::commit(unsigned instance, std::string value) { - ScopedLock ml(pxs_mutex); - commit_wo(instance, value); + lock ml(pxs_mutex); + commit_wo(instance, value); } std::string diff --git a/paxos.h b/paxos.h index 7188edb..c7b1af4 100644 --- a/paxos.h +++ b/paxos.h @@ -3,15 +3,14 @@ #include #include -#include "rpc.h" +#include "rpc/rpc.h" #include "paxos_protocol.h" #include "log.h" -#include "mutex.h" class paxos_change { public: - virtual void paxos_commit(unsigned instance, std::string v) = 0; + virtual void paxos_commit(unsigned instance, const std::string &v) = 0; virtual ~paxos_change() {}; }; @@ -21,7 +20,7 @@ class acceptor { rpcs *pxs; paxos_change *cfg; std::string me; - mutex pxs_mutex; + std::mutex pxs_mutex; // Acceptor state prop_t n_h; // number of the highest proposal seen in a prepare @@ -67,7 +66,7 @@ class proposer { bool break1; bool break2; - mutex pxs_mutex; + std::mutex pxs_mutex; // Proposer state bool stable; diff --git a/paxos_protocol.h b/paxos_protocol.h index 9c2703e..734ca51 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -1,7 +1,7 @@ #ifndef paxos_protocol_h #define paxos_protocol_h -#include "rpc.h" +#include "rpc/rpc.h" struct prop_t { unsigned n; diff --git a/random.cc b/random.cc deleted file mode 100644 index d344958..0000000 --- a/random.cc +++ /dev/null @@ -1,36 +0,0 @@ -#include "mutex.h" -#include -#include "rpc/slock.h" -#include - -static mutex rand_m; - -void srand_safe(unsigned int seed) { - ScopedLock s(rand_m); - srandom(seed); -} - -// RAND_MAX is guaranteed to be at least 32767 -// but math with rand() is annoying. -// Here's a canned solution from -// http://www.azillionmonkeys.com/qed/random.html -// which gives uniform numbers in [0, r) - -#define RS_SCALE (1.0 / (1.0 + RAND_MAX)) - -double drand (void) { - double d; - do { - d = (((random() * RS_SCALE) + random()) * RS_SCALE + random()) * RS_SCALE; - } while (d >= 1); /* Round off */ - return d; -} - -#define irand(x) ((unsigned int) ((x) * drand())) - -// use this to construct a 32-bit thread-safe RNG -#define RAND32 ((uint32_t)irand(1ul<<32)) -uint32_t rand32_safe() { - ScopedLock s(rand_m); - return RAND32; -} diff --git a/random.h b/random.h deleted file mode 100644 index fce86ef..0000000 --- a/random.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef random_h -#define random_h - -void srand_safe(unsigned int seed); -uint32_t rand32_safe(); - -#endif diff --git a/rpc/connection.cc b/rpc/connection.cc index c22ad45..fec7a4c 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -6,127 +6,116 @@ #include #include -#include "method_thread.h" #include "connection.h" -#include "slock.h" #include "pollmgr.h" #include "jsl_log.h" -#include "gettime.h" #include "lang/verify.h" +#include "lock.h" #define MAX_PDU (10<<20) //maximum PDF is 10M -connection::connection(chanmgr *m1, int f1, int l1) +connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1) { - int flags = fcntl(fd_, F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(fd_, F_SETFL, flags); + int flags = fcntl(fd_, F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(fd_, F_SETFL, flags); + + signal(SIGPIPE, SIG_IGN); - signal(SIGPIPE, SIG_IGN); - VERIFY(pthread_mutex_init(&m_,0)==0); - VERIFY(pthread_mutex_init(&ref_m_,0)==0); - VERIFY(pthread_cond_init(&send_wait_,0)==0); - VERIFY(pthread_cond_init(&send_complete_,0)==0); - - VERIFY(gettimeofday(&create_time_, NULL) == 0); + VERIFY(gettimeofday(&create_time_, NULL) == 0); - PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); + PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } connection::~connection() { - VERIFY(dead_); - VERIFY(pthread_mutex_destroy(&m_)== 0); - VERIFY(pthread_mutex_destroy(&ref_m_)== 0); - VERIFY(pthread_cond_destroy(&send_wait_) == 0); - VERIFY(pthread_cond_destroy(&send_complete_) == 0); - if (rpdu_.buf) - free(rpdu_.buf); - VERIFY(!wpdu_.buf); - close(fd_); + VERIFY(dead_); + if (rpdu_.buf) + free(rpdu_.buf); + VERIFY(!wpdu_.buf); + close(fd_); } void connection::incref() { - ScopedLock ml(&ref_m_); - refno_++; + lock rl(ref_m_); + refno_++; } bool connection::isdead() { - ScopedLock ml(&m_); - return dead_; + lock ml(m_); + return dead_; } void connection::closeconn() { - { - ScopedLock ml(&m_); - if (!dead_) { - dead_ = true; - shutdown(fd_,SHUT_RDWR); - }else{ - return; - } - } - //after block_remove_fd, select will never wait on fd_ - //and no callbacks will be active - PollMgr::Instance()->block_remove_fd(fd_); + { + lock ml(m_); + if (!dead_) { + dead_ = true; + shutdown(fd_,SHUT_RDWR); + } else { + return; + } + } + //after block_remove_fd, select will never wait on fd_ + //and no callbacks will be active + PollMgr::Instance()->block_remove_fd(fd_); } void connection::decref() { - VERIFY(pthread_mutex_lock(&ref_m_)==0); - refno_ --; - VERIFY(refno_>=0); - if (refno_==0) { - VERIFY(pthread_mutex_lock(&m_)==0); - if (dead_) { - VERIFY(pthread_mutex_unlock(&ref_m_)==0); - VERIFY(pthread_mutex_unlock(&m_)==0); - delete this; - return; - } - VERIFY(pthread_mutex_unlock(&m_)==0); - } - pthread_mutex_unlock(&ref_m_); + bool dead = false; + { + lock rl(ref_m_); + refno_--; + VERIFY(refno_>=0); + if (refno_==0) { + lock ml(m_); + dead = dead_; + } + } + if (dead) { + delete this; + } } int connection::ref() { - ScopedLock rl(&ref_m_); + lock rl(ref_m_); return refno_; } int connection::compare(connection *another) { - if (create_time_.tv_sec > another->create_time_.tv_sec) - return 1; - if (create_time_.tv_sec < another->create_time_.tv_sec) - return -1; - if (create_time_.tv_usec > another->create_time_.tv_usec) - return 1; - if (create_time_.tv_usec < another->create_time_.tv_usec) - return -1; - return 0; + if (create_time_.tv_sec > another->create_time_.tv_sec) + return 1; + if (create_time_.tv_sec < another->create_time_.tv_sec) + return -1; + if (create_time_.tv_usec > another->create_time_.tv_usec) + return 1; + if (create_time_.tv_usec < another->create_time_.tv_usec) + return -1; + return 0; } bool connection::send(char *b, int sz) { - ScopedLock ml(&m_); + lock ml(m_); waiters_++; while (!dead_ && wpdu_.buf) { - VERIFY(pthread_cond_wait(&send_wait_, &m_)==0); + send_wait_.wait(ml); } waiters_--; if (dead_) { @@ -145,16 +134,16 @@ connection::send(char *b, int sz) if (!writepdu()) { dead_ = true; - VERIFY(pthread_mutex_unlock(&m_) == 0); + ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); - VERIFY(pthread_mutex_lock(&m_) == 0); + ml.lock(); }else{ if (wpdu_.solong == wpdu_.sz) { }else{ //should be rare to need to explicitly add write callback PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { - VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0); + send_complete_.wait(ml); } } } @@ -162,7 +151,7 @@ connection::send(char *b, int sz) wpdu_.solong = wpdu_.sz = 0; wpdu_.buf = NULL; if (waiters_ > 0) - pthread_cond_broadcast(&send_wait_); + send_wait_.notify_all(); return ret; } @@ -170,7 +159,7 @@ connection::send(char *b, int sz) void connection::write_cb(int s) { - ScopedLock ml(&m_); + lock ml(m_); VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.sz == 0) { @@ -180,20 +169,20 @@ connection::write_cb(int s) if (!writepdu()) { PollMgr::Instance()->del_callback(fd_, CB_RDWR); dead_ = true; - }else{ + } else { VERIFY(wpdu_.solong >= 0); if (wpdu_.solong < wpdu_.sz) { return; } - } - pthread_cond_signal(&send_complete_); + } + send_complete_.notify_one(); } //fd_ is ready to be read void connection::read_cb(int s) { - ScopedLock ml(&m_); + lock ml(m_); VERIFY(fd_ == s); if (dead_) { return; @@ -207,7 +196,7 @@ connection::read_cb(int s) if (!succ) { PollMgr::Instance()->del_callback(fd_,CB_RDWR); dead_ = true; - pthread_cond_signal(&send_complete_); + send_complete_.notify_one(); } if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { @@ -268,7 +257,7 @@ connection::readpdu() if (sz > MAX_PDU) { char *tmpb = (char *)&sz1; - jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, + jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); return false; } @@ -295,12 +284,9 @@ connection::readpdu() return true; } -tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) : mgr_(m1), lossy_(lossytest) { - - VERIFY(pthread_mutex_init(&m_,NULL) == 0); - struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; @@ -326,11 +312,11 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) VERIFY(0); } - socklen_t addrlen = sizeof(sin); - VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); - port_ = ntohs(sin.sin_port); + socklen_t addrlen = sizeof(sin); + VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); + port_ = ntohs(sin.sin_port); - jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, + jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, sin.sin_port); if (pipe(pipe_) < 0) { @@ -342,20 +328,20 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) flags |= O_NONBLOCK; fcntl(pipe_[0], F_SETFL, flags); - VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); + th_ = std::thread(&tcpsconn::accept_conn, this); } tcpsconn::~tcpsconn() { VERIFY(close(pipe_[1]) == 0); - VERIFY(pthread_join(th_, NULL) == 0); + th_.join(); //close all the active connections std::map::iterator i; for (i = conns_.begin(); i != conns_.end(); i++) { i->second->closeconn(); i->second->decref(); - } + } } void @@ -363,13 +349,13 @@ tcpsconn::process_accept() { sockaddr_in sin; socklen_t slen = sizeof(sin); - int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); if (s1 < 0) { perror("tcpsconn::accept_conn error"); - pthread_exit(NULL); + throw thread_exit_exception(); } - jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", + jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); @@ -398,34 +384,41 @@ tcpsconn::accept_conn() fd_set rfds; int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; - while (1) { - FD_ZERO(&rfds); - FD_SET(pipe_[0], &rfds); - FD_SET(tcp_, &rfds); - - int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - continue; - } else { - perror("accept_conn select:"); - jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); - VERIFY(0); - } - } - - if (FD_ISSET(pipe_[0], &rfds)) { - close(pipe_[0]); - close(tcp_); - return; - } - else if (FD_ISSET(tcp_, &rfds)) { - process_accept(); - } else { - VERIFY(0); - } - } + try { + + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + perror("accept_conn select:"); + jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + VERIFY(0); + } + } + + if (FD_ISSET(pipe_[0], &rfds)) { + close(pipe_[0]); + close(tcp_); + return; + } + else if (FD_ISSET(tcp_, &rfds)) { + process_accept(); + } else { + VERIFY(0); + } + } + } + catch (thread_exit_exception e) + { + return; + } } connection * @@ -435,7 +428,7 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", + jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); close(s); return NULL; @@ -445,4 +438,3 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) return new connection(mgr, s, lossy); } - diff --git a/rpc/connection.h b/rpc/connection.h index da48cf4..1ef4470 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -1,101 +1,104 @@ #ifndef connection_h -#define connection_h 1 +#define connection_h #include #include #include #include #include +#include #include #include "pollmgr.h" +class thread_exit_exception : std::exception { +}; + class connection; class chanmgr { - public: - virtual bool got_pdu(connection *c, char *b, int sz) = 0; - virtual ~chanmgr() {} + public: + virtual bool got_pdu(connection *c, char *b, int sz) = 0; + virtual ~chanmgr() {} }; class connection : public aio_callback { - public: - struct charbuf { - charbuf(): buf(NULL), sz(0), solong(0) {} - charbuf (char *b, int s) : buf(b), sz(s), solong(0){} - char *buf; - int sz; - int solong; //amount of bytes written or read so far - }; - - connection(chanmgr *m1, int f1, int lossytest=0); - ~connection(); - - int channo() { return fd_; } - bool isdead(); - void closeconn(); - - bool send(char *b, int sz); - void write_cb(int s); - void read_cb(int s); - - void incref(); - void decref(); - int ref(); - - int compare(connection *another); - private: - - bool readpdu(); - bool writepdu(); - - chanmgr *mgr_; - const int fd_; - bool dead_; - - charbuf wpdu_; - charbuf rpdu_; - - struct timeval create_time_; - - int waiters_; - int refno_; - const int lossy_; - - pthread_mutex_t m_; - pthread_mutex_t ref_m_; - pthread_cond_t send_complete_; - pthread_cond_t send_wait_; + public: + struct charbuf { + charbuf(): buf(NULL), sz(0), solong(0) {} + charbuf (char *b, int s) : buf(b), sz(s), solong(0){} + char *buf; + int sz; + int solong; //amount of bytes written or read so far + }; + + connection(chanmgr *m1, int f1, int lossytest=0); + ~connection(); + + int channo() { return fd_; } + bool isdead(); + void closeconn(); + + bool send(char *b, int sz); + void write_cb(int s); + void read_cb(int s); + + void incref(); + void decref(); + int ref(); + + int compare(connection *another); + private: + + bool readpdu(); + bool writepdu(); + + chanmgr *mgr_; + const int fd_; + bool dead_; + + charbuf wpdu_; + charbuf rpdu_; + + struct timeval create_time_; + + int waiters_; + int refno_; + const int lossy_; + + std::mutex m_; + std::mutex ref_m_; + std::condition_variable send_complete_; + std::condition_variable send_wait_; }; class tcpsconn { - public: - tcpsconn(chanmgr *m1, int port, int lossytest=0); - ~tcpsconn(); - inline int port() { return port_; } - void accept_conn(); - private: - int port_; - pthread_mutex_t m_; - pthread_t th_; - int pipe_[2]; - - int tcp_; //file desciptor for accepting connection - chanmgr *mgr_; - int lossy_; - std::map conns_; - - void process_accept(); + public: + tcpsconn(chanmgr *m1, int port, int lossytest=0); + ~tcpsconn(); + inline int port() { return port_; } + void accept_conn(); + private: + int port_; + std::mutex m_; + std::thread th_; + int pipe_[2]; + + int tcp_; //file desciptor for accepting connection + chanmgr *mgr_; + int lossy_; + std::map conns_; + + void process_accept(); }; struct bundle { - bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} - chanmgr *mgr; - int tcp; - int lossy; + bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} + chanmgr *mgr; + int tcp; + int lossy; }; -void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0); connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); #endif diff --git a/rpc/fifo.h b/rpc/fifo.h index 979cf62..d190c26 100644 --- a/rpc/fifo.h +++ b/rpc/fifo.h @@ -9,81 +9,69 @@ #include #include #include -#include "slock.h" #include "lang/verify.h" +#include "lock.h" template class fifo { public: fifo(int m=0); - ~fifo(); bool enq(T, bool blocking=true); void deq(T *); bool size(); private: std::list q_; - pthread_mutex_t m_; - pthread_cond_t non_empty_c_; // q went non-empty - pthread_cond_t has_space_c_; // q is not longer overfull + mutex m_; + std::condition_variable non_empty_c_; // q went non-empty + std::condition_variable has_space_c_; // q is not longer overfull unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit }; template fifo::fifo(int limit) : max_(limit) { - VERIFY(pthread_mutex_init(&m_, 0) == 0); - VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0); - VERIFY(pthread_cond_init(&has_space_c_, 0) == 0); -} - -template -fifo::~fifo() -{ - //fifo is to be deleted only when no threads are using it! - VERIFY(pthread_mutex_destroy(&m_)==0); - VERIFY(pthread_cond_destroy(&non_empty_c_) == 0); - VERIFY(pthread_cond_destroy(&has_space_c_) == 0); } template bool fifo::size() { - ScopedLock ml(&m_); + lock ml(m_); return q_.size(); } template bool fifo::enq(T e, bool blocking) { - ScopedLock ml(&m_); + lock ml(m_); while (1) { if (!max_ || q_.size() < max_) { q_.push_back(e); break; } - if (blocking) - VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0); + if (blocking) { + has_space_c_.wait(ml); + } else return false; } - VERIFY(pthread_cond_signal(&non_empty_c_) == 0); + non_empty_c_.notify_one(); return true; } template void fifo::deq(T *e) { - ScopedLock ml(&m_); + lock ml(m_); while(1) { if(q_.empty()){ - VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0); + non_empty_c_.wait(ml); } else { *e = q_.front(); q_.pop_front(); if (max_ && q_.size() < max_) { - VERIFY(pthread_cond_signal(&has_space_c_)==0); + has_space_c_.notify_one(); } break; } diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc index 06e5c2c..de02fc2 100644 --- a/rpc/jsl_log.cc +++ b/rpc/jsl_log.cc @@ -3,7 +3,7 @@ int JSL_DEBUG_LEVEL = 0; void jsl_set_debug(int level) { - JSL_DEBUG_LEVEL = level; + JSL_DEBUG_LEVEL = level; } diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h index 7f92998..c6ea812 100644 --- a/rpc/jsl_log.h +++ b/rpc/jsl_log.h @@ -1,25 +1,18 @@ -#ifndef __JSL_LOG_H__ -#define __JSL_LOG_H__ 1 +#ifndef jsl_log_h +#define jsl_log_h enum dbcode { - JSL_DBG_OFF = 0, - JSL_DBG_1 = 1, // Critical - JSL_DBG_2 = 2, // Error - JSL_DBG_3 = 3, // Info - JSL_DBG_4 = 4, // Debugging + JSL_DBG_OFF = 0, + JSL_DBG_1 = 1, // Critical + JSL_DBG_2 = 2, // Error + JSL_DBG_3 = 3, // Info + JSL_DBG_4 = 4, // Debugging }; extern int JSL_DEBUG_LEVEL; -#define jsl_log(level,...) \ - do { \ - if(JSL_DEBUG_LEVEL < abs(level)) \ - {;} \ - else { \ - { printf(__VA_ARGS__);} \ - } \ - } while(0) +#define jsl_log(level,...) {if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__);} void jsl_set_debug(int level); -#endif // __JSL_LOG_H__ +#endif diff --git a/rpc/marshall.h b/rpc/marshall.h index e0370d1..644a220 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -11,7 +11,6 @@ #include #include #include "lang/verify.h" -#include "lang/algorithm.h" struct req_header { req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0): @@ -32,16 +31,15 @@ struct reply_header { typedef uint64_t rpc_checksum_t; typedef int rpc_sz_t; -enum { - //size of initial buffer allocation - DEFAULT_RPC_SZ = 1024, +//size of initial buffer allocation +#define DEFAULT_RPC_SZ 1024 +#define RPC_HEADER_SZ_NO_CHECKSUM (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) #if RPC_CHECKSUMMING - //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum - RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t) +//size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum +#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM + sizeof(rpc_checksum_t)) #else - RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) +#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM) #endif -}; class marshall { private: diff --git a/rpc/method_thread.h b/rpc/method_thread.h deleted file mode 100644 index bcbc08b..0000000 --- a/rpc/method_thread.h +++ /dev/null @@ -1,164 +0,0 @@ -#ifndef method_thread_h -#define method_thread_h - -// method_thread(): start a thread that runs an object method. -// returns a pthread_t on success, and zero on error. - -#include -#include -#include -#include -#include "lang/verify.h" - -static pthread_t -method_thread_parent(void *(*fn)(void *), void *arg, bool detach) -{ - pthread_t th; - pthread_attr_t attr; - pthread_attr_init(&attr); - // set stack size to 100K, so we don't run out of memory - pthread_attr_setstacksize(&attr, 100*1024); - int err = pthread_create(&th, &attr, fn, arg); - pthread_attr_destroy(&attr); - if (err != 0) { - fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err)); - exit(1); - } - - if (detach) { - // don't keep thread state around after exit, to avoid - // running out of threads. set detach==false if you plan - // to pthread_join. - VERIFY(pthread_detach(th) == 0); - } - - return th; -} - -static void -method_thread_child() -{ - // defer pthread_cancel() by default. check explicitly by - // enabling then pthread_testcancel(). - int oldstate, oldtype; - VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0); - VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0); -} - -template pthread_t -method_thread(C *o, bool detach, void (C::*m)()) -{ - class XXX { - public: - C *o; - void (C::*m)(); - static void *yyy(void *vvv) { - XXX *x = (XXX*)vvv; - C *o = x->o; - void (C::*m)() = x->m; - delete x; - method_thread_child(); - (o->*m)(); - return 0; - } - }; - XXX *x = new XXX; - x->o = o; - x->m = m; - return method_thread_parent(&XXX::yyy, (void *) x, detach); -} - -template pthread_t -method_thread(C *o, bool detach, void (C::*m)(A), A a) -{ - class XXX { - public: - C *o; - void (C::*m)(A a); - A a; - static void *yyy(void *vvv) { - XXX *x = (XXX*)vvv; - C *o = x->o; - void (C::*m)(A ) = x->m; - A a = x->a; - delete x; - method_thread_child(); - (o->*m)(a); - return 0; - } - }; - XXX *x = new XXX; - x->o = o; - x->m = m; - x->a = a; - return method_thread_parent(&XXX::yyy, (void *) x, detach); -} - -namespace { - // ~xavid: this causes a bizzare compile error on OS X.5 when - // it's declared in the function, so I moved it out here. - template - class XXX { - public: - C *o; - void (C::*m)(A1 a1, A2 a2); - A1 a1; - A2 a2; - static void *yyy(void *vvv) { - XXX *x = (XXX*)vvv; - C *o = x->o; - void (C::*m)(A1 , A2 ) = x->m; - A1 a1 = x->a1; - A2 a2 = x->a2; - delete x; - method_thread_child(); - (o->*m)(a1, a2); - return 0; - } - }; -} - -template pthread_t -method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2) -{ - XXX *x = new XXX; - x->o = o; - x->m = m; - x->a1 = a1; - x->a2 = a2; - return method_thread_parent(&XXX::yyy, (void *) x, detach); -} - -template pthread_t -method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3) -{ - class XXX { - public: - C *o; - void (C::*m)(A1 a1, A2 a2, A3 a3); - A1 a1; - A2 a2; - A3 a3; - static void *yyy(void *vvv) { - XXX *x = (XXX*)vvv; - C *o = x->o; - void (C::*m)(A1 , A2 , A3 ) = x->m; - A1 a1 = x->a1; - A2 a2 = x->a2; - A3 a3 = x->a3; - delete x; - method_thread_child(); - (o->*m)(a1, a2, a3); - return 0; - } - }; - XXX *x = new XXX; - x->o = o; - x->m = m; - x->a1 = a1; - x->a2 = a2; - x->a3 = a3; - return method_thread_parent(&XXX::yyy, (void *) x, detach); -} - -#endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index f73a3a5..33aeae2 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,16 +1,14 @@ -#include #include #include #include -#include "slock.h" #include "jsl_log.h" -#include "method_thread.h" #include "lang/verify.h" #include "pollmgr.h" +#include "lock.h" PollMgr *PollMgr::instance = NULL; -static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT; +static std::once_flag pollmgr_is_initialized; void PollMgrInit() @@ -21,7 +19,7 @@ PollMgrInit() PollMgr * PollMgr::Instance() { - pthread_once(&pollmgr_is_initialized, PollMgrInit); + std::call_once(pollmgr_is_initialized, PollMgrInit); return instance; } @@ -31,9 +29,7 @@ PollMgr::PollMgr() : pending_change_(false) aio_ = new SelectAIO(); //aio_ = new EPollAIO(); - VERIFY(pthread_mutex_init(&m_, NULL) == 0); - VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0); - VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0); + th_ = std::thread(&PollMgr::wait_loop, this); } PollMgr::~PollMgr() @@ -47,7 +43,7 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { VERIFY(fd < MAX_POLL_FDS); - ScopedLock ml(&m_); + lock ml(m_); aio_->watch_fd(fd, flag); VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); @@ -60,17 +56,17 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) void PollMgr::block_remove_fd(int fd) { - ScopedLock ml(&m_); + lock ml(m_); aio_->unwatch_fd(fd, CB_RDWR); pending_change_ = true; - VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0); + changedone_c_.wait(ml); callbacks_[fd] = NULL; } void PollMgr::del_callback(int fd, poll_flag flag) { - ScopedLock ml(&m_); + lock ml(m_); if (aio_->unwatch_fd(fd, flag)) { callbacks_[fd] = NULL; } @@ -79,7 +75,7 @@ PollMgr::del_callback(int fd, poll_flag flag) bool PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) { - ScopedLock ml(&m_); + lock ml(m_); if (!callbacks_[fd] || callbacks_[fd]!=c) return false; @@ -95,10 +91,10 @@ PollMgr::wait_loop() while (1) { { - ScopedLock ml(&m_); + lock ml(m_); if (pending_change_) { pending_change_ = false; - VERIFY(pthread_cond_broadcast(&changedone_c_)==0); + changedone_c_.notify_all(); } } readable.clear(); @@ -137,19 +133,16 @@ SelectAIO::SelectAIO() : highfds_(0) int flags = fcntl(pipefd_[0], F_GETFL, NULL); flags |= O_NONBLOCK; fcntl(pipefd_[0], F_SETFL, flags); - - VERIFY(pthread_mutex_init(&m_, NULL) == 0); } SelectAIO::~SelectAIO() { - VERIFY(pthread_mutex_destroy(&m_) == 0); } void SelectAIO::watch_fd(int fd, poll_flag flag) { - ScopedLock ml(&m_); + lock ml(m_); if (highfds_ <= fd) highfds_ = fd; @@ -169,7 +162,7 @@ SelectAIO::watch_fd(int fd, poll_flag flag) bool SelectAIO::is_watched(int fd, poll_flag flag) { - ScopedLock ml(&m_); + lock ml(m_); if (flag == CB_RDONLY) { return FD_ISSET(fd,&rfds_); }else if (flag == CB_WRONLY) { @@ -182,7 +175,7 @@ SelectAIO::is_watched(int fd, poll_flag flag) bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { - ScopedLock ml(&m_); + lock ml(m_); if (flag == CB_RDONLY) { FD_CLR(fd, &rfds_); }else if (flag == CB_WRONLY) { @@ -221,11 +214,10 @@ SelectAIO::wait_ready(std::vector *readable, std::vector *writable) int high; { - ScopedLock ml(&m_); + lock ml(m_); trfds = rfds_; twfds = wfds_; high = highfds_; - } int ret = select(high+1, &trfds, &twfds, NULL, NULL); diff --git a/rpc/pollmgr.h b/rpc/pollmgr.h index c0a5748..89d1660 100644 --- a/rpc/pollmgr.h +++ b/rpc/pollmgr.h @@ -3,6 +3,7 @@ #include #include +#include #ifdef __linux__ #include @@ -54,9 +55,9 @@ class PollMgr { static int useless; private: - pthread_mutex_t m_; - pthread_cond_t changedone_c_; - pthread_t th_; + std::mutex m_; + std::condition_variable changedone_c_; + std::thread th_; aio_callback *callbacks_[MAX_POLL_FDS]; aio_mgr *aio_; @@ -81,7 +82,7 @@ class SelectAIO : public aio_mgr { int highfds_; int pipefd_[2]; - pthread_mutex_t m_; + std::mutex m_; }; diff --git a/rpc/rpc.cc b/rpc/rpc.cc index cd03073..5de3984 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -38,14 +38,8 @@ object. A connection object is deleted only when the underlying connection is dead and the reference count reaches zero. - The previous version of the RPC library uses pthread_cancel* routines - to implement the deletion of rpcc and rpcs objects. The idea is to cancel - all active threads that might be holding a reference to an object before - deleting that object. However, pthread_cancel is not robust and there are - always bugs where outstanding references to deleted objects persist. - This version of the RPC library does not do pthread_cancel, but explicitly - joins exited threads to make sure no outstanding references exist before - deleting objects. + This version of the RPC library explicitly joins exited threads to make sure + no outstanding references exist before deleting objects. To delete a rpcc object safely, the users of the library must ensure that there are no outstanding calls on the rpcc object. @@ -55,24 +49,21 @@ 3. delete the dispatch thread pool which involves waiting for current active RPC handlers to finish. It is interesting how a thread pool can be deleted without using thread cancellation. The trick is to inject x "poison pills" for - a thread pool of x threads. Upon getting a poison pill instead of a normal + a thread pool of x threads. Upon getting a poison pill instead of a normal task, a worker thread will exit (and thread pool destructor waits to join all x exited worker threads). */ #include "rpc.h" -#include "method_thread.h" -#include "slock.h" #include #include #include -#include #include #include +#include "lock.h" #include "jsl_log.h" -#include "gettime.h" #include "lang/verify.h" const rpcc::TO rpcc::to_max = { 120000 }; @@ -81,363 +72,346 @@ const rpcc::TO rpcc::to_min = { 1000 }; rpcc::caller::caller(unsigned int xxid, unmarshall *xun) : xid(xxid), un(xun), done(false) { - VERIFY(pthread_mutex_init(&m,0) == 0); - VERIFY(pthread_cond_init(&c, 0) == 0); } rpcc::caller::~caller() { - VERIFY(pthread_mutex_destroy(&m) == 0); - VERIFY(pthread_cond_destroy(&c) == 0); } inline void set_rand_seed() { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - srandom((int)ts.tv_nsec^((int)getpid())); + auto now = std::chrono::time_point_cast(std::chrono::steady_clock::now()); + srandom((int)now.time_since_epoch().count()^((int)getpid())); } -rpcc::rpcc(sockaddr_in d, bool retrans) : - dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), - retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) +rpcc::rpcc(sockaddr_in d, bool retrans) : + dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), + retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) { - VERIFY(pthread_mutex_init(&m_, 0) == 0); - VERIFY(pthread_mutex_init(&chan_m_, 0) == 0); - VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0); - - if(retrans){ - set_rand_seed(); - clt_nonce_ = random(); - } else { - // special client nonce 0 means this client does not - // require at-most-once logic from the server - // because it uses tcp and never retries a failed connection - clt_nonce_ = 0; - } - - char *loss_env = getenv("RPC_LOSSY"); - if(loss_env != NULL){ - lossytest_ = atoi(loss_env); - } - - // xid starts with 1 and latest received reply starts with 0 - xid_rep_window_.push_back(0); - - jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", - clt_nonce_, lossytest_); + if(retrans){ + set_rand_seed(); + clt_nonce_ = random(); + } else { + // special client nonce 0 means this client does not + // require at-most-once logic from the server + // because it uses tcp and never retries a failed connection + clt_nonce_ = 0; + } + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + // xid starts with 1 and latest received reply starts with 0 + xid_rep_window_.push_back(0); + + jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", + clt_nonce_, lossytest_); } // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { - jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", - clt_nonce_, chan_?chan_->channo():-1); - if(chan_){ - chan_->closeconn(); - chan_->decref(); - } - VERIFY(calls_.size() == 0); - VERIFY(pthread_mutex_destroy(&m_) == 0); - VERIFY(pthread_mutex_destroy(&chan_m_) == 0); + jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", + clt_nonce_, chan_?chan_->channo():-1); + if(chan_){ + chan_->closeconn(); + chan_->decref(); + } + VERIFY(calls_.size() == 0); } int rpcc::bind(TO to) { - int r; - int ret = call(rpc_const::bind, 0, r, to); - if(ret == 0){ - ScopedLock ml(&m_); - bind_done_ = true; - srv_nonce_ = r; - } else { - jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", - inet_ntoa(dst_.sin_addr), ret); - } - return ret; + int r; + int ret = call(rpc_const::bind, 0, r, to); + if(ret == 0){ + lock ml(m_); + bind_done_ = true; + srv_nonce_ = r; + } else { + jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", + inet_ntoa(dst_.sin_addr), ret); + } + return ret; }; // Cancel all outstanding calls -void + void rpcc::cancel(void) { - ScopedLock ml(&m_); - printf("rpcc::cancel: force callers to fail\n"); - std::map::iterator iter; - for(iter = calls_.begin(); iter != calls_.end(); iter++){ - caller *ca = iter->second; + lock ml(m_); + printf("rpcc::cancel: force callers to fail\n"); + std::map::iterator iter; + for(iter = calls_.begin(); iter != calls_.end(); iter++){ + caller *ca = iter->second; - jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); - { - ScopedLock cl(&ca->m); - ca->done = true; - ca->intret = rpc_const::cancel_failure; - VERIFY(pthread_cond_signal(&ca->c) == 0); + jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); + { + lock cl(ca->m); + ca->done = true; + ca->intret = rpc_const::cancel_failure; + ca->c.notify_one(); + } } - } - while (calls_.size () > 0){ - destroy_wait_ = true; - VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0); - } - printf("rpcc::cancel: done\n"); + while (calls_.size () > 0){ + destroy_wait_ = true; + destroy_wait_c_.wait(ml); + } + printf("rpcc::cancel: done\n"); } int rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, - TO to) + TO to) { - caller ca(0, &rep); + caller ca(0, &rep); int xid_rep; - { - ScopedLock ml(&m_); + { + lock ml(m_); - if((proc != rpc_const::bind && !bind_done_) || - (proc == rpc_const::bind && bind_done_)){ - jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n"); - return rpc_const::bind_failure; - } + if((proc != rpc_const::bind && !bind_done_) || + (proc == rpc_const::bind && bind_done_)){ + jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n"); + return rpc_const::bind_failure; + } - if(destroy_wait_){ - return rpc_const::cancel_failure; - } + if(destroy_wait_){ + return rpc_const::cancel_failure; + } - ca.xid = xid_++; - calls_[ca.xid] = &ca; + ca.xid = xid_++; + calls_[ca.xid] = &ca; - req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, + req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()); - req.pack_req_header(h); + req.pack_req_header(h); xid_rep = xid_rep_window_.front(); - } - - TO curr_to; - struct timespec now, nextdeadline, finaldeadline; - - clock_gettime(CLOCK_REALTIME, &now); - add_timespec(now, to.to, &finaldeadline); - curr_to.to = to_min.to; - - bool transmit = true; - connection *ch = NULL; - - while (1){ - if(transmit){ - get_refconn(&ch); - if(ch){ - if(reachable_) { - request forgot; - { - ScopedLock ml(&m_); - if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) { - forgot = dup_req_; - dup_req_.clear(); - } - } - if (forgot.isvalid()) - ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); - ch->send(req.cstr(), req.size()); - } - else jsl_log(JSL_DBG_1, "not reachable\n"); - jsl_log(JSL_DBG_2, - "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", - clt_nonce_, proc, ca.xid, clt_nonce_); - } - transmit = false; // only send once on a given channel - } - - if(!finaldeadline.tv_sec) - break; - - clock_gettime(CLOCK_REALTIME, &now); - add_timespec(now, curr_to.to, &nextdeadline); - if(cmp_timespec(nextdeadline,finaldeadline) > 0){ - nextdeadline = finaldeadline; - finaldeadline.tv_sec = 0; - } - - { - ScopedLock cal(&ca.m); - while (!ca.done){ - jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); - if(pthread_cond_timedwait(&ca.c, &ca.m, - &nextdeadline) == ETIMEDOUT){ - jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); - break; - } - } - if(ca.done){ - jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n"); - break; - } - } - - if(retrans_ && (!ch || ch->isdead())){ - // since connection is dead, retransmit - // on the new connection - transmit = true; - } - curr_to.to <<= 1; - } - - { - // no locking of ca.m since only this thread changes ca.xid - ScopedLock ml(&m_); - calls_.erase(ca.xid); - // may need to update the xid again here, in case the - // packet times out before it's even sent by the channel. - // I don't think there's any harm in maybe doing it twice - update_xid_rep(ca.xid); - - if(destroy_wait_){ - VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0); - } - } - - if (ca.done && lossytest_) + } + + TO curr_to; + std::chrono::time_point finaldeadline = + std::chrono::steady_clock::now() + + std::chrono::milliseconds(to.to), + nextdeadline; + + curr_to.to = to_min.to; + + bool transmit = true; + connection *ch = NULL; + + while (1){ + if(transmit){ + get_refconn(&ch); + if(ch){ + if(reachable_) { + request forgot; + { + lock ml(m_); + if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) { + forgot = dup_req_; + dup_req_.clear(); + } + } + if (forgot.isvalid()) + ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); + ch->send(req.cstr(), req.size()); + } + else jsl_log(JSL_DBG_1, "not reachable\n"); + jsl_log(JSL_DBG_2, + "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", + clt_nonce_, proc, ca.xid, clt_nonce_); + } + transmit = false; // only send once on a given channel + } + + if(finaldeadline == std::chrono::time_point::min()) + break; + + nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to); + if(nextdeadline > finaldeadline) { + nextdeadline = finaldeadline; + finaldeadline = std::chrono::time_point::min(); + } + { - ScopedLock ml(&m_); - if (!dup_req_.isvalid()) { - dup_req_.buf.assign(req.cstr(), req.size()); - dup_req_.xid = ca.xid; + lock cal(ca.m); + while (!ca.done){ + jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); + if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){ + jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); + break; } - if (xid_rep > xid_rep_done_) - xid_rep_done_ = xid_rep; + } + if(ca.done){ + jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n"); + break; + } } - ScopedLock cal(&ca.m); + if(retrans_ && (!ch || ch->isdead())){ + // since connection is dead, retransmit + // on the new connection + transmit = true; + } + curr_to.to <<= 1; + } - jsl_log(JSL_DBG_2, - "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", - clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), - ntohs(dst_.sin_port), ca.done, ca.intret); + { + // no locking of ca.m since only this thread changes ca.xid + lock ml(m_); + calls_.erase(ca.xid); + // may need to update the xid again here, in case the + // packet times out before it's even sent by the channel. + // I don't think there's any harm in maybe doing it twice + update_xid_rep(ca.xid); + + if(destroy_wait_){ + destroy_wait_c_.notify_one(); + } + } + + if (ca.done && lossytest_) + { + lock ml(m_); + if (!dup_req_.isvalid()) { + dup_req_.buf.assign(req.cstr(), req.size()); + dup_req_.xid = ca.xid; + } + if (xid_rep > xid_rep_done_) + xid_rep_done_ = xid_rep; + } - if(ch) - ch->decref(); + lock cal(ca.m); - // destruction of req automatically frees its buffer - return (ca.done? ca.intret : rpc_const::timeout_failure); + jsl_log(JSL_DBG_2, + "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", + clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), + ntohs(dst_.sin_port), ca.done, ca.intret); + + if(ch) + ch->decref(); + + // destruction of req automatically frees its buffer + return (ca.done? ca.intret : rpc_const::timeout_failure); } void rpcc::get_refconn(connection **ch) { - ScopedLock ml(&chan_m_); - if(!chan_ || chan_->isdead()){ - if(chan_) - chan_->decref(); - chan_ = connect_to_dst(dst_, this, lossytest_); - } - if(ch && chan_){ - if(*ch){ - (*ch)->decref(); - } - *ch = chan_; - (*ch)->incref(); - } + lock ml(chan_m_); + if(!chan_ || chan_->isdead()){ + if(chan_) + chan_->decref(); + chan_ = connect_to_dst(dst_, this, lossytest_); + } + if(ch && chan_){ + if(*ch){ + (*ch)->decref(); + } + *ch = chan_; + (*ch)->incref(); + } } -// PollMgr's thread is being used to -// make this upcall from connection object to rpcc. +// PollMgr's thread is being used to +// make this upcall from connection object to rpcc. // this funtion must not block. // -// this function keeps no reference for connection *c +// this function keeps no reference for connection *c bool rpcc::got_pdu(connection *c, char *b, int sz) { - unmarshall rep(b, sz); - reply_header h; - rep.unpack_reply_header(&h); - - if(!rep.ok()){ - jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n"); - return true; - } - - ScopedLock ml(&m_); - - update_xid_rep(h.xid); - - if(calls_.find(h.xid) == calls_.end()){ - jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid); - return true; - } - caller *ca = calls_[h.xid]; - - ScopedLock cl(&ca->m); - if(!ca->done){ - ca->un->take_in(rep); - ca->intret = h.ret; - if(ca->intret < 0){ - jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n", - h.xid, ca->intret); - } - ca->done = 1; - } - VERIFY(pthread_cond_broadcast(&ca->c) == 0); - return true; + unmarshall rep(b, sz); + reply_header h; + rep.unpack_reply_header(&h); + + if(!rep.ok()){ + jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n"); + return true; + } + + lock ml(m_); + + update_xid_rep(h.xid); + + if(calls_.find(h.xid) == calls_.end()){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid); + return true; + } + caller *ca = calls_[h.xid]; + + lock cl(ca->m); + if(!ca->done){ + ca->un->take_in(rep); + ca->intret = h.ret; + if(ca->intret < 0){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n", + h.xid, ca->intret); + } + ca->done = 1; + } + ca->c.notify_all(); + return true; } // assumes thread holds mutex m -void +void rpcc::update_xid_rep(unsigned int xid) { - std::list::iterator it; + std::list::iterator it; - if(xid <= xid_rep_window_.front()){ - return; - } + if(xid <= xid_rep_window_.front()){ + return; + } - for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ - if(*it > xid){ - xid_rep_window_.insert(it, xid); - goto compress; - } - } - xid_rep_window_.push_back(xid); + for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ + if(*it > xid){ + xid_rep_window_.insert(it, xid); + goto compress; + } + } + xid_rep_window_.push_back(xid); compress: - it = xid_rep_window_.begin(); - for (it++; it != xid_rep_window_.end(); it++){ - while (xid_rep_window_.front() + 1 == *it) - xid_rep_window_.pop_front(); - } + it = xid_rep_window_.begin(); + for (it++; it != xid_rep_window_.end(); it++){ + while (xid_rep_window_.front() + 1 == *it) + xid_rep_window_.pop_front(); + } } rpcs::rpcs(unsigned int p1, int count) : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) { - VERIFY(pthread_mutex_init(&procs_m_, 0) == 0); - VERIFY(pthread_mutex_init(&count_m_, 0) == 0); - VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0); - VERIFY(pthread_mutex_init(&conss_m_, 0) == 0); - - set_rand_seed(); - nonce_ = random(); - jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); + set_rand_seed(); + nonce_ = random(); + jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); - char *loss_env = getenv("RPC_LOSSY"); - if(loss_env != NULL){ - lossytest_ = atoi(loss_env); - } + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } - reg(rpc_const::bind, this, &rpcs::rpcbind); - dispatchpool_ = new ThrPool(6,false); + reg(rpc_const::bind, this, &rpcs::rpcbind); + dispatchpool_ = new ThrPool(6,false); - listener_ = new tcpsconn(this, port_, lossytest_); + listener_ = new tcpsconn(this, port_, lossytest_); } rpcs::~rpcs() { - // must delete listener before dispatchpool - delete listener_; - delete dispatchpool_; - free_reply_window(); + // must delete listener before dispatchpool + delete listener_; + delete dispatchpool_; + free_reply_window(); } bool @@ -448,149 +422,149 @@ rpcs::got_pdu(connection *c, char *b, int sz) return true; } - djob_t *j = new djob_t(c, b, sz); - c->incref(); - bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j); - if(!succ || !reachable_){ - c->decref(); - delete j; - } - return succ; + djob_t *j = new djob_t(c, b, sz); + c->incref(); + bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j); + if(!succ || !reachable_){ + c->decref(); + delete j; + } + return succ; } void rpcs::reg1(unsigned int proc, handler *h) { - ScopedLock pl(&procs_m_); - VERIFY(procs_.count(proc) == 0); - procs_[proc] = h; - VERIFY(procs_.count(proc) >= 1); + lock pl(procs_m_); + VERIFY(procs_.count(proc) == 0); + procs_[proc] = h; + VERIFY(procs_.count(proc) >= 1); } void rpcs::updatestat(unsigned int proc) { - ScopedLock cl(&count_m_); - counts_[proc]++; - curr_counts_--; - if(curr_counts_ == 0){ - std::map::iterator i; - printf("RPC STATS: "); - for (i = counts_.begin(); i != counts_.end(); i++){ - printf("%x:%d ", i->first, i->second); - } - printf("\n"); - - ScopedLock rwl(&reply_window_m_); - std::map >::iterator clt; - - unsigned int totalrep = 0, maxrep = 0; - for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ - totalrep += clt->second.size(); - if(clt->second.size() > maxrep) - maxrep = clt->second.size(); - } - jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", + lock cl(count_m_); + counts_[proc]++; + curr_counts_--; + if(curr_counts_ == 0){ + std::map::iterator i; + printf("RPC STATS: "); + for (i = counts_.begin(); i != counts_.end(); i++){ + printf("%x:%d ", i->first, i->second); + } + printf("\n"); + + lock rwl(reply_window_m_); + std::map >::iterator clt; + + unsigned int totalrep = 0, maxrep = 0; + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + totalrep += clt->second.size(); + if(clt->second.size() > maxrep) + maxrep = clt->second.size(); + } + jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", (int) reply_window_.size()-1, totalrep, maxrep); - curr_counts_ = counting_; - } + curr_counts_ = counting_; + } } void rpcs::dispatch(djob_t *j) { - connection *c = j->conn; - unmarshall req(j->buf, j->sz); - delete j; - - req_header h; - req.unpack_req_header(&h); - int proc = h.proc; - - if(!req.ok()){ - jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); - c->decref(); - return; - } - - jsl_log(JSL_DBG_2, - "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n", - h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); - - marshall rep; - reply_header rh(h.xid,0); - - // is client sending to an old instance of server? - if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ - jsl_log(JSL_DBG_2, - "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n", - h.srv_nonce, nonce_, h.proc); - rh.ret = rpc_const::oldsrv_failure; - rep.pack_reply_header(rh); - c->send(rep.cstr(),rep.size()); - return; - } - - handler *f; - // is RPC proc a registered procedure? - { - ScopedLock pl(&procs_m_); - if(procs_.count(proc) < 1){ - fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n", - proc); - c->decref(); + connection *c = j->conn; + unmarshall req(j->buf, j->sz); + delete j; + + req_header h; + req.unpack_req_header(&h); + int proc = h.proc; + + if(!req.ok()){ + jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); + c->decref(); + return; + } + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n", + h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); + + marshall rep; + reply_header rh(h.xid,0); + + // is client sending to an old instance of server? + if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n", + h.srv_nonce, nonce_, h.proc); + rh.ret = rpc_const::oldsrv_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + return; + } + + handler *f; + // is RPC proc a registered procedure? + { + lock pl(procs_m_); + if(procs_.count(proc) < 1){ + fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n", + proc); + c->decref(); VERIFY(0); - return; - } - - f = procs_[proc]; - } - - rpcs::rpcstate_t stat; - char *b1; - int sz1; - - if(h.clt_nonce){ - // have i seen this client before? - { - ScopedLock rwl(&reply_window_m_); - // if we don't know about this clt_nonce, create a cleanup object - if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ - VERIFY (reply_window_[h.clt_nonce].size() == 0); // create + return; + } + + f = procs_[proc]; + } + + rpcs::rpcstate_t stat; + char *b1; + int sz1; + + if(h.clt_nonce){ + // have i seen this client before? + { + lock rwl(reply_window_m_); + // if we don't know about this clt_nonce, create a cleanup object + if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ + VERIFY (reply_window_[h.clt_nonce].size() == 0); // create reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid - jsl_log(JSL_DBG_2, - "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", - h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1); - } - } - - // save the latest good connection to the client - { - ScopedLock rwl(&conss_m_); - if(conns_.find(h.clt_nonce) == conns_.end()){ - c->incref(); - conns_[h.clt_nonce] = c; - } else if(conns_[h.clt_nonce]->compare(c) < 0){ - conns_[h.clt_nonce]->decref(); - c->incref(); - conns_[h.clt_nonce] = c; - } - } - - stat = checkduplicate_and_update(h.clt_nonce, h.xid, + jsl_log(JSL_DBG_2, + "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", + h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1); + } + } + + // save the latest good connection to the client + { + lock rwl(conss_m_); + if(conns_.find(h.clt_nonce) == conns_.end()){ + c->incref(); + conns_[h.clt_nonce] = c; + } else if(conns_[h.clt_nonce]->compare(c) < 0){ + conns_[h.clt_nonce]->decref(); + c->incref(); + conns_[h.clt_nonce] = c; + } + } + + stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, &b1, &sz1); - } else { - // this client does not require at most once logic - stat = NEW; - } - - switch (stat){ - case NEW: // new request - if(counting_){ - updatestat(proc); - } - - rh.ret = f->fn(req, rep); + } else { + // this client does not require at most once logic + stat = NEW; + } + + switch (stat){ + case NEW: // new request + if(counting_){ + updatestat(proc); + } + + rh.ret = f->fn(req, rep); if (rh.ret == rpc_const::unmarshal_args_failure) { fprintf(stderr, "rpcs::dispatch: failed to" " unmarshall the arguments. You are" @@ -598,50 +572,50 @@ rpcs::dispatch(djob_t *j) " types of arguments.\n", proc); VERIFY(0); } - VERIFY(rh.ret >= 0); - - rep.pack_reply_header(rh); - rep.take_buf(&b1,&sz1); - - jsl_log(JSL_DBG_2, - "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n", - sz1, h.xid, proc, rh.ret, h.clt_nonce); - - if(h.clt_nonce > 0){ - // only record replies for clients that require at-most-once logic - add_reply(h.clt_nonce, h.xid, b1, sz1); - } - - // get the latest connection to the client - { - ScopedLock rwl(&conss_m_); - if(c->isdead() && c != conns_[h.clt_nonce]){ - c->decref(); - c = conns_[h.clt_nonce]; - c->incref(); - } - } - - c->send(b1, sz1); - if(h.clt_nonce == 0){ - // reply is not added to at-most-once window, free it - free(b1); - } - break; - case INPROGRESS: // server is working on this request - break; - case DONE: // duplicate and we still have the response - c->send(b1, sz1); - break; - case FORGOTTEN: // very old request and we don't have the response anymore - jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", - h.xid, h.clt_nonce); - rh.ret = rpc_const::atmostonce_failure; - rep.pack_reply_header(rh); - c->send(rep.cstr(),rep.size()); - break; - } - c->decref(); + VERIFY(rh.ret >= 0); + + rep.pack_reply_header(rh); + rep.take_buf(&b1,&sz1); + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n", + sz1, h.xid, proc, rh.ret, h.clt_nonce); + + if(h.clt_nonce > 0){ + // only record replies for clients that require at-most-once logic + add_reply(h.clt_nonce, h.xid, b1, sz1); + } + + // get the latest connection to the client + { + lock rwl(conss_m_); + if(c->isdead() && c != conns_[h.clt_nonce]){ + c->decref(); + c = conns_[h.clt_nonce]; + c->incref(); + } + } + + c->send(b1, sz1); + if(h.clt_nonce == 0){ + // reply is not added to at-most-once window, free it + free(b1); + } + break; + case INPROGRESS: // server is working on this request + break; + case DONE: // duplicate and we still have the response + c->send(b1, sz1); + break; + case FORGOTTEN: // very old request and we don't have the response anymore + jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", + h.xid, h.clt_nonce); + rh.ret = rpc_const::atmostonce_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + break; + } + c->decref(); } // rpcs::dispatch calls this when an RPC request arrives. @@ -658,11 +632,11 @@ rpcs::dispatch(djob_t *j) // INPROGRESS: seen this xid, and still processing it. // DONE: seen this xid, previous reply returned in *b and *sz. // FORGOTTEN: might have seen this xid, but deleted previous reply. -rpcs::rpcstate_t +rpcs::rpcstate_t rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, - unsigned int xid_rep, char **b, int *sz) + unsigned int xid_rep, char **b, int *sz) { - ScopedLock rwl(&reply_window_m_); + lock rwl(reply_window_m_); std::list &l = reply_window_[clt_nonce]; @@ -711,13 +685,13 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, // rpcs::dispatch calls add_reply when it is sending a reply to an RPC, // and passes the return value in b and sz. // add_reply() should remember b and sz. -// free_reply_window() and checkduplicate_and_update is responsible for +// free_reply_window() and checkduplicate_and_update is responsible for // calling free(b). void rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, - char *b, int sz) + char *b, int sz) { - ScopedLock rwl(&reply_window_m_); + lock rwl(reply_window_m_); // remember the RPC reply value std::list &l = reply_window_[clt_nonce]; std::list::iterator it = l.begin(); @@ -735,353 +709,310 @@ rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, void rpcs::free_reply_window(void) { - std::map >::iterator clt; - std::list::iterator it; + std::map >::iterator clt; + std::list::iterator it; - ScopedLock rwl(&reply_window_m_); - for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ - for (it = clt->second.begin(); it != clt->second.end(); it++){ - if (it->cb_present) + lock rwl(reply_window_m_); + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + for (it = clt->second.begin(); it != clt->second.end(); it++){ + if (it->cb_present) free(it->buf); - } - clt->second.clear(); - } - reply_window_.clear(); + } + clt->second.clear(); + } + reply_window_.clear(); } // rpc handler -int +int rpcs::rpcbind(int a, int &r) { - jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); - r = nonce_; - return 0; + jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); + r = nonce_; + return 0; } void marshall::rawbyte(unsigned char x) { - if(_ind >= _capa){ - _capa *= 2; - VERIFY (_buf != NULL); - _buf = (char *)realloc(_buf, _capa); - VERIFY(_buf); - } - _buf[_ind++] = x; + if(_ind >= _capa){ + _capa *= 2; + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + _buf[_ind++] = x; } void marshall::rawbytes(const char *p, int n) { - if((_ind+n) > _capa){ - _capa = _capa > n? 2*_capa:(_capa+n); - VERIFY (_buf != NULL); - _buf = (char *)realloc(_buf, _capa); - VERIFY(_buf); - } - memcpy(_buf+_ind, p, n); - _ind += n; + if((_ind+n) > _capa){ + _capa = _capa > n? 2*_capa:(_capa+n); + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + memcpy(_buf+_ind, p, n); + _ind += n; } marshall & operator<<(marshall &m, bool x) { - m.rawbyte(x); - return m; + m.rawbyte(x); + return m; } marshall & operator<<(marshall &m, unsigned char x) { - m.rawbyte(x); - return m; + m.rawbyte(x); + return m; } marshall & operator<<(marshall &m, char x) { - m << (unsigned char) x; - return m; + m << (unsigned char) x; + return m; } marshall & operator<<(marshall &m, unsigned short x) { - m.rawbyte((x >> 8) & 0xff); - m.rawbyte(x & 0xff); - return m; + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; } marshall & operator<<(marshall &m, short x) { - m << (unsigned short) x; - return m; + m << (unsigned short) x; + return m; } marshall & operator<<(marshall &m, unsigned int x) { - // network order is big-endian - m.rawbyte((x >> 24) & 0xff); - m.rawbyte((x >> 16) & 0xff); - m.rawbyte((x >> 8) & 0xff); - m.rawbyte(x & 0xff); - return m; + // network order is big-endian + m.rawbyte((x >> 24) & 0xff); + m.rawbyte((x >> 16) & 0xff); + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; } marshall & operator<<(marshall &m, int x) { - m << (unsigned int) x; - return m; + m << (unsigned int) x; + return m; } marshall & operator<<(marshall &m, const std::string &s) { - m << (unsigned int) s.size(); - m.rawbytes(s.data(), s.size()); - return m; + m << (unsigned int) s.size(); + m.rawbytes(s.data(), s.size()); + return m; } marshall & operator<<(marshall &m, unsigned long long x) { - m << (unsigned int) (x >> 32); - m << (unsigned int) x; - return m; + m << (unsigned int) (x >> 32); + m << (unsigned int) x; + return m; } void marshall::pack(int x) { - rawbyte((x >> 24) & 0xff); - rawbyte((x >> 16) & 0xff); - rawbyte((x >> 8) & 0xff); - rawbyte(x & 0xff); + rawbyte((x >> 24) & 0xff); + rawbyte((x >> 16) & 0xff); + rawbyte((x >> 8) & 0xff); + rawbyte(x & 0xff); } void unmarshall::unpack(int *x) { - (*x) = (rawbyte() & 0xff) << 24; - (*x) |= (rawbyte() & 0xff) << 16; - (*x) |= (rawbyte() & 0xff) << 8; - (*x) |= rawbyte() & 0xff; + (*x) = (rawbyte() & 0xff) << 24; + (*x) |= (rawbyte() & 0xff) << 16; + (*x) |= (rawbyte() & 0xff) << 8; + (*x) |= rawbyte() & 0xff; } // take the contents from another unmarshall object void unmarshall::take_in(unmarshall &another) { - if(_buf) - free(_buf); - another.take_buf(&_buf, &_sz); - _ind = RPC_HEADER_SZ; - _ok = _sz >= RPC_HEADER_SZ?true:false; + if(_buf) + free(_buf); + another.take_buf(&_buf, &_sz); + _ind = RPC_HEADER_SZ; + _ok = _sz >= RPC_HEADER_SZ?true:false; } bool unmarshall::okdone() { - if(ok() && _ind == _sz){ - return true; - } else { - return false; - } + if(ok() && _ind == _sz){ + return true; + } else { + return false; + } } unsigned int unmarshall::rawbyte() { - char c = 0; - if(_ind >= _sz) - _ok = false; - else - c = _buf[_ind++]; - return c; + char c = 0; + if(_ind >= _sz) + _ok = false; + else + c = _buf[_ind++]; + return c; } unmarshall & operator>>(unmarshall &u, bool &x) { - x = (bool) u.rawbyte() ; - return u; + x = (bool) u.rawbyte() ; + return u; } unmarshall & operator>>(unmarshall &u, unsigned char &x) { - x = (unsigned char) u.rawbyte() ; - return u; + x = (unsigned char) u.rawbyte() ; + return u; } unmarshall & operator>>(unmarshall &u, char &x) { - x = (char) u.rawbyte(); - return u; + x = (char) u.rawbyte(); + return u; } unmarshall & operator>>(unmarshall &u, unsigned short &x) { - x = (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; } unmarshall & operator>>(unmarshall &u, short &x) { - x = (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; } unmarshall & operator>>(unmarshall &u, unsigned int &x) { - x = (u.rawbyte() & 0xff) << 24; - x |= (u.rawbyte() & 0xff) << 16; - x |= (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; } unmarshall & operator>>(unmarshall &u, int &x) { - x = (u.rawbyte() & 0xff) << 24; - x |= (u.rawbyte() & 0xff) << 16; - x |= (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; } unmarshall & operator>>(unmarshall &u, unsigned long long &x) { - unsigned int h, l; - u >> h; - u >> l; - x = l | ((unsigned long long) h << 32); - return u; + unsigned int h, l; + u >> h; + u >> l; + x = l | ((unsigned long long) h << 32); + return u; } unmarshall & operator>>(unmarshall &u, std::string &s) { - unsigned sz; - u >> sz; - if(u.ok()) - u.rawbytes(s, sz); - return u; + unsigned sz; + u >> sz; + if(u.ok()) + u.rawbytes(s, sz); + return u; } void unmarshall::rawbytes(std::string &ss, unsigned int n) { - if((_ind+n) > (unsigned)_sz){ - _ok = false; - } else { - std::string tmps = std::string(_buf+_ind, n); - swap(ss, tmps); - VERIFY(ss.size() == n); - _ind += n; - } + if((_ind+n) > (unsigned)_sz){ + _ok = false; + } else { + std::string tmps = std::string(_buf+_ind, n); + swap(ss, tmps); + VERIFY(ss.size() == n); + _ind += n; + } } bool operator<(const sockaddr_in &a, const sockaddr_in &b){ - return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || - ((a.sin_addr.s_addr == b.sin_addr.s_addr) && - ((a.sin_port < b.sin_port)))); + return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || + ((a.sin_addr.s_addr == b.sin_addr.s_addr) && + ((a.sin_port < b.sin_port)))); } /*---------------auxilary function--------------*/ void make_sockaddr(const char *hostandport, struct sockaddr_in *dst){ - char host[200]; - const char *localhost = "127.0.0.1"; - const char *port = index(hostandport, ':'); - if(port == NULL){ - memcpy(host, localhost, strlen(localhost)+1); - port = hostandport; - } else { - memcpy(host, hostandport, port-hostandport); - host[port-hostandport] = '\0'; - port++; - } + char host[200]; + const char *localhost = "127.0.0.1"; + const char *port = index(hostandport, ':'); + if(port == NULL){ + memcpy(host, localhost, strlen(localhost)+1); + port = hostandport; + } else { + memcpy(host, hostandport, port-hostandport); + host[port-hostandport] = '\0'; + port++; + } - make_sockaddr(host, port, dst); + make_sockaddr(host, port, dst); } void make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){ - in_addr_t a; - - bzero(dst, sizeof(*dst)); - dst->sin_family = AF_INET; - - a = inet_addr(host); - if(a != INADDR_NONE){ - dst->sin_addr.s_addr = a; - } else { - struct hostent *hp = gethostbyname(host); - if(hp == 0 || hp->h_length != 4){ - fprintf(stderr, "cannot find host name %s\n", host); - exit(1); - } - dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; - } - dst->sin_port = htons(atoi(port)); -} + in_addr_t a; -int -cmp_timespec(const struct timespec &a, const struct timespec &b) -{ - if(a.tv_sec > b.tv_sec) - return 1; - else if(a.tv_sec < b.tv_sec) - return -1; - else { - if(a.tv_nsec > b.tv_nsec) - return 1; - else if(a.tv_nsec < b.tv_nsec) - return -1; - else - return 0; - } -} + bzero(dst, sizeof(*dst)); + dst->sin_family = AF_INET; -void -add_timespec(const struct timespec &a, int b, struct timespec *result) -{ - // convert to millisec, add timeout, convert back - result->tv_sec = a.tv_sec + b/1000; - result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000; - VERIFY(result->tv_nsec >= 0); - while (result->tv_nsec > 1000000000){ - result->tv_sec++; - result->tv_nsec-=1000000000; - } -} - -int -diff_timespec(const struct timespec &end, const struct timespec &start) -{ - int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0; - VERIFY(diff || end.tv_sec == start.tv_sec); - if(end.tv_nsec > start.tv_nsec){ - diff += (end.tv_nsec-start.tv_nsec)/1000000; - } else { - diff -= (start.tv_nsec-end.tv_nsec)/1000000; - } - return diff; + a = inet_addr(host); + if(a != INADDR_NONE){ + dst->sin_addr.s_addr = a; + } else { + struct hostent *hp = gethostbyname(host); + if(hp == 0 || hp->h_length != 4){ + fprintf(stderr, "cannot find host name %s\n", host); + exit(1); + } + dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; + } + dst->sin_port = htons(atoi(port)); } diff --git a/rpc/rpc.h b/rpc/rpc.h index fc61236..723121c 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -43,8 +43,8 @@ class rpcc : public chanmgr { unmarshall *un; int intret; bool done; - pthread_mutex_t m; - pthread_cond_t c; + std::mutex m; + std::condition_variable c; }; void get_refconn(connection **ch); @@ -62,11 +62,11 @@ class rpcc : public chanmgr { connection *chan_; - pthread_mutex_t m_; // protect insert/delete to calls[] - pthread_mutex_t chan_m_; + std::mutex m_; // protect insert/delete to calls[] + std::mutex chan_m_; bool destroy_wait_; - pthread_cond_t destroy_wait_c_; + std::condition_variable destroy_wait_c_; std::map calls_; std::list xid_rep_window_; @@ -328,10 +328,10 @@ class rpcs : public chanmgr { // map proc # to function std::map procs_; - pthread_mutex_t procs_m_; // protect insert/delete to procs[] - pthread_mutex_t count_m_; //protect modification of counts - pthread_mutex_t reply_window_m_; // protect reply window et al - pthread_mutex_t conss_m_; // protect conns_ + std::mutex procs_m_; // protect insert/delete to procs[] + std::mutex count_m_; //protect modification of counts + std::mutex reply_window_m_; // protect reply window et al + std::mutex conss_m_; // protect conns_ protected: @@ -626,8 +626,4 @@ void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); void make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst); -int cmp_timespec(const struct timespec &a, const struct timespec &b); -void add_timespec(const struct timespec &a, int b, struct timespec *result); -int diff_timespec(const struct timespec &a, const struct timespec &b); - #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 4c66e43..d90e494 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -10,7 +10,6 @@ #include #include #include "jsl_log.h" -#include "gettime.h" #include "lang/verify.h" #define NUM_CL 2 @@ -19,7 +18,6 @@ rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object struct sockaddr_in dst; //server's ip address int port; -pthread_attr_t attr; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers @@ -112,12 +110,11 @@ testmarshall() VERIFY(i1==i && l1==l && s1==s); } -void * -client1(void *xx) +void +client1(int cl) { - // test concurrency. - int which_cl = ((unsigned long) xx ) % NUM_CL; + int which_cl = ((unsigned long) cl ) % NUM_CL; for(int i = 0; i < 100; i++){ int arg = (random() % 2000); @@ -137,25 +134,22 @@ client1(void *xx) int arg = (random() % 1000); int rep; - struct timespec start,end; - clock_gettime(CLOCK_REALTIME, &start); + auto start = std::chrono::steady_clock::now(); int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep); - clock_gettime(CLOCK_REALTIME, &end); - int diff = diff_timespec(end, start); + auto end = std::chrono::steady_clock::now(); + int diff = std::chrono::duration_cast(end - start).count(); if (ret != 0) printf("%d ms have elapsed!!!\n", diff); VERIFY(ret == 0); VERIFY(rep == (which ? arg+1 : arg+2)); } - - return 0; } -void * -client2(void *xx) +void +client2(int cl) { - int which_cl = ((unsigned long) xx ) % NUM_CL; + int which_cl = ((unsigned long) cl ) % NUM_CL; time_t t1; time(&t1); @@ -170,10 +164,9 @@ client2(void *xx) } VERIFY((int)rep.size() == arg); } - return 0; } -void * +void client3(void *xx) { rpcc *c = (rpcc *) xx; @@ -183,7 +176,6 @@ client3(void *xx) int ret = c->call(24, i, rep, rpcc::to(3000)); VERIFY(ret == rpc_const::timeout_failure || rep == i+2); } - return 0; } @@ -267,18 +259,15 @@ concurrent_test(int nt) // create threads that make lots of calls in parallel, // to test thread synchronization for concurrent calls // and dispatches. - int ret; - printf("start concurrent_test (%d threads) ...", nt); - pthread_t th[nt]; + std::vector th(nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i); - VERIFY(ret == 0); + th[i] = std::thread(client1, i); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf(" OK\n"); } @@ -286,8 +275,6 @@ concurrent_test(int nt) void lossy_test() { - int ret; - printf("start lossy_test ..."); VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); @@ -303,13 +290,12 @@ lossy_test() } int nt = 1; - pthread_t th[nt]; + std::vector th(nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i); - VERIFY(ret == 0); + th[i] = std::thread(client2, i); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf(".. OK\n"); VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); @@ -352,17 +338,15 @@ failure_test() int nt = 10; - int ret; printf(" -- concurrent test on new rpc client w/ %d threads ..", nt); - pthread_t th[nt]; + std::vector th(nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client3, (void *) client); - VERIFY(ret == 0); + th[i] = std::thread(client3, client); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf("ok\n"); @@ -376,12 +360,11 @@ failure_test() printf(" -- concurrent test on new client and server w/ %d threads ..", nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client3, (void *)client); - VERIFY(ret == 0); + th[i] = std::thread(client3, client); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf("ok\n"); @@ -436,12 +419,8 @@ main(int argc, char *argv[]) testmarshall(); - pthread_attr_init(&attr); - // set stack size to 32K, so we don't run out of memory - pthread_attr_setstacksize(&attr, 32*1024); - if (isserver) { - printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ); + printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ); startserver(); } diff --git a/rpc/slock.h b/rpc/slock.h deleted file mode 100644 index 7f419c4..0000000 --- a/rpc/slock.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef __SCOPED_LOCK__ -#define __SCOPED_LOCK__ - -#include -#include "lang/verify.h" -struct ScopedLock { - private: - pthread_mutex_t *m_; - public: - ScopedLock(pthread_mutex_t *m): m_(m) { - VERIFY(pthread_mutex_lock(m_)==0); - } - ~ScopedLock() { - VERIFY(pthread_mutex_unlock(m_)==0); - } -}; -struct ScopedUnlock { - private: - pthread_mutex_t *m_; - public: - ScopedUnlock(pthread_mutex_t *m): m_(m) { - VERIFY(pthread_mutex_unlock(m_)==0); - } - ~ScopedUnlock() { - VERIFY(pthread_mutex_lock(m_)==0); - } -}; -#endif /*__SCOPED_LOCK__*/ diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index f9f32fa..26226cd 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -1,10 +1,9 @@ -#include "slock.h" #include "thr_pool.h" #include #include #include "lang/verify.h" -static void * +static void do_worker(void *arg) { ThrPool *tp = (ThrPool *)arg; @@ -15,7 +14,6 @@ do_worker(void *arg) (void)(j.f)(j.a); } - pthread_exit(NULL); } //if blocking, then addJob() blocks when queue is full @@ -23,13 +21,8 @@ do_worker(void *arg) ThrPool::ThrPool(int sz, bool blocking) : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) { - pthread_attr_init(&attr_); - pthread_attr_setstacksize(&attr_, 128<<10); - for (int i = 0; i < sz; i++) { - pthread_t t; - VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0); - th_.push_back(t); + th_.push_back(std::thread(do_worker, this)); } } @@ -39,19 +32,17 @@ ThrPool::~ThrPool() { for (int i = 0; i < nthreads_; i++) { job_t j; - j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit + j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit jobq_.enq(j); } for (int i = 0; i < nthreads_; i++) { - VERIFY(pthread_join(th_[i], NULL)==0); + th_[i].join(); } - - VERIFY(pthread_attr_destroy(&attr_)==0); } bool -ThrPool::addJob(void *(*f)(void *), void *a) +ThrPool::addJob(void (*f)(void *), void *a) { job_t j; j.f = f; diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h index 5095961..4427aee 100644 --- a/rpc/thr_pool.h +++ b/rpc/thr_pool.h @@ -1,17 +1,15 @@ -#ifndef __THR_POOL__ -#define __THR_POOL__ +#ifndef thr_pool_h +#define thr_pool_h -#include #include +#include #include "fifo.h" class ThrPool { - - public: struct job_t { - void *(*f)(void *); //function point + void (*f)(void *); //function point void *a; //function arguments }; @@ -23,18 +21,17 @@ class ThrPool { bool takeJob(job_t *j); private: - pthread_attr_t attr_; int nthreads_; bool blockadd_; fifo jobq_; - std::vector th_; + std::vector th_; - bool addJob(void *(*f)(void *), void *a); + bool addJob(void (*f)(void *), void *a); }; - template bool +template bool ThrPool::addObjJob(C *o, void (C::*m)(A), A a) { @@ -43,14 +40,13 @@ ThrPool::addObjJob(C *o, void (C::*m)(A), A a) C *o; void (C::*m)(A a); A a; - static void *func(void *vvv) { + static void func(void *vvv) { objfunc_wrapper *x = (objfunc_wrapper*)vvv; C *o = x->o; void (C::*m)(A ) = x->m; A a = x->a; (o->*m)(a); delete x; - return 0; } }; diff --git a/rsm.cc b/rsm.cc index 6841e3d..478e0e0 100644 --- a/rsm.cc +++ b/rsm.cc @@ -89,18 +89,13 @@ #include "tprintf.h" #include "lang/verify.h" #include "rsm_client.h" - -static void *recoverythread(void *x) { - rsm *r = (rsm *) x; - r->recovery(); - return 0; -} +#include "lock.h" rsm::rsm(std::string _first, std::string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - pthread_t th; + std::thread th; last_myvs.vid = 0; last_myvs.seqno = 0; @@ -128,29 +123,32 @@ rsm::rsm(std::string _first, std::string _me) : testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq); { - ScopedLock ml(rsm_mutex); - VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0); + lock ml(rsm_mutex); + th = std::thread(&rsm::recovery, this); } } void rsm::reg1(int proc, handler *h) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); procs[proc] = h; } // The recovery thread runs this function void rsm::recovery() { bool r = true; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); while (1) { while (!cfg->ismember(cfg->myaddr(), vid_commit)) { + // XXX iannucci 2013/09/15 -- I don't understand whether accessing + // cfg->view_id in this manner involves a race. I suspect not. if (join(primary)) { tprintf("recovery: joined\n"); - commit_change_wo(cfg->vid()); + commit_change_wo(cfg->view_id()); } else { - ScopedUnlock su(rsm_mutex); - sleep (30); // XXX make another node in cfg primary? + ml.unlock(); + std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary? + ml.lock(); } } vid_insync = vid_commit; @@ -173,7 +171,7 @@ void rsm::recovery() { inviewchange = false; } tprintf("recovery: go to sleep %d %d\n", insync, inviewchange); - recovery_cond.wait(rsm_mutex); + recovery_cond.wait(ml); } } @@ -190,24 +188,26 @@ std::ostream & operator<<(std::ostream &o, const std::vector &d) { } bool rsm::sync_with_backups() { + adopt_lock ml(rsm_mutex); + ml.unlock(); { - ScopedUnlock su(rsm_mutex); // Make sure that the state of lock_server_cache_rsm is stable during // synchronization; otherwise, the primary's state may be more recent // than replicas after the synchronization. - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); // By acquiring and releasing the invoke_mutex once, we make sure that // the state of lock_server_cache_rsm will not be changed until all // replicas are synchronized. The reason is that client_invoke arrives // after this point of time will see inviewchange == true, and returns // BUSY. } + ml.lock(); // Start accepting synchronization request (statetransferreq) now! insync = true; - backups = std::vector(cfg->get_view(vid_insync)); + cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); LOG("rsm::sync_with_backups " << backups); - sync_cond.wait(rsm_mutex); + sync_cond.wait(ml); insync = false; return true; } @@ -237,12 +237,14 @@ bool rsm::statetransfer(std::string m) m.c_str(), last_myvs.vid, last_myvs.seqno); rpcc *cl; { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); cl = h.safebind(); if (cl) { ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(), last_myvs, vid_insync, r, rpcc::to(1000)); } + ml.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(), @@ -259,16 +261,18 @@ bool rsm::statetransfer(std::string m) } bool rsm::statetransferdone(std::string m) { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); handle h(m); rpcc *cl = h.safebind(); - if (!cl) - return false; - int r; - rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); - if (ret != rsm_protocol::OK) - return false; - return true; + bool done = false; + if (cl) { + int r; + rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); + done = (ret == rsm_protocol::OK); + } + ml.lock(); + return done; } @@ -281,12 +285,14 @@ bool rsm::join(std::string m) { last_myvs.seqno); rpcc *cl; { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); cl = h.safebind(); if (cl != 0) { ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs, r, rpcc::to(120000)); } + ml.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { @@ -304,7 +310,7 @@ bool rsm::join(std::string m) { * completed a view change */ void rsm::commit_change(unsigned vid) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); commit_change_wo(vid); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); @@ -318,8 +324,8 @@ void rsm::commit_change_wo(unsigned vid) { vid_commit = vid; inviewchange = true; set_primary(vid); - recovery_cond.signal(); - sync_cond.signal(); + recovery_cond.notify_one(); + sync_cond.notify_one(); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); } @@ -347,12 +353,12 @@ void rsm::execute(int procno, std::string req, std::string &r) { // rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) { LOG("rsm::client_invoke: procno 0x" << std::hex << procno); - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); std::vector m; std::string myaddr; viewstamp vs; { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); LOG("Checking for inviewchange"); if (inviewchange) return rsm_client_protocol::BUSY; @@ -361,7 +367,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; LOG("Assigning a viewstamp"); - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); // assign the RPC the next viewstamp number vs = myvs; myvs++; @@ -401,11 +407,11 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) { LOG("rsm::invoke: procno 0x" << std::hex << proc); - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); std::vector m; std::string myaddr; { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); // check if !inviewchange LOG("Checking for view change"); if (inviewchange) @@ -415,7 +421,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); if (find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number @@ -436,7 +442,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d */ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid, rsm_protocol::transferres &r) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); int ret = rsm_protocol::OK; tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); @@ -454,12 +460,12 @@ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned * for view vid */ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; backups.erase(find(backups.begin(), backups.end(), m)); if (backups.empty()) - sync_cond.signal(); + sync_cond.notify_one(); return rsm_protocol::OK; } @@ -469,7 +475,7 @@ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) { int ret = rsm_protocol::OK; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(), last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); if (cfg->ismember(m, vid_commit)) { @@ -484,10 +490,11 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j unsigned vid_cache = vid_commit; bool succ; { - ScopedUnlock su(rsm_mutex); + ml.unlock(); succ = cfg->add(m, vid_cache); + ml.lock(); } - if (cfg->ismember(m, cfg->vid())) { + if (cfg->ismember(m, cfg->view_id())) { r.log = cfg->dump(); tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str()); } else { @@ -505,8 +512,8 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j */ rsm_client_protocol::status rsm::client_members(int i, std::vector &r) { std::vector m; - ScopedLock ml(rsm_mutex); - m = cfg->get_view(vid_commit); + lock ml(rsm_mutex); + cfg->get_view(vid_commit, m); m.push_back(primary); r = m; tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(), @@ -518,8 +525,9 @@ rsm_client_protocol::status rsm::client_members(int i, std::vector // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - std::vector c = cfg->get_view(vid); - std::vector p = cfg->get_view(vid - 1); + std::vector c, p; + cfg->get_view(vid, c); + cfg->get_view(vid - 1, p); VERIFY (c.size() > 0); if (isamember(primary,c)) { @@ -539,7 +547,7 @@ void rsm::set_primary(unsigned vid) { } bool rsm::amiprimary() { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); return primary == cfg->myaddr() && !inviewchange; } @@ -551,7 +559,7 @@ bool rsm::amiprimary() { // assumes caller holds rsm_mutex void rsm::net_repair_wo(bool heal) { std::vector m; - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); @@ -563,7 +571,7 @@ void rsm::net_repair_wo(bool heal) { } rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n", heal, dopartition, partitioned); if (heal) { @@ -603,7 +611,7 @@ void rsm::partition1() { rsm_test_protocol::status rsm::breakpointreq(int b, int &r) { r = rsm_test_protocol::OK; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("rsm::breakpointreq: %d\n", b); if (b == 1) break1 = true; else if (b == 2) break2 = true; diff --git a/rsm.h b/rsm.h index c5bf4fc..ec7632d 100644 --- a/rsm.h +++ b/rsm.h @@ -7,7 +7,7 @@ #include #include "rsm_protocol.h" #include "rsm_state_transfer.h" -#include "rpc.h" +#include "rpc/rpc.h" #include #include "config.h" @@ -51,10 +51,10 @@ class rsm : public config_view_change { rsm_test_protocol::status test_net_repairreq(int heal, int &r); rsm_test_protocol::status breakpointreq(int b, int &r); - mutex rsm_mutex; - mutex invoke_mutex; - cond recovery_cond; - cond sync_cond; + std::mutex rsm_mutex; + std::mutex invoke_mutex; + std::condition_variable recovery_cond; + std::condition_variable sync_cond; void execute(int procno, std::string req, std::string &r); rsm_client_protocol::status client_invoke(int procno, std::string req, diff --git a/rsm_client.cc b/rsm_client.cc index 69fec20..9beb0b3 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -5,19 +5,18 @@ #include #include #include "lang/verify.h" - +#include "lock.h" rsm_client::rsm_client(std::string dst) { printf("create rsm_client\n"); std::vector mems; - pthread_mutex_init(&rsm_client_mutex, NULL); sockaddr_in dstsock; make_sockaddr(dst.c_str(), &dstsock); primary = dst; { - ScopedLock ml(&rsm_client_mutex); + lock ml(rsm_client_mutex); VERIFY (init_members()); } printf("rsm_client: done\n"); @@ -31,16 +30,16 @@ void rsm_client::primary_failure() { rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) { int ret; - ScopedLock ml(&rsm_client_mutex); + lock ml(rsm_client_mutex); while (1) { printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str()); handle h(primary); - VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0); + ml.unlock(); rpcc *cl = h.safebind(); if (cl) ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000)); - VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0); + ml.lock(); if (!cl) goto prim_fail; @@ -71,14 +70,18 @@ prim_fail: bool rsm_client::init_members() { printf("rsm_client::init_members get members!\n"); handle h(primary); - VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0); int ret; - rpcc *cl = h.safebind(); - if (cl) { - ret = cl->call(rsm_client_protocol::members, 0, known_mems, - rpcc::to(1000)); + rpcc *cl; + { + adopt_lock ml(rsm_client_mutex); + ml.unlock(); + cl = h.safebind(); + if (cl) { + ret = cl->call(rsm_client_protocol::members, 0, known_mems, + rpcc::to(1000)); + } + ml.lock(); } - VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0); if (cl == 0 || ret != rsm_protocol::OK) return false; if (known_mems.size() < 1) { diff --git a/rsm_client.h b/rsm_client.h index 3219179..039bc26 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -1,7 +1,7 @@ #ifndef rsm_client_h #define rsm_client_h -#include "rpc.h" +#include "rpc/rpc.h" #include "rsm_protocol.h" #include #include @@ -19,7 +19,7 @@ class rsm_client { protected: std::string primary; std::vector known_mems; - pthread_mutex_t rsm_client_mutex; + std::mutex rsm_client_mutex; void primary_failure(); bool init_members(); public: diff --git a/rsm_protocol.h b/rsm_protocol.h index a27ef83..d479d5d 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -1,7 +1,7 @@ #ifndef rsm_protocol_h #define rsm_protocol_h -#include "rpc.h" +#include "rpc/rpc.h" class rsm_client_protocol { diff --git a/rsm_tester.cc b/rsm_tester.cc index 08034e2..f172602 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -4,37 +4,30 @@ #include "rsm_protocol.h" #include "rsmtest_client.h" -#include "rpc.h" +#include "rpc/rpc.h" #include #include #include #include #include -using namespace std; - -rsmtest_client *lc; int main(int argc, char *argv[]) { - int r; - - if(argc != 4){ - fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]); - exit(1); - } + if(argc != 4){ + fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]); + exit(1); + } - lc = new rsmtest_client(argv[1]); - string command(argv[2]); - if (command == "partition") { - r = lc->net_repair(atoi(argv[3])); - printf ("net_repair returned %d\n", r); - } else if (command == "breakpoint") { - int b = atoi(argv[3]); - r = lc->breakpoint(b); - printf ("breakpoint %d returned %d\n", b, r); - } else { - fprintf(stderr, "Unknown command %s\n", argv[2]); - } - exit(0); + rsmtest_client *lc = new rsmtest_client(argv[1]); + std::string command(argv[2]); + if (command == "partition") { + printf("net_repair returned %d\n", lc->net_repair(atoi(argv[3]))); + } else if (command == "breakpoint") { + int b = atoi(argv[3]); + printf("breakpoint %d returned %d\n", b, lc->breakpoint(b)); + } else { + fprintf(stderr, "Unknown command %s\n", argv[2]); + } + return 0; } diff --git a/rsmtest_client.cc b/rsmtest_client.cc index f008228..e27e8e5 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -1,7 +1,7 @@ // RPC stubs for clients to talk to rsmtest_server #include "rsmtest_client.h" -#include "rpc.h" +#include "rpc/rpc.h" #include #include @@ -10,30 +10,30 @@ rsmtest_client::rsmtest_client(std::string dst) { - sockaddr_in dstsock; - make_sockaddr(dst.c_str(), &dstsock); - cl = new rpcc(dstsock); - if (cl->bind() < 0) { - printf("rsmtest_client: call bind\n"); - } + sockaddr_in dstsock; + make_sockaddr(dst.c_str(), &dstsock); + cl = new rpcc(dstsock); + if (cl->bind() < 0) { + printf("rsmtest_client: call bind\n"); + } } int rsmtest_client::net_repair(int heal) { - int r; - int ret = cl->call(rsm_test_protocol::net_repair, heal, r); - VERIFY (ret == rsm_test_protocol::OK); - return r; + int r; + int ret = cl->call(rsm_test_protocol::net_repair, heal, r); + VERIFY (ret == rsm_test_protocol::OK); + return r; } int rsmtest_client::breakpoint(int b) { - int r; - int ret = cl->call(rsm_test_protocol::breakpoint, b, r); - VERIFY (ret == rsm_test_protocol::OK); - return r; + int r; + int ret = cl->call(rsm_test_protocol::breakpoint, b, r); + VERIFY (ret == rsm_test_protocol::OK); + return r; } diff --git a/rsmtest_client.h b/rsmtest_client.h index a175d9e..73a6cf1 100644 --- a/rsmtest_client.h +++ b/rsmtest_client.h @@ -5,16 +5,16 @@ #include #include "rsm_protocol.h" -#include "rpc.h" +#include "rpc/rpc.h" // Client interface to the rsmtest server class rsmtest_client { - protected: - rpcc *cl; - public: - rsmtest_client(std::string d); - virtual ~rsmtest_client() {}; - virtual rsm_test_protocol::status net_repair(int heal); - virtual rsm_test_protocol::status breakpoint(int b); + protected: + rpcc *cl; + public: + rsmtest_client(std::string d); + virtual ~rsmtest_client() {}; + virtual rsm_test_protocol::status net_repair(int heal); + virtual rsm_test_protocol::status breakpoint(int b); }; #endif diff --git a/srlock.cc b/srlock.cc deleted file mode 100644 index 8966527..0000000 --- a/srlock.cc +++ /dev/null @@ -1,16 +0,0 @@ -#include "srlock.h" - -ScopedRemoteLock::ScopedRemoteLock(lock_client *lc, lock_protocol::lockid_t lid) : - lc_(lc), lid_(lid) { - lc_->acquire(lid_); - releaseOnFree = true; -} - -void ScopedRemoteLock::retain() { - releaseOnFree = false; -} - -ScopedRemoteLock::~ScopedRemoteLock() { - if (releaseOnFree) - lc_->release(lid_); -} diff --git a/srlock.h b/srlock.h deleted file mode 100644 index 40bd8bd..0000000 --- a/srlock.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef srlock_h -#define srlock_h - -#include "lock_protocol.h" -#include "lock_client.h" - -class ScopedRemoteLock { - protected: - lock_client *lc_; - lock_protocol::lockid_t lid_; - bool releaseOnFree; - public: - ScopedRemoteLock(lock_client *, lock_protocol::lockid_t); - void retain(); - ~ScopedRemoteLock(); -}; - -#endif diff --git a/tprintf.cc b/tprintf.cc index 8f6c8a9..93a6070 100644 --- a/tprintf.cc +++ b/tprintf.cc @@ -1,16 +1,9 @@ -#include "mutex.h" #include #include #include "tprintf.h" -uint64_t utime() { - struct timeval tp; - gettimeofday(&tp, NULL); - return (tp.tv_usec + (uint64_t)tp.tv_sec * 1000000) % 1000000000; -} - -mutex cerr_mutex; -std::map thread_name_map; +std::mutex cerr_mutex; +std::map thread_name_map; int next_thread_num = 0; std::map instance_name_map; int next_instance_num = 0; diff --git a/tprintf.h b/tprintf.h index 6a0cb2a..c61626a 100644 --- a/tprintf.h +++ b/tprintf.h @@ -1,27 +1,27 @@ -#ifndef TPRINTF_H -#define TPRINTF_H +#ifndef tprintf_h +#define tprintf_h #include #include -#include "mutex.h" -#include #include #include +#include "lock.h" extern mutex cerr_mutex; -extern std::map thread_name_map; +extern std::map thread_name_map; extern int next_thread_num; extern std::map instance_name_map; extern int next_instance_num; extern char tprintf_thread_prefix; #define LOG_PREFIX { \ - cerr_mutex.acquire(); \ - pthread_t self = pthread_self(); \ + cerr_mutex.lock(); \ + auto self = std::this_thread::get_id(); \ int tid = thread_name_map[self]; \ if (tid==0) \ tid = thread_name_map[self] = ++next_thread_num; \ - std::cerr << std::left << std::setw(9) << utime() << " "; \ + auto utime = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \ + std::cerr << std::left << std::setw(9) << utime << " "; \ std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \ std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \ } @@ -32,7 +32,7 @@ extern char tprintf_thread_prefix; std::cerr << "#" << std::setw(2) << self; \ } #define LOG_SUFFIX { \ - cerr_mutex.release(); \ + cerr_mutex.unlock(); \ } #define LOG_NONMEMBER(x) { \ @@ -48,7 +48,7 @@ extern char tprintf_thread_prefix; } #define JOIN(from,to,sep) ({ \ ostringstream oss; \ - for(typeof(from) i=from;i!=to;i++) \ + for(auto i=from;i!=to;i++) \ oss << *i << sep; \ oss.str(); \ }) @@ -85,6 +85,4 @@ extern char tprintf_thread_prefix; LOG_NONMEMBER(buf); \ } -uint64_t utime(); - #endif