#include "lock.h"
#include "jsl_log.h"
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lang/verify.h"
+using std::stoi;
+
const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
-rpcc::rpcc(sockaddr_in d, bool retrans) :
- dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+rpcc::rpcc(const string & d, bool retrans) :
+ dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
{
if(retrans){
rpcc::cancel(void)
{
lock ml(m_);
- tprintf("rpcc::cancel: force callers to fail");
+ LOG("rpcc::cancel: force callers to fail");
for(auto &p : calls_){
caller *ca = p.second;
destroy_wait_ = true;
destroy_wait_c_.wait(ml);
}
- tprintf("rpcc::cancel: done");
+ LOG("rpcc::cancel: done");
}
int
-rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
+rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
TO to)
{
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
xid_rep = xid_rep_window_.front();
}
}
void
-rpcs::reg1(unsigned int proc, handler *h)
+rpcs::reg1(proc_t proc, handler *h)
{
lock pl(procs_m_);
VERIFY(procs_.count(proc) == 0);
}
void
-rpcs::updatestat(unsigned int proc)
+rpcs::updatestat(proc_t proc)
{
lock cl(count_m_);
counts_[proc]++;
curr_counts_--;
if(curr_counts_ == 0){
- tprintf("RPC STATS: ");
+ LOG("RPC STATS: ");
for (auto i = counts_.begin(); i != counts_.end(); i++)
- tprintf("%x:%lu ", i->first, i->second);
+ LOG(std::hex << i->first << ":" << std::dec << i->second);
lock rwl(reply_window_m_);
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
+ map<unsigned int,list<reply_t> >::iterator clt;
size_t totalrep = 0, maxrep = 0;
for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
request_header h;
req.unpack_req_header(&h);
- unsigned int proc = (unsigned int)h.proc;
+ proc_t proc = h.proc;
if(!req.ok()){
jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
{
lock rwl(reply_window_m_);
- std::list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> &l = reply_window_[clt_nonce];
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
int past_xid_rep = l.begin()->xid;
- std::list<reply_t>::iterator start = l.begin(), it;
+ list<reply_t>::iterator start = l.begin(), it;
it = ++start;
if (past_xid_rep < xid_rep || past_xid_rep == -1) {
{
lock rwl(reply_window_m_);
// remember the RPC reply value
- std::list<reply_t> &l = reply_window_[clt_nonce];
- std::list<reply_t>::iterator it = l.begin();
+ list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t>::iterator it = l.begin();
// skip to our place in the list
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
marshall &
-operator<<(marshall &m, const std::string &s) {
+operator<<(marshall &m, const string &s) {
m << (unsigned int) s.size();
m.rawbytes(s.data(), s.size());
return m;
}
void
-unmarshall::rawbytes(std::string &ss, size_t n)
+unmarshall::rawbytes(string &ss, size_t n)
{
VERIFY(ensure(n));
ss.assign(buf_+index_, n);
unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, std::string &s) {
+unmarshall & operator>>(unmarshall &u, string &s) {
unsigned sz = u.grab<unsigned>();
if(u.ok())
u.rawbytes(s, sz);
}
/*---------------auxilary function--------------*/
-void
-make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
+sockaddr_in make_sockaddr(const string &hostandport) {
auto colon = hostandport.find(':');
- if (colon == std::string::npos)
- make_sockaddr("127.0.0.1", hostandport, dst);
+ if (colon == string::npos)
+ return make_sockaddr("127.0.0.1", hostandport);
else
- make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst);
+ return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
}
-void
-make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
- bzero(dst, sizeof(*dst));
- dst->sin_family = AF_INET;
+sockaddr_in make_sockaddr(const string &host, const string &port) {
+ sockaddr_in dst;
+ bzero(&dst, sizeof(dst));
+ dst.sin_family = AF_INET;
struct in_addr a{inet_addr(host.c_str())};
if(a.s_addr != INADDR_NONE)
- dst->sin_addr.s_addr = a.s_addr;
+ dst.sin_addr.s_addr = a.s_addr;
else {
struct hostent *hp = gethostbyname(host.c_str());
exit(1);
}
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
- dst->sin_addr.s_addr = a.s_addr;
+ dst.sin_addr.s_addr = a.s_addr;
}
- dst->sin_port = hton((uint16_t)std::stoi(port));
+ dst.sin_port = hton((uint16_t)stoi(port));
+ return dst;
}