Skip to content

Commit

Permalink
jobs maintenance (part 1)
Browse files Browse the repository at this point in the history
1. Don't fiddle around with dt_job_t variants declarations
2. make _control_job_set_synchronous() static as only internal and remove from .h
3. remove dt_control_job_wait() as not used in the code at all
4. some verbose job info added for intensive debugging
  • Loading branch information
jenshannoschwalm committed Dec 11, 2024
1 parent 0acaaab commit f529db8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 68 deletions.
102 changes: 40 additions & 62 deletions src/control/jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ typedef struct worker_thread_parameters_t
int32_t threadid;
} worker_thread_parameters_t;

typedef struct _dt_job_t
typedef struct dt_job_t
{
dt_job_execute_callback execute;
void *params;
Expand All @@ -54,15 +54,15 @@ typedef struct _dt_job_t
char description[DT_CONTROL_DESCRIPTION_LEN];
dt_view_type_flags_t view_creator;
gboolean is_synchronous;
} _dt_job_t;
} dt_job_t;

/** check if two jobs are to be considered equal. a simple memcmp won't work since the mutexes probably won't
match
we don't want to compare result, priority or state since these will change during the course of
processing.
NOTE: maybe allow to pass a comparator for params.
*/
static inline gboolean _control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
static inline gboolean _control_job_equal(dt_job_t *j1, dt_job_t *j2)
{
if(!j1 || !j2) return FALSE;
if(j1->params_size != 0
Expand All @@ -77,8 +77,8 @@ static inline gboolean _control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
&& (g_strcmp0(j1->description, j2->description) == 0));
}

