Skip to content

Commit

Permalink
delete legacy rocket frame parser
Browse files Browse the repository at this point in the history
Summary: remove legacy rocket frame parser as the default now is `strategy`.

Differential Revision: D58472997

fbshipit-source-id: 87b536b421a0cc7b45e525c5102325adb6dea6b6
  • Loading branch information
avalonalex authored and facebook-github-bot committed Jul 15, 2024
1 parent 92ed599 commit a1863c5
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1448,13 +1448,6 @@ void RocketClient::detachEventBase() {
evb_->dcheckIsInEventBaseThread();
DCHECK(!writeLoopCallback_.isLoopCallbackScheduled());

if (parser_.isReadCallbackBased()) {
// trigger the buffer resize timeout ensure rocket client not holding extra
// buffer when detaching
parser_.cancelTimeout();
parser_.timeoutExpired();
}

eventBaseDestructionCallback_.cancel();
detachableLoopCallback_.cancelLoopCallback();
if (keepAliveWatcher_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class RocketClient : public virtual folly::DelayedDestruction,
// inflight writes of its own.
return !writeLoopCallback_.isLoopCallbackScheduled() && !requests_ &&
streams_.empty() && (!socket_ || socket_->isDetachable()) &&
parser_.getReadBufLength() == 0 && !interactions_;
!interactions_;
}

void attachEventBase(folly::EventBase& evb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,97 +41,29 @@
namespace apache {
namespace thrift {
namespace rocket {
template <class T>
void Parser<T>::getReadBufferOld(void** bufout, size_t* lenout) {
DCHECK(!readBuffer_.isChained());

readBuffer_.unshareOne();
if (readBuffer_.length() == 0) {
DCHECK(readBuffer_.capacity() > 0);
// If we read everything, reset pointers to 0 and reuse the buffer
readBuffer_.clear();
} else if (readBuffer_.headroom() > 0) {
// Move partially read data to the beginning
readBuffer_.retreat(readBuffer_.headroom());
}

*bufout = readBuffer_.writableTail();
*lenout = readBuffer_.tailroom();
}

template <class T>
void Parser<T>::readDataAvailableOld(size_t nbytes) {
readBuffer_.append(nbytes);

while (!readBuffer_.empty()) {
if (readBuffer_.length() < Serializer::kMinimumFrameHeaderLength) {
return;
}

folly::io::Cursor cursor(&readBuffer_);
const size_t totalFrameSize = Serializer::kBytesForFrameOrMetadataLength +
readFrameOrMetadataSize(cursor);

if (!currentFrameLength_) {
if (!owner_.incMemoryUsage(totalFrameSize)) {
return;
}
currentFrameLength_ = totalFrameSize;
}

if (readBuffer_.length() < totalFrameSize) {
if (readBuffer_.length() + readBuffer_.tailroom() < totalFrameSize) {
DCHECK(!readBuffer_.isChained());
readBuffer_.unshareOne();
bufferSize_ = std::max<size_t>(bufferSize_, totalFrameSize);
readBuffer_.reserve(
0 /* minHeadroom */,
bufferSize_ - readBuffer_.length() /* minTailroom */);
}
return;
}

// Otherwise, we have a full frame to handle.
const size_t bytesToClone =
totalFrameSize - Serializer::kBytesForFrameOrMetadataLength;
cursor.reset(&readBuffer_);
readFrameOrMetadataSize(cursor);
std::unique_ptr<folly::IOBuf> frame;
cursor.clone(frame, bytesToClone);
owner_.decMemoryUsage(currentFrameLength_);
currentFrameLength_ = 0;
readBuffer_.trimStart(totalFrameSize);
owner_.handleFrame(std::move(frame));
}

if (!isScheduled() && bufferSize_ > kMaxBufferSize) {
owner_.scheduleTimeout(this, kDefaultBufferResizeInterval);
}
}

template <class T>
void Parser<T>::getReadBuffer(void** bufout, size_t* lenout) {
blockResize_ = true;
if (mode_ == ParserMode::STRATEGY) {
frameLengthParser_->getReadBuffer(bufout, lenout);
} else if (mode_ == ParserMode::ALLOCATING) {
allocatingParser_->getReadBuffer(bufout, lenout);
} else {
getReadBufferOld(bufout, lenout);
switch (mode_) {
case (ParserMode::STRATEGY):
frameLengthParser_->getReadBuffer(bufout, lenout);
break;
case (ParserMode::ALLOCATING):
allocatingParser_->getReadBuffer(bufout, lenout);
break;
}
}

template <class T>
void Parser<T>::readDataAvailable(size_t nbytes) noexcept {
folly::DelayedDestruction::DestructorGuard dg(&this->owner_);
blockResize_ = false;
try {
if (mode_ == ParserMode::STRATEGY) {
frameLengthParser_->readDataAvailable(nbytes);
} else if (mode_ == ParserMode::ALLOCATING) {
allocatingParser_->readDataAvailable(nbytes);
} else {
readDataAvailableOld(nbytes);
switch (mode_) {
case (ParserMode::STRATEGY):
frameLengthParser_->readDataAvailable(nbytes);
break;
case (ParserMode::ALLOCATING):
allocatingParser_->readDataAvailable(nbytes);
break;
}
} catch (...) {
auto exceptionStr =
Expand All @@ -145,7 +77,6 @@ template <class T>
void Parser<T>::readEOF() noexcept {
folly::DelayedDestruction::DestructorGuard dg(&this->owner_);

blockResize_ = false;
owner_.close(transport::TTransportException(
transport::TTransportException::TTransportExceptionType::END_OF_FILE,
"Channel got EOF. Check for server hitting connection limit, "
Expand All @@ -155,56 +86,22 @@ void Parser<T>::readEOF() noexcept {
template <class T>
void Parser<T>::readErr(const folly::AsyncSocketException& ex) noexcept {
folly::DelayedDestruction::DestructorGuard dg(&this->owner_);
blockResize_ = false;
owner_.close(transport::TTransportException(ex));
}

template <class T>
void Parser<T>::timeoutExpired() noexcept {
if (LIKELY(!blockResize_)) {
resizeBuffer();
}
}

template <class T>
void Parser<T>::readBufferAvailable(
std::unique_ptr<folly::IOBuf> buf) noexcept {
folly::DelayedDestruction::DestructorGuard dg(&this->owner_);
try {
if (mode_ == ParserMode::STRATEGY) {
frameLengthParser_->readBufferAvailable(std::move(buf));
} else if (mode_ == ParserMode::ALLOCATING) {
// Will throw not implemented runtime exception
allocatingParser_->readBufferAvailable(std::move(buf));
} else {
readBufQueue_.append(std::move(buf));
while (!readBufQueue_.empty()) {
if (readBufQueue_.chainLength() <
Serializer::kBytesForFrameOrMetadataLength) {
return;
}
folly::io::Cursor cursor(readBufQueue_.front());

if (!currentFrameLength_) {
currentFrameLength_ = Serializer::kBytesForFrameOrMetadataLength +
readFrameOrMetadataSize(cursor);
if (!owner_.incMemoryUsage(currentFrameLength_)) {
currentFrameLength_ = 0;
return;
}
}

if (readBufQueue_.chainLength() < currentFrameLength_) {
return;
}

readBufQueue_.trimStart(Serializer::kBytesForFrameOrMetadataLength);
auto frame = readBufQueue_.split(
currentFrameLength_ - Serializer::kBytesForFrameOrMetadataLength);
owner_.handleFrame(std::move(frame));
owner_.decMemoryUsage(currentFrameLength_);
currentFrameLength_ = 0;
}
switch (mode_) {
case (ParserMode::STRATEGY):
frameLengthParser_->readBufferAvailable(std::move(buf));
break;
case (ParserMode::ALLOCATING):
// Will throw not implemented runtime exception
allocatingParser_->readBufferAvailable(std::move(buf));
break;
}
} catch (...) {
auto exceptionStr =
Expand All @@ -214,21 +111,6 @@ void Parser<T>::readBufferAvailable(
}
}

template <class T>
void Parser<T>::resizeBuffer() {
if (bufferSize_ <= kMaxBufferSize || readBuffer_.length() >= kMaxBufferSize) {
return;
}
// resize readBuffer_ to kMaxBufferSize
readBuffer_ = folly::IOBuf(
folly::IOBuf::CopyBufferOp(),
readBuffer_.data(),
readBuffer_.length(),
/* headroom */ 0,
/* tailroom */ kMaxBufferSize - readBuffer_.length());
bufferSize_ = kMaxBufferSize;
}

} // namespace rocket
} // namespace thrift
} // namespace apache
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@ namespace rocket {
// TODO (T160861572): deprecate most of logic in this class and replace with
// either AllocatingParserStrategy or FrameLengthParserStrategy
template <class T>
class Parser final : public folly::AsyncTransport::ReadCallback,
public folly::HHWheelTimer::Callback {
class Parser final : public folly::AsyncTransport::ReadCallback {
public:
explicit Parser(
T& owner, std::shared_ptr<ParserAllocatorType> alloc = nullptr)
: owner_(owner),
readBuffer_(folly::IOBuf::CreateOp(), bufferSize_),
mode_(stringToMode(THRIFT_FLAG(rocket_frame_parser))),
allocator_(alloc ? alloc : std::make_shared<ParserAllocatorType>()) {
if (mode_ == ParserMode::STRATEGY) {
Expand All @@ -61,12 +59,6 @@ class Parser final : public folly::AsyncTransport::ReadCallback,
}
}

~Parser() override {
if (currentFrameLength_) {
owner_.decMemoryUsage(currentFrameLength_);
}
}

// AsyncTransport::ReadCallback implementation
FOLLY_NOINLINE void getReadBuffer(void** bufout, size_t* lenout) override;
FOLLY_NOINLINE void readDataAvailable(size_t nbytes) noexcept override;
Expand All @@ -80,56 +72,24 @@ class Parser final : public folly::AsyncTransport::ReadCallback,
return mode_ != ParserMode::ALLOCATING;
}

void timeoutExpired() noexcept override;

const folly::IOBuf& getReadBuffer() const { return readBuffer_; }

void setReadBuffer(folly::IOBuf&& buffer) { readBuffer_ = std::move(buffer); }

size_t getReadBufferSize() const { return bufferSize_; }

void setReadBufferSize(size_t size) { bufferSize_ = size; }

void resizeBuffer();

size_t getReadBufLength() const {
return readBuffer_.computeChainDataLength();
}

bool isReadCallbackBased() const { return mode_ == ParserMode::LEGACY; }

static constexpr size_t kMinBufferSize{256};
static constexpr size_t kMaxBufferSize{4096};
const folly::IOBuf& getReadBuffer() const;

private:
enum class ParserMode { LEGACY, STRATEGY, ALLOCATING };
enum class ParserMode { STRATEGY, ALLOCATING };

static ParserMode stringToMode(const std::string& modeStr) noexcept {
if (modeStr == "allocating") {
return ParserMode::ALLOCATING;
} else if (modeStr == "strategy") {
if (modeStr == "strategy") {
return ParserMode::STRATEGY;
} else if (modeStr == "legacy") {
return ParserMode::LEGACY;
} else if (modeStr == "allocating") {
return ParserMode::ALLOCATING;
}

LOG(WARNING) << "Invalid parser mode: '" << modeStr
<< ", default to ParserMode::LEGACY";
return ParserMode::LEGACY;
<< ", default to ParserMode::STRATEGY";
return ParserMode::STRATEGY;
}

void getReadBufferOld(void** bufout, size_t* lenout);
void readDataAvailableOld(size_t nbytes);
static constexpr std::chrono::milliseconds kDefaultBufferResizeInterval{
std::chrono::seconds(3)};

T& owner_;
size_t bufferSize_{kMinBufferSize};
size_t currentFrameLength_{0};
uint8_t currentFrameType_{0};
folly::IOBufQueue readBufQueue_{folly::IOBufQueue::cacheChainLength()};
folly::IOBuf readBuffer_;
bool blockResize_{false};

ParserMode mode_;
std::unique_ptr<ParserStrategy<T, FrameLengthParserStrategy>>
Expand Down
Loading

0 comments on commit a1863c5

Please sign in to comment.