Skip to content

Commit

Permalink
[bugfix](stdcallonce) replace std callonce with a lock because it is …
Browse files Browse the repository at this point in the history
…not exception safe (apache#35126)
  • Loading branch information
yiguolei authored and Doris-Extras committed Jun 1, 2024
1 parent 2bad561 commit 9dd5738
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 4 deletions.
60 changes: 56 additions & 4 deletions be/src/util/once.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <atomic>
#include <mutex>
#include <stdexcept>

#include "common/exception.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -52,17 +53,57 @@ class DorisCallOnce {
public:
DorisCallOnce() : _has_called(false) {}

// this method is not exception safe, it will core when exception occurs in
// callback method. I have tested the code https://en.cppreference.com/w/cpp/thread/call_once.
// If the underlying `once_flag` has yet to be invoked, invokes the provided
// lambda and stores its return value. Otherwise, returns the stored Status.
// template <typename Fn>
// ReturnType call(Fn fn) {
// std::call_once(_once_flag, [this, fn] {
// _status = fn();
// _has_called.store(true, std::memory_order_release);
// });
// return _status;
// }

// If exception occurs in the function, the call flag is set, if user call
// it again, the same exception will be thrown.
// It is different from std::call_once. This is because if a method is called once
// some internal state is changed, it maybe not called again although exception
// occurred.
template <typename Fn>
ReturnType call(Fn fn) {
std::call_once(_once_flag, [this, fn] {
// Avoid lock to improve performance
if (has_called()) {
if (_eptr) {
std::rethrow_exception(_eptr);
}
return _status;
}
std::lock_guard l(_flag_lock);
// should check again because maybe another thread call successfully.
if (has_called()) {
if (_eptr) {
std::rethrow_exception(_eptr);
}
return _status;
}
try {
_status = fn();
} catch (...) {
// Save the exception for next call.
_eptr = std::current_exception();
_has_called.store(true, std::memory_order_release);
});
std::rethrow_exception(_eptr);
}
// This memory order make sure both status and eptr is set
// and will be seen in another thread.
_has_called.store(true, std::memory_order_release);
return _status;
}

// Has to pay attention to memory order
// see https://en.cppreference.com/w/cpp/atomic/memory_order
// Return whether `call` has been invoked or not.
bool has_called() const {
// std::memory_order_acquire here and std::memory_order_release in
Expand All @@ -72,11 +113,22 @@ class DorisCallOnce {
}

// Return the stored result. The result is only meaningful when `has_called() == true`.
ReturnType stored_result() const { return _status; }
ReturnType stored_result() const {
if (!has_called()) {
// Could not return status if the method not called.
throw std::exception();
}
if (_eptr) {
std::rethrow_exception(_eptr);
}
return _status;
}

private:
std::atomic<bool> _has_called;
std::once_flag _once_flag;
// std::once_flag _once_flag;
std::mutex _flag_lock;
std::exception_ptr _eptr;
ReturnType _status;
};

Expand Down
152 changes: 152 additions & 0 deletions be/test/util/doris_callonce_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <time.h>

#include <algorithm>
#include <cstring>
#include <memory>
#include <mutex>
#include <thread>

#include "common/status.h"
#include "gtest/gtest_pred_impl.h"
#include "util/once.h"

namespace doris {
class DorisCallOnceTest : public ::testing::Test {};

TEST_F(DorisCallOnceTest, TestNormal) {
DorisCallOnce<Status> call1;
EXPECT_EQ(call1.has_called(), false);

Status st = call1.call([&]() -> Status { return Status::OK(); });
EXPECT_EQ(call1.has_called(), true);
EXPECT_EQ(call1.stored_result().code(), ErrorCode::OK);

st = call1.call([&]() -> Status { return Status::InternalError(""); });
EXPECT_EQ(call1.has_called(), true);
// The error code should not changed
EXPECT_EQ(call1.stored_result().code(), ErrorCode::OK);
}

// Test that, if the string contents is shorter than the initial capacity
// of the faststring, shrink_to_fit() leaves the string in the built-in
// array.
TEST_F(DorisCallOnceTest, TestErrorHappens) {
DorisCallOnce<Status> call1;
EXPECT_EQ(call1.has_called(), false);

Status st = call1.call([&]() -> Status { return Status::InternalError(""); });
EXPECT_EQ(call1.has_called(), true);
EXPECT_EQ(call1.stored_result().code(), ErrorCode::INTERNAL_ERROR);

st = call1.call([&]() -> Status { return Status::OK(); });
EXPECT_EQ(call1.has_called(), true);
// The error code should not changed
EXPECT_EQ(call1.stored_result().code(), ErrorCode::INTERNAL_ERROR);
}

TEST_F(DorisCallOnceTest, TestExceptionHappens) {
DorisCallOnce<Status> call1;
EXPECT_EQ(call1.has_called(), false);
bool exception_occured = false;
bool runtime_occured = false;
try {
Status st = call1.call([&]() -> Status {
throw std::exception();
return Status::InternalError("");
});
} catch (...) {
// Exception has to throw to the call method
exception_occured = true;
}
EXPECT_EQ(exception_occured, true);
EXPECT_EQ(call1.has_called(), true);

// call again, should throw the same exception.
exception_occured = false;
runtime_occured = false;
try {
Status st = call1.call([&]() -> Status { return Status::InternalError(""); });
} catch (...) {
// Exception has to throw to the call method
exception_occured = true;
}
EXPECT_EQ(exception_occured, true);
EXPECT_EQ(call1.has_called(), true);

// If call get result, should catch exception.
exception_occured = false;
runtime_occured = false;
try {
Status st = call1.stored_result();
} catch (...) {
// Exception has to throw to the call method
exception_occured = true;
}
EXPECT_EQ(exception_occured, true);

// Test the exception should actually the same one throwed by the callback method.
DorisCallOnce<Status> call2;
exception_occured = false;
runtime_occured = false;
try {
try {
Status st = call2.call([&]() -> Status {
throw std::runtime_error("runtime error happens");
return Status::InternalError("");
});
} catch (const std::runtime_error&) {
// Exception has to throw to the call method
runtime_occured = true;
}
} catch (...) {
// Exception has to throw to the call method, the runtime error is captured,
// so this code will not hit.
exception_occured = true;
}
EXPECT_EQ(runtime_occured, true);
EXPECT_EQ(exception_occured, false);

// Test the exception should actually the same one throwed by the callback method.
DorisCallOnce<Status> call3;
exception_occured = false;
runtime_occured = false;
try {
try {
Status st = call3.call([&]() -> Status {
throw std::exception();
return Status::InternalError("");
});
} catch (const std::runtime_error&) {
// Exception has to throw to the call method, but not runtime error
// so that this code will not hit
runtime_occured = true;
}
} catch (...) {
// Exception has to throw to the call method, the runtime error is not captured,
// so this code will hit.
exception_occured = true;
}
EXPECT_EQ(runtime_occured, false);
EXPECT_EQ(exception_occured, true);
}

} // namespace doris

0 comments on commit 9dd5738

Please sign in to comment.