static void _control_job_set_state(_dt_job_t *job,
dt_job_state_t state)
static void _control_job_set_state(dt_job_t *job,
dt_job_state_t state)
{
if(!job) return;
dt_pthread_mutex_lock(&job->state_mutex);
Expand All @@ -93,7 +93,7 @@ static void _control_job_set_state(_dt_job_t *job,
dt_pthread_mutex_unlock(&job->state_mutex);
}

dt_job_state_t dt_control_job_get_state(_dt_job_t *job)
dt_job_state_t dt_control_job_get_state(dt_job_t *job)
{
if(!job) return DT_JOB_STATE_DISPOSED;
dt_pthread_mutex_lock(&job->state_mutex);
Expand All @@ -102,7 +102,7 @@ dt_job_state_t dt_control_job_get_state(_dt_job_t *job)
return state;
}

void dt_control_job_set_params(_dt_job_t *job, void *params, dt_job_destroy_callback callback)
void dt_control_job_set_params(dt_job_t *job, void *params, dt_job_destroy_callback callback)
{
if(!job || dt_control_job_get_state(job) != DT_JOB_STATE_INITIALIZED)
return;
Expand All @@ -123,7 +123,7 @@ void dt_control_job_set_params_with_size(dt_job_t *job,
job->params_destroy = callback;
}

void *dt_control_job_get_params(const _dt_job_t *job)
void *dt_control_job_get_params(const dt_job_t *job)
{
if(!job) return NULL;
return job->params;
Expand All @@ -132,7 +132,7 @@ void *dt_control_job_get_params(const _dt_job_t *job)
dt_job_t *dt_control_job_create(dt_job_execute_callback execute,
const char *msg, ...)
{
_dt_job_t *job = calloc(1, sizeof(_dt_job_t));
dt_job_t *job = calloc(1, sizeof(dt_job_t));
if(!job) return NULL;

va_list ap;
Expand All @@ -159,12 +159,12 @@ gboolean dt_control_job_is_synchronous(const dt_job_t *job)
return job->is_synchronous;
}

void dt_control_job_set_synchronous(dt_job_t *job, gboolean sync)
static void _control_job_set_synchronous(dt_job_t *job, gboolean sync)
{
job->is_synchronous = sync;
}

void dt_control_job_dispose(_dt_job_t *job)
void dt_control_job_dispose(dt_job_t *job)
{
if(!job) return;
if(job->progress)
Expand All @@ -178,7 +178,7 @@ void dt_control_job_dispose(_dt_job_t *job)
free(job);
}

void dt_control_job_set_state_callback(_dt_job_t *job, dt_job_state_change_callback cb)
void dt_control_job_set_state_callback(dt_job_t *job, dt_job_state_change_callback cb)
{
// once the job got added to the queue it may not be changed from the outside
if(dt_control_job_get_state(job) != DT_JOB_STATE_INITIALIZED)
Expand All @@ -187,52 +187,28 @@ void dt_control_job_set_state_callback(_dt_job_t *job, dt_job_state_change_callb
}

// We don't want to log dt_get_wtime() as we already show the stamp
static void _control_job_print(_dt_job_t *job, const char *info, const char *err, int32_t res)
static void _control_job_print(dt_job_t *job, const char *info, const char *err, int32_t res)
{
if(!job) return;
dt_print(DT_DEBUG_CONTROL, "[%s]\t%02d %s %s | queue: %d | priority: %d",
info, res, err, job->description, job->queue, job->priority);
info ? info : "info missing",
res,
err ? err : "err missing",
job->description,
job->queue, job->priority);
}

void dt_control_job_cancel(_dt_job_t *job)
void dt_control_job_cancel(dt_job_t *job)
{
_control_job_set_state(job, DT_JOB_STATE_CANCELLED);
}

void dt_control_job_wait(_dt_job_t *job)
{
if(!job) return;
dt_job_state_t state = dt_control_job_get_state(job);

// NOTE: could also use signals.

// if the job is merely queued and hasn't started yet, we
// need to wait until it is actually started before attempting
// to grab the mutex, or it will always succeed immediately
while(state == DT_JOB_STATE_QUEUED)
{
g_usleep(100000); // wait 0.1 seconds
state = dt_control_job_get_state(job);
}

/* if job execution is not finished let's wait for it */
if(state == DT_JOB_STATE_RUNNING || state == DT_JOB_STATE_CANCELLED)
{
// once the job finishes, it unlocks the mutex
// so by locking the mutex here, we will only get the lock once the job
// has finished and unlocked it.
dt_pthread_mutex_lock(&job->wait_mutex);
// yay, the job finished, we got the lock. nothing more to do.
dt_pthread_mutex_unlock(&job->wait_mutex);
}
}

static gboolean _control_run_job_res(dt_control_t *control, int32_t res)
{
if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED)
return TRUE;

_dt_job_t *job = NULL;
dt_job_t *job = NULL;
dt_pthread_mutex_lock(&control->res_mutex);
if(control->new_res[res])
{
Expand Down Expand Up @@ -262,7 +238,7 @@ static gboolean _control_run_job_res(dt_control_t *control, int32_t res)
return FALSE;
}

static _dt_job_t *_control_schedule_job(dt_control_t *control)
static dt_job_t *_control_schedule_job(dt_control_t *control)
{
/*
* job scheduling works like this:
Expand All @@ -278,14 +254,14 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)
dt_pthread_mutex_lock(&control->queue_mutex);

// find the job
_dt_job_t *job = NULL;
dt_job_t *job = NULL;
int winner_queue = DT_JOB_QUEUE_MAX;
int max_priority = -1;
for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
{
if(control->queues[i] == NULL) continue;
if(control->export_scheduled && i == DT_JOB_QUEUE_USER_EXPORT) continue;
_dt_job_t *_job = (_dt_job_t *)control->queues[i]->data;
dt_job_t *_job = control->queues[i]->data;
if(_job->priority > max_priority)
{
max_priority = _job->priority;
Expand All @@ -308,7 +284,8 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)
GList **queue = &control->queues[winner_queue];
*queue = g_list_delete_link(*queue, *queue);
control->queue_length[winner_queue]--;
if(winner_queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = TRUE;
if(winner_queue == DT_JOB_QUEUE_USER_EXPORT)
control->export_scheduled = TRUE;

// and place it in scheduled job array (for job deduping)
control->job[dt_control_get_threadid()] = job;
Expand All @@ -317,15 +294,15 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)
for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
{
if(i == winner_queue || control->queues[i] == NULL) continue;
((_dt_job_t *)control->queues[i]->data)->priority++;
((dt_job_t *)control->queues[i]->data)->priority++;
}

dt_pthread_mutex_unlock(&control->queue_mutex);

return job;
}

static void _control_job_execute(_dt_job_t *job)
static void _control_job_execute(dt_job_t *job)
{
_control_job_print(job, "run_job+", "", DT_CTL_WORKER_RESERVED + dt_control_get_threadid());

Expand All @@ -340,7 +317,7 @@ static void _control_job_execute(_dt_job_t *job)

static gboolean _control_run_job(dt_control_t *control)
{
_dt_job_t *job = _control_schedule_job(control);
dt_job_t *job = _control_schedule_job(control);

if(!job) return TRUE;

Expand All @@ -354,7 +331,8 @@ static gboolean _control_run_job(dt_control_t *control)
// remove the job from scheduled job array (for job deduping)
dt_pthread_mutex_lock(&control->queue_mutex);
control->job[dt_control_get_threadid()] = NULL;
if(job->queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = FALSE;
if(job->queue == DT_JOB_QUEUE_USER_EXPORT)
control->export_scheduled = FALSE;
dt_pthread_mutex_unlock(&control->queue_mutex);

// and free it
Expand All @@ -364,7 +342,7 @@ static gboolean _control_run_job(dt_control_t *control)
}

gboolean dt_control_add_job_res(dt_control_t *control,
_dt_job_t *job,
dt_job_t *job,
int32_t res)
{
if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED || !job)
Expand Down Expand Up @@ -401,7 +379,7 @@ gboolean dt_control_add_job_res(dt_control_t *control,

gboolean dt_control_add_job(dt_control_t *control,
dt_job_queue_t queue_id,
_dt_job_t *job)
dt_job_t *job)
{
if((((unsigned int)queue_id) >= DT_JOB_QUEUE_MAX && queue_id != DT_JOB_QUEUE_SYNCHRONOUS) || !job)
{
Expand All @@ -413,7 +391,7 @@ gboolean dt_control_add_job(dt_control_t *control,
{
// whatever we are adding here won't be scheduled as the system isn't running. execute it synchronous instead.
dt_pthread_mutex_lock(&job->wait_mutex); // is that even needed?
dt_control_job_set_synchronous(job, TRUE);
_control_job_set_synchronous(job, TRUE);
_control_job_execute(job);
dt_pthread_mutex_unlock(&job->wait_mutex);

Expand All @@ -423,7 +401,7 @@ gboolean dt_control_add_job(dt_control_t *control,

job->queue = queue_id;

_dt_job_t *job_for_disposal = NULL;
dt_job_t *job_for_disposal = NULL;

dt_pthread_mutex_lock(&control->queue_mutex);

Expand All @@ -440,7 +418,7 @@ gboolean dt_control_add_job(dt_control_t *control,
// check if we have already scheduled the job
for(int k = 0; k < control->num_threads; k++)
{
_dt_job_t *other_job = (_dt_job_t *)control->job[k];
dt_job_t *other_job = control->job[k];
if(_control_job_equal(job, other_job))
{
_control_job_print(other_job, "add_job", "found job already in scheduled:", -1);
Expand All @@ -457,7 +435,7 @@ gboolean dt_control_add_job(dt_control_t *control,
// if the job is already in the queue -> move it to the top
for(GList *iter = *queue; iter; iter = g_list_next(iter))
{
_dt_job_t *other_job = (_dt_job_t *)iter->data;
dt_job_t *other_job = iter->data;
if(_control_job_equal(job, other_job))
{
_control_job_print(other_job, "add_job", "found job already in queue", -1);
Expand All @@ -480,8 +458,8 @@ gboolean dt_control_add_job(dt_control_t *control,
if(length > DT_CONTROL_MAX_JOBS)
{
GList *last = g_list_last(*queue);
_control_job_set_state((_dt_job_t *)last->data, DT_JOB_STATE_DISCARDED);
dt_control_job_dispose((_dt_job_t *)last->data);
_control_job_set_state(last->data, DT_JOB_STATE_DISCARDED);
dt_control_job_dispose(last->data);
*queue = g_list_delete_link(*queue, last);
length--;
}
Expand Down Expand Up @@ -544,7 +522,7 @@ static void *_control_work_res(void *ptr)
int32_t threadid_res = _control_get_threadid_res();
while(dt_control_running())
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid_res);
dt_print(DT_DEBUG_CONTROL | DT_DEBUG_VERBOSE, "[control_work_res] %d", threadid_res);
if(_control_run_job_res(s, threadid_res))
{
// wait for a new job.
Expand Down Expand Up @@ -589,7 +567,7 @@ static void *_control_work(void *ptr)
// int32_t threadid = dt_control_get_threadid();
while(dt_control_running())
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid);
dt_print(DT_DEBUG_CONTROL | DT_DEBUG_VERBOSE, "[control_work] %d", threadid);
if(_control_run_job(control))
{
// wait for a new job.
Expand Down
5 changes: 1 addition & 4 deletions src/control/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ typedef enum dt_job_queue_t
DT_JOB_QUEUE_SYNCHRONOUS = 1000 // don't queue, run immediately and don't return until done
} dt_job_queue_t;

typedef struct _dt_job_t dt_job_t;
typedef struct dt_job_t dt_job_t;

typedef int32_t (*dt_job_execute_callback)(dt_job_t *);
typedef void (*dt_job_state_change_callback)(dt_job_t *, dt_job_state_t state);
Expand All @@ -67,8 +67,6 @@ void dt_control_job_set_state_callback(dt_job_t *job, dt_job_state_change_callba
/** cancel a job, running or in queue. */
void dt_control_job_cancel(dt_job_t *job);
dt_job_state_t dt_control_job_get_state(dt_job_t *job);
/** wait for a job to finish execution. */
void dt_control_job_wait(dt_job_t *job);
/** set job params and a callback to destroy those params */
void dt_control_job_set_params(dt_job_t *job, void *params, dt_job_destroy_callback callback);
/** set job params (with size params_size) and a callback to destroy those params.
Expand All @@ -92,7 +90,6 @@ gboolean dt_control_add_job_res(struct dt_control_t *s, dt_job_t *job, int32_t r

dt_view_type_flags_t dt_control_job_get_view_creator(const dt_job_t *job);
gboolean dt_control_job_is_synchronous(const dt_job_t *job);
void dt_control_job_set_synchronous(dt_job_t *job, gboolean sync);

int32_t dt_control_get_threadid();

Expand Down
4 changes: 2 additions & 2 deletions src/control/progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <glib.h>

struct dt_control_t;
struct _dt_job_t;
struct dt_job_t;

struct _dt_progress_t;
typedef struct _dt_progress_t dt_progress_t;
Expand All @@ -43,7 +43,7 @@ void dt_control_progress_make_cancellable(struct dt_control_t *control, dt_progr
dt_progress_cancel_callback_t cancel, void *data);
/** convenience function to cancel a job when the progress gets cancelled. */
void dt_control_progress_attach_job(struct dt_control_t *control, dt_progress_t *progress,
struct _dt_job_t *job);
struct dt_job_t *job);
/** cancel the job linked to with dt_control_progress_attach_job(). don't forget to call
* dt_control_progress_destroy() afterwards. */
void dt_control_progress_cancel(struct dt_control_t *control, dt_progress_t *progress);
Expand Down

0 comments on commit f529db8

Please sign in to comment.