4 #include <netinet/tcp.h>
9 #include "method_thread.h"
10 #include "connection.h"
15 #include "lang/verify.h"
17 #define MAX_PDU (10<<20) //maximum PDU size is 10MB (10<<20 bytes)
// Builds a connection around descriptor f1: m1 is stored in mgr_, f1 in fd_
// and l1 in lossy_. The object starts with dead_ false, waiters_ 0 and
// refno_ 1.
20 connection::connection(chanmgr *m1, int f1, int l1)
21 : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
// Read fd_'s file status flags and write them back.
// NOTE(review): the statement that alters flags between these two calls is
// not visible in this excerpt (presumably it sets O_NONBLOCK) - confirm.
24 int flags = fcntl(fd_, F_GETFL, NULL);
26 fcntl(fd_, F_SETFL, flags);
// Set the process-wide disposition of SIGPIPE to "ignore".
28 signal(SIGPIPE, SIG_IGN);
// Initialize both mutexes and both condition variables with default
// attributes; each call is checked to have returned 0.
29 VERIFY(pthread_mutex_init(&m_,0)==0);
30 VERIFY(pthread_mutex_init(&ref_m_,0)==0);
31 VERIFY(pthread_cond_init(&send_wait_,0)==0);
32 VERIFY(pthread_cond_init(&send_complete_,0)==0);
// Record the creation time; compare() orders connections by this value.
34 VERIFY(gettimeofday(&create_time_, NULL) == 0);
// Register this object with the poll manager for fd_ using CB_RDONLY.
36 PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
// Destroys the two mutexes and two condition variables that the constructor
// initialized; each destroy call is checked to have returned 0.
// NOTE(review): any other cleanup (closing fd_, freeing buffers) is not
// visible in this excerpt.
39 connection::~connection()
42 VERIFY(pthread_mutex_destroy(&m_)== 0);
43 VERIFY(pthread_mutex_destroy(&ref_m_)== 0);
44 VERIFY(pthread_cond_destroy(&send_wait_) == 0);
45 VERIFY(pthread_cond_destroy(&send_complete_) == 0);
55 ScopedLock ml(&ref_m_);
// Closes the connection: shuts down the socket and removes fd_ from the
// poll manager.
// NOTE(review): the condition guarding the shutdown is not visible in this
// excerpt.
67 connection::closeconn()
// Disable both receiving and sending on the socket.
73 shutdown(fd_,SHUT_RDWR);
78 //after block_remove_fd, select will never wait on fd_
79 //and no callbacks will be active
80 PollMgr::Instance()->block_remove_fd(fd_);
86 VERIFY(pthread_mutex_lock(&ref_m_)==0);
90 VERIFY(pthread_mutex_lock(&m_)==0);
92 VERIFY(pthread_mutex_unlock(&ref_m_)==0);
93 VERIFY(pthread_mutex_unlock(&m_)==0);
97 VERIFY(pthread_mutex_unlock(&m_)==0);
99 pthread_mutex_unlock(&ref_m_);
105 ScopedLock rl(&ref_m_);
// Orders this connection against another by create_time_: the seconds field
// is compared first, and the microseconds field is used only when the
// seconds are equal.
// NOTE(review): the return statements are not visible in this excerpt, so
// the sign convention of the result needs confirming.
110 connection::compare(connection *another)
112 if (create_time_.tv_sec > another->create_time_.tv_sec)
114 if (create_time_.tv_sec < another->create_time_.tv_sec)
116 if (create_time_.tv_usec > another->create_time_.tv_usec)
118 if (create_time_.tv_usec < another->create_time_.tv_usec)
// Sends a buffer on this connection and waits for the write to finish.
// NOTE(review): the code that installs b and sz into wpdu_ and the final
// return are not visible in this excerpt.
124 connection::send(char *b, int sz)
// Wait on send_wait_ (releasing m_ while asleep) for as long as the
// connection is alive and wpdu_.buf is still set - presumably meaning another
// send is in progress; confirm.
128 while (!dead_ && wpdu_.buf) {
129 VERIFY(pthread_cond_wait(&send_wait_, &m_)==0);
// Lossy test: taken when random()%100 is below lossy_, i.e. with a
// probability of about lossy_ percent; the socket is then shut down.
140 if ((random()%100) < lossy_) {
141 jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
142 shutdown(fd_,SHUT_RDWR);
// m_ is released around block_remove_fd() and re-acquired afterwards
// (presumably because block_remove_fd waits for callbacks that take m_ -
// confirm).
148 VERIFY(pthread_mutex_unlock(&m_) == 0);
149 PollMgr::Instance()->block_remove_fd(fd_);
150 VERIFY(pthread_mutex_lock(&m_) == 0);
// solong == sz: the number of bytes written so far equals the PDU size.
152 if (wpdu_.solong == wpdu_.sz) {
154 //should be rare to need to explicitly add write callback
155 PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
// Sleep on send_complete_ until the connection dies, solong becomes negative,
// or solong reaches sz.
156 while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
157 VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
// ret is true only when the connection is still alive and the whole PDU
// has been written.
161 bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
// Reset the write-progress counters.
162 wpdu_.solong = wpdu_.sz = 0;
// Wake every thread blocked on send_wait_.
165 pthread_cond_broadcast(&send_wait_);
169 //fd_ is ready to be written
// Callback for write readiness on fd_.
// NOTE(review): the branch conditions and the call that performs the write
// are not visible in this excerpt; the notes below cover only what is shown.
171 connection::write_cb(int s)
// Stop watching fd_ for writability.
177 PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
// Stop watching fd_ for both reading and writing.
181 PollMgr::Instance()->del_callback(fd_, CB_RDWR);
// A negative solong is not expected at this point.
184 VERIFY(wpdu_.solong >= 0);
// solong < sz: part of the PDU is still unwritten.
185 if (wpdu_.solong < wpdu_.sz) {
// Wake a thread blocked on send_complete_ (send() waits on it).
189 pthread_cond_signal(&send_complete_);
192 //fd_ is ready to be read
// Callback for read readiness on fd_.
// NOTE(review): several statements (including the call that performs the
// read) are not visible in this excerpt.
194 connection::read_cb(int s)
// No read buffer yet, or the current PDU is only partially read.
203 if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
// Stop watching fd_ for both reading and writing, then wake a thread blocked
// on send_complete_ (send() waits on it and re-checks dead_).
208 PollMgr::Instance()->del_callback(fd_,CB_RDWR);
210 pthread_cond_signal(&send_complete_);
// A complete PDU is buffered: hand it to the channel manager.
213 if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
214 if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
215 //chanmgr has successfully consumed the pdu
// Reset the read-progress counters.
217 rpdu_.sz = rpdu_.solong = 0;
// Writes as much of the pending PDU (wpdu_) to fd_ as a single write() call
// accepts.
// NOTE(review): the handling of the write() result and the success-path
// return are not visible in this excerpt.
223 connection::writepdu()
225 VERIFY(wpdu_.solong >= 0);
// Nothing left to write.
226 if (wpdu_.solong == wpdu_.sz)
// First attempt for this PDU: store wpdu_.sz in network byte order in the
// first sizeof(int) bytes of the buffer.
229 if (wpdu_.solong == 0) {
230 int sz = htonl(wpdu_.sz);
231 bcopy(&sz,wpdu_.buf,sizeof(sz));
// Write the bytes that have not been written yet.
233 int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
// An errno other than EAGAIN is logged as a failure.
235 if (errno != EAGAIN) {
236 jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
// On this path the result is true exactly when errno is EAGAIN.
240 return (errno == EAGAIN);
// Reads an incoming PDU from fd_ into rpdu_: first the size header, then the
// remaining bytes.
// NOTE(review): the declarations of sz/sz1, the branch conditions and the
// success-path return are not visible in this excerpt.
247 connection::readpdu()
// Read the size header into sz1.
251 int n = read(fd_, &sz1, sizeof(sz1));
// EAGAIN is not expected here (presumably because this runs only after fd_
// was reported readable - confirm).
258 VERIFY(errno!=EAGAIN);
// A partial read of the size header is logged.
262 if (n >0 && n!= sizeof(sz)) {
263 jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
// Log an oversized PDU: sz, then sz1 and its four bytes in hex
// (presumably the limit is MAX_PDU - confirm).
270 char *tmpb = (char *)&sz1;
271 jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
272 sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
// Allocate the receive buffer, copy the size header (as read) into its first
// sizeof(sz) bytes, and mark that many bytes as already received.
277 VERIFY(rpdu_.buf == NULL);
278 rpdu_.buf = (char *)malloc(sz+sizeof(sz));
280 bcopy(&sz1,rpdu_.buf,sizeof(sz));
281 rpdu_.solong = sizeof(sz);
// Read the bytes of the PDU that are still missing.
284 int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
// On this path the progress counters are reset and the result is true
// exactly when errno is EAGAIN.
291 rpdu_.sz = rpdu_.solong = 0;
292 return (errno == EAGAIN);
// Creates a listening TCP socket on the given port and starts a thread that
// runs accept_conn(). m1 is stored in mgr_ and lossytest in lossy_.
// NOTE(review): the error-path actions after each perror() are not visible
// in this excerpt.
298 tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
299 : mgr_(m1), lossy_(lossytest)
302 VERIFY(pthread_mutex_init(&m_,NULL) == 0);
// Zeroed IPv4 address structure with only family and port filled in here.
304 struct sockaddr_in sin;
305 memset(&sin, 0, sizeof(sin));
306 sin.sin_family = AF_INET;
307 sin.sin_port = htons(port);
309 tcp_ = socket(AF_INET, SOCK_STREAM, 0);
311 perror("tcpsconn::tcpsconn accept_loop socket:");
// Set SO_REUSEADDR and TCP_NODELAY on the listening socket; the results of
// these calls are not checked.
316 setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
317 setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
319 if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
320 perror("accept_loop tcp bind:");
// Listen with a backlog of 1000.
324 if(listen(tcp_, 1000) < 0) {
325 perror("tcpsconn::tcpsconn listen:");
// Query the socket for the address it is bound to and keep the port in
// host byte order in port_.
329 socklen_t addrlen = sizeof(sin);
330 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
331 port_ = ntohs(sin.sin_port);
333 jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
// Pipe watched by accept_conn() alongside tcp_; the destructor closes
// its write end.
336 if (pipe(pipe_) < 0) {
337 perror("accept_loop pipe:");
// Read and rewrite the flags of the pipe's read end.
// NOTE(review): the statement that alters flags in between is not visible
// (presumably it sets O_NONBLOCK) - confirm.
341 int flags = fcntl(pipe_[0], F_GETFL, NULL);
343 fcntl(pipe_[0], F_SETFL, flags);
// Start the thread that runs accept_conn(); a zero handle is treated as
// failure.
345 VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
// Shuts the server down: closes the write end of pipe_, waits for the accept
// thread to finish, then closes every connection in conns_.
// NOTE(review): presumably closing pipe_[1] is what makes accept_conn() return
// (it watches pipe_[0]) - confirm. Further cleanup is not visible in this
// excerpt.
348 tcpsconn::~tcpsconn()
350 VERIFY(close(pipe_[1]) == 0);
351 VERIFY(pthread_join(th_, NULL) == 0);
353 //close all the active connections
354 std::map<int, connection *>::iterator i;
355 for (i = conns_.begin(); i != conns_.end(); i++) {
356 i->second->closeconn();
// Accepts one pending connection on tcp_, wraps it in a connection object,
// garbage-collects dead entries of conns_, and records the new connection
// under its channo().
// NOTE(review): the accept() failure check and the erase statement of the
// garbage-collection loop are not visible in this excerpt.
362 tcpsconn::process_accept()
365 socklen_t slen = sizeof(sin);
366 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
368 perror("tcpsconn::accept_conn error");
372 jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
373 s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
// The new connection uses the server's channel manager and lossy setting.
374 connection *ch = new connection(mgr_, s1, lossy_);
376 // garbage collect all dead connections with refcount of 1
// The loop header has no increment: the iterator is advanced inside the body.
377 std::map<int, connection *>::iterator i;
378 for (i = conns_.begin(); i != conns_.end();) {
379 if (i->second->isdead() && i->second->ref() == 1) {
380 jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
381 i->second->channo());
383 // Careful not to reuse i right after erase. (i++) will
384 // be evaluated before the erase call because in C++,
385 // there is a sequence point before a function call.
386 // See http://en.wikipedia.org/wiki/Sequence_point.
// Record the new connection, keyed by its channo().
392 conns_[ch->channo()] = ch;
// Body of the accept thread: waits with select() for pipe_[0] or tcp_ to
// become readable and dispatches on whichever is ready.
// NOTE(review): the loop structure, the FD_SET for tcp_ and the actions taken
// in each branch are not visible in this excerpt.
396 tcpsconn::accept_conn()
// select() needs the highest watched descriptor plus one.
399 int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
403 FD_SET(pipe_[0], &rfds);
// NULL timeout: block until a watched descriptor is readable.
406 int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
// EINTR is handled separately from the other select() failures, which are
// reported below.
409 if (errno == EINTR) {
412 perror("accept_conn select:");
413 jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
// pipe_[0] readable (the destructor closes pipe_[1]) - presumably the signal
// to stop; confirm.
418 if (FD_ISSET(pipe_[0], &rfds)) {
// tcp_ readable: a connection is waiting to be accepted (presumably handled
// by process_accept() - confirm).
423 else if (FD_ISSET(tcp_, &rfds)) {
// Opens a TCP socket, enables TCP_NODELAY on it, connects it to dst and, on
// success, returns a newly allocated connection for that socket built with
// mgr and lossy.
// NOTE(review): the cleanup and return value on connect() failure are not
// visible in this excerpt.
432 connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
434 int s= socket(AF_INET, SOCK_STREAM, 0);
436 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
437 if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
438 jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
439 inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
443 jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n",
444 s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
445 return new connection(mgr, s, lossy);