Skip to content

Commit

Permalink
Fix benchmarks (#418)
Browse files Browse the repository at this point in the history
* move setup and resume frame handling from StandardReactiveSocket to ConnectionAutomaton

* fixing benchmarks build
  • Loading branch information
lehecka committed May 12, 2017
1 parent 8f801d4 commit a9bc269
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 52 deletions.
1 change: 1 addition & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function(benchmark name file)
${name}
rsocket_experimental
ReactiveSocket
yarpl
${GOOGLE_BENCHMARK_LIBS}
${FOLLY_LIBRARIES}
${GFLAGS_LIBRARY}
Expand Down
48 changes: 30 additions & 18 deletions benchmarks/RequestResponseLatency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <src/SubscriptionBase.h>
#include "rsocket/RSocket.h"
#include "rsocket/transports/TcpConnectionFactory.h"
#include "yarpl/Flowable.h"

using namespace ::reactivesocket;
using namespace ::folly;
Expand Down Expand Up @@ -58,24 +59,32 @@ class BM_Subscription : public SubscriptionBase {
std::atomic_bool cancelled_;
};

class BM_RequestHandler : public DefaultRequestHandler
class BM_RequestHandler : public RSocketRequestHandler
{
public:
void handleRequestResponse(
Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
{
LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;

response->onSubscribe(
std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
}

std::shared_ptr<StreamState> handleSetupPayload(
ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
{
LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
return nullptr;
}
// TODO(lehecka): enable when we have support for request-response
yarpl::Reference<yarpl::flowable::Flowable<reactivesocket::Payload>>
handleRequestStream(
reactivesocket::Payload request,
reactivesocket::StreamId streamId) override {
CHECK(false) << "not implemented";
}

// void handleRequestResponse(
// Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
// {
// LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;

// response->onSubscribe(
// std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
// }

// std::shared_ptr<StreamState> handleSetupPayload(
// ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
// {
// LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
// return nullptr;
// }
};

class BM_Subscriber
Expand Down Expand Up @@ -194,8 +203,11 @@ class BM_RsFixture : public benchmark::Fixture

BENCHMARK_F(BM_RsFixture, BM_RequestResponse_Latency)(benchmark::State &state)
{
auto clientRs = RSocket::createClient(
std::make_unique<TcpConnectionFactory>(host_, port_));
folly::SocketAddress address;
address.setFromHostPort(host_, port_);

auto clientRs = RSocket::createClient(std::make_unique<TcpConnectionFactory>(
std::move(address)));
int reqs = 0;

auto rs = clientRs->connect().get();
Expand Down
46 changes: 29 additions & 17 deletions benchmarks/RequestResponseThroughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <src/SubscriptionBase.h>
#include "rsocket/RSocket.h"
#include "rsocket/transports/TcpConnectionFactory.h"
#include "yarpl/Flowable.h"

using namespace ::reactivesocket;
using namespace ::folly;
Expand Down Expand Up @@ -58,24 +59,32 @@ class BM_Subscription : public SubscriptionBase {
std::atomic_bool cancelled_;
};

class BM_RequestHandler : public DefaultRequestHandler
class BM_RequestHandler : public RSocketRequestHandler
{
public:
void handleRequestResponse(
Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
{
LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;

response->onSubscribe(
std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
}

std::shared_ptr<StreamState> handleSetupPayload(
ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
{
LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
return nullptr;
}
// TODO(lehecka): enable when we have support for request-response
yarpl::Reference<yarpl::flowable::Flowable<reactivesocket::Payload>>
handleRequestStream(
reactivesocket::Payload request,
reactivesocket::StreamId streamId) override {
CHECK(false) << "not implemented";
}

// void handleRequestResponse(
// Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
// {
// LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request;

// response->onSubscribe(
// std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
// }

// std::shared_ptr<StreamState> handleSetupPayload(
// ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
// {
// LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
// return nullptr;
// }
};

class BM_Subscriber
Expand Down Expand Up @@ -193,8 +202,11 @@ class BM_RsFixture : public benchmark::Fixture

BENCHMARK_DEFINE_F(BM_RsFixture, BM_RequestResponse_Throughput)(benchmark::State &state)
{
folly::SocketAddress address;
address.setFromHostPort(host_, port_);

auto clientRs = RSocket::createClient(std::make_unique<TcpConnectionFactory>(
host_, port_));
std::move(address)));
int reqs = 0;
int numSubscribers = state.range(0);
int mask = numSubscribers - 1;
Expand Down
36 changes: 19 additions & 17 deletions benchmarks/StreamThroughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <src/NullRequestHandler.h>
#include <src/SubscriptionBase.h>
#include "rsocket/RSocket.h"
#include "rsocket/OldNewBridge.h"
#include "rsocket/transports/TcpConnectionFactory.h"
#include "yarpl/Flowables.h"

using namespace ::reactivesocket;
using namespace ::folly;
Expand Down Expand Up @@ -59,23 +61,17 @@ class BM_Subscription : public SubscriptionBase {
std::atomic_bool cancelled_;
};

class BM_RequestHandler : public DefaultRequestHandler
class BM_RequestHandler : public RSocketRequestHandler
{
public:
void handleRequestStream(
Payload request, StreamId streamId, const std::shared_ptr<Subscriber<Payload>> &response) noexcept override
{
LOG(INFO) << "BM_RequestHandler.handleRequestStream " << request;

response->onSubscribe(
std::make_shared<BM_Subscription>(response, MESSAGE_LENGTH));
}

std::shared_ptr<StreamState> handleSetupPayload(
ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override
{
LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request;
return nullptr;
yarpl::Reference<yarpl::flowable::Flowable<reactivesocket::Payload>>
handleRequestStream(
reactivesocket::Payload request,
reactivesocket::StreamId streamId) override {
CHECK(false) << "not implemented";
// TODO(lehecka) need to implement new operator fromGenerator
// return yarpl::flowable::Flowables::fromGenerator<reactivesocket::Payload>(
// []{return Payload(std::string(MESSAGE_LENGTH, 'a')); });
}
};

Expand Down Expand Up @@ -206,7 +202,11 @@ class BM_RsFixture : public benchmark::Fixture

BENCHMARK_DEFINE_F(BM_RsFixture, BM_Stream_Throughput)(benchmark::State &state)
{
auto clientRs = RSocket::createClient(TcpConnectionFactory::create(host_, port_));
folly::SocketAddress address;
address.setFromHostPort(host_, port_);

auto clientRs = RSocket::createClient(std::make_unique<TcpConnectionFactory>(
std::move(address)));

auto s = std::make_shared<BM_Subscriber>(state.range(0));

Expand All @@ -215,7 +215,9 @@ BENCHMARK_DEFINE_F(BM_RsFixture, BM_Stream_Throughput)(benchmark::State &state)
.then(
[s](std::shared_ptr<RSocketRequester> rs)
{
rs->requestStream(Payload("BM_Stream"), s);
rs->requestStream(Payload("BM_Stream"))->subscribe(
yarpl::Reference<yarpl::flowable::Subscriber<Payload>>(
new NewToOldSubscriber(s)));
});

while (state.KeepRunning())
Expand Down

0 comments on commit a9bc269

Please sign in to comment.