Skip to content

Commit

Permalink
How to get into a joinable pthread state?
Browse files Browse the repository at this point in the history
1. Currently the job dispatcher might have pending jobs. These are not processed any more
   if we test for dt_control_running() in job.c
2. We continue dispatching even in DT_CONTROL_STATE_CLEANUP to get all things done.

3. But how can we know if all jobs are finalized?
   This might be a way, we increase `pending_jobs` jobs-to-be-done and decrease for
   finalized jobs.

4. In control_shutdown we have to wait until everything is done.
  • Loading branch information
jenshannoschwalm committed Dec 10, 2024
1 parent 77bed88 commit a9d0f10
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 12 deletions.
24 changes: 19 additions & 5 deletions src/control/control.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ void dt_control_init(dt_control_t *s)
s->widget_definitions = g_ptr_array_new ();
s->input_drivers = NULL;
dt_atomic_set_int(&s->running, DT_CONTROL_STATE_DISABLED);
dt_atomic_set_int(&s->pending_jobs, 0);
s->cups_started = FALSE;

dt_action_define_fallback(DT_ACTION_TYPE_IOP, &dt_action_def_iop);
Expand Down Expand Up @@ -326,22 +327,35 @@ void dt_control_shutdown(dt_control_t *s)
return;

dt_pthread_mutex_lock(&s->cond_mutex);
const gboolean cleanup = dt_atomic_exch_int(&s->running, DT_CONTROL_STATE_DISABLED) == DT_CONTROL_STATE_CLEANUP;
// just want to know if we need to cleanup at all
const gboolean cleanup = dt_atomic_get_int(&s->running) == DT_CONTROL_STATE_CLEANUP;
pthread_cond_broadcast(&s->cond);
dt_pthread_mutex_unlock(&s->cond_mutex);

int err = 0; // collect all joining errors
int err = 0;

/* first wait for gphoto device updater */
#ifdef HAVE_GPHOTO2
err = pthread_join(s->update_gphoto_thread, NULL);
#endif

if(!cleanup)
return; // if not running there are no threads to join
return; // if control was not running there are no threads to join

dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] closing control threads");
// wait for up to 30 seconds while jobs are still going on
// FIXME a simple while loop after sorting out the pending jobs counting?
for(int i = 0; i < 30 && (dt_control_jobs_pending(s) > 0); i++)
{
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] closing control threads, %d pending jobs", dt_control_jobs_pending(s));
g_usleep(1000000);
}

// finally stop all jobs work and join threads
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] switching to DT_CONTROL_STATE_DISABLED, %d pending jobs", dt_control_jobs_pending(s));
dt_atomic_set_int(&s->running, DT_CONTROL_STATE_DISABLED);
g_usleep(10000);

/* then wait for kick_on_workers_thread */
/* wait for kick_on_workers_thread */
err = pthread_join(s->kick_on_workers_thread, NULL);
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] joined kicker%s", err ? ", error" : "");

Expand Down
1 change: 1 addition & 0 deletions src/control/control.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ typedef struct dt_control_t

// job management
dt_atomic_int running;
dt_atomic_int pending_jobs;
gboolean cups_started;
gboolean export_scheduled;
dt_pthread_mutex_t queue_mutex, cond_mutex;
Expand Down
32 changes: 25 additions & 7 deletions src/control/jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ static void _control_job_set_state(_dt_job_t *job,
{
if(!job) return;
dt_pthread_mutex_lock(&job->state_mutex);
if(state >= DT_JOB_STATE_FINISHED && job->state != DT_JOB_STATE_RUNNING && job->progress)
if(state >= DT_JOB_STATE_FINISHED
&& job->state != DT_JOB_STATE_RUNNING
&& job->progress)
{
dt_control_progress_destroy(darktable.control, job->progress);
job->progress = NULL;
}

job->state = state;
/* pass state change to callback */
if(job->state_changed_cb) job->state_changed_cb(job, state);
Expand Down Expand Up @@ -191,7 +194,11 @@ static void _control_job_print(_dt_job_t *job, const char *info, const char *err
{
if(!job) return;
dt_print(DT_DEBUG_CONTROL, "[%s]\t%02d %s %s | queue: %d | priority: %d",
info, res, err, job->description, job->queue, job->priority);
info ? info : "info missing",
res,
err ? err : "err missing",
job->description ? job->description : "???",
job->queue, job->priority);
}

void dt_control_job_cancel(_dt_job_t *job)
Expand Down Expand Up @@ -257,6 +264,7 @@ static gboolean _control_run_job_res(dt_control_t *control, int32_t res)
_control_job_set_state(job, DT_JOB_STATE_FINISHED);
_control_job_print(job, "run_job-", "", res);
}
dt_atomic_add_int(&darktable.control->pending_jobs, -1);
dt_pthread_mutex_unlock(&job->wait_mutex);
dt_control_job_dispose(job);
return FALSE;
Expand Down Expand Up @@ -336,6 +344,7 @@ static void _control_job_execute(_dt_job_t *job)

_control_job_set_state(job, DT_JOB_STATE_FINISHED);
_control_job_print(job, "run_job-", "", DT_CTL_WORKER_RESERVED + dt_control_get_threadid());
dt_atomic_add_int(&darktable.control->pending_jobs, -1);
}

