diff --git a/common/executor/executor.cpp b/common/executor/executor.cpp index 83adae67..afde3af4 100644 --- a/common/executor/executor.cpp +++ b/common/executor/executor.cpp @@ -16,35 +16,26 @@ namespace photon { class ExecutorImpl { public: - using CBList = LockfreeMPMCRingQueue, 32UL * 1024>; + using CBList = + common::RingChannel, 32UL * 1024>>; std::unique_ptr th; photon::thread *pth = nullptr; - EventLoop *loop = nullptr; CBList queue; photon::ThreadPoolBase *pool; - bool quiting; - photon::semaphore sem; - std::atomic_bool waiting; - - ExecutorImpl() { - loop = new_event_loop({this, &ExecutorImpl::wait_for_event}, - {this, &ExecutorImpl::on_event}); - th.reset(new std::thread(&ExecutorImpl::do_loop, this)); - quiting = false; - waiting = true; - while (!loop || loop->state() != loop->WAITING) ::sched_yield(); - } - ~ExecutorImpl() { - photon::thread_interrupt(pth); - th->join(); + ExecutorImpl(int init_ev, int init_io) { + th.reset( + new std::thread(&ExecutorImpl::launch, this, init_ev, init_io)); } - int wait_for_event(EventLoop *) { - std::atomic_thread_fence(std::memory_order_acquire); - sem.wait(1); - waiting.store(true, std::memory_order_release); - return quiting ? -1 : 1; + ExecutorImpl() {} + + ~ExecutorImpl() { + queue.send({}); + if (th) + th->join(); + else + while (pool) photon::thread_yield(); } struct CallArg { @@ -60,51 +51,53 @@ class ExecutorImpl { return nullptr; } - int on_event(EventLoop *) { + void main_loop() { CallArg arg; arg.backth = photon::CURRENT; - size_t cnt = 0; - while (!queue.empty()) { - arg.task = queue.recv(); + for (;;) { + arg.task = queue.recv(); + if (!arg.task) { + return; + } auto th = pool->thread_create(&ExecutorImpl::do_event, (void *)&arg); photon::thread_yield_to(th); - cnt++; } - return 0; } void do_loop() { - photon::init(INIT_EVENT_DEFAULT, photon::INIT_IO_DEFAULT); - DEFER(photon::fini()); pth = photon::CURRENT; - LOG_INFO("worker start"); pool = photon::new_thread_pool(32); - loop->async_run(); - photon::thread_usleep(-1); + LOG_INFO("worker start"); + main_loop(); LOG_INFO("worker finished"); - while (!queue.empty()) { - photon::thread_usleep(1000); - } - quiting = true; - sem.signal(1); - delete loop; photon::delete_thread_pool(pool); pool = nullptr; } + + void launch(int init_ev, int init_io) { + photon::init(init_ev, init_io); + DEFER(photon::fini()); + do_loop(); + } }; -ExecutorImpl *_new_executor() { return new ExecutorImpl(); } +Executor::Executor(int init_ev, int init_io) + : e(new ExecutorImpl(init_ev, init_io)) {} + +Executor::Executor(create_on_current_vcpu) : e(new ExecutorImpl()) {} -void _delete_executor(ExecutorImpl *e) { delete e; } +Executor::~Executor() { delete e; } -void _issue(ExecutorImpl *e, Delegate act) { +void Executor::_issue(ExecutorImpl *e, Delegate act) { e->queue.send(act); - bool cond = true; - if (e->waiting.compare_exchange_weak(cond, false, - std::memory_order_acq_rel)) { - e->sem.signal(1); - } +} + +Executor *Executor::export_as_executor() { + auto ret = new Executor(create_on_current_vcpu()); + auto th = photon::thread_create11(&ExecutorImpl::do_loop, ret->e); + photon::thread_yield_to(th); + return ret; } } // namespace photon \ No newline at end of file diff --git a/common/executor/executor.h b/common/executor/executor.h index 7c17101e..8e2f48ff 100644 --- a/common/executor/executor.h +++ b/common/executor/executor.h @@ -18,6 +18,7 @@ limitations under the License. #include #include +#include #include #include @@ -26,14 +27,12 @@ namespace photon { class ExecutorImpl; -ExecutorImpl *_new_executor(); -void _delete_executor(ExecutorImpl *e); -void _issue(ExecutorImpl *e, Delegate cb); - class Executor { public: - ExecutorImpl *e = _new_executor(); - ~Executor() { _delete_executor(e); } + ExecutorImpl *e; + Executor(int init_ev = photon::INIT_EVENT_DEFAULT, + int init_io = photon::INIT_IO_DEFAULT); + ~Executor(); template < typename Context = StdContext, typename Func, @@ -68,8 +67,8 @@ class Executor { // The task object will be delete after work done template void async_perform(Func *task) { - void (*func)(void*); - func = [](void* task_) { + void (*func)(void *); + func = [](void *task_) { using Task = decltype(task); auto t = (Task)task_; (*t)(); @@ -78,8 +77,14 @@ class Executor { _issue(e, {func, task}); } + static Executor* export_as_executor(); + protected: - static constexpr int64_t kCondWaitMaxTime = 1000L * 1000; + static constexpr int64_t kCondWaitMaxTime = 100L * 1000; + + struct create_on_current_vcpu {}; + + Executor(create_on_current_vcpu); template struct AsyncOp { @@ -107,6 +112,8 @@ class Executor { wait_for_completion(); } }; + + static void _issue(ExecutorImpl *e, Delegate cb); }; } // namespace photon diff --git a/common/executor/test/test_export_as_executor.cpp b/common/executor/test/test_export_as_executor.cpp new file mode 100644 index 00000000..33c1bd19 --- /dev/null +++ b/common/executor/test/test_export_as_executor.cpp @@ -0,0 +1,37 @@ +#include +#include +#include +#include +#include +#include + +#include + +TEST(enter_as_executor, test) { + // set global default logger output to null + set_log_output(log_output_null); + + // create own logger + ALogLogger logger; + // set log_output to stdout, log level as info + logger.log_output = log_output_stdout; + logger.log_level = ALOG_INFO; + + photon::init(); + DEFER(photon::fini()); + // do some ready work in this photon environment + auto vcpu = photon::get_vcpu(); + auto e = photon::Executor::export_as_executor(); + DEFER(delete e); + + std::thread([&logger, e, vcpu] { + e->perform([&logger, vcpu] { + logger << LOG_INFO("Hello from a normal thread"); + auto cvcpu = photon::get_vcpu(); + logger << LOG_INFO("executor at `, current on `, `", vcpu, cvcpu, + vcpu == cvcpu ? "EQ" : "NEQ"); + }); + }).detach(); + + photon::thread_usleep(1UL * 1024 * 1024); +} \ No newline at end of file diff --git a/common/expirecontainer.h b/common/expirecontainer.h index 52f98849..6f90fe2a 100644 --- a/common/expirecontainer.h +++ b/common/expirecontainer.h @@ -80,7 +80,7 @@ class ExpireContainerBase : public Object { intrusive_list _list; uint64_t _expiration; photon::Timer _timer; - photon::spinlock _lock; + photon::spinlock _lock; // protect _list/_set operations using ItemPtr = Item*; struct ItemHash { @@ -177,6 +177,7 @@ class ExpireContainer : public ExpireContainerBase { template iterator insert(const InterfaceKey& key, Gs&&... xs) { auto item = new Item(key, std::forward(xs)...); + SCOPED_LOCK(_lock); auto pr = Base::insert(item); if (!pr.second) { delete item; diff --git a/common/lockfree_queue.h b/common/lockfree_queue.h index 17da6513..eb19262f 100644 --- a/common/lockfree_queue.h +++ b/common/lockfree_queue.h @@ -531,6 +531,10 @@ namespace common { * photon thread when queue is empty, and once it got object by recv, it will * trying using `thread_yield` instead of semaphore, to get better performance * and load balancing. + * Watch out that `recv` should run in photon environment (because it has to) + * use photon semaphore to be notified that new item has sended. `send` could + * running in photon or std::thread environment (needs to set template `Pause` as + * `ThreadPause`). * * @tparam QueueType shoulde be one of LockfreeMPMCRingQueue, * LockfreeBatchMPMCRingQueue, or LockfreeSPSCRingQueue, with their own template @@ -541,7 +545,8 @@ class RingChannel : public QueueType { protected: photon::semaphore queue_sem; std::atomic idler{0}; - static constexpr uint64_t BUSY_YIELD_TIMEOUT = 1024; + uint64_t m_busy_yield_turn; + uint64_t m_busy_yield_timeout; using T = decltype(std::declval().recv()); @@ -553,21 +558,36 @@ class RingChannel : public QueueType { using QueueType::read_available; using QueueType::write_available; + /** + * @brief Construct a new Ring Channel object + * + * @param busy_yield_timeout setting yield timeout, default is template + * parameter DEFAULT_BUSY_YIELD_TIMEOUT. Ring Channel will try busy yield + * in `busy_yield_timeout` usecs. + */ + RingChannel(uint64_t busy_yield_turn = 64, + uint64_t busy_yield_timeout = 1024) + : m_busy_yield_turn(busy_yield_turn), + m_busy_yield_timeout(busy_yield_timeout) {} + + template void send(const T& x) { while (!push(x)) { - if (!full()) photon::thread_yield(); + if (!full()) Pause::pause(); } - uint64_t idle = idler.exchange(0, std::memory_order_acq_rel); - queue_sem.signal(idle); + queue_sem.signal(idler.load(std::memory_order_acquire)); } T recv() { T x; - Timeout yield_timeout(BUSY_YIELD_TIMEOUT); + Timeout yield_timeout(m_busy_yield_timeout); + int yield_turn = m_busy_yield_turn; + idler.fetch_add(1, std::memory_order_acq_rel); + DEFER(idler.fetch_sub(1, std::memory_order_acq_rel)); while (!pop(x)) { - if (photon::now < yield_timeout.expire()) { + if (yield_turn > 0 && photon::now < yield_timeout.expire()) { + yield_turn--; photon::thread_yield(); } else { - idler.fetch_add(1, std::memory_order_acq_rel); queue_sem.wait(1); } } diff --git a/common/metric-meter/metrics.h b/common/metric-meter/metrics.h index 8fe4891c..b45113b0 100644 --- a/common/metric-meter/metrics.h +++ b/common/metric-meter/metrics.h @@ -59,9 +59,9 @@ class AverageCounter { if (now - time > m_interval * 2) { reset(); } else if (now - time > m_interval) { - sum = photon::sat_sub(sum, sum * (now - time) / m_interval); - cnt = photon::sat_sub(cnt, cnt * (now - time) / m_interval); - time = now; + sum = photon::sat_sub(sum, sum * (now - time - m_interval) / m_interval); + cnt = photon::sat_sub(cnt, cnt * (now - time - m_interval) / m_interval); + time = now - m_interval; } } void put(int64_t val) { @@ -95,8 +95,8 @@ class QPSCounter { reset(); } else if (now - time > m_interval) { counter = - photon::sat_sub(counter, counter * (now - time) / m_interval); - time = now; + photon::sat_sub(counter, counter * (now - time - m_interval) / m_interval); + time = now - m_interval; } } void put(int64_t val = 1) { diff --git a/fs/exportfs.cpp b/fs/exportfs.cpp index 66cd2689..9563ed02 100644 --- a/fs/exportfs.cpp +++ b/fs/exportfs.cpp @@ -34,7 +34,8 @@ namespace photon { namespace fs { static EventLoop* evloop = nullptr; - using Queue = LockfreeSPSCRingQueue, 65536>; + using Queue = + common::RingChannel, 65536>>; class ExportBase { @@ -44,7 +45,7 @@ namespace fs static Queue op_queue; static int ref; static condition_variable cond; - static semaphore sem; + static Delegate op; static ThreadPoolBase* pool; template static void perform_helper(void* arg) { @@ -57,14 +58,14 @@ namespace fs { { SCOPED_LOCK(lock); - op_queue.send(Delegate(&ExportBase::perform_helper, act)); + op_queue.send( + Delegate(&ExportBase::perform_helper, act)); } - sem.signal(1); } static int wait4events(void*, EventLoop*) { - sem.wait(1); - if (op_queue.empty()) return -1; + op = op_queue.recv(); + if (!op) return -1; return 1; } @@ -79,11 +80,20 @@ namespace fs photon::thread_yield_to(th); // let `th` to run and pop an op return 0; } + static void stop() { + { + SCOPED_LOCK(lock); + op_queue.send({}); + } + while (evloop->state() != evloop->STOP) { + photon::thread_yield(); + } + } static void* do_opq(void*) { DEFER({if (--ref == 0) cond.notify_all();}); - if (op_queue.empty()) return nullptr; - auto func = op_queue.recv(); + auto func = op; + op = {}; if (func) func(); return nullptr; } @@ -129,7 +139,7 @@ namespace fs __attribute__((visibility("hidden"))) Queue ExportBase::op_queue; __attribute__((visibility("hidden"))) int ExportBase::ref = 1; __attribute__((visibility("hidden"))) condition_variable ExportBase::cond; - __attribute__((visibility("hidden"))) semaphore ExportBase::sem(0); + __attribute__((visibility("hidden"))) Delegate ExportBase::op; __attribute__((visibility("hidden"))) ThreadPoolBase* ExportBase::pool = nullptr; #define PERFORM(ID, expr) \ @@ -549,7 +559,6 @@ namespace fs LOG_ERROR_RETURN(EFAULT, -1, "failed to create event loop"); ExportBase::ref = 1; - ExportBase::sem.wait(ExportBase::sem.count()); if (thread_pool_capacity != 0) ExportBase::pool = new_thread_pool(thread_pool_capacity); evloop->async_run(); return 0; @@ -562,8 +571,7 @@ namespace fs if (!evloop) LOG_ERROR_RETURN(ENOSYS, -1, "not inited yet"); - ExportBase::sem.signal(1); - evloop->stop(); + ExportBase::stop(); --ExportBase::ref; while (ExportBase::ref != 0) { @@ -577,7 +585,6 @@ namespace fs auto cb = ExportBase::op_queue.recv(); cb(); } - ExportBase::sem.wait(ExportBase::sem.count()); return 0; } IAsyncFile* export_as_async_file(IFile* file) diff --git a/io/aio-wrapper.cpp b/io/aio-wrapper.cpp index 12c99b16..2b7b9b0d 100644 --- a/io/aio-wrapper.cpp +++ b/io/aio-wrapper.cpp @@ -90,7 +90,6 @@ namespace photon { int ret = io_submit(ctx->aio_ctx, 1, &piocb); if (ret == 1) break; - thread_usleep(1000*10); // sleep 10ms whenever error occurs if (ret < 0) { auto e = -ret; @@ -104,10 +103,9 @@ namespace photon case EFAULT: case EINVAL: default: - errno = e; - LOG_ERRNO_RETURN(0, ret, "failed to io_submit()"); + thread_usleep(1000*10); // sleep 10ms whenever error occurs + LOG_ERRNO_RETURN(e, ret, "failed to io_submit()"); } - return -1; } } @@ -152,6 +150,7 @@ namespace photon static void resume_libaio_requesters() { +retry: struct io_event events[IODEPTH]; int n = HAVE_N_TRY(my_io_getevents, (0, IODEPTH, events)); for (int i=0; iu.c.buf), VALUE(piocb->u.c.resfd)); thread_interrupt((thread *)events[i].data, EOK); } + if (n == IODEPTH) + { + thread_yield(); + goto retry; + } } static uint64_t wait_for_events() diff --git a/net/http/client.cpp b/net/http/client.cpp index 5cc0f63b..7071185e 100644 --- a/net/http/client.cpp +++ b/net/http/client.cpp @@ -54,7 +54,7 @@ class PooledDialer { template ISocketStream* dial(const T& x, uint64_t timeout = -1UL) { - return dial(x.host(), x.port(), x.secure(), timeout); + return dial(x.host_no_port(), x.port(), x.secure(), timeout); } }; diff --git a/net/http/cookie_jar.cpp b/net/http/cookie_jar.cpp index c8f70b6c..9bf4af35 100644 --- a/net/http/cookie_jar.cpp +++ b/net/http/cookie_jar.cpp @@ -118,8 +118,9 @@ class SimpleCookieJar : public ICookieJar { return m_cookie[host]->get_cookies_from_headers(message); } int set_cookies_to_headers(Request* request) override { - if (request->host().empty()) return -1; - return m_cookie[request->host()]->set_cookies_to_headers(request); + auto host = request->host(); + if (host.empty()) return -1; + return m_cookie[host]->set_cookies_to_headers(request); } }; diff --git a/net/http/message.cpp b/net/http/message.cpp index 0193a636..395d10e4 100644 --- a/net/http/message.cpp +++ b/net/http/message.cpp @@ -309,7 +309,7 @@ int Request::reset(Verb v, std::string_view url, bool enable_proxy) { if ((size_t)m_buf_capacity <= u.target().size() + 21 + verbstr[v].size()) LOG_ERROR_RETURN(ENOBUFS, -1, "out of buffer"); - LOG_DEBUG("requst reset ", VALUE(u.host()), VALUE(u.host_port()), VALUE(enable_proxy)); + LOG_DEBUG("requst reset ", VALUE(u.host()), VALUE(enable_proxy)); Message::reset(); make_request_line(v, u, enable_proxy); diff --git a/net/http/message.h b/net/http/message.h index 89d68e86..e8e8859f 100644 --- a/net/http/message.h +++ b/net/http/message.h @@ -176,7 +176,17 @@ class Request : public Message { uint16_t port() const { return m_port; } + std::string_view host_no_port() const { + // only contains host, without port + auto tmp = headers["Host"]; + auto pos = tmp.find(":"); + if (pos == std::string_view::npos) { + return tmp; + } + return tmp.substr(0, pos); + } std::string_view host() const { + // the original "Host" in header return headers["Host"]; } std::string_view abs_path() const { diff --git a/net/http/test/CMakeLists.txt b/net/http/test/CMakeLists.txt index 401c6fa1..23ef62af 100644 --- a/net/http/test/CMakeLists.txt +++ b/net/http/test/CMakeLists.txt @@ -11,10 +11,10 @@ target_link_libraries(server_perf PRIVATE photon_shared ${testing_libs}) add_executable(pure_libcurl pure_libcurl.cpp) target_link_libraries(pure_libcurl PRIVATE photon_shared ${testing_libs}) -#add_executable(client_function_test client_function_test.cpp) -#target_link_libraries(client_function_test PRIVATE photon_shared ${testing_libs}) -#add_test(NAME client_function_test COMMAND $) -# +add_executable(client_function_test client_function_test.cpp) +target_link_libraries(client_function_test PRIVATE photon_shared ${testing_libs}) +add_test(NAME client_function_test COMMAND $) + add_executable(server_function_test server_function_test.cpp) target_link_libraries(server_function_test PRIVATE photon_shared ${testing_libs}) add_test(NAME server_function_test COMMAND $) diff --git a/net/http/test/client_function_test.cpp b/net/http/test/client_function_test.cpp index 98190ce0..2604fb0e 100644 --- a/net/http/test/client_function_test.cpp +++ b/net/http/test/client_function_test.cpp @@ -535,11 +535,13 @@ int main(int argc, char** arg) { if (photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE)) return -1; DEFER(photon::fini()); +#ifdef __linux__ if (et_poller_init() < 0) { LOG_ERROR("et_poller_init failed"); exit(EAGAIN); } DEFER(et_poller_fini()); +#endif set_log_output_level(ALOG_DEBUG); ::testing::InitGoogleTest(&argc, arg); LOG_DEBUG("test result:`", RUN_ALL_TESTS()); diff --git a/net/http/test/headers_test.cpp b/net/http/test/headers_test.cpp index 0118c866..de0dca22 100644 --- a/net/http/test/headers_test.cpp +++ b/net/http/test/headers_test.cpp @@ -183,12 +183,13 @@ TEST(headers, resp_header) { } while (!exceed_stream.done()); } TEST(headers, url) { - RequestHeadersStored<> headers(Verb::UNKNOWN, "https://domain.com/dir1/dir2/file?key1=value1&key2=value2"); - LOG_DEBUG(VALUE(headers.target())); - LOG_DEBUG(VALUE(headers.host())); - LOG_DEBUG(VALUE(headers.secure())); - LOG_DEBUG(VALUE(headers.query())); - LOG_DEBUG(VALUE(headers.port())); + RequestHeadersStored<> headers(Verb::UNKNOWN, "https://domain.com:8888/dir1/dir2/file?key1=value1&key2=value2"); + EXPECT_EQ(true, headers.target() =="/dir1/dir2/file?key1=value1&key2=value2"); + EXPECT_EQ(true, headers.host() == "domain.com:8888"); + EXPECT_EQ(headers.port(), 8888); + EXPECT_EQ(true, headers.host_no_port() == "domain.com"); + EXPECT_EQ(headers.secure(), 1); + EXPECT_EQ(true, headers.query() == "key1=value1&key2=value2"); RequestHeadersStored<> new_headers(Verb::UNKNOWN, ""); if (headers.secure()) new_headers.headers.insert("Referer", http_url_scheme); @@ -198,6 +199,7 @@ TEST(headers, url) { new_headers.headers.value_append(headers.target()); auto Referer_value = new_headers.headers["Referer"]; LOG_DEBUG(VALUE(Referer_value)); + EXPECT_EQ(true, Referer_value == "http://domain.com:8888/dir1/dir2/file?key1=value1&key2=value2"); } TEST(ReqHeaders, redirect) { diff --git a/net/http/url.h b/net/http/url.h index 686e2864..0ea27543 100644 --- a/net/http/url.h +++ b/net/http/url.h @@ -73,6 +73,7 @@ class URL { } std::string_view host() const { return m_url | m_host; } + std::string_view host_no_port() const { return host(); } std::string_view host_port() const { return m_url | m_host_port;} uint16_t port() const { return m_port; } bool secure() const { return m_secure; } diff --git a/thread/thread.cpp b/thread/thread.cpp index a01ec3cd..4a105262 100644 --- a/thread/thread.cpp +++ b/thread/thread.cpp @@ -111,6 +111,37 @@ namespace photon } }; + void* default_photon_thread_stack_alloc(void*, size_t stack_size) { + char* ptr = nullptr; + int err = posix_memalign((void**)&ptr, PAGE_SIZE, stack_size); + if (unlikely(err)) + LOG_ERROR_RETURN(err, nullptr, "Failed to allocate photon stack! ", + ERRNO(err)); +#if defined(__linux__) + madvise(ptr, stack_size, MADV_NOHUGEPAGE); +#endif + return ptr; + } + + void default_photon_thread_stack_dealloc(void*, void* ptr, size_t size) { +#ifndef __aarch64__ + madvise(ptr, size, MADV_DONTNEED); +#endif + free(ptr); + } + + Delegate &photon_thread_alloc() { + static Delegate _photon_thread_alloc( + &default_photon_thread_stack_alloc, nullptr); + return _photon_thread_alloc; + } + + Delegate &photon_thread_dealloc() { + static Delegate _photon_thread_dealloc( + &default_photon_thread_stack_dealloc, nullptr); + return _photon_thread_dealloc; + } + struct vcpu_t; struct thread; class Stack @@ -257,11 +288,9 @@ namespace photon } void dispose() { assert(state == states::DONE); - register auto b = buf; //store in register to prevent from being deleted by madvise -#ifndef __aarch64__ - madvise(b, stack_size, MADV_DONTNEED); -#endif - free(b); + // `buf` and `stack_size` will always store on register + // when calling deallocating. + photon_thread_dealloc()(buf, stack_size); } }; @@ -816,13 +845,7 @@ R"( LOG_ERROR_RETURN(ENOSYS, nullptr, "Photon not initialized in this vCPU (OS thread)"); size_t randomizer = (rand() % 32) * (1024 + 8); stack_size = align_up(randomizer + stack_size + sizeof(thread), PAGE_SIZE); - char* ptr = nullptr; - int err = posix_memalign((void**)&ptr, PAGE_SIZE, stack_size); - if (unlikely(err)) - LOG_ERROR_RETURN(err, nullptr, "Failed to allocate photon stack! ", ERRNO(err)); -#if defined(__linux__) - madvise(ptr, stack_size, MADV_NOHUGEPAGE); -#endif + char *ptr = (char *)photon_thread_alloc()(stack_size); auto p = ptr + stack_size - sizeof(thread) - randomizer; (uint64_t&)p &= ~63; auto th = new (p) thread; @@ -1792,6 +1815,13 @@ R"( return --_n_vcpu; } + void set_photon_thread_stack_allocator( + Delegate _photon_thread_alloc, + Delegate _photon_thread_dealloc) { + photon_thread_alloc() = _photon_thread_alloc; + photon_thread_dealloc() = _photon_thread_dealloc; + } + void* stackful_malloc(size_t size) { return CURRENT->stackful_malloc(size); } diff --git a/thread/thread.h b/thread/thread.h index 4fc5aed4..d29b9492 100644 --- a/thread/thread.h +++ b/thread/thread.h @@ -32,8 +32,6 @@ namespace photon int wait_all(); int timestamp_updater_init(); int timestamp_updater_fini(); - void* stackful_malloc(size_t size); - void stackful_free(void* ptr); struct thread; @@ -433,6 +431,23 @@ namespace photon bool is_master_event_engine_default(); void reset_master_event_engine_default(); + // alloc space on rear end of current thread stack, + // helps allocating when using hybrid C++20 style coroutine + void* stackful_malloc(size_t size); + void stackful_free(void* ptr); + + // Set photon allocator/deallocator for photon thread stack + // this is a hook for thread allocation, both alloc and dealloc + // helps user to do more works like mark GC while allocating + void* default_photon_thread_stack_alloc(void*, size_t stack_size); + void default_photon_thread_stack_dealloc(void*, void* stack_ptr, + size_t stack_size); + void set_photon_thread_stack_allocator( + Delegate photon_thread_alloc = { + &default_photon_thread_stack_alloc, nullptr}, + Delegate photon_thread_dealloc = { + &default_photon_thread_stack_dealloc, nullptr}); + // Saturating addition, primarily for timeout caculation __attribute__((always_inline)) inline uint64_t sat_add(uint64_t x, uint64_t y) { diff --git a/thread/workerpool.cpp b/thread/workerpool.cpp index 331fa766..757584de 100644 --- a/thread/workerpool.cpp +++ b/thread/workerpool.cpp @@ -66,7 +66,7 @@ class WorkPool::impl { } void enqueue(Delegate call) { - ring.send(call); + ring.send(call); } void do_call(Delegate call) {