diff --git a/CMakeLists.txt b/CMakeLists.txt index 7bb075f0d..24cba67bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -381,6 +381,8 @@ add_subdirectory(experimental/yarpl) add_library( rsocket_experimental experimental/rsocket/RSocket.h + # TODO remove when ReactiveStreams all synced + experimental/rsocket/OldNewBridge.h experimental/rsocket-src/RSocket.cpp experimental/rsocket/RSocketServer.h experimental/rsocket-src/RSocketServer.cpp @@ -399,9 +401,10 @@ add_library( experimental/rsocket-src/transports/TcpConnectionAcceptor.cpp experimental/rsocket/transports/TcpConnectionFactory.h experimental/rsocket-src/transports/TcpConnectionFactory.cpp + experimental/rsocket/RSocketRequestHandler.h ) -add_dependencies(rsocket_experimental ReactiveStreams) +add_dependencies(rsocket_experimental ReactiveStreams yarpl) # include the experimental includes for usage target_include_directories(rsocket_experimental PUBLIC "${PROJECT_SOURCE_DIR}/experimental") @@ -416,8 +419,6 @@ add_executable( experimental/rsocket-test/RSocketClientServerTest.cpp experimental/rsocket-test/handlers/HelloStreamRequestHandler.h experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp - experimental/rsocket-test/handlers/HelloStreamSubscription.h - experimental/rsocket-test/handlers/HelloStreamSubscription.cpp ) target_link_libraries( @@ -459,16 +460,14 @@ target_link_libraries( add_executable( example_stream-hello-world-server examples/stream-hello-world/StreamHelloWorld_Server.cpp - examples/stream-hello-world/HelloStreamSubscription.cpp - examples/stream-hello-world/HelloStreamSubscription.h - examples/stream-hello-world/HelloStreamRequestHandler.cpp - examples/stream-hello-world/HelloStreamRequestHandler.h) +) target_link_libraries( example_stream-hello-world-server ReactiveSocket rsocket_experimental reactivesocket_examples_util + yarpl ${FOLLY_LIBRARIES} ${GFLAGS_LIBRARY} ${GLOG_LIBRARY} @@ -484,6 +483,7 @@ target_link_libraries( ReactiveSocket rsocket_experimental reactivesocket_examples_util + yarpl ${FOLLY_LIBRARIES} ${GFLAGS_LIBRARY} ${GLOG_LIBRARY} @@ -496,15 +496,16 @@ add_executable( examples/conditional-request-handling/ConditionalRequestHandling_Server.cpp examples/conditional-request-handling/TextRequestHandler.h examples/conditional-request-handling/TextRequestHandler.cpp - examples/conditional-request-handling/ConditionalRequestSubscription.h - examples/conditional-request-handling/ConditionalRequestSubscription.cpp - examples/conditional-request-handling/JsonRequestHandler.cpp examples/conditional-request-handling/JsonRequestHandler.h) + examples/conditional-request-handling/JsonRequestHandler.cpp + examples/conditional-request-handling/JsonRequestHandler.h +) target_link_libraries( example_conditional-request-handling-server ReactiveSocket rsocket_experimental reactivesocket_examples_util + yarpl ${FOLLY_LIBRARIES} ${GFLAGS_LIBRARY} ${GLOG_LIBRARY} @@ -520,6 +521,7 @@ target_link_libraries( ReactiveSocket rsocket_experimental reactivesocket_examples_util + yarpl ${FOLLY_LIBRARIES} ${GFLAGS_LIBRARY} ${GLOG_LIBRARY} diff --git a/examples/conditional-request-handling/ConditionalRequestHandling_Client.cpp b/examples/conditional-request-handling/ConditionalRequestHandling_Client.cpp index aed72d55a..31514c2b4 100644 --- a/examples/conditional-request-handling/ConditionalRequestHandling_Client.cpp +++ b/examples/conditional-request-handling/ConditionalRequestHandling_Client.cpp @@ -1,15 +1,19 @@ // Copyright 2004-present Facebook. All Rights Reserved. +#include #include #include #include "examples/util/ExampleSubscriber.h" #include "rsocket/RSocket.h" #include "rsocket/transports/TcpConnectionFactory.h" +#include "yarpl/v/Flowable.h" +#include "yarpl/v/Subscriber.h" using namespace ::reactivesocket; using namespace ::folly; using namespace ::rsocket_example; using namespace ::rsocket; +using namespace yarpl; DEFINE_string(host, "localhost", "host to connect to"); DEFINE_int32(port, 9898, "host:port to connect to"); @@ -17,22 +21,24 @@ DEFINE_int32(port, 9898, "host:port to connect to"); int main(int argc, char* argv[]) { FLAGS_logtostderr = true; FLAGS_minloglevel = 0; - google::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - google::InstallFailureSignalHandler(); + folly::init(&argc, &argv); auto rsf = RSocket::createClient( TcpConnectionFactory::create(FLAGS_host, FLAGS_port)); auto rs = rsf->connect().get(); LOG(INFO) << "------------------ Hello Bob!"; - auto s1 = std::make_shared(5, 6); - rs->requestStream(Payload("Bob"), s1); + auto s1 = yarpl::Reference(new ExampleSubscriber(5, 6)); + rs->requestStream(Payload("Bob")) + ->take(5) + ->subscribe(yarpl::Reference>(s1.get())); s1->awaitTerminalEvent(); LOG(INFO) << "------------------ Hello Jane!"; - auto s2 = std::make_shared(5, 6); - rs->requestStream(Payload("Jane"), s2); + auto s2 = yarpl::Reference(new ExampleSubscriber(5, 6)); + rs->requestStream(Payload("Jane")) + ->take(3) + ->subscribe(yarpl::Reference>(s2.get())); s2->awaitTerminalEvent(); // TODO on shutdown the destruction of diff --git a/examples/conditional-request-handling/ConditionalRequestHandling_Server.cpp b/examples/conditional-request-handling/ConditionalRequestHandling_Server.cpp index fa9a28032..133d881bb 100644 --- a/examples/conditional-request-handling/ConditionalRequestHandling_Server.cpp +++ b/examples/conditional-request-handling/ConditionalRequestHandling_Server.cpp @@ -1,11 +1,12 @@ // Copyright 2004-present Facebook. All Rights Reserved. #include -#include "JsonRequestHandler.h" -#include "TextRequestHandler.h" +#include #include "rsocket/RSocket.h" #include "rsocket/RSocketErrors.h" #include "rsocket/transports/TcpConnectionAcceptor.h" +#include "JsonRequestHandler.h" +#include "TextRequestHandler.h" using namespace ::reactivesocket; using namespace ::folly; @@ -33,7 +34,7 @@ int main(int argc, char* argv[]) { // start accepting connections rs->startAndPark( [textHandler, jsonHandler](std::unique_ptr r) - -> std::shared_ptr { + -> std::shared_ptr { if (r->getDataMimeType() == "text/plain") { LOG(INFO) << "Connection Request => text/plain MimeType"; return textHandler; diff --git a/examples/conditional-request-handling/ConditionalRequestSubscription.cpp b/examples/conditional-request-handling/ConditionalRequestSubscription.cpp deleted file mode 100644 index bc742b22d..000000000 --- a/examples/conditional-request-handling/ConditionalRequestSubscription.cpp +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#include "ConditionalRequestSubscription.h" -#include - -namespace reactivesocket { - -// Emit a stream of ints starting at 0 until number of ints -// emitted matches 'numberToEmit' at which point onComplete() -// will be emitted. -// -// On each invocation will restrict emission to number of requested. -// -// This method has no concurrency since SubscriptionBase -// schedules this on an Executor sequentially -void ConditionalRequestSubscription::requestImpl(size_t n) noexcept { - LOG(INFO) << "requested=" << n << " currentElem=" << currentElem_ - << " numberToEmit=" << numberToEmit_; - - if (numberToEmit_ == 0) { - subscriber_->onComplete(); - return; - } - for (size_t i = 0; i < n; i++) { - if (cancelled_) { - LOG(INFO) << "emission stopped by cancellation"; - return; - } - std::stringstream ss; - ss << "Hello " << name_ << " " << currentElem_ << "!"; - std::string s = ss.str(); - subscriber_->onNext(Payload(s)); - // currentElem is used to track progress across requestImpl invocations - currentElem_++; - // break the loop and complete the stream if numberToEmit_ is matched - if (currentElem_ == numberToEmit_) { - subscriber_->onComplete(); - return; - } - } -} - -void ConditionalRequestSubscription::cancelImpl() noexcept { - LOG(INFO) << "cancellation received"; - // simple cancellation token (nothing to shut down, just stop next loop) - cancelled_ = true; -} - -ConditionalRequestSubscription::~ConditionalRequestSubscription() { - LOG(INFO) << "ConditionalRequestSubscription => destroyed"; -} -} diff --git a/examples/conditional-request-handling/ConditionalRequestSubscription.h b/examples/conditional-request-handling/ConditionalRequestSubscription.h deleted file mode 100644 index 51d989a0c..000000000 --- a/examples/conditional-request-handling/ConditionalRequestSubscription.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#pragma once - -#include -#include "src/DuplexConnection.h" -#include "src/Payload.h" -#include "src/SubscriptionBase.h" - -namespace reactivesocket { - -/// Emits a stream of ints -class ConditionalRequestSubscription : public SubscriptionBase { - public: - explicit ConditionalRequestSubscription( - std::shared_ptr> subscriber, - std::string name, - size_t numberToEmit = 2) - : ExecutorBase(defaultExecutor()), - subscriber_(std::move(subscriber)), - name_(std::move(name)), - numberToEmit_(numberToEmit), - cancelled_(false) {} - - ~ConditionalRequestSubscription(); - - private: - // Subscription methods - void requestImpl(size_t n) noexcept override; - void cancelImpl() noexcept override; - - std::shared_ptr> subscriber_; - std::string name_; - size_t numberToEmit_; - size_t currentElem_ = 0; - std::atomic_bool cancelled_; -}; -} diff --git a/examples/conditional-request-handling/JsonRequestHandler.cpp b/examples/conditional-request-handling/JsonRequestHandler.cpp index c7f13cd4a..cbe9e4e0d 100644 --- a/examples/conditional-request-handling/JsonRequestHandler.cpp +++ b/examples/conditional-request-handling/JsonRequestHandler.cpp @@ -2,29 +2,26 @@ #include "JsonRequestHandler.h" #include -#include "ConditionalRequestSubscription.h" +#include "yarpl/v/Flowable.h" +#include "yarpl/v/Flowables.h" -using namespace ::reactivesocket; +using namespace reactivesocket; +using namespace rsocket; +using namespace yarpl; /// Handles a new inbound Stream requested by the other end. -void JsonRequestHandler::handleRequestStream( - Payload request, - StreamId streamId, - const std::shared_ptr>& response) noexcept { +yarpl::Reference> +JsonRequestHandler::handleRequestStream(Payload request, StreamId streamId) { LOG(INFO) << "JsonRequestHandler.handleRequestStream " << request; // string from payload data - auto pds = request.moveDataToString(); - auto requestString = std::string(pds, request.data->length()); + auto requestString = request.moveDataToString(); - response->onSubscribe(std::make_shared( - response, requestString, 10)); -} - -std::shared_ptr JsonRequestHandler::handleSetupPayload( - ReactiveSocket& socket, - ConnectionSetupPayload request) noexcept { - LOG(INFO) << "JsonRequestHandler.handleSetupPayload " << request; - // TODO what should this do? - return nullptr; + return Flowables::range(1, 100)->map([name = std::move(requestString)]( + int64_t v) { + std::stringstream ss; + ss << "Hello (should be JSON) " << name << " " << v << "!"; + std::string s = ss.str(); + return Payload(s, "metadata"); + }); } diff --git a/examples/conditional-request-handling/JsonRequestHandler.h b/examples/conditional-request-handling/JsonRequestHandler.h index 2f35ef560..dbff1d6c1 100644 --- a/examples/conditional-request-handling/JsonRequestHandler.h +++ b/examples/conditional-request-handling/JsonRequestHandler.h @@ -2,24 +2,14 @@ #pragma once -#include -#include "src/NullRequestHandler.h" #include "src/Payload.h" -#include "src/ReactiveStreamsCompat.h" -#include "src/ReactiveSocket.h" -#include "src/SubscriptionBase.h" +#include "rsocket/RSocket.h" -class JsonRequestHandler : public reactivesocket::DefaultRequestHandler { +class JsonRequestHandler : public rsocket::RSocketRequestHandler { public: /// Handles a new inbound Stream requested by the other end. - void handleRequestStream( - reactivesocket::Payload request, - reactivesocket::StreamId streamId, - const std::shared_ptr< - reactivesocket::Subscriber>& - response) noexcept override; - - std::shared_ptr handleSetupPayload( - reactivesocket::ReactiveSocket&, - reactivesocket::ConnectionSetupPayload request) noexcept override; + yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) override; }; diff --git a/examples/conditional-request-handling/TextRequestHandler.cpp b/examples/conditional-request-handling/TextRequestHandler.cpp index 904a54615..2bcf85fe1 100644 --- a/examples/conditional-request-handling/TextRequestHandler.cpp +++ b/examples/conditional-request-handling/TextRequestHandler.cpp @@ -2,29 +2,26 @@ #include "TextRequestHandler.h" #include -#include "ConditionalRequestSubscription.h" +#include "yarpl/v/Flowable.h" +#include "yarpl/v/Flowables.h" -using namespace ::reactivesocket; +using namespace reactivesocket; +using namespace rsocket; +using namespace yarpl; /// Handles a new inbound Stream requested by the other end. -void TextRequestHandler::handleRequestStream( - Payload request, - StreamId streamId, - const std::shared_ptr>& response) noexcept { +yarpl::Reference> +TextRequestHandler::handleRequestStream(Payload request, StreamId streamId) { LOG(INFO) << "TextRequestHandler.handleRequestStream " << request; // string from payload data - auto pds = request.moveDataToString(); - auto requestString = std::string(pds, request.data->length()); + auto requestString = request.moveDataToString(); - response->onSubscribe(std::make_shared( - response, requestString, 10)); -} - -std::shared_ptr TextRequestHandler::handleSetupPayload( - ReactiveSocket& socket, - ConnectionSetupPayload request) noexcept { - LOG(INFO) << "TextRequestHandler.handleSetupPayload " << request; - // TODO what should this do? - return nullptr; + return Flowables::range(1, 100)->map([name = std::move(requestString)]( + int64_t v) { + std::stringstream ss; + ss << "Hello " << name << " " << v << "!"; + std::string s = ss.str(); + return Payload(s, "metadata"); + }); } diff --git a/examples/conditional-request-handling/TextRequestHandler.h b/examples/conditional-request-handling/TextRequestHandler.h index aaad0e19b..25f43bd1c 100644 --- a/examples/conditional-request-handling/TextRequestHandler.h +++ b/examples/conditional-request-handling/TextRequestHandler.h @@ -2,24 +2,14 @@ #pragma once -#include -#include "src/NullRequestHandler.h" #include "src/Payload.h" -#include "src/ReactiveStreamsCompat.h" -#include "src/ReactiveSocket.h" -#include "src/SubscriptionBase.h" - -class TextRequestHandler : public reactivesocket::DefaultRequestHandler { - public: - /// Handles a new inbound Stream requested by the other end. - void handleRequestStream( - reactivesocket::Payload request, - reactivesocket::StreamId streamId, - const std::shared_ptr< - reactivesocket::Subscriber>& - response) noexcept override; - - std::shared_ptr handleSetupPayload( - reactivesocket::ReactiveSocket&, - reactivesocket::ConnectionSetupPayload request) noexcept override; +#include "rsocket/RSocket.h" + +class TextRequestHandler : public rsocket::RSocketRequestHandler { +public: + /// Handles a new inbound Stream requested by the other end. + yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) override; }; diff --git a/examples/stream-hello-world/HelloStreamRequestHandler.cpp b/examples/stream-hello-world/HelloStreamRequestHandler.cpp deleted file mode 100644 index 0ac163206..000000000 --- a/examples/stream-hello-world/HelloStreamRequestHandler.cpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#include "HelloStreamRequestHandler.h" -#include -#include "HelloStreamSubscription.h" - -using namespace ::reactivesocket; - -/// Handles a new inbound Stream requested by the other end. -void HelloStreamRequestHandler::handleRequestStream( - Payload request, - StreamId streamId, - const std::shared_ptr>& response) noexcept { - LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request; - - // string from payload data - const char* p = reinterpret_cast(request.data->data()); - auto requestString = std::string(p, request.data->length()); - - response->onSubscribe( - std::make_shared(response, requestString, 10)); -} - -std::shared_ptr HelloStreamRequestHandler::handleSetupPayload( - ReactiveSocket& socket, - ConnectionSetupPayload request) noexcept { - LOG(INFO) << "HelloStreamRequestHandler.handleSetupPayload " << request; - // TODO what should this do? - return nullptr; -} diff --git a/examples/stream-hello-world/HelloStreamSubscription.cpp b/examples/stream-hello-world/HelloStreamSubscription.cpp deleted file mode 100644 index 72fec1056..000000000 --- a/examples/stream-hello-world/HelloStreamSubscription.cpp +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#include "HelloStreamSubscription.h" - -#include -#include - -namespace reactivesocket { - -// Emit a stream of ints starting at 0 until number of ints -// emitted matches 'numberToEmit' at which point onComplete() -// will be emitted. -// -// On each invocation will restrict emission to number of requested. -// -// This method has no concurrency since SubscriptionBase -// schedules this on an Executor sequentially -void HelloStreamSubscription::requestImpl(size_t n) noexcept { - LOG(INFO) << "requested=" << n << " currentElem=" << currentElem_ - << " numberToEmit=" << numberToEmit_; - - if (numberToEmit_ == 0) { - subscriber_->onComplete(); - return; - } - for (size_t i = 0; i < n; i++) { - if (cancelled_) { - LOG(INFO) << "emission stopped by cancellation"; - return; - } - std::stringstream ss; - ss << "Hello " << name_ << " " << currentElem_ << "!"; - std::string s = ss.str(); - subscriber_->onNext(Payload(s)); - // currentElem is used to track progress across requestImpl invocations - currentElem_++; - // break the loop and complete the stream if numberToEmit_ is matched - if (currentElem_ == numberToEmit_) { - subscriber_->onComplete(); - return; - } - } -} - -void HelloStreamSubscription::cancelImpl() noexcept { - LOG(INFO) << "cancellation received"; - // simple cancellation token (nothing to shut down, just stop next loop) - cancelled_ = true; -} -} diff --git a/examples/stream-hello-world/HelloStreamSubscription.h b/examples/stream-hello-world/HelloStreamSubscription.h deleted file mode 100644 index 8da5b1e4d..000000000 --- a/examples/stream-hello-world/HelloStreamSubscription.h +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#pragma once - -#include -#include "src/DuplexConnection.h" -#include "src/Payload.h" -#include "src/SubscriptionBase.h" - -namespace reactivesocket { - -/// Emits a stream of ints -class HelloStreamSubscription : public SubscriptionBase { - public: - explicit HelloStreamSubscription( - std::shared_ptr> subscriber, - std::string name, - size_t numberToEmit = 2) - : ExecutorBase(defaultExecutor()), - subscriber_(std::move(subscriber)), - name_(std::move(name)), - numberToEmit_(numberToEmit), - cancelled_(false) {} - - private: - // Subscription methods - void requestImpl(size_t n) noexcept override; - void cancelImpl() noexcept override; - - std::shared_ptr> subscriber_; - std::string name_; - size_t numberToEmit_; - size_t currentElem_ = 0; - std::atomic_bool cancelled_; -}; -} diff --git a/examples/stream-hello-world/StreamHelloWorld_Client.cpp b/examples/stream-hello-world/StreamHelloWorld_Client.cpp index dcf2dbb5e..2c08a7b24 100644 --- a/examples/stream-hello-world/StreamHelloWorld_Client.cpp +++ b/examples/stream-hello-world/StreamHelloWorld_Client.cpp @@ -9,6 +9,9 @@ #include "rsocket/RSocket.h" #include "rsocket/transports/TcpConnectionFactory.h" +#include "yarpl/v/Flowable.h" +#include "yarpl/v/Subscriber.h" + using namespace reactivesocket; using namespace rsocket_example; using namespace rsocket; @@ -17,6 +20,8 @@ DEFINE_string(host, "localhost", "host to connect to"); DEFINE_int32(port, 9898, "host:port to connect to"); int main(int argc, char* argv[]) { + FLAGS_logtostderr = true; + FLAGS_minloglevel = 0; folly::init(&argc, &argv); // create a client which can then make connections below @@ -26,9 +31,10 @@ int main(int argc, char* argv[]) { { // this example runs inside the Future.then lambda LOG(INFO) << "------------------ Run in future.then"; - auto s = std::make_shared(5, 6); + auto s = yarpl::Reference(new ExampleSubscriber(5, 6)); rsf->connect().then([s](std::shared_ptr rs) { - rs->requestStream(Payload("Bob"), s); + rs->requestStream(Payload("Bob")) + ->subscribe(yarpl::Reference>(s.get())); }); s->awaitTerminalEvent(); } @@ -36,9 +42,10 @@ int main(int argc, char* argv[]) { { // this example extracts from the Future.get and runs in the main thread LOG(INFO) << "------------------ Run after future.get"; - auto s = std::make_shared(5, 6); + auto s = yarpl::Reference(new ExampleSubscriber(5, 6)); auto rs = rsf->connect().get(); - rs->requestStream(Payload("Jane"), s); + rs->requestStream(Payload("Jane")) + ->subscribe(yarpl::Reference>(s.get())); s->awaitTerminalEvent(); } LOG(INFO) << "------------- main() terminating -----------------"; diff --git a/examples/stream-hello-world/StreamHelloWorld_Server.cpp b/examples/stream-hello-world/StreamHelloWorld_Server.cpp index df58be409..a773a544a 100644 --- a/examples/stream-hello-world/StreamHelloWorld_Server.cpp +++ b/examples/stream-hello-world/StreamHelloWorld_Server.cpp @@ -5,16 +5,42 @@ #include -#include "HelloStreamRequestHandler.h" #include "rsocket/RSocket.h" #include "rsocket/transports/TcpConnectionAcceptor.h" +#include "yarpl/v/Flowable.h" +#include "yarpl/v/Flowables.h" using namespace reactivesocket; using namespace rsocket; +using namespace yarpl; DEFINE_int32(port, 9898, "port to connect to"); +class HelloStreamRequestHandler : public rsocket::RSocketRequestHandler { + public: + /// Handles a new inbound Stream requested by the other end. + yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) override { + LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request; + + // string from payload data + auto requestString = request.moveDataToString(); + + return Flowables::range(1, 10)->map([name = std::move(requestString)]( + int64_t v) { + std::stringstream ss; + ss << "Hello " << name << " " << v << "!"; + std::string s = ss.str(); + return Payload(s, "metadata"); + }); + } +}; + int main(int argc, char* argv[]) { + FLAGS_logtostderr = true; + FLAGS_minloglevel = 0; folly::init(&argc, &argv); TcpConnectionAcceptor::Options opts; diff --git a/examples/util/ExampleSubscriber.cpp b/examples/util/ExampleSubscriber.cpp index e10147aa7..c85844939 100644 --- a/examples/util/ExampleSubscriber.cpp +++ b/examples/util/ExampleSubscriber.cpp @@ -22,16 +22,16 @@ ExampleSubscriber::ExampleSubscriber(int initialRequest, int numToTake) } void ExampleSubscriber::onSubscribe( - std::shared_ptr subscription) noexcept { + yarpl::Reference subscription) noexcept { LOG(INFO) << "ExampleSubscriber " << this << " onSubscribe"; subscription_ = std::move(subscription); requested_ = initialRequest_; subscription_->request(initialRequest_); } -void ExampleSubscriber::onNext(Payload element) noexcept { +void ExampleSubscriber::onNext(const Payload& element) noexcept { LOG(INFO) << "ExampleSubscriber " << this - << " onNext as string: " << element.moveDataToString(); + << " onNext as string: " << element.cloneDataToString(); received_++; if (--requested_ == thresholdForRequest_) { int toRequest = (initialRequest_ - thresholdForRequest_); @@ -53,8 +53,8 @@ void ExampleSubscriber::onComplete() noexcept { terminalEventCV_.notify_all(); } -void ExampleSubscriber::onError(folly::exception_wrapper ex) noexcept { - LOG(INFO) << "ExampleSubscriber " << this << " onError " << ex.what(); +void ExampleSubscriber::onError(const std::exception_ptr ex) noexcept { + LOG(INFO) << "ExampleSubscriber " << this << " onError"; terminated_ = true; terminalEventCV_.notify_all(); } diff --git a/examples/util/ExampleSubscriber.h b/examples/util/ExampleSubscriber.h index 325e07213..c5830934d 100644 --- a/examples/util/ExampleSubscriber.h +++ b/examples/util/ExampleSubscriber.h @@ -6,7 +6,9 @@ #include #include #include "src/Payload.h" -#include "src/ReactiveStreamsCompat.h" + +#include "yarpl/v/Subscriber.h" +#include "yarpl/v/Flowable.h" /** * Subscriber that logs all events. @@ -14,16 +16,16 @@ */ namespace rsocket_example { class ExampleSubscriber - : public reactivesocket::Subscriber { + : public yarpl::Subscriber { public: ~ExampleSubscriber(); ExampleSubscriber(int initialRequest, int numToTake); - void onSubscribe(std::shared_ptr + void onSubscribe(yarpl::Reference subscription) noexcept override; - void onNext(reactivesocket::Payload element) noexcept override; + void onNext(const reactivesocket::Payload& element) noexcept override; void onComplete() noexcept override; - void onError(folly::exception_wrapper ex) noexcept override; + void onError(const std::exception_ptr ex) noexcept override; void awaitTerminalEvent(); @@ -33,7 +35,7 @@ class ExampleSubscriber int numToTake_; int requested_; int received_; - std::shared_ptr subscription_; + yarpl::Reference subscription_; bool terminated_{false}; std::mutex m_; std::condition_variable terminalEventCV_; diff --git a/experimental/rsocket-src/RSocketRequester.cpp b/experimental/rsocket-src/RSocketRequester.cpp index cfe893bd7..1efdd51d3 100644 --- a/experimental/rsocket-src/RSocketRequester.cpp +++ b/experimental/rsocket-src/RSocketRequester.cpp @@ -3,6 +3,8 @@ #include "rsocket/RSocketRequester.h" #include "rsocket/OldNewBridge.h" +#include "yarpl/v/Flowable.h" +#include "yarpl/v/Flowables.h" #include @@ -41,25 +43,15 @@ std::shared_ptr> RSocketRequester::requestChannel( return reactiveSocket_->requestChannel(std::move(responseSink)); } -void RSocketRequester::requestStream( - Payload request, - std::shared_ptr> responseSink) { - eventBase_.runInEventBaseThread( - [ this, request = std::move(request), responseSink ]() mutable { - reactiveSocket_->requestStream( - std::move(request), std::move(responseSink)); - }); -} - -std::shared_ptr> -RSocketRequester::requestStream(reactivesocket::Payload request) { +yarpl::Reference> RSocketRequester::requestStream( + Payload request) { auto& eb = eventBase_; auto srs = reactiveSocket_; - return yarpl::flowable::Flowable::create( + + return yarpl::Flowables::fromPublisher( [&eb, request = std::move(request), srs = std::move(srs) ]( - auto uptr_subscriber) mutable { - auto os = - std::make_shared(std::move(uptr_subscriber)); + yarpl::Reference> subscriber) mutable { + auto os = std::make_shared(std::move(subscriber)); eb.runInEventBaseThread([ request = std::move(request), os = std::move(os), diff --git a/experimental/rsocket-src/RSocketServer.cpp b/experimental/rsocket-src/RSocketServer.cpp index fd87fe59c..b6be07ef4 100644 --- a/experimental/rsocket-src/RSocketServer.cpp +++ b/experimental/rsocket-src/RSocketServer.cpp @@ -4,8 +4,10 @@ #include #include #include +#include "rsocket/OldNewBridge.h" #include "rsocket/RSocketErrors.h" #include "src/FrameTransport.h" +#include "src/NullRequestHandler.h" using namespace reactivesocket; @@ -50,6 +52,28 @@ class RSocketConnectionHandler : public reactivesocket::ConnectionHandler { OnSetupNewSocket onSetup_; }; +class RSocketHandlerBridge : public reactivesocket::DefaultRequestHandler { + public: + RSocketHandlerBridge(std::shared_ptr handler) + : handler_(std::move(handler)){}; + + void handleRequestStream( + Payload request, + StreamId streamId, + const std::shared_ptr>& + subscriber) noexcept override { + auto flowable = + handler_->handleRequestStream(std::move(request), std::move(streamId)); + // bridge from the existing eager RequestHandler and old Subscriber type + // to the lazy Flowable and new Subscriber type + flowable->subscribe(yarpl::Reference>( + new NewToOldSubscriber(std::move(subscriber)))); + } + + private: + std::shared_ptr handler_; +}; + RSocketServer::RSocketServer( std::unique_ptr connectionAcceptor) : lazyAcceptor_(std::move(connectionAcceptor)), @@ -91,7 +115,7 @@ void RSocketServer::start(OnAccept onAccept) { auto socketParams = SocketParameters( setupPayload.resumable, setupPayload.protocolVersion); - std::shared_ptr requestHandler; + std::shared_ptr requestHandler; try { requestHandler = onAccept(std::make_unique( std::move(setupPayload))); @@ -103,10 +127,11 @@ void RSocketServer::start(OnAccept onAccept) { } LOG(INFO) << "RSocketServer => received request handler"; + auto handlerBridge = std::make_shared(std::move(requestHandler)); auto rs = ReactiveSocket::disconnectedServer( // we know this callback is on a specific EventBase executor_, - std::move(requestHandler), + std::move(handlerBridge), Stats::noop()); rs->onClosed([ this, rs = rs.get() ](const folly::exception_wrapper&) { diff --git a/experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp b/experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp index 1a86c5001..99bfd380a 100644 --- a/experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp +++ b/experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp @@ -2,33 +2,30 @@ #include "HelloStreamRequestHandler.h" #include -#include "HelloStreamSubscription.h" +#include "yarpl/v/Flowables.h" using namespace ::reactivesocket; +using namespace yarpl; namespace rsocket { namespace tests { /// Handles a new inbound Stream requested by the other end. -void HelloStreamRequestHandler::handleRequestStream( - Payload request, - StreamId streamId, - const std::shared_ptr>& response) noexcept { +yarpl::Reference> +HelloStreamRequestHandler::handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) { LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request; // string from payload data - const char* p = reinterpret_cast(request.data->data()); - auto requestString = std::string(p, request.data->length()); + auto requestString = request.moveDataToString(); - response->onSubscribe( - std::make_shared(response, requestString, 10)); -} - -std::shared_ptr HelloStreamRequestHandler::handleSetupPayload( - ReactiveSocket& socket, - ConnectionSetupPayload request) noexcept { - LOG(INFO) << "HelloStreamRequestHandler.handleSetupPayload " << request; - // TODO what should this do? - return nullptr; + return Flowables::range(1, 10)->map([name = std::move(requestString)]( + int64_t v) { + std::stringstream ss; + ss << "Hello " << name << " " << v << "!"; + std::string s = ss.str(); + return Payload(s, "metadata"); + }); } } } diff --git a/experimental/rsocket-test/handlers/HelloStreamRequestHandler.h b/experimental/rsocket-test/handlers/HelloStreamRequestHandler.h index c5cee90b8..86a5bc09b 100644 --- a/experimental/rsocket-test/handlers/HelloStreamRequestHandler.h +++ b/experimental/rsocket-test/handlers/HelloStreamRequestHandler.h @@ -3,28 +3,23 @@ #pragma once #include +#include "rsocket/RSocketRequestHandler.h" #include "src/NullRequestHandler.h" #include "src/Payload.h" #include "src/ReactiveStreamsCompat.h" -#include "src/StandardReactiveSocket.h" #include "src/SubscriptionBase.h" +#include "yarpl/v/Flowable.h" namespace rsocket { namespace tests { -class HelloStreamRequestHandler : public reactivesocket::DefaultRequestHandler { +class HelloStreamRequestHandler : public RSocketRequestHandler { public: /// Handles a new inbound Stream requested by the other end. - void handleRequestStream( + yarpl::Reference> + handleRequestStream( reactivesocket::Payload request, - reactivesocket::StreamId streamId, - const std::shared_ptr< - reactivesocket::Subscriber>& - response) noexcept override; - - std::shared_ptr handleSetupPayload( - reactivesocket::ReactiveSocket&, - reactivesocket::ConnectionSetupPayload request) noexcept override; + reactivesocket::StreamId streamId) override; }; } } diff --git a/experimental/rsocket-test/handlers/HelloStreamSubscription.cpp b/experimental/rsocket-test/handlers/HelloStreamSubscription.cpp deleted file mode 100644 index 12b3d1d55..000000000 --- a/experimental/rsocket-test/handlers/HelloStreamSubscription.cpp +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#include "HelloStreamSubscription.h" - -#include -#include - -using namespace reactivesocket; - -namespace rsocket { -namespace tests { - -// Emit a stream of ints starting at 0 until number of ints -// emitted matches 'numberToEmit' at which point onComplete() -// will be emitted. -// -// On each invocation will restrict emission to number of requested. -// -// This method has no concurrency since SubscriptionBase -// schedules this on an Executor sequentially -void HelloStreamSubscription::requestImpl(size_t n) noexcept { - LOG(INFO) << "requested=" << n << " currentElem=" << currentElem_ - << " numberToEmit=" << numberToEmit_; - - if (numberToEmit_ == 0) { - subscriber_->onComplete(); - return; - } - for (size_t i = 0; i < n; i++) { - if (cancelled_) { - LOG(INFO) << "emission stopped by cancellation"; - return; - } - std::stringstream ss; - ss << "Hello " << name_ << " " << currentElem_ << "!"; - std::string s = ss.str(); - subscriber_->onNext(Payload(s)); - // currentElem is used to track progress across requestImpl invocations - currentElem_++; - // break the loop and complete the stream if numberToEmit_ is matched - if (currentElem_ == numberToEmit_) { - subscriber_->onComplete(); - return; - } - } -} - -void HelloStreamSubscription::cancelImpl() noexcept { - LOG(INFO) << "cancellation received"; - // simple cancellation token (nothing to shut down, just stop next loop) - cancelled_ = true; -} -} -} diff --git a/experimental/rsocket-test/handlers/HelloStreamSubscription.h b/experimental/rsocket-test/handlers/HelloStreamSubscription.h deleted file mode 100644 index 2a8a8d9c9..000000000 --- a/experimental/rsocket-test/handlers/HelloStreamSubscription.h +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2004-present Facebook. All Rights Reserved. - -#pragma once - -#include -#include "src/DuplexConnection.h" -#include "src/Payload.h" -#include "src/SubscriptionBase.h" - -namespace rsocket { -namespace tests { - -/// Emits a stream of ints -class HelloStreamSubscription : public reactivesocket::SubscriptionBase { - public: - explicit HelloStreamSubscription( - std::shared_ptr> - subscriber, - std::string name, - size_t numberToEmit = 2) - : ExecutorBase(reactivesocket::defaultExecutor()), - subscriber_(std::move(subscriber)), - name_(std::move(name)), - numberToEmit_(numberToEmit), - cancelled_(false) {} - - private: - // Subscription methods - void requestImpl(size_t n) noexcept override; - void cancelImpl() noexcept override; - - std::shared_ptr> - subscriber_; - std::string name_; - size_t numberToEmit_; - size_t currentElem_ = 0; - std::atomic_bool cancelled_; -}; -} -} diff --git a/experimental/rsocket/OldNewBridge.h b/experimental/rsocket/OldNewBridge.h index c2bca0a7e..72ed833a8 100644 --- a/experimental/rsocket/OldNewBridge.h +++ b/experimental/rsocket/OldNewBridge.h @@ -6,7 +6,11 @@ #include -#include "reactivestreams/ReactiveStreams.h" +// using the "v" Flowable types +// since they don't have ReactiveStreams right now +#include "yarpl/v/Subscriber.h" +#include "yarpl/v/Subscription.h" + #include "src/Payload.h" #include "src/ReactiveStreamsCompat.h" @@ -14,7 +18,7 @@ namespace rsocket { //////////////////////////////////////////////////////////////////////////////// -class NewToOldSubscription : public reactivestreams_yarpl::Subscription { +class NewToOldSubscription : public yarpl::Subscription { public: explicit NewToOldSubscription( std::shared_ptr inner) @@ -37,14 +41,14 @@ class OldToNewSubscriber : public reactivesocket::Subscriber { public: explicit OldToNewSubscriber( - std::unique_ptr< - reactivestreams_yarpl::Subscriber> inner) + yarpl::Reference> inner) : inner_{std::move(inner)} {} void onSubscribe( std::shared_ptr subscription) noexcept { - bridge_ = std::make_shared(std::move(subscription)); - inner_->onSubscribe(bridge_.get()); + bridge_ = yarpl::Reference( + new NewToOldSubscription(std::move(subscription))); + inner_->onSubscribe(bridge_); } void onNext(reactivesocket::Payload element) noexcept { @@ -60,16 +64,15 @@ class OldToNewSubscriber } private: - std::unique_ptr> - inner_; - std::shared_ptr bridge_; + yarpl::Reference> inner_; + yarpl::Reference bridge_; }; //////////////////////////////////////////////////////////////////////////////// class OldToNewSubscription : public reactivesocket::Subscription { public: - explicit OldToNewSubscription(reactivestreams_yarpl::Subscription* inner) + explicit OldToNewSubscription(yarpl::Reference inner) : inner_{inner} {} void request(size_t n) noexcept override { @@ -89,19 +92,19 @@ class OldToNewSubscription : public reactivesocket::Subscription { } private: - reactivestreams_yarpl::Subscription* inner_{nullptr}; + yarpl::Reference inner_{nullptr}; bool terminated_{false}; }; -class NewToOldSubscriber - : public reactivestreams_yarpl::Subscriber { +class NewToOldSubscriber : public yarpl::Subscriber { public: explicit NewToOldSubscriber( std::shared_ptr> inner) : inner_{std::move(inner)} {} - void onSubscribe(reactivestreams_yarpl::Subscription* subscription) override { + void onSubscribe( + yarpl::Reference subscription) override { bridge_ = std::make_shared(subscription); inner_->onSubscribe(bridge_); } diff --git a/experimental/rsocket/RSocketRequestHandler.h b/experimental/rsocket/RSocketRequestHandler.h new file mode 100644 index 000000000..2c70246d3 --- /dev/null +++ b/experimental/rsocket/RSocketRequestHandler.h @@ -0,0 +1,37 @@ +// Copyright 2004-present Facebook. All Rights Reserved. + +#pragma once + +#include + +// TODO migrate yarpl/v/Flowable to yarpl/Flowable +#include "yarpl/v/Flowable.h" + +#include "src/Payload.h" +#include "src/StreamState.h" + +namespace rsocket { + +/** + * RequestHandler APIs to handle requests on an RSocket connection. + * + * This is most commonly used by an RSocketServer, but due to the symmetric + * nature of RSocket, this can be used on the client as well. + */ +class RSocketRequestHandler { + public: + /** + * Called when a new `requestStream` occurs from an RSocketRequester. + * + * Return a Flowable with the response stream. + * + * @param request + * @param streamId + * @return + */ + virtual yarpl::Reference> + handleRequestStream( + reactivesocket::Payload request, + reactivesocket::StreamId streamId) = 0; +}; +} \ No newline at end of file diff --git a/experimental/rsocket/RSocketRequester.h b/experimental/rsocket/RSocketRequester.h index 428713808..34728b1b9 100644 --- a/experimental/rsocket/RSocketRequester.h +++ b/experimental/rsocket/RSocketRequester.h @@ -4,7 +4,8 @@ #include -#include "yarpl/Flowable.h" +// TODO migrate yarpl/v/Flowable to yarpl/Flowable +#include "yarpl/v/Flowable.h" #include "src/ReactiveStreamsCompat.h" #include "src/ReactiveSocket.h" @@ -41,18 +42,8 @@ class RSocketRequester { * @param payload * @param responseSink */ - void requestStream( - reactivesocket::Payload payload, - std::shared_ptr> - responseSink); - - /** - * TODO: This is a temporary hack to bridge instances of the old - * reactivesocket::Subscri{ber,ption} to - * reactivestreams_yarpl::Subscri{ber,ption}. - */ - std::shared_ptr> - requestStream(reactivesocket::Payload request); + yarpl::Reference> requestStream( + reactivesocket::Payload payload); /** * Start a channel (streams in both directions). diff --git a/experimental/rsocket/RSocketServer.h b/experimental/rsocket/RSocketServer.h index 0d99df598..8eb69d95d 100644 --- a/experimental/rsocket/RSocketServer.h +++ b/experimental/rsocket/RSocketServer.h @@ -10,14 +10,14 @@ #include "rsocket/ConnectionAcceptor.h" #include "rsocket/ConnectionResumeRequest.h" #include "rsocket/ConnectionSetupRequest.h" +#include "rsocket/RSocketRequestHandler.h" -#include "src/RequestHandler.h" #include "src/ServerConnectionAcceptor.h" #include "src/ReactiveSocket.h" namespace rsocket { -using OnAccept = std::function( +using OnAccept = std::function( std::unique_ptr)>; /** * API for starting an RSocket server. Returned from RSocket::createServer. diff --git a/experimental/yarpl/examples/FlowableVExamples.cpp b/experimental/yarpl/examples/FlowableVExamples.cpp index dafcc6a98..43f52c654 100644 --- a/experimental/yarpl/examples/FlowableVExamples.cpp +++ b/experimental/yarpl/examples/FlowableVExamples.cpp @@ -23,18 +23,50 @@ auto printer() { 2 /* low [optional] batch size for demo */); } +Reference> getData() { + return Flowables::range(2, 5); +} + std::string getThreadId() { std::ostringstream oss; oss << std::this_thread::get_id(); return oss.str(); } +void fromPublisherExample() { + auto onSubscribe = [](Reference> subscriber) { + class Subscription : public ::yarpl::Subscription { + public: + virtual void request(int64_t delta) override { + // TODO + } + + virtual void cancel() override { + // TODO + } + }; + + Reference<::yarpl::Subscription> subscription(new Subscription); + subscriber->onSubscribe(subscription); + subscriber->onNext(1234); + subscriber->onNext(5678); + subscriber->onNext(1234); + subscriber->onComplete(); + }; + + Flowables::fromPublisher(std::move(onSubscribe)) + ->subscribe(printer()); +} + } // namespace void FlowableVExamples::run() { std::cout << "create a flowable" << std::endl; Flowables::range(2, 2); + std::cout << "get a flowable from a method" << std::endl; + getData()->subscribe(printer()); + std::cout << "just: single value" << std::endl; Flowables::just(23)->subscribe(printer()); @@ -96,4 +128,7 @@ void FlowableVExamples::run() { ->subscribe(printer()); std::cout << " waiting on " << getThreadId() << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + std::cout << "fromPublisher - delegate to onSubscribe" << std::endl; + fromPublisherExample(); } diff --git a/experimental/yarpl/include/yarpl/v/Flowables.h b/experimental/yarpl/include/yarpl/v/Flowables.h index 03900e31c..7acffdfa3 100644 --- a/experimental/yarpl/include/yarpl/v/Flowables.h +++ b/experimental/yarpl/include/yarpl/v/Flowables.h @@ -65,6 +65,16 @@ class Flowables { return Flowable::create(std::move(lambda)); } + template>), + void>::value>::type> + static Reference> fromPublisher(OnSubscribe&& function) { + return Reference>( + new FromPublisherOperator( + std::forward(function))); + } + private: Flowables() = delete; }; diff --git a/experimental/yarpl/include/yarpl/v/Operator.h b/experimental/yarpl/include/yarpl/v/Operator.h index 6ab1802e3..583aeef02 100644 --- a/experimental/yarpl/include/yarpl/v/Operator.h +++ b/experimental/yarpl/include/yarpl/v/Operator.h @@ -240,4 +240,17 @@ class SubscribeOnOperator : public Operator { std::unique_ptr worker_; }; +template +class FromPublisherOperator : public Flowable { +public: + FromPublisherOperator(OnSubscribe&& function) + : function_(std::move(function)) {} + + void subscribe(Reference> subscriber) { + function_(std::move(subscriber)); + } +private: + OnSubscribe function_; +}; + } // yarpl diff --git a/experimental/yarpl/include/yarpl/v/Refcounted.h b/experimental/yarpl/include/yarpl/v/Refcounted.h index d532d8b37..5f182b9ba 100644 --- a/experimental/yarpl/include/yarpl/v/Refcounted.h +++ b/experimental/yarpl/include/yarpl/v/Refcounted.h @@ -28,7 +28,7 @@ class Refcounted { virtual ~Refcounted() = default; #endif /* NDEBUG */ - private: +private: template friend class Reference; diff --git a/src/Payload.cpp b/src/Payload.cpp index 4a2ad4e8c..17357eb2c 100644 --- a/src/Payload.cpp +++ b/src/Payload.cpp @@ -43,6 +43,13 @@ std::string Payload::moveDataToString() { return data->moveToFbString().toStdString(); } +std::string Payload::cloneDataToString() const { + if (!data) { + return ""; + } + return data->cloneAsValue().moveToFbString().toStdString(); +} + void Payload::clear() { data.reset(); metadata.reset(); diff --git a/src/Payload.h b/src/Payload.h index 926e553a9..fcda26511 100644 --- a/src/Payload.h +++ b/src/Payload.h @@ -31,6 +31,7 @@ struct Payload { void checkFlags(FrameFlags flags) const; std::string moveDataToString(); + std::string cloneDataToString() const; void clear(); std::unique_ptr data;