-
Notifications
You must be signed in to change notification settings - Fork 0
/
isSpots.c
161 lines (134 loc) · 5.26 KB
/
isSpots.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*! @file isSpots.c
* @copyright 2018 by Northwestern University
* @author Keith Brister
* @brief Find spots in an image
*/
#include "is.h"
/** Count the spots in an image
**
** @param wctx Worker context
** @li @c wctx->ctxMutex Keeps the worker theads in line
**
** @param tcp Thread data
** @li @c tcp->rep ZMQ Response socket into which to throw our response.
**
** @param job What the user asked us to do
** @param pid {String} - Token representing a valid user
** @param rqstObj {Object} - Description of what is requested
** @param rqstObj.esaf {Inteter} - experiment id to which this image belongs
** @param rqstObj.fn {String} - file name
** @param rqstObj.frame {Integer} - Frame number to return
** @param rqstObj.tag {String} - ID for us to know what to do with the result
** @param rqstObj.type {String} - "SPOTS"
** @param rqstObj.xsize {Integer} - Requested width of resulting jpeg (pixels)
** @param rsltCB {isResultCB} - Callback function when request has been processed
*/
void isSpots(isWorkerContext_t *wctx, isThreadContextType *tcp, json_t *job) {
static const char *id = FILEID "isSpots";
const char *fn; // file name from job.
isImageBufType *imb;
char *job_str; // stringified version of job
char *meta_str; // stringified version of meta
int err; // error code from routies that return integers
zmq_msg_t err_msg; // error message to send via zmq
zmq_msg_t job_msg; // the job message to send via zmq
zmq_msg_t meta_msg; // the metadata to send via zmq
json_t *jxsize; // xsize entry in job
pthread_mutex_lock(&wctx->metaMutex);
fn = json_string_value(json_object_get(job, "fn"));
pthread_mutex_unlock(&wctx->metaMutex);
isLogging_info("%s: request for image %s", id, fn);
if (fn == NULL || strlen(fn) == 0) {
char *tmps;
pthread_mutex_lock(&wctx->metaMutex);
tmps = json_dumps(job, JSON_SORT_KEYS | JSON_COMPACT | JSON_INDENT(0));
pthread_mutex_unlock(&wctx->metaMutex);
isLogging_err("%s: missing filename for job %s\n", id, tmps);
is_zmq_error_reply(NULL, 0, tcp->rep, "%s: Missing filename for job %s", id, tmps);
free(tmps);
return;
}
// Set a default xsize if none was specified
jxsize = json_object_get(job, "xsize");
if (jxsize == NULL ) {
set_json_object_integer(id, job, "xsize", IS_DEFAULT_SPOT_IMAGE_WIDTH);
}
// Enforce looking at the full image
set_json_object_real(id, job, "segcol", 0.0);
set_json_object_real(id, job, "segrow", 0.0);
set_json_object_real(id, job, "zoom", 1.0);
// when isReduceImage returns a buffer it is read locked
imb = isReduceImage(wctx, tcp->rc, job);
if (imb == NULL) {
char *tmps;
pthread_mutex_lock(&wctx->metaMutex);
tmps = json_dumps(job, JSON_SORT_KEYS | JSON_COMPACT | JSON_INDENT(0));
pthread_mutex_unlock(&wctx->metaMutex);
isLogging_err("%s: missing data for job %s\n", id, tmps);
is_zmq_error_reply(NULL, 0, tcp->rep, "%s: Missing data for job %s", id, tmps);
free(tmps);
return;
}
// Compose messages
// Err
zmq_msg_init(&err_msg);
// Job
job_str = NULL;
if (job != NULL) {
pthread_mutex_lock(&wctx->metaMutex);
job_str = json_dumps(job, JSON_SORT_KEYS | JSON_INDENT(0) | JSON_COMPACT);
pthread_mutex_unlock(&wctx->metaMutex);
}
if (job_str == NULL) {
job_str = strdup("");
}
err = zmq_msg_init_data(&job_msg, job_str, strlen(job_str), is_zmq_free_fn, NULL);
if (err != 0) {
isLogging_err("%s: zmq_msg_init failed (job_str): %s\n", id, zmq_strerror(errno));
is_zmq_error_reply(NULL, 0, tcp->rep, "%s: Could not initialize reply message (job_str)", id);
pthread_exit (NULL);
}
// Meta
meta_str = NULL;
if (imb->meta != NULL) {
pthread_mutex_lock(&wctx->metaMutex);
meta_str = json_dumps(imb->meta, JSON_SORT_KEYS | JSON_INDENT(0) | JSON_COMPACT);
pthread_mutex_unlock(&wctx->metaMutex);
}
if (meta_str == NULL) {
meta_str = strdup("");
}
err = zmq_msg_init_data(&meta_msg, meta_str, strlen(meta_str), is_zmq_free_fn, NULL);
if (err == -1) {
isLogging_err("%s: zmq_msg_init failed (meta_str): %s\n", id, zmq_strerror(errno));
is_zmq_error_reply(NULL, 0, tcp->rep, "%s: Could not initialize reply message (meta_str)", id);
pthread_exit (NULL);
}
isLogging_info("%s: returning meta data %s", id, meta_str);
// Send them out
do {
// Error Message
err = zmq_msg_send(&err_msg, tcp->rep, ZMQ_SNDMORE);
if (err == -1) {
isLogging_err("%s: Could not send empty error frame: %s\n", id, zmq_strerror(errno));
break;
}
// Job
err = zmq_msg_send(&job_msg, tcp->rep, ZMQ_SNDMORE);
if (err < 0) {
isLogging_err("%s: sending job_str failed: %s\n", id, zmq_strerror(errno));
break;
}
// Meta
err = zmq_msg_send(&meta_msg, tcp->rep, 0);
if (err == -1) {
isLogging_err("%s: sending meta_str failed: %s\n", id, zmq_strerror(errno));
break;
}
} while (0);
pthread_rwlock_unlock(&imb->buflock);
pthread_mutex_lock(&wctx->ctxMutex);
imb->in_use--;
assert(imb->in_use >= 0);
pthread_mutex_unlock(&wctx->ctxMutex);
}