Skip to content

Commit

Permalink
remove pending workers on cleanup; add/enable test
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Dec 9, 2023
1 parent 6fc312d commit fafa821
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class CMQMaster {
void close(int timeout=0) {
peers.clear();
env.clear();
pending_workers = 0;

if (sock.handle() != nullptr) {
sock.set(zmq::sockopt::linger, timeout);
Expand Down
8 changes: 8 additions & 0 deletions tests/testthat/test-4-pool.r
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ test_that("starting and stopping multicore", {
expect_equal(w$workers_total, 0)
})

test_that("pending workers area cleaned up properly", {
skip_on_os("windows")
w = workers(1, qsys_id="multicore")
w$cleanup(5000L)
expect_equal(w$workers_running, 0)
expect_equal(w$workers_total, 0)
})

test_that("calculations are really done on the worker", {
skip_on_os("windows")
x = 1
Expand Down
24 changes: 2 additions & 22 deletions tests/testthat/test-5-queue.r
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,7 @@ test_that("worker timeout throws error", {
Q(Sys.sleep, 3, rettype="numeric", workers=w, timeout=1L)))
})

#test_that("error timeout works", {
# skip_on_cran()
# skip_if_not(has_localhost)
# skip_on_os("windows")
# fx = function(x) {
# Sys.sleep(x)
# stop("error")
# }
#
# options(clustermq.error.timeout = 3)
# w = workers(n_jobs=2, qsys_id="multicore", reuse=FALSE)
#
# times = system.time({
# expect_error(expect_warning(Q(fx, x=c(1,10), workers=w, timeout=10)))
# })
# expect_true(times[["elapsed"]] < 5)
#})

test_that("Q with expired workers throws error quickly", {
skip("FIXME")
skip_on_cran()
skip_on_os("windows")

Expand All @@ -112,14 +93,13 @@ test_that("Q with expired workers throws error quickly", {
times = system.time({
expect_error(Q(identity, x=1:3, rettype="numeric", workers=w, timeout=10L))
})
expect_true(times[["elapsed"]] < 1)
expect_true(times[["elapsed"]] < 5)
})

test_that("shutdown monitor does not fire on clean disconnects", {
skip_on_os("windows")
skip_if_not(libzmq_has_draft())

# doing this via a separate call to `workers()` works
# so this seems to be a race condition of some sort
w = workers(n_jobs=2, qsys_id="multicore", reuse=FALSE)
res = Q(Sys.sleep, time=c(0,1), workers=w, timeout=10L)
expect_equal(res, list(NULL, NULL))
Expand Down

0 comments on commit fafa821

Please sign in to comment.