Skip to content

Commit

Permalink
Apply clang-format update fixes
Browse files Browse the repository at this point in the history
Reviewed By: igorsugak

Differential Revision: D25861960

fbshipit-source-id: e3c39c080429058a58cdc66d45350e5d1420f98c
  • Loading branch information
zertosh authored and facebook-github-bot committed Jan 10, 2021
1 parent e43ed46 commit e4be70c
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 40 deletions.
12 changes: 6 additions & 6 deletions rsocket/RSocketResponder.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,24 @@ class RSocketResponderAdapter : public RSocketResponderCore {
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> handleRequestChannel(
Payload request,
StreamId streamId,
std::shared_ptr<yarpl::flowable::Subscriber<Payload>>
response) noexcept override;
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> response) noexcept
override;

/// Internal method for handling stream requests, not intended to be used
/// by application code.
void handleRequestStream(
Payload request,
StreamId streamId,
std::shared_ptr<yarpl::flowable::Subscriber<Payload>>
response) noexcept override;
std::shared_ptr<yarpl::flowable::Subscriber<Payload>> response) noexcept
override;

/// Internal method for handling request-response requests, not intended to be
/// used by application code.
void handleRequestResponse(
Payload request,
StreamId streamId,
std::shared_ptr<yarpl::single::SingleObserver<Payload>>
response) noexcept override;
std::shared_ptr<yarpl::single::SingleObserver<Payload>> response) noexcept
override;

void handleFireAndForget(Payload request, StreamId streamId) override;
void handleMetadataPush(std::unique_ptr<folly::IOBuf> buf) override;
Expand Down
4 changes: 2 additions & 2 deletions rsocket/benchmarks/StreamThroughputMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ class Factory : public ConnectionFactory {
ProtocolVersion,
ResumeStatus /* unused */) override {
return folly::via(worker_.getEventBase(), [this] {
return ConnectedDuplexConnection{std::move(connection_),
*worker_.getEventBase()};
return ConnectedDuplexConnection{
std::move(connection_), *worker_.getEventBase()};
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ class PushStreamRequestResponder : public rsocket::RSocketResponder {
std::string payloadData = ss.str();
s->onNext(Payload(payloadData, "metadata"));
}
})
.detach();
}).detach();
})
->toFlowable(BackpressureStrategy::DROP);
}
Expand Down
14 changes: 7 additions & 7 deletions rsocket/test/RequestChannelTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,13 +442,13 @@ TEST(RequestChannelTest, TestLargePayload) {
return Payload(seedPayload.data->clone(), seedPayload.metadata->clone());
};

auto requests = yarpl::flowable::Flowable<Payload>::create(
[&](auto& subscriber, int64_t num) {
while (num--) {
subscriber.onNext(makePayload());
}
})
->take(3);
auto requests =
yarpl::flowable::Flowable<Payload>::create([&](auto& subscriber,
int64_t num) {
while (num--) {
subscriber.onNext(makePayload());
}
})->take(3);

requester->requestChannel(std::move(requests))
->map([&](Payload p) {
Expand Down
3 changes: 1 addition & 2 deletions rsocket/test/RequestResponseTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ class TestHandlerCancel : public rsocket::RSocketResponder {
} else {
// if not cancelled would do work and emit here
}
})
.detach();
}).detach();
});
}

Expand Down
32 changes: 14 additions & 18 deletions rsocket/transports/tcp/TcpConnectionAcceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,20 @@ void TcpConnectionAcceptor::start(OnDuplexConnectionAccept onAccept) {

// The AsyncServerSocket needs to be accessed from the listener thread only.
// This will propagate out any exceptions the listener throws.
folly::via(
serverThread_->getEventBase(),
[this] {
serverSocket_->bind(options_.address);

for (auto const& callback : callbacks_) {
serverSocket_->addAcceptCallback(
callback.get(), callback->eventBase());
}

serverSocket_->listen(options_.backlog);
serverSocket_->startAccepting();

for (const auto& i : serverSocket_->getAddresses()) {
VLOG(1) << "Listening on " << i.describe();
}
})
.get();
folly::via(serverThread_->getEventBase(), [this] {
serverSocket_->bind(options_.address);

for (auto const& callback : callbacks_) {
serverSocket_->addAcceptCallback(callback.get(), callback->eventBase());
}

serverSocket_->listen(options_.backlog);
serverSocket_->startAccepting();

for (const auto& i : serverSocket_->getAddresses()) {
VLOG(1) << "Listening on " << i.describe();
}
}).get();
}

void TcpConnectionAcceptor::stop() {
Expand Down
5 changes: 2 additions & 3 deletions rsocket/transports/tcp/TcpDuplexConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ class TcpReaderWriter : public folly::AsyncTransportWrapper::WriteCallback,
intrusive_ptr_release(this);
}

void writeErr(
size_t,
const folly::AsyncSocketException& exn) noexcept override {
void writeErr(size_t, const folly::AsyncSocketException& exn) noexcept
override {
closeErr(folly::exception_wrapper{std::make_exception_ptr(exn), exn});
intrusive_ptr_release(this);
}
Expand Down

0 comments on commit e4be70c

Please sign in to comment.