-CXXFLAGS = -g -MMD -Wall -I. -I./rpc
-LDFLAGS = -L. -L/usr/local/lib
-LDLIBS = -lpthread
-LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt)
-LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl)
+CXXFLAGS = -g -MMD -Werror -I. -std=c++11
+LDFLAGS =
CXX = g++
CC = g++
all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o gettime.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o rpc/jsl_log.o
rm -f $@
ar cq $@ $^
ranlib rpc/librpc.a
lock_demo=lock_demo.o lock_client.o
lock_demo : $(lock_demo) rpc/librpc.a
-lock_tester=lock_tester.o lock_client.o mutex.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
+lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
lock_tester : $(lock_tester) rpc/librpc.a
-lock_server=lock_server.o lock_smain.o mutex.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server=lock_server.o lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
lock_server : $(lock_server) rpc/librpc.a
rsm_tester=rsm_tester.o rsmtest_client.o
+#include <thread>
+#include <chrono>   // std::chrono::steady_clock / seconds used by heartbeater()
#include <sstream>
#include <iostream>
#include <stdio.h>
// all views, the other nodes can bring this re-joined node up to
// date.
-static void *
-heartbeatthread(void *x)
+config::config(
+ const std::string &_first,
+ const std::string &_me,
+ config_view_change *_vc)
+ : my_view_id(0), first(_first), me(_me), vc(_vc)
{
- config *r = (config *) x;
- r->heartbeater();
- return 0;
-}
-
-config::config(std::string _first, std::string _me, config_view_change *_vc)
- : myvid (0), first (_first), me (_me), vc (_vc)
-{
- VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
- VERIFY(pthread_cond_init(&config_cond, NULL) == 0);
-
- std::ostringstream ost;
- ost << me;
-
- acc = new acceptor(this, me == _first, me, ost.str());
- pro = new proposer(this, acc, me);
+ paxos_acceptor = new acceptor(this, me == _first, me, me);
+ paxos_proposer = new proposer(this, paxos_acceptor, me);
- // XXX hack; maybe should have its own port number
- pxsrpc = acc->get_rpcs();
- pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+ // XXX hack; maybe should have its own port number
+ paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
- {
- ScopedLock ml(&cfg_mutex);
-
- reconstruct();
-
- pthread_t th;
- VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
- }
+ {
+ lock ml(cfg_mutex);
+ reconstruct();
+ std::thread(&config::heartbeater, this).detach();
+ }
}
void
-config::restore(std::string s)
+config::restore(const std::string &s)
{
- ScopedLock ml(&cfg_mutex);
- acc->restore(s);
- reconstruct();
+ lock ml(cfg_mutex);
+ paxos_acceptor->restore(s);
+ reconstruct();
}
-std::vector<std::string>
-config::get_view(unsigned instance)
+void
+config::get_view(unsigned instance, std::vector<std::string> &m)
{
- ScopedLock ml(&cfg_mutex);
- return get_view_wo(instance);
+ lock ml(cfg_mutex);
+ get_view_wo(instance, m);
}
// caller should hold cfg_mutex
-std::vector<std::string>
-config::get_view_wo(unsigned instance)
+void
+config::get_view_wo(unsigned instance, std::vector<std::string> &m)
{
- std::string value = acc->value(instance);
- tprintf("get_view(%d): returns %s\n", instance, value.c_str());
- return members(value);
+ std::string value = paxos_acceptor->value(instance);
+ tprintf("get_view(%d): returns %s\n", instance, value.c_str());
+ members(value, m);
}
-std::vector<std::string>
-config::members(std::string value)
+void
+config::members(const std::string &value, std::vector<std::string> &view) const
{
- std::istringstream ist(value);
- std::string m;
- std::vector<std::string> view;
- while (ist >> m) {
- view.push_back(m);
- }
- return view;
+ std::istringstream ist(value);
+ std::string m;
+ view.clear();
+ while (ist >> m) {
+ view.push_back(m);
+ }
}
std::string
-config::value(std::vector<std::string> m)
+config::value(const std::vector<std::string> &m) const
{
- std::ostringstream ost;
- for (unsigned i = 0; i < m.size(); i++) {
- ost << m[i];
- ost << " ";
- }
- return ost.str();
+ std::ostringstream ost;
+ for (unsigned i = 0; i < m.size(); i++) {
+ ost << m[i];
+ ost << " ";
+ }
+ return ost.str();
}
// caller should hold cfg_mutex
void
config::reconstruct()
{
- if (acc->instance() > 0) {
- std::string m;
- myvid = acc->instance();
- mems = get_view_wo(myvid);
- tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
- }
+ if (paxos_acceptor->instance() > 0) {
+ std::string m;
+ my_view_id = paxos_acceptor->instance();
+ get_view_wo(my_view_id, mems);
+ tprintf("config::reconstruct: %d %s\n",
+ my_view_id, print_members(mems).c_str());
+ }
}
// Called by Paxos's acceptor.
void
-config::paxos_commit(unsigned instance, std::string value)
+config::paxos_commit(unsigned instance, const std::string &value)
{
- std::string m;
- std::vector<std::string> newmem;
- ScopedLock ml(&cfg_mutex);
-
- newmem = members(value);
- tprintf("config::paxos_commit: %d: %s\n", instance,
- print_members(newmem).c_str());
-
- for (unsigned i = 0; i < mems.size(); i++) {
- tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
- if (!isamember(mems[i], newmem) && me != mems[i]) {
- tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
- mgr.delete_handle(mems[i]);
+ std::string m;
+ std::vector<std::string> newmem;
+ lock ml(cfg_mutex);
+
+ members(value, newmem);
+ tprintf("config::paxos_commit: %d: %s\n", instance,
+ print_members(newmem).c_str());
+
+ for (unsigned i = 0; i < mems.size(); i++) {
+ tprintf("config::paxos_commit: is %s still a member?\n",
+ mems[i].c_str());
+ if (!isamember(mems[i], newmem) && me != mems[i]) {
+ tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
+ mgr.delete_handle(mems[i]);
+ }
}
- }
- mems = newmem;
- myvid = instance;
- if (vc) {
- unsigned vid = myvid;
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- vc->commit_change(vid);
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- }
+ mems = newmem;
+ my_view_id = instance;
+ if (vc) {
+ ml.unlock();
+ vc->commit_change(instance);
+ ml.lock();
+ }
}
bool
-config::ismember(std::string m, unsigned vid)
+config::ismember(const std::string &m, unsigned vid)
{
- bool r;
- ScopedLock ml(&cfg_mutex);
- std::vector<std::string> v = get_view_wo(vid);
- r = isamember(m, v);
- return r;
+ lock ml(cfg_mutex);
+ std::vector<std::string> v;
+ get_view_wo(vid, v);
+ return isamember(m, v);
}
bool
-config::add(std::string new_m, unsigned vid)
+config::add(const std::string &new_m, unsigned vid)
{
- std::vector<std::string> m;
- std::vector<std::string> curm;
- ScopedLock ml(&cfg_mutex);
- if (vid != myvid)
- return false;
- tprintf("config::add %s\n", new_m.c_str());
- m = mems;
- m.push_back(new_m);
- curm = mems;
- std::string v = value(m);
- int nextvid = myvid + 1;
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- bool r = pro->run(nextvid, curm, v);
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- if (r) {
- tprintf("config::add: proposer returned success\n");
- } else {
- tprintf("config::add: proposer returned failure\n");
- }
- return r;
+ std::vector<std::string> m;
+ std::vector<std::string> curm;
+ lock ml(cfg_mutex);
+ if (vid != my_view_id)
+ return false;
+ tprintf("config::add %s\n", new_m.c_str());
+ m = mems;
+ m.push_back(new_m);
+ curm = mems;
+ std::string v = value(m);
+ int nextvid = my_view_id + 1;
+ bool r;
+ {
+ ml.unlock();
+ r = paxos_proposer->run(nextvid, curm, v);
+ ml.lock();
+ }
+ tprintf("config::add: proposer returned %s\n",
+ r ? "success" : "failure");
+ return r;
}
// caller should hold cfg_mutex
bool
-config::remove_wo(std::string m)
+config::remove(const std::string &m)
{
- tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
- std::vector<std::string> n;
- for (unsigned i = 0; i < mems.size(); i++) {
- if (mems[i] != m) n.push_back(mems[i]);
- }
- std::string v = value(n);
- std::vector<std::string> cmems = mems;
- int nextvid = myvid + 1;
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- bool r = pro->run(nextvid, cmems, v);
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- if (r) {
- tprintf("config::remove: proposer returned success\n");
- } else {
- tprintf("config::remove: proposer returned failure\n");
- }
- return r;
+ adopt_lock ml(cfg_mutex);
+ tprintf("config::remove: my_view_id %d remove? %s\n",
+ my_view_id, m.c_str());
+ std::vector<std::string> n;
+ for (unsigned i = 0; i < mems.size(); i++) {
+ if (mems[i] != m)
+ n.push_back(mems[i]);
+ }
+ std::string v = value(n);
+ std::vector<std::string> cmems = mems;
+ int nextvid = my_view_id + 1;
+ bool r;
+ {
+ ml.unlock();
+ r = paxos_proposer->run(nextvid, cmems, v);
+ ml.lock();
+ }
+ tprintf("config::remove: proposer returned %s\n",
+ r ? "success" : "failure");
+ return r;
}
void
config::heartbeater()
{
- struct timeval now;
- struct timespec next_timeout;
- std::string m;
- heartbeat_t h;
- bool stable;
- unsigned vid;
- std::vector<std::string> cmems;
- ScopedLock ml(&cfg_mutex);
-
- while (1) {
-
- gettimeofday(&now, NULL);
- next_timeout.tv_sec = now.tv_sec + 3;
- next_timeout.tv_nsec = 0;
- tprintf("heartbeater: go to sleep\n");
- pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
-
- stable = true;
- vid = myvid;
- cmems = get_view_wo(vid);
- tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
-
- if (!isamember(me, cmems)) {
- tprintf("heartbeater: not member yet; skip hearbeat\n");
- continue;
- }
-
- //find the node with the smallest id
- m = me;
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (m > cmems[i])
- m = cmems[i];
- }
-
- if (m == me) {
- //if i am the one with smallest id, ping the rest of the nodes
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (cmems[i] != me) {
- if ((h = doheartbeat(cmems[i])) != OK) {
- stable = false;
- m = cmems[i];
- break;
- }
- }
- }
- } else {
- //the rest of the nodes ping the one with smallest id
- if ((h = doheartbeat(m)) != OK)
- stable = false;
- }
-
- if (!stable && vid == myvid) {
- remove_wo(m);
+ std::string m;
+ heartbeat_t h;
+ bool stable;
+ unsigned vid;
+ std::vector<std::string> cmems;
+ lock ml(cfg_mutex);
+
+ while (1) {
+ auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
+ tprintf("heartbeater: go to sleep\n");
+ config_cond.wait_until(ml, next_timeout);
+
+ stable = true;
+ vid = my_view_id;
+ get_view_wo(vid, cmems);
+ tprintf("heartbeater: current membership %s\n",
+ print_members(cmems).c_str());
+
+        if (!isamember(me, cmems)) {
+            // not in the current view: nothing to ping, wait for next round
+            tprintf("heartbeater: not member yet; skip heartbeat\n");
+            continue;
+        }
+
+ // who has the smallest ID?
+ m = me;
+ for (unsigned i = 0; i < cmems.size(); i++) {
+ if (m > cmems[i])
+ m = cmems[i];
+ }
+
+ if (m == me) {
+ // ping the other nodes
+ for (unsigned i = 0; i < cmems.size(); i++) {
+ if (cmems[i] != me) {
+ if ((h = doheartbeat(cmems[i])) != OK) {
+ stable = false;
+ m = cmems[i];
+ break;
+ }
+ }
+ }
+ } else {
+ // ping the node with the smallest ID
+ if ((h = doheartbeat(m)) != OK)
+ stable = false;
+ }
+
+ if (!stable && vid == my_view_id) {
+ remove(m);
+ }
}
- }
}
paxos_protocol::status
config::heartbeat(std::string m, unsigned vid, int &r)
{
- ScopedLock ml(&cfg_mutex);
- int ret = paxos_protocol::ERR;
- r = (int) myvid;
- tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
- if (vid == myvid) {
- ret = paxos_protocol::OK;
- } else if (pro->isrunning()) {
- VERIFY (vid == myvid + 1 || vid + 1 == myvid);
- ret = paxos_protocol::OK;
- } else {
- ret = paxos_protocol::ERR;
- }
- return ret;
+ lock ml(cfg_mutex);
+ int ret = paxos_protocol::ERR;
+ r = (int) my_view_id;
+ tprintf("heartbeat from %s(%d) my_view_id %d\n",
+ m.c_str(), vid, my_view_id);
+ if (vid == my_view_id) {
+ ret = paxos_protocol::OK;
+ } else if (paxos_proposer->isrunning()) {
+ VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
+ ret = paxos_protocol::OK;
+ } else {
+ ret = paxos_protocol::ERR;
+ }
+ return ret;
}
config::heartbeat_t
-config::doheartbeat(std::string m)
+config::doheartbeat(const std::string &m)
{
- int ret = rpc_const::timeout_failure;
- int r;
- unsigned vid = myvid;
- heartbeat_t res = OK;
-
- tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
- handle h(m);
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- rpcc *cl = h.safebind();
- if (cl) {
- ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
- rpcc::to(1000));
- }
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- if (ret != paxos_protocol::OK) {
- if (ret == rpc_const::atmostonce_failure ||
- ret == rpc_const::oldsrv_failure) {
- mgr.delete_handle(m);
- } else {
- tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
- m.c_str(), ret, vid, r);
- if (ret < 0) res = FAILURE;
- else res = VIEWERR;
+    adopt_lock ml(cfg_mutex);
+    int ret = rpc_const::timeout_failure;
+    // r is printed in the failure path below even when safebind() fails and
+    // the RPC never runs; initialize it so we never read an indeterminate value.
+    int r = 0;
+    unsigned vid = my_view_id;
+    heartbeat_t res = OK;
+
+ tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+ handle h(m);
+ {
+ ml.unlock();
+ rpcc *cl = h.safebind();
+ if (cl) {
+ ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
+ rpcc::to(1000));
+ }
+ ml.lock();
+ }
+ if (ret != paxos_protocol::OK) {
+ if (ret == rpc_const::atmostonce_failure ||
+ ret == rpc_const::oldsrv_failure) {
+ mgr.delete_handle(m);
+ } else {
+ tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
+ m.c_str(), ret, vid, r);
+ if (ret < 0) res = FAILURE;
+ else res = VIEWERR;
+ }
}
- }
- tprintf("doheartbeat done %d\n", res);
- return res;
+ tprintf("doheartbeat done %d\n", res);
+ return res;
}
#include <string>
#include <vector>
#include "paxos.h"
+#include "lock.h"
class config_view_change {
- public:
- virtual void commit_change(unsigned vid) = 0;
- virtual ~config_view_change() {};
+ public:
+ virtual void commit_change(unsigned view_id) = 0;
+ virtual ~config_view_change() {};
};
class config : public paxos_change {
- private:
- acceptor *acc;
- proposer *pro;
- rpcs *pxsrpc;
- unsigned myvid;
- std::string first;
- std::string me;
- config_view_change *vc;
- std::vector<std::string> mems;
- pthread_mutex_t cfg_mutex;
- pthread_cond_t heartbeat_cond;
- pthread_cond_t config_cond;
- paxos_protocol::status heartbeat(std::string m, unsigned instance, int &r);
- std::string value(std::vector<std::string> mems);
- std::vector<std::string> members(std::string v);
- std::vector<std::string> get_view_wo(unsigned instance);
- bool remove_wo(std::string);
- void reconstruct();
- typedef enum {
- OK, // response and same view #
- VIEWERR, // response but different view #
- FAILURE, // no response
- } heartbeat_t;
- heartbeat_t doheartbeat(std::string m);
- public:
- config(std::string _first, std::string _me, config_view_change *_vc);
- unsigned vid() { return myvid; }
- std::string myaddr() { return me; };
- std::string dump() { return acc->dump(); };
- std::vector<std::string> get_view(unsigned instance);
- void restore(std::string s);
- bool add(std::string, unsigned vid);
- bool ismember(std::string m, unsigned vid);
- void heartbeater(void);
- void paxos_commit(unsigned instance, std::string v);
- rpcs *get_rpcs() { return acc->get_rpcs(); }
- void breakpoint(int b) { pro->breakpoint(b); }
+ private:
+ acceptor *paxos_acceptor;
+ proposer *paxos_proposer;
+ unsigned my_view_id;
+ std::string first;
+ std::string me;
+ config_view_change *vc;
+ std::vector<std::string> mems;
+ mutex cfg_mutex;
+ std::condition_variable config_cond;
+ paxos_protocol::status heartbeat(
+ std::string m,
+ unsigned instance,
+ int &r);
+ std::string value(const std::vector<std::string> &mems) const;
+ void members(const std::string &v, std::vector<std::string> &m) const;
+ void get_view_wo(unsigned instance, std::vector<std::string> &m);
+ bool remove(const std::string &);
+ void reconstruct();
+ typedef enum {
+ OK, // response and same view #
+ VIEWERR, // response but different view #
+ FAILURE, // no response
+ } heartbeat_t;
+ heartbeat_t doheartbeat(const std::string &m);
+ public:
+ config(const std::string &_first,
+ const std::string &_me,
+ config_view_change *_vc);
+ unsigned view_id() { return my_view_id; }
+ const std::string &myaddr() const { return me; };
+ std::string dump() { return paxos_acceptor->dump(); };
+ void get_view(unsigned instance, std::vector<std::string> &m);
+ void restore(const std::string &s);
+ bool add(const std::string &, unsigned view_id);
+ bool ismember(const std::string &m, unsigned view_id);
+ void heartbeater(void);
+ void paxos_commit(unsigned instance, const std::string &v);
+ rpcs *get_rpcs() { return paxos_acceptor->get_rpcs(); }
+ void breakpoint(int b) { paxos_proposer->breakpoint(b); }
};
#endif
+++ /dev/null
-/*
- * Copyright (c), MM Weiss
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification,
- * are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * 3. Neither the name of the MM Weiss nor the names of its contributors
- * may be used to endorse or promote products derived from this software without
- * specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
- * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
- * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
- * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
- * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-/*
- * clock_gettime_stub.c
- * gcc -Wall -c clock_gettime_stub.c
- * posix realtime functions; MacOS user space glue
- */
-
-/* @comment
- * other possible implementation using intel builtin rdtsc
- * rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html
- *
- * we could get the ticks by doing this
- *
- * __asm __volatile("mov %%ebx, %%esi\n\t"
- * "cpuid\n\t"
- * "xchg %%esi, %%ebx\n\t"
- * "rdtsc"
- * : "=a" (a),
- * "=d" (d)
- * );
-
- * we could even replace our tricky sched_yield call by assembly code to get a better accurency,
- * anyway the following C stub will satisfy 99% of apps using posix clock_gettime call,
- * moreover, the setter version (clock_settime) could be easly written using mach primitives:
- * http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time)
- *
- * hackers don't be crackers, don't you use a flush toilet?
- *
- *
- * @see draft: ./posix-realtime-stub/posix-realtime-stub.c
- *
- */
-
-
-#ifdef __APPLE__
-
-#pragma weak clock_gettime
-
-#include <sys/time.h>
-#include <sys/resource.h>
-#include <mach/mach.h>
-#include <mach/clock.h>
-#include <mach/mach_time.h>
-#include <errno.h>
-#include <unistd.h>
-#include <sched.h>
-
-typedef enum {
- CLOCK_REALTIME,
- CLOCK_MONOTONIC,
- CLOCK_PROCESS_CPUTIME_ID,
- CLOCK_THREAD_CPUTIME_ID
-} clockid_t;
-
-static mach_timebase_info_data_t __clock_gettime_inf;
-
-int clock_gettime(clockid_t clk_id, struct timespec *tp) {
- kern_return_t ret;
- clock_serv_t clk;
- clock_id_t clk_serv_id;
- mach_timespec_t tm;
-
- uint64_t start, end, delta, nano;
-
- int retval = -1;
- switch (clk_id) {
- case CLOCK_REALTIME:
- case CLOCK_MONOTONIC:
- clk_serv_id = clk_id == CLOCK_REALTIME ? CALENDAR_CLOCK : SYSTEM_CLOCK;
- if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) {
- if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) {
- tp->tv_sec = tm.tv_sec;
- tp->tv_nsec = tm.tv_nsec;
- retval = 0;
- }
- }
- if (KERN_SUCCESS != ret) {
- errno = EINVAL;
- retval = -1;
- }
- break;
- case CLOCK_PROCESS_CPUTIME_ID:
- case CLOCK_THREAD_CPUTIME_ID:
- start = mach_absolute_time();
- if (clk_id == CLOCK_PROCESS_CPUTIME_ID) {
- getpid();
- } else {
- sched_yield();
- }
- end = mach_absolute_time();
- delta = end - start;
- if (0 == __clock_gettime_inf.denom) {
- mach_timebase_info(&__clock_gettime_inf);
- }
- nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom;
- tp->tv_sec = nano * 1e-9;
- tp->tv_nsec = nano - (tp->tv_sec * 1e9);
- retval = 0;
- break;
- default:
- errno = EINVAL;
- retval = -1;
- }
- return retval;
-}
-
-#endif // __APPLE__
+++ /dev/null
-#ifndef gettime_h
-#define gettime_h
-
-#ifdef __APPLE__
-typedef enum {
- CLOCK_REALTIME,
- CLOCK_MONOTONIC,
- CLOCK_PROCESS_CPUTIME_ID,
- CLOCK_THREAD_CPUTIME_ID
-} clockid_t;
-
-int clock_gettime(clockid_t clk_id, struct timespec *tp);
-#endif
-
-#endif
#include "handle.h"
#include <stdio.h>
#include "tprintf.h"
+#include "lock.h"
handle_mgr mgr;
handle::handle(std::string m)
{
- h = mgr.get_handle(m);
+ h = mgr.get_handle(m);
}
rpcc *
handle::safebind()
{
- if (!h)
- return NULL;
- ScopedLock ml(&h->cl_mutex);
- if (h->del)
- return NULL;
- if (h->cl)
+ if (!h)
+ return NULL;
+ lock ml(h->cl_mutex);
+ if (h->del)
+ return NULL;
+ if (h->cl)
+ return h->cl;
+ sockaddr_in dstsock;
+ make_sockaddr(h->m.c_str(), &dstsock);
+ rpcc *cl = new rpcc(dstsock);
+ tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
+ int ret;
+ // Starting with lab 6, our test script assumes that the failure
+ // can be detected by paxos and rsm layer within few seconds. We have
+ // to set the timeout with a small value to support the assumption.
+ //
+    // Note: with RPC_LOSSY=5, your lab would fail to pass the tests of
+    // lab 6 and lab 7 because the rpc layer may delay your RPC request,
+    // and cause a timeout failure. Please make sure RPC_LOSSY is set to 0.
+ ret = cl->bind(rpcc::to(1000));
+ if (ret < 0) {
+ tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
+ delete cl;
+ h->del = true;
+ } else {
+ tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
+ h->cl = cl;
+ }
return h->cl;
- sockaddr_in dstsock;
- make_sockaddr(h->m.c_str(), &dstsock);
- rpcc *cl = new rpcc(dstsock);
- tprintf("handler_mgr::get_handle trying to bind...%s\n", h->m.c_str());
- int ret;
- // Starting with lab 6, our test script assumes that the failure
- // can be detected by paxos and rsm layer within few seconds. We have
- // to set the timeout with a small value to support the assumption.
- //
- // Note: with RPC_LOSSY=5, your lab would failed to pass the tests of
- // lab 6 and lab 7 because the rpc layer may delay your RPC request,
- // and cause a time out failure. Please make sure RPC_LOSSY is set to 0.
- ret = cl->bind(rpcc::to(1000));
- if (ret < 0) {
- tprintf("handle_mgr::get_handle bind failure! %s %d\n", h->m.c_str(), ret);
- delete cl;
- h->del = true;
- } else {
- tprintf("handle_mgr::get_handle bind succeeded %s\n", h->m.c_str());
- h->cl = cl;
- }
- return h->cl;
}
handle::~handle()
{
- if (h) mgr.done_handle(h);
+ if (h) mgr.done_handle(h);
}
handle_mgr::handle_mgr()
{
- VERIFY (pthread_mutex_init(&handle_mutex, NULL) == 0);
}
struct hinfo *
handle_mgr::get_handle(std::string m)
{
- ScopedLock ml(&handle_mutex);
- struct hinfo *h = 0;
- if (hmap.find(m) == hmap.end()) {
- h = new hinfo;
- h->cl = NULL;
- h->del = false;
- h->refcnt = 1;
- h->m = m;
- pthread_mutex_init(&h->cl_mutex, NULL);
- hmap[m] = h;
- } else if (!hmap[m]->del) {
- h = hmap[m];
- h->refcnt ++;
- }
- return h;
+ lock ml(handle_mutex);
+ struct hinfo *h = 0;
+ if (hmap.find(m) == hmap.end()) {
+ h = new hinfo;
+ h->cl = NULL;
+ h->del = false;
+ h->refcnt = 1;
+ h->m = m;
+ hmap[m] = h;
+ } else if (!hmap[m]->del) {
+ h = hmap[m];
+ h->refcnt ++;
+ }
+ return h;
}
void
handle_mgr::done_handle(struct hinfo *h)
{
- ScopedLock ml(&handle_mutex);
- h->refcnt--;
- if (h->refcnt == 0 && h->del)
- delete_handle_wo(h->m);
+ lock ml(handle_mutex);
+ h->refcnt--;
+ if (h->refcnt == 0 && h->del)
+ delete_handle_wo(h->m);
}
void
handle_mgr::delete_handle(std::string m)
{
- ScopedLock ml(&handle_mutex);
- delete_handle_wo(m);
+ lock ml(handle_mutex);
+ delete_handle_wo(m);
}
// Must be called with handle_mutex locked.
void
handle_mgr::delete_handle_wo(std::string m)
{
- if (hmap.find(m) == hmap.end()) {
- tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
- } else {
- tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
- hmap[m]->refcnt);
- struct hinfo *h = hmap[m];
- if (h->refcnt == 0) {
- if (h->cl) {
- h->cl->cancel();
- delete h->cl;
- }
- pthread_mutex_destroy(&h->cl_mutex);
- hmap.erase(m);
- delete h;
+ if (hmap.find(m) == hmap.end()) {
+ tprintf("handle_mgr::delete_handle_wo: cl %s isn't in cl list\n", m.c_str());
} else {
- h->del = true;
+ tprintf("handle_mgr::delete_handle_wo: cl %s refcnt %d\n", m.c_str(),
+ hmap[m]->refcnt);
+ struct hinfo *h = hmap[m];
+ if (h->refcnt == 0) {
+ if (h->cl) {
+ h->cl->cancel();
+ delete h->cl;
+ }
+ hmap.erase(m);
+ delete h;
+ } else {
+ h->del = true;
+ }
}
- }
}
#include <string>
#include <vector>
-#include "rpc.h"
+#include "rpc/rpc.h"
+#include <mutex>    // hinfo::cl_mutex and handle_mgr::handle_mutex are std::mutex
struct hinfo {
rpcc *cl;
int refcnt;
bool del;
std::string m;
- pthread_mutex_t cl_mutex;
+ std::mutex cl_mutex;
};
class handle {
- private:
- struct hinfo *h;
- public:
- handle(std::string m);
- ~handle();
- /* safebind will try to bind with the rpc server on the first call.
- * Since bind may block, the caller probably should not hold a mutex
- * when calling safebind.
- *
- * return:
- * if the first safebind succeeded, all later calls would return
- * a rpcc object; otherwise, all later calls would return NULL.
- *
- * Example:
- * handle h(dst);
- * XXX_protocol::status ret;
- * if (h.safebind()) {
- * ret = h.safebind()->call(...);
- * }
- * if (!h.safebind() || ret != XXX_protocol::OK) {
- * // handle failure
- * }
- */
- rpcc *safebind();
+ private:
+ struct hinfo *h;
+ public:
+ handle(std::string m);
+ ~handle();
+ /* safebind will try to bind with the rpc server on the first call.
+ * Since bind may block, the caller probably should not hold a mutex
+ * when calling safebind.
+ *
+ * return:
+ * if the first safebind succeeded, all later calls would return
+ * a rpcc object; otherwise, all later calls would return NULL.
+ *
+ * Example:
+ * handle h(dst);
+ * XXX_protocol::status ret;
+ * if (h.safebind()) {
+ * ret = h.safebind()->call(...);
+ * }
+ * if (!h.safebind() || ret != XXX_protocol::OK) {
+ * // handle failure
+ * }
+ */
+ rpcc *safebind();
};
class handle_mgr {
- private:
- pthread_mutex_t handle_mutex;
- std::map<std::string, struct hinfo *> hmap;
- public:
- handle_mgr();
- struct hinfo *get_handle(std::string m);
- void done_handle(struct hinfo *h);
- void delete_handle(std::string m);
- void delete_handle_wo(std::string m);
+ private:
+ std::mutex handle_mutex;
+ std::map<std::string, struct hinfo *> hmap;
+ public:
+ handle_mgr();
+ struct hinfo *get_handle(std::string m);
+ void done_handle(struct hinfo *h);
+ void delete_handle(std::string m);
+ void delete_handle_wo(std::string m);
};
extern class handle_mgr mgr;
+++ /dev/null
-// compile time version of min and max
-
-#ifndef algorithm_h
-#define algorithm_h
-
-template <int A, int B>
-struct static_max
-{
- static const int value = A > B ? A : B;
-};
-
-template <int A, int B>
-struct static_min
-{
- static const int value = A < B ? A : B;
-};
-
-#endif
--- /dev/null
+#ifndef lock_h
+#define lock_h
+
+#include <thread>
+#include <mutex>
+// Users of this header pair these mutexes with a std::condition_variable
+// (e.g. config.h declares config_cond); include it here rather than
+// relying on transitive includes.
+#include <condition_variable>
+
+using std::mutex;
+
+// The ordinary RAII lock: locks on construction, unlocks on destruction,
+// and supports temporary unlock()/lock() around blocking calls (RPCs).
+using lock = std::unique_lock<std::mutex>;
+
+// adopt_lock wraps a mutex the CALLER has already locked ("caller should
+// hold cfg_mutex"-style functions).  The constructor adopts ownership
+// without locking; the destructor release()s ownership without unlocking,
+// so the mutex is still held when the wrapper goes out of scope and the
+// caller's own lock remains valid.
+// NOTE(review): callers must keep unlock()/lock() calls balanced; if the
+// scope unwinds while in the unlock()ed state, the mutex is handed back
+// unlocked even though the caller believes it holds it.
+class adopt_lock : public lock {
+public:
+    explicit adopt_lock(std::mutex &m)
+        : std::unique_lock<std::mutex>(m, std::adopt_lock) {
+    }
+    ~adopt_lock() {
+        // Hand ownership back to the caller without unlocking.
+        release();
+    }
+};
+
+#endif
// RPC stubs for clients to talk to lock_server
#include "lock_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <sstream>
#include <string>
#include "lock_protocol.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <vector>
// Client interface to the lock server
typedef int t4_status;
-typedef unsigned long long t4_lockid_t;
+typedef const char * t4_lockid_t;
t4_lock_client *t4_lock_client_new(const char *dst);
void t4_lock_client_delete(t4_lock_client *);
// see lock_client.cache.h for protocol details.
#include "lock_client_cache_rsm.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <sstream>
#include <iostream>
#include <algorithm>
#include "tprintf.h"
#include "rsm_client.h"
+#include "lock.h"
+
+using std::ostringstream;
lock_state::lock_state():
state(none)
}
void lock_state::wait() {
- pthread_t self = pthread_self();
- c[self].wait(m);
+ auto self = std::this_thread::get_id();
+ {
+ adopt_lock ml(m);
+ c[self].wait(ml);
+ }
c.erase(self);
}
void lock_state::signal() {
// signal anyone
if (c.begin() != c.end())
- c.begin()->second.signal();
+ c.begin()->second.notify_one();
}
-void lock_state::signal(pthread_t who) {
+void lock_state::signal(std::thread::id who) {
if (c.count(who))
- c[who].signal();
-}
-
-static void * releasethread(void *x) {
- lock_client_cache_rsm *cc = (lock_client_cache_rsm *) x;
- cc->releaser();
- return 0;
+ c[who].notify_one();
}
int lock_client_cache_rsm::last_port = 0;
lock_state & lock_client_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
- ScopedLock sl(lock_table_lock);
+ lock sl(lock_table_lock);
// by the semantics of std::map, this will create
// the lock if it doesn't already exist
return lock_table[lid];
rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
{
- ScopedLock sl(xid_mutex);
+ lock sl(xid_mutex);
xid = 0;
}
rsmc = new rsm_client(xdst);
- int r = pthread_create(&releaser_thread, NULL, &releasethread, (void *) this);
- VERIFY (r == 0);
+ releaser_thread = std::thread(&lock_client_cache_rsm::releaser, this);
}
void lock_client_cache_rsm::releaser() {
LOG("Releaser: " << lid);
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
- VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread);
+ lock sl(st.m);
+ VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
st.state = lock_state::releasing;
{
- ScopedUnlock su(st.m);
+ sl.unlock();
int r;
rsmc->call(lock_protocol::release, lid, id, st.xid, r);
+ sl.lock();
}
st.state = lock_state::none;
LOG("Lock " << lid << ": none");
lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid) {
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
- pthread_t self = pthread_self();
+ lock sl(st.m);
+ auto self = std::this_thread::get_id();
// check for reentrancy
VERIFY(st.state != lock_state::locked || st.held_by != self);
if (st.state == lock_state::none || st.state == lock_state::retrying) {
if (st.state == lock_state::none) {
- ScopedLock sl(xid_mutex);
+ lock sl(xid_mutex);
st.xid = xid++;
}
st.state = lock_state::acquiring;
LOG("Lock " << lid << ": acquiring");
lock_protocol::status result;
{
- ScopedUnlock su(st.m);
+ sl.unlock();
int r;
result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
+ sl.lock();
}
LOG("acquire returned " << result);
if (result == lock_protocol::OK) {
VERIFY(st.wanted_by.size() != 0);
if (st.state == lock_state::free) {
// is it for me?
- pthread_t front = st.wanted_by.front();
- if (front == releaser_thread) {
+ auto front = st.wanted_by.front();
+ if (front == releaser_thread.get_id()) {
st.wanted_by.pop_front();
st.state = lock_state::locked;
- st.held_by = releaser_thread;
+ st.held_by = releaser_thread.get_id();
LOG("Queuing " << lid << " for release");
release_fifo.enq(lid);
} else if (front == self) {
lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid) {
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
- pthread_t self = pthread_self();
+ lock sl(st.m);
+ auto self = std::this_thread::get_id();
VERIFY(st.state == lock_state::locked && st.held_by == self);
st.state = lock_state::free;
LOG("Lock " << lid << ": free");
if (st.wanted_by.size()) {
- pthread_t front = st.wanted_by.front();
- if (front == releaser_thread) {
+ auto front = st.wanted_by.front();
+ if (front == releaser_thread.get_id()) {
st.state = lock_state::locked;
- st.held_by = releaser_thread;
+ st.held_by = releaser_thread.get_id();
st.wanted_by.pop_front();
LOG("Queuing " << lid << " for release");
release_fifo.enq(lid);
rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
LOG("Revoke handler " << lid << " " << xid);
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
+ lock sl(st.m);
if (st.state == lock_state::releasing || st.state == lock_state::none)
return rlock_protocol::OK;
if (st.state == lock_state::free &&
- (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread)) {
+ (st.wanted_by.size() == 0 || st.wanted_by.front() == releaser_thread.get_id())) {
// gimme
st.state = lock_state::locked;
- st.held_by = releaser_thread;
+ st.held_by = releaser_thread.get_id();
if (st.wanted_by.size())
st.wanted_by.pop_front();
release_fifo.enq(lid);
} else {
// get in line
- st.wanted_by.push_back(releaser_thread);
+ st.wanted_by.push_back(releaser_thread.get_id());
}
return rlock_protocol::OK;
}
rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
+ lock sl(st.m);
VERIFY(st.state == lock_state::acquiring);
st.state = lock_state::retrying;
LOG("Lock " << lid << ": none");
#include <string>
#include "lock_protocol.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include "lock_client.h"
#include "lang/verify.h"
-#include "mutex.h"
#include "rpc/fifo.h"
#include "rsm_client.h"
-// Classes that inherit lock_release_user can override dorelease so that
-// that they will be called when lock_client releases a lock.
-// You will not need to do anything with this class until Lab 5.
class lock_release_user {
public:
virtual void dorelease(lock_protocol::lockid_t) = 0;
virtual ~lock_release_user() {};
};
-using namespace std;
+using std::string;
+using std::thread;
+using std::list;
+using std::map;
typedef string callback;
acquiring,
releasing
} state;
- pthread_t held_by;
- list<pthread_t> wanted_by;
+ std::thread::id held_by;
+ list<std::thread::id> wanted_by;
mutex m;
- map<pthread_t, cond> c;
+ map<std::thread::id, std::condition_variable> c;
lock_protocol::xid_t xid;
void wait();
void signal();
- void signal(pthread_t who);
+ void signal(std::thread::id who);
};
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
// lock_revoke_server.
class lock_client_cache_rsm : public lock_client {
private:
- pthread_t releaser_thread;
+ std::thread releaser_thread;
rsm_client *rsmc;
class lock_release_user *lu;
int rlock_port;
#include "lock_protocol.h"
#include "lock_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <vector>
#include <stdlib.h>
#include <stdio.h>
-std::string dst;
-lock_client *lc;
-
int
main(int argc, char *argv[])
{
- int r;
-
- if(argc != 2){
- fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
- exit(1);
- }
+ if(argc != 2) {
+ fprintf(stderr, "Usage: %s [host:]port\n", argv[0]);
+ return 1;
+ }
- dst = argv[1];
- lc = new lock_client(dst);
- r = lc->stat(1);
- printf ("stat returned %d\n", r);
+ lock_client *lc = new lock_client(argv[1]);
+ printf ("stat returned %d\n", lc->stat("1"));
}
#ifndef lock_protocol_h
#define lock_protocol_h
-#include "rpc.h"
+#include "rpc/rpc.h"
+#include <string>
class lock_protocol {
public:
enum xxstatus { OK, RETRY, RPCERR, NOENT, IOERR };
typedef int status;
- typedef unsigned long long lockid_t;
+ typedef std::string lockid_t;
typedef unsigned long long xid_t;
enum rpc_numbers {
acquire = 0x7001,
#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
+#include "lock.h"
lock_server::lock_server():
nacquire (0)
// caller must hold lock_lock
mutex &
lock_server::get_lock(lock_protocol::lockid_t lid) {
- lock_lock.acquire();
+ lock ml(lock_lock);
// by the semantics of std::map, this will create
// the mutex if it doesn't already exist
mutex &l = locks[lid];
- lock_lock.release();
return l;
}
lock_protocol::status
lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r)
{
- get_lock(lid).acquire();
+ get_lock(lid).lock();
return lock_protocol::OK;
}
lock_protocol::status
lock_server::release(int clt, lock_protocol::lockid_t lid, int &r)
{
- get_lock(lid).release();
+ get_lock(lid).unlock();
return lock_protocol::OK;
}
#include <string>
#include "lock_protocol.h"
#include "lock_client.h"
-#include "rpc.h"
-#include <pthread.h>
+#include "rpc/rpc.h"
#include <list>
#include <map>
-#include "mutex.h"
-using namespace std;
+using std::map;
typedef map<lock_protocol::lockid_t, mutex> lock_map;
#include "handle.h"
#include "tprintf.h"
#include "rpc/marshall.h"
+#include "lock.h"
+
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
lock_state::lock_state():
held(false)
{
}
+lock_state& lock_state::operator=(const lock_state& o) {
+ held = o.held;
+ held_by = o.held_by;
+ wanted_by = o.wanted_by;
+ old_requests = o.old_requests;
+ return *this;
+}
+
template <class A, class B>
ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
o << "<" << d.first << "," << d.second << ">";
lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
- ScopedLock sl(lock_table_lock);
+ lock sl(lock_table_lock);
// by the semantics of map, this will create
// the lock if it doesn't already exist
return lock_table[lid];
}
-static void *revokethread(void *x) {
- lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
- sc->revoker();
- return 0;
-}
-
-static void *retrythread(void *x) {
- lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
- sc->retryer();
- return 0;
-}
-
lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
- pthread_t th;
- VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0);
- VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0);
+ std::thread(&lock_server_cache_rsm::revoker, this).detach();
+ std::thread(&lock_server_cache_rsm::retryer, this).detach();
rsm->set_state_transfer(this);
}
lock_state &st = get_lock_state(lid);
holder held_by;
{
- ScopedLock sl(st.m);
+ lock sl(st.m);
held_by = st.held_by;
}
lock_state &st = get_lock_state(lid);
holder front;
{
- ScopedLock sl(st.m);
+ lock sl(st.m);
if (st.wanted_by.empty())
continue;
front = st.wanted_by.front();
LOG_FUNC_ENTER_SERVER;
holder h = holder(id, xid);
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
+ lock sl(st.m);
// deal with duplicated requests
if (st.old_requests.count(id)) {
int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
LOG_FUNC_ENTER_SERVER;
lock_state &st = get_lock_state(lid);
- ScopedLock sl(st.m);
+ lock sl(st.m);
if (st.held && st.held_by == holder(id, xid)) {
st.held = false;
LOG("Lock " << lid << " not held");
}
string lock_server_cache_rsm::marshal_state() {
- ScopedLock sl(lock_table_lock);
+ lock sl(lock_table_lock);
marshall rep;
rep << nacquire;
rep << lock_table;
}
void lock_server_cache_rsm::unmarshal_state(string state) {
- ScopedLock sl(lock_table_lock);
+ lock sl(lock_table_lock);
unmarshall rep(state);
rep >> nacquire;
rep >> lock_table;
#include <map>
#include <vector>
#include "lock_protocol.h"
-#include "rpc.h"
-#include "mutex.h"
+#include "rpc/rpc.h"
#include "rsm_state_transfer.h"
#include "rsm.h"
#include "rpc/fifo.h"
+#include "lock.h"
-using namespace std;
+using std::string;
+using std::pair;
+using std::list;
+using std::map;
typedef string callback;
typedef pair<callback, lock_protocol::xid_t> holder;
list<holder> wanted_by;
map<callback, lock_protocol::xid_t> old_requests;
mutex m;
+ lock_state& operator=(const lock_state&);
};
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include "paxos.h"
#include "rsm.h"
-#include "jsl_log.h"
-
// Main loop of lock_server
char tprintf_thread_prefix = 's';
int
main(int argc, char *argv[])
{
- int count = 0;
-
- setvbuf(stdout, NULL, _IONBF, 0);
- setvbuf(stderr, NULL, _IONBF, 0);
-
- srandom(getpid());
+ int count = 0;
- if(argc != 3){
- fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
- exit(1);
- }
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
- char *count_env = getenv("RPC_COUNT");
- if(count_env != NULL){
- count = atoi(count_env);
- }
+ srandom(getpid());
- //jsl_set_debug(2);
- // Comment out the next line to switch between the ordinary lock
- // server and the RSM. In Lab 6, we disable the lock server and
- // implement Paxos. In Lab 7, we will make the lock server use your
- // RSM layer.
-#define RSM
-#ifdef RSM
-// You must comment out the next line once you are done with Step One.
-//#define STEP_ONE
-#ifdef STEP_ONE
- rpcs server(atoi(argv[1]));
- lock_server_cache_rsm ls;
- server.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
- server.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
- server.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
-#else
- rsm rsm(argv[1], argv[2]);
- lock_server_cache_rsm ls(&rsm);
- rsm.set_state_transfer((rsm_state_transfer *)&ls);
- rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
- rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
- rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
-#endif // STEP_ONE
-#endif // RSM
+ if(argc != 3){
+ fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+ exit(1);
+ }
-#ifndef RSM
- lock_server_cache ls;
- rpcs server(atoi(argv[1]), count);
- server.reg(lock_protocol::stat, &ls, &lock_server_cache::stat);
- server.reg(lock_protocol::acquire, &ls, &lock_server_cache::acquire);
- server.reg(lock_protocol::release, &ls, &lock_server_cache::release);
-#endif
+ char *count_env = getenv("RPC_COUNT");
+ if(count_env != NULL){
+ count = atoi(count_env);
+ }
+ // NOTE(review): count (RPC_COUNT) was only consumed by the removed
+ // non-RSM rpcs path; it is now dead — confirm whether it should be
+ // plumbed into the rsm/rpcs layer or the env handling dropped.
+ (void) count;
+ rsm rsm(argv[1], argv[2]);
+ lock_server_cache_rsm ls(&rsm);
+ // The lock_server_cache_rsm constructor already calls
+ // rsm->set_state_transfer(this); registering again here (with a
+ // C-style cast) was redundant, so it is dropped.
+ rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
+ rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
+ rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
- while(1)
- sleep(1000);
+ while(1)
+ sleep(1000);
}
#include "lock_protocol.h"
#include "lock_client.h"
-#include "rpc.h"
-#include "jsl_log.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <vector>
#include <stdlib.h>
#include "tprintf.h"
#include <sys/types.h>
#include <unistd.h>
+#include "lock.h"
char tprintf_thread_prefix = 'c';
// must be >= 2
-int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
+const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
std::string dst;
lock_client_cache_rsm **lc = new lock_client_cache_rsm * [nt];
-lock_protocol::lockid_t a = 1;
-lock_protocol::lockid_t b = 2;
-lock_protocol::lockid_t c = 3;
+lock_protocol::lockid_t a = "1";
+lock_protocol::lockid_t b = "2";
+lock_protocol::lockid_t c = "3";
// check_grant() and check_release() check that the lock server
// doesn't grant the same lock to both clients.
// it assumes that lock names are distinct in the first byte.
int ct[256];
-pthread_mutex_t count_mutex;
+std::mutex count_mutex;
void
check_grant(lock_protocol::lockid_t lid)
{
- ScopedLock ml(&count_mutex);
- int x = lid & 0xff;
- if(ct[x] != 0){
- fprintf(stderr, "error: server granted %016llx twice\n", lid);
- fprintf(stdout, "error: server granted %016llx twice\n", lid);
- exit(1);
- }
- ct[x] += 1;
+ lock ml(count_mutex);
+ // ct[] has 256 slots and the comment above only guarantees lock
+ // names are distinct in their *first byte*; masking with 0x0f would
+ // alias e.g. "1" (0x31) and "A" (0x41). Keep the full byte.
+ int x = lid[0] & 0xff;
+ if(ct[x] != 0){
+ fprintf(stderr, "error: server granted %s twice\n", lid.c_str());
+ fprintf(stdout, "error: server granted %s twice\n", lid.c_str());
+ exit(1);
+ }
+ ct[x] += 1;
}
void
check_release(lock_protocol::lockid_t lid)
{
- ScopedLock ml(&count_mutex);
- int x = lid & 0xff;
- if(ct[x] != 1){
- fprintf(stderr, "error: client released un-held lock %016llx\n", lid);
- exit(1);
- }
- ct[x] -= 1;
+ lock ml(count_mutex);
+ // must index the same slot check_grant incremented: full first byte
+ int x = lid[0] & 0xff;
+ if(ct[x] != 1){
+ fprintf(stderr, "error: client released un-held lock %s\n", lid.c_str());
+ exit(1);
+ }
+ ct[x] -= 1;
}
void
void *
test2(void *x)
{
- int i = * (int *) x;
-
- tprintf ("test2: client %d acquire a release a\n", i);
- lc[i]->acquire(a);
- tprintf ("test2: client %d acquire done\n", i);
- check_grant(a);
- sleep(1);
- tprintf ("test2: client %d release\n", i);
- check_release(a);
- lc[i]->release(a);
- tprintf ("test2: client %d release done\n", i);
- return 0;
-}
+ int i = * (int *) x;
-void *
-test3(void *x)
-{
- int i = * (int *) x;
-
- tprintf ("test3: client %d acquire a release a concurrent\n", i);
- for (int j = 0; j < 10; j++) {
+ tprintf ("test2: client %d acquire a release a\n", i);
lc[i]->acquire(a);
+ tprintf ("test2: client %d acquire done\n", i);
check_grant(a);
- tprintf ("test3: client %d got lock\n", i);
+ sleep(1);
+ tprintf ("test2: client %d release\n", i);
check_release(a);
lc[i]->release(a);
- }
- return 0;
+ tprintf ("test2: client %d release done\n", i);
+ return 0;
}
void *
-test4(void *x)
+test3(void *x)
{
- int i = * (int *) x;
+ int i = * (int *) x;
+
+ tprintf ("test3: client %d acquire a release a concurrent\n", i);
+ for (int j = 0; j < 10; j++) {
+ lc[i]->acquire(a);
+ check_grant(a);
+ tprintf ("test3: client %d got lock\n", i);
+ check_release(a);
+ lc[i]->release(a);
+ }
+ return 0;
+}
- tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
- for (int j = 0; j < 10; j++) {
- lc[0]->acquire(a);
- check_grant(a);
- tprintf ("test4: thread %d on client 0 got lock\n", i);
- check_release(a);
- lc[0]->release(a);
- }
- return 0;
+void *
+test4(void *x)
+{
+ int i = * (int *) x;
+
+ tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
+ for (int j = 0; j < 10; j++) {
+ lc[0]->acquire(a);
+ check_grant(a);
+ tprintf ("test4: thread %d on client 0 got lock\n", i);
+ check_release(a);
+ lc[0]->release(a);
+ }
+ return 0;
}
void *
test5(void *x)
{
- int i = * (int *) x;
-
- tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
- for (int j = 0; j < 10; j++) {
- if (i < 5) lc[0]->acquire(a);
- else lc[1]->acquire(a);
- check_grant(a);
- tprintf ("test5: client %d got lock\n", i);
- check_release(a);
- if (i < 5) lc[0]->release(a);
- else lc[1]->release(a);
- }
- return 0;
+ int i = * (int *) x;
+
+ tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
+ for (int j = 0; j < 10; j++) {
+ if (i < 5) lc[0]->acquire(a);
+ else lc[1]->acquire(a);
+ check_grant(a);
+ tprintf ("test5: client %d got lock\n", i);
+ check_release(a);
+ if (i < 5) lc[0]->release(a);
+ else lc[1]->release(a);
+ }
+ return 0;
}
int
main(int argc, char *argv[])
{
- int r;
- pthread_t th[nt];
+ std::thread th[nt];
int test = 0;
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
srandom(getpid());
- //jsl_set_debug(2);
-
if(argc < 2) {
- fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
- exit(1);
+ fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
+ exit(1);
}
dst = argv[1];
if (argc > 2) {
- test = atoi(argv[2]);
- if(test < 1 || test > 5){
- tprintf("Test number must be between 1 and 5\n");
- exit(1);
- }
+ test = atoi(argv[2]);
+ if(test < 1 || test > 5){
+ tprintf("Test number must be between 1 and 5\n");
+ exit(1);
+ }
}
- VERIFY(pthread_mutex_init(&count_mutex, NULL) == 0);
tprintf("cache lock client\n");
for (int i = 0; i < nt; i++) lc[i] = new lock_client_cache_rsm(dst);
if(!test || test == 1){
- test1();
+ test1();
}
if(!test || test == 2){
- // test2
- for (int i = 0; i < nt; i++) {
- int *a = new int (i);
- r = pthread_create(&th[i], NULL, test2, (void *) a);
- VERIFY (r == 0);
- }
- for (int i = 0; i < nt; i++) {
- pthread_join(th[i], NULL);
- }
+ // test2
+ for (int i = 0; i < nt; i++) {
+ int *a = new int (i);
+ th[i] = std::thread(test2, a);
+ }
+ for (int i = 0; i < nt; i++) {
+ th[i].join();
+ }
}
if(!test || test == 3){
- tprintf("test 3\n");
-
- // test3
- for (int i = 0; i < nt; i++) {
- int *a = new int (i);
- r = pthread_create(&th[i], NULL, test3, (void *) a);
- VERIFY (r == 0);
- }
- for (int i = 0; i < nt; i++) {
- pthread_join(th[i], NULL);
- }
+ tprintf("test 3\n");
+
+ // test3
+ for (int i = 0; i < nt; i++) {
+ int *a = new int (i);
+ th[i] = std::thread(test3, a);
+ }
+ for (int i = 0; i < nt; i++) {
+ th[i].join();
+ }
}
if(!test || test == 4){
- tprintf("test 4\n");
-
- // test 4
- for (int i = 0; i < 2; i++) {
- int *a = new int (i);
- r = pthread_create(&th[i], NULL, test4, (void *) a);
- VERIFY (r == 0);
- }
- for (int i = 0; i < 2; i++) {
- pthread_join(th[i], NULL);
- }
+ tprintf("test 4\n");
+
+ // test 4
+ for (int i = 0; i < 2; i++) {
+ int *a = new int (i);
+ th[i] = std::thread(test4, a);
+ }
+ for (int i = 0; i < 2; i++) {
+ th[i].join();
+ }
}
if(!test || test == 5){
- tprintf("test 5\n");
-
- // test 5
-
- for (int i = 0; i < nt; i++) {
- int *a = new int (i);
- r = pthread_create(&th[i], NULL, test5, (void *) a);
- VERIFY (r == 0);
- }
- for (int i = 0; i < nt; i++) {
- pthread_join(th[i], NULL);
- }
+ tprintf("test 5\n");
+
+ // test 5
+
+ for (int i = 0; i < nt; i++) {
+ int *a = new int (i);
+ th[i] = std::thread(test5, a);
+ }
+ for (int i = 0; i < nt; i++) {
+ th[i].join();
+ }
}
tprintf ("%s: passed all tests successfully\n", argv[0]);
+++ /dev/null
-#include "mutex.h"
-#include "lang/verify.h"
-
-mutex::mutex() {
- VERIFY(pthread_mutex_init(&m, NULL) == 0);
-}
-
-mutex::~mutex() {
- VERIFY(pthread_mutex_destroy(&m) == 0);
-}
-
-void mutex::acquire() {
- VERIFY(pthread_mutex_lock(&m) == 0);
-}
-
-void mutex::release() {
- VERIFY(pthread_mutex_unlock(&m) == 0);
-}
-
-mutex::operator pthread_mutex_t *() {
- return &m;
-}
-
-cond::cond() {
- VERIFY(pthread_cond_init(&c, NULL) == 0);
-}
-
-cond::~cond() {
- VERIFY(pthread_cond_destroy(&c) == 0);
-}
-
-void cond::wait(mutex &m) {
- VERIFY(pthread_cond_wait(&c, m) == 0);
-}
-
-void cond::signal() {
- VERIFY(pthread_cond_signal(&c) == 0);
-}
-
-void cond::broadcast() {
- VERIFY(pthread_cond_broadcast(&c) == 0);
-}
+++ /dev/null
-#ifndef mutex_h
-#define mutex_h
-
-#include <pthread.h>
-
-class mutex {
- protected:
- pthread_mutex_t m;
- public:
- mutex();
- ~mutex();
- void acquire();
- void release();
- operator pthread_mutex_t *();
-};
-
-class cond {
- protected:
- pthread_cond_t c;
- public:
- cond();
- ~cond();
- void wait(mutex &m);
- void signal();
- void broadcast();
-};
-
-#endif
#include "paxos.h"
#include "handle.h"
-// #include <signal.h>
#include <stdio.h>
#include "tprintf.h"
#include "lang/verify.h"
+#include "lock.h"
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described by Lamport's "Paxos Made
proposer::isrunning()
{
bool r;
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
r = !stable;
return r;
}
std::string v;
bool r = false;
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
if (!stable) { // already running proposer?
acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
paxos_protocol::prepareres &r)
{
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
r.oldinstance = false;
r.accept = false;
r.n_a = n_a;
paxos_protocol::status
acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
{
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
r = false;
if (a.n >= n_h) {
n_a = a.n;
}
// the src argument is only for debug purpose
-paxos_protocol::status
+paxos_protocol::status
acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
{
- ScopedLock ml(pxs_mutex);
- tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
- a.instance, instance_h, v_a.c_str());
- if (a.instance == instance_h + 1) {
- VERIFY(v_a == a.v);
- commit_wo(a.instance, v_a);
- } else if (a.instance <= instance_h) {
- // we are ahead ignore.
- } else {
- // we are behind
- VERIFY(0);
- }
- return paxos_protocol::OK;
+ lock ml(pxs_mutex);
+ tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
+ a.instance, instance_h, v_a.c_str());
+ if (a.instance == instance_h + 1) {
+ VERIFY(v_a == a.v);
+ commit_wo(a.instance, v_a);
+ } else if (a.instance <= instance_h) {
+ // we are ahead ignore.
+ } else {
+ // we are behind
+ VERIFY(0);
+ }
+ return paxos_protocol::OK;
}
void
acceptor::commit_wo(unsigned instance, std::string value)
{
- //assume pxs_mutex is held
- tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
- if (instance > instance_h) {
- tprintf("commit: highestaccepteinstance = %d\n", instance);
- values[instance] = value;
- l->loginstance(instance, value);
- instance_h = instance;
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
- if (cfg) {
- pxs_mutex.release();
- cfg->paxos_commit(instance, value);
- pxs_mutex.acquire();
+ //assume pxs_mutex is held by the caller's guard
+ tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+ if (instance > instance_h) {
+ tprintf("commit: highestaccepteinstance = %d\n", instance);
+ values[instance] = value;
+ l->loginstance(instance, value);
+ instance_h = instance;
+ n_h.n = 0;
+ n_h.m = me;
+ n_a.n = 0;
+ n_a.m = me;
+ v_a.clear();
+ if (cfg) {
+ // Drop the caller-held mutex across the upcall (paxos_commit may
+ // re-enter paxos) and reacquire before returning, so the caller's
+ // guard unwinds with the mutex locked exactly once. An adopting
+ // guard here would also unlock on destruction, and the caller's
+ // guard would then unlock a second time — undefined behavior.
+ pxs_mutex.unlock();
+ cfg->paxos_commit(instance, value);
+ pxs_mutex.lock();
+ }
+ }
- }
- }
}
void
acceptor::commit(unsigned instance, std::string value)
{
- ScopedLock ml(pxs_mutex);
- commit_wo(instance, value);
+ lock ml(pxs_mutex);
+ commit_wo(instance, value);
}
std::string
#include <string>
#include <vector>
-#include "rpc.h"
+#include "rpc/rpc.h"
#include "paxos_protocol.h"
#include "log.h"
-#include "mutex.h"
class paxos_change {
public:
- virtual void paxos_commit(unsigned instance, std::string v) = 0;
+ virtual void paxos_commit(unsigned instance, const std::string &v) = 0;
virtual ~paxos_change() {};
};
rpcs *pxs;
paxos_change *cfg;
std::string me;
- mutex pxs_mutex;
+ std::mutex pxs_mutex;
// Acceptor state
prop_t n_h; // number of the highest proposal seen in a prepare
bool break1;
bool break2;
- mutex pxs_mutex;
+ std::mutex pxs_mutex;
// Proposer state
bool stable;
#ifndef paxos_protocol_h
#define paxos_protocol_h
-#include "rpc.h"
+#include "rpc/rpc.h"
struct prop_t {
unsigned n;
+++ /dev/null
-#include "mutex.h"
-#include <stdlib.h>
-#include "rpc/slock.h"
-#include <stdint.h>
-
-static mutex rand_m;
-
-void srand_safe(unsigned int seed) {
- ScopedLock s(rand_m);
- srandom(seed);
-}
-
-// RAND_MAX is guaranteed to be at least 32767
-// but math with rand() is annoying.
-// Here's a canned solution from
-// http://www.azillionmonkeys.com/qed/random.html
-// which gives uniform numbers in [0, r)
-
-#define RS_SCALE (1.0 / (1.0 + RAND_MAX))
-
-double drand (void) {
- double d;
- do {
- d = (((random() * RS_SCALE) + random()) * RS_SCALE + random()) * RS_SCALE;
- } while (d >= 1); /* Round off */
- return d;
-}
-
-#define irand(x) ((unsigned int) ((x) * drand()))
-
-// use this to construct a 32-bit thread-safe RNG
-#define RAND32 ((uint32_t)irand(1ul<<32))
-uint32_t rand32_safe() {
- ScopedLock s(rand_m);
- return RAND32;
-}
+++ /dev/null
-#ifndef random_h
-#define random_h
-
-void srand_safe(unsigned int seed);
-uint32_t rand32_safe();
-
-#endif
#include <signal.h>
#include <unistd.h>
-#include "method_thread.h"
#include "connection.h"
-#include "slock.h"
#include "pollmgr.h"
#include "jsl_log.h"
-#include "gettime.h"
#include "lang/verify.h"
+#include "lock.h"
#define MAX_PDU (10<<20) //maximum PDF is 10M
-connection::connection(chanmgr *m1, int f1, int l1)
+connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
{
- int flags = fcntl(fd_, F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(fd_, F_SETFL, flags);
+ int flags = fcntl(fd_, F_GETFL, NULL);
+ flags |= O_NONBLOCK;
+ fcntl(fd_, F_SETFL, flags);
+
+ signal(SIGPIPE, SIG_IGN);
- signal(SIGPIPE, SIG_IGN);
- VERIFY(pthread_mutex_init(&m_,0)==0);
- VERIFY(pthread_mutex_init(&ref_m_,0)==0);
- VERIFY(pthread_cond_init(&send_wait_,0)==0);
- VERIFY(pthread_cond_init(&send_complete_,0)==0);
-
- VERIFY(gettimeofday(&create_time_, NULL) == 0);
+ VERIFY(gettimeofday(&create_time_, NULL) == 0);
- PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+ PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}
connection::~connection()
{
- VERIFY(dead_);
- VERIFY(pthread_mutex_destroy(&m_)== 0);
- VERIFY(pthread_mutex_destroy(&ref_m_)== 0);
- VERIFY(pthread_cond_destroy(&send_wait_) == 0);
- VERIFY(pthread_cond_destroy(&send_complete_) == 0);
- if (rpdu_.buf)
- free(rpdu_.buf);
- VERIFY(!wpdu_.buf);
- close(fd_);
+ VERIFY(dead_);
+ if (rpdu_.buf)
+ free(rpdu_.buf);
+ VERIFY(!wpdu_.buf);
+ close(fd_);
}
void
connection::incref()
{
- ScopedLock ml(&ref_m_);
- refno_++;
+ lock rl(ref_m_);
+ refno_++;
}
bool
connection::isdead()
{
- ScopedLock ml(&m_);
- return dead_;
+ lock ml(m_);
+ return dead_;
}
void
connection::closeconn()
{
- {
- ScopedLock ml(&m_);
- if (!dead_) {
- dead_ = true;
- shutdown(fd_,SHUT_RDWR);
- }else{
- return;
- }
- }
- //after block_remove_fd, select will never wait on fd_
- //and no callbacks will be active
- PollMgr::Instance()->block_remove_fd(fd_);
+ {
+ lock ml(m_);
+ if (!dead_) {
+ dead_ = true;
+ shutdown(fd_,SHUT_RDWR);
+ } else {
+ return;
+ }
+ }
+ //after block_remove_fd, select will never wait on fd_
+ //and no callbacks will be active
+ PollMgr::Instance()->block_remove_fd(fd_);
}
void
connection::decref()
{
- VERIFY(pthread_mutex_lock(&ref_m_)==0);
- refno_ --;
- VERIFY(refno_>=0);
- if (refno_==0) {
- VERIFY(pthread_mutex_lock(&m_)==0);
- if (dead_) {
- VERIFY(pthread_mutex_unlock(&ref_m_)==0);
- VERIFY(pthread_mutex_unlock(&m_)==0);
- delete this;
- return;
- }
- VERIFY(pthread_mutex_unlock(&m_)==0);
- }
- pthread_mutex_unlock(&ref_m_);
+ bool dead = false;
+ {
+ lock rl(ref_m_);
+ refno_--;
+ VERIFY(refno_>=0);
+ if (refno_==0) {
+ lock ml(m_);
+ dead = dead_;
+ }
+ }
+ if (dead) {
+ delete this;
+ }
}
int
connection::ref()
{
- ScopedLock rl(&ref_m_);
+ lock rl(ref_m_);
return refno_;
}
int
connection::compare(connection *another)
{
- if (create_time_.tv_sec > another->create_time_.tv_sec)
- return 1;
- if (create_time_.tv_sec < another->create_time_.tv_sec)
- return -1;
- if (create_time_.tv_usec > another->create_time_.tv_usec)
- return 1;
- if (create_time_.tv_usec < another->create_time_.tv_usec)
- return -1;
- return 0;
+ if (create_time_.tv_sec > another->create_time_.tv_sec)
+ return 1;
+ if (create_time_.tv_sec < another->create_time_.tv_sec)
+ return -1;
+ if (create_time_.tv_usec > another->create_time_.tv_usec)
+ return 1;
+ if (create_time_.tv_usec < another->create_time_.tv_usec)
+ return -1;
+ return 0;
}
bool
connection::send(char *b, int sz)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
waiters_++;
while (!dead_ && wpdu_.buf) {
- VERIFY(pthread_cond_wait(&send_wait_, &m_)==0);
+ send_wait_.wait(ml);
}
waiters_--;
if (dead_) {
if (!writepdu()) {
dead_ = true;
- VERIFY(pthread_mutex_unlock(&m_) == 0);
+ ml.unlock();
PollMgr::Instance()->block_remove_fd(fd_);
- VERIFY(pthread_mutex_lock(&m_) == 0);
+ ml.lock();
}else{
if (wpdu_.solong == wpdu_.sz) {
}else{
//should be rare to need to explicitly add write callback
PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
- VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
+ send_complete_.wait(ml);
}
}
}
wpdu_.solong = wpdu_.sz = 0;
wpdu_.buf = NULL;
if (waiters_ > 0)
- pthread_cond_broadcast(&send_wait_);
+ send_wait_.notify_all();
return ret;
}
void
connection::write_cb(int s)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
VERIFY(!dead_);
VERIFY(fd_ == s);
if (wpdu_.sz == 0) {
if (!writepdu()) {
PollMgr::Instance()->del_callback(fd_, CB_RDWR);
dead_ = true;
- }else{
+ } else {
VERIFY(wpdu_.solong >= 0);
if (wpdu_.solong < wpdu_.sz) {
return;
}
- }
- pthread_cond_signal(&send_complete_);
+ }
+ send_complete_.notify_one();
}
//fd_ is ready to be read
void
connection::read_cb(int s)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
VERIFY(fd_ == s);
if (dead_) {
return;
if (!succ) {
PollMgr::Instance()->del_callback(fd_,CB_RDWR);
dead_ = true;
- pthread_cond_signal(&send_complete_);
+ send_complete_.notify_one();
}
if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
if (sz > MAX_PDU) {
char *tmpb = (char *)&sz1;
- jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
+ jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
return false;
}
return true;
}
-tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{
-
- VERIFY(pthread_mutex_init(&m_,NULL) == 0);
-
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
VERIFY(0);
}
- socklen_t addrlen = sizeof(sin);
- VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
- port_ = ntohs(sin.sin_port);
+ socklen_t addrlen = sizeof(sin);
+ VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
+ port_ = ntohs(sin.sin_port);
- jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
+ jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
sin.sin_port);
if (pipe(pipe_) < 0) {
flags |= O_NONBLOCK;
fcntl(pipe_[0], F_SETFL, flags);
- VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
+ th_ = std::thread(&tcpsconn::accept_conn, this);
}
tcpsconn::~tcpsconn()
{
VERIFY(close(pipe_[1]) == 0);
- VERIFY(pthread_join(th_, NULL) == 0);
+ th_.join();
//close all the active connections
std::map<int, connection *>::iterator i;
for (i = conns_.begin(); i != conns_.end(); i++) {
i->second->closeconn();
i->second->decref();
- }
+ }
}
void
{
sockaddr_in sin;
socklen_t slen = sizeof(sin);
- int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
+ int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
if (s1 < 0) {
perror("tcpsconn::accept_conn error");
- pthread_exit(NULL);
+ throw thread_exit_exception();
}
- jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
+ jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
connection *ch = new connection(mgr_, s1, lossy_);
fd_set rfds;
int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
- while (1) {
- FD_ZERO(&rfds);
- FD_SET(pipe_[0], &rfds);
- FD_SET(tcp_, &rfds);
-
- int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
- if (ret < 0) {
- if (errno == EINTR) {
- continue;
- } else {
- perror("accept_conn select:");
- jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
- VERIFY(0);
- }
- }
-
- if (FD_ISSET(pipe_[0], &rfds)) {
- close(pipe_[0]);
- close(tcp_);
- return;
- }
- else if (FD_ISSET(tcp_, &rfds)) {
- process_accept();
- } else {
- VERIFY(0);
- }
- }
+ try {
+
+ while (1) {
+ FD_ZERO(&rfds);
+ FD_SET(pipe_[0], &rfds);
+ FD_SET(tcp_, &rfds);
+
+ int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+ if (ret < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ perror("accept_conn select:");
+ jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
+ VERIFY(0);
+ }
+ }
+
+ if (FD_ISSET(pipe_[0], &rfds)) {
+ close(pipe_[0]);
+ close(tcp_);
+ return;
+ }
+ else if (FD_ISSET(tcp_, &rfds)) {
+ process_accept();
+ } else {
+ VERIFY(0);
+ }
+ }
+ }
+ catch (const thread_exit_exception &e)
+ {
+ return;
+ }
}
connection *
int yes = 1;
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
- jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
+ jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
close(s);
return NULL;
return new connection(mgr, s, lossy);
}
-
#ifndef connection_h
-#define connection_h 1
+#define connection_h
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstddef>
+#include <thread>
#include <map>
#include "pollmgr.h"
+class thread_exit_exception : public std::exception {
+};
+
class connection;
class chanmgr {
- public:
- virtual bool got_pdu(connection *c, char *b, int sz) = 0;
- virtual ~chanmgr() {}
+ public:
+ virtual bool got_pdu(connection *c, char *b, int sz) = 0;
+ virtual ~chanmgr() {}
};
class connection : public aio_callback {
- public:
- struct charbuf {
- charbuf(): buf(NULL), sz(0), solong(0) {}
- charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
- char *buf;
- int sz;
- int solong; //amount of bytes written or read so far
- };
-
- connection(chanmgr *m1, int f1, int lossytest=0);
- ~connection();
-
- int channo() { return fd_; }
- bool isdead();
- void closeconn();
-
- bool send(char *b, int sz);
- void write_cb(int s);
- void read_cb(int s);
-
- void incref();
- void decref();
- int ref();
-
- int compare(connection *another);
- private:
-
- bool readpdu();
- bool writepdu();
-
- chanmgr *mgr_;
- const int fd_;
- bool dead_;
-
- charbuf wpdu_;
- charbuf rpdu_;
-
- struct timeval create_time_;
-
- int waiters_;
- int refno_;
- const int lossy_;
-
- pthread_mutex_t m_;
- pthread_mutex_t ref_m_;
- pthread_cond_t send_complete_;
- pthread_cond_t send_wait_;
+ public:
+ struct charbuf {
+ charbuf(): buf(NULL), sz(0), solong(0) {}
+ charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
+ char *buf;
+ int sz;
+ int solong; //amount of bytes written or read so far
+ };
+
+ connection(chanmgr *m1, int f1, int lossytest=0);
+ ~connection();
+
+ int channo() { return fd_; }
+ bool isdead();
+ void closeconn();
+
+ bool send(char *b, int sz);
+ void write_cb(int s);
+ void read_cb(int s);
+
+ void incref();
+ void decref();
+ int ref();
+
+ int compare(connection *another);
+ private:
+
+ bool readpdu();
+ bool writepdu();
+
+ chanmgr *mgr_;
+ const int fd_;
+ bool dead_;
+
+ charbuf wpdu_;
+ charbuf rpdu_;
+
+ struct timeval create_time_;
+
+ int waiters_;
+ int refno_;
+ const int lossy_;
+
+ std::mutex m_;
+ std::mutex ref_m_;
+ std::condition_variable send_complete_;
+ std::condition_variable send_wait_;
};
class tcpsconn {
- public:
- tcpsconn(chanmgr *m1, int port, int lossytest=0);
- ~tcpsconn();
- inline int port() { return port_; }
- void accept_conn();
- private:
- int port_;
- pthread_mutex_t m_;
- pthread_t th_;
- int pipe_[2];
-
- int tcp_; //file desciptor for accepting connection
- chanmgr *mgr_;
- int lossy_;
- std::map<int, connection *> conns_;
-
- void process_accept();
+ public:
+ tcpsconn(chanmgr *m1, int port, int lossytest=0);
+ ~tcpsconn();
+ inline int port() { return port_; }
+ void accept_conn();
+ private:
+ int port_;
+ std::mutex m_;
+ std::thread th_;
+ int pipe_[2];
+
+ int tcp_; //file desciptor for accepting connection
+ chanmgr *mgr_;
+ int lossy_;
+ std::map<int, connection *> conns_;
+
+ void process_accept();
};
struct bundle {
- bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
- chanmgr *mgr;
- int tcp;
- int lossy;
+ bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
+ chanmgr *mgr;
+ int tcp;
+ int lossy;
};
-void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0);
connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
#endif
#include <sys/time.h>
#include <time.h>
#include <errno.h>
-#include "slock.h"
#include "lang/verify.h"
+#include "lock.h"
template<class T>
class fifo {
public:
fifo(int m=0);
- ~fifo();
bool enq(T, bool blocking=true);
void deq(T *);
bool size();
private:
std::list<T> q_;
- pthread_mutex_t m_;
- pthread_cond_t non_empty_c_; // q went non-empty
- pthread_cond_t has_space_c_; // q is not longer overfull
+ mutex m_;
+ std::condition_variable non_empty_c_; // q went non-empty
+ std::condition_variable has_space_c_; // q is not longer overfull
unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit
};
template<class T>
fifo<T>::fifo(int limit) : max_(limit)
{
- VERIFY(pthread_mutex_init(&m_, 0) == 0);
- VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0);
- VERIFY(pthread_cond_init(&has_space_c_, 0) == 0);
-}
-
-template<class T>
-fifo<T>::~fifo()
-{
- //fifo is to be deleted only when no threads are using it!
- VERIFY(pthread_mutex_destroy(&m_)==0);
- VERIFY(pthread_cond_destroy(&non_empty_c_) == 0);
- VERIFY(pthread_cond_destroy(&has_space_c_) == 0);
}
template<class T> bool
fifo<T>::size()
{
- ScopedLock ml(&m_);
+ lock ml(m_);
return q_.size();
}
template<class T> bool
fifo<T>::enq(T e, bool blocking)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
while (1) {
if (!max_ || q_.size() < max_) {
q_.push_back(e);
break;
}
- if (blocking)
- VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0);
+ if (blocking) {
+ has_space_c_.wait(ml);
+ }
else
return false;
}
- VERIFY(pthread_cond_signal(&non_empty_c_) == 0);
+ non_empty_c_.notify_one();
return true;
}
template<class T> void
fifo<T>::deq(T *e)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
while(1) {
if(q_.empty()){
- VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0);
+ non_empty_c_.wait(ml);
} else {
*e = q_.front();
q_.pop_front();
if (max_ && q_.size() < max_) {
- VERIFY(pthread_cond_signal(&has_space_c_)==0);
+ has_space_c_.notify_one();
}
break;
}
int JSL_DEBUG_LEVEL = 0;
void
jsl_set_debug(int level) {
- JSL_DEBUG_LEVEL = level;
+ JSL_DEBUG_LEVEL = level;
}
-#ifndef __JSL_LOG_H__
-#define __JSL_LOG_H__ 1
+#ifndef jsl_log_h
+#define jsl_log_h
enum dbcode {
- JSL_DBG_OFF = 0,
- JSL_DBG_1 = 1, // Critical
- JSL_DBG_2 = 2, // Error
- JSL_DBG_3 = 3, // Info
- JSL_DBG_4 = 4, // Debugging
+ JSL_DBG_OFF = 0,
+ JSL_DBG_1 = 1, // Critical
+ JSL_DBG_2 = 2, // Error
+ JSL_DBG_3 = 3, // Info
+ JSL_DBG_4 = 4, // Debugging
};
extern int JSL_DEBUG_LEVEL;
-#define jsl_log(level,...) \
- do { \
- if(JSL_DEBUG_LEVEL < abs(level)) \
- {;} \
- else { \
- { printf(__VA_ARGS__);} \
- } \
- } while(0)
+#define jsl_log(level,...) do { if(JSL_DEBUG_LEVEL >= abs(level)) printf(__VA_ARGS__); } while(0)
void jsl_set_debug(int level);
-#endif // __JSL_LOG_H__
+#endif
#include <cstddef>
#include <inttypes.h>
#include "lang/verify.h"
-#include "lang/algorithm.h"
struct req_header {
req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0):
typedef uint64_t rpc_checksum_t;
typedef int rpc_sz_t;
-enum {
- //size of initial buffer allocation
- DEFAULT_RPC_SZ = 1024,
+//size of initial buffer allocation
+#define DEFAULT_RPC_SZ 1024
+#define RPC_HEADER_SZ_NO_CHECKSUM (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
#if RPC_CHECKSUMMING
- //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
- RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t)
+//size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
+#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM + sizeof(rpc_checksum_t))
#else
- RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t)
+#define RPC_HEADER_SZ (RPC_HEADER_SZ_NO_CHECKSUM)
#endif
-};
class marshall {
private:
+++ /dev/null
-#ifndef method_thread_h
-#define method_thread_h
-
-// method_thread(): start a thread that runs an object method.
-// returns a pthread_t on success, and zero on error.
-
-#include <pthread.h>
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include "lang/verify.h"
-
-static pthread_t
-method_thread_parent(void *(*fn)(void *), void *arg, bool detach)
-{
- pthread_t th;
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- // set stack size to 100K, so we don't run out of memory
- pthread_attr_setstacksize(&attr, 100*1024);
- int err = pthread_create(&th, &attr, fn, arg);
- pthread_attr_destroy(&attr);
- if (err != 0) {
- fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err));
- exit(1);
- }
-
- if (detach) {
- // don't keep thread state around after exit, to avoid
- // running out of threads. set detach==false if you plan
- // to pthread_join.
- VERIFY(pthread_detach(th) == 0);
- }
-
- return th;
-}
-
-static void
-method_thread_child()
-{
- // defer pthread_cancel() by default. check explicitly by
- // enabling then pthread_testcancel().
- int oldstate, oldtype;
- VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0);
- VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0);
-}
-
-template <class C> pthread_t
-method_thread(C *o, bool detach, void (C::*m)())
-{
- class XXX {
- public:
- C *o;
- void (C::*m)();
- static void *yyy(void *vvv) {
- XXX *x = (XXX*)vvv;
- C *o = x->o;
- void (C::*m)() = x->m;
- delete x;
- method_thread_child();
- (o->*m)();
- return 0;
- }
- };
- XXX *x = new XXX;
- x->o = o;
- x->m = m;
- return method_thread_parent(&XXX::yyy, (void *) x, detach);
-}
-
-template <class C, class A> pthread_t
-method_thread(C *o, bool detach, void (C::*m)(A), A a)
-{
- class XXX {
- public:
- C *o;
- void (C::*m)(A a);
- A a;
- static void *yyy(void *vvv) {
- XXX *x = (XXX*)vvv;
- C *o = x->o;
- void (C::*m)(A ) = x->m;
- A a = x->a;
- delete x;
- method_thread_child();
- (o->*m)(a);
- return 0;
- }
- };
- XXX *x = new XXX;
- x->o = o;
- x->m = m;
- x->a = a;
- return method_thread_parent(&XXX::yyy, (void *) x, detach);
-}
-
-namespace {
- // ~xavid: this causes a bizzare compile error on OS X.5 when
- // it's declared in the function, so I moved it out here.
- template <class C, class A1, class A2>
- class XXX {
- public:
- C *o;
- void (C::*m)(A1 a1, A2 a2);
- A1 a1;
- A2 a2;
- static void *yyy(void *vvv) {
- XXX *x = (XXX*)vvv;
- C *o = x->o;
- void (C::*m)(A1 , A2 ) = x->m;
- A1 a1 = x->a1;
- A2 a2 = x->a2;
- delete x;
- method_thread_child();
- (o->*m)(a1, a2);
- return 0;
- }
- };
-}
-
-template <class C, class A1, class A2> pthread_t
-method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2)
-{
- XXX<C,A1,A2> *x = new XXX<C,A1,A2>;
- x->o = o;
- x->m = m;
- x->a1 = a1;
- x->a2 = a2;
- return method_thread_parent(&XXX<C,A1,A2>::yyy, (void *) x, detach);
-}
-
-template <class C, class A1, class A2, class A3> pthread_t
-method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3)
-{
- class XXX {
- public:
- C *o;
- void (C::*m)(A1 a1, A2 a2, A3 a3);
- A1 a1;
- A2 a2;
- A3 a3;
- static void *yyy(void *vvv) {
- XXX *x = (XXX*)vvv;
- C *o = x->o;
- void (C::*m)(A1 , A2 , A3 ) = x->m;
- A1 a1 = x->a1;
- A2 a2 = x->a2;
- A3 a3 = x->a3;
- delete x;
- method_thread_child();
- (o->*m)(a1, a2, a3);
- return 0;
- }
- };
- XXX *x = new XXX;
- x->o = o;
- x->m = m;
- x->a1 = a1;
- x->a2 = a2;
- x->a3 = a3;
- return method_thread_parent(&XXX::yyy, (void *) x, detach);
-}
-
-#endif
-#include <sys/time.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
-#include "slock.h"
#include "jsl_log.h"
-#include "method_thread.h"
#include "lang/verify.h"
#include "pollmgr.h"
+#include "lock.h"
PollMgr *PollMgr::instance = NULL;
-static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
+static std::once_flag pollmgr_is_initialized;
void
PollMgrInit()
PollMgr *
PollMgr::Instance()
{
- pthread_once(&pollmgr_is_initialized, PollMgrInit);
+ std::call_once(pollmgr_is_initialized, PollMgrInit);
return instance;
}
aio_ = new SelectAIO();
//aio_ = new EPollAIO();
- VERIFY(pthread_mutex_init(&m_, NULL) == 0);
- VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
- VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
+ th_ = std::thread(&PollMgr::wait_loop, this);
}
PollMgr::~PollMgr()
{
VERIFY(fd < MAX_POLL_FDS);
- ScopedLock ml(&m_);
+ lock ml(m_);
aio_->watch_fd(fd, flag);
VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
void
PollMgr::block_remove_fd(int fd)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
aio_->unwatch_fd(fd, CB_RDWR);
pending_change_ = true;
- VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
+ changedone_c_.wait(ml);
callbacks_[fd] = NULL;
}
void
PollMgr::del_callback(int fd, poll_flag flag)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
if (aio_->unwatch_fd(fd, flag)) {
callbacks_[fd] = NULL;
}
bool
PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
if (!callbacks_[fd] || callbacks_[fd]!=c)
return false;
while (1) {
{
- ScopedLock ml(&m_);
+ lock ml(m_);
if (pending_change_) {
pending_change_ = false;
- VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
+ changedone_c_.notify_all();
}
}
readable.clear();
int flags = fcntl(pipefd_[0], F_GETFL, NULL);
flags |= O_NONBLOCK;
fcntl(pipefd_[0], F_SETFL, flags);
-
- VERIFY(pthread_mutex_init(&m_, NULL) == 0);
}
SelectAIO::~SelectAIO()
{
- VERIFY(pthread_mutex_destroy(&m_) == 0);
}
void
SelectAIO::watch_fd(int fd, poll_flag flag)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
if (highfds_ <= fd)
highfds_ = fd;
bool
SelectAIO::is_watched(int fd, poll_flag flag)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
if (flag == CB_RDONLY) {
return FD_ISSET(fd,&rfds_);
}else if (flag == CB_WRONLY) {
bool
SelectAIO::unwatch_fd(int fd, poll_flag flag)
{
- ScopedLock ml(&m_);
+ lock ml(m_);
if (flag == CB_RDONLY) {
FD_CLR(fd, &rfds_);
}else if (flag == CB_WRONLY) {
int high;
{
- ScopedLock ml(&m_);
+ lock ml(m_);
trfds = rfds_;
twfds = wfds_;
high = highfds_;
-
}
int ret = select(high+1, &trfds, &twfds, NULL, NULL);
#include <sys/select.h>
#include <vector>
+#include <thread>
#ifdef __linux__
#include <sys/epoll.h>
static int useless;
private:
- pthread_mutex_t m_;
- pthread_cond_t changedone_c_;
- pthread_t th_;
+ std::mutex m_;
+ std::condition_variable changedone_c_;
+ std::thread th_;
aio_callback *callbacks_[MAX_POLL_FDS];
aio_mgr *aio_;
int highfds_;
int pipefd_[2];
- pthread_mutex_t m_;
+ std::mutex m_;
};
object. A connection object is deleted only when the underlying connection is
dead and the reference count reaches zero.
- The previous version of the RPC library uses pthread_cancel* routines
- to implement the deletion of rpcc and rpcs objects. The idea is to cancel
- all active threads that might be holding a reference to an object before
- deleting that object. However, pthread_cancel is not robust and there are
- always bugs where outstanding references to deleted objects persist.
- This version of the RPC library does not do pthread_cancel, but explicitly
- joins exited threads to make sure no outstanding references exist before
- deleting objects.
+ This version of the RPC library explicitly joins exited threads to make sure
+ no outstanding references exist before deleting objects.
To delete a rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
3. delete the dispatch thread pool which involves waiting for current active
RPC handlers to finish. It is interesting how a thread pool can be deleted
without using thread cancellation. The trick is to inject x "poison pills" for
- a thread pool of x threads. Upon getting a poison pill instead of a normal
+ a thread pool of x threads. Upon getting a poison pill instead of a normal
task, a worker thread will exit (and thread pool destructor waits to join all
x exited worker threads).
*/
#include "rpc.h"
-#include "method_thread.h"
-#include "slock.h"
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
-#include <time.h>
#include <netdb.h>
#include <unistd.h>
+#include "lock.h"
#include "jsl_log.h"
-#include "gettime.h"
#include "lang/verify.h"
const rpcc::TO rpcc::to_max = { 120000 };
rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
: xid(xxid), un(xun), done(false)
{
- VERIFY(pthread_mutex_init(&m,0) == 0);
- VERIFY(pthread_cond_init(&c, 0) == 0);
}
rpcc::caller::~caller()
{
- VERIFY(pthread_mutex_destroy(&m) == 0);
- VERIFY(pthread_cond_destroy(&c) == 0);
}
inline
void set_rand_seed()
{
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- srandom((int)ts.tv_nsec^((int)getpid()));
+ auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
+ srandom((int)now.time_since_epoch().count()^((int)getpid()));
}
-rpcc::rpcc(sockaddr_in d, bool retrans) :
- dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+rpcc::rpcc(sockaddr_in d, bool retrans) :
+ dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+ retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
{
- VERIFY(pthread_mutex_init(&m_, 0) == 0);
- VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
- VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
-
- if(retrans){
- set_rand_seed();
- clt_nonce_ = random();
- } else {
- // special client nonce 0 means this client does not
- // require at-most-once logic from the server
- // because it uses tcp and never retries a failed connection
- clt_nonce_ = 0;
- }
-
- char *loss_env = getenv("RPC_LOSSY");
- if(loss_env != NULL){
- lossytest_ = atoi(loss_env);
- }
-
- // xid starts with 1 and latest received reply starts with 0
- xid_rep_window_.push_back(0);
-
- jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
- clt_nonce_, lossytest_);
+ if(retrans){
+ set_rand_seed();
+ clt_nonce_ = random();
+ } else {
+ // special client nonce 0 means this client does not
+ // require at-most-once logic from the server
+ // because it uses tcp and never retries a failed connection
+ clt_nonce_ = 0;
+ }
+
+ char *loss_env = getenv("RPC_LOSSY");
+ if(loss_env != NULL){
+ lossytest_ = atoi(loss_env);
+ }
+
+ // xid starts with 1 and latest received reply starts with 0
+ xid_rep_window_.push_back(0);
+
+ jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
+ clt_nonce_, lossytest_);
}
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc()
{
- jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
- clt_nonce_, chan_?chan_->channo():-1);
- if(chan_){
- chan_->closeconn();
- chan_->decref();
- }
- VERIFY(calls_.size() == 0);
- VERIFY(pthread_mutex_destroy(&m_) == 0);
- VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
+ jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
+ clt_nonce_, chan_?chan_->channo():-1);
+ if(chan_){
+ chan_->closeconn();
+ chan_->decref();
+ }
+ VERIFY(calls_.size() == 0);
}
int
rpcc::bind(TO to)
{
- int r;
- int ret = call(rpc_const::bind, 0, r, to);
- if(ret == 0){
- ScopedLock ml(&m_);
- bind_done_ = true;
- srv_nonce_ = r;
- } else {
- jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
- inet_ntoa(dst_.sin_addr), ret);
- }
- return ret;
+ int r;
+ int ret = call(rpc_const::bind, 0, r, to);
+ if(ret == 0){
+ lock ml(m_);
+ bind_done_ = true;
+ srv_nonce_ = r;
+ } else {
+ jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
+ inet_ntoa(dst_.sin_addr), ret);
+ }
+ return ret;
};
// Cancel all outstanding calls
-void
+void
rpcc::cancel(void)
{
- ScopedLock ml(&m_);
- printf("rpcc::cancel: force callers to fail\n");
- std::map<int,caller*>::iterator iter;
- for(iter = calls_.begin(); iter != calls_.end(); iter++){
- caller *ca = iter->second;
+ lock ml(m_);
+ printf("rpcc::cancel: force callers to fail\n");
+ std::map<int,caller*>::iterator iter;
+ for(iter = calls_.begin(); iter != calls_.end(); iter++){
+ caller *ca = iter->second;
- jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
- {
- ScopedLock cl(&ca->m);
- ca->done = true;
- ca->intret = rpc_const::cancel_failure;
- VERIFY(pthread_cond_signal(&ca->c) == 0);
+ jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
+ {
+ lock cl(ca->m);
+ ca->done = true;
+ ca->intret = rpc_const::cancel_failure;
+ ca->c.notify_one();
+ }
}
- }
- while (calls_.size () > 0){
- destroy_wait_ = true;
- VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
- }
- printf("rpcc::cancel: done\n");
+ while (calls_.size () > 0){
+ destroy_wait_ = true;
+ destroy_wait_c_.wait(ml);
+ }
+ printf("rpcc::cancel: done\n");
}
int
rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
- TO to)
+ TO to)
{
- caller ca(0, &rep);
+ caller ca(0, &rep);
int xid_rep;
- {
- ScopedLock ml(&m_);
+ {
+ lock ml(m_);
- if((proc != rpc_const::bind && !bind_done_) ||
- (proc == rpc_const::bind && bind_done_)){
- jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
- return rpc_const::bind_failure;
- }
+ if((proc != rpc_const::bind && !bind_done_) ||
+ (proc == rpc_const::bind && bind_done_)){
+ jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
+ return rpc_const::bind_failure;
+ }
- if(destroy_wait_){
- return rpc_const::cancel_failure;
- }
+ if(destroy_wait_){
+ return rpc_const::cancel_failure;
+ }
- ca.xid = xid_++;
- calls_[ca.xid] = &ca;
+ ca.xid = xid_++;
+ calls_[ca.xid] = &ca;
- req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
+ req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
xid_rep_window_.front());
- req.pack_req_header(h);
+ req.pack_req_header(h);
xid_rep = xid_rep_window_.front();
- }
-
- TO curr_to;
- struct timespec now, nextdeadline, finaldeadline;
-
- clock_gettime(CLOCK_REALTIME, &now);
- add_timespec(now, to.to, &finaldeadline);
- curr_to.to = to_min.to;
-
- bool transmit = true;
- connection *ch = NULL;
-
- while (1){
- if(transmit){
- get_refconn(&ch);
- if(ch){
- if(reachable_) {
- request forgot;
- {
- ScopedLock ml(&m_);
- if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
- forgot = dup_req_;
- dup_req_.clear();
- }
- }
- if (forgot.isvalid())
- ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
- ch->send(req.cstr(), req.size());
- }
- else jsl_log(JSL_DBG_1, "not reachable\n");
- jsl_log(JSL_DBG_2,
- "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
- clt_nonce_, proc, ca.xid, clt_nonce_);
- }
- transmit = false; // only send once on a given channel
- }
-
- if(!finaldeadline.tv_sec)
- break;
-
- clock_gettime(CLOCK_REALTIME, &now);
- add_timespec(now, curr_to.to, &nextdeadline);
- if(cmp_timespec(nextdeadline,finaldeadline) > 0){
- nextdeadline = finaldeadline;
- finaldeadline.tv_sec = 0;
- }
-
- {
- ScopedLock cal(&ca.m);
- while (!ca.done){
- jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
- if(pthread_cond_timedwait(&ca.c, &ca.m,
- &nextdeadline) == ETIMEDOUT){
- jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
- break;
- }
- }
- if(ca.done){
- jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
- break;
- }
- }
-
- if(retrans_ && (!ch || ch->isdead())){
- // since connection is dead, retransmit
- // on the new connection
- transmit = true;
- }
- curr_to.to <<= 1;
- }
-
- {
- // no locking of ca.m since only this thread changes ca.xid
- ScopedLock ml(&m_);
- calls_.erase(ca.xid);
- // may need to update the xid again here, in case the
- // packet times out before it's even sent by the channel.
- // I don't think there's any harm in maybe doing it twice
- update_xid_rep(ca.xid);
-
- if(destroy_wait_){
- VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
- }
- }
-
- if (ca.done && lossytest_)
+ }
+
+ TO curr_to;
+ std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
+ std::chrono::steady_clock::now() +
+ std::chrono::milliseconds(to.to),
+ nextdeadline;
+
+ curr_to.to = to_min.to;
+
+ bool transmit = true;
+ connection *ch = NULL;
+
+ while (1){
+ if(transmit){
+ get_refconn(&ch);
+ if(ch){
+ if(reachable_) {
+ request forgot;
+ {
+ lock ml(m_);
+ if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
+ forgot = dup_req_;
+ dup_req_.clear();
+ }
+ }
+ if (forgot.isvalid())
+ ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
+ ch->send(req.cstr(), req.size());
+ }
+ else jsl_log(JSL_DBG_1, "not reachable\n");
+ jsl_log(JSL_DBG_2,
+ "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
+ clt_nonce_, proc, ca.xid, clt_nonce_);
+ }
+ transmit = false; // only send once on a given channel
+ }
+
+ if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
+ break;
+
+ nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
+ if(nextdeadline > finaldeadline) {
+ nextdeadline = finaldeadline;
+ finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
+ }
+
{
- ScopedLock ml(&m_);
- if (!dup_req_.isvalid()) {
- dup_req_.buf.assign(req.cstr(), req.size());
- dup_req_.xid = ca.xid;
+ lock cal(ca.m);
+ while (!ca.done){
+ jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
+ if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
+ jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
+ break;
}
- if (xid_rep > xid_rep_done_)
- xid_rep_done_ = xid_rep;
+ }
+ if(ca.done){
+ jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
+ break;
+ }
}
- ScopedLock cal(&ca.m);
+ if(retrans_ && (!ch || ch->isdead())){
+ // since connection is dead, retransmit
+ // on the new connection
+ transmit = true;
+ }
+ curr_to.to <<= 1;
+ }
- jsl_log(JSL_DBG_2,
- "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
- clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
- ntohs(dst_.sin_port), ca.done, ca.intret);
+ {
+ // no locking of ca.m since only this thread changes ca.xid
+ lock ml(m_);
+ calls_.erase(ca.xid);
+ // may need to update the xid again here, in case the
+ // packet times out before it's even sent by the channel.
+ // I don't think there's any harm in maybe doing it twice
+ update_xid_rep(ca.xid);
+
+ if(destroy_wait_){
+ destroy_wait_c_.notify_one();
+ }
+ }
+
+ if (ca.done && lossytest_)
+ {
+ lock ml(m_);
+ if (!dup_req_.isvalid()) {
+ dup_req_.buf.assign(req.cstr(), req.size());
+ dup_req_.xid = ca.xid;
+ }
+ if (xid_rep > xid_rep_done_)
+ xid_rep_done_ = xid_rep;
+ }
- if(ch)
- ch->decref();
+ lock cal(ca.m);
- // destruction of req automatically frees its buffer
- return (ca.done? ca.intret : rpc_const::timeout_failure);
+ jsl_log(JSL_DBG_2,
+ "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
+ clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
+ ntohs(dst_.sin_port), ca.done, ca.intret);
+
+ if(ch)
+ ch->decref();
+
+ // destruction of req automatically frees its buffer
+ return (ca.done? ca.intret : rpc_const::timeout_failure);
}
void
rpcc::get_refconn(connection **ch)
{
- ScopedLock ml(&chan_m_);
- if(!chan_ || chan_->isdead()){
- if(chan_)
- chan_->decref();
- chan_ = connect_to_dst(dst_, this, lossytest_);
- }
- if(ch && chan_){
- if(*ch){
- (*ch)->decref();
- }
- *ch = chan_;
- (*ch)->incref();
- }
+ lock ml(chan_m_);
+ if(!chan_ || chan_->isdead()){
+ if(chan_)
+ chan_->decref();
+ chan_ = connect_to_dst(dst_, this, lossytest_);
+ }
+ if(ch && chan_){
+ if(*ch){
+ (*ch)->decref();
+ }
+ *ch = chan_;
+ (*ch)->incref();
+ }
}
-// PollMgr's thread is being used to
-// make this upcall from connection object to rpcc.
+// PollMgr's thread is being used to
+// make this upcall from connection object to rpcc.
// this funtion must not block.
//
-// this function keeps no reference for connection *c
+// this function keeps no reference for connection *c
bool
rpcc::got_pdu(connection *c, char *b, int sz)
{
- unmarshall rep(b, sz);
- reply_header h;
- rep.unpack_reply_header(&h);
-
- if(!rep.ok()){
- jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
- return true;
- }
-
- ScopedLock ml(&m_);
-
- update_xid_rep(h.xid);
-
- if(calls_.find(h.xid) == calls_.end()){
- jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
- return true;
- }
- caller *ca = calls_[h.xid];
-
- ScopedLock cl(&ca->m);
- if(!ca->done){
- ca->un->take_in(rep);
- ca->intret = h.ret;
- if(ca->intret < 0){
- jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
- h.xid, ca->intret);
- }
- ca->done = 1;
- }
- VERIFY(pthread_cond_broadcast(&ca->c) == 0);
- return true;
+ unmarshall rep(b, sz);
+ reply_header h;
+ rep.unpack_reply_header(&h);
+
+ if(!rep.ok()){
+ jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
+ return true;
+ }
+
+ lock ml(m_);
+
+ update_xid_rep(h.xid);
+
+ if(calls_.find(h.xid) == calls_.end()){
+ jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
+ return true;
+ }
+ caller *ca = calls_[h.xid];
+
+ lock cl(ca->m);
+ if(!ca->done){
+ ca->un->take_in(rep);
+ ca->intret = h.ret;
+ if(ca->intret < 0){
+ jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
+ h.xid, ca->intret);
+ }
+ ca->done = 1;
+ }
+ ca->c.notify_all();
+ return true;
}
// assumes thread holds mutex m
-void
+void
rpcc::update_xid_rep(unsigned int xid)
{
- std::list<unsigned int>::iterator it;
+ std::list<unsigned int>::iterator it;
- if(xid <= xid_rep_window_.front()){
- return;
- }
+ if(xid <= xid_rep_window_.front()){
+ return;
+ }
- for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
- if(*it > xid){
- xid_rep_window_.insert(it, xid);
- goto compress;
- }
- }
- xid_rep_window_.push_back(xid);
+ for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
+ if(*it > xid){
+ xid_rep_window_.insert(it, xid);
+ goto compress;
+ }
+ }
+ xid_rep_window_.push_back(xid);
compress:
- it = xid_rep_window_.begin();
- for (it++; it != xid_rep_window_.end(); it++){
- while (xid_rep_window_.front() + 1 == *it)
- xid_rep_window_.pop_front();
- }
+ it = xid_rep_window_.begin();
+ for (it++; it != xid_rep_window_.end(); it++){
+ while (xid_rep_window_.front() + 1 == *it)
+ xid_rep_window_.pop_front();
+ }
}
rpcs::rpcs(unsigned int p1, int count)
: port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
{
- VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
- VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
- VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
- VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
-
- set_rand_seed();
- nonce_ = random();
- jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
+ set_rand_seed();
+ nonce_ = random();
+ jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
- char *loss_env = getenv("RPC_LOSSY");
- if(loss_env != NULL){
- lossytest_ = atoi(loss_env);
- }
+ char *loss_env = getenv("RPC_LOSSY");
+ if(loss_env != NULL){
+ lossytest_ = atoi(loss_env);
+ }
- reg(rpc_const::bind, this, &rpcs::rpcbind);
- dispatchpool_ = new ThrPool(6,false);
+ reg(rpc_const::bind, this, &rpcs::rpcbind);
+ dispatchpool_ = new ThrPool(6,false);
- listener_ = new tcpsconn(this, port_, lossytest_);
+ listener_ = new tcpsconn(this, port_, lossytest_);
}
rpcs::~rpcs()
{
- // must delete listener before dispatchpool
- delete listener_;
- delete dispatchpool_;
- free_reply_window();
+ // must delete listener before dispatchpool
+ delete listener_;
+ delete dispatchpool_;
+ free_reply_window();
}
bool
return true;
}
- djob_t *j = new djob_t(c, b, sz);
- c->incref();
- bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
- if(!succ || !reachable_){
- c->decref();
- delete j;
- }
- return succ;
+ djob_t *j = new djob_t(c, b, sz);
+ c->incref();
+ bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
+ if(!succ || !reachable_){
+ c->decref();
+ delete j;
+ }
+ return succ;
}
void
rpcs::reg1(unsigned int proc, handler *h)
{
- ScopedLock pl(&procs_m_);
- VERIFY(procs_.count(proc) == 0);
- procs_[proc] = h;
- VERIFY(procs_.count(proc) >= 1);
+ lock pl(procs_m_);
+ VERIFY(procs_.count(proc) == 0);
+ procs_[proc] = h;
+ VERIFY(procs_.count(proc) >= 1);
}
void
rpcs::updatestat(unsigned int proc)
{
- ScopedLock cl(&count_m_);
- counts_[proc]++;
- curr_counts_--;
- if(curr_counts_ == 0){
- std::map<int, int>::iterator i;
- printf("RPC STATS: ");
- for (i = counts_.begin(); i != counts_.end(); i++){
- printf("%x:%d ", i->first, i->second);
- }
- printf("\n");
-
- ScopedLock rwl(&reply_window_m_);
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
-
- unsigned int totalrep = 0, maxrep = 0;
- for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- totalrep += clt->second.size();
- if(clt->second.size() > maxrep)
- maxrep = clt->second.size();
- }
- jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
+ lock cl(count_m_);
+ counts_[proc]++;
+ curr_counts_--;
+ if(curr_counts_ == 0){
+ std::map<int, int>::iterator i;
+ printf("RPC STATS: ");
+ for (i = counts_.begin(); i != counts_.end(); i++){
+ printf("%x:%d ", i->first, i->second);
+ }
+ printf("\n");
+
+ lock rwl(reply_window_m_);
+ std::map<unsigned int,std::list<reply_t> >::iterator clt;
+
+ unsigned int totalrep = 0, maxrep = 0;
+ for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+ totalrep += clt->second.size();
+ if(clt->second.size() > maxrep)
+ maxrep = clt->second.size();
+ }
+ jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
(int) reply_window_.size()-1, totalrep, maxrep);
- curr_counts_ = counting_;
- }
+ curr_counts_ = counting_;
+ }
}
void
rpcs::dispatch(djob_t *j)
{
- connection *c = j->conn;
- unmarshall req(j->buf, j->sz);
- delete j;
-
- req_header h;
- req.unpack_req_header(&h);
- int proc = h.proc;
-
- if(!req.ok()){
- jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
- c->decref();
- return;
- }
-
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
- h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
-
- marshall rep;
- reply_header rh(h.xid,0);
-
- // is client sending to an old instance of server?
- if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
- h.srv_nonce, nonce_, h.proc);
- rh.ret = rpc_const::oldsrv_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
- return;
- }
-
- handler *f;
- // is RPC proc a registered procedure?
- {
- ScopedLock pl(&procs_m_);
- if(procs_.count(proc) < 1){
- fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
- proc);
- c->decref();
+ connection *c = j->conn;
+ unmarshall req(j->buf, j->sz);
+ delete j;
+
+ req_header h;
+ req.unpack_req_header(&h);
+ int proc = h.proc;
+
+ if(!req.ok()){
+ jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
+ c->decref();
+ return;
+ }
+
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
+ h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
+
+ marshall rep;
+ reply_header rh(h.xid,0);
+
+ // is client sending to an old instance of server?
+ if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
+ h.srv_nonce, nonce_, h.proc);
+ rh.ret = rpc_const::oldsrv_failure;
+ rep.pack_reply_header(rh);
+ c->send(rep.cstr(),rep.size());
+ return;
+ }
+
+ handler *f;
+ // is RPC proc a registered procedure?
+ {
+ lock pl(procs_m_);
+ if(procs_.count(proc) < 1){
+ fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
+ proc);
+ c->decref();
VERIFY(0);
- return;
- }
-
- f = procs_[proc];
- }
-
- rpcs::rpcstate_t stat;
- char *b1;
- int sz1;
-
- if(h.clt_nonce){
- // have i seen this client before?
- {
- ScopedLock rwl(&reply_window_m_);
- // if we don't know about this clt_nonce, create a cleanup object
- if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
- VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+ return;
+ }
+
+ f = procs_[proc];
+ }
+
+ rpcs::rpcstate_t stat;
+ char *b1;
+ int sz1;
+
+ if(h.clt_nonce){
+ // have i seen this client before?
+ {
+ lock rwl(reply_window_m_);
+ // if we don't know about this clt_nonce, create a cleanup object
+ if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
+ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
- h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
- }
- }
-
- // save the latest good connection to the client
- {
- ScopedLock rwl(&conss_m_);
- if(conns_.find(h.clt_nonce) == conns_.end()){
- c->incref();
- conns_[h.clt_nonce] = c;
- } else if(conns_[h.clt_nonce]->compare(c) < 0){
- conns_[h.clt_nonce]->decref();
- c->incref();
- conns_[h.clt_nonce] = c;
- }
- }
-
- stat = checkduplicate_and_update(h.clt_nonce, h.xid,
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
+ h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
+ }
+ }
+
+ // save the latest good connection to the client
+ {
+ lock rwl(conss_m_);
+ if(conns_.find(h.clt_nonce) == conns_.end()){
+ c->incref();
+ conns_[h.clt_nonce] = c;
+ } else if(conns_[h.clt_nonce]->compare(c) < 0){
+ conns_[h.clt_nonce]->decref();
+ c->incref();
+ conns_[h.clt_nonce] = c;
+ }
+ }
+
+ stat = checkduplicate_and_update(h.clt_nonce, h.xid,
h.xid_rep, &b1, &sz1);
- } else {
- // this client does not require at most once logic
- stat = NEW;
- }
-
- switch (stat){
- case NEW: // new request
- if(counting_){
- updatestat(proc);
- }
-
- rh.ret = f->fn(req, rep);
+ } else {
+ // this client does not require at most once logic
+ stat = NEW;
+ }
+
+ switch (stat){
+ case NEW: // new request
+ if(counting_){
+ updatestat(proc);
+ }
+
+ rh.ret = f->fn(req, rep);
if (rh.ret == rpc_const::unmarshal_args_failure) {
fprintf(stderr, "rpcs::dispatch: failed to"
" unmarshall the arguments. You are"
" types of arguments.\n", proc);
VERIFY(0);
}
- VERIFY(rh.ret >= 0);
-
- rep.pack_reply_header(rh);
- rep.take_buf(&b1,&sz1);
-
- jsl_log(JSL_DBG_2,
- "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
- sz1, h.xid, proc, rh.ret, h.clt_nonce);
-
- if(h.clt_nonce > 0){
- // only record replies for clients that require at-most-once logic
- add_reply(h.clt_nonce, h.xid, b1, sz1);
- }
-
- // get the latest connection to the client
- {
- ScopedLock rwl(&conss_m_);
- if(c->isdead() && c != conns_[h.clt_nonce]){
- c->decref();
- c = conns_[h.clt_nonce];
- c->incref();
- }
- }
-
- c->send(b1, sz1);
- if(h.clt_nonce == 0){
- // reply is not added to at-most-once window, free it
- free(b1);
- }
- break;
- case INPROGRESS: // server is working on this request
- break;
- case DONE: // duplicate and we still have the response
- c->send(b1, sz1);
- break;
- case FORGOTTEN: // very old request and we don't have the response anymore
- jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
- h.xid, h.clt_nonce);
- rh.ret = rpc_const::atmostonce_failure;
- rep.pack_reply_header(rh);
- c->send(rep.cstr(),rep.size());
- break;
- }
- c->decref();
+ VERIFY(rh.ret >= 0);
+
+ rep.pack_reply_header(rh);
+ rep.take_buf(&b1,&sz1);
+
+ jsl_log(JSL_DBG_2,
+ "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
+ sz1, h.xid, proc, rh.ret, h.clt_nonce);
+
+ if(h.clt_nonce > 0){
+ // only record replies for clients that require at-most-once logic
+ add_reply(h.clt_nonce, h.xid, b1, sz1);
+ }
+
+ // get the latest connection to the client
+ {
+ lock rwl(conss_m_);
+ if(c->isdead() && c != conns_[h.clt_nonce]){
+ c->decref();
+ c = conns_[h.clt_nonce];
+ c->incref();
+ }
+ }
+
+ c->send(b1, sz1);
+ if(h.clt_nonce == 0){
+ // reply is not added to at-most-once window, free it
+ free(b1);
+ }
+ break;
+ case INPROGRESS: // server is working on this request
+ break;
+ case DONE: // duplicate and we still have the response
+ c->send(b1, sz1);
+ break;
+ case FORGOTTEN: // very old request and we don't have the response anymore
+ jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
+ h.xid, h.clt_nonce);
+ rh.ret = rpc_const::atmostonce_failure;
+ rep.pack_reply_header(rh);
+ c->send(rep.cstr(),rep.size());
+ break;
+ }
+ c->decref();
}
// rpcs::dispatch calls this when an RPC request arrives.
// INPROGRESS: seen this xid, and still processing it.
// DONE: seen this xid, previous reply returned in *b and *sz.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
-rpcs::rpcstate_t
+rpcs::rpcstate_t
rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
- unsigned int xid_rep, char **b, int *sz)
+ unsigned int xid_rep, char **b, int *sz)
{
- ScopedLock rwl(&reply_window_m_);
+ lock rwl(reply_window_m_);
std::list<reply_t> &l = reply_window_[clt_nonce];
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b and sz.
// add_reply() should remember b and sz.
-// free_reply_window() and checkduplicate_and_update is responsible for
+// free_reply_window() and checkduplicate_and_update is responsible for
// calling free(b).
void
rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
- char *b, int sz)
+ char *b, int sz)
{
- ScopedLock rwl(&reply_window_m_);
+ lock rwl(reply_window_m_);
// remember the RPC reply value
std::list<reply_t> &l = reply_window_[clt_nonce];
std::list<reply_t>::iterator it = l.begin();
void
rpcs::free_reply_window(void)
{
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
- std::list<reply_t>::iterator it;
+ std::map<unsigned int,std::list<reply_t> >::iterator clt;
+ std::list<reply_t>::iterator it;
- ScopedLock rwl(&reply_window_m_);
- for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- for (it = clt->second.begin(); it != clt->second.end(); it++){
- if (it->cb_present)
+ lock rwl(reply_window_m_);
+ for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+ for (it = clt->second.begin(); it != clt->second.end(); it++){
+ if (it->cb_present)
free(it->buf);
- }
- clt->second.clear();
- }
- reply_window_.clear();
+ }
+ clt->second.clear();
+ }
+ reply_window_.clear();
}
// rpc handler
-int
+int
rpcs::rpcbind(int a, int &r)
{
- jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
- r = nonce_;
- return 0;
+ jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
+ r = nonce_;
+ return 0;
}
void
marshall::rawbyte(unsigned char x)
{
- if(_ind >= _capa){
- _capa *= 2;
- VERIFY (_buf != NULL);
- _buf = (char *)realloc(_buf, _capa);
- VERIFY(_buf);
- }
- _buf[_ind++] = x;
+ if(_ind >= _capa){
+ _capa *= 2;
+ VERIFY (_buf != NULL);
+ _buf = (char *)realloc(_buf, _capa);
+ VERIFY(_buf);
+ }
+ _buf[_ind++] = x;
}
void
marshall::rawbytes(const char *p, int n)
{
- if((_ind+n) > _capa){
- _capa = _capa > n? 2*_capa:(_capa+n);
- VERIFY (_buf != NULL);
- _buf = (char *)realloc(_buf, _capa);
- VERIFY(_buf);
- }
- memcpy(_buf+_ind, p, n);
- _ind += n;
+ if((_ind+n) > _capa){
+ _capa = _capa > n? 2*_capa:(_capa+n);
+ VERIFY (_buf != NULL);
+ _buf = (char *)realloc(_buf, _capa);
+ VERIFY(_buf);
+ }
+ memcpy(_buf+_ind, p, n);
+ _ind += n;
}
marshall &
operator<<(marshall &m, bool x)
{
- m.rawbyte(x);
- return m;
+ m.rawbyte(x);
+ return m;
}
marshall &
operator<<(marshall &m, unsigned char x)
{
- m.rawbyte(x);
- return m;
+ m.rawbyte(x);
+ return m;
}
marshall &
operator<<(marshall &m, char x)
{
- m << (unsigned char) x;
- return m;
+ m << (unsigned char) x;
+ return m;
}
marshall &
operator<<(marshall &m, unsigned short x)
{
- m.rawbyte((x >> 8) & 0xff);
- m.rawbyte(x & 0xff);
- return m;
+ m.rawbyte((x >> 8) & 0xff);
+ m.rawbyte(x & 0xff);
+ return m;
}
marshall &
operator<<(marshall &m, short x)
{
- m << (unsigned short) x;
- return m;
+ m << (unsigned short) x;
+ return m;
}
marshall &
operator<<(marshall &m, unsigned int x)
{
- // network order is big-endian
- m.rawbyte((x >> 24) & 0xff);
- m.rawbyte((x >> 16) & 0xff);
- m.rawbyte((x >> 8) & 0xff);
- m.rawbyte(x & 0xff);
- return m;
+ // network order is big-endian
+ m.rawbyte((x >> 24) & 0xff);
+ m.rawbyte((x >> 16) & 0xff);
+ m.rawbyte((x >> 8) & 0xff);
+ m.rawbyte(x & 0xff);
+ return m;
}
marshall &
operator<<(marshall &m, int x)
{
- m << (unsigned int) x;
- return m;
+ m << (unsigned int) x;
+ return m;
}
marshall &
operator<<(marshall &m, const std::string &s)
{
- m << (unsigned int) s.size();
- m.rawbytes(s.data(), s.size());
- return m;
+ m << (unsigned int) s.size();
+ m.rawbytes(s.data(), s.size());
+ return m;
}
marshall &
operator<<(marshall &m, unsigned long long x)
{
- m << (unsigned int) (x >> 32);
- m << (unsigned int) x;
- return m;
+ m << (unsigned int) (x >> 32);
+ m << (unsigned int) x;
+ return m;
}
void
marshall::pack(int x)
{
- rawbyte((x >> 24) & 0xff);
- rawbyte((x >> 16) & 0xff);
- rawbyte((x >> 8) & 0xff);
- rawbyte(x & 0xff);
+ rawbyte((x >> 24) & 0xff);
+ rawbyte((x >> 16) & 0xff);
+ rawbyte((x >> 8) & 0xff);
+ rawbyte(x & 0xff);
}
void
unmarshall::unpack(int *x)
{
- (*x) = (rawbyte() & 0xff) << 24;
- (*x) |= (rawbyte() & 0xff) << 16;
- (*x) |= (rawbyte() & 0xff) << 8;
- (*x) |= rawbyte() & 0xff;
+ (*x) = (rawbyte() & 0xff) << 24;
+ (*x) |= (rawbyte() & 0xff) << 16;
+ (*x) |= (rawbyte() & 0xff) << 8;
+ (*x) |= rawbyte() & 0xff;
}
// take the contents from another unmarshall object
void
unmarshall::take_in(unmarshall &another)
{
- if(_buf)
- free(_buf);
- another.take_buf(&_buf, &_sz);
- _ind = RPC_HEADER_SZ;
- _ok = _sz >= RPC_HEADER_SZ?true:false;
+ if(_buf)
+ free(_buf);
+ another.take_buf(&_buf, &_sz);
+ _ind = RPC_HEADER_SZ;
+ _ok = _sz >= RPC_HEADER_SZ?true:false;
}
bool
unmarshall::okdone()
{
- if(ok() && _ind == _sz){
- return true;
- } else {
- return false;
- }
+ if(ok() && _ind == _sz){
+ return true;
+ } else {
+ return false;
+ }
}
unsigned int
unmarshall::rawbyte()
{
- char c = 0;
- if(_ind >= _sz)
- _ok = false;
- else
- c = _buf[_ind++];
- return c;
+ char c = 0;
+ if(_ind >= _sz)
+ _ok = false;
+ else
+ c = _buf[_ind++];
+ return c;
}
unmarshall &
operator>>(unmarshall &u, bool &x)
{
- x = (bool) u.rawbyte() ;
- return u;
+ x = (bool) u.rawbyte() ;
+ return u;
}
unmarshall &
operator>>(unmarshall &u, unsigned char &x)
{
- x = (unsigned char) u.rawbyte() ;
- return u;
+ x = (unsigned char) u.rawbyte() ;
+ return u;
}
unmarshall &
operator>>(unmarshall &u, char &x)
{
- x = (char) u.rawbyte();
- return u;
+ x = (char) u.rawbyte();
+ return u;
}
unmarshall &
operator>>(unmarshall &u, unsigned short &x)
{
- x = (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
+ x = (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
}
unmarshall &
operator>>(unmarshall &u, short &x)
{
- x = (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
+ x = (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
}
unmarshall &
operator>>(unmarshall &u, unsigned int &x)
{
- x = (u.rawbyte() & 0xff) << 24;
- x |= (u.rawbyte() & 0xff) << 16;
- x |= (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
+ x = (u.rawbyte() & 0xff) << 24;
+ x |= (u.rawbyte() & 0xff) << 16;
+ x |= (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
}
unmarshall &
operator>>(unmarshall &u, int &x)
{
- x = (u.rawbyte() & 0xff) << 24;
- x |= (u.rawbyte() & 0xff) << 16;
- x |= (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
+ x = (u.rawbyte() & 0xff) << 24;
+ x |= (u.rawbyte() & 0xff) << 16;
+ x |= (u.rawbyte() & 0xff) << 8;
+ x |= u.rawbyte() & 0xff;
+ return u;
}
unmarshall &
operator>>(unmarshall &u, unsigned long long &x)
{
- unsigned int h, l;
- u >> h;
- u >> l;
- x = l | ((unsigned long long) h << 32);
- return u;
+ unsigned int h, l;
+ u >> h;
+ u >> l;
+ x = l | ((unsigned long long) h << 32);
+ return u;
}
unmarshall &
operator>>(unmarshall &u, std::string &s)
{
- unsigned sz;
- u >> sz;
- if(u.ok())
- u.rawbytes(s, sz);
- return u;
+ unsigned sz;
+ u >> sz;
+ if(u.ok())
+ u.rawbytes(s, sz);
+ return u;
}
void
unmarshall::rawbytes(std::string &ss, unsigned int n)
{
- if((_ind+n) > (unsigned)_sz){
- _ok = false;
- } else {
- std::string tmps = std::string(_buf+_ind, n);
- swap(ss, tmps);
- VERIFY(ss.size() == n);
- _ind += n;
- }
+ if((_ind+n) > (unsigned)_sz){
+ _ok = false;
+ } else {
+ std::string tmps = std::string(_buf+_ind, n);
+ swap(ss, tmps);
+ VERIFY(ss.size() == n);
+ _ind += n;
+ }
}
bool operator<(const sockaddr_in &a, const sockaddr_in &b){
- return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
- ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
- ((a.sin_port < b.sin_port))));
+ return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
+ ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
+ ((a.sin_port < b.sin_port))));
}
/*---------------auxilary function--------------*/
void
make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
- char host[200];
- const char *localhost = "127.0.0.1";
- const char *port = index(hostandport, ':');
- if(port == NULL){
- memcpy(host, localhost, strlen(localhost)+1);
- port = hostandport;
- } else {
- memcpy(host, hostandport, port-hostandport);
- host[port-hostandport] = '\0';
- port++;
- }
+ char host[200];
+ const char *localhost = "127.0.0.1";
+ const char *port = index(hostandport, ':');
+ if(port == NULL){
+ memcpy(host, localhost, strlen(localhost)+1);
+ port = hostandport;
+ } else {
+ memcpy(host, hostandport, port-hostandport);
+ host[port-hostandport] = '\0';
+ port++;
+ }
- make_sockaddr(host, port, dst);
+ make_sockaddr(host, port, dst);
}
void
make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
- in_addr_t a;
-
- bzero(dst, sizeof(*dst));
- dst->sin_family = AF_INET;
-
- a = inet_addr(host);
- if(a != INADDR_NONE){
- dst->sin_addr.s_addr = a;
- } else {
- struct hostent *hp = gethostbyname(host);
- if(hp == 0 || hp->h_length != 4){
- fprintf(stderr, "cannot find host name %s\n", host);
- exit(1);
- }
- dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
- }
- dst->sin_port = htons(atoi(port));
-}
+ in_addr_t a;
-int
-cmp_timespec(const struct timespec &a, const struct timespec &b)
-{
- if(a.tv_sec > b.tv_sec)
- return 1;
- else if(a.tv_sec < b.tv_sec)
- return -1;
- else {
- if(a.tv_nsec > b.tv_nsec)
- return 1;
- else if(a.tv_nsec < b.tv_nsec)
- return -1;
- else
- return 0;
- }
-}
+ bzero(dst, sizeof(*dst));
+ dst->sin_family = AF_INET;
-void
-add_timespec(const struct timespec &a, int b, struct timespec *result)
-{
- // convert to millisec, add timeout, convert back
- result->tv_sec = a.tv_sec + b/1000;
- result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000;
- VERIFY(result->tv_nsec >= 0);
- while (result->tv_nsec > 1000000000){
- result->tv_sec++;
- result->tv_nsec-=1000000000;
- }
-}
-
-int
-diff_timespec(const struct timespec &end, const struct timespec &start)
-{
- int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
- VERIFY(diff || end.tv_sec == start.tv_sec);
- if(end.tv_nsec > start.tv_nsec){
- diff += (end.tv_nsec-start.tv_nsec)/1000000;
- } else {
- diff -= (start.tv_nsec-end.tv_nsec)/1000000;
- }
- return diff;
+ a = inet_addr(host);
+ if(a != INADDR_NONE){
+ dst->sin_addr.s_addr = a;
+ } else {
+ struct hostent *hp = gethostbyname(host);
+ if(hp == 0 || hp->h_length != 4){
+ fprintf(stderr, "cannot find host name %s\n", host);
+ exit(1);
+ }
+ dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
+ }
+ dst->sin_port = htons(atoi(port));
}
unmarshall *un;
int intret;
bool done;
- pthread_mutex_t m;
- pthread_cond_t c;
+ std::mutex m;
+ std::condition_variable c;
};
void get_refconn(connection **ch);
connection *chan_;
- pthread_mutex_t m_; // protect insert/delete to calls[]
- pthread_mutex_t chan_m_;
+ std::mutex m_; // protect insert/delete to calls[]
+ std::mutex chan_m_;
bool destroy_wait_;
- pthread_cond_t destroy_wait_c_;
+ std::condition_variable destroy_wait_c_;
std::map<int, caller *> calls_;
std::list<unsigned int> xid_rep_window_;
// map proc # to function
std::map<int, handler *> procs_;
- pthread_mutex_t procs_m_; // protect insert/delete to procs[]
- pthread_mutex_t count_m_; //protect modification of counts
- pthread_mutex_t reply_window_m_; // protect reply window et al
- pthread_mutex_t conss_m_; // protect conns_
+ std::mutex procs_m_; // protect insert/delete to procs[]
+ std::mutex count_m_; //protect modification of counts
+ std::mutex reply_window_m_; // protect reply window et al
+ std::mutex conss_m_; // protect conns_
protected:
void make_sockaddr(const char *host, const char *port,
struct sockaddr_in *dst);
-int cmp_timespec(const struct timespec &a, const struct timespec &b);
-void add_timespec(const struct timespec &a, int b, struct timespec *result);
-int diff_timespec(const struct timespec &a, const struct timespec &b);
-
#endif
#include <sys/types.h>
#include <unistd.h>
#include "jsl_log.h"
-#include "gettime.h"
#include "lang/verify.h"
#define NUM_CL 2
rpcc *clients[NUM_CL]; // client rpc object
struct sockaddr_in dst; //server's ip address
int port;
-pthread_attr_t attr;
// server-side handlers. they must be methods of some class
// to simplify rpcs::reg(). a server process can have handlers
VERIFY(i1==i && l1==l && s1==s);
}
-void *
-client1(void *xx)
+void
+client1(int cl)
{
-
// test concurrency.
- int which_cl = ((unsigned long) xx ) % NUM_CL;
+ int which_cl = ((unsigned long) cl ) % NUM_CL;
for(int i = 0; i < 100; i++){
int arg = (random() % 2000);
int arg = (random() % 1000);
int rep;
- struct timespec start,end;
- clock_gettime(CLOCK_REALTIME, &start);
+ auto start = std::chrono::steady_clock::now();
int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
- clock_gettime(CLOCK_REALTIME, &end);
- int diff = diff_timespec(end, start);
+ auto end = std::chrono::steady_clock::now();
+ int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (ret != 0)
printf("%d ms have elapsed!!!\n", diff);
VERIFY(ret == 0);
VERIFY(rep == (which ? arg+1 : arg+2));
}
-
- return 0;
}
-void *
-client2(void *xx)
+void
+client2(int cl)
{
- int which_cl = ((unsigned long) xx ) % NUM_CL;
+ int which_cl = ((unsigned long) cl ) % NUM_CL;
time_t t1;
time(&t1);
}
VERIFY((int)rep.size() == arg);
}
- return 0;
}
-void *
+void
client3(void *xx)
{
rpcc *c = (rpcc *) xx;
int ret = c->call(24, i, rep, rpcc::to(3000));
VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
}
- return 0;
}
// create threads that make lots of calls in parallel,
// to test thread synchronization for concurrent calls
// and dispatches.
- int ret;
-
printf("start concurrent_test (%d threads) ...", nt);
- pthread_t th[nt];
+ std::vector<std::thread> th(nt);
for(int i = 0; i < nt; i++){
- ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
- VERIFY(ret == 0);
+ th[i] = std::thread(client1, i);
}
for(int i = 0; i < nt; i++){
- VERIFY(pthread_join(th[i], NULL) == 0);
+ th[i].join();
}
printf(" OK\n");
}
void
lossy_test()
{
- int ret;
-
printf("start lossy_test ...");
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
}
int nt = 1;
- pthread_t th[nt];
+ std::vector<std::thread> th(nt);
for(int i = 0; i < nt; i++){
- ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
- VERIFY(ret == 0);
+ th[i] = std::thread(client2, i);
}
for(int i = 0; i < nt; i++){
- VERIFY(pthread_join(th[i], NULL) == 0);
+ th[i].join();
}
printf(".. OK\n");
VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
int nt = 10;
- int ret;
printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
- pthread_t th[nt];
+ std::vector<std::thread> th(nt);
for(int i = 0; i < nt; i++){
- ret = pthread_create(&th[i], &attr, client3, (void *) client);
- VERIFY(ret == 0);
+ th[i] = std::thread(client3, client);
}
for(int i = 0; i < nt; i++){
- VERIFY(pthread_join(th[i], NULL) == 0);
+ th[i].join();
}
printf("ok\n");
printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
for(int i = 0; i < nt; i++){
- ret = pthread_create(&th[i], &attr, client3, (void *)client);
- VERIFY(ret == 0);
+ th[i] = std::thread(client3, client);
}
for(int i = 0; i < nt; i++){
- VERIFY(pthread_join(th[i], NULL) == 0);
+ th[i].join();
}
printf("ok\n");
testmarshall();
- pthread_attr_init(&attr);
- // set stack size to 32K, so we don't run out of memory
- pthread_attr_setstacksize(&attr, 32*1024);
-
if (isserver) {
- printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
+ printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
startserver();
}
+++ /dev/null
-#ifndef __SCOPED_LOCK__
-#define __SCOPED_LOCK__
-
-#include <pthread.h>
-#include "lang/verify.h"
-struct ScopedLock {
- private:
- pthread_mutex_t *m_;
- public:
- ScopedLock(pthread_mutex_t *m): m_(m) {
- VERIFY(pthread_mutex_lock(m_)==0);
- }
- ~ScopedLock() {
- VERIFY(pthread_mutex_unlock(m_)==0);
- }
-};
-struct ScopedUnlock {
- private:
- pthread_mutex_t *m_;
- public:
- ScopedUnlock(pthread_mutex_t *m): m_(m) {
- VERIFY(pthread_mutex_unlock(m_)==0);
- }
- ~ScopedUnlock() {
- VERIFY(pthread_mutex_lock(m_)==0);
- }
-};
-#endif /*__SCOPED_LOCK__*/
-#include "slock.h"
#include "thr_pool.h"
#include <stdlib.h>
#include <errno.h>
#include "lang/verify.h"
-static void *
+static void
do_worker(void *arg)
{
ThrPool *tp = (ThrPool *)arg;
(void)(j.f)(j.a);
}
- pthread_exit(NULL);
}
//if blocking, then addJob() blocks when queue is full
ThrPool::ThrPool(int sz, bool blocking)
: nthreads_(sz),blockadd_(blocking),jobq_(100*sz)
{
- pthread_attr_init(&attr_);
- pthread_attr_setstacksize(&attr_, 128<<10);
-
for (int i = 0; i < sz; i++) {
- pthread_t t;
- VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0);
- th_.push_back(t);
+ th_.push_back(std::thread(do_worker, this));
}
}
{
for (int i = 0; i < nthreads_; i++) {
job_t j;
- j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit
+ j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit
jobq_.enq(j);
}
for (int i = 0; i < nthreads_; i++) {
- VERIFY(pthread_join(th_[i], NULL)==0);
+ th_[i].join();
}
-
- VERIFY(pthread_attr_destroy(&attr_)==0);
}
bool
-ThrPool::addJob(void *(*f)(void *), void *a)
+ThrPool::addJob(void (*f)(void *), void *a)
{
job_t j;
j.f = f;
-#ifndef __THR_POOL__
-#define __THR_POOL__
+#ifndef thr_pool_h
+#define thr_pool_h
-#include <pthread.h>
#include <vector>
+#include <thread>
#include "fifo.h"
class ThrPool {
-
-
public:
struct job_t {
- void *(*f)(void *); //function point
+ void (*f)(void *); //function point
void *a; //function arguments
};
bool takeJob(job_t *j);
private:
- pthread_attr_t attr_;
int nthreads_;
bool blockadd_;
fifo<job_t> jobq_;
- std::vector<pthread_t> th_;
+ std::vector<std::thread> th_;
- bool addJob(void *(*f)(void *), void *a);
+ bool addJob(void (*f)(void *), void *a);
};
- template <class C, class A> bool
+template <class C, class A> bool
ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
{
C *o;
void (C::*m)(A a);
A a;
- static void *func(void *vvv) {
+ static void func(void *vvv) {
objfunc_wrapper *x = (objfunc_wrapper*)vvv;
C *o = x->o;
void (C::*m)(A ) = x->m;
A a = x->a;
(o->*m)(a);
delete x;
- return 0;
}
};
#include "tprintf.h"
#include "lang/verify.h"
#include "rsm_client.h"
-
-static void *recoverythread(void *x) {
- rsm *r = (rsm *) x;
- r->recovery();
- return 0;
-}
+#include "lock.h"
rsm::rsm(std::string _first, std::string _me) :
stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
partitioned (false), dopartition(false), break1(false), break2(false)
{
- pthread_t th;
+ std::thread th;
last_myvs.vid = 0;
last_myvs.seqno = 0;
testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
{
- ScopedLock ml(rsm_mutex);
- VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0);
+ lock ml(rsm_mutex);
+ th = std::thread(&rsm::recovery, this);
}
}
void rsm::reg1(int proc, handler *h) {
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
procs[proc] = h;
}
// The recovery thread runs this function
void rsm::recovery() {
bool r = true;
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
while (1) {
while (!cfg->ismember(cfg->myaddr(), vid_commit)) {
+ // XXX iannucci 2013/09/15 -- I don't understand whether accessing
+ // cfg->view_id in this manner involves a race. I suspect not.
if (join(primary)) {
tprintf("recovery: joined\n");
- commit_change_wo(cfg->vid());
+ commit_change_wo(cfg->view_id());
} else {
- ScopedUnlock su(rsm_mutex);
- sleep (30); // XXX make another node in cfg primary?
+ ml.unlock();
+ std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
+ ml.lock();
}
}
vid_insync = vid_commit;
inviewchange = false;
}
tprintf("recovery: go to sleep %d %d\n", insync, inviewchange);
- recovery_cond.wait(rsm_mutex);
+ recovery_cond.wait(ml);
}
}
}
bool rsm::sync_with_backups() {
+ adopt_lock ml(rsm_mutex);
+ ml.unlock();
{
- ScopedUnlock su(rsm_mutex);
// Make sure that the state of lock_server_cache_rsm is stable during
// synchronization; otherwise, the primary's state may be more recent
// than replicas after the synchronization.
- ScopedLock ml(invoke_mutex);
+ lock ml(invoke_mutex);
// By acquiring and releasing the invoke_mutex once, we make sure that
// the state of lock_server_cache_rsm will not be changed until all
// replicas are synchronized. The reason is that client_invoke arrives
// after this point of time will see inviewchange == true, and returns
// BUSY.
}
+ ml.lock();
// Start accepting synchronization request (statetransferreq) now!
insync = true;
- backups = std::vector<std::string>(cfg->get_view(vid_insync));
+ cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
LOG("rsm::sync_with_backups " << backups);
- sync_cond.wait(rsm_mutex);
+ sync_cond.wait(ml);
insync = false;
return true;
}
m.c_str(), last_myvs.vid, last_myvs.seqno);
rpcc *cl;
{
- ScopedUnlock su(rsm_mutex);
+ adopt_lock ml(rsm_mutex);
+ ml.unlock();
cl = h.safebind();
if (cl) {
ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(),
last_myvs, vid_insync, r, rpcc::to(1000));
}
+ ml.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(),
}
bool rsm::statetransferdone(std::string m) {
- ScopedUnlock su(rsm_mutex);
+ adopt_lock ml(rsm_mutex);
+ ml.unlock();
handle h(m);
rpcc *cl = h.safebind();
- if (!cl)
- return false;
- int r;
- rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r);
- if (ret != rsm_protocol::OK)
- return false;
- return true;
+ bool done = false;
+ if (cl) {
+ int r;
+ rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r);
+ done = (ret == rsm_protocol::OK);
+ }
+ ml.lock();
+ return done;
}
last_myvs.seqno);
rpcc *cl;
{
- ScopedUnlock su(rsm_mutex);
+ adopt_lock ml(rsm_mutex);
+ ml.unlock();
cl = h.safebind();
if (cl != 0) {
ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs,
r, rpcc::to(120000));
}
+ ml.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
* completed a view change
*/
void rsm::commit_change(unsigned vid) {
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
commit_change_wo(vid);
if (cfg->ismember(cfg->myaddr(), vid_commit))
breakpoint2();
vid_commit = vid;
inviewchange = true;
set_primary(vid);
- recovery_cond.signal();
- sync_cond.signal();
+ recovery_cond.notify_one();
+ sync_cond.notify_one();
if (cfg->ismember(cfg->myaddr(), vid_commit))
breakpoint2();
}
//
rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) {
LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
- ScopedLock ml(invoke_mutex);
+ lock ml(invoke_mutex);
std::vector<std::string> m;
std::string myaddr;
viewstamp vs;
{
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
LOG("Checking for inviewchange");
if (inviewchange)
return rsm_client_protocol::BUSY;
if (primary != myaddr)
return rsm_client_protocol::NOTPRIMARY;
LOG("Assigning a viewstamp");
- m = cfg->get_view(vid_commit);
+ cfg->get_view(vid_commit, m);
// assign the RPC the next viewstamp number
vs = myvs;
myvs++;
rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) {
LOG("rsm::invoke: procno 0x" << std::hex << proc);
- ScopedLock ml(invoke_mutex);
+ lock ml(invoke_mutex);
std::vector<std::string> m;
std::string myaddr;
{
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
// check if !inviewchange
LOG("Checking for view change");
if (inviewchange)
myaddr = cfg->myaddr();
if (primary == myaddr)
return rsm_protocol::ERR;
- m = cfg->get_view(vid_commit);
+ cfg->get_view(vid_commit, m);
if (find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
*/
rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid,
rsm_protocol::transferres &r) {
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
int ret = rsm_protocol::OK;
tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
* for view vid
*/
rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
backups.erase(find(backups.begin(), backups.end(), m));
if (backups.empty())
- sync_cond.signal();
+ sync_cond.notify_one();
return rsm_protocol::OK;
}
rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
int ret = rsm_protocol::OK;
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(),
last.vid, last.seqno, last_myvs.vid, last_myvs.seqno);
if (cfg->ismember(m, vid_commit)) {
unsigned vid_cache = vid_commit;
bool succ;
{
- ScopedUnlock su(rsm_mutex);
+ ml.unlock();
succ = cfg->add(m, vid_cache);
+ ml.lock();
}
- if (cfg->ismember(m, cfg->vid())) {
+ if (cfg->ismember(m, cfg->view_id())) {
r.log = cfg->dump();
tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str());
} else {
*/
rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
std::vector<std::string> m;
- ScopedLock ml(rsm_mutex);
- m = cfg->get_view(vid_commit);
+ lock ml(rsm_mutex);
+ cfg->get_view(vid_commit, m);
m.push_back(primary);
r = m;
tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(),
// otherwise, the lowest number node of the previous view.
// caller should hold rsm_mutex
void rsm::set_primary(unsigned vid) {
- std::vector<std::string> c = cfg->get_view(vid);
- std::vector<std::string> p = cfg->get_view(vid - 1);
+ std::vector<std::string> c, p;
+ cfg->get_view(vid, c);
+ cfg->get_view(vid - 1, p);
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
}
bool rsm::amiprimary() {
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
return primary == cfg->myaddr() && !inviewchange;
}
// assumes caller holds rsm_mutex
void rsm::net_repair_wo(bool heal) {
std::vector<std::string> m;
- m = cfg->get_view(vid_commit);
+ cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
if (m[i] != cfg->myaddr()) {
handle h(m[i]);
}
rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
heal, dopartition, partitioned);
if (heal) {
rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
r = rsm_test_protocol::OK;
- ScopedLock ml(rsm_mutex);
+ lock ml(rsm_mutex);
tprintf("rsm::breakpointreq: %d\n", b);
if (b == 1) break1 = true;
else if (b == 2) break2 = true;
#include <vector>
#include "rsm_protocol.h"
#include "rsm_state_transfer.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include "config.h"
rsm_test_protocol::status test_net_repairreq(int heal, int &r);
rsm_test_protocol::status breakpointreq(int b, int &r);
- mutex rsm_mutex;
- mutex invoke_mutex;
- cond recovery_cond;
- cond sync_cond;
+ std::mutex rsm_mutex;
+ std::mutex invoke_mutex;
+ std::condition_variable recovery_cond;
+ std::condition_variable sync_cond;
void execute(int procno, std::string req, std::string &r);
rsm_client_protocol::status client_invoke(int procno, std::string req,
#include <handle.h>
#include <unistd.h>
#include "lang/verify.h"
-
+#include "lock.h"
rsm_client::rsm_client(std::string dst) {
printf("create rsm_client\n");
std::vector<std::string> mems;
- pthread_mutex_init(&rsm_client_mutex, NULL);
sockaddr_in dstsock;
make_sockaddr(dst.c_str(), &dstsock);
primary = dst;
{
- ScopedLock ml(&rsm_client_mutex);
+ lock ml(rsm_client_mutex);
VERIFY (init_members());
}
printf("rsm_client: done\n");
rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
int ret;
- ScopedLock ml(&rsm_client_mutex);
+ lock ml(rsm_client_mutex);
while (1) {
printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
handle h(primary);
- VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
+ ml.unlock();
rpcc *cl = h.safebind();
if (cl)
ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
- VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+ ml.lock();
if (!cl)
goto prim_fail;
bool rsm_client::init_members() {
printf("rsm_client::init_members get members!\n");
handle h(primary);
- VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
int ret;
- rpcc *cl = h.safebind();
- if (cl) {
- ret = cl->call(rsm_client_protocol::members, 0, known_mems,
- rpcc::to(1000));
+ rpcc *cl;
+ {
+ adopt_lock ml(rsm_client_mutex);
+ ml.unlock();
+ cl = h.safebind();
+ if (cl) {
+ ret = cl->call(rsm_client_protocol::members, 0, known_mems,
+ rpcc::to(1000));
+ }
+ ml.lock();
}
- VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
if (cl == 0 || ret != rsm_protocol::OK)
return false;
if (known_mems.size() < 1) {
#ifndef rsm_client_h
#define rsm_client_h
-#include "rpc.h"
+#include "rpc/rpc.h"
#include "rsm_protocol.h"
#include <string>
#include <vector>
protected:
std::string primary;
std::vector<std::string> known_mems;
- pthread_mutex_t rsm_client_mutex;
+ std::mutex rsm_client_mutex;
void primary_failure();
bool init_members();
public:
#ifndef rsm_protocol_h
#define rsm_protocol_h
-#include "rpc.h"
+#include "rpc/rpc.h"
class rsm_client_protocol {
#include "rsm_protocol.h"
#include "rsmtest_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <vector>
#include <stdlib.h>
#include <stdio.h>
#include <string>
-using namespace std;
-
-rsmtest_client *lc;
int
main(int argc, char *argv[])
{
- int r;
-
- if(argc != 4){
- fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
- exit(1);
- }
+ if(argc != 4){
+ fprintf(stderr, "Usage: %s [host:]port [partition] arg\n", argv[0]);
+ exit(1);
+ }
- lc = new rsmtest_client(argv[1]);
- string command(argv[2]);
- if (command == "partition") {
- r = lc->net_repair(atoi(argv[3]));
- printf ("net_repair returned %d\n", r);
- } else if (command == "breakpoint") {
- int b = atoi(argv[3]);
- r = lc->breakpoint(b);
- printf ("breakpoint %d returned %d\n", b, r);
- } else {
- fprintf(stderr, "Unknown command %s\n", argv[2]);
- }
- exit(0);
+ rsmtest_client *lc = new rsmtest_client(argv[1]);
+ std::string command(argv[2]);
+ if (command == "partition") {
+ printf("net_repair returned %d\n", lc->net_repair(atoi(argv[3])));
+ } else if (command == "breakpoint") {
+ int b = atoi(argv[3]);
+ printf("breakpoint %d returned %d\n", b, lc->breakpoint(b));
+ } else {
+ fprintf(stderr, "Unknown command %s\n", argv[2]);
+ }
+ return 0;
}
// RPC stubs for clients to talk to rsmtest_server
#include "rsmtest_client.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <sstream>
rsmtest_client::rsmtest_client(std::string dst)
{
- sockaddr_in dstsock;
- make_sockaddr(dst.c_str(), &dstsock);
- cl = new rpcc(dstsock);
- if (cl->bind() < 0) {
- printf("rsmtest_client: call bind\n");
- }
+ sockaddr_in dstsock;
+ make_sockaddr(dst.c_str(), &dstsock);
+ cl = new rpcc(dstsock);
+ if (cl->bind() < 0) {
+ printf("rsmtest_client: call bind\n");
+ }
}
int
rsmtest_client::net_repair(int heal)
{
- int r;
- int ret = cl->call(rsm_test_protocol::net_repair, heal, r);
- VERIFY (ret == rsm_test_protocol::OK);
- return r;
+ int r;
+ int ret = cl->call(rsm_test_protocol::net_repair, heal, r);
+ VERIFY (ret == rsm_test_protocol::OK);
+ return r;
}
int
rsmtest_client::breakpoint(int b)
{
- int r;
- int ret = cl->call(rsm_test_protocol::breakpoint, b, r);
- VERIFY (ret == rsm_test_protocol::OK);
- return r;
+ int r;
+ int ret = cl->call(rsm_test_protocol::breakpoint, b, r);
+ VERIFY (ret == rsm_test_protocol::OK);
+ return r;
}
#include <string>
#include "rsm_protocol.h"
-#include "rpc.h"
+#include "rpc/rpc.h"
// Client interface to the rsmtest server
class rsmtest_client {
- protected:
- rpcc *cl;
- public:
- rsmtest_client(std::string d);
- virtual ~rsmtest_client() {};
- virtual rsm_test_protocol::status net_repair(int heal);
- virtual rsm_test_protocol::status breakpoint(int b);
+ protected:
+ rpcc *cl;
+ public:
+ rsmtest_client(std::string d);
+ virtual ~rsmtest_client() {};
+ virtual rsm_test_protocol::status net_repair(int heal);
+ virtual rsm_test_protocol::status breakpoint(int b);
};
#endif
+++ /dev/null
-#include "srlock.h"
-
-ScopedRemoteLock::ScopedRemoteLock(lock_client *lc, lock_protocol::lockid_t lid) :
- lc_(lc), lid_(lid) {
- lc_->acquire(lid_);
- releaseOnFree = true;
-}
-
-void ScopedRemoteLock::retain() {
- releaseOnFree = false;
-}
-
-ScopedRemoteLock::~ScopedRemoteLock() {
- if (releaseOnFree)
- lc_->release(lid_);
-}
+++ /dev/null
-#ifndef srlock_h
-#define srlock_h
-
-#include "lock_protocol.h"
-#include "lock_client.h"
-
-class ScopedRemoteLock {
- protected:
- lock_client *lc_;
- lock_protocol::lockid_t lid_;
- bool releaseOnFree;
- public:
- ScopedRemoteLock(lock_client *, lock_protocol::lockid_t);
- void retain();
- ~ScopedRemoteLock();
-};
-
-#endif
-#include "mutex.h"
#include <sys/time.h>
#include <stdint.h>
#include "tprintf.h"
-uint64_t utime() {
- struct timeval tp;
- gettimeofday(&tp, NULL);
- return (tp.tv_usec + (uint64_t)tp.tv_sec * 1000000) % 1000000000;
-}
-
-mutex cerr_mutex;
-std::map<pthread_t, int> thread_name_map;
+std::mutex cerr_mutex;
+std::map<std::thread::id, int> thread_name_map;
int next_thread_num = 0;
std::map<void *, int> instance_name_map;
int next_instance_num = 0;
-#ifndef TPRINTF_H
-#define TPRINTF_H
+#ifndef tprintf_h
+#define tprintf_h
#include <iomanip>
#include <iostream>
-#include "mutex.h"
-#include <time.h>
#include <stdio.h>
#include <map>
+#include "lock.h"
extern mutex cerr_mutex;
-extern std::map<pthread_t, int> thread_name_map;
+extern std::map<std::thread::id, int> thread_name_map;
extern int next_thread_num;
extern std::map<void *, int> instance_name_map;
extern int next_instance_num;
extern char tprintf_thread_prefix;
#define LOG_PREFIX { \
- cerr_mutex.acquire(); \
- pthread_t self = pthread_self(); \
+ cerr_mutex.lock(); \
+ auto self = std::this_thread::get_id(); \
int tid = thread_name_map[self]; \
if (tid==0) \
tid = thread_name_map[self] = ++next_thread_num; \
- std::cerr << std::left << std::setw(9) << utime() << " "; \
+ auto utime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
+ std::cerr << std::left << std::setw(9) << utime << " "; \
std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \
std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \
}
std::cerr << "#" << std::setw(2) << self; \
}
#define LOG_SUFFIX { \
- cerr_mutex.release(); \
+ cerr_mutex.unlock(); \
}
#define LOG_NONMEMBER(x) { \
}
#define JOIN(from,to,sep) ({ \
ostringstream oss; \
- for(typeof(from) i=from;i!=to;i++) \
+ for(auto i=from;i!=to;i++) \
oss << *i << sep; \
oss.str(); \
})
LOG_NONMEMBER(buf); \
}
-uint64_t utime();
-
#endif