Skip to content

Commit

Permalink
handle chunked
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Oct 12, 2023
1 parent c80802b commit 7c0bcf2
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 24 deletions.
116 changes: 93 additions & 23 deletions include/cinatra/coro_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <asio/buffer.hpp>
#include <thread>

#include "asio/dispatch.hpp"
#include "asio/streambuf.hpp"
#include "async_simple/coro/Lazy.h"
#include "coro_http_request.hpp"
Expand All @@ -15,11 +16,19 @@
#include "ylt/coro_io/coro_io.hpp"

namespace cinatra {
struct chunked_result {
std::error_code ec;
bool eof = false;
std::string_view data;
};

class coro_http_connection {
public:
template <typename executor_t>
coro_http_connection(executor_t *executor, asio::ip::tcp::socket socket)
: executor_(executor), socket_(std::move(socket)), request_(parser_) {
: executor_(executor),
socket_(std::move(socket)),
request_(parser_, this) {
buffers_.reserve(3);
response_.set_response_cb([this]() -> async_simple::coro::Lazy<void> {
co_await reply();
Expand Down Expand Up @@ -48,29 +57,32 @@ class coro_http_connection {
head_buf_.consume(size);
keep_alive_ = check_keep_alive();

size_t body_len = parser_.body_len();
if (body_len <= head_buf_.size()) {
if (body_len > 0) {
detail::resize(body_, body_len);
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
memcpy(body_.data(), data_ptr, body_len);
head_buf_.consume(head_buf_.size());
if (!parser_.is_chunked()) {
size_t body_len = parser_.body_len();
if (body_len <= head_buf_.size()) {
if (body_len > 0) {
detail::resize(body_, body_len);
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
memcpy(body_.data(), data_ptr, body_len);
head_buf_.consume(head_buf_.size());
}
}
}
else {
size_t part_size = head_buf_.size();
size_t size_to_read = body_len - part_size;
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
detail::resize(body_, body_len);
memcpy(body_.data(), data_ptr, part_size);
head_buf_.consume(part_size);

auto [ec, size] = co_await async_read(
asio::buffer(body_.data() + part_size, size_to_read), size_to_read);
if (ec) {
CINATRA_LOG_ERROR << "async_read error: " << ec.message();
close();
break;
else {
size_t part_size = head_buf_.size();
size_t size_to_read = body_len - part_size;
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
detail::resize(body_, body_len);
memcpy(body_.data(), data_ptr, part_size);
head_buf_.consume(part_size);

auto [ec, size] = co_await async_read(
asio::buffer(body_.data() + part_size, size_to_read),
size_to_read);
if (ec) {
CINATRA_LOG_ERROR << "async_read error: " << ec.message();
close();
break;
}
}
}

Expand Down Expand Up @@ -122,6 +134,63 @@ class coro_http_connection {
}
}

async_simple::coro::Lazy<chunked_result> read_chunked() {
if (head_buf_.size() > 0) {
const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}

chunked_result result{};
std::error_code ec{};
size_t size = 0;

if (std::tie(ec, size) = co_await async_read_until(chunked_buf_, CRCF);
ec) {
result.ec = ec;
close();
co_return result;
}

size_t buf_size = chunked_buf_.size();
size_t additional_size = buf_size - size;
const char *data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data());
std::string_view size_str(data_ptr, size - CRCF.size());
auto chunk_size = hex_to_int(size_str);
chunked_buf_.consume(size);
if (chunk_size < 0) {
CINATRA_LOG_DEBUG << "bad chunked size";
ec = asio::error::make_error_code(
asio::error::basic_errors::invalid_argument);
result.ec = ec;
co_return result;
}

if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(CRCF.size());
result.eof = true;
co_return result;
}

if (additional_size < size_t(chunk_size + 2)) {
// not a complete chunk, read left chunk data.
size_t size_to_read = chunk_size + 2 - additional_size;
if (std::tie(ec, size) = co_await async_read(chunked_buf_, size_to_read);
ec) {
result.ec = ec;
close();
co_return result;
}
}

data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data());
result.data = std::string_view{data_ptr, (size_t)chunk_size};
chunked_buf_.consume(chunk_size + CRCF.size());

co_return result;
}

auto &socket() { return socket_; }

void set_quit_callback(std::function<void(const uint64_t &conn_id)> callback,
Expand Down Expand Up @@ -182,6 +251,7 @@ class coro_http_connection {
asio::ip::tcp::socket socket_;
asio::streambuf head_buf_;
std::string body_;
asio::streambuf chunked_buf_;
http_parser parser_;
bool keep_alive_;
coro_http_response response_;
Expand Down
8 changes: 7 additions & 1 deletion include/cinatra/coro_http_request.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once
#include "async_simple/coro/Lazy.h"
#include "define.h"
#include "http_parser.hpp"

namespace cinatra {
class coro_http_connection;
class coro_http_request {
public:
coro_http_request(http_parser& parser) : parser_(parser) {}
coro_http_request(http_parser& parser, coro_http_connection* conn)
: parser_(parser), conn_(conn) {}

std::string_view get_header_value(std::string_view key) {
auto headers = parser_.get_headers();
Expand Down Expand Up @@ -81,8 +84,11 @@ class coro_http_request {
return content_type::unknown;
}

coro_http_connection* get_conn() { return conn_; }

private:
http_parser& parser_;
std::string_view body_;
coro_http_connection* conn_;
};
} // namespace cinatra
44 changes: 44 additions & 0 deletions tests/test_coro_http_server.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#include <chrono>
#include <future>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <system_error>
#include <thread>

#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/SyncAwait.h"
#include "cinatra/coro_connection.hpp"
#include "cinatra/define.h"
#include "cinatra/response_cv.hpp"
#include "cinatra/utils.hpp"
#include "cinatra/ylt/coro_io/coro_io.hpp"
#include "cinatra/ylt/coro_io/io_context_pool.hpp"
#define DOCTEST_CONFIG_IMPLEMENT
Expand Down Expand Up @@ -283,6 +289,44 @@ TEST_CASE("delay reply, server stop, form-urlencode, qureies, throw") {
std::cout << "ok\n";
}

TEST_CASE("chunked request") {
cinatra::coro_http_server server(1, 9001);
server.set_http_handler<cinatra::GET, cinatra::POST>(
"/chunked",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
assert(req.get_content_type() == content_type::chunked);
chunked_result result{};
std::string content;

while (true) {
result = co_await req.get_conn()->read_chunked();
if (result.ec) {
co_return;
}
if (result.eof) {
break;
}

content.append(result.data);
}

std::cout << content << "\n";
resp.set_status_and_content(status_type::ok, "chunked ok");
});

server.async_start();
std::this_thread::sleep_for(200ms);

coro_http_client client{};
auto ss = std::make_shared<std::stringstream>();
*ss << "hello world";
auto result = async_simple::coro::syncAwait(client.async_upload_chunked(
"http://127.0.0.1:9001/chunked"sv, http_method::POST, ss));
CHECK(result.status == 200);
CHECK(result.resp_body == "chunked ok");
}

DOCTEST_MSVC_SUPPRESS_WARNING_WITH_PUSH(4007)
int main(int argc, char **argv) { return doctest::Context(argc, argv).run(); }
DOCTEST_MSVC_SUPPRESS_WARNING_POP

0 comments on commit 7c0bcf2

Please sign in to comment.