Skip to content

Commit

Permalink
Put listen sockets in its own list
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Dec 28, 2022
1 parent afd527a commit 034575d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 31 deletions.
83 changes: 62 additions & 21 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ unsigned short us_socket_context_timestamp(int ssl, struct us_socket_context_t *
void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls) {
/* us_listen_socket_t extends us_socket_t so we close in similar ways */
if (!us_socket_is_closed(0, &ls->s)) {
us_internal_socket_context_unlink(ls->s.context, &ls->s);
us_internal_socket_context_unlink_listen_socket(ls->s.context, ls);
us_poll_stop((struct us_poll_t *) &ls->s, ls->s.context->loop);
bsd_close_socket(us_poll_fd((struct us_poll_t *) &ls->s));

Expand All @@ -49,27 +49,56 @@ void us_listen_socket_close(int ssl, struct us_listen_socket_t *ls) {
}

void us_socket_context_close(int ssl, struct us_socket_context_t *context) {
struct us_socket_t *s = context->head;
while (s) {
struct us_socket_t *nextS = s->next;
us_socket_close(ssl, s, 0, 0);
s = nextS;
/* Begin by closing all listen sockets */
struct us_listen_socket_t *ls = context->head_listen_sockets;
while (ls) {
struct us_listen_socket_t *nextLS = (struct us_listen_socket_t *) ls->s.next;
us_listen_socket_close(ssl, ls);
ls = nextLS;
}

/* Then close all regular sockets */
struct us_socket_t *s = context->head_sockets;
while (s) {
struct us_socket_t *nextS = s->next;
us_socket_close(ssl, s, 0, 0);
s = nextS;
}
}

void us_internal_socket_context_unlink_listen_socket(struct us_socket_context_t *context, struct us_listen_socket_t *ls) {
/* We have to properly update the iterator used to sweep sockets for timeouts */
if (ls == (struct us_listen_socket_t *) context->iterator) {
context->iterator = ls->s.next;
}

if (ls->s.prev == ls->s.next) {
context->head_listen_sockets = 0;
} else {
if (ls->s.prev) {
ls->s.prev->next = ls->s.next;
} else {
context->head_listen_sockets = (struct us_listen_socket_t *) ls->s.next;
}
if (ls->s.next) {
ls->s.next->prev = ls->s.prev;
}
}
}

void us_internal_socket_context_unlink(struct us_socket_context_t *context, struct us_socket_t *s) {
void us_internal_socket_context_unlink_socket(struct us_socket_context_t *context, struct us_socket_t *s) {
/* We have to properly update the iterator used to sweep sockets for timeouts */
if (s == context->iterator) {
context->iterator = s->next;
}

if (s->prev == s->next) {
context->head = 0;
context->head_sockets = 0;
} else {
if (s->prev) {
s->prev->next = s->next;
} else {
context->head = s->next;
context->head_sockets = s->next;
}
if (s->next) {
s->next->prev = s->prev;
Expand All @@ -78,14 +107,25 @@ void us_internal_socket_context_unlink(struct us_socket_context_t *context, stru
}

/* We always add in the top, so we don't modify any s.next */
void us_internal_socket_context_link(struct us_socket_context_t *context, struct us_socket_t *s) {
void us_internal_socket_context_link_listen_socket(struct us_socket_context_t *context, struct us_listen_socket_t *ls) {
ls->s.context = context;
ls->s.next = (struct us_socket_t *) context->head_listen_sockets;
ls->s.prev = 0;
if (context->head_listen_sockets) {
context->head_listen_sockets->s.prev = &ls->s;
}
context->head_listen_sockets = ls;
}

/* We always add in the top, so we don't modify any s.next */
void us_internal_socket_context_link_socket(struct us_socket_context_t *context, struct us_socket_t *s) {
s->context = context;
s->next = context->head;
s->next = context->head_sockets;
s->prev = 0;
if (context->head) {
context->head->prev = s;
if (context->head_sockets) {
context->head_sockets->prev = s;
}
context->head = s;
context->head_sockets = s;
}

struct us_loop_t *us_socket_context_loop(int ssl, struct us_socket_context_t *context) {
Expand Down Expand Up @@ -172,7 +212,8 @@ struct us_socket_context_t *us_create_socket_context(int ssl, struct us_loop_t *

struct us_socket_context_t *context = malloc(sizeof(struct us_socket_context_t) + context_ext_size);
context->loop = loop;
context->head = 0;
context->head_sockets = 0;
context->head_listen_sockets = 0;
context->iterator = 0;
context->next = 0;
context->is_low_prio = default_is_low_prio_handler;
Expand Down Expand Up @@ -228,7 +269,7 @@ struct us_listen_socket_t *us_socket_context_listen(int ssl, struct us_socket_co
ls->s.long_timeout = 255;
ls->s.low_prio_state = 0;
ls->s.next = 0;
us_internal_socket_context_link(context, &ls->s);
us_internal_socket_context_link_listen_socket(context, ls);

ls->socket_ext_size = socket_ext_size;

Expand Down Expand Up @@ -259,7 +300,7 @@ struct us_listen_socket_t *us_socket_context_listen_unix(int ssl, struct us_sock
ls->s.long_timeout = 255;
ls->s.low_prio_state = 0;
ls->s.next = 0;
us_internal_socket_context_link(context, &ls->s);
us_internal_socket_context_link_listen_socket(context, ls);

ls->socket_ext_size = socket_ext_size;

Expand Down Expand Up @@ -290,7 +331,7 @@ struct us_socket_t *us_socket_context_connect(int ssl, struct us_socket_context_
connect_socket->timeout = 255;
connect_socket->long_timeout = 255;
connect_socket->low_prio_state = 0;
us_internal_socket_context_link(context, connect_socket);
us_internal_socket_context_link_socket(context, connect_socket);

return connect_socket;
}
Expand Down Expand Up @@ -319,7 +360,7 @@ struct us_socket_t *us_socket_context_connect_unix(int ssl, struct us_socket_con
connect_socket->timeout = 255;
connect_socket->long_timeout = 255;
connect_socket->low_prio_state = 0;
us_internal_socket_context_link(context, connect_socket);
us_internal_socket_context_link_socket(context, connect_socket);

return connect_socket;
}
Expand Down Expand Up @@ -351,7 +392,7 @@ struct us_socket_t *us_socket_context_adopt_socket(int ssl, struct us_socket_con

if (s->low_prio_state != 1) {
/* This properly updates the iterator if in on_timeout */
us_internal_socket_context_unlink(s->context, s);
us_internal_socket_context_unlink_socket(s->context, s);
}

struct us_socket_t *new_s = (struct us_socket_t *) us_poll_resize(&s->p, s->context->loop, sizeof(struct us_socket_t) + ext_size);
Expand All @@ -365,7 +406,7 @@ struct us_socket_t *us_socket_context_adopt_socket(int ssl, struct us_socket_con

if (new_s->next) new_s->next->prev = new_s;
} else {
us_internal_socket_context_link(context, new_s);
us_internal_socket_context_link_socket(context, new_s);
}

return new_s;
Expand Down
11 changes: 8 additions & 3 deletions src/internal/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ void us_internal_init_loop_ssl_data(struct us_loop_t *loop);
void us_internal_free_loop_ssl_data(struct us_loop_t *loop);

/* Socket context related */
void us_internal_socket_context_link(struct us_socket_context_t *context, struct us_socket_t *s);
void us_internal_socket_context_unlink(struct us_socket_context_t *context, struct us_socket_t *s);
void us_internal_socket_context_link_socket(struct us_socket_context_t *context, struct us_socket_t *s);
void us_internal_socket_context_unlink_socket(struct us_socket_context_t *context, struct us_socket_t *s);

/* Sockets are polls */
struct us_socket_t {
Expand All @@ -110,12 +110,17 @@ struct us_listen_socket_t {
unsigned int socket_ext_size;
};

/* Listen sockets are keps in their own list */
void us_internal_socket_context_link_listen_socket(struct us_socket_context_t *context, struct us_listen_socket_t *s);
void us_internal_socket_context_unlink_listen_socket(struct us_socket_context_t *context, struct us_listen_socket_t *s);

struct us_socket_context_t {
alignas(LIBUS_EXT_ALIGNMENT) struct us_loop_t *loop;
uint32_t global_tick;
unsigned char timestamp;
unsigned char long_timestamp;
struct us_socket_t *head;
struct us_socket_t *head_sockets;
struct us_listen_socket_t *head_listen_sockets;
struct us_socket_t *iterator;
struct us_socket_context_t *prev, *next;

Expand Down
2 changes: 1 addition & 1 deletion src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ WIN32_EXPORT void us_socket_context_on_end(int ssl, struct us_socket_context_t *
/* Returns user data extension for this socket context */
WIN32_EXPORT void *us_socket_context_ext(int ssl, struct us_socket_context_t *context);

/* Closes all open sockets. Does not close listen sockets. Does not invalidate the socket context. */
/* Closes all open sockets, including listen sockets. Does not invalidate the socket context. */
void us_socket_context_close(int ssl, struct us_socket_context_t *context);

/* Listen for connections. Acts as the main driving cog in a server. Will call set async callbacks. */
Expand Down
8 changes: 4 additions & 4 deletions src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void us_internal_timer_sweep(struct us_loop_t *loop) {
unsigned char long_ticks = context->long_timestamp = (context->global_tick / 15) % 240;

/* Begin at head */
struct us_socket_t *s = context->head;
struct us_socket_t *s = context->head_sockets;
while (s) {
/* Seek until end or timeout found (tightest loop) */
while (1) {
Expand Down Expand Up @@ -152,7 +152,7 @@ void us_internal_handle_low_priority_sockets(struct us_loop_t *loop) {
if (s->next) s->next->prev = 0;
s->next = 0;

us_internal_socket_context_link(s->context, s);
us_internal_socket_context_link_socket(s->context, s);
us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) | LIBUS_SOCKET_READABLE);

s->low_prio_state = 2;
Expand Down Expand Up @@ -259,7 +259,7 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events)
/* We always use nodelay */
bsd_socket_nodelay(client_fd, 1);

us_internal_socket_context_link(listen_socket->s.context, s);
us_internal_socket_context_link_socket(listen_socket->s.context, s);

listen_socket->s.context->on_open(s, 0, bsd_addr_get_ip(&addr), bsd_addr_get_ip_length(&addr));

Expand Down Expand Up @@ -313,7 +313,7 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events)
s->context->loop->data.low_prio_budget--; /* Still having budget for this iteration - do normal processing */
} else {
us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
us_internal_socket_context_unlink(s->context, s);
us_internal_socket_context_unlink_socket(s->context, s);

/* Link this socket to the low-priority queue - we use a LIFO queue, to prioritize newer clients that are
* maybe not already timeouted - sounds unfair, but works better in real-life with smaller client-timeouts
Expand Down
4 changes: 2 additions & 2 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ int us_socket_is_established(int ssl, struct us_socket_t *s) {
/* Exactly the same as us_socket_close but does not emit on_close event */
struct us_socket_t *us_socket_close_connecting(int ssl, struct us_socket_t *s) {
if (!us_socket_is_closed(0, s)) {
us_internal_socket_context_unlink(s->context, s);
us_internal_socket_context_unlink_socket(s->context, s);
us_poll_stop((struct us_poll_t *) s, s->context->loop);
bsd_close_socket(us_poll_fd((struct us_poll_t *) s));

Expand Down Expand Up @@ -115,7 +115,7 @@ struct us_socket_t *us_socket_close(int ssl, struct us_socket_t *s, int code, vo
s->next = 0;
s->low_prio_state = 0;
} else {
us_internal_socket_context_unlink(s->context, s);
us_internal_socket_context_unlink_socket(s->context, s);
}
us_poll_stop((struct us_poll_t *) s, s->context->loop);
bsd_close_socket(us_poll_fd((struct us_poll_t *) s));
Expand Down

0 comments on commit 034575d

Please sign in to comment.