Skip to content

Commit

Permalink
RSocket with Flowable. (#356)
Browse files Browse the repository at this point in the history
* RSocket with Flowable

* Update RSocket Tests to Flowable

* Shorten String Capture Code
  • Loading branch information
benjchristensen authored and lehecka committed Apr 26, 2017
1 parent 0907520 commit a8ef49e
Show file tree
Hide file tree
Showing 32 changed files with 307 additions and 483 deletions.
22 changes: 12 additions & 10 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ add_subdirectory(experimental/yarpl)
add_library(
rsocket_experimental
experimental/rsocket/RSocket.h
# TODO remove when ReactiveStreams all synced
experimental/rsocket/OldNewBridge.h
experimental/rsocket-src/RSocket.cpp
experimental/rsocket/RSocketServer.h
experimental/rsocket-src/RSocketServer.cpp
Expand All @@ -399,9 +401,10 @@ add_library(
experimental/rsocket-src/transports/TcpConnectionAcceptor.cpp
experimental/rsocket/transports/TcpConnectionFactory.h
experimental/rsocket-src/transports/TcpConnectionFactory.cpp
experimental/rsocket/RSocketRequestHandler.h
)

add_dependencies(rsocket_experimental ReactiveStreams)
add_dependencies(rsocket_experimental ReactiveStreams yarpl)

# include the experimental includes for usage
target_include_directories(rsocket_experimental PUBLIC "${PROJECT_SOURCE_DIR}/experimental")
Expand All @@ -416,8 +419,6 @@ add_executable(
experimental/rsocket-test/RSocketClientServerTest.cpp
experimental/rsocket-test/handlers/HelloStreamRequestHandler.h
experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp
experimental/rsocket-test/handlers/HelloStreamSubscription.h
experimental/rsocket-test/handlers/HelloStreamSubscription.cpp
)

target_link_libraries(
Expand Down Expand Up @@ -459,16 +460,14 @@ target_link_libraries(
add_executable(
example_stream-hello-world-server
examples/stream-hello-world/StreamHelloWorld_Server.cpp
examples/stream-hello-world/HelloStreamSubscription.cpp
examples/stream-hello-world/HelloStreamSubscription.h
examples/stream-hello-world/HelloStreamRequestHandler.cpp
examples/stream-hello-world/HelloStreamRequestHandler.h)
)

target_link_libraries(
example_stream-hello-world-server
ReactiveSocket
rsocket_experimental
reactivesocket_examples_util
yarpl
${FOLLY_LIBRARIES}
${GFLAGS_LIBRARY}
${GLOG_LIBRARY}
Expand All @@ -484,6 +483,7 @@ target_link_libraries(
ReactiveSocket
rsocket_experimental
reactivesocket_examples_util
yarpl
${FOLLY_LIBRARIES}
${GFLAGS_LIBRARY}
${GLOG_LIBRARY}
Expand All @@ -496,15 +496,16 @@ add_executable(
examples/conditional-request-handling/ConditionalRequestHandling_Server.cpp
examples/conditional-request-handling/TextRequestHandler.h
examples/conditional-request-handling/TextRequestHandler.cpp
examples/conditional-request-handling/ConditionalRequestSubscription.h
examples/conditional-request-handling/ConditionalRequestSubscription.cpp
examples/conditional-request-handling/JsonRequestHandler.cpp examples/conditional-request-handling/JsonRequestHandler.h)
examples/conditional-request-handling/JsonRequestHandler.cpp
examples/conditional-request-handling/JsonRequestHandler.h
)

target_link_libraries(
example_conditional-request-handling-server
ReactiveSocket
rsocket_experimental
reactivesocket_examples_util
yarpl
${FOLLY_LIBRARIES}
${GFLAGS_LIBRARY}
${GLOG_LIBRARY}
Expand All @@ -520,6 +521,7 @@ target_link_libraries(
ReactiveSocket
rsocket_experimental
reactivesocket_examples_util
yarpl
${FOLLY_LIBRARIES}
${GFLAGS_LIBRARY}
${GLOG_LIBRARY}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,44 @@
// Copyright 2004-present Facebook. All Rights Reserved.

#include <folly/init/Init.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <iostream>
#include "examples/util/ExampleSubscriber.h"
#include "rsocket/RSocket.h"
#include "rsocket/transports/TcpConnectionFactory.h"
#include "yarpl/v/Flowable.h"
#include "yarpl/v/Subscriber.h"

using namespace ::reactivesocket;
using namespace ::folly;
using namespace ::rsocket_example;
using namespace ::rsocket;
using namespace yarpl;

DEFINE_string(host, "localhost", "host to connect to");
DEFINE_int32(port, 9898, "host:port to connect to");

int main(int argc, char* argv[]) {
FLAGS_logtostderr = true;
FLAGS_minloglevel = 0;
google::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
folly::init(&argc, &argv);

auto rsf = RSocket::createClient(
TcpConnectionFactory::create(FLAGS_host, FLAGS_port));
auto rs = rsf->connect().get();

LOG(INFO) << "------------------ Hello Bob!";
auto s1 = std::make_shared<ExampleSubscriber>(5, 6);
rs->requestStream(Payload("Bob"), s1);
auto s1 = yarpl::Reference<ExampleSubscriber>(new ExampleSubscriber(5, 6));
rs->requestStream(Payload("Bob"))
->take(5)
->subscribe(yarpl::Reference<yarpl::Subscriber<Payload>>(s1.get()));
s1->awaitTerminalEvent();

LOG(INFO) << "------------------ Hello Jane!";
auto s2 = std::make_shared<ExampleSubscriber>(5, 6);
rs->requestStream(Payload("Jane"), s2);
auto s2 = yarpl::Reference<ExampleSubscriber>(new ExampleSubscriber(5, 6));
rs->requestStream(Payload("Jane"))
->take(3)
->subscribe(yarpl::Reference<yarpl::Subscriber<Payload>>(s2.get()));
s2->awaitTerminalEvent();

// TODO on shutdown the destruction of
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright 2004-present Facebook. All Rights Reserved.

#include <iostream>
#include "JsonRequestHandler.h"
#include "TextRequestHandler.h"
#include <folly/init/Init.h>
#include "rsocket/RSocket.h"
#include "rsocket/RSocketErrors.h"
#include "rsocket/transports/TcpConnectionAcceptor.h"
#include "JsonRequestHandler.h"
#include "TextRequestHandler.h"

using namespace ::reactivesocket;
using namespace ::folly;
Expand Down Expand Up @@ -33,7 +34,7 @@ int main(int argc, char* argv[]) {
// start accepting connections
rs->startAndPark(
[textHandler, jsonHandler](std::unique_ptr<ConnectionSetupRequest> r)
-> std::shared_ptr<RequestHandler> {
-> std::shared_ptr<RSocketRequestHandler> {
if (r->getDataMimeType() == "text/plain") {
LOG(INFO) << "Connection Request => text/plain MimeType";
return textHandler;
Expand Down

This file was deleted.

This file was deleted.

33 changes: 15 additions & 18 deletions examples/conditional-request-handling/JsonRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,26 @@

#include "JsonRequestHandler.h"
#include <string>
#include "ConditionalRequestSubscription.h"
#include "yarpl/v/Flowable.h"
#include "yarpl/v/Flowables.h"

using namespace ::reactivesocket;
using namespace reactivesocket;
using namespace rsocket;
using namespace yarpl;

/// Handles a new inbound Stream requested by the other end.
void JsonRequestHandler::handleRequestStream(
Payload request,
StreamId streamId,
const std::shared_ptr<Subscriber<Payload>>& response) noexcept {
yarpl::Reference<yarpl::Flowable<reactivesocket::Payload>>
JsonRequestHandler::handleRequestStream(Payload request, StreamId streamId) {
LOG(INFO) << "JsonRequestHandler.handleRequestStream " << request;

// string from payload data
auto pds = request.moveDataToString();
auto requestString = std::string(pds, request.data->length());
auto requestString = request.moveDataToString();

response->onSubscribe(std::make_shared<ConditionalRequestSubscription>(
response, requestString, 10));
}

std::shared_ptr<StreamState> JsonRequestHandler::handleSetupPayload(
ReactiveSocket& socket,
ConnectionSetupPayload request) noexcept {
LOG(INFO) << "JsonRequestHandler.handleSetupPayload " << request;
// TODO what should this do?
return nullptr;
return Flowables::range(1, 100)->map([name = std::move(requestString)](
int64_t v) {
std::stringstream ss;
ss << "Hello (should be JSON) " << name << " " << v << "!";
std::string s = ss.str();
return Payload(s, "metadata");
});
}
22 changes: 6 additions & 16 deletions examples/conditional-request-handling/JsonRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,14 @@

#pragma once

#include <folly/ExceptionWrapper.h>
#include "src/NullRequestHandler.h"
#include "src/Payload.h"
#include "src/ReactiveStreamsCompat.h"
#include "src/ReactiveSocket.h"
#include "src/SubscriptionBase.h"
#include "rsocket/RSocket.h"

class JsonRequestHandler : public reactivesocket::DefaultRequestHandler {
class JsonRequestHandler : public rsocket::RSocketRequestHandler {
public:
/// Handles a new inbound Stream requested by the other end.
void handleRequestStream(
reactivesocket::Payload request,
reactivesocket::StreamId streamId,
const std::shared_ptr<
reactivesocket::Subscriber<reactivesocket::Payload>>&
response) noexcept override;

std::shared_ptr<reactivesocket::StreamState> handleSetupPayload(
reactivesocket::ReactiveSocket&,
reactivesocket::ConnectionSetupPayload request) noexcept override;
yarpl::Reference<yarpl::Flowable<reactivesocket::Payload>>
handleRequestStream(
reactivesocket::Payload request,
reactivesocket::StreamId streamId) override;
};
33 changes: 15 additions & 18 deletions examples/conditional-request-handling/TextRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,26 @@

#include "TextRequestHandler.h"
#include <string>
#include "ConditionalRequestSubscription.h"
#include "yarpl/v/Flowable.h"
#include "yarpl/v/Flowables.h"

using namespace ::reactivesocket;
using namespace reactivesocket;
using namespace rsocket;
using namespace yarpl;

/// Handles a new inbound Stream requested by the other end.
void TextRequestHandler::handleRequestStream(
Payload request,
StreamId streamId,
const std::shared_ptr<Subscriber<Payload>>& response) noexcept {
yarpl::Reference<yarpl::Flowable<reactivesocket::Payload>>
TextRequestHandler::handleRequestStream(Payload request, StreamId streamId) {
LOG(INFO) << "TextRequestHandler.handleRequestStream " << request;

// string from payload data
auto pds = request.moveDataToString();
auto requestString = std::string(pds, request.data->length());
auto requestString = request.moveDataToString();

response->onSubscribe(std::make_shared<ConditionalRequestSubscription>(
response, requestString, 10));
}

std::shared_ptr<StreamState> TextRequestHandler::handleSetupPayload(
ReactiveSocket& socket,
ConnectionSetupPayload request) noexcept {
LOG(INFO) << "TextRequestHandler.handleSetupPayload " << request;
// TODO what should this do?
return nullptr;
return Flowables::range(1, 100)->map([name = std::move(requestString)](
int64_t v) {
std::stringstream ss;
ss << "Hello " << name << " " << v << "!";
std::string s = ss.str();
return Payload(s, "metadata");
});
}
Loading

0 comments on commit a8ef49e

Please sign in to comment.