Skip to content

Commit

Permalink
Implement pending jobs counter
Browse files Browse the repository at this point in the history
While running the job dispatchers we keep track on jobs being inserted and executed so we
at least know about the number of unprocessed jobs via dt_control_jobs_pending()

Also show pending jobs in the log if closing dt.
  • Loading branch information
jenshannoschwalm committed Dec 14, 2024
1 parent 0fb5faa commit fdaf125
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/control/control.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ void dt_control_init(dt_control_t *s)
s->widget_definitions = g_ptr_array_new ();
s->input_drivers = NULL;
dt_atomic_set_int(&s->running, DT_CONTROL_STATE_DISABLED);
dt_atomic_set_int(&s->pending_jobs, 0);
s->cups_started = FALSE;

dt_action_define_fallback(DT_ACTION_TYPE_IOP, &dt_action_def_iop);
Expand Down Expand Up @@ -328,8 +329,9 @@ void dt_control_shutdown(dt_control_t *control)
pthread_cond_broadcast(&control->cond);
dt_pthread_mutex_unlock(&control->cond_mutex);

dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] closing control threads%s",
cleanup ? " in cleanup mode" : "");
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] closing control threads%s, %d pending jobs",
cleanup ? "in cleanup mode" : "",
dt_control_jobs_pending(control));

if(!cleanup)
return; // if not running there are no threads to join
Expand Down
1 change: 1 addition & 0 deletions src/control/control.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ typedef struct dt_control_t

// job management
dt_atomic_int running;
dt_atomic_int pending_jobs;
gboolean cups_started;
gboolean export_scheduled;
dt_pthread_mutex_t queue_mutex, cond_mutex;
Expand Down
10 changes: 10 additions & 0 deletions src/control/jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ static void _control_job_execute(_dt_job_t *job)

_control_job_set_state(job, DT_JOB_STATE_FINISHED);
_control_job_print(job, "run_job-", "", DT_CTL_WORKER_RESERVED + _control_get_threadid());
dt_atomic_add_int(&darktable.control->pending_jobs, -1);
}

static gboolean _control_run_job(dt_control_t *control)
Expand Down Expand Up @@ -432,6 +433,7 @@ gboolean dt_control_add_job(dt_control_t *control,
}
}

dt_atomic_add_int(&darktable.control->pending_jobs, 1);
// if the job is already in the queue -> move it to the top
for(GList *iter = *queue; iter; iter = g_list_next(iter))
{
Expand All @@ -446,6 +448,7 @@ gboolean dt_control_add_job(dt_control_t *control,
job_for_disposal = job;

job = other_job;
dt_atomic_add_int(&darktable.control->pending_jobs, -1);
break; // there can't be any further copy in the list
}
}
Expand All @@ -462,6 +465,7 @@ gboolean dt_control_add_job(dt_control_t *control,
dt_control_job_dispose(last->data);
*queue = g_list_delete_link(*queue, last);
length--;
dt_atomic_add_int(&darktable.control->pending_jobs, -1);
}

control->queue_length[queue_id] = length;
Expand All @@ -477,6 +481,7 @@ gboolean dt_control_add_job(dt_control_t *control,
job->priority = DT_CONTROL_FG_PRIORITY;
*queue = g_list_append(*queue, job);
control->queue_length[queue_id]++;
dt_atomic_add_int(&darktable.control->pending_jobs, 1);
}
_control_job_set_state(job, DT_JOB_STATE_QUEUED);
dt_pthread_mutex_unlock(&control->queue_mutex);
Expand Down Expand Up @@ -636,6 +641,11 @@ void dt_control_jobs_cleanup(dt_control_t *control)
control->thread = NULL;
}

int dt_control_jobs_pending(dt_control_t *control)
{
return control ? dt_atomic_get_int(&control->pending_jobs) : 0;
}

// clang-format off
// modelines: These editor modelines have been set for all relevant files by tools/update_modelines.py
// vim: shiftwidth=2 expandtab tabstop=2 cindent
Expand Down
1 change: 1 addition & 0 deletions src/control/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ double dt_control_job_get_progress(dt_job_t *job);
struct dt_control_t;
void dt_control_jobs_init(struct dt_control_t *control);
void dt_control_jobs_cleanup(struct dt_control_t *control);
int dt_control_jobs_pending(struct dt_control_t *control);

gboolean dt_control_add_job(struct dt_control_t *control, dt_job_queue_t queue_id, dt_job_t *job);
gboolean dt_control_add_job_res(struct dt_control_t *s, dt_job_t *job, int32_t res);
Expand Down

0 comments on commit fdaf125

Please sign in to comment.