From: Peter Iannucci Date: Wed, 18 Dec 2013 23:03:24 +0000 (-0800) Subject: Rolled handle infrastructure into rpcc. X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/869c0cc91d8f6b2bb80026616372d16450b64d9f Rolled handle infrastructure into rpcc. --- diff --git a/.gitignore b/.gitignore index 2dde284..378a62f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,8 +5,7 @@ rpc/rpctest lock_tester lock_demo lock_server -*.swp -*.swo +*.sw* *.a *.log rsm_tester diff --git a/Makefile b/Makefile index 0262c17..e41486f 100644 --- a/Makefile +++ b/Makefile @@ -11,15 +11,15 @@ rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o ar cq $@ $^ ranlib rpc/librpc.a -rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a +rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a t4.o -lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a +lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o -lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a +lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o -lock_server : lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a +lock_server : lock_smain.o threaded_log.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o -rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a +rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a t4.o %.o: %.cc $(CXX) $(CXXFLAGS) -c $< -o $@ diff --git a/config.cc b/config.cc index 4931a9f..a177362 100644 --- a/config.cc +++ b/config.cc @@ -1,5 +1,4 @@ #include "config.h" -#include "handle.h" using std::vector; @@ -85,7 +84,7 @@ void config::paxos_commit(unsigned instance, const string & value) { LOG << "is " << mem << " still a member?"; if (!isamember(mem, newmem) && me != mem) { LOG << "delete " << mem; - handle(mem).invalidate(); + rpcc::unbind_cached(mem); 
} } @@ -198,11 +197,10 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) VERIFY(cfg_mutex_lock); unsigned vid = my_view_id; LOG << "heartbeat to " << m << " (" << vid << ")"; - handle h(m); cfg_mutex_lock.unlock(); int r = 0, ret = rpc_protocol::bind_failure; - if (rpcc *cl = h.safebind()) + if (auto cl = rpcc::bind_cached(m)) ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); cfg_mutex_lock.lock(); @@ -212,7 +210,7 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) break; case rpc_protocol::atmostonce_failure: case rpc_protocol::oldsrv_failure: - h.invalidate(); + rpcc::unbind_cached(m); break; default: LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r; diff --git a/handle.cc b/handle.cc deleted file mode 100644 index 20438f9..0000000 --- a/handle.cc +++ /dev/null @@ -1,61 +0,0 @@ -#include "handle.h" - -class hinfo { -public: - unique_ptr client; - bool valid = true; - string destination; - std::mutex client_mutex; - hinfo(const string & destination_) : destination(destination_) {} -}; - -static std::mutex mgr_mutex; -static std::map> hmap; - -void handle::shutdown() { - lock ml(mgr_mutex); - LOG_NONMEMBER << "Shutting down handle manager"; - for (auto p : hmap) { - p.second->valid = false; - LOG_NONMEMBER << "cl " << p.first << " refcnt " << p.second.use_count(); - } - hmap.clear(); -} - -handle::handle(const string & destination) : destination_(destination) { - lock ml(mgr_mutex); - h = hmap[destination]; - if (!h || !h->valid) - h = (hmap[destination] = std::make_shared(destination)); -} - -rpcc * handle::safebind() { - if (!h) - return nullptr; - lock cl(h->client_mutex); - if (!h->valid) - return nullptr; - if (!h->client) { - unique_ptr client(new rpcc(h->destination)); - LOG << "bind(\"" << h->destination << "\")"; - int ret = client->bind(milliseconds(1000)); - if (ret < 0) { - LOG << "bind failure! 
" << h->destination << " " << ret; - h->valid = false; - } else { - LOG << "bind succeeded " << h->destination; - h->client = std::move(client); - } - } - return h->client.get(); -} - -void handle::invalidate() { - h.reset(); - lock ml(mgr_mutex); - if (hmap.find(destination_) != hmap.end()) { - hmap[destination_]->valid = false; - LOG << "cl " << destination_ << " refcnt " << hmap[destination_].use_count(); - hmap.erase(destination_); - } -} diff --git a/handle.h b/handle.h deleted file mode 100644 index e569002..0000000 --- a/handle.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef handle_h -#define handle_h - -#include "types.h" -#include "rpc/rpc.h" - -// Manage a cache of RPC connections. Typical usage: -// handle h(dst); -// rpc_protocol::status ret = rpc_protocol::bind_failure; -// if (rpcc *cl = h.safebind()) -// ret = cl->call(...); -// assuming dst is a string holding the host:port of the RPC server you want to -// talk to. -// -// If the calling program has not contacted dst before, safebind() will create -// a new connection, call bind(), and return an rpcc*, or 0 if bind() failed. -// if the program has previously contacted dst, safebind() just returns the -// previously created rpcc*. Because safebind() may block, callers should -// probably not hold mutexes. 
- -class handle { - private: - shared_ptr h; - const string destination_; - public: - handle(const string & destination); - rpcc *safebind(); - void invalidate(); - static void shutdown(); -}; - -#endif diff --git a/lock_demo.cc b/lock_demo.cc index 6cdf346..88383ab 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -1,8 +1,7 @@ #include "lock_client.h" -char log_thread_prefix = 'd'; - int main(int argc, char *argv[]) { + global = new t4_state('d'); if(argc != 2) { LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port"; return 1; diff --git a/lock_server.cc b/lock_server.cc index a4d5881..141a598 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -3,7 +3,6 @@ #include "lock_server.h" #include #include -#include "handle.h" lock_state::lock_state(): held(false) @@ -47,20 +46,13 @@ void lock_server::revoker () { continue; lock_state & st = get_lock_state(lid); - holder_t held_by; - { - lock sl(st.m); - held_by = st.held_by; - } + lock sl(st.m); + holder_t held_by = st.held_by; + sl.unlock(); - rpcc *proxy = NULL; - // try a few times? - //int t=5; - //while (t-- && !proxy) - proxy = handle(held_by.first).safebind(); - if (proxy) { + if (auto cl = rpcc::bind_cached(held_by.first)) { int r; - auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second); + auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second); LOG << "Revoke returned " << ret; } } @@ -83,14 +75,9 @@ void lock_server::retryer() { front = st.wanted_by.front(); } - rpcc *proxy = NULL; - // try a few times? 
- //int t=5; - //while (t-- && !proxy) - proxy = handle(front.first).safebind(); - if (proxy) { + if (auto cl = rpcc::bind_cached(front.first)) { int r; - auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second); + auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second); LOG << "Retry returned " << ret; } } diff --git a/lock_smain.cc b/lock_smain.cc index fecd7f8..c6f3356 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -4,9 +4,8 @@ // Main loop of lock_server -char log_thread_prefix = 's'; - int main(int argc, char *argv[]) { + global = new t4_state('s'); setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); diff --git a/lock_tester.cc b/lock_tester.cc index 58e691f..a546e09 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -5,9 +5,6 @@ #include "lock_client.h" #include #include -#include "handle.h" - -char log_thread_prefix = 'c'; // must be >= 2 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. 
@@ -115,6 +112,7 @@ static void test5(int i) { int main(int argc, char *argv[]) { + global = new t4_state('c'); thread th[nt]; int test = 0; @@ -183,9 +181,4 @@ main(int argc, char *argv[]) for (int i = 0; i < nt; i++) delete lc[i]; - - handle::shutdown(); - poll_mgr::shared_mgr.shutdown(); - - LOG_NONMEMBER << argv[0] << ": clean-up complete"; } diff --git a/paxos.cc b/paxos.cc index a055c36..2c7e79a 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,5 +1,4 @@ #include "paxos.h" -#include "handle.h" using namespace std::placeholders; @@ -94,8 +93,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, prepareres res; prop_t highest_n_a{0, ""}; for (auto i : nodes) { - handle h(i); - rpcc *r = h.safebind(); + auto r = rpcc::bind_cached(i); if (!r) continue; auto status = (paxos_protocol::status)r->call_timeout( @@ -124,8 +122,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, const nodes_t & nodes, const value_t & v) { for (auto i : nodes) { - handle h(i); - rpcc *r = h.safebind(); + auto r = rpcc::bind_cached(i); if (!r) continue; bool accept = false; @@ -138,8 +135,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) { for (auto i : accepts) { - handle h(i); - rpcc *r = h.safebind(); + auto r = rpcc::bind_cached(i); if (!r) continue; int res = 0; diff --git a/rpc/connection.cc b/rpc/connection.cc index c7e8f95..31ec5fa 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -15,7 +15,7 @@ connection::connection(connection_delegate * delegate, socket_t && f1, int l1) signal(SIGPIPE, SIG_IGN); - poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this); + global->shared_mgr.add_callback(fd, CB_RDONLY, this); } connection::~connection() { @@ -28,7 +28,7 @@ connection::~connection() { } // after block_remove_fd, select will never wait on fd and no 
callbacks // will be active - poll_mgr::shared_mgr.block_remove_fd(fd); + global->shared_mgr.block_remove_fd(fd); VERIFY(dead_); VERIFY(wpdu_.status == unused); } @@ -64,11 +64,11 @@ bool connection::send(const string & b) { if (!writepdu()) { dead_ = true; ml.unlock(); - poll_mgr::shared_mgr.block_remove_fd(fd); + global->shared_mgr.block_remove_fd(fd); ml.lock(); } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) { // should be rare to need to explicitly add write callback - poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this); + global->shared_mgr.add_callback(fd, CB_WRONLY, this); while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size()) send_complete_.wait(ml); } @@ -84,11 +84,11 @@ void connection::write_cb(int s) { VERIFY(!dead_); VERIFY(fd == s); if (wpdu_.status != inflight) { - poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY); + global->shared_mgr.del_callback(fd, CB_WRONLY); return; } if (!writepdu()) { - poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); + global->shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.status != error); @@ -127,7 +127,7 @@ void connection::read_cb(int s) { if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) { if (!readpdu()) { IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying"; - poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); + global->shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; send_complete_.notify_one(); } @@ -214,11 +214,11 @@ connection_listener::connection_listener(connection_delegate * delegate, in_port IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port; - poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this); + global->shared_mgr.add_callback(tcp_, CB_RDONLY, this); } connection_listener::~connection_listener() { - poll_mgr::shared_mgr.block_remove_fd(tcp_); + global->shared_mgr.block_remove_fd(tcp_); } void connection_listener::read_cb(int) { diff --git a/rpc/connection.h b/rpc/connection.h index 68bd902..03e92da 100644 --- 
a/rpc/connection.h +++ b/rpc/connection.h @@ -4,8 +4,10 @@ #include "types.h" #include #include +#include "t4.h" #include "poll_mgr.h" #include "file.h" +#include "threaded_log.h" class connection; diff --git a/rpc/poll_mgr.cc b/rpc/poll_mgr.cc index ebd61e1..102b8dc 100644 --- a/rpc/poll_mgr.cc +++ b/rpc/poll_mgr.cc @@ -2,6 +2,7 @@ #include #include #include "file.h" +#include "threaded_log.h" #ifdef __linux__ #include @@ -11,8 +12,6 @@ using std::vector; aio_callback::~aio_callback() {} -poll_mgr poll_mgr::shared_mgr; - class wait_manager { public: virtual void watch_fd(int fd, poll_flag flag) = 0; diff --git a/rpc/poll_mgr.h b/rpc/poll_mgr.h index babf908..6fe66c0 100644 --- a/rpc/poll_mgr.h +++ b/rpc/poll_mgr.h @@ -25,8 +25,6 @@ class poll_mgr { poll_mgr(); ~poll_mgr(); - static poll_mgr shared_mgr; - void add_callback(int fd, poll_flag flag, aio_callback *ch); void del_callback(int fd, poll_flag flag); void block_remove_fd(int fd); diff --git a/rpc/rpc.cc b/rpc/rpc.cc index de33675..2889db9 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -85,7 +85,8 @@ rpcc::rpcc(const string & d) : dst_(make_sockaddr(d)) // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { - cancel(); + lock ml(m_); + cancel(ml); IF_LEVEL(2) LOG << "delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1); chan_.reset(); VERIFY(calls_.size() == 0); @@ -104,9 +105,29 @@ int rpcc::bind(milliseconds to) { return ret; } +shared_ptr rpcc::bind_cached(const string & destination) { + auto client = global->get_handle(destination); + lock cl = lock(client->bind_m_); + if (!client->bind_done_) { + LOG_NONMEMBER << "bind(\"" << destination << "\")"; + int ret = client->bind(milliseconds(1000)); + if (ret < 0) { + LOG_NONMEMBER << "bind failure! 
" << destination << " " << ret; + client.reset(); + } else { + LOG_NONMEMBER << "bind succeeded " << destination; + } + } + return client; +} + +void rpcc::unbind_cached(const string & destination) { + global->erase_handle(destination); +} + // Cancel all outstanding calls -void rpcc::cancel(void) { - lock ml(m_); +void rpcc::cancel(lock & m_lock) { + VERIFY(m_lock); if (calls_.size()) { LOG << "force callers to fail"; for (auto & p : calls_) { @@ -122,7 +143,7 @@ void rpcc::cancel(void) { destroy_wait_ = true; while (calls_.size () > 0) - destroy_wait_c_.wait(ml); + destroy_wait_c_.wait(m_lock); LOG << "done"; } diff --git a/rpc/rpc.h b/rpc/rpc.h index 58e9381..f1eb3bc 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -10,6 +10,7 @@ #include "marshall.h" #include "marshall_wrap.h" #include "connection.h" +#include "threaded_log.h" using std::chrono::milliseconds; @@ -75,6 +76,7 @@ class rpcc : private connection_delegate { std::mutex m_; // protect insert/delete to calls[] std::mutex chan_m_; + std::mutex bind_m_; // protect bind operations bool destroy_wait_ = false; cond destroy_wait_c_; @@ -122,9 +124,17 @@ class rpcc : private connection_delegate { int bind(milliseconds to = rpc::to_max); + // Manages a cache of RPC connections. Usage: + // if (auto cl = rpcc::bind_cached(dst)) + // ret = cl->call(...); + // where the string dst has the form "host:port". Because bind_cached() + // may block, callers should probably not hold mutexes. + static shared_ptr bind_cached(const string & destination); + static void unbind_cached(const string & destination); + void set_reachable(bool r) { reachable_ = r; } - void cancel(); + void cancel(lock & m_lock); template inline int call(proc_t

proc, R & r, const Args & ... args) { diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index fb170e7..f8395e9 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -7,11 +7,10 @@ #include #include #include +#include "threaded_log.h" #define NUM_CL 2 -char log_thread_prefix = 'r'; - static rpcs *server; // server rpc object static rpcc *clients[NUM_CL]; // client rpc object static string dst; //server's ip address @@ -343,6 +342,7 @@ static void failure_test() { } int main(int argc, char *argv[]) { + global = new t4_state('r'); setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); diff --git a/rsm.cc b/rsm.cc index cb986fe..711f0ad 100644 --- a/rsm.cc +++ b/rsm.cc @@ -79,7 +79,6 @@ // upcalls, but can keep its locks when calling down. #include "rsm.h" -#include "handle.h" #include "rsm_client.h" #include @@ -202,13 +201,12 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) { bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) { rsm_protocol::transferres r; - handle h(m); int ret = 0; LOG << "contact " << m << " w. 
my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; - rpcc *cl; + shared_ptr cl; { rsm_mutex_lock.unlock(); - cl = h.safebind(); + cl = rpcc::bind_cached(m); if (cl) { ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100), r, cfg->myaddr(), last_myvs, vid_insync); @@ -229,10 +227,8 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); - handle h(m); - rpcc *cl = h.safebind(); bool done = false; - if (cl) { + if (auto cl = rpcc::bind_cached(m)) { int r; auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync); done = (ret == rsm_protocol::OK); @@ -243,21 +239,16 @@ bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) { bool rsm::join(const string & m, lock & rsm_mutex_lock) { - handle h(m); int ret = 0; string log; LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"; - rpcc *cl; - { - rsm_mutex_lock.unlock(); - cl = h.safebind(); - if (cl != 0) { - ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, - cfg->myaddr(), last_myvs); - } - rsm_mutex_lock.lock(); - } + + rsm_mutex_lock.unlock(); + auto cl = rpcc::bind_cached(m); + if (cl) + ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs); + rsm_mutex_lock.lock(); if (cl == 0 || ret != rsm_protocol::OK) { LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret; @@ -342,9 +333,8 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id for (unsigned i = 0; i < m.size(); i++) { if (m[i] != myaddr) { // if invoke on slave fails, return rsm_client_protocol::BUSY - handle h(m[i]); LOG << "Sending invoke to " << m[i]; - rpcc *cl = h.safebind(); + auto cl = rpcc::bind_cached(m[i]); if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; @@ -523,9 +513,9 @@ void 
rsm::net_repair(bool heal, lock & rsm_mutex_lock) { cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { - handle h(m[i]); LOG << "member " << m[i] << " " << heal; - if (h.safebind()) h.safebind()->set_reachable(heal); + if (auto cl = rpcc::bind_cached(m[i])) + cl->set_reachable(heal); } } rsmrpc->set_reachable(heal); diff --git a/rsm_client.cc b/rsm_client.cc index 161ddb5..9906d39 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -1,6 +1,5 @@ #include "rsm_client.h" #include -#include #include rsm_client::rsm_client(string dst) : primary(dst) { @@ -19,10 +18,10 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s lock ml(rsm_client_mutex); while (1) { LOG << "proc " << std::hex << proc << " primary " << primary; - handle h(primary); + string prim = primary; ml.unlock(); - rpcc *cl = h.safebind(); + auto cl = rpcc::bind_cached(prim); auto ret = rsm_client_protocol::OK; if (cl) ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req); @@ -31,34 +30,34 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s if (!cl) goto prim_fail; - LOG << "proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret; + LOG << "proc " << std::hex << proc << " primary " << prim << " ret " << std::dec << ret; if (ret == rsm_client_protocol::OK) return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { - LOG << "rsm is busy " << primary; + LOG << "rsm is busy " << prim; usleep(300000); continue; } if (ret == rsm_client_protocol::NOTPRIMARY) { - LOG << "primary " << primary << " isn't the primary--let's get a complete list of mems"; + LOG << "primary " << prim << " isn't the primary--let's get a complete list of mems"; if (init_members(ml)) continue; } prim_fail: - LOG << "primary " << primary << " failed ret " << std::dec << ret; + LOG << "primary " << prim << " failed ret " << std::dec << 
ret; primary_failure(ml); LOG << "retry new primary " << primary; } } bool rsm_client::init_members(lock & rsm_client_mutex_lock) { LOG << "get members!"; - handle h(primary); + string prim = primary; int ret = rsm_client_protocol::ERR; - rpcc *cl; + shared_ptr cl; { rsm_client_mutex_lock.unlock(); - cl = h.safebind(); + cl = rpcc::bind_cached(prim); if (cl) ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0); rsm_client_mutex_lock.lock(); diff --git a/rsm_tester.cc b/rsm_tester.cc index d5d27fd..5507f80 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -6,9 +6,8 @@ #include "rsm_protocol.h" #include "rsmtest_client.h" -char log_thread_prefix = 't'; - int main(int argc, char *argv[]) { + global = new t4_state('t'); if(argc != 4){ LOG_NONMEMBER << "Usage: " << argv[0] << " [host:]port [partition] arg"; return 1; diff --git a/t4.cc b/t4.cc new file mode 100644 index 0000000..834136b --- /dev/null +++ b/t4.cc @@ -0,0 +1,39 @@ +#include "t4.h" +#include +#include "rpc/rpc.h" + +using namespace std::chrono; + +t4_state *global; + +t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) { + uint32_t seed = std::random_device()(); + auto time = system_clock::now().time_since_epoch(); + auto ticks = duration_cast(time).count(); + seed ^= (uint32_t)ticks; + auto pid = getpid(); + seed ^= (uint32_t)pid; + auto tid = std::hash()(std::this_thread::get_id()); + seed ^= (uint32_t)tid; + random_generator.seed(seed); + // make sure the clock will read differently next time!
+ std::this_thread::sleep_for(microseconds(1)); +} + +t4_state::~t4_state() { + lock ml(handle_cache_mutex); + handle_cache.clear(); + shared_mgr.shutdown(); +} + +shared_ptr t4_state::get_handle(const string & destination) { + lock ml(handle_cache_mutex); + if (!handle_cache[destination]) + handle_cache[destination] = std::make_shared(destination); + return handle_cache[destination]; +} + +void t4_state::erase_handle(const string & destination) { + lock ml(handle_cache_mutex); + handle_cache.erase(destination); +} diff --git a/t4.h b/t4.h new file mode 100644 index 0000000..19eaddc --- /dev/null +++ b/t4.h @@ -0,0 +1,33 @@ +#ifndef t4_h +#define t4_h + +#include "types.h" +#include "rpc/poll_mgr.h" + +struct t4_state { + std::mutex log_mutex; + std::map thread_name_map; + std::map instance_name_map; + + poll_mgr shared_mgr; + + int next_thread_num = 0; + int next_instance_num = 0; + int DEBUG_LEVEL = 0; + char log_thread_prefix; + + std::mutex handle_cache_mutex; + std::map> handle_cache; + + shared_ptr get_handle(const string & destination); + void erase_handle(const string & destination); + + std::mt19937_64 random_generator; + + t4_state(char log_prefix = ' '); + ~t4_state(); +}; + +extern t4_state *global; + +#endif diff --git a/threaded_log.cc b/threaded_log.cc index 98cc6f6..0160f33 100644 --- a/threaded_log.cc +++ b/threaded_log.cc @@ -1,4 +1,5 @@ #include "threaded_log.h" +#include "t4.h" static std::mutex log_mutex; static std::map thread_name_map; @@ -17,7 +18,7 @@ locked_ostream && _log_prefix(locked_ostream && f, const string & file, const st auto utime = duration_cast( system_clock::now().time_since_epoch()).count() % 1000000000; f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " "; - f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid; + f << std::setfill(' ') << global->log_thread_prefix << std::left << std::setw(2) << tid; f << " " << std::setw(20) << file << " " << std::setw(18) << func; 
return std::move(f); } diff --git a/types.h b/types.h index 7ab04cc..ce52a15 100644 --- a/types.h +++ b/types.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -106,7 +107,6 @@ operator<<(std::ostream & o, const A & a) { } #include "verify.h" -#include "threaded_log.h" // struct tuple adapter, useful for marshalling and endian swapping. usage: //