diff --git a/.github/workflows/ubuntu-monitoring.yml b/.github/workflows/ubuntu-special.yml similarity index 85% rename from .github/workflows/ubuntu-monitoring.yml rename to .github/workflows/ubuntu-special.yml index a0dc9091..d7554cff 100644 --- a/.github/workflows/ubuntu-monitoring.yml +++ b/.github/workflows/ubuntu-special.yml @@ -1,4 +1,4 @@ -name: Monitoring +name: Special on: push: @@ -7,14 +7,14 @@ on: pull_request: jobs: - ubuntu-monitoring-build: - name: Build on Ubuntu with monitoring support + ubuntu-special-build: + name: Build on Ubuntu with monitoring / protobuf support runs-on: ubuntu-latest strategy: matrix: compiler: [g++-11] buildmode: [Debug] - build-prometheus-from-source: [0, 1] + build-special-from-source: [0, 1] steps: - name: Checkout repository code @@ -38,7 +38,7 @@ jobs: ninja sudo cmake --install . - if: matrix.build-prometheus-from-source == 0 + if: matrix.build-special-from-source == 0 - name: Create Build Environment run: cmake -E make_directory ${{github.workspace}}/build @@ -46,7 +46,7 @@ jobs: - name: Configure CMake working-directory: ${{github.workspace}}/build shell: bash - run: cmake $GITHUB_WORKSPACE -DCMAKE_BUILD_TYPE=${{matrix.buildmode}} -DCMAKE_CXX_COMPILER=${{matrix.compiler}} -DCCT_BUILD_PROMETHEUS_FROM_SRC=${{matrix.build-prometheus-from-source}} -GNinja + run: cmake $GITHUB_WORKSPACE -DCMAKE_BUILD_TYPE=${{matrix.buildmode}} -DCMAKE_CXX_COMPILER=${{matrix.compiler}} -DCCT_BUILD_PROMETHEUS_FROM_SRC=${{matrix.build-special-from-source}} -DCCT_ENABLE_PROTO=${{matrix.build-special-from-source}} -GNinja - name: Build working-directory: ${{github.workspace}}/build diff --git a/CMakeLists.txt b/CMakeLists.txt index a8d536a0..e80eedce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ option(CCT_ENABLE_TESTS "Build the unit tests" ${MAIN_PROJECT}) option(CCT_BUILD_EXEC "Build an executable instead of a static library" ${MAIN_PROJECT}) option(CCT_ENABLE_ASAN "Compile with AddressSanitizer" ${CCT_ASAN_BUILD}) option(CCT_ENABLE_CLANG_TIDY "Compile with clang-tidy checks" OFF) +option(CCT_ENABLE_PROTO "Compile with protobuf support (to export data to the outside world)" ON) option(CCT_BUILD_PROMETHEUS_FROM_SRC "Fetch and build from prometheus-cpp sources" OFF) set(CCT_DATA_DIR "${CMAKE_CURRENT_SOURCE_DIR}/data" CACHE PATH "Needed data directory for coincenter. Can also be overriden at runtime with this environment variable") @@ -83,6 +84,7 @@ if(CCT_ENABLE_TESTS) enable_testing() endif() +# nlohmann_json - coincenter json library find_package(nlohmann_json CONFIG) if(NOT nlohmann_json_FOUND) FetchContent_Declare( @@ -94,6 +96,7 @@ if(NOT nlohmann_json_FOUND) FetchContent_MakeAvailable(nlohmann_json) endif() +# spdlog - coincenter logging library find_package(spdlog CONFIG) if(NOT spdlog_FOUND) FetchContent_Declare( @@ -133,6 +136,30 @@ else() endif() endif() +if(CCT_ENABLE_PROTO) + find_package(Protobuf CONFIG) + if(protobuf_FOUND) + message(STATUS "Linking with protobuf ${protobuf_VERSION}") + else() + set(PROTOBUF_VERSION v25.2) + + message(STATUS "Compiling protobuf ${PROTOBUF_VERSION} from sources") + + set(protobuf_BUILD_TESTS OFF) + set(ABSL_PROPAGATE_CXX_STD ON) + + FetchContent_Declare( + protobuf + GIT_REPOSITORY https://github.com/protocolbuffers/protobuf.git + GIT_TAG ${PROTOBUF_VERSION} + ) + FetchContent_MakeAvailable(protobuf) + + include(${protobuf_SOURCE_DIR}/cmake/protobuf-generate.cmake) + + endif() +endif() + # Unit Tests #[[ Create an executable @@ -248,12 +275,18 @@ if(CCT_ENABLE_PROMETHEUS) add_compile_definitions(CCT_ENABLE_PROMETHEUS) endif() +if(CCT_ENABLE_PROTO) + add_compile_definitions(CCT_ENABLE_PROTO) + add_compile_definitions("CCT_PROTOBUF_VERSION=\"${PROTOBUF_VERSION}\"") +endif() + # Link to sub folders CMakeLists.txt, from the lowest level to the highest level for documentation # (beware of cyclic dependencies) add_subdirectory(src/tech) add_subdirectory(src/monitoring) add_subdirectory(src/http-request) add_subdirectory(src/objects) +add_subdirectory(src/serialization) add_subdirectory(src/api-objects) add_subdirectory(src/api) add_subdirectory(src/engine) diff --git a/CONFIG.md b/CONFIG.md index 4e341dd9..8343d1ea 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -146,6 +146,7 @@ Refer to the hardcoded default json example as a model in case of doubt. | *query* | **updateFrequency.depositWallet** | Duration string (ex: `1min`) | Minimum time between two consecutive requests of deposit information (including wallet) | | *query* | **updateFrequency.currencyInfo** | Duration string (ex: `4h`) | Minimum time between two consecutive requests of dynamic currency info retrieval on Bithumb only (used for place order) | | *query* | **placeSimulateRealOrder** | Boolean (`true` or `false`) | If `true`, in trade simulation mode (with `--sim`) exchanges which do not support simulated mode in place order will actually place a real order, with the following characteristics: This will allow place of a 'real' order that cannot be matched in practice (if it is, lucky you!) | +| *query* | **marketDataSerialization** | Boolean (`true` or `false`) | If `true` and `coincenter` is compiled with **protobuf** support, some market data will automatically be exported in the `data/serialization` directory (`orderbook` and `last-trades`) for a long term storage | | *query* | **multiTradeAllowedByDefault** | Boolean (`true` or `false`) | If `true`, [multi-trade](README.md#multi-trade) will be allowed by default for `trade`, `buy` and `sell`. It can be overridden at command line level with `--no-multi-trade` and `--multi-trade`. | | *query* | **validateApiKey** | Boolean (`true` or `false`) | If `true`, each loaded private key will be tested at start of the program. In case of a failure, it will be removed from the list of private accounts loaded by `coincenter`, so that later queries do not consider it instead of raising a runtime exception. The downside is that it will make an additional check that will make startup slower. | | | *tradefees* | **maker** | String as decimal number representing a percentage (for instance, "0.15") | Trade fees occurring when a maker order is matched | diff --git a/Dockerfile b/Dockerfile index 73202ad0..32dd1a6c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,12 +23,14 @@ ARG BUILD_MODE=Release ARG BUILD_TEST=0 ARG BUILD_ASAN=0 ARG BUILD_WITH_PROMETHEUS=1 +ARG BUILD_WITH_PROTOBUF=1 # Build and launch tests if any RUN cmake -DCMAKE_BUILD_TYPE=${BUILD_MODE} \ -DCCT_ENABLE_TESTS=${BUILD_TEST} \ -DCCT_ENABLE_ASAN=${BUILD_ASAN} \ -DCCT_BUILD_PROMETHEUS_FROM_SRC=${BUILD_WITH_PROMETHEUS} \ + -DCCT_ENABLE_PROTO=${BUILD_WITH_PROTOBUF} \ -GNinja .. && \ ninja && \ if [ "$BUILD_TEST" = "1" -o "$BUILD_TEST" = "ON" ]; then \ diff --git a/alpine.Dockerfile b/alpine.Dockerfile index f862cc90..86e83c6e 100644 --- a/alpine.Dockerfile +++ b/alpine.Dockerfile @@ -2,7 +2,7 @@ FROM alpine:3.19.0 AS build # Install base & build dependencies, needed certificates for curl to work with https -RUN apk add --update --upgrade --no-cache g++ libc-dev curl-dev cmake ninja git ca-certificates +RUN apk add --update --upgrade --no-cache g++ linux-headers libc-dev curl-dev cmake ninja git ca-certificates # Set default directory for application WORKDIR /app @@ -18,12 +18,14 @@ ARG BUILD_MODE=Release ARG BUILD_TEST=0 ARG BUILD_ASAN=0 ARG BUILD_WITH_PROMETHEUS=1 +ARG BUILD_WITH_PROTOBUF=1 # Build and launch tests if any RUN cmake -DCMAKE_BUILD_TYPE=${BUILD_MODE} \ -DCCT_ENABLE_TESTS=${BUILD_TEST} \ -DCCT_ENABLE_ASAN=${BUILD_ASAN} \ -DCCT_BUILD_PROMETHEUS_FROM_SRC=${BUILD_WITH_PROMETHEUS} \ + -DCCT_ENABLE_PROTO=${BUILD_WITH_PROTOBUF} \ -GNinja .. && \ ninja && \ if [ "$BUILD_TEST" = "1" -o "$BUILD_TEST" = "ON" ]; then \ diff --git a/src/api/interface/CMakeLists.txt b/src/api/interface/CMakeLists.txt index a17d1968..df998a70 100644 --- a/src/api/interface/CMakeLists.txt +++ b/src/api/interface/CMakeLists.txt @@ -3,6 +3,7 @@ aux_source_directory(src API_INTERFACE_SRC) add_library(coincenter_api-interface STATIC ${API_INTERFACE_SRC}) target_link_libraries(coincenter_api-interface PUBLIC coincenter_api-exchange) +target_link_libraries(coincenter_api-interface PUBLIC coincenter_serialization) target_link_libraries(coincenter_api-interface PRIVATE coincenter_monitoring) target_include_directories(coincenter_api-interface PUBLIC include) diff --git a/src/api/interface/include/exchange.hpp b/src/api/interface/include/exchange.hpp index c5939398..de4d2adc 100644 --- a/src/api/interface/include/exchange.hpp +++ b/src/api/interface/include/exchange.hpp @@ -1,7 +1,9 @@ #pragma once +#include #include #include +#include #include "cct_exception.hpp" #include "currencycode.hpp" @@ -17,17 +19,20 @@ #include "monetaryamountbycurrencyset.hpp" namespace cct { +class AbstractMarketDataSerializer; class Exchange { public: using ExchangePublic = api::ExchangePublic; /// Builds a Exchange without private exchange. All private requests will be forbidden. - Exchange(const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic); + Exchange(std::string_view dataDir, const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic); /// Build a Exchange with both private and public exchanges - Exchange(const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic, + Exchange(std::string_view dataDir, const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic, api::ExchangePrivate &exchangePrivate); + ~Exchange(); + std::string_view name() const { return _exchangePublic.name(); } std::string_view keyName() const { return apiPrivate().keyName(); } @@ -86,16 +91,12 @@ class Exchange { return _exchangePublic.queryAllApproximatedOrderBooks(depth); } - MarketOrderBook queryOrderBook(Market mk, int depth = ExchangePublic::kDefaultDepth) { - return _exchangePublic.queryOrderBook(mk, depth); - } + MarketOrderBook queryOrderBook(Market mk, int depth = ExchangePublic::kDefaultDepth); MonetaryAmount queryLast24hVolume(Market mk) { return _exchangePublic.queryLast24hVolume(mk); } /// Retrieve an ordered vector of recent last trades - LastTradesVector queryLastTrades(Market mk, int nbTrades = ExchangePublic::kNbLastTradesDefault) { - return _exchangePublic.queryLastTrades(mk, nbTrades); - } + LastTradesVector queryLastTrades(Market mk, int nbTrades = ExchangePublic::kNbLastTradesDefault); /// Retrieve the last price of given market. MonetaryAmount queryLastPrice(Market mk) { return _exchangePublic.queryLastPrice(mk); } @@ -110,9 +111,15 @@ class Exchange { void updateCacheFile() const; + using trivially_relocatable = std::true_type; + private: + Exchange(std::string_view dataDir, const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic, + api::ExchangePrivate *pExchangePrivate); + api::ExchangePublic &_exchangePublic; api::ExchangePrivate *_pExchangePrivate = nullptr; const ExchangeInfo &_exchangeInfo; + std::unique_ptr _marketDataSerializerPtr; }; } // namespace cct diff --git a/src/api/interface/src/exchange.cpp b/src/api/interface/src/exchange.cpp index 1c7a031d..5741b738 100644 --- a/src/api/interface/src/exchange.cpp +++ b/src/api/interface/src/exchange.cpp @@ -1,6 +1,7 @@ #include "exchange.hpp" #include +#include #include "cct_log.hpp" #include "currencycode.hpp" @@ -8,17 +9,39 @@ #include "exchangeinfo.hpp" #include "exchangeprivateapi.hpp" #include "exchangepublicapi.hpp" +#include "timedef.hpp" + +#ifdef CCT_ENABLE_PROTO +#include "proto-market-data-serializer.hpp" +#else +#include "dummy-market-data-serializer.hpp" +#endif namespace cct { -Exchange::Exchange(const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic, +#ifdef CCT_ENABLE_PROTO +using MarketDataSerializer = ProtobufMarketDataSerializer; +#else +using MarketDataSerializer = DummyMarketDataSerializer; +#endif + +Exchange::Exchange(std::string_view dataDir, const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic, api::ExchangePrivate &exchangePrivate) + : Exchange(dataDir, exchangeInfo, exchangePublic, std::addressof(exchangePrivate)) {} + +Exchange::Exchange(std::string_view dataDir, const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic) + : Exchange(dataDir, exchangeInfo, exchangePublic, nullptr) {} + +Exchange::Exchange(std::string_view dataDir, const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic, + api::ExchangePrivate *pExchangePrivate) : _exchangePublic(exchangePublic), - _pExchangePrivate(std::addressof(exchangePrivate)), - _exchangeInfo(exchangeInfo) {} + _pExchangePrivate(pExchangePrivate), + _exchangeInfo(exchangeInfo), + _marketDataSerializerPtr(_exchangeInfo.withMarketDataSerialization() + ? new MarketDataSerializer(dataDir, exchangePublic.name()) + : nullptr) {} -Exchange::Exchange(const ExchangeInfo &exchangeInfo, api::ExchangePublic &exchangePublic) - : _exchangePublic(exchangePublic), _exchangeInfo(exchangeInfo) {} +Exchange::~Exchange() = default; // declared here to have definition of ~MarketDataSerializer bool Exchange::canWithdraw(CurrencyCode currencyCode, const CurrencyExchangeFlatSet ¤cyExchangeSet) const { if (_exchangeInfo.excludedCurrenciesWithdrawal().contains(currencyCode)) { @@ -41,6 +64,23 @@ bool Exchange::canDeposit(CurrencyCode currencyCode, const CurrencyExchangeFlatS return lb->canDeposit(); } +MarketOrderBook Exchange::queryOrderBook(Market mk, int depth) { + auto marketOrderBook = _exchangePublic.queryOrderBook(mk, depth); + if (_marketDataSerializerPtr) { + _marketDataSerializerPtr->push(Clock::now(), marketOrderBook); + } + return marketOrderBook; +} + +/// Retrieve an ordered vector of recent last trades +LastTradesVector Exchange::queryLastTrades(Market mk, int nbTrades) { + auto lastTrades = _exchangePublic.queryLastTrades(mk, nbTrades); + if (_marketDataSerializerPtr) { + _marketDataSerializerPtr->push(lastTrades); + } + return lastTrades; +} + void Exchange::updateCacheFile() const { _exchangePublic.updateCacheFile(); if (_pExchangePrivate != nullptr) { diff --git a/src/api/interface/src/exchangepool.cpp b/src/api/interface/src/exchangepool.cpp index 11c8cbab..6af98770 100644 --- a/src/api/interface/src/exchangepool.cpp +++ b/src/api/interface/src/exchangepool.cpp @@ -31,6 +31,7 @@ ExchangePool::ExchangePool(const CoincenterInfo& coincenterInfo, FiatConverter& _krakenPublic(_coincenterInfo, _fiatConverter, _commonAPI), _kucoinPublic(_coincenterInfo, _fiatConverter, _commonAPI), _upbitPublic(_coincenterInfo, _fiatConverter, _commonAPI) { + const auto dataDir = coincenterInfo.dataDir(); for (std::string_view exchangeStr : kSupportedExchanges) { api::ExchangePublic* exchangePublic; if (exchangeStr == "binance") { @@ -81,10 +82,10 @@ ExchangePool::ExchangePool(const CoincenterInfo& coincenterInfo, FiatConverter& } } - _exchanges.emplace_back(exchangeInfo, *exchangePublic, *exchangePrivate); + _exchanges.emplace_back(dataDir, exchangeInfo, *exchangePublic, *exchangePrivate); } } else { - _exchanges.emplace_back(exchangeInfo, *exchangePublic); + _exchanges.emplace_back(dataDir, exchangeInfo, *exchangePublic); } } _exchanges.shrink_to_fit(); diff --git a/src/engine/src/coincenteroptions.cpp b/src/engine/src/coincenteroptions.cpp index cee7e967..a9d23ba7 100644 --- a/src/engine/src/coincenteroptions.cpp +++ b/src/engine/src/coincenteroptions.cpp @@ -37,6 +37,10 @@ std::ostream& CoincenterCmdLineOptions::PrintVersion(std::string_view programNam os << "compiled with " << CCT_COMPILER_VERSION << " on " << __DATE__ << " at " << __TIME__ << '\n'; os << " " << GetCurlVersionInfo() << '\n'; os << " " << ssl::GetOpenSSLVersion() << '\n'; +#ifdef CCT_PROTOBUF_VERSION + os << " " + << "protobuf " << CCT_PROTOBUF_VERSION << '\n'; +#endif return os; } diff --git a/src/engine/test/exchangedata_test.hpp b/src/engine/test/exchangedata_test.hpp index 36d9f2ce..d9faeb77 100644 --- a/src/engine/test/exchangedata_test.hpp +++ b/src/engine/test/exchangedata_test.hpp @@ -40,14 +40,22 @@ class ExchangesBaseTest : public ::testing::Test { api::MockExchangePrivate exchangePrivate6{exchangePublic3, coincenterInfo, key4}; api::MockExchangePrivate exchangePrivate7{exchangePublic3, coincenterInfo, key5}; api::MockExchangePrivate exchangePrivate8{exchangePublic1, coincenterInfo, key2}; - Exchange exchange1{coincenterInfo.exchangeInfo(exchangePublic1.name()), exchangePublic1, exchangePrivate1}; - Exchange exchange2{coincenterInfo.exchangeInfo(exchangePublic2.name()), exchangePublic2, exchangePrivate2}; - Exchange exchange3{coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, exchangePrivate3}; - Exchange exchange4{coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, exchangePrivate4}; - Exchange exchange5{coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, exchangePrivate5}; - Exchange exchange6{coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, exchangePrivate6}; - Exchange exchange7{coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, exchangePrivate7}; - Exchange exchange8{coincenterInfo.exchangeInfo(exchangePublic1.name()), exchangePublic1, exchangePrivate8}; + Exchange exchange1{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic1.name()), exchangePublic1, + exchangePrivate1}; + Exchange exchange2{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic2.name()), exchangePublic2, + exchangePrivate2}; + Exchange exchange3{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, + exchangePrivate3}; + Exchange exchange4{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, + exchangePrivate4}; + Exchange exchange5{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, + exchangePrivate5}; + Exchange exchange6{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, + exchangePrivate6}; + Exchange exchange7{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic3.name()), exchangePublic3, + exchangePrivate7}; + Exchange exchange8{kDefaultDataDir, coincenterInfo.exchangeInfo(exchangePublic1.name()), exchangePublic1, + exchangePrivate8}; Market m1{"ETH", "EUR"}; Market m2{"BTC", "EUR"}; diff --git a/src/http-request/src/besturlpicker.cpp b/src/http-request/src/besturlpicker.cpp index b10fa625..1a5e53c8 100644 --- a/src/http-request/src/besturlpicker.cpp +++ b/src/http-request/src/besturlpicker.cpp @@ -85,8 +85,6 @@ void BestURLPicker::storeResponseTimePerBaseURL(int8_t baseUrlPos, uint32_t resp uint64_t newDeviationResponseTime = static_cast(std::sqrt(sumDeviation / nbRequestsToConsider)); using DevType = decltype(stats.avgDeviation); if (newDeviationResponseTime > static_cast(std::numeric_limits::max())) { - log::warn("Cannot update accurately the new deviation response time {} because of overflow", - newDeviationResponseTime); stats.avgDeviation = std::numeric_limits::max(); } else { stats.avgDeviation = static_cast(newDeviationResponseTime); diff --git a/src/main/CMakeLists.txt b/src/main/CMakeLists.txt index d97cde5b..56d9fc2c 100644 --- a/src/main/CMakeLists.txt +++ b/src/main/CMakeLists.txt @@ -15,6 +15,10 @@ endif() target_link_libraries(coincenter PUBLIC coincenter_engine) +if(CCT_ENABLE_PROTO) + target_link_libraries(coincenter PUBLIC protobuf::libprotobuf) +endif() + set_target_properties(coincenter PROPERTIES VERSION ${PROJECT_VERSION} COMPILE_DEFINITIONS_DEBUG "JSON_DEBUG;JSON_SAFE;JSON_ISO_STRICT" diff --git a/src/monitoring/include/prometheusmetricgateway.hpp b/src/monitoring/include/prometheusmetricgateway.hpp index 6be53b15..6615b86f 100644 --- a/src/monitoring/include/prometheusmetricgateway.hpp +++ b/src/monitoring/include/prometheusmetricgateway.hpp @@ -5,9 +5,7 @@ #include #include -#include #include -#include #include "abstractmetricgateway.hpp" #include "timedef.hpp" diff --git a/src/monitoring/include/voidmetricgateway.hpp b/src/monitoring/include/voidmetricgateway.hpp index a0341a23..e4386ca9 100644 --- a/src/monitoring/include/voidmetricgateway.hpp +++ b/src/monitoring/include/voidmetricgateway.hpp @@ -1,8 +1,8 @@ #pragma once -#include - #include "abstractmetricgateway.hpp" +#include "metric.hpp" +#include "monitoringinfo.hpp" namespace cct { class VoidMetricGateway : public AbstractMetricGateway { diff --git a/src/objects/include/exchangeinfo.hpp b/src/objects/include/exchangeinfo.hpp index f9ab1ed2..1d32fd17 100644 --- a/src/objects/include/exchangeinfo.hpp +++ b/src/objects/include/exchangeinfo.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include "apiquerytypeenum.hpp" @@ -16,6 +17,7 @@ namespace cct { class ExchangeInfo { public: enum struct FeeType { kMaker, kTaker }; + enum class MarketDataSerialization : int8_t { kYes, kNo }; struct APIUpdateFrequencies { Duration freq[api::kQueryTypeMax]; @@ -27,7 +29,8 @@ class ExchangeInfo { const APIUpdateFrequencies &apiUpdateFrequencies, Duration publicAPIRate, Duration privateAPIRate, std::string_view acceptEncoding, int dustSweeperMaxNbTrades, log::level::level_enum requestsCallLogLevel, log::level::level_enum requestsAnswerLogLevel, bool multiTradeAllowedByDefault, - bool validateDepositAddressesInFile, bool placeSimulateRealOrder, bool validateApiKey); + bool validateDepositAddressesInFile, bool placeSimulateRealOrder, bool validateApiKey, + MarketDataSerialization marketDataSerialization); /// Get a reference to the list of statically excluded currency codes to consider for the exchange, /// In both trading and withdrawal. @@ -94,6 +97,8 @@ class ExchangeInfo { bool multiTradeAllowedByDefault() const { return _multiTradeAllowedByDefault; } + bool withMarketDataSerialization() const { return _withMarketSerialization; } + private: CurrencyCodeSet _excludedCurrenciesAll; // Currencies will be completely ignored by the exchange CurrencyCodeSet _excludedCurrenciesWithdrawal; // Currencies unavailable for withdrawals @@ -113,5 +118,6 @@ class ExchangeInfo { bool _validateDepositAddressesInFile; bool _placeSimulateRealOrder; bool _validateApiKey; + bool _withMarketSerialization; }; } // namespace cct diff --git a/src/objects/include/market.hpp b/src/objects/include/market.hpp index 576018ed..f4d3c007 100644 --- a/src/objects/include/market.hpp +++ b/src/objects/include/market.hpp @@ -101,4 +101,4 @@ struct hash { return cct::HashCombine(hash()(mk.base()), hash()(mk.quote())); } }; -} // namespace std \ No newline at end of file +} // namespace std diff --git a/src/objects/include/marketorderbook.hpp b/src/objects/include/marketorderbook.hpp index ea9a3184..27c7fbaa 100644 --- a/src/objects/include/marketorderbook.hpp +++ b/src/objects/include/marketorderbook.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "cct_smallvector.hpp" #include "market.hpp" @@ -138,6 +139,8 @@ class MarketOrderBook { std::optional computeAvgPrice(MonetaryAmount from, const PriceOptions& priceOptions) const; + VolAndPriNbDecimals volAndPriNbDecimals() const noexcept { return _volAndPriNbDecimals; } + /// Print the market order book in a SimpleTable and returns it. /// @param conversionPriceRate prices will be multiplied to given amount to display an additional column of equivalent /// currency @@ -149,7 +152,7 @@ class MarketOrderBook { struct AmountPrice { using AmountType = MonetaryAmount::AmountType; - bool operator==(const AmountPrice& o) const noexcept = default; + bool operator==(const AmountPrice&) const noexcept = default; AmountType amount = 0; AmountType price = 0; @@ -161,7 +164,7 @@ class MarketOrderBook { public: using trivially_relocatable = is_trivially_relocatable::type; - bool operator==(const MarketOrderBook&) const = default; + bool operator==(const MarketOrderBook&) const noexcept = default; private: /// Represents a total amount of waiting orders at a given price. diff --git a/src/objects/include/monetaryamount.hpp b/src/objects/include/monetaryamount.hpp index 1285600d..084a3a32 100644 --- a/src/objects/include/monetaryamount.hpp +++ b/src/objects/include/monetaryamount.hpp @@ -100,9 +100,13 @@ class MonetaryAmount { setNbDecimals(monetaryAmount.nbDecimals()); } + /// Get an integral representation of this MonetaryAmount multiplied by current number of decimals. + /// Example: "5.6235" with 6 decimals will return 5623500 + [[nodiscard]] AmountType amount() const { return _amount; } + /// Get an integral representation of this MonetaryAmount multiplied by given number of decimals. /// If an overflow would occur for the resulting amount, return std::nullopt - /// Example : "5.6235" with 6 decimals will return 5623500 + /// Example: "5.6235" with 6 decimals will return 5623500 [[nodiscard]] std::optional amount(int8_t nbDecimals) const; /// Get the integer part of the amount of this MonetaryAmount. diff --git a/src/api-objects/include/publictrade.hpp b/src/objects/include/publictrade.hpp similarity index 83% rename from src/api-objects/include/publictrade.hpp rename to src/objects/include/publictrade.hpp index 96671651..6dc0ad11 100644 --- a/src/api-objects/include/publictrade.hpp +++ b/src/objects/include/publictrade.hpp @@ -3,6 +3,7 @@ #include #include "cct_string.hpp" +#include "market.hpp" #include "monetaryamount.hpp" #include "timedef.hpp" #include "tradeside.hpp" @@ -17,6 +18,8 @@ class PublicTrade { TradeSide side() const { return _side; } + Market market() const { return Market(_amount.currencyCode(), _price.currencyCode()); } + MonetaryAmount amount() const { return _amount; } MonetaryAmount price() const { return _price; } @@ -26,7 +29,7 @@ class PublicTrade { /// 3 way operator - make compiler generate all 6 operators (including == and !=) /// we order by time first, then amount, price, etc. Do not change the fields order! - auto operator<=>(const PublicTrade &) const = default; + std::strong_ordering operator<=>(const PublicTrade&) const noexcept = default; private: TimePoint _time; diff --git a/src/api-objects/include/tradeside.hpp b/src/objects/include/tradeside.hpp similarity index 100% rename from src/api-objects/include/tradeside.hpp rename to src/objects/include/tradeside.hpp diff --git a/src/objects/src/coincenterinfo.cpp b/src/objects/src/coincenterinfo.cpp index a9e07581..1945871c 100644 --- a/src/objects/src/coincenterinfo.cpp +++ b/src/objects/src/coincenterinfo.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -21,6 +20,7 @@ #include "runmodes.hpp" #include "toupperlower-string.hpp" #include "toupperlower.hpp" + #ifdef CCT_ENABLE_PROMETHEUS #include "prometheusmetricgateway.hpp" #else diff --git a/src/objects/src/exchangeinfo.cpp b/src/objects/src/exchangeinfo.cpp index 46bc1047..e4cbdcce 100644 --- a/src/objects/src/exchangeinfo.cpp +++ b/src/objects/src/exchangeinfo.cpp @@ -66,7 +66,8 @@ ExchangeInfo::ExchangeInfo(std::string_view exchangeNameStr, std::string_view ma Duration privateAPIRate, std::string_view acceptEncoding, int dustSweeperMaxNbTrades, log::level::level_enum requestsCallLogLevel, log::level::level_enum requestsAnswerLogLevel, bool multiTradeAllowedByDefault, bool validateDepositAddressesInFile, - bool placeSimulateRealOrder, bool validateApiKey) + bool placeSimulateRealOrder, bool validateApiKey, + MarketDataSerialization marketDataSerialization) : _excludedCurrenciesAll(std::move(excludedAllCurrencies)), _excludedCurrenciesWithdrawal(std::move(excludedCurrenciesWithdraw)), _preferredPaymentCurrencies(std::move(preferredPaymentCurrencies)), @@ -83,7 +84,8 @@ ExchangeInfo::ExchangeInfo(std::string_view exchangeNameStr, std::string_view ma _multiTradeAllowedByDefault(multiTradeAllowedByDefault), _validateDepositAddressesInFile(validateDepositAddressesInFile), _placeSimulateRealOrder(placeSimulateRealOrder), - _validateApiKey(validateApiKey) { + _validateApiKey(validateApiKey), + _withMarketSerialization(marketDataSerialization == MarketDataSerialization::kYes) { if (dustSweeperMaxNbTrades > std::numeric_limits::max() || dustSweeperMaxNbTrades < 0) { throw exception("Invalid number of dust sweeper max trades '{}', should be in [0, {}]", dustSweeperMaxNbTrades, std::numeric_limits::max()); @@ -109,6 +111,7 @@ ExchangeInfo::ExchangeInfo(std::string_view exchangeNameStr, std::string_view ma _validateDepositAddressesInFile ? kDepositAddressesFileName : ""); log::trace(" - Order placing in simulation : {}", _placeSimulateRealOrder ? "real, unmatchable" : "none"); log::trace(" - Validate API Key : {}", _validateApiKey ? "yes" : "no"); + log::trace(" - Market data serialization : {}", _withMarketSerialization ? "yes" : "no"); } if (_preferredPaymentCurrencies.empty()) { log::warn("{} list of preferred currencies is empty, buy and sell commands cannot perform trades", exchangeNameStr); diff --git a/src/objects/src/exchangeinfodefault.hpp b/src/objects/src/exchangeinfodefault.hpp index bdde87cd..491c1a8e 100644 --- a/src/objects/src/exchangeinfodefault.hpp +++ b/src/objects/src/exchangeinfodefault.hpp @@ -56,6 +56,7 @@ struct ExchangeInfoDefault { "requestsCall": "info", "requestsAnswer": "trace" }, + "marketDataSerialization": true, "multiTradeAllowedByDefault": false, "placeSimulateRealOrder": false, "updateFrequency": { @@ -174,6 +175,7 @@ struct ExchangeInfoDefault { "requestsCall": "info", "requestsAnswer": "trace" }, + "marketDataSerialization": false, "multiTradeAllowedByDefault": true, "privateAPIRate": "1055ms", "publicAPIRate": "1236ms", diff --git a/src/objects/src/exchangeinfomap.cpp b/src/objects/src/exchangeinfomap.cpp index b81c10b0..1769e240 100644 --- a/src/objects/src/exchangeinfomap.cpp +++ b/src/objects/src/exchangeinfomap.cpp @@ -54,6 +54,10 @@ ExchangeInfoMap ComputeExchangeInfoMap(std::string_view fileName, const json &js withdrawTopLevelOption.getBool(exchangeName, "validateDepositAddressesInFile"); const bool placeSimulatedRealOrder = queryTopLevelOption.getBool(exchangeName, "placeSimulateRealOrder"); const bool validateApiKey = queryTopLevelOption.getBool(exchangeName, "validateApiKey"); + const ExchangeInfo::MarketDataSerialization marketDataSerialization = + queryTopLevelOption.getBool(exchangeName, "marketDataSerialization") + ? ExchangeInfo::MarketDataSerialization::kYes + : ExchangeInfo::MarketDataSerialization::kNo; MonetaryAmountByCurrencySet dustAmountsThresholds( queryTopLevelOption.getMonetaryAmountsArray(exchangeName, "dustAmountsThreshold")); @@ -66,13 +70,14 @@ ExchangeInfoMap ComputeExchangeInfoMap(std::string_view fileName, const json &js map.insert_or_assign( exchangeName, - ExchangeInfo( - exchangeName, makerStr, takerStr, assetTopLevelOption.getUnorderedCurrencyUnion(exchangeName, "allExclude"), - assetTopLevelOption.getUnorderedCurrencyUnion(exchangeName, "withdrawExclude"), - assetTopLevelOption.getCurrenciesArray(exchangeName, kPreferredPaymentCurrenciesOptName), - std::move(dustAmountsThresholds), std::move(apiUpdateFrequencies), publicAPIRate, privateAPIRate, - acceptEncoding, dustSweeperMaxNbTrades, requestsCallLogLevel, requestsAnswerLogLevel, - multiTradeAllowedByDefault, validateDepositAddressesInFile, placeSimulatedRealOrder, validateApiKey)); + ExchangeInfo(exchangeName, makerStr, takerStr, + assetTopLevelOption.getUnorderedCurrencyUnion(exchangeName, "allExclude"), + assetTopLevelOption.getUnorderedCurrencyUnion(exchangeName, "withdrawExclude"), + assetTopLevelOption.getCurrenciesArray(exchangeName, kPreferredPaymentCurrenciesOptName), + std::move(dustAmountsThresholds), std::move(apiUpdateFrequencies), publicAPIRate, privateAPIRate, + acceptEncoding, dustSweeperMaxNbTrades, requestsCallLogLevel, requestsAnswerLogLevel, + multiTradeAllowedByDefault, validateDepositAddressesInFile, placeSimulatedRealOrder, + validateApiKey, marketDataSerialization)); } // namespace cct // Print json unused values diff --git a/src/objects/src/marketorderbook.cpp b/src/objects/src/marketorderbook.cpp index 749546b4..c5f1bc3e 100644 --- a/src/objects/src/marketorderbook.cpp +++ b/src/objects/src/marketorderbook.cpp @@ -91,6 +91,7 @@ MarketOrderBook::MarketOrderBook(MonetaryAmount askPrice, MonetaryAmount askVolu if (bidVolume <= 0) { throw exception("Invalid bid volume {}{}", bidVolume, kErrNegVolumeMsg); } + static constexpr MonetaryAmount::RoundType roundType = MonetaryAmount::RoundType::kNearest; askPrice.round(_volAndPriNbDecimals.priNbDecimals, roundType); diff --git a/src/objects/src/monetaryamount.cpp b/src/objects/src/monetaryamount.cpp index 5dc95f03..0563ebf5 100644 --- a/src/objects/src/monetaryamount.cpp +++ b/src/objects/src/monetaryamount.cpp @@ -193,7 +193,6 @@ MonetaryAmount::MonetaryAmount(double amount, CurrencyCode currencyCode, RoundTy } std::optional MonetaryAmount::amount(int8_t nbDecimals) const { - assert(nbDecimals >= 0); AmountType integralAmount = _amount; const int8_t ourNbDecimals = this->nbDecimals(); for (; nbDecimals < ourNbDecimals; ++nbDecimals) { diff --git a/src/api-objects/src/publictrade.cpp b/src/objects/src/publictrade.cpp similarity index 98% rename from src/api-objects/src/publictrade.cpp rename to src/objects/src/publictrade.cpp index 5ba601fa..423275d0 100644 --- a/src/api-objects/src/publictrade.cpp +++ b/src/objects/src/publictrade.cpp @@ -4,5 +4,7 @@ #include "timestring.hpp" namespace cct { + string PublicTrade::timeStr() const { return ToString(_time); } + } // namespace cct \ No newline at end of file diff --git a/src/objects/test/balanceportfolio_test.cpp b/src/objects/test/balanceportfolio_test.cpp index 2848b2a7..df3ae051 100644 --- a/src/objects/test/balanceportfolio_test.cpp +++ b/src/objects/test/balanceportfolio_test.cpp @@ -4,6 +4,7 @@ #include #include "monetaryamount.hpp" + namespace cct { class BalancePortfolioTest1 : public ::testing::Test { diff --git a/src/objects/test/market_test.cpp b/src/objects/test/market_test.cpp index 432d083a..c9fdd98e 100644 --- a/src/objects/test/market_test.cpp +++ b/src/objects/test/market_test.cpp @@ -51,4 +51,4 @@ TEST(MarketTest, StringRepresentationFiatConversionMarket) { EXPECT_EQ(market.assetsPairStrUpper('('), "*USDT(EUR"); EXPECT_EQ(market.assetsPairStrLower(')'), "*usdt)eur"); } -} // namespace cct \ No newline at end of file +} // namespace cct diff --git a/src/serialization/CMakeLists.txt b/src/serialization/CMakeLists.txt new file mode 100644 index 00000000..92125438 --- /dev/null +++ b/src/serialization/CMakeLists.txt @@ -0,0 +1,34 @@ +if(CCT_ENABLE_PROTO) + aux_source_directory(src SERIALIZATION_SRC) + + list(APPEND SERIALIZATION_SRC "${CMAKE_CURRENT_LIST_DIR}/proto/market-order-book-timed-data.proto") + list(APPEND SERIALIZATION_SRC "${CMAKE_CURRENT_LIST_DIR}/proto/trade-data.proto") +else() + set(SERIALIZATION_SRC "src/dummy-market-data-serializer.cpp") +endif() + +add_library(coincenter_serialization OBJECT ${SERIALIZATION_SRC}) + +target_include_directories(coincenter_serialization PUBLIC include) +target_link_libraries(coincenter_serialization PUBLIC coincenter_objects) + +if(CCT_ENABLE_PROTO) + set(PROTO_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated") + + target_include_directories(coincenter_serialization PUBLIC "$") + + target_link_libraries(coincenter_serialization PUBLIC protobuf::libprotobuf) + + protobuf_generate( + TARGET coincenter_serialization + IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}/proto" + PROTOC_OUT_DIR "${PROTO_BINARY_DIR}" + ) + + add_unit_test( + proto-multiple-messages-handler_test + test/proto-multiple-messages-handler_test.cpp + LIBRARIES + coincenter_serialization + ) +endif() \ No newline at end of file diff --git a/src/serialization/include/abstract-market-data-serializer.hpp b/src/serialization/include/abstract-market-data-serializer.hpp new file mode 100644 index 00000000..4e1c6bd3 --- /dev/null +++ b/src/serialization/include/abstract-market-data-serializer.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include + +#include "marketorderbook.hpp" +#include "publictrade.hpp" +#include "timedef.hpp" + +namespace cct { +class AbstractMarketDataSerializer { + public: + virtual ~AbstractMarketDataSerializer() = default; + + virtual void push(TimePoint timeStamp, const MarketOrderBook &marketOrderBook) = 0; + + virtual void push(std::span publicTrades) = 0; +}; +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/dummy-market-data-serializer.hpp b/src/serialization/include/dummy-market-data-serializer.hpp new file mode 100644 index 00000000..a93932e1 --- /dev/null +++ b/src/serialization/include/dummy-market-data-serializer.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +#include "abstract-market-data-serializer.hpp" +#include "marketorderbook.hpp" +#include "publictrade.hpp" +#include "timedef.hpp" + +namespace cct { +/// Implementation of a market data serializer that does nothing. +/// Useful if coincenter is not compiled with protobuf support. +class DummyMarketDataSerializer : public AbstractMarketDataSerializer { + public: + DummyMarketDataSerializer([[maybe_unused]] std::string_view dataDir, [[maybe_unused]] std::string_view exchangeName); + + void push([[maybe_unused]] TimePoint timeStamp, [[maybe_unused]] const MarketOrderBook &marketOrderBook) override; + + void push([[maybe_unused]] std::span publicTrades) override; +}; +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/market-accumulator.hpp b/src/serialization/include/market-accumulator.hpp new file mode 100644 index 00000000..e634eeb0 --- /dev/null +++ b/src/serialization/include/market-accumulator.hpp @@ -0,0 +1,164 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "cct_log.hpp" +#include "cct_vector.hpp" +#include "market.hpp" +#include "proto-constants.hpp" +#include "proto-multiple-messages-handler.hpp" +#include "stringhelpers.hpp" +#include "timedef.hpp" + +namespace cct { +template +class MarketAccumulator { + public: + static_assert((std::is_void_v && std::is_void_v) || (!std::is_void_v && !std::is_void_v)); + + using ProtobufObjectTypeVector = vector; + using ProtobufObjectTypePerMarketMap = std::unordered_map; + + explicit MarketAccumulator(std::string subPath) : _subPath(std::move(subPath)) {} + + MarketAccumulator(const MarketAccumulator &) = delete; + MarketAccumulator &operator=(const MarketAccumulator &) = delete; + + MarketAccumulator(MarketAccumulator &&other) noexcept { swap(other); } + + MarketAccumulator &operator=(MarketAccumulator &&other) noexcept { + writeOnDiskNoExcept(); + swap(other); + return *this; + } + + ~MarketAccumulator() { writeOnDiskNoExcept(); } + + void push(Market market, const ProtobufObjectType &protoObj) { + _toBeFlushedData[market].push_back(protoObj); + incrNbObjects(); + } + + void push(Market market, ProtobufObjectType &&protoObj) { + _toBeFlushedData[market].push_back(std::move(protoObj)); + incrNbObjects(); + } + + void setDirectory(Market market, TimePoint tp, std::string &pathStr) const { + // Note: below code could be simplified once compilers fully implement std::format and chrono C++20 + // libraries. + const auto dp = std::chrono::floor(tp); + const std::chrono::year_month_day ymd{dp}; + + pathStr.replace(pathStr.begin() + _subPath.size(), pathStr.end(), + fmt::format("{}/{:04}/{:02}/{:02}/", market.str(), static_cast(ymd.year()), + static_cast(ymd.month()), static_cast(ymd.day()))); + } + + static void AddFilePath(TimePoint tp, std::string &pathStr) { + const auto dp = std::chrono::floor(tp); + const std::chrono::hh_mm_ss time{std::chrono::floor(tp - dp)}; + + const auto dayHour = std::chrono::duration_cast(time.hours()).count(); + const auto beforeHoursSize = pathStr.size(); + if (dayHour < 10) { + pathStr.push_back('0'); + } + AppendString(pathStr, dayHour); + const auto afterHoursSize = pathStr.size(); + pathStr.append(":00:00_"); + pathStr.append(pathStr.begin() + beforeHoursSize, pathStr.begin() + afterHoursSize); + pathStr.append(":59:59"); + pathStr.append(kBinProtobufExtension); + } + + void swap(MarketAccumulator &rhs) noexcept { + _toBeFlushedData.swap(rhs._toBeFlushedData); + _subPath.swap(rhs._subPath); + std::swap(_nbFlushesDone, rhs._nbFlushesDone); + std::swap(_nbObjects, rhs._nbObjects); + } + + private: + void writeOnDiskNoExcept() noexcept { + try { + writeOnDisk(); + } catch (const std::exception &e) { + log::error("exception caught in last writeOnDisk: {}", e.what()); + } + } + + void incrNbObjects() { + if (++_nbObjects == kFlushPeriod) { + writeOnDisk(); + } + } + + void writeOnDisk() { + if (_nbObjects == 0) { + return; + } + + const auto nowTime = std::chrono::steady_clock::now(); + std::string pathStr = _subPath; + + std::size_t nbElemsWritten = 0; + + for (auto &[market, protobufObjectsVector] : _toBeFlushedData) { + if constexpr (!std::is_void_v) { + std::ranges::sort(protobufObjectsVector, Comp{}); + protobufObjectsVector.erase(std::unique(protobufObjectsVector.begin(), protobufObjectsVector.end(), Equal{}), + protobufObjectsVector.end()); + } + std::chrono::time_point prevHours{}; + ProtobufMessagesWriter protobufMessagesWriter; + for (const auto &protobufObject : protobufObjectsVector) { + if (!protobufObject.has_unixtimestampinms()) { + log::error("Invalid data for {} detected (no timestamp set)", market); + continue; + } + const TimePoint tp{std::chrono::milliseconds{protobufObject.unixtimestampinms()}}; + const auto hours = std::chrono::floor(tp); + if (prevHours != hours) { + // reset outfile + setDirectory(market, tp, pathStr); + std::filesystem::create_directories(std::filesystem::path(pathStr)); + AddFilePath(tp, pathStr); + std::filesystem::path filePath(pathStr); + + protobufMessagesWriter.open(std::ofstream(filePath, std::ios_base::app)); + prevHours = hours; + } + + protobufMessagesWriter.write(protobufObject); + } + nbElemsWritten += protobufObjectsVector.size(); + protobufObjectsVector.clear(); + } + + static constexpr auto kNbFlushRehashThreshold = 100; + if (++_nbFlushesDone % kNbFlushRehashThreshold == 0) { + // if we don't destroy the underlying vectors periodically, memory footprint will increase indefinitely + _toBeFlushedData.clear(); + _toBeFlushedData.rehash(0); + } + + _nbObjects = 0; + + log::info("Wrote {} market order book timed data in {}ms, last in {}", nbElemsWritten, + std::chrono::duration_cast(std::chrono::steady_clock::now() - nowTime).count(), + pathStr); + } + + ProtobufObjectTypePerMarketMap _toBeFlushedData; + std::string _subPath; + int _nbFlushesDone{}; + int _nbObjects{}; +}; +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/proto-constants.hpp b/src/serialization/include/proto-constants.hpp new file mode 100644 index 00000000..f820e648 --- /dev/null +++ b/src/serialization/include/proto-constants.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +namespace cct { + +enum class ProtobufObject : int8_t { kMarketOrderBook, kTrade }; + +static constexpr std::string_view kBinProtobufExtension = ".binpb"; + +static constexpr std::string_view kSubPathMarketOrderBook = "market-order-book"; +static constexpr std::string_view kSubPathTrades = "trades"; +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/proto-market-data-serializer.hpp b/src/serialization/include/proto-market-data-serializer.hpp new file mode 100644 index 00000000..f4bd6c71 --- /dev/null +++ b/src/serialization/include/proto-market-data-serializer.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include "abstract-market-data-serializer.hpp" +#include "market-accumulator.hpp" +#include "market-order-book-timed-data.pb.h" +#include "marketorderbook.hpp" +#include "publictrade.hpp" +#include "timedef.hpp" +#include "trade-data.pb.h" + +namespace cct { +/// This class is responsible of managing the periodic writes to disk of timed market data, for a given exchange. +/// This class is not thread safe +class ProtobufMarketDataSerializer : public AbstractMarketDataSerializer { + public: + ProtobufMarketDataSerializer(std::string_view dataDir, std::string_view exchangeName); + + /// Push market order book timed data in the MarketDataSerializer. + void push(TimePoint timeStamp, const MarketOrderBook &marketOrderBook) override; + + /// Push public trades timed data in the MarketDataSerializer. + void push(std::span publicTrades) override; + + private: + struct TradeDataComp { + bool operator()(const ::objects::TradeData &lhs, const ::objects::TradeData &rhs) const; + }; + + struct TradeDataEqual { + bool operator()(const ::objects::TradeData &lhs, const ::objects::TradeData &rhs) const; + }; + + MarketAccumulator<::objects::MarketOrderBookTimedData, 300> _marketOrderBookAccumulator; + MarketAccumulator<::objects::TradeData, 3000, TradeDataComp, TradeDataEqual> _tradesAccumulator; +}; +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/proto-market-order-book.hpp b/src/serialization/include/proto-market-order-book.hpp new file mode 100644 index 00000000..d37de577 --- /dev/null +++ b/src/serialization/include/proto-market-order-book.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include "market-order-book-timed-data.pb.h" +#include "marketorderbook.hpp" +#include "timedef.hpp" + +namespace cct { +::objects::MarketOrderBookTimedData CreateMarketOrderBookTimedData(const MarketOrderBook &marketOrderBook, + TimePoint timeStamp); +} \ No newline at end of file diff --git a/src/serialization/include/proto-multiple-messages-handler.hpp b/src/serialization/include/proto-multiple-messages-handler.hpp new file mode 100644 index 00000000..8a14566f --- /dev/null +++ b/src/serialization/include/proto-multiple-messages-handler.hpp @@ -0,0 +1,75 @@ +#pragma once + +#include + +#include +#include + +#include "cct_exception.hpp" +#include "cct_log.hpp" + +namespace cct { +class ProtobufMessagesReader { + public: + explicit ProtobufMessagesReader(std::istream& is) : _is(is), _iis(&_is), _cis(&_iis) {} + + bool hasNext() { return _cis.ReadVarint64(&_nextSize); } + + template + MsgT next() { + MsgT msg; + auto msgLimit = _cis.PushLimit(_nextSize); + if (!msg.ParseFromCodedStream(&_cis)) { + log::error("Error reading single protobuf message of size {}", _nextSize); + } + _cis.PopLimit(msgLimit); + return msg; + } + + private: + std::istream& _is; + ::google::protobuf::io::IstreamInputStream _iis; + ::google::protobuf::io::CodedInputStream _cis; + uint64_t _nextSize{}; +}; + +template +class ProtobufMessagesWriter { + public: + void open(OStreamType&& newOs) { + // reverse destroy streams to flush latest data. Recreate the streams after creation of new ofstream + _cos.reset(); + _oos.reset(); + _os = std::move(newOs); + _oos = std::make_unique<::google::protobuf::io::OstreamOutputStream>(&_os); + _cos = std::make_unique<::google::protobuf::io::CodedOutputStream>(_oos.get()); + } + + template + void write(const MsgT& msg) { + if (!_cos) { + throw exception("ProtobufMessagesWriter::open should have been called first"); + } + + _cos->WriteVarint64(msg.ByteSizeLong()); + + if (!msg.SerializeToCodedStream(_cos.get())) { + log::error("Failed to serialize to coded stream"); + } + } + + OStreamType flush() { + _cos.reset(); + _oos.reset(); + + OStreamType ret(std::move(_os)); + _os = OStreamType(); + return ret; + } + + private: + OStreamType _os; + std::unique_ptr<::google::protobuf::io::OstreamOutputStream> _oos; + std::unique_ptr<::google::protobuf::io::CodedOutputStream> _cos; +}; +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/proto-public-trade.hpp b/src/serialization/include/proto-public-trade.hpp new file mode 100644 index 00000000..96c22333 --- /dev/null +++ b/src/serialization/include/proto-public-trade.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include "market.hpp" +#include "publictrade.hpp" +#include "trade-data.pb.h" + +namespace cct { +::objects::TradeData CreateTradeData(const PublicTrade &publicTrade); + +PublicTrade CreatePublicTrade(Market market, const ::objects::TradeData &tradeData); +} // namespace cct \ No newline at end of file diff --git a/src/serialization/include/proto-reader.hpp b/src/serialization/include/proto-reader.hpp new file mode 100644 index 00000000..20444fce --- /dev/null +++ b/src/serialization/include/proto-reader.hpp @@ -0,0 +1,134 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "cct_log.hpp" +#include "cct_vector.hpp" +#include "market.hpp" +#include "proto-multiple-messages-handler.hpp" +#include "timedef.hpp" + +namespace cct { + +class ProtobufObjectsGateway { + public: + template + using ProtobufObjVector = vector; + + template + using ProtobufObjPerMarketMap = std::unordered_map>; + + explicit ProtobufObjectsGateway(std::string_view exchangeSerializedDataPath); + + /// Load all data found on disk for the time window [fromTimeStamp, toTimeStamp] + template + ProtobufObjPerMarketMap load(TimePoint fromTimeStamp, TimePoint toTimeStamp) { + ProtobufObjPerMarketMap ret; + + const auto pathStr = _exchangeSerializedDataPath; + if (!std::filesystem::is_directory(pathStr)) { + return ret; + } + + const auto nowTime = std::chrono::steady_clock::now(); + const auto fromDays = std::chrono::floor(fromTimeStamp); + const std::chrono::year_month_day fromYmd{fromDays}; + const std::chrono::hh_mm_ss fromTime{std::chrono::floor(fromTimeStamp - fromDays)}; + + const auto toDays = std::chrono::floor(toTimeStamp); + const std::chrono::year_month_day toYmd{toDays}; + const std::chrono::hh_mm_ss toTime{std::chrono::floor(toTimeStamp - toDays)}; + + int nbElemsRead = 0; + + for (const auto& marketDirectory : std::filesystem::directory_iterator(pathStr)) { + if (!marketDirectory.is_directory()) { + continue; + } + const auto& marketPath = marketDirectory.path(); + const auto marketStr = marketPath.filename().string(); + const Market market(marketStr); + + const int fromYear = static_cast(fromYmd.year()); + const int toYear = static_cast(toYmd.year()); + for (int year = fromYear; year <= toYear; ++year) { + const auto yearPath = marketPath / fmt::format("{:04}", year); + if (!std::filesystem::is_directory(yearPath)) { + continue; + } + const auto fromMonth = year == fromYear ? static_cast(fromYmd.month()) : 1U; + const auto toMonth = year == toYear ? static_cast(toYmd.month()) : 12U; + for (std::remove_const_t month = fromMonth; month <= toMonth; ++month) { + const auto monthPath = yearPath / fmt::format("{:02}", month); + if (!std::filesystem::is_directory(monthPath)) { + continue; + } + const auto fromDay = year == fromYear && month == fromMonth ? static_cast(fromYmd.day()) : 1U; + const auto toDay = year == toYear && month == toMonth ? static_cast(toYmd.day()) : 31U; + for (std::remove_const_t day = fromDay; day <= toDay; ++day) { + const auto dayPath = monthPath / fmt::format("{:02}", day); + if (!std::filesystem::is_directory(dayPath)) { + continue; + } + for (const auto& binProtobufFile : std::filesystem::directory_iterator(dayPath)) { + if (!binProtobufFile.is_regular_file()) { + continue; + } + const auto& filePath = binProtobufFile.path(); + const auto fileName = filePath.filename().string(); + + int hour{}; + const auto [ptr, ec] = std::from_chars(fileName.data(), fileName.data() + 2, hour); + if (ec != std::errc() || ptr - fileName.data() != 2) { + log::error("Unable to load bin protobuf file {} because of error {} trying to convert to hour range", + filePath.string(), static_cast(ec)); + continue; + } + if (year == toYear && month == toMonth && day == toDay && + hour > std::chrono::duration_cast(toTime.hours()).count()) { + // not within our time window + continue; + } + std::ifstream ifs(filePath, std::ios::in | std::ios::binary); + ProtobufMessagesReader multipleProtobufMessagesReader(ifs); + while (multipleProtobufMessagesReader.hasNext()) { + auto msg = multipleProtobufMessagesReader.next(); + if (!ValidateTimestamp(msg, fromTimeStamp, toTimeStamp)) { + continue; + } + ret[market].push_back(std::move(msg)); + ++nbElemsRead; + } + } + } + } + } + } + + log::info( + "Read {} protobuf data in {}ms", nbElemsRead, + std::chrono::duration_cast(std::chrono::steady_clock::now() - nowTime).count()); + + return ret; + } + + private: + template + static bool ValidateTimestamp(const ProtobufObjType& msg, TimePoint fromTimeStamp, TimePoint toTimeStamp) { + if (!msg.has_unixtimestampinms()) { + log::error("Invalid data loaded for protobuf object, no unix timestamp set"); + return false; + } + const auto timeStampInMs = msg.unixtimestampinms(); + return timeStampInMs >= TimestampToMs(fromTimeStamp) && timeStampInMs <= TimestampToMs(toTimeStamp); + } + + std::string_view _exchangeSerializedDataPath; +}; + +} // namespace cct \ No newline at end of file diff --git a/src/serialization/proto/market-order-book-timed-data.proto b/src/serialization/proto/market-order-book-timed-data.proto new file mode 100644 index 00000000..286fdf0b --- /dev/null +++ b/src/serialization/proto/market-order-book-timed-data.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package objects; + +message MarketOrderBookTimedData { + optional int64 unixTimestampInMs = 1; + optional int32 volumeNbDecimals = 2; + optional int32 priceNbDecimals = 3; + + message PricedVolume { + optional int64 price = 1; + optional int64 volume = 2; + } + + message OrderBook { + repeated PricedVolume asks = 1; + repeated PricedVolume bids = 2; + } + + optional OrderBook orderBook = 4; +} \ No newline at end of file diff --git a/src/serialization/proto/trade-data.proto b/src/serialization/proto/trade-data.proto new file mode 100644 index 00000000..f1399429 --- /dev/null +++ b/src/serialization/proto/trade-data.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package objects; + +enum TradeSide { + TRADE_UNSPECIFIED = 0; + TRADE_BUY = 1; + TRADE_SELL = 2; +} + +message TradeData { + optional int64 unixTimestampInMs = 1; + optional int64 priceAmount = 2; + optional int64 volumeAmount = 3; + optional int32 priceNbDecimals = 4; + optional int32 volumeNbDecimals = 5; + TradeSide tradeSide = 6; +} \ No newline at end of file diff --git a/src/serialization/src/dummy-market-data-serializer.cpp b/src/serialization/src/dummy-market-data-serializer.cpp new file mode 100644 index 00000000..8c60adaa --- /dev/null +++ b/src/serialization/src/dummy-market-data-serializer.cpp @@ -0,0 +1,20 @@ +#include "dummy-market-data-serializer.hpp" + +#include +#include + +#include "marketorderbook.hpp" +#include "publictrade.hpp" +#include "timedef.hpp" + +namespace cct { + +DummyMarketDataSerializer::DummyMarketDataSerializer([[maybe_unused]] std::string_view dataDir, + [[maybe_unused]] std::string_view exchangeName) {} + +void DummyMarketDataSerializer::push([[maybe_unused]] TimePoint timeStamp, + [[maybe_unused]] const MarketOrderBook &marketOrderBook) {} + +void DummyMarketDataSerializer::push([[maybe_unused]] std::span publicTrades) {} + +} // namespace cct \ No newline at end of file diff --git a/src/serialization/src/proto-market-data-serializer.cpp b/src/serialization/src/proto-market-data-serializer.cpp new file mode 100644 index 00000000..6d5ab140 --- /dev/null +++ b/src/serialization/src/proto-market-data-serializer.cpp @@ -0,0 +1,73 @@ +#include "proto-market-data-serializer.hpp" + +#include +#include + +#include "marketorderbook.hpp" +#include "monetaryamount.hpp" +#include "proto-constants.hpp" +#include "proto-market-order-book.hpp" +#include "proto-public-trade.hpp" +#include "publictrade.hpp" +#include "timedef.hpp" + +namespace cct { +namespace { +std::string ComputeSubPath(std::string_view dataDir, std::string_view exchangeName, + std::string_view protobufObjectName) { + std::string ret(dataDir); + ret.append("/serialized/"); + ret.append(protobufObjectName); + ret.push_back('/'); + ret.append(exchangeName); + ret.push_back('/'); + return ret; +} +} // namespace + +bool ProtobufMarketDataSerializer::TradeDataComp::operator()(const ::objects::TradeData& lhs, + const ::objects::TradeData& rhs) const { + if (lhs.unixtimestampinms() != rhs.unixtimestampinms()) { + return lhs.unixtimestampinms() < rhs.unixtimestampinms(); + } + if (lhs.tradeside() != rhs.tradeside()) { + return lhs.tradeside() < rhs.tradeside(); + } + MonetaryAmount lhsPrice(lhs.priceamount(), CurrencyCode{}, lhs.pricenbdecimals()); + MonetaryAmount rhsPrice(rhs.priceamount(), CurrencyCode{}, rhs.pricenbdecimals()); + if (lhsPrice != rhsPrice) { + return lhsPrice < rhsPrice; + } + MonetaryAmount lhsAmount(lhs.volumeamount(), CurrencyCode{}, lhs.volumenbdecimals()); + MonetaryAmount rhsAmount(rhs.volumeamount(), CurrencyCode{}, rhs.volumenbdecimals()); + if (lhsAmount != rhsAmount) { + return lhsAmount < rhsAmount; + } + return false; +} + +bool ProtobufMarketDataSerializer::TradeDataEqual::operator()(const ::objects::TradeData& lhs, + const ::objects::TradeData& rhs) const { + return lhs.unixtimestampinms() == rhs.unixtimestampinms() && lhs.tradeside() == rhs.tradeside() && + MonetaryAmount(lhs.priceamount(), CurrencyCode{}, lhs.pricenbdecimals()) == + MonetaryAmount(rhs.priceamount(), CurrencyCode{}, rhs.pricenbdecimals()) && + MonetaryAmount(lhs.volumeamount(), CurrencyCode{}, lhs.volumenbdecimals()) == + MonetaryAmount(rhs.volumeamount(), CurrencyCode{}, rhs.volumenbdecimals()); +} + +ProtobufMarketDataSerializer::ProtobufMarketDataSerializer(std::string_view dataDir, std::string_view exchangeName) + : _marketOrderBookAccumulator(ComputeSubPath(dataDir, exchangeName, kSubPathMarketOrderBook)), + _tradesAccumulator(ComputeSubPath(dataDir, exchangeName, kSubPathTrades)) {} + +void ProtobufMarketDataSerializer::push(TimePoint timeStamp, const MarketOrderBook& marketOrderBook) { + _marketOrderBookAccumulator.push(marketOrderBook.market(), + CreateMarketOrderBookTimedData(marketOrderBook, timeStamp)); +} + +void ProtobufMarketDataSerializer::push(std::span publicTrades) { + for (const auto& publicTrade : publicTrades) { + _tradesAccumulator.push(publicTrade.market(), CreateTradeData(publicTrade)); + } +} + +} // namespace cct \ No newline at end of file diff --git a/src/serialization/src/proto-market-order-book.cpp b/src/serialization/src/proto-market-order-book.cpp new file mode 100644 index 00000000..493cad6c --- /dev/null +++ b/src/serialization/src/proto-market-order-book.cpp @@ -0,0 +1,44 @@ +#include "proto-market-order-book.hpp" + +#include "market-order-book-timed-data.pb.h" +#include "marketorderbook.hpp" +#include "timedef.hpp" + +namespace cct { +::objects::MarketOrderBookTimedData CreateMarketOrderBookTimedData(const MarketOrderBook& marketOrderBook, + TimePoint timeStamp) { + ::objects::MarketOrderBookTimedData protoObj; + + const auto volAndPriNbDecimals = marketOrderBook.volAndPriNbDecimals(); + const auto unixTimestampInMs = TimestampToMs(timeStamp); + + protoObj.set_unixtimestampinms(unixTimestampInMs); + protoObj.set_volumenbdecimals(volAndPriNbDecimals.volNbDecimals); + protoObj.set_pricenbdecimals(volAndPriNbDecimals.priNbDecimals); + + auto& orderBook = *protoObj.mutable_orderbook(); + + const auto priNbDecimals = protoObj.pricenbdecimals(); + const auto volNbDecimals = protoObj.volumenbdecimals(); + + const int nbBids = marketOrderBook.nbBidPrices(); + for (int bidPos = 1; bidPos <= nbBids; ++bidPos) { + auto [price, volume] = marketOrderBook[-bidPos]; + auto& pricedVolume = *orderBook.add_bids(); + + pricedVolume.set_price(price.amount(priNbDecimals).value()); + pricedVolume.set_volume(volume.amount(volNbDecimals).value()); + } + + const int nbAsks = marketOrderBook.nbAskPrices(); + for (int askPos = 1; askPos <= nbAsks; ++askPos) { + auto [price, volume] = marketOrderBook[askPos]; + auto& pricedVolume = *orderBook.add_asks(); + + pricedVolume.set_price(price.amount(priNbDecimals).value()); + pricedVolume.set_volume(volume.amount(volNbDecimals).value()); + } + + return protoObj; +} +} // namespace cct \ No newline at end of file diff --git a/src/serialization/src/proto-public-trade.cpp b/src/serialization/src/proto-public-trade.cpp new file mode 100644 index 00000000..58d36144 --- /dev/null +++ b/src/serialization/src/proto-public-trade.cpp @@ -0,0 +1,48 @@ +#include "proto-public-trade.hpp" + +#include "monetaryamount.hpp" +#include "publictrade.hpp" +#include "timedef.hpp" +#include "trade-data.pb.h" +#include "tradeside.hpp" +#include "unreachable.hpp" + +namespace cct { +namespace { +::objects::TradeSide ProtobufTradeSide(TradeSide tradeSide) { + switch (tradeSide) { + case TradeSide::kBuy: + return ::objects::TRADE_BUY; + case TradeSide::kSell: + return ::objects::TRADE_SELL; + default: + unreachable(); + } +} +} // namespace + +::objects::TradeData CreateTradeData(const PublicTrade &publicTrade) { + ::objects::TradeData protoObj; + + protoObj.set_unixtimestampinms(TimestampToMs(publicTrade.time())); + + protoObj.set_priceamount(publicTrade.price().amount()); + protoObj.set_pricenbdecimals(publicTrade.price().nbDecimals()); + + protoObj.set_volumeamount(publicTrade.amount().amount()); + protoObj.set_volumenbdecimals(publicTrade.amount().nbDecimals()); + + protoObj.set_tradeside(ProtobufTradeSide(publicTrade.side())); + + return protoObj; +} + +PublicTrade CreatePublicTrade(Market market, const ::objects::TradeData &tradeData) { + TradeSide tradeSide = tradeData.tradeside() == ::objects::TradeSide::TRADE_BUY ? TradeSide::kBuy : TradeSide::kSell; + MonetaryAmount amount(tradeData.volumeamount(), market.base(), tradeData.volumenbdecimals()); + MonetaryAmount price(tradeData.priceamount(), market.quote(), tradeData.pricenbdecimals()); + TimePoint timeStamp(TimeInMs(tradeData.unixtimestampinms())); + + return {tradeSide, amount, price, timeStamp}; +} +} // namespace cct \ No newline at end of file diff --git a/src/serialization/src/proto-reader.cpp b/src/serialization/src/proto-reader.cpp new file mode 100644 index 00000000..d8af50e6 --- /dev/null +++ b/src/serialization/src/proto-reader.cpp @@ -0,0 +1,10 @@ +#include "proto-reader.hpp" + +#include + +namespace cct { + +ProtobufObjectsGateway::ProtobufObjectsGateway(std::string_view exchangeSerializedDataPath) + : _exchangeSerializedDataPath(exchangeSerializedDataPath) {} + +} // namespace cct \ No newline at end of file diff --git a/src/serialization/test/proto-multiple-messages-handler_test.cpp b/src/serialization/test/proto-multiple-messages-handler_test.cpp new file mode 100644 index 00000000..40ffdf65 --- /dev/null +++ b/src/serialization/test/proto-multiple-messages-handler_test.cpp @@ -0,0 +1,120 @@ +#include "proto-multiple-messages-handler.hpp" + +#include + +#include +#include + +#include "proto-public-trade.hpp" +#include "publictrade.hpp" +#include "trade-data.pb.h" + +namespace cct { +class ProtobufMessagesTest : public ::testing::Test { + protected: + ProtobufMessagesWriter writer; + + TimePoint tp1{TimeInMs{std::numeric_limits::max() / 10000000}}; + TimePoint tp2{TimeInMs{std::numeric_limits::max() / 9000000}}; + TimePoint tp3{TimeInMs{std::numeric_limits::max() / 8000000}}; + + Market market{"ETH", "USDT"}; + + PublicTrade pt1{TradeSide::kBuy, MonetaryAmount{"0.13", "ETH"}, MonetaryAmount{"1500.5", "USDT"}, tp1}; + PublicTrade pt2{TradeSide::kSell, MonetaryAmount{"3.7", "ETH"}, MonetaryAmount{"1500.5", "USDT"}, tp2}; + PublicTrade pt3{TradeSide::kBuy, MonetaryAmount{"0.004", "ETH"}, MonetaryAmount{1501, "USDT"}, tp3}; + + ::objects::TradeData td1{CreateTradeData(pt1)}; + ::objects::TradeData td2{CreateTradeData(pt2)}; + ::objects::TradeData td3{CreateTradeData(pt3)}; +}; + +TEST_F(ProtobufMessagesTest, WriteReadSingle) { + writer.open(std::stringstream{}); + writer.write(td1); + + std::stringstream ss = writer.flush(); + + ProtobufMessagesReader reader{ss}; + + int nbObjectsRead = 0; + + while (reader.hasNext()) { + ::objects::TradeData nextObj = reader.next<::objects::TradeData>(); + PublicTrade pt = CreatePublicTrade(market, nextObj); + + EXPECT_EQ(pt, pt1); + ++nbObjectsRead; + } + EXPECT_EQ(nbObjectsRead, 1); +} + +TEST_F(ProtobufMessagesTest, WriteRead2Flushes) { + writer.open(std::stringstream{}); + writer.write(td1); + std::stringstream ss1 = writer.flush(); + + writer.open(std::stringstream{}); + writer.write(td2); + std::stringstream ss2 = writer.flush(); + + ProtobufMessagesReader reader1{ss1}; + + int nbObjectsRead = 0; + + while (reader1.hasNext()) { + auto nextObj = reader1.next<::objects::TradeData>(); + PublicTrade pt = CreatePublicTrade(market, nextObj); + + EXPECT_EQ(pt, pt1); + ++nbObjectsRead; + } + EXPECT_EQ(nbObjectsRead, 1); + + ProtobufMessagesReader reader2{ss2}; + + while (reader2.hasNext()) { + auto nextObj = reader2.next<::objects::TradeData>(); + PublicTrade pt = CreatePublicTrade(market, nextObj); + + EXPECT_EQ(pt, pt2); + ++nbObjectsRead; + } + EXPECT_EQ(nbObjectsRead, 2); +} + +TEST_F(ProtobufMessagesTest, WriteReadSeveral) { + writer.open(std::stringstream{}); + writer.write(td1); + writer.write(td2); + writer.write(td3); + + std::stringstream ss = writer.flush(); + + ProtobufMessagesReader reader{ss}; + + int nbObjectsRead = 0; + + while (reader.hasNext()) { + ::objects::TradeData nextObj = reader.next<::objects::TradeData>(); + PublicTrade pt = CreatePublicTrade(market, nextObj); + + switch (nbObjectsRead) { + case 0: + EXPECT_EQ(pt, pt1); + break; + case 1: + EXPECT_EQ(pt, pt2); + break; + case 2: + EXPECT_EQ(pt, pt3); + break; + default: + break; + } + + ++nbObjectsRead; + } + EXPECT_EQ(nbObjectsRead, 3); +} +} // namespace cct \ No newline at end of file