-// std::bind and syscall bind have the same name, so don't use std::bind in this file
-#define LIBT4_NO_FUNCTIONAL
#include "connection.h"
+#include "rpc_protocol.h"
#include <cerrno>
#include <csignal>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/socket.h>
-
-#define MAX_PDU (10<<20) //maximum PDF is 10M
+#include "marshall.h"
connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), lossy_(l1)
if (lossy_) {
if ((random()%100) < lossy_) {
- IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+ IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
shutdown(fd_,SHUT_RDWR);
}
}
if (wpdu_.solong == wpdu_.buf.size())
return true;
- if (wpdu_.solong == 0) {
- uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
- copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
- }
ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
if (n < 0) {
if (errno != EAGAIN) {
- IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
+ IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
wpdu_.solong = size_t_max;
wpdu_.buf.clear();
}
bool connection::readpdu() {
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
- uint32_t sz1;
+ rpc_sz_t sz1;
ssize_t n = read(fd_, &sz1, sizeof(sz1));
if (n == 0) {
return false;
}
- size_t sz = ntohl(sz1);
+ size_t sz = ntoh(sz1);
if (sz > MAX_PDU) {
IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
VERIFY(rpdu_.buf.size() == 0);
rpdu_.buf = string(sz+sizeof(sz1), 0);
- copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]);
rpdu_.solong = sizeof(sz1);
}
return true;
}
-tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
- sin.sin_port = htons(port);
+ sin.sin_port = hton(port);
tcp_ = socket(AF_INET, SOCK_STREAM, 0);
if (tcp_ < 0) {
setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
- if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+ // Be careful to match the parameter types of the bind() system call exactly,
+ // so that this call does not resolve to std::bind instead.
+ if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
perror("accept_loop tcp bind:");
VERIFY(0);
}
socklen_t addrlen = sizeof(sin);
VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
- port_ = ntohs(sin.sin_port);
+ port_ = ntoh(sin.sin_port);
IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
throw thread_exit_exception();
}
- IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+ IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
connection *ch = new connection(mgr_, s1, lossy_);
// garbage collect all dead connections with refcount of 1
continue;
} else {
perror("accept_conn select:");
- IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
+ IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
VERIFY(0);
}
}
int yes = 1;
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
- IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+ IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
close(s);
return NULL;
}
- IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+ IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
return new connection(mgr, s, lossy);
}