static gboolean _control_run_job(dt_control_t *control)
Expand Down Expand Up @@ -383,6 +392,8 @@ gboolean dt_control_add_job_res(dt_control_t *control,
dt_control_job_dispose(control->job_res[res]);
}

dt_atomic_add_int(&darktable.control->pending_jobs, 1);

dt_print(DT_DEBUG_CONTROL, "[add_job_res] %d | ", res);
_control_job_print(job, "add_job_res", "", res);

Expand Down Expand Up @@ -454,6 +465,7 @@ gboolean dt_control_add_job(dt_control_t *control,
}
}

dt_atomic_add_int(&darktable.control->pending_jobs, 1);
// if the job is already in the queue -> move it to the top
for(GList *iter = *queue; iter; iter = g_list_next(iter))
{
Expand Down Expand Up @@ -542,9 +554,10 @@ static void *_control_work_res(void *ptr)
dt_pthread_setname(name);
free(params);
int32_t threadid_res = _control_get_threadid_res();
while(dt_control_running())

while(dt_atomic_get_int(&s->running) != DT_CONTROL_STATE_DISABLED)
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid_res);
dt_print(DT_DEBUG_CONTROL, "[control_work_res] %d", threadid_res);
if(_control_run_job_res(s, threadid_res))
{
// wait for a new job.
Expand All @@ -564,7 +577,7 @@ static void *_control_worker_kicker(void *ptr)
{
dt_control_t *control = (dt_control_t *)ptr;
dt_pthread_setname("kicker");
while(dt_control_running())
while(dt_atomic_get_int(&control->running) != DT_CONTROL_STATE_DISABLED)
{
sleep(2);
dt_pthread_mutex_lock(&control->cond_mutex);
Expand All @@ -587,9 +600,9 @@ static void *_control_work(void *ptr)
dt_pthread_setname(name);
free(params);
// int32_t threadid = dt_control_get_threadid();
while(dt_control_running())
while(dt_atomic_get_int(&control->running) != DT_CONTROL_STATE_DISABLED)
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid);
dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid);
if(_control_run_job(control))
{
// wait for a new job.
Expand Down Expand Up @@ -678,6 +691,11 @@ void dt_control_jobs_cleanup(dt_control_t *control)
control->thread = NULL;
}

int dt_control_jobs_pending(dt_control_t *control)
{
return dt_atomic_get_int(&control->pending_jobs);
}

// clang-format off
// modelines: These editor modelines have been set for all relevant files by tools/update_modelines.py
// vim: shiftwidth=2 expandtab tabstop=2 cindent
Expand Down
1 change: 1 addition & 0 deletions src/control/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ double dt_control_job_get_progress(dt_job_t *job);
struct dt_control_t;
void dt_control_jobs_init(struct dt_control_t *control);
void dt_control_jobs_cleanup(struct dt_control_t *control);
int dt_control_jobs_pending(struct dt_control_t *control);

gboolean dt_control_add_job(struct dt_control_t *control, dt_job_queue_t queue_id, dt_job_t *job);
gboolean dt_control_add_job_res(struct dt_control_t *s, dt_job_t *job, int32_t res);
Expand Down

0 comments on commit a9d0f10

Please sign in to comment.