From 0907520725c819a940dc34e6e7439602b08758b7 Mon Sep 17 00:00:00 2001 From: Ondrej Lehecka Date: Tue, 25 Apr 2017 18:22:44 -0700 Subject: [PATCH] fixing formatting --- .travis.yml | 14 +- TARGETS | 192 ++++++++++++++++-- examples/stream-hello-world/TARGETS | 28 +++ examples/util/TARGETS | 8 + .../rsocket-test/RSocketClientServerTest.cpp | 1 - experimental/yarpl/TARGETS | 91 +++++++++ .../yarpl/examples/FlowableVExamples.cpp | 43 ++-- .../yarpl/examples/yarpl-playground.cpp | 4 +- experimental/yarpl/include/yarpl/v/Flowable.h | 13 +- experimental/yarpl/include/yarpl/v/Operator.h | 48 +++-- .../yarpl/include/yarpl/v/Refcounted.h | 23 ++- .../yarpl/include/yarpl/v/Subscribers.h | 88 +++++--- .../yarpl/src/yarpl/ThreadScheduler.h | 4 +- experimental/yarpl/src/yarpl/v/Refcounted.cpp | 6 +- experimental/yarpl/test/v/FlowableTest.cpp | 50 ++--- experimental/yarpl/test/v/RefcountedTest.cpp | 4 +- 16 files changed, 463 insertions(+), 154 deletions(-) create mode 100644 examples/stream-hello-world/TARGETS create mode 100644 examples/util/TARGETS create mode 100644 experimental/yarpl/TARGETS diff --git a/.travis.yml b/.travis.yml index f6e8626f4..8d285149a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,16 +8,16 @@ language: cpp # Test matrix: matrix: include: - - env: CLANG_VERSION=3.6 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On + - env: CLANG_VERSION=3.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On os: linux - addons: &clang36 + addons: &clang39 apt: packages: - - clang-3.6 + - clang-3.9 - valgrind sources: - ubuntu-toolchain-r-test - - llvm-toolchain-precise-3.6 + - llvm-toolchain-precise-3.9 - env: GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On os: linux @@ -71,6 +71,6 @@ script: - ./experimental/yarpl/yarpl-tests - cd .. - ./scripts/prepare_tck_drivers.sh - - ./scripts/tck_test.sh -c cpp -s cpp - - ./scripts/tck_test.sh -c java -s java - - ./scripts/tck_test.sh -c java -s cpp + - ./scripts/tck_test.sh -c cpp -s cpp + - ./scripts/tck_test.sh -c java -s java + - ./scripts/tck_test.sh -c java -s cpp diff --git a/TARGETS b/TARGETS index 8a743f86a..3236cfb5d 100644 --- a/TARGETS +++ b/TARGETS @@ -6,7 +6,14 @@ cpp_library( ('src', '**/*.h') ]), srcs = [ + 'src/Common.cpp', 'src/Executor.cpp', + 'src/Frame.cpp', + 'src/FrameSerializer.cpp', + 'src/Payload.cpp', + 'src/versions/FrameSerializer_v0.cpp', + 'src/versions/FrameSerializer_v0_1.cpp', + 'src/versions/FrameSerializer_v1_0.cpp', ], deps = [ '@/folly/futures:futures', @@ -35,7 +42,6 @@ cpp_library( 'src/framed/FramedWriter.cpp', 'src/framed/FramedDuplexConnection.cpp', 'src/folly/FollyKeepaliveTimer.cpp', - 'src/Payload.cpp', ], deps = [ ':reactivesocket-internal', @@ -50,16 +56,15 @@ cpp_library( name = 'reactivesocket', headers = [ + 'src/Common.h', 'src/ClientResumeStatusCallback.h', 'src/EnableSharedFromThis.h', 'src/FrameTransport.h', 'src/NullRequestHandler.h', 'src/Payload.h', - 'src/ReactiveSocket.h', 'src/RequestHandler.h', - 'src/ResumeTracker.h', - 'src/SmartPointers.h', #will be removed soon - 'src/StandardReactiveSocket.h', + 'src/ServerConnectionAcceptor.h', + 'src/ReactiveSocket.h', 'src/Stats.h', 'src/StreamsFactory.h', 'src/SubscriberBase.h', @@ -68,22 +73,17 @@ cpp_library( srcs = glob([ 'src/AbstractStreamAutomaton.cpp', 'src/automata/*.cpp', - 'src/mixins/*.cpp', - 'src/versions/*.cpp', - 'src/Common.cpp', 'src/ConnectionAutomaton.cpp', 'src/ConnectionSetupPayload.cpp', - 'src/Frame.cpp', - 'src/FrameSerializer.cpp', 'src/FrameTransport.cpp', 'src/NullRequestHandler.cpp', - 'src/mixins/*.cpp', # temporarily 'src/Payload.cpp', 'src/RequestHandler.cpp', 'src/ResumeCache.cpp', - 'src/ResumeTracker.cpp', - 'src/StandardReactiveSocket.cpp', + 'src/ServerConnectionAcceptor.cpp', + 'src/ReactiveSocket.cpp', 'src/Stats.cpp', + 'src/StreamState.cpp', 'src/StreamsFactory.cpp', ]), deps = [ @@ -139,7 +139,7 @@ cpp_library( ':reactivesocket-streams-mocks', ], external_deps=[ - 'gmock', + ('googletest', None, 'gmock'), ], ) @@ -152,7 +152,9 @@ cpp_unittest( ]), srcs = glob([ 'test/*.cpp', + 'test/folly/*.cpp', 'test/framed/*.cpp', + 'test/automata/*.cpp', 'test/simple/*.cpp', ]), deps = [ @@ -163,8 +165,134 @@ cpp_unittest( '@/folly/io/async:scoped_event_base_thread', ], external_deps = [ - 'gmock', - 'gtest', + ('googletest', None, 'gmock'), + ('googletest', None, 'gtest'), + ], +) + +cpp_library( + name='reactivesocket-tests-internal', + header_namespace = 'test', + headers = subdir_glob([ + ('test', '**/*.h'), + ]), + srcs=[ + 'test/simple/StatsPrinter.cpp', + ], + deps=[ + ':reactivesocket', + ], +) + +cpp_binary( + name = 'reactivesocket-tck-server', + header_namespace = 'tck-test', + headers = subdir_glob([ + ('tck-test', '**/*.h'), + ]), + srcs = glob([ + 'tck-test/server.cpp', + 'tck-test/MarbleProcessor.cpp', + ]), + deps = [ + ':reactivesocket-tcp', + ':reactivesocket', + ':reactivesocket-streams-mocks', + ':reactivesocket-tests-internal', + '@/folly/io/async:scoped_event_base_thread', + ], +) + +cpp_binary( + name = 'reactivesocket-tck-client', + header_namespace = 'tck-test', + headers = subdir_glob([ + ('tck-test', '**/*.h'), + ]), + srcs = glob([ + 'tck-test/client.cpp', + 'tck-test/TestFileParser.cpp', + 'tck-test/TestInterpreter.cpp', + 'tck-test/TestSubscriber.cpp', + 'tck-test/TestSuite.cpp', + ]), + deps = [ + ':reactivesocket-tcp', + ':reactivesocket', + ':reactivesocket-streams-mocks', + '@/folly/io/async:scoped_event_base_thread', + ], +) + +cpp_binary( + name = 'reactivesocket-test-resumeclient', + header_namespace = 'test', + headers = subdir_glob([ + ('test', '**/*.h'), + ]), + srcs = glob([ + 'test/resume/TcpResumeClient.cpp', + 'test/simple/*.cpp', + ]), + deps = [ + ':reactivesocket-tcp', + ':reactivesocket', + ':reactivesocket-streams-mocks', + '@/folly/io/async:scoped_event_base_thread', + ], +) + +cpp_binary( + name = 'reactivesocket-test-resumeserver', + header_namespace = 'test', + headers = subdir_glob([ + ('test', '**/*.h'), + ]), + srcs = glob([ + 'test/resume/TcpResumeServer.cpp', + 'test/simple/*.cpp', + ]), + deps = [ + ':reactivesocket-tcp', + ':reactivesocket', + ':reactivesocket-streams-mocks', + '@/folly/io/async:scoped_event_base_thread', + ], +) + +cpp_binary( + name = 'reactivesocket-test-tcpclient', + header_namespace = 'test', + headers = subdir_glob([ + ('test', '**/*.h'), + ]), + srcs = glob([ + 'test/tcp/TcpClient.cpp', + 'test/simple/*.cpp', + ]), + deps = [ + ':reactivesocket-tcp', + ':reactivesocket', + ':reactivesocket-streams-mocks', + '@/folly/io/async:scoped_event_base_thread', + ], +) + +cpp_binary( + name = 'reactivesocket-test-tcpserver', + header_namespace = 'test', + headers = subdir_glob([ + ('test', '**/*.h'), + ]), + srcs = glob([ + 'test/tcp/TcpServer.cpp', + 'test/simple/*.cpp', + ]), + deps = [ + ':reactivesocket-tcp', + ':reactivesocket', + ':reactivesocket-streams-mocks', + '@/folly/io/async:scoped_event_base_thread', ], ) @@ -183,3 +311,35 @@ cpp_library( headers = ['test/ReactiveStreamsMocksCompat.h', 'test/streams/Mocks.h'], deps = ['@/lithium/reactive-streams-cpp:reactive-streams-mocks',], ) + +cpp_library( + name = 'rsocket', + + header_namespace = "rsocket", + headers = subdir_glob([ + ('experimental/rsocket', '**/*.h'), + ('experimental/rsocket/facebook', '**/*.h'), + ('experimental/rsocket/transports', '**/*.h'), + ]), + srcs = glob([ + 'experimental/rsocket-src/*.cpp', + 'experimental/rsocket-src/facebook/transports/*.cpp', + 'experimental/rsocket-src/transports/*.cpp', + ]), + deps = [ + ':reactivesocket', + ':reactivesocket-internal', + ':reactivesocket-streams', + ':reactivesocket-tcp', + '@/folly/futures:futures', + '@/folly/io:iobuf', + '@/folly:exception_wrapper', + '@/lithium/duplexconnection-proxygen:server', + '@/lithium/duplexconnection:util', + '@/lithium/reactivesocket-cpp/experimental/yarpl:reactive-streams', + '@/lithium/reactivesocket-cpp/experimental/yarpl:yarpl', + '@/lithium/reactivesocket-utils:reactivesocket-external-utils', + '@/proxygen/httpserver:httpserver', + ], + compiler_flags=['-DREACTIVE_SOCKET_EXTERNAL_STACK_TRACE_UTILS'], +) diff --git a/examples/stream-hello-world/TARGETS b/examples/stream-hello-world/TARGETS new file mode 100644 index 000000000..df637387d --- /dev/null +++ b/examples/stream-hello-world/TARGETS @@ -0,0 +1,28 @@ +cpp_binary( + name = 'tcp-server', + headers = subdir_glob([('.', '**/*.h')]), + srcs = [ + 'HelloStreamRequestHandler.cpp', + 'HelloStreamSubscription.cpp', + 'StreamHelloWorld_Server.cpp', + ], + deps = [ + '@/folly/init:init', + '@/lithium/reactivesocket-cpp:rsocket', + ], +) + +cpp_binary( + name = 'tcp-client', + headers = subdir_glob([('.', '**/*.h')]), + srcs = [ + 'HelloStreamRequestHandler.cpp', + 'HelloStreamSubscription.cpp', + 'StreamHelloWorld_Client.cpp', + ], + deps = [ + '@/folly/init:init', + '@/lithium/reactivesocket-cpp/examples/util:subscriber', + '@/lithium/reactivesocket-cpp:rsocket', + ], +) diff --git a/examples/util/TARGETS b/examples/util/TARGETS new file mode 100644 index 000000000..9209aa9b0 --- /dev/null +++ b/examples/util/TARGETS @@ -0,0 +1,8 @@ + +cpp_library( + name = 'subscriber', + header_namespace = 'examples/util', + headers = ['ExampleSubscriber.h'], + srcs = ['ExampleSubscriber.cpp'], + deps = ['@/lithium/reactivesocket-cpp:reactivesocket'], +) diff --git a/experimental/rsocket-test/RSocketClientServerTest.cpp b/experimental/rsocket-test/RSocketClientServerTest.cpp index 4151addcf..6da6b34bc 100644 --- a/experimental/rsocket-test/RSocketClientServerTest.cpp +++ b/experimental/rsocket-test/RSocketClientServerTest.cpp @@ -7,7 +7,6 @@ #include "handlers/HelloStreamRequestHandler.h" #include "rsocket/RSocket.h" -#include "rsocket/RSocket.h" #include "rsocket/transports/TcpConnectionAcceptor.h" using namespace rsocket; diff --git a/experimental/yarpl/TARGETS b/experimental/yarpl/TARGETS new file mode 100644 index 000000000..6133ce1d0 --- /dev/null +++ b/experimental/yarpl/TARGETS @@ -0,0 +1,91 @@ +cpp_library( + name='reactive-streams', + header_namespace='reactivestreams', + headers=subdir_glob([ + ('include/reactivestreams', '**/*.h'), + ]), +) + +cpp_library( + name='yarpl-internal', + header_namespace='yarpl', + headers=subdir_glob([ + ('include/yarpl', '**/*.h'), + ('src/yarpl', 'ThreadScheduler.h'), + ('src/yarpl', 'flowable/operators/*.h'), + ('src/yarpl', 'flowable/sources/*.h'), + ('src/yarpl', 'flowable/sources/utils/*.h'), + ('src/yarpl', 'flowable/utils/*.h'), + ('src/yarpl', 'observable/sources/*.h'), + ('src/yarpl', 'observable/operators/*.h'), + ('src/yarpl', 'utils/**/*.h'), + ]), + deps=[ + ':reactive-streams', + ], +) + +cpp_library( + name='yarpl', + headers=subdir_glob([ + ('', 'src/**/*.h'), + ]), + srcs=glob([ + 'src/**/*.cpp', + ]), + deps=[ + ':yarpl-internal', + ], +) + +cpp_unittest( + name='yarpl-test', + headers=subdir_glob([ + ('', 'test/**/*.h'), + ]), + srcs=glob([ + 'test/*.cpp', + ]), + deps=[ + ':yarpl', + ], + external_deps=[ + ('googletest', None, 'gtest'), + ], +) + +# cpp_binary( +# name='yarpl-perf', +# headers=subdir_glob([ +# ('', 'perf/**/*.h'), +# ]), +# srcs=glob([ +# 'perf/*.cpp', +# ]), +# deps=[ +# ':yarpl', +# ], +# external_deps=[ +# 'benchmark', +# ], +# ) + +cpp_binary( + name='yarpl-examples', + header_namespace = 'examples', + headers = subdir_glob([ + ('examples', '**/*.h') + ]), + srcs=[ + 'examples/FlowableAExamples.cpp', + 'examples/FlowableBExamples.cpp', + 'examples/FlowableCExamples.cpp', + 'examples/FlowableExamples.cpp', + #'examples/FlowableVExamples.cpp', + 'examples/ObservableExamples.cpp', + 'examples/yarpl-playground.cpp' + ], + deps=[ + ':yarpl', + ], +) diff --git a/experimental/yarpl/examples/FlowableVExamples.cpp b/experimental/yarpl/examples/FlowableVExamples.cpp index 5e80d1e02..dafcc6a98 100644 --- a/experimental/yarpl/examples/FlowableVExamples.cpp +++ b/experimental/yarpl/examples/FlowableVExamples.cpp @@ -64,39 +64,32 @@ void FlowableVExamples::run() { std::cout << "take example: 3 out of 10 items" << std::endl; Flowables::range(1, 11)->take(3)->subscribe(printer()); - auto flowable = Flowable::create( - [total=0](Subscriber& subscriber, int64_t requested) mutable { - subscriber.onNext(12345678); - subscriber.onError(std::make_exception_ptr( - std::runtime_error("error"))); - return std::make_tuple(int64_t{1}, false); - } - ); + auto flowable = Flowable::create([total = 0]( + Subscriber & subscriber, int64_t requested) mutable { + subscriber.onNext(12345678); + subscriber.onError(std::make_exception_ptr(std::runtime_error("error"))); + return std::make_tuple(int64_t{1}, false); + }); auto subscriber = Subscribers::create( - [](int next) { - std::cout << "@next: " << next << std::endl; - }, - [](std::exception_ptr eptr) { - try { - std::rethrow_exception(eptr); - } catch (const std::exception& exception) { - std::cerr << " exception: " << exception.what() << std::endl; - } catch (...) { - std::cerr << " !unknown exception!" << std::endl; - } - }, - [] { - std::cout << "Completed." << std::endl; - } - ); + [](int next) { std::cout << "@next: " << next << std::endl; }, + [](std::exception_ptr eptr) { + try { + std::rethrow_exception(eptr); + } catch (const std::exception& exception) { + std::cerr << " exception: " << exception.what() << std::endl; + } catch (...) { + std::cerr << " !unknown exception!" << std::endl; + } + }, + [] { std::cout << "Completed." << std::endl; }); flowable->subscribe(subscriber); ThreadScheduler scheduler; std::cout << "subscribe_on example" << std::endl; - Flowables::just({ "0: ", "1: ", "2: " }) + Flowables::just({"0: ", "1: ", "2: "}) ->map([](const char* p) { return std::string(p); }) ->map([](std::string log) { return log + " on " + getThreadId(); }) ->subscribeOn(scheduler) diff --git a/experimental/yarpl/examples/yarpl-playground.cpp b/experimental/yarpl/examples/yarpl-playground.cpp index 74e3bee83..31b79ab17 100644 --- a/experimental/yarpl/examples/yarpl-playground.cpp +++ b/experimental/yarpl/examples/yarpl-playground.cpp @@ -10,7 +10,9 @@ int main() { std::cout << "*** Run yarpl::flowable::v examples ***" << std::endl; - FlowableVExamples::run(); + // TODO: enable + // FlowableVExamples::run(); + // std::cout << "*** Run ObservableExamples ***" << std::endl; // ObservableExamples::run(); // std::cout << "*** Run FlowableExamples ***" << std::endl; diff --git a/experimental/yarpl/include/yarpl/v/Flowable.h b/experimental/yarpl/include/yarpl/v/Flowable.h index b27d49a96..3bc1b904f 100644 --- a/experimental/yarpl/include/yarpl/v/Flowable.h +++ b/experimental/yarpl/include/yarpl/v/Flowable.h @@ -46,7 +46,7 @@ class Flowable : public virtual Refcounted { * * \return a handle to a flowable that will use the emitter. */ - template + template class EmitterWrapper; template < @@ -205,13 +205,13 @@ namespace yarpl { template template class Flowable::EmitterWrapper : public Flowable { -public: + public: explicit EmitterWrapper(Emitter&& emitter) - : emitter_(std::forward(emitter)) {} + : emitter_(std::forward(emitter)) {} virtual void subscribe(Reference> subscriber) { new SynchronousSubscription( - Reference(this), std::move(subscriber)); + Reference(this), std::move(subscriber)); } virtual std::tuple emit( @@ -228,8 +228,7 @@ template template auto Flowable::create(Emitter&& emitter) { return Reference>( - new Flowable::EmitterWrapper( - std::forward(emitter))); + new Flowable::EmitterWrapper(std::forward(emitter))); } template @@ -246,7 +245,7 @@ auto Flowable::take(int64_t limit) { new TakeOperator(Reference>(this), limit)); } -template +template auto Flowable::subscribeOn(Scheduler& scheduler) { return Reference>( new SubscribeOnOperator(Reference>(this), scheduler)); diff --git a/experimental/yarpl/include/yarpl/v/Operator.h b/experimental/yarpl/include/yarpl/v/Operator.h index 1906da3a1..6ab1802e3 100644 --- a/experimental/yarpl/include/yarpl/v/Operator.h +++ b/experimental/yarpl/include/yarpl/v/Operator.h @@ -16,9 +16,9 @@ namespace yarpl { */ template class Operator : public Flowable { -public: + public: explicit Operator(Reference> upstream) - : upstream_(std::move(upstream)) {} + : upstream_(std::move(upstream)) {} virtual void subscribe(Reference> subscriber) override { upstream_->subscribe(Reference( @@ -183,41 +183,39 @@ class TakeOperator : public Operator { const int64_t limit_; }; -template +template class SubscribeOnOperator : public Operator { -public: + public: SubscribeOnOperator(Reference> upstream, Scheduler& scheduler) - : Operator(std::move(upstream)), worker_(scheduler.createWorker()) {} + : Operator(std::move(upstream)), + worker_(scheduler.createWorker()) {} virtual void subscribe(Reference> subscriber) override { Operator::upstream_->subscribe( - Reference( - new Subscription( - Reference>(this), - std::move(worker_), - std::move(subscriber)))); + Reference(new Subscription( + Reference>(this), + std::move(worker_), + std::move(subscriber)))); } -private: + private: class Subscription : public Operator::Subscription { - public: - Subscription(Reference> flowable, - std::unique_ptr worker, - Reference> subscriber) - : Operator::Subscription( - std::move(flowable), std::move(subscriber)), - worker_(std::move(worker)) {} + public: + Subscription( + Reference> flowable, + std::unique_ptr worker, + Reference> subscriber) + : Operator::Subscription( + std::move(flowable), + std::move(subscriber)), + worker_(std::move(worker)) {} virtual void request(int64_t delta) override { - worker_->schedule([delta, this] { - this->callSuperRequest(delta); - }); + worker_->schedule([delta, this] { this->callSuperRequest(delta); }); } virtual void cancel() override { - worker_->schedule([this] { - this->callSuperCancel(); - }); + worker_->schedule([this] { this->callSuperCancel(); }); } virtual void onNext(const T& value) override { @@ -225,7 +223,7 @@ class SubscribeOnOperator : public Operator { subscriber->onNext(value); } - private: + private: // Trampoline to call superclass method; gcc bug 58972. void callSuperRequest(int64_t delta) { Operator::Subscription::request(delta); diff --git a/experimental/yarpl/include/yarpl/v/Refcounted.h b/experimental/yarpl/include/yarpl/v/Refcounted.h index 633b285f6..d532d8b37 100644 --- a/experimental/yarpl/include/yarpl/v/Refcounted.h +++ b/experimental/yarpl/include/yarpl/v/Refcounted.h @@ -12,7 +12,7 @@ namespace yarpl { /// /// NOTE: only derive using "virtual public" inheritance. class Refcounted { -public: + public: #if !defined(NDEBUG) Refcounted(); virtual ~Refcounted(); @@ -21,10 +21,12 @@ class Refcounted { static std::size_t objects(); // Return the current count. For testing. - std::size_t count() const { return refcount_; } -#else /* NDEBUG */ + std::size_t count() const { + return refcount_; + } +#else /* NDEBUG */ virtual ~Refcounted() = default; -#endif /* NDEBUG */ +#endif /* NDEBUG */ private: template @@ -43,22 +45,25 @@ class Refcounted { mutable std::atomic_size_t refcount_{0}; -#if !defined (NDEBUG) +#if !defined(NDEBUG) static std::atomic_size_t objects_; -#endif /* NDEBUG */ +#endif /* NDEBUG */ }; /// RAII-enabling smart pointer for refcounted objects. Each reference /// constructed against a target refcounted object increases its count /// by 1 during its lifetime. -template::value>::type> +template < + typename T, + typename = + typename std::enable_if::value>::type> class Reference { public: Reference() : pointer_(nullptr) {} explicit Reference(T* pointer) : pointer_(pointer) { - if (pointer_) pointer_->incRef(); + if (pointer_) + pointer_->incRef(); } ~Reference() { diff --git a/experimental/yarpl/include/yarpl/v/Subscribers.h b/experimental/yarpl/include/yarpl/v/Subscribers.h index 0b9794b56..590172c51 100644 --- a/experimental/yarpl/include/yarpl/v/Subscribers.h +++ b/experimental/yarpl/include/yarpl/v/Subscribers.h @@ -3,9 +3,9 @@ #include #include -#include "yarpl/utils/type_traits.h" #include "Flowable.h" #include "Subscriber.h" +#include "yarpl/utils/type_traits.h" namespace yarpl { @@ -15,40 +15,57 @@ namespace yarpl { /// method bodies in the subscriber. class Subscribers { public: - template ::value>::type> - static auto create(Next&& next, - int64_t batch = Flowable::NO_FLOW_CONTROL) { + template < + typename T, + typename Next, + typename = typename std::enable_if< + std::is_callable::value>::type> + static auto create( + Next&& next, + int64_t batch = Flowable::NO_FLOW_CONTROL) { return Reference>( new Base(std::forward(next), batch)); } - template::value && - std::is_callable::value>::type> - static auto create(Next&& next, Error&& error, - int64_t batch = Flowable::NO_FLOW_CONTROL) { - return Reference>( - new WithError(std::forward(next), - std::forward(error), batch)); + std::is_callable::value && + std::is_callable::value>::type> + static auto create( + Next&& next, + Error&& error, + int64_t batch = Flowable::NO_FLOW_CONTROL) { + return Reference>(new WithError( + std::forward(next), std::forward(error), batch)); } - template::value && - std::is_callable::value && - std::is_callable::value>::type> - static auto create(Next&& next, Error&& error, Complete&& complete, - int64_t batch = Flowable::NO_FLOW_CONTROL) { + std::is_callable::value && + std::is_callable::value && + std::is_callable::value>::type> + static auto create( + Next&& next, + Error&& error, + Complete&& complete, + int64_t batch = Flowable::NO_FLOW_CONTROL) { return Reference>( new WithErrorAndComplete( - std::forward(next), std::forward(error), - std::forward(complete), batch)); + std::forward(next), + std::forward(error), + std::forward(complete), + batch)); } private: - template + template class Base : public Subscriber { public: Base(Next&& next, int64_t batch) @@ -75,34 +92,39 @@ class Subscribers { int64_t pending_; }; - template + template class WithError : public Base { - public: + public: WithError(Next&& next, Error&& error, int64_t batch) - : Base(std::forward(next), batch), error_(error) {} + : Base(std::forward(next), batch), error_(error) {} virtual void onError(std::exception_ptr error) override { error_(error); } - private: + private: Error error_; }; - template + template class WithErrorAndComplete : public WithError { - public: + public: WithErrorAndComplete( - Next&& next, Error&& error, Complete&& complete, int64_t batch) - : WithError( - std::forward(next), std::forward(error), batch), - complete_(complete) {} + Next&& next, + Error&& error, + Complete&& complete, + int64_t batch) + : WithError( + std::forward(next), + std::forward(error), + batch), + complete_(complete) {} virtual void onComplete() { complete_(); } - private: + private: Complete complete_; }; diff --git a/experimental/yarpl/src/yarpl/ThreadScheduler.h b/experimental/yarpl/src/yarpl/ThreadScheduler.h index 1e07d7bee..04d0371e8 100644 --- a/experimental/yarpl/src/yarpl/ThreadScheduler.h +++ b/experimental/yarpl/src/yarpl/ThreadScheduler.h @@ -8,12 +8,12 @@ namespace yarpl { class ThreadScheduler : public Scheduler { -public: + public: ThreadScheduler() {} std::unique_ptr createWorker() override; -private: + private: ThreadScheduler(ThreadScheduler&&) = delete; ThreadScheduler(const ThreadScheduler&) = delete; ThreadScheduler& operator=(ThreadScheduler&&) = delete; diff --git a/experimental/yarpl/src/yarpl/v/Refcounted.cpp b/experimental/yarpl/src/yarpl/v/Refcounted.cpp index ce4cff08d..2e23ad7d0 100644 --- a/experimental/yarpl/src/yarpl/v/Refcounted.cpp +++ b/experimental/yarpl/src/yarpl/v/Refcounted.cpp @@ -4,7 +4,7 @@ namespace yarpl { #if !defined(NDEBUG) -Refcounted::Refcounted () { +Refcounted::Refcounted() { ++objects_; } @@ -18,6 +18,6 @@ size_t Refcounted::objects() { std::atomic_size_t Refcounted::objects_{0}; -#endif /* !NDEBUG */ +#endif /* !NDEBUG */ -} // yarpl +} // yarpl diff --git a/experimental/yarpl/test/v/FlowableTest.cpp b/experimental/yarpl/test/v/FlowableTest.cpp index 4dcc8d5cb..764b37329 100644 --- a/experimental/yarpl/test/v/FlowableTest.cpp +++ b/experimental/yarpl/test/v/FlowableTest.cpp @@ -9,9 +9,9 @@ namespace yarpl { namespace { -template +template class CollectingSubscriber : public Subscriber { -public: + public: virtual void onSubscribe(Reference subscription) override { Subscriber::onSubscribe(subscription); subscription->request(100); @@ -26,23 +26,23 @@ class CollectingSubscriber : public Subscriber { return values_; } -private: + private: std::vector values_; }; /// Construct a pipeline with a collecting subscriber against the supplied /// flowable. Return the items that were sent to the subscriber. If some /// exception was sent, the exception is thrown. -template +template std::vector run(Reference> flowable) { - auto collector = Reference>( - new CollectingSubscriber); + auto collector = + Reference>(new CollectingSubscriber); auto subscriber = Reference>(collector.get()); flowable->subscribe(std::move(subscriber)); return collector->values(); } -} // namespace +} // namespace TEST(FlowableTest, SingleFlowable) { ASSERT_EQ(std::size_t{0}, Refcounted::objects()); @@ -58,38 +58,42 @@ TEST(FlowableTest, SingleFlowable) { TEST(FlowableTest, JustFlowable) { ASSERT_EQ(std::size_t{0}, Refcounted::objects()); EXPECT_EQ(run(Flowables::just(22)), std::vector{22}); - EXPECT_EQ(run(Flowables::just({12, 34, 56, 98})), - std::vector({12, 34, 56, 98})); - EXPECT_EQ(run(Flowables::just({"ab", "pq", "yz"})), - std::vector({"ab", "pq", "yz"})); + EXPECT_EQ( + run(Flowables::just({12, 34, 56, 98})), + std::vector({12, 34, 56, 98})); + EXPECT_EQ( + run(Flowables::just({"ab", "pq", "yz"})), + std::vector({"ab", "pq", "yz"})); EXPECT_EQ(std::size_t{0}, Refcounted::objects()); } TEST(FlowableTest, Range) { ASSERT_EQ(std::size_t{0}, Refcounted::objects()); - EXPECT_EQ(run(Flowables::range(10, 15)), std::vector( - {10, 11, 12, 13, 14})); + EXPECT_EQ( + run(Flowables::range(10, 15)), + std::vector({10, 11, 12, 13, 14})); EXPECT_EQ(std::size_t{0}, Refcounted::objects()); } TEST(FlowableTest, RangeWithMap) { ASSERT_EQ(std::size_t{0}, Refcounted::objects()); auto flowable = Flowables::range(1, 4) - ->map([](int64_t v) { return v * v; }) - ->map([](int64_t v) { return v * v; }) - ->map([](int64_t v) { return std::to_string(v); }); - EXPECT_EQ(run(std::move(flowable)), - std::vector({"1", "16", "81"})); + ->map([](int64_t v) { return v * v; }) + ->map([](int64_t v) { return v * v; }) + ->map([](int64_t v) { return std::to_string(v); }); + EXPECT_EQ( + run(std::move(flowable)), std::vector({"1", "16", "81"})); EXPECT_EQ(std::size_t{0}, Refcounted::objects()); } TEST(FlowableTest, SimpleTake) { ASSERT_EQ(std::size_t{0}, Refcounted::objects()); - EXPECT_EQ(run(Flowables::range(0, 100)->take(3)), - std::vector({0, 1, 2})); - EXPECT_EQ(run(Flowables::range(10, 15)), std::vector( - {10, 11, 12, 13, 14})); + EXPECT_EQ( + run(Flowables::range(0, 100)->take(3)), std::vector({0, 1, 2})); + EXPECT_EQ( + run(Flowables::range(10, 15)), + std::vector({10, 11, 12, 13, 14})); EXPECT_EQ(std::size_t{0}, Refcounted::objects()); } -} // yarpl +} // yarpl diff --git a/experimental/yarpl/test/v/RefcountedTest.cpp b/experimental/yarpl/test/v/RefcountedTest.cpp index 96a93401e..30fedd79c 100644 --- a/experimental/yarpl/test/v/RefcountedTest.cpp +++ b/experimental/yarpl/test/v/RefcountedTest.cpp @@ -16,7 +16,7 @@ TEST(RefcountedTest, ObjectCountsAreMaintained) { EXPECT_EQ(i, Refcounted::objects()); v.push_back(std::make_unique()); EXPECT_EQ(i + 1, Refcounted::objects()); - EXPECT_EQ(0U, v[i]->count()); // no references. + EXPECT_EQ(0U, v[i]->count()); // no references. } v.resize(11); @@ -59,4 +59,4 @@ TEST(RefcountedTest, ReferenceCountingWorks) { EXPECT_EQ(0U, Refcounted::objects()); } -} // yarpl +} // yarpl