|
|
@@ -49,6 +49,7 @@ |
|
|
|
#ifdef HAVE_SYS_SELECT_H |
|
|
|
#include <sys/select.h> |
|
|
|
#endif |
|
|
|
#include <event.h> |
|
|
|
|
|
|
|
#include <netinet/in.h> |
|
|
|
#include <netinet/tcp.h> |
|
|
@@ -72,7 +73,6 @@ |
|
|
|
static int running = 1; |
|
|
|
static int iteration; |
|
|
|
static u_long max_burst_len; |
|
|
|
static fd_set rdfds, wrfds; |
|
|
|
static int min_sd = 0x7fffffff, max_sd = 0, alloced_sd_to_conn = 0; |
|
|
|
static struct timeval select_timeout; |
|
|
|
static struct sockaddr_in myaddr; |
|
|
@@ -134,6 +134,8 @@ static u_int syscall_count[SC_NUM_SYSCALLS]; |
|
|
|
while (errno == EINTR); \ |
|
|
|
} |
|
|
|
#endif |
|
|
|
static void conn_handle_read_event(int sd, short ev, void *a); |
|
|
|
static void conn_handle_write_event(int sd, short ev, void *a); |
|
|
|
|
|
|
|
struct hash_entry { |
|
|
|
const char *hostname; |
|
|
@@ -272,6 +274,34 @@ port_get(void) |
|
|
|
return port; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
conn_read_set(Conn *s) |
|
|
|
{ |
|
|
|
if (s->sd > -1) |
|
|
|
event_add(&s->ev_read, NULL); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
conn_read_clear(Conn *s) |
|
|
|
{ |
|
|
|
if (s->sd > -1) |
|
|
|
event_del(&s->ev_read); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
conn_write_set(Conn *s) |
|
|
|
{ |
|
|
|
if (s->sd > -1) |
|
|
|
event_add(&s->ev_write, NULL); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
conn_write_clear(Conn *s) |
|
|
|
{ |
|
|
|
if (s->sd > -1) |
|
|
|
event_del(&s->ev_write); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
conn_failure(Conn * s, int err) |
|
|
|
{ |
|
|
@@ -297,11 +327,9 @@ conn_timeout(struct Timer *t, Any_Type arg) |
|
|
|
c = 0; |
|
|
|
if (s->sd >= 0) { |
|
|
|
now = timer_now(); |
|
|
|
if (FD_ISSET(s->sd, &rdfds) |
|
|
|
&& s->recvq && now >= s->recvq->timeout) |
|
|
|
if (s->recvq && now >= s->recvq->timeout) |
|
|
|
c = s->recvq; |
|
|
|
else if (FD_ISSET(s->sd, &wrfds) |
|
|
|
&& s->sendq && now >= s->sendq->timeout) |
|
|
|
else if (s->sendq && now >= s->sendq->timeout) |
|
|
|
c = s->sendq; |
|
|
|
} |
|
|
|
if (DBG > 0) { |
|
|
@@ -319,18 +347,12 @@ conn_timeout(struct Timer *t, Any_Type arg) |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
set_active(Conn * s, fd_set * fdset) |
|
|
|
set_active_shared(Conn * s) |
|
|
|
{ |
|
|
|
int sd = s->sd; |
|
|
|
Any_Type arg; |
|
|
|
Time timeout; |
|
|
|
|
|
|
|
FD_SET(sd, fdset); |
|
|
|
if (sd < min_sd) |
|
|
|
min_sd = sd; |
|
|
|
if (sd >= max_sd) |
|
|
|
max_sd = sd; |
|
|
|
|
|
|
|
if (s->watchdog) |
|
|
|
return; |
|
|
|
|
|
|
@@ -347,6 +369,32 @@ set_active(Conn * s, fd_set * fdset) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
set_active_read(Conn *s) |
|
|
|
{ |
|
|
|
int sd = s->sd; |
|
|
|
|
|
|
|
conn_read_set(s); |
|
|
|
if (sd < min_sd) |
|
|
|
min_sd = sd; |
|
|
|
if (sd >= max_sd) |
|
|
|
max_sd = sd; |
|
|
|
set_active_shared(s); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
set_active_write(Conn *s) |
|
|
|
{ |
|
|
|
int sd = s->sd; |
|
|
|
|
|
|
|
conn_write_set(s); |
|
|
|
if (sd < min_sd) |
|
|
|
min_sd = sd; |
|
|
|
if (sd >= max_sd) |
|
|
|
max_sd = sd; |
|
|
|
set_active_shared(s); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
do_send(Conn * conn) |
|
|
|
{ |
|
|
@@ -434,7 +482,7 @@ do_send(Conn * conn) |
|
|
|
*/ |
|
|
|
call->timeout = |
|
|
|
param.timeout ? timer_now() + param.timeout : 0.0; |
|
|
|
set_active(conn, &wrfds); |
|
|
|
set_active_write(conn); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
@@ -444,7 +492,7 @@ do_send(Conn * conn) |
|
|
|
conn->sendq = call->sendq_next; |
|
|
|
if (!conn->sendq) { |
|
|
|
conn->sendq_tail = 0; |
|
|
|
FD_CLR(sd, &wrfds); |
|
|
|
conn_write_clear(conn); |
|
|
|
} |
|
|
|
arg.l = 0; |
|
|
|
event_signal(EV_CALL_SEND_STOP, (Object *) call, arg); |
|
|
@@ -468,7 +516,7 @@ do_send(Conn * conn) |
|
|
|
call->timeout = param.timeout + param.think_timeout; |
|
|
|
if (call->timeout > 0.0) |
|
|
|
call->timeout += timer_now(); |
|
|
|
set_active(conn, &rdfds); |
|
|
|
set_active_read(conn); |
|
|
|
if (conn->state < S_REPLY_STATUS) |
|
|
|
conn->state = S_REPLY_STATUS; /* expecting reply |
|
|
|
* status */ |
|
|
@@ -491,7 +539,7 @@ recv_done(Call * call) |
|
|
|
|
|
|
|
conn->recvq = call->recvq_next; |
|
|
|
if (!conn->recvq) { |
|
|
|
FD_CLR(conn->sd, &rdfds); |
|
|
|
conn_read_clear(conn); |
|
|
|
conn->recvq_tail = 0; |
|
|
|
} |
|
|
|
/* |
|
|
@@ -602,7 +650,7 @@ do_recv(Conn * s) |
|
|
|
while (buf_len > 0); |
|
|
|
|
|
|
|
if (s->recvq) |
|
|
|
set_active(c->conn, &rdfds); |
|
|
|
set_active_read(c->conn); |
|
|
|
} |
|
|
|
|
|
|
|
struct sockaddr_in * |
|
|
@@ -651,8 +699,6 @@ core_init(void) |
|
|
|
struct rlimit rlimit; |
|
|
|
|
|
|
|
memset(&hash_table, 0, sizeof(hash_table)); |
|
|
|
memset(&rdfds, 0, sizeof(rdfds)); |
|
|
|
memset(&wrfds, 0, sizeof(wrfds)); |
|
|
|
memset(&myaddr, 0, sizeof(myaddr)); |
|
|
|
memset(&port_free_map, 0xff, sizeof(port_free_map)); |
|
|
|
|
|
|
@@ -689,12 +735,15 @@ core_init(void) |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
|
|
|
|
/* Disable the FD_SETSIZE check for now - perhaps only enforce on libevent select()? */ |
|
|
|
#if 0 |
|
|
|
if (rlimit.rlim_max > FD_SETSIZE) { |
|
|
|
fprintf(stderr, "%s: warning: open file limit > FD_SETSIZE; " |
|
|
|
"limiting max. # of open files to FD_SETSIZE\n", |
|
|
|
prog_name); |
|
|
|
rlimit.rlim_max = FD_SETSIZE; |
|
|
|
} |
|
|
|
#endif |
|
|
|
|
|
|
|
rlimit.rlim_cur = rlimit.rlim_max; |
|
|
|
if (setrlimit(RLIMIT_NOFILE, &rlimit) < 0) { |
|
|
@@ -741,14 +790,12 @@ core_ssl_connect(Conn * s) |
|
|
|
(reason == |
|
|
|
SSL_ERROR_WANT_READ) ? "read" : |
|
|
|
"write"); |
|
|
|
if (reason == SSL_ERROR_WANT_READ |
|
|
|
&& !FD_ISSET(s->sd, &rdfds)) { |
|
|
|
FD_CLR(s->sd, &wrfds); |
|
|
|
set_active(s, &rdfds); |
|
|
|
} else if (reason == SSL_ERROR_WANT_WRITE |
|
|
|
&& !FD_ISSET(s->sd, &wrfds)) { |
|
|
|
FD_CLR(s->sd, &rdfds); |
|
|
|
set_active(s, &wrfds); |
|
|
|
if (reason == SSL_ERROR_WANT_READ) { |
|
|
|
conn_write_clear(s); |
|
|
|
set_active_read(s); |
|
|
|
} else if (reason == SSL_ERROR_WANT_WRITE) { |
|
|
|
conn_read_clear(s); |
|
|
|
set_active_write(s); |
|
|
|
} |
|
|
|
return; |
|
|
|
} |
|
|
@@ -859,6 +906,8 @@ core_connect(Conn * s) |
|
|
|
} |
|
|
|
|
|
|
|
s->sd = sd; |
|
|
|
event_set(&s->ev_read, sd, EV_READ | EV_PERSIST, conn_handle_read_event, s); |
|
|
|
event_set(&s->ev_write, sd, EV_WRITE | EV_PERSIST, conn_handle_write_event, s); |
|
|
|
if (sd >= alloced_sd_to_conn) { |
|
|
|
size_t size, old_size; |
|
|
|
|
|
|
@@ -939,7 +988,7 @@ core_connect(Conn * s) |
|
|
|
* connection establishment. |
|
|
|
*/ |
|
|
|
s->state = S_CONNECTING; |
|
|
|
set_active(s, &wrfds); |
|
|
|
set_active_write(s); |
|
|
|
if (param.timeout > 0.0) { |
|
|
|
arg.vp = s; |
|
|
|
assert(!s->watchdog); |
|
|
@@ -1040,7 +1089,7 @@ core_send(Conn * conn, Call * call) |
|
|
|
return -1; |
|
|
|
call->timeout = |
|
|
|
param.timeout ? timer_now() + param.timeout : 0.0; |
|
|
|
set_active(conn, &wrfds); |
|
|
|
set_active_write(conn); |
|
|
|
} else { |
|
|
|
conn->sendq_tail->sendq_next = call; |
|
|
|
conn->sendq_tail = call; |
|
|
@@ -1097,8 +1146,8 @@ core_close(Conn * conn) |
|
|
|
if (sd >= 0) { |
|
|
|
close(sd); |
|
|
|
sd_to_conn[sd] = 0; |
|
|
|
FD_CLR(sd, &wrfds); |
|
|
|
FD_CLR(sd, &rdfds); |
|
|
|
conn_read_clear(conn); |
|
|
|
conn_write_clear(conn); |
|
|
|
} |
|
|
|
if (conn->myport > 0) |
|
|
|
port_put(conn->myport); |
|
|
@@ -1111,124 +1160,66 @@ core_close(Conn * conn) |
|
|
|
conn_dec_ref(conn); |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
core_loop(void) |
|
|
|
static void |
|
|
|
conn_handle_read_event(int sd, short ev, void *a) |
|
|
|
{ |
|
|
|
int is_readable, is_writable, n, sd, bit, min_i, max_i, i = |
|
|
|
0; |
|
|
|
fd_set readable, writable; |
|
|
|
fd_mask mask; |
|
|
|
Any_Type arg; |
|
|
|
Conn *conn; |
|
|
|
Conn *conn = sd_to_conn[sd]; |
|
|
|
Any_Type arg; |
|
|
|
|
|
|
|
while (running) { |
|
|
|
struct timeval tv = select_timeout; |
|
|
|
conn_inc_ref(conn); |
|
|
|
|
|
|
|
timer_tick(); |
|
|
|
if (conn->watchdog) { |
|
|
|
timer_cancel(conn->watchdog); |
|
|
|
conn->watchdog = 0; |
|
|
|
} |
|
|
|
|
|
|
|
readable = rdfds; |
|
|
|
writable = wrfds; |
|
|
|
min_i = min_sd / NFDBITS; |
|
|
|
max_i = max_sd / NFDBITS; |
|
|
|
if (conn->recvq) |
|
|
|
do_recv(conn); |
|
|
|
|
|
|
|
SYSCALL(SELECT, |
|
|
|
n = select(max_sd + 1, &readable, &writable, 0, &tv)); |
|
|
|
conn_dec_ref(conn); |
|
|
|
} |
|
|
|
|
|
|
|
++iteration; |
|
|
|
static void |
|
|
|
conn_handle_write_event(int sd, short ev, void *a) |
|
|
|
{ |
|
|
|
Conn *conn = sd_to_conn[sd]; |
|
|
|
Any_Type arg; |
|
|
|
|
|
|
|
if (n <= 0) { |
|
|
|
if (n < 0) { |
|
|
|
fprintf(stderr, |
|
|
|
"%s.core_loop: select failed: %s\n", |
|
|
|
prog_name, strerror(errno)); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
continue; |
|
|
|
} |
|
|
|
conn_inc_ref(conn); |
|
|
|
|
|
|
|
while (n > 0) { |
|
|
|
/* |
|
|
|
* find the index of the fdmask that has something |
|
|
|
* going on: |
|
|
|
*/ |
|
|
|
do { |
|
|
|
++i; |
|
|
|
if (i > max_i) |
|
|
|
i = min_i; |
|
|
|
|
|
|
|
assert(i <= max_i); |
|
|
|
mask = |
|
|
|
readable.fds_bits[i] | writable. |
|
|
|
fds_bits[i]; |
|
|
|
} |
|
|
|
while (!mask); |
|
|
|
bit = 0; |
|
|
|
sd = i * NFDBITS + bit; |
|
|
|
do { |
|
|
|
if (mask & 1) { |
|
|
|
--n; |
|
|
|
|
|
|
|
is_readable = |
|
|
|
(FD_ISSET(sd, &readable) |
|
|
|
&& FD_ISSET(sd, &rdfds)); |
|
|
|
is_writable = (FD_ISSET(sd, &writable) |
|
|
|
&& FD_ISSET(sd, |
|
|
|
&wrfds)); |
|
|
|
|
|
|
|
if (is_readable || is_writable) { |
|
|
|
/* |
|
|
|
* only handle sockets that |
|
|
|
* haven't timed out yet |
|
|
|
*/ |
|
|
|
conn = sd_to_conn[sd]; |
|
|
|
|
|
|
|
conn_inc_ref(conn); |
|
|
|
|
|
|
|
if (conn->watchdog) { |
|
|
|
timer_cancel(conn-> |
|
|
|
watchdog); |
|
|
|
conn->watchdog = 0; |
|
|
|
} |
|
|
|
if (conn->state == |
|
|
|
S_CONNECTING) { |
|
|
|
if (conn->watchdog) { |
|
|
|
timer_cancel(conn->watchdog); |
|
|
|
conn->watchdog = 0; |
|
|
|
} |
|
|
|
|
|
|
|
if (conn->state == S_CONNECTING) { |
|
|
|
#ifdef HAVE_SSL |
|
|
|
if (param.use_ssl) |
|
|
|
core_ssl_connect |
|
|
|
(conn); |
|
|
|
else |
|
|
|
if (param.use_ssl) |
|
|
|
core_ssl_connect (conn); |
|
|
|
else { |
|
|
|
#endif |
|
|
|
if (is_writable) { |
|
|
|
FD_CLR(sd, |
|
|
|
&wrfds); |
|
|
|
conn->state = |
|
|
|
S_CONNECTED; |
|
|
|
arg.l = 0; |
|
|
|
event_signal |
|
|
|
(EV_CONN_CONNECTED, |
|
|
|
(Object *) |
|
|
|
conn, |
|
|
|
arg); |
|
|
|
} |
|
|
|
} else { |
|
|
|
if (is_writable |
|
|
|
&& conn->sendq) |
|
|
|
do_send(conn); |
|
|
|
if (is_readable |
|
|
|
&& conn->recvq) |
|
|
|
do_recv(conn); |
|
|
|
} |
|
|
|
|
|
|
|
conn_dec_ref(conn); |
|
|
|
|
|
|
|
if (n > 0) |
|
|
|
timer_tick(); |
|
|
|
} |
|
|
|
} |
|
|
|
mask = ((u_long) mask) >> 1; |
|
|
|
++sd; |
|
|
|
} |
|
|
|
while (mask); |
|
|
|
conn_write_clear(conn); |
|
|
|
conn->state = S_CONNECTED; |
|
|
|
arg.l = 0; |
|
|
|
event_signal(EV_CONN_CONNECTED, (Object *) conn, arg); |
|
|
|
} |
|
|
|
} else if (conn->sendq) |
|
|
|
do_send(conn); |
|
|
|
|
|
|
|
conn_dec_ref(conn); |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
|
core_loop(void) |
|
|
|
{ |
|
|
|
int is_readable, is_writable, n, sd, bit, min_i, max_i, i = |
|
|
|
0; |
|
|
|
fd_set readable, writable; |
|
|
|
fd_mask mask; |
|
|
|
|
|
|
|
while (running) { |
|
|
|
timer_tick(); |
|
|
|
n = event_loop(EVLOOP_ONCE); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|