Skip to content

Commit

Permalink
Merge pull request #5 from chainstack/bug/concurrent-requests-corrupt…
Browse files Browse the repository at this point in the history
…-the-message-size-calculation

Add separate request contexts for server events and client events
  • Loading branch information
zaslavskii authored Apr 30, 2021
2 parents 0588ca1 + cce7dba commit 71e9d01
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
25 changes: 9 additions & 16 deletions src/ngx_http_websocket_stat_frame_counter.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,16 @@ frame_counter_process_message(u_char **buffer, ssize_t *size,
break;
case PAYLOAD_LEN_LARGE:
case PAYLOAD_LEN_HUGE: {
int i;
if (frame_counter->stage == PAYLOAD_LEN_LARGE) {
assert(*size >= 2);
i = 2;
} else {
assert(*size >= 8);
i = 8;
frame_counter->bytes_consumed++;
frame_counter->current_payload_size <<= 8;
frame_counter->current_payload_size |= **buffer;
if ((frame_counter->stage == PAYLOAD_LEN_LARGE && frame_counter->bytes_consumed == 2) ||
(frame_counter->stage == PAYLOAD_LEN_HUGE && frame_counter->bytes_consumed == 8)) {
frame_counter->current_message_size += frame_counter->current_payload_size;
frame_counter->stage = frame_counter->payload_masked ? MASK : PAYLOAD;
frame_counter->bytes_consumed = 0;
}
do {
frame_counter->current_payload_size <<= 8;
frame_counter->current_payload_size |= **buffer;
move_buffer(buffer, size, 1);
} while (--i);
frame_counter->current_message_size += frame_counter->current_payload_size;
frame_counter->stage =
frame_counter->payload_masked ? MASK : PAYLOAD;
frame_counter->bytes_consumed = 0;
move_buffer(buffer, size, 1);
break;
}
case MASK:
Expand Down
32 changes: 19 additions & 13 deletions src/ngx_http_websocket_stat_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ typedef struct {
ngx_frame_counter_t frame_counter;
} ngx_http_websocket_stat_ctx;

typedef struct {
ngx_http_websocket_stat_ctx recv_ctx;
ngx_http_websocket_stat_ctx send_ctx;
} ngx_http_websocket_stat_request_ctx;

typedef struct {
char from_client : 1;
ngx_http_websocket_stat_ctx *ws_ctx;
Expand Down Expand Up @@ -145,15 +150,15 @@ my_send(ngx_connection_t *c, u_char *buf, size_t size)
{
ngx_http_request_t *r = c->data;
ngx_http_websocket_srv_conf_t *srvcf = ngx_http_get_module_srv_conf(r, ngx_http_websocket_stat_module);
ngx_http_websocket_stat_ctx *ctx = ngx_http_get_module_ctx(r, ngx_http_websocket_stat_module);
ngx_http_websocket_stat_request_ctx *request_ctx = ngx_http_get_module_ctx(r, ngx_http_websocket_stat_module);

if (check_ws_age(ctx->ws_conn_start_time, r) != NGX_OK) {
if (check_ws_age(request_ctx->send_ctx.ws_conn_start_time, r) != NGX_OK) {
return NGX_ERROR;
}

template_ctx_s template_ctx;
template_ctx.from_client = 0;
template_ctx.ws_ctx = ctx;
template_ctx.ws_ctx = &request_ctx->send_ctx;

int n = orig_send(c, buf, size);
if (n <= 0) {
Expand All @@ -164,7 +169,7 @@ my_send(ngx_connection_t *c, u_char *buf, size_t size)
ssize_t sz = n;

while (sz > 0) {
if (frame_counter_process_message(&buf, &sz, &(ctx->frame_counter))) {
if (frame_counter_process_message(&buf, &sz, &request_ctx->send_ctx.frame_counter)) {
ws_do_log(get_ws_log_template(&template_ctx, srvcf), r, &template_ctx);
}
}
Expand All @@ -178,7 +183,7 @@ my_recv(ngx_connection_t *c, u_char *buf, size_t size)
{
ngx_http_request_t *r = c->data;
ngx_http_websocket_srv_conf_t *srvcf = ngx_http_get_module_srv_conf(r, ngx_http_websocket_stat_module);
ngx_http_websocket_stat_ctx *ctx = ngx_http_get_module_ctx(r, ngx_http_websocket_stat_module);
ngx_http_websocket_stat_request_ctx *request_ctx = ngx_http_get_module_ctx(r, ngx_http_websocket_stat_module);

int n = orig_recv(c, buf, size);
if (n <= 0) {
Expand All @@ -188,14 +193,14 @@ my_recv(ngx_connection_t *c, u_char *buf, size_t size)
ssize_t sz = n;
template_ctx_s template_ctx;
template_ctx.from_client = 1;
template_ctx.ws_ctx = ctx;
template_ctx.ws_ctx = &request_ctx->recv_ctx;

if (check_ws_age(ctx->ws_conn_start_time, r) != NGX_OK) {
if (check_ws_age(request_ctx->recv_ctx.ws_conn_start_time, r) != NGX_OK) {
return NGX_ERROR;
}

while (sz > 0) {
if (frame_counter_process_message(&buf, &sz, &ctx->frame_counter)) {
if (frame_counter_process_message(&buf, &sz, &request_ctx->recv_ctx.frame_counter)) {
ws_do_log(get_ws_log_template(&template_ctx, srvcf), r, &template_ctx);
}
}
Expand Down Expand Up @@ -224,23 +229,24 @@ ngx_http_websocket_stat_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
if (r->headers_in.upgrade) {
if (r->upstream->peer.connection) {
// connection opened
ngx_http_websocket_stat_ctx *ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_websocket_stat_ctx));
ngx_http_websocket_stat_request_ctx *ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_websocket_stat_request_ctx));
if (!ctx) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_http_set_ctx(r, ctx, ngx_http_websocket_stat_module);
template_ctx_s template_ctx;
template_ctx.ws_ctx = ctx;
template_ctx.ws_ctx = &ctx->recv_ctx;
ws_do_log(srvcf->log_open_template, r, &template_ctx);
orig_recv = r->connection->recv;
r->connection->recv = my_recv;
orig_send = r->connection->send;
r->connection->send = my_send;
ctx->ws_conn_start_time = ngx_time();
ctx->recv_ctx.ws_conn_start_time = ngx_time();
ctx->send_ctx.ws_conn_start_time = ctx->recv_ctx.ws_conn_start_time;
} else {
ngx_http_websocket_stat_ctx *ctx = ngx_http_get_module_ctx(r, ngx_http_websocket_stat_module);
ngx_http_websocket_stat_request_ctx *ctx = ngx_http_get_module_ctx(r, ngx_http_websocket_stat_module);
template_ctx_s template_ctx;
template_ctx.ws_ctx = ctx;
template_ctx.ws_ctx = &ctx->recv_ctx;
ws_do_log(srvcf->log_close_template, r, &template_ctx);
}
}
Expand Down

0 comments on commit 71e9d01

Please sign in to comment.