Skip to content

Commit

Permalink
jobs maintenance (part 1)
Browse files Browse the repository at this point in the history
1. Don't fiddle around with dt_job_t variants declarations
2. make _control_job_set_synchronous() and _control_get_threadid() static with
   a leading underscore as only used internally and remove from .h
3. remove dt_control_job_wait() as not used in the code at all
4. some verbose job info added for intensive debugging
  • Loading branch information
jenshannoschwalm committed Dec 11, 2024
1 parent 0acaaab commit 892f8fc
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 106 deletions.
146 changes: 50 additions & 96 deletions src/control/jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,13 @@ typedef struct worker_thread_parameters_t
int32_t threadid;
} worker_thread_parameters_t;

typedef struct _dt_job_t
{
dt_job_execute_callback execute;
void *params;
size_t params_size;
dt_job_destroy_callback params_destroy;
int32_t result;

dt_pthread_mutex_t state_mutex;
dt_pthread_mutex_t wait_mutex;

dt_job_state_t state;
unsigned char priority;
dt_job_queue_t queue;

dt_job_state_change_callback state_changed_cb;

dt_progress_t *progress;

char description[DT_CONTROL_DESCRIPTION_LEN];
dt_view_type_flags_t view_creator;
gboolean is_synchronous;
} _dt_job_t;

/** check if two jobs are to be considered equal. a simple memcmp won't work since the mutexes probably won't
match
we don't want to compare result, priority or state since these will change during the course of
processing.
NOTE: maybe allow to pass a comparator for params.
*/
static inline gboolean _control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
static inline gboolean _control_job_equal(dt_job_t *j1, dt_job_t *j2)
{
if(!j1 || !j2) return FALSE;
if(j1->params_size != 0
Expand All @@ -77,8 +53,8 @@ static inline gboolean _control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
&& (g_strcmp0(j1->description, j2->description) == 0));
}

