diff --git a/rsocket/RSocketResponder.h b/rsocket/RSocketResponder.h index 4fc17cc9c..eedcc2ef8 100644 --- a/rsocket/RSocketResponder.h +++ b/rsocket/RSocketResponder.h @@ -126,24 +126,24 @@ class RSocketResponderAdapter : public RSocketResponderCore { std::shared_ptr> handleRequestChannel( Payload request, StreamId streamId, - std::shared_ptr> - response) noexcept override; + std::shared_ptr> response) noexcept + override; /// Internal method for handling stream requests, not intended to be used /// by application code. void handleRequestStream( Payload request, StreamId streamId, - std::shared_ptr> - response) noexcept override; + std::shared_ptr> response) noexcept + override; /// Internal method for handling request-response requests, not intended to be /// used by application code. void handleRequestResponse( Payload request, StreamId streamId, - std::shared_ptr> - response) noexcept override; + std::shared_ptr> response) noexcept + override; void handleFireAndForget(Payload request, StreamId streamId) override; void handleMetadataPush(std::unique_ptr buf) override; diff --git a/rsocket/benchmarks/StreamThroughputMemory.cpp b/rsocket/benchmarks/StreamThroughputMemory.cpp index dc47bf4e6..c5128152e 100644 --- a/rsocket/benchmarks/StreamThroughputMemory.cpp +++ b/rsocket/benchmarks/StreamThroughputMemory.cpp @@ -140,8 +140,8 @@ class Factory : public ConnectionFactory { ProtocolVersion, ResumeStatus /* unused */) override { return folly::via(worker_.getEventBase(), [this] { - return ConnectedDuplexConnection{std::move(connection_), - *worker_.getEventBase()}; + return ConnectedDuplexConnection{ + std::move(connection_), *worker_.getEventBase()}; }); } diff --git a/rsocket/examples/stream-observable-to-flowable/StreamObservableToFlowable_Server.cpp b/rsocket/examples/stream-observable-to-flowable/StreamObservableToFlowable_Server.cpp index 2e9b633a6..30f2b3d5e 100644 --- a/rsocket/examples/stream-observable-to-flowable/StreamObservableToFlowable_Server.cpp +++ b/rsocket/examples/stream-observable-to-flowable/StreamObservableToFlowable_Server.cpp @@ -73,8 +73,7 @@ class PushStreamRequestResponder : public rsocket::RSocketResponder { std::string payloadData = ss.str(); s->onNext(Payload(payloadData, "metadata")); } - }) - .detach(); + }).detach(); }) ->toFlowable(BackpressureStrategy::DROP); } diff --git a/rsocket/test/RequestChannelTest.cpp b/rsocket/test/RequestChannelTest.cpp index 6697e8795..4a815166a 100644 --- a/rsocket/test/RequestChannelTest.cpp +++ b/rsocket/test/RequestChannelTest.cpp @@ -442,13 +442,13 @@ TEST(RequestChannelTest, TestLargePayload) { return Payload(seedPayload.data->clone(), seedPayload.metadata->clone()); }; - auto requests = yarpl::flowable::Flowable::create( - [&](auto& subscriber, int64_t num) { - while (num--) { - subscriber.onNext(makePayload()); - } - }) - ->take(3); + auto requests = + yarpl::flowable::Flowable::create([&](auto& subscriber, + int64_t num) { + while (num--) { + subscriber.onNext(makePayload()); + } + })->take(3); requester->requestChannel(std::move(requests)) ->map([&](Payload p) { diff --git a/rsocket/test/RequestResponseTest.cpp b/rsocket/test/RequestResponseTest.cpp index 126147f8c..313ff124d 100644 --- a/rsocket/test/RequestResponseTest.cpp +++ b/rsocket/test/RequestResponseTest.cpp @@ -66,8 +66,7 @@ class TestHandlerCancel : public rsocket::RSocketResponder { } else { // if not cancelled would do work and emit here } - }) - .detach(); + }).detach(); }); } diff --git a/rsocket/transports/tcp/TcpConnectionAcceptor.cpp b/rsocket/transports/tcp/TcpConnectionAcceptor.cpp index 0c1c41b80..92f0995d5 100644 --- a/rsocket/transports/tcp/TcpConnectionAcceptor.cpp +++ b/rsocket/transports/tcp/TcpConnectionAcceptor.cpp @@ -91,24 +91,20 @@ void TcpConnectionAcceptor::start(OnDuplexConnectionAccept onAccept) { // The AsyncServerSocket needs to be accessed from the listener thread only. // This will propagate out any exceptions the listener throws. - folly::via( - serverThread_->getEventBase(), - [this] { - serverSocket_->bind(options_.address); - - for (auto const& callback : callbacks_) { - serverSocket_->addAcceptCallback( - callback.get(), callback->eventBase()); - } - - serverSocket_->listen(options_.backlog); - serverSocket_->startAccepting(); - - for (const auto& i : serverSocket_->getAddresses()) { - VLOG(1) << "Listening on " << i.describe(); - } - }) - .get(); + folly::via(serverThread_->getEventBase(), [this] { + serverSocket_->bind(options_.address); + + for (auto const& callback : callbacks_) { + serverSocket_->addAcceptCallback(callback.get(), callback->eventBase()); + } + + serverSocket_->listen(options_.backlog); + serverSocket_->startAccepting(); + + for (const auto& i : serverSocket_->getAddresses()) { + VLOG(1) << "Listening on " << i.describe(); + } + }).get(); } void TcpConnectionAcceptor::stop() { diff --git a/rsocket/transports/tcp/TcpDuplexConnection.cpp b/rsocket/transports/tcp/TcpDuplexConnection.cpp index 6a3f91a18..97c9602ce 100644 --- a/rsocket/transports/tcp/TcpDuplexConnection.cpp +++ b/rsocket/transports/tcp/TcpDuplexConnection.cpp @@ -107,9 +107,8 @@ class TcpReaderWriter : public folly::AsyncTransportWrapper::WriteCallback, intrusive_ptr_release(this); } - void writeErr( - size_t, - const folly::AsyncSocketException& exn) noexcept override { + void writeErr(size_t, const folly::AsyncSocketException& exn) noexcept + override { closeErr(folly::exception_wrapper{std::make_exception_ptr(exn), exn}); intrusive_ptr_release(this); }