Skip to content

Commit

Permalink
feat: added SSE implementation for the events
Browse files Browse the repository at this point in the history
Also:
- removed ENET noise from the event bus
- fixed exception when loading up the config.toml
- simplified reflection
  • Loading branch information
ABeltramo committed Sep 11, 2024
1 parent 858a9a7 commit d3eb854
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 319 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ jobs:
imageTag: devcontainer
imageName: ghcr.io/${{ github.repository_owner }}/wolf
cacheFrom: ghcr.io/${{ github.repository_owner }}/wolf:${{github.ref_name}}
push: always
# push: always
# TODO:
# runCmd: |
# cmake -Bbuild \
Expand Down
2 changes: 1 addition & 1 deletion src/core/src/core/audio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ constexpr auto SAMPLE_RATE = 48000;

struct AudioMode {

enum Speakers {
enum class Speakers {
FRONT_LEFT,
FRONT_RIGHT,
FRONT_CENTER,
Expand Down
18 changes: 9 additions & 9 deletions src/core/src/platforms/linux/pulseaudio/pulse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,31 @@ template <> class formatter<wolf::core::audio::AudioMode::Speakers> {
// Mapping taken from
// https://www.freedesktop.org/wiki/Software/PulseAudio/Documentation/User/Modules/#module-null-sink
switch (speaker) {
case wolf::core::audio::AudioMode::FRONT_LEFT:
case wolf::core::audio::AudioMode::Speakers::FRONT_LEFT:
speaker_name = "front-left";
break;
case wolf::core::audio::AudioMode::FRONT_RIGHT:
case wolf::core::audio::AudioMode::Speakers::FRONT_RIGHT:
speaker_name = "front-right";
break;
case wolf::core::audio::AudioMode::FRONT_CENTER:
case wolf::core::audio::AudioMode::Speakers::FRONT_CENTER:
speaker_name = "front-center";
break;
case wolf::core::audio::AudioMode::LOW_FREQUENCY:
case wolf::core::audio::AudioMode::Speakers::LOW_FREQUENCY:
speaker_name = "lfe";
break;
case wolf::core::audio::AudioMode::BACK_LEFT:
case wolf::core::audio::AudioMode::Speakers::BACK_LEFT:
speaker_name = "rear-left";
break;
case wolf::core::audio::AudioMode::BACK_RIGHT:
case wolf::core::audio::AudioMode::Speakers::BACK_RIGHT:
speaker_name = "rear-right";
break;
case wolf::core::audio::AudioMode::SIDE_LEFT:
case wolf::core::audio::AudioMode::Speakers::SIDE_LEFT:
speaker_name = "side-left";
break;
case wolf::core::audio::AudioMode::SIDE_RIGHT:
case wolf::core::audio::AudioMode::Speakers::SIDE_RIGHT:
speaker_name = "side-right";
break;
case wolf::core::audio::AudioMode::MAX_SPEAKERS:
case wolf::core::audio::AudioMode::Speakers::MAX_SPEAKERS:
break;
}
return fmt::format_to(ctx.out(), "{}", speaker_name);
Expand Down
195 changes: 6 additions & 189 deletions src/moonlight-server/api/api.cpp
Original file line number Diff line number Diff line change
@@ -1,210 +1,27 @@
#include <api/api.hpp>
#include <boost/asio.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <events/reflectors.hpp>
#include <helpers/tsqueue.hpp>
#include <memory>
#include <rfl.hpp>
#include <rfl/json.hpp>
#include <server_http.hpp>

namespace wolf::api {

using namespace wolf::core;

std::string to_str(boost::asio::streambuf &streambuf) {
return {buffers_begin(streambuf.data()), buffers_end(streambuf.data())};
}

UnixSocketServer::UnixSocketServer(boost::asio::io_context &io_context,
const std::string &socket_path,
immer::box<state::AppState> app_state)
: io_context_(io_context), app_state(app_state),
acceptor_(io_context, boost::asio::local::stream_protocol::endpoint(socket_path)) {

http.add(HTTPMethod::GET,
"/api/v1/events",
{.summary = "Subscribe to events",
.description = "This endpoint allows clients to subscribe to events using HTTP 0.9. \n"
"Events are sent as JSON objects, one per line. Clients should expect to receive a newline "
"character after each event. The connection will be kept open until the client closes it.",
.handler = [this](auto req, auto socket) { endpoint_Events(req, socket); }});

http.add(
HTTPMethod::GET,
"/api/v1/pending-pair-requests",
{
.summary = "Get pending pair requests",
.description = "This endpoint returns a list of Moonlight clients that are currently waiting to be paired.",
.response_description = {{200, {.json_schema = rfl::json::to_schema<PendingPairRequestsResponse>()}}},
.handler = [this](auto req, auto socket) { endpoint_PendingPairRequest(req, socket); },
});

http.add(HTTPMethod::POST,
"/api/v1/pair-client",
{
.summary = "Pair a client",
.request_description = APIDescription{.json_schema = rfl::json::to_schema<PairRequest>()},
.response_description = {{200, {.json_schema = rfl::json::to_schema<PairResponse>()}},
{500, {.json_schema = rfl::json::to_schema<PairResponseError>()}}},
.handler = [this](auto req, auto socket) { endpoint_Pair(req, socket); },
});

auto openapi_schema = http.openapi_schema();
http.add(HTTPMethod::GET, "/api/v1/openapi-schema", {.handler = [this, openapi_schema](auto req, auto socket) {
send_http(socket, 200, openapi_schema);
}});

start_accept();
}

void UnixSocketServer::broadcast_event(const std::string &event_json) {
logs::log(logs::trace, "[API] Sending event: {}", event_json);
for (auto &socket : sockets_) {
boost::asio::async_write(socket->socket,
boost::asio::buffer(event_json + "\n"),
[this, socket](const boost::system::error_code &ec, std::size_t /*length*/) {
if (ec) {
logs::log(logs::error, "[API] Error sending event: {}", ec.message());
close(*socket);
}
});
}
}

void UnixSocketServer::cleanup_sockets() {
sockets_.erase(std::remove_if(sockets_.begin(), sockets_.end(), [](const auto &socket) { return !socket->is_alive; }),
sockets_.end());
}

void UnixSocketServer::send_http(std::shared_ptr<UnixSocket> socket, int status_code, std::string_view body) {
auto http_reply = fmt::format("HTTP/1.0 {} OK\r\nContent-Length: {}\r\n\r\n{}", status_code, body.size(), body);
boost::asio::async_write(socket->socket,
boost::asio::buffer(http_reply),
[this, socket](const boost::system::error_code &ec, std::size_t /*length*/) {
if (ec) {
logs::log(logs::error, "[API] Error sending HTTP: {}", ec.message());
close(*socket);
}
});
}

void UnixSocketServer::handle_request(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket) {
logs::log(logs::debug, "[API] Received request: {} {} - {}", rfl::enum_to_string(req.method), req.path, req.body);

if (!http.handle_request(req, socket)) {
send_http(socket, 404, "");
close(*socket);
}
}

void UnixSocketServer::start_connection(std::shared_ptr<UnixSocket> socket) {
auto request_buf = std::make_shared<boost::asio::streambuf>(std::numeric_limits<std::size_t>::max());
boost::asio::async_read_until(
socket->socket,
*request_buf,
"\r\n\r\n",
[this, socket, request_buf](const boost::system::error_code &ec, std::size_t bytes_transferred) {
if (ec) {
logs::log(logs::error, "[API] Error reading request: {}", ec.message());
close(*socket);
return;
}
HTTPRequest req = {};
std::string method;
std::istream is(request_buf.get());
SimpleWeb::RequestMessage::parse(is, method, req.path, req.query_string, req.http_version, req.headers);
if (method == "GET")
req.method = HTTPMethod::GET;
else if (method == "POST")
req.method = HTTPMethod::POST;
else if (method == "PUT")
req.method = HTTPMethod::PUT;
else if (method == "DELETE")
req.method = HTTPMethod::DELETE;
else
req.method = HTTPMethod::GET;

if (req.headers.contains("Transfer-Encoding") && req.headers.find("Transfer-Encoding")->second == "chunked") {
logs::log(logs::error, "[API] Chunked encoding not supported, use HTTP/1.0 instead");
close(*socket);
return;
}

// Get the body payload
if (req.headers.contains("Content-Length")) {
auto content_length = std::stoul(req.headers.find("Content-Length")->second);
std::size_t num_additional_bytes = request_buf->size() - bytes_transferred;
if (content_length > num_additional_bytes) {
boost::asio::async_read(socket->socket,
*request_buf,
boost::asio::transfer_exactly(content_length - bytes_transferred),
[this, socket, request_buf, req = std::make_unique<HTTPRequest>(req)](
const boost::system::error_code &ec,
std::size_t /*bytes_transferred*/) {
if (ec) {
logs::log(logs::error, "[API] Error reading request body: {}", ec.message());
close(*socket);
return;
}
req->body = to_str(*request_buf);
handle_request(*req, socket);
});
} else {
req.body = to_str(*request_buf);
handle_request(req, socket);
}
} else {
handle_request(req, socket);
}
});
}

void UnixSocketServer::start_accept() {
auto socket =
std::make_shared<UnixSocket>(UnixSocket{.socket = boost::asio::local::stream_protocol::socket(io_context_)});
acceptor_.async_accept(socket->socket, [this, socket](const boost::system::error_code &ec) {
if (!ec) {
start_accept(); // Immediately start accepting a new connection
start_connection(socket); // Start reading the request from the new connection
} else {
logs::log(logs::error, "[API] Error accepting connection: {}", ec.message());
close(*socket);
}
});
}

void UnixSocketServer::close(UnixSocket &socket) {
socket.socket.close();
socket.is_alive = false;
}

void start_server(immer::box<state::AppState> app_state) {
auto socket_path = "/tmp/wolf.sock";
logs::log(logs::info, "Starting API server on {}", socket_path);

auto event_queue = std::make_shared<TSQueue<events::EventsVariant>>();

auto global_ev_handler = app_state->event_bus->register_global_handler([event_queue](events::EventsVariant ev) {
/**
* We could do server.send_event() here, but since this callback will be running from the fire_event() thread,
* we don't want to block it. Instead, we'll push the event to a queue and have a separate thread handle the
* sending of the event.
*/
event_queue->push(std::move(ev));
});

::unlink(socket_path);
boost::asio::io_context io_context;
UnixSocketServer server(io_context, socket_path, app_state);
auto server_ptr = std::make_shared<UnixSocketServer>(server);

auto global_ev_handler = app_state->event_bus->register_global_handler([server_ptr](events::EventsVariant ev) {
std::visit([server_ptr](auto &&arg) { server_ptr->broadcast_event(arg->event_type, rfl::json::write(*arg)); }, ev);
});

while (true) {
io_context.run_for(std::chrono::milliseconds(100));
server.cleanup_sockets();
while (auto ev = event_queue->pop(std::chrono::milliseconds(0))) {
std::visit([&server](auto &&arg) { server.broadcast_event(rfl::json::write(*arg)); }, *ev);
}
}
io_context.run();
}

} // namespace wolf::api
29 changes: 22 additions & 7 deletions src/moonlight-server/api/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,41 @@ class UnixSocketServer {
const std::string &socket_path,
immer::box<state::AppState> app_state);

void broadcast_event(const std::string &event_json);
UnixSocketServer(const UnixSocketServer &) = default;

void cleanup_sockets();
void broadcast_event(const std::string &event_type, const std::string &event_json);

private:
void endpoint_Events(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket);
void endpoint_PendingPairRequest(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket);
void endpoint_Pair(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket);

void sse_broadcast(const std::string &payload);
void sse_keepalive(const boost::system::error_code &e);

void send_http(std::shared_ptr<UnixSocket> socket, int status_code, std::string_view body);
void send_http(std::shared_ptr<UnixSocket> socket,
int status_code,
const std::vector<std::string_view> &http_headers,
std::string_view body);

void handle_request(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket);
void start_connection(std::shared_ptr<UnixSocket> socket);
void start_accept();

void cleanup_sockets();
void close(UnixSocket &socket);

boost::asio::io_context &io_context_;
boost::asio::local::stream_protocol::acceptor acceptor_;
std::vector<std::shared_ptr<UnixSocket>> sockets_ = {};
immer::box<state::AppState> app_state;
HTTPServer<std::shared_ptr<UnixSocket>> http = {};
struct UnixSocketState {
boost::asio::io_context &io_context;
immer::box<state::AppState> app_state;
boost::asio::local::stream_protocol::acceptor acceptor;
std::vector<std::shared_ptr<UnixSocket>> sockets;
HTTPServer<std::shared_ptr<UnixSocket>> http;
boost::asio::steady_timer sse_keepalive_timer;
};

std::shared_ptr<UnixSocketState> state_;
};

} // namespace wolf::api
12 changes: 8 additions & 4 deletions src/moonlight-server/api/endpoints.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
namespace wolf::api {

void UnixSocketServer::endpoint_Events(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket) {
// curl -v --http0.9 --unix-socket /tmp/wolf.sock http://localhost/api/v1/events
sockets_.push_back(socket);
// curl -N --unix-socket /tmp/wolf.sock http://localhost/api/v1/events
state_->sockets.push_back(socket);
send_http(socket,
200,
{{"Content-Type: text/event-stream"}, {"Connection: keep-alive"}, {"Cache-Control: no-cache"}},
""); // Inform clients this is going to be SSE
}

void UnixSocketServer::endpoint_PendingPairRequest(const HTTPRequest &req, std::shared_ptr<UnixSocket> socket) {
// curl -v --http1.0 --unix-socket /tmp/wolf.sock http://localhost/api/v1/pending-pair-requests
auto res = PendingPairRequestsResponse{.success = true};
auto requests = std::vector<PairRequest>();
for (auto [secret, pair_request] : *app_state->pairing_atom->load()) {
for (auto [secret, pair_request] : *(state_->app_state)->pairing_atom->load()) {
requests.push_back({.pair_secret = secret, .pin = pair_request->client_ip});
}
send_http(socket, 200, rfl::json::write(res));
Expand All @@ -21,7 +25,7 @@ void UnixSocketServer::endpoint_Pair(const HTTPRequest &req, std::shared_ptr<Uni
// curl -v --http1.0 --unix-socket /tmp/wolf.sock -d '{"pair_secret": "xxxx", "pin": "1234"}'
// http://localhost/api/v1/pair-client
if (auto event = rfl::json::read<PairRequest>(req.body)) {
if (auto pair_request = app_state->pairing_atom->load()->find(event.value().pair_secret)) {
if (auto pair_request = state_->app_state->pairing_atom->load()->find(event.value().pair_secret)) {
pair_request->get().user_pin->set_value(event.value().pin.value()); // Resolve the promise
auto res = PairResponse{.success = true};
send_http(socket, 200, rfl::json::write(res));
Expand Down
Loading

0 comments on commit d3eb854

Please sign in to comment.