Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix & improves #178

Merged
merged 6 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions common/executor/easyawaiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2022 The Photon Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <easy/easy_io.h>
#include <easy/easy_uthread.h>
#include <photon/thread/awaiter.h>

namespace photon {

struct EasyContext {};

template <>
struct Awaiter<EasyContext> {
easy_comutex_t mtx;
Awaiter() {
easy_comutex_init(&mtx);
easy_comutex_cond_lock(&mtx);
}
~Awaiter() { easy_comutex_cond_unlock(&mtx); }
void suspend() { easy_comutex_cond_wait(&mtx); }
void resume() {
easy_comutex_cond_lock(&mtx);
easy_comutex_cond_signal(&mtx);
easy_comutex_cond_unlock(&mtx);
}
};

} // namespace photon
8 changes: 4 additions & 4 deletions common/executor/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace photon {

class ExecutorImpl {
class Executor::ExecutorImpl {
public:
using CBList =
common::RingChannel<LockfreeMPMCRingQueue<Delegate<void>, 32UL * 1024>>;
Expand Down Expand Up @@ -83,9 +83,9 @@ class ExecutorImpl {
};

Executor::Executor(int init_ev, int init_io)
: e(new ExecutorImpl(init_ev, init_io)) {}
: e(new Executor::ExecutorImpl(init_ev, init_io)) {}

Executor::Executor(create_on_current_vcpu) : e(new ExecutorImpl()) {}
Executor::Executor(create_on_current_vcpu) : e(new Executor::ExecutorImpl()) {}

Executor::~Executor() { delete e; }

Expand All @@ -95,7 +95,7 @@ void Executor::_issue(ExecutorImpl *e, Delegate<void> act) {

Executor *Executor::export_as_executor() {
auto ret = new Executor(create_on_current_vcpu());
auto th = photon::thread_create11(&ExecutorImpl::do_loop, ret->e);
auto th = photon::thread_create11(&Executor::ExecutorImpl::do_loop, ret->e);
photon::thread_yield_to(th);
return ret;
}
Expand Down
72 changes: 26 additions & 46 deletions common/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,62 @@ limitations under the License.
#pragma once

#include <photon/common/callback.h>
#include <photon/common/executor/stdlock.h>
#include <photon/photon.h>

#include <atomic>
#include <type_traits>
#include <photon/thread/awaiter.h>
#include <photon/thread/thread.h>

namespace photon {

class ExecutorImpl;

class Executor {
public:
class ExecutorImpl;

ExecutorImpl *e;
Executor(int init_ev = photon::INIT_EVENT_DEFAULT,
int init_io = photon::INIT_IO_DEFAULT);
~Executor();

template <
typename Context = StdContext, typename Func,
typename Context = AutoContext, typename Func,
typename R = typename std::result_of<Func()>::type,
typename _ = typename std::enable_if<!std::is_void<R>::value, R>::type>
R perform(Func &&act) {
R result;
AsyncOp<Context> aop;
aop.call(e, [&] {
int err;
Awaiter<Context> aop;
auto task = [&] {
result = act();
aop.done();
});
err = errno;
aop.resume();
};
_issue(e, task);
aop.suspend();
errno = err;
return result;
}

template <
typename Context = StdContext, typename Func,
typename Context = AutoContext, typename Func,
typename R = typename std::result_of<Func()>::type,
typename _ = typename std::enable_if<std::is_void<R>::value, R>::type>
void perform(Func &&act) {
AsyncOp<Context> aop;
aop.call(e, [&] {
Awaiter<Context> aop;
int err;
auto task = [&] {
act();
aop.done();
});
err = errno;
aop.resume();
};
_issue(e, task);
aop.suspend();
errno = err;
}

// `task` accept on heap lambda or functor pointer
// Usually could able to call as
// `e.async_perform(new auto ([]{ ... })`
// to create a new lambda object on heap without move from stack.
// The task object will be delete after work done
template <typename Context = StdContext, typename Func>
template <typename Context = AutoContext, typename Func>
void async_perform(Func *task) {
void (*func)(void *);
func = [](void *task_) {
Expand All @@ -77,7 +84,7 @@ class Executor {
_issue(e, {func, task});
}

static Executor* export_as_executor();
static Executor *export_as_executor();

protected:
static constexpr int64_t kCondWaitMaxTime = 100L * 1000;
Expand All @@ -86,33 +93,6 @@ class Executor {

Executor(create_on_current_vcpu);

template <typename Context>
struct AsyncOp {
int err;
std::atomic_bool gotit;
typename Context::Mutex mtx;
typename Context::Cond cond;
AsyncOp() : gotit(false), cond(mtx) {}
void wait_for_completion() {
typename Context::CondLock lock(mtx);
while (!gotit.load(std::memory_order_acquire)) {
cond.wait_for(lock, kCondWaitMaxTime);
}
if (err) errno = err;
}
void done(int error_number = 0) {
typename Context::CondLock lock(mtx);
err = error_number;
gotit.store(true, std::memory_order_release);
cond.notify_all();
}
template <typename Func>
void call(ExecutorImpl *e, Func &&act) {
_issue(e, act);
wait_for_completion();
}
};

static void _issue(ExecutorImpl *e, Delegate<void> cb);
};

Expand Down
1 change: 0 additions & 1 deletion common/executor/test/test_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ limitations under the License.
#include <gtest/gtest.h>
#include <photon/common/alog.h>
#include <photon/common/executor/executor.h>
#include <photon/common/executor/stdlock.h>
#include <photon/common/utility.h>
#include <photon/fs/exportfs.h>
#include <photon/fs/filesystem.h>
Expand Down
2 changes: 1 addition & 1 deletion common/executor/test/test_easy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ limitations under the License.
#include <photon/fs/localfs.h>
#include <photon/common/utility.h>
#include <photon/common/executor/executor.h>
#include <photon/common/executor/easylock.h>
#include <photon/common/executor/easyawaiter.h>
#include <photon/thread/thread.h>

using namespace photon;
Expand Down
3 changes: 2 additions & 1 deletion common/executor/test/test_easyexport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ limitations under the License.
#include <photon/io/fd-events.h>
#include <photon/thread/thread.h>
#include <photon/common/executor/executor.h>
#include <photon/common/executor/easylock.h>
#include <photon/common/executor/easyawaiter.h>

using namespace photon;

Expand Down Expand Up @@ -98,6 +98,7 @@ TEST(easy_performer, test) {
}

fs::exportfs_fini();
return 0;
}).detach();

EasyCoroutinePool ecp;
Expand Down
2 changes: 0 additions & 2 deletions common/executor/test/test_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ limitations under the License.
#include <photon/fs/localfs.h>
#include <photon/common/utility.h>
#include <photon/common/executor/executor.h>
#include <photon/common/executor/stdlock.h>
#include "photon/common/executor/executor.h"

using namespace photon;

Expand Down
1 change: 1 addition & 0 deletions include/photon/common/executor/easyawaiter.h
1 change: 1 addition & 0 deletions include/photon/thread/awaiter.h
2 changes: 1 addition & 1 deletion net/http/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PooledDialer {
PooledDialer() :
tls_ctx(new_tls_context(nullptr, nullptr, nullptr)),
tcpsock(new_tcp_socket_pool(new_tcp_socket_client(), -1, true)),
tlssock(new_tcp_socket_pool(new_tls_client(tls_ctx, new_tcp_socket_client()), -1, true)),
tlssock(new_tcp_socket_pool(new_tls_client(tls_ctx, new_tcp_socket_client(), true), -1, true)),
resolver(new_default_resolver(kDNSCacheLife)) {
}

Expand Down
77 changes: 77 additions & 0 deletions thread/awaiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2022 The Photon Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <errno.h>
#include <photon/thread/thread.h>

#include <atomic>
#include <future>

namespace photon {

// Special context that supports calling executor in photon thread
struct PhotonContext {};

// Context for stdthread calling executor
struct StdContext {};

// Special context that can automaticlly choose `PhotonContext` or `StdContext`
// by if photon initialized in current environment
struct AutoContext {};

template <typename T>
struct Awaiter;

template <>
struct Awaiter<PhotonContext> {
photon::semaphore sem;
Awaiter() {}
void suspend() { sem.wait(1); }
void resume() { sem.signal(1); }
};

template <>
struct Awaiter<StdContext> {
int err;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

此处是用future promise代替了原来的cond variable嘛? future promise性能怎么样,最终会被实现成啥同步原语?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GCC的标准库(libstdc++)中,future的wait动作使用condition_variable的wait_for实现。

std::promise<void> p;
std::future<void> f;
Awaiter() : f(p.get_future()) {}
void suspend() { f.get(); }
void resume() { return p.set_value(); }
};

template <>
struct Awaiter<AutoContext> {
Awaiter<PhotonContext> pctx;
Awaiter<StdContext> sctx;
bool is_photon = false;
Awaiter() : is_photon(photon::CURRENT) {}
void suspend() {
if (is_photon)
pctx.suspend();
else
sctx.suspend();
}
void resume() {
if (is_photon)
pctx.resume();
else
sctx.resume();
}
};
} // namespace photon
1 change: 0 additions & 1 deletion thread/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,6 @@ TEST(mutex, timeout_is_zero) {
});
}
for(auto &th : ths) th.join();
EXPECT_GT(cnt.load(), 0);
LOG_INFO("Meet ` lock timeout, all work finished", cnt.load());
}

Expand Down
Loading
Loading