From a9bc2696cddfcc16eb5fbd1d92f1a2ff8db8bbd2 Mon Sep 17 00:00:00 2001 From: Ondrej Lehecka Date: Fri, 12 May 2017 15:55:29 -0700 Subject: [PATCH] Fix benchmarks (#418) * move setup and resume frame handling from StandardReactiveSocket to ConnectionAutomaton * fixing benchmarks build --- benchmarks/CMakeLists.txt | 1 + benchmarks/RequestResponseLatency.cpp | 48 +++++++++++++++--------- benchmarks/RequestResponseThroughput.cpp | 46 ++++++++++++++--------- benchmarks/StreamThroughput.cpp | 36 +++++++++--------- 4 files changed, 79 insertions(+), 52 deletions(-) diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index a890f8101..74ac0235d 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -29,6 +29,7 @@ function(benchmark name file) ${name} rsocket_experimental ReactiveSocket + yarpl ${GOOGLE_BENCHMARK_LIBS} ${FOLLY_LIBRARIES} ${GFLAGS_LIBRARY} diff --git a/benchmarks/RequestResponseLatency.cpp b/benchmarks/RequestResponseLatency.cpp index 052207a89..dbcbd96be 100644 --- a/benchmarks/RequestResponseLatency.cpp +++ b/benchmarks/RequestResponseLatency.cpp @@ -10,6 +10,7 @@ #include #include "rsocket/RSocket.h" #include "rsocket/transports/TcpConnectionFactory.h" +#include "yarpl/Flowable.h" using namespace ::reactivesocket; using namespace ::folly; @@ -58,24 +59,32 @@ class BM_Subscription : public SubscriptionBase { std::atomic_bool cancelled_; }; -class BM_RequestHandler : public DefaultRequestHandler +class BM_RequestHandler : public RSocketRequestHandler { public: - void handleRequestResponse( - Payload request, StreamId streamId, const std::shared_ptr> &response) noexcept override - { - LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request; - - response->onSubscribe( - std::make_shared(response, MESSAGE_LENGTH)); - } - - std::shared_ptr handleSetupPayload( - ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override - { - LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request; - return nullptr; - } + // TODO(lehecka): enable when we have support for request-response + yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) override { + CHECK(false) << "not implemented"; + } + + // void handleRequestResponse( + // Payload request, StreamId streamId, const std::shared_ptr> &response) noexcept override + // { + // LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request; + + // response->onSubscribe( + // std::make_shared(response, MESSAGE_LENGTH)); + // } + + // std::shared_ptr handleSetupPayload( + // ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override + // { + // LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request; + // return nullptr; + // } }; class BM_Subscriber @@ -194,8 +203,11 @@ class BM_RsFixture : public benchmark::Fixture BENCHMARK_F(BM_RsFixture, BM_RequestResponse_Latency)(benchmark::State &state) { - auto clientRs = RSocket::createClient( - std::make_unique(host_, port_)); + folly::SocketAddress address; + address.setFromHostPort(host_, port_); + + auto clientRs = RSocket::createClient(std::make_unique( + std::move(address))); int reqs = 0; auto rs = clientRs->connect().get(); diff --git a/benchmarks/RequestResponseThroughput.cpp b/benchmarks/RequestResponseThroughput.cpp index 0e8b255b2..733fe3676 100644 --- a/benchmarks/RequestResponseThroughput.cpp +++ b/benchmarks/RequestResponseThroughput.cpp @@ -10,6 +10,7 @@ #include #include "rsocket/RSocket.h" #include "rsocket/transports/TcpConnectionFactory.h" +#include "yarpl/Flowable.h" using namespace ::reactivesocket; using namespace ::folly; @@ -58,24 +59,32 @@ class BM_Subscription : public SubscriptionBase { std::atomic_bool cancelled_; }; -class BM_RequestHandler : public DefaultRequestHandler +class BM_RequestHandler : public RSocketRequestHandler { public: - void handleRequestResponse( - Payload request, StreamId streamId, const std::shared_ptr> &response) noexcept override - { - LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request; - - response->onSubscribe( - std::make_shared(response, MESSAGE_LENGTH)); - } - - std::shared_ptr handleSetupPayload( - ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override - { - LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request; - return nullptr; - } + // TODO(lehecka): enable when we have support for request-response + yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) override { + CHECK(false) << "not implemented"; + } + + // void handleRequestResponse( + // Payload request, StreamId streamId, const std::shared_ptr> &response) noexcept override + // { + // LOG(INFO) << "BM_RequestHandler.handleRequestResponse " << request; + + // response->onSubscribe( + // std::make_shared(response, MESSAGE_LENGTH)); + // } + + // std::shared_ptr handleSetupPayload( + // ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override + // { + // LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request; + // return nullptr; + // } }; class BM_Subscriber @@ -193,8 +202,11 @@ class BM_RsFixture : public benchmark::Fixture BENCHMARK_DEFINE_F(BM_RsFixture, BM_RequestResponse_Throughput)(benchmark::State &state) { + folly::SocketAddress address; + address.setFromHostPort(host_, port_); + auto clientRs = RSocket::createClient(std::make_unique( - host_, port_)); + std::move(address))); int reqs = 0; int numSubscribers = state.range(0); int mask = numSubscribers - 1; diff --git a/benchmarks/StreamThroughput.cpp b/benchmarks/StreamThroughput.cpp index e93644b4e..7715d2fed 100644 --- a/benchmarks/StreamThroughput.cpp +++ b/benchmarks/StreamThroughput.cpp @@ -9,7 +9,9 @@ #include #include #include "rsocket/RSocket.h" +#include "rsocket/OldNewBridge.h" #include "rsocket/transports/TcpConnectionFactory.h" +#include "yarpl/Flowables.h" using namespace ::reactivesocket; using namespace ::folly; @@ -59,23 +61,17 @@ class BM_Subscription : public SubscriptionBase { std::atomic_bool cancelled_; }; -class BM_RequestHandler : public DefaultRequestHandler +class BM_RequestHandler : public RSocketRequestHandler { public: - void handleRequestStream( - Payload request, StreamId streamId, const std::shared_ptr> &response) noexcept override - { - LOG(INFO) << "BM_RequestHandler.handleRequestStream " << request; - - response->onSubscribe( - std::make_shared(response, MESSAGE_LENGTH)); - } - - std::shared_ptr handleSetupPayload( - ReactiveSocket &socket, ConnectionSetupPayload request) noexcept override - { - LOG(INFO) << "BM_RequestHandler.handleSetupPayload " << request; - return nullptr; + yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) override { + CHECK(false) << "not implemented"; + // TODO(lehecka) need to implement new operator fromGenerator + // return yarpl::flowable::Flowables::fromGenerator( + // []{return Payload(std::string(MESSAGE_LENGTH, 'a')); }); } }; @@ -206,7 +202,11 @@ class BM_RsFixture : public benchmark::Fixture BENCHMARK_DEFINE_F(BM_RsFixture, BM_Stream_Throughput)(benchmark::State &state) { - auto clientRs = RSocket::createClient(TcpConnectionFactory::create(host_, port_)); + folly::SocketAddress address; + address.setFromHostPort(host_, port_); + + auto clientRs = RSocket::createClient(std::make_unique( + std::move(address))); auto s = std::make_shared(state.range(0)); @@ -215,7 +215,9 @@ BENCHMARK_DEFINE_F(BM_RsFixture, BM_Stream_Throughput)(benchmark::State &state) .then( [s](std::shared_ptr rs) { - rs->requestStream(Payload("BM_Stream"), s); + rs->requestStream(Payload("BM_Stream"))->subscribe( + yarpl::Reference>( + new NewToOldSubscriber(s))); }); while (state.KeepRunning())