static void _control_job_set_state(_dt_job_t *job,
dt_job_state_t state)
static void _control_job_set_state(dt_job_t *job,
dt_job_state_t state)
{
if(!job) return;
dt_pthread_mutex_lock(&job->state_mutex);
Expand All @@ -93,7 +69,7 @@ static void _control_job_set_state(_dt_job_t *job,
dt_pthread_mutex_unlock(&job->state_mutex);
}

dt_job_state_t dt_control_job_get_state(_dt_job_t *job)
dt_job_state_t dt_control_job_get_state(dt_job_t *job)
{
if(!job) return DT_JOB_STATE_DISPOSED;
dt_pthread_mutex_lock(&job->state_mutex);
Expand All @@ -102,7 +78,7 @@ dt_job_state_t dt_control_job_get_state(_dt_job_t *job)
return state;
}

void dt_control_job_set_params(_dt_job_t *job, void *params, dt_job_destroy_callback callback)
void dt_control_job_set_params(dt_job_t *job, void *params, dt_job_destroy_callback callback)
{
if(!job || dt_control_job_get_state(job) != DT_JOB_STATE_INITIALIZED)
return;
Expand All @@ -123,7 +99,7 @@ void dt_control_job_set_params_with_size(dt_job_t *job,
job->params_destroy = callback;
}

void *dt_control_job_get_params(const _dt_job_t *job)
void *dt_control_job_get_params(const dt_job_t *job)
{
if(!job) return NULL;
return job->params;
Expand All @@ -132,7 +108,7 @@ void *dt_control_job_get_params(const _dt_job_t *job)
dt_job_t *dt_control_job_create(dt_job_execute_callback execute,
const char *msg, ...)
{
_dt_job_t *job = calloc(1, sizeof(_dt_job_t));
dt_job_t *job = calloc(1, sizeof(dt_job_t));
if(!job) return NULL;

va_list ap;
Expand All @@ -159,12 +135,20 @@ gboolean dt_control_job_is_synchronous(const dt_job_t *job)
return job->is_synchronous;
}

void dt_control_job_set_synchronous(dt_job_t *job, gboolean sync)
static void _control_job_set_synchronous(dt_job_t *job, gboolean sync)
{
job->is_synchronous = sync;
}

void dt_control_job_dispose(_dt_job_t *job)
static __thread int32_t threadid = -1;

static int32_t _control_get_threadid()
{
if(threadid > -1) return threadid;
return darktable.control->num_threads;
}

void dt_control_job_dispose(dt_job_t *job)
{
if(!job) return;
if(job->progress)
Expand All @@ -178,7 +162,7 @@ void dt_control_job_dispose(_dt_job_t *job)
free(job);
}

void dt_control_job_set_state_callback(_dt_job_t *job, dt_job_state_change_callback cb)
void dt_control_job_set_state_callback(dt_job_t *job, dt_job_state_change_callback cb)
{
// once the job got added to the queue it may not be changed from the outside
if(dt_control_job_get_state(job) != DT_JOB_STATE_INITIALIZED)
Expand All @@ -187,52 +171,28 @@ void dt_control_job_set_state_callback(_dt_job_t *job, dt_job_state_change_callb
}

// We don't want to log dt_get_wtime() as we already show the stamp
static void _control_job_print(_dt_job_t *job, const char *info, const char *err, int32_t res)
static void _control_job_print(dt_job_t *job, const char *info, const char *err, int32_t res)
{
if(!job) return;
dt_print(DT_DEBUG_CONTROL, "[%s]\t%02d %s %s | queue: %d | priority: %d",
info, res, err, job->description, job->queue, job->priority);
info ? info : "info missing",
res,
err ? err : "err missing",
job->description,
job->queue, job->priority);
}

void dt_control_job_cancel(_dt_job_t *job)
void dt_control_job_cancel(dt_job_t *job)
{
_control_job_set_state(job, DT_JOB_STATE_CANCELLED);
}

void dt_control_job_wait(_dt_job_t *job)
{
if(!job) return;
dt_job_state_t state = dt_control_job_get_state(job);

// NOTE: could also use signals.

// if the job is merely queued and hasn't started yet, we
// need to wait until it is actually started before attempting
// to grab the mutex, or it will always succeed immediately
while(state == DT_JOB_STATE_QUEUED)
{
g_usleep(100000); // wait 0.1 seconds
state = dt_control_job_get_state(job);
}

/* if job execution is not finished let's wait for it */
if(state == DT_JOB_STATE_RUNNING || state == DT_JOB_STATE_CANCELLED)
{
// once the job finishes, it unlocks the mutex
// so by locking the mutex here, we will only get the lock once the job
// has finished and unlocked it.
dt_pthread_mutex_lock(&job->wait_mutex);
// yay, the job finished, we got the lock. nothing more to do.
dt_pthread_mutex_unlock(&job->wait_mutex);
}
}

static gboolean _control_run_job_res(dt_control_t *control, int32_t res)
{
if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED)
return TRUE;

_dt_job_t *job = NULL;
dt_job_t *job = NULL;
dt_pthread_mutex_lock(&control->res_mutex);
if(control->new_res[res])
{
Expand Down Expand Up @@ -262,7 +222,7 @@ static gboolean _control_run_job_res(dt_control_t *control, int32_t res)
return FALSE;
}

static _dt_job_t *_control_schedule_job(dt_control_t *control)
static dt_job_t *_control_schedule_job(dt_control_t *control)
{
/*
* job scheduling works like this:
Expand All @@ -278,14 +238,14 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)
dt_pthread_mutex_lock(&control->queue_mutex);

// find the job
_dt_job_t *job = NULL;
dt_job_t *job = NULL;
int winner_queue = DT_JOB_QUEUE_MAX;
int max_priority = -1;
for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
{
if(control->queues[i] == NULL) continue;
if(control->export_scheduled && i == DT_JOB_QUEUE_USER_EXPORT) continue;
_dt_job_t *_job = (_dt_job_t *)control->queues[i]->data;
dt_job_t *_job = control->queues[i]->data;
if(_job->priority > max_priority)
{
max_priority = _job->priority;
Expand All @@ -308,39 +268,40 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)
GList **queue = &control->queues[winner_queue];
*queue = g_list_delete_link(*queue, *queue);
control->queue_length[winner_queue]--;
if(winner_queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = TRUE;
if(winner_queue == DT_JOB_QUEUE_USER_EXPORT)
control->export_scheduled = TRUE;

// and place it in scheduled job array (for job deduping)
control->job[dt_control_get_threadid()] = job;
control->job[_control_get_threadid()] = job;

// increment the priorities of the others
for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
{
if(i == winner_queue || control->queues[i] == NULL) continue;
((_dt_job_t *)control->queues[i]->data)->priority++;
((dt_job_t *)control->queues[i]->data)->priority++;
}

dt_pthread_mutex_unlock(&control->queue_mutex);

return job;
}

static void _control_job_execute(_dt_job_t *job)
static void _control_job_execute(dt_job_t *job)
{
_control_job_print(job, "run_job+", "", DT_CTL_WORKER_RESERVED + dt_control_get_threadid());
_control_job_print(job, "run_job+", "", DT_CTL_WORKER_RESERVED + _control_get_threadid());

_control_job_set_state(job, DT_JOB_STATE_RUNNING);

/* execute job */
job->result = job->execute(job);

_control_job_set_state(job, DT_JOB_STATE_FINISHED);
_control_job_print(job, "run_job-", "", DT_CTL_WORKER_RESERVED + dt_control_get_threadid());
_control_job_print(job, "run_job-", "", DT_CTL_WORKER_RESERVED + _control_get_threadid());
}

static gboolean _control_run_job(dt_control_t *control)
{
_dt_job_t *job = _control_schedule_job(control);
dt_job_t *job = _control_schedule_job(control);

if(!job) return TRUE;

Expand All @@ -353,8 +314,9 @@ static gboolean _control_run_job(dt_control_t *control)

// remove the job from scheduled job array (for job deduping)
dt_pthread_mutex_lock(&control->queue_mutex);
control->job[dt_control_get_threadid()] = NULL;
if(job->queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = FALSE;
control->job[_control_get_threadid()] = NULL;
if(job->queue == DT_JOB_QUEUE_USER_EXPORT)
control->export_scheduled = FALSE;
dt_pthread_mutex_unlock(&control->queue_mutex);

// and free it
Expand All @@ -364,7 +326,7 @@ static gboolean _control_run_job(dt_control_t *control)
}

gboolean dt_control_add_job_res(dt_control_t *control,
_dt_job_t *job,
dt_job_t *job,
int32_t res)
{
if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED || !job)
Expand Down Expand Up @@ -401,7 +363,7 @@ gboolean dt_control_add_job_res(dt_control_t *control,

gboolean dt_control_add_job(dt_control_t *control,
dt_job_queue_t queue_id,
_dt_job_t *job)
dt_job_t *job)
{
if((((unsigned int)queue_id) >= DT_JOB_QUEUE_MAX && queue_id != DT_JOB_QUEUE_SYNCHRONOUS) || !job)
{
Expand All @@ -413,7 +375,7 @@ gboolean dt_control_add_job(dt_control_t *control,
{
// whatever we are adding here won't be scheduled as the system isn't running. execute it synchronous instead.
dt_pthread_mutex_lock(&job->wait_mutex); // is that even needed?
dt_control_job_set_synchronous(job, TRUE);
_control_job_set_synchronous(job, TRUE);
_control_job_execute(job);
dt_pthread_mutex_unlock(&job->wait_mutex);

Expand All @@ -423,7 +385,7 @@ gboolean dt_control_add_job(dt_control_t *control,

job->queue = queue_id;

_dt_job_t *job_for_disposal = NULL;
dt_job_t *job_for_disposal = NULL;

dt_pthread_mutex_lock(&control->queue_mutex);

Expand All @@ -440,7 +402,7 @@ gboolean dt_control_add_job(dt_control_t *control,
// check if we have already scheduled the job
for(int k = 0; k < control->num_threads; k++)
{
_dt_job_t *other_job = (_dt_job_t *)control->job[k];
dt_job_t *other_job = control->job[k];
if(_control_job_equal(job, other_job))
{
_control_job_print(other_job, "add_job", "found job already in scheduled:", -1);
Expand All @@ -457,7 +419,7 @@ gboolean dt_control_add_job(dt_control_t *control,
// if the job is already in the queue -> move it to the top
for(GList *iter = *queue; iter; iter = g_list_next(iter))
{
_dt_job_t *other_job = (_dt_job_t *)iter->data;
dt_job_t *other_job = iter->data;
if(_control_job_equal(job, other_job))
{
_control_job_print(other_job, "add_job", "found job already in queue", -1);
Expand All @@ -480,8 +442,8 @@ gboolean dt_control_add_job(dt_control_t *control,
if(length > DT_CONTROL_MAX_JOBS)
{
GList *last = g_list_last(*queue);
_control_job_set_state((_dt_job_t *)last->data, DT_JOB_STATE_DISCARDED);
dt_control_job_dispose((_dt_job_t *)last->data);
_control_job_set_state(last->data, DT_JOB_STATE_DISCARDED);
dt_control_job_dispose(last->data);
*queue = g_list_delete_link(*queue, last);
length--;
}
Expand Down Expand Up @@ -515,14 +477,6 @@ gboolean dt_control_add_job(dt_control_t *control,
return FALSE;
}

static __thread int threadid = -1;

int32_t dt_control_get_threadid()
{
if(threadid > -1) return threadid;
return darktable.control->num_threads;
}

static inline int32_t _control_get_threadid_res()
{
if(threadid > -1) return threadid;
Expand All @@ -544,7 +498,7 @@ static void *_control_work_res(void *ptr)
int32_t threadid_res = _control_get_threadid_res();
while(dt_control_running())
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid_res);
dt_print(DT_DEBUG_CONTROL | DT_DEBUG_VERBOSE, "[control_work_res] %d", threadid_res);
if(_control_run_job_res(s, threadid_res))
{
// wait for a new job.
Expand Down Expand Up @@ -589,7 +543,7 @@ static void *_control_work(void *ptr)
// int32_t threadid = dt_control_get_threadid();
while(dt_control_running())
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid);
dt_print(DT_DEBUG_CONTROL | DT_DEBUG_VERBOSE, "[control_work] %d", threadid);
if(_control_run_job(control))
{
// wait for a new job.
Expand Down
Loading

0 comments on commit 892f8fc

Please sign in to comment.