forked from scitran-apps/dcm-convert
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrun
executable file
·302 lines (247 loc) · 10.6 KB
/
run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/env python
import os
import logging
import datetime
import dicom
import tempfile
import shutil
import zipfile
import json
import glob
import subprocess
import shlex
import pprint
import scitran.data as scidata
# Install the default root handler, then create the logger used by every
# function in this script; it is left at DEBUG so nothing is filtered out.
logging.basicConfig()
log = logging.getLogger(' [ stanfordcni/cni-dcm-convert ] ')
log.setLevel(logging.DEBUG)
def _get_bids_info(dicom_archive):
    """
    Use dcm2niix to generate info for a given dicom archive <dicom_archive>.

    The archive is unpacked into a temporary directory and
    ``/usr/bin/dcm2niix -b o -ba y`` is run over it, writing into a second
    temporary directory (presumably sidecar-only, anonymized output -- confirm
    against the dcm2niix documentation). Both directories are always removed
    before returning, including on failure.

    :param dicom_archive: path to the input file; anything that is not a zip
        archive yields an empty result without running dcm2niix.
    :returns: dict loaded from the single JSON file dcm2niix produced, or an
        empty dict when the input is not a zip, when no JSON was produced, or
        when more than one JSON was produced (ambiguous).
    :raises subprocess.CalledProcessError: if dcm2niix exits non-zero.
    :raises OSError: if the dcm2niix executable cannot be started.
    """
    bids_info = {}
    if not zipfile.is_zipfile(dicom_archive):
        return bids_info

    extract_dir = tempfile.mkdtemp()
    dcm2niix_dir = tempfile.mkdtemp()
    try:
        # Open the archive exactly once so the handle is closed by the `with`.
        with zipfile.ZipFile(dicom_archive, 'r') as zf:
            zf.extractall(extract_dir)

        # Pass an argument list rather than splitting a formatted string, so
        # temp paths containing whitespace cannot be broken into several args.
        cmd = ['/usr/bin/dcm2niix', '-b', 'o', '-ba', 'y',
               '-o', dcm2niix_dir, extract_dir]
        log.debug('Running %s' % ' '.join(cmd))
        subprocess.check_call(cmd)

        json_files = glob.glob(os.path.join(dcm2niix_dir, '*.json'))
        if len(json_files) > 1:
            log.warning("More than one bids_sidecar was produced by dcm2niix. Ambiguous which to use, thus no "
                        "bids-info will be generated.")
        elif json_files:
            log.info('Generating BIDS info...')
            with open(json_files[0], 'r') as jfile:
                bids_info = json.load(jfile)
    finally:
        # Clean up on every exit path (early return, failure, success).
        shutil.rmtree(dcm2niix_dir, ignore_errors=True)
        shutil.rmtree(extract_dir, ignore_errors=True)
    return bids_info
def _get_dicom_info_from_dicom(zip_file_path):
    """
    Read one representative DICOM from <zip_file_path>.

    For a zip archive, members are extracted to /tmp one at a time, starting
    with the last entry and walking backwards, until one parses. A parsed file
    whose SOPClassUID is 'Raw Data Storage' is skipped in favour of an earlier
    member, unless it is the first entry in the archive (nothing left to try).
    A path that is not a zip archive is read directly as a DICOM file.

    :param zip_file_path: path to a zip archive of DICOMs or to a single DICOM.
    :returns: the object returned by ``dicom.read_file``; an empty list if no
        member of the archive could be parsed (a warning is logged).
    """
    dcm = []
    if zipfile.is_zipfile(zip_file_path):
        # Context manager closes the archive handle; `archive` avoids
        # shadowing the builtin `zip`.
        with zipfile.ZipFile(zip_file_path) as archive:
            member_names = archive.namelist()
            for n in range(len(member_names) - 1, -1, -1):
                dcm_path = archive.extract(member_names[n], '/tmp')
                if not os.path.isfile(dcm_path):
                    log.warning('%s does not exist!' % dcm_path)
                    continue
                try:
                    log.info(' Loading %s ...' % dcm_path)
                    dcm = dicom.read_file(dcm_path)
                    # Here we check for the Raw Data Storage SOP Class, if there
                    # are other DICOM files in the zip then we read the next one,
                    # if this is the only class of DICOM in the file, we accept
                    # our fate and move on.
                    if dcm.get('SOPClassUID') == 'Raw Data Storage' and n != 0:
                        continue
                    break
                except Exception as err:
                    # Best effort: an unreadable member is skipped, but the
                    # reason is recorded instead of being silently dropped.
                    log.debug(' Could not read %s as DICOM: %s' % (dcm_path, err))
    else:
        log.info(' Not a zip. Attempting to read %s directly' % os.path.basename(zip_file_path))
        dcm = dicom.read_file(zip_file_path)
    if not dcm:
        log.warning('DICOM could not be parsed!')
    return dcm
def _load_conversion_flags():
    """
    Return (convert_montage, convert_nifti, convert_png) for this run.

    Values come from /flywheel/v0/config.json when that file exists; otherwise
    the 'default' of each option in /flywheel/v0/manifest.json is used.
    """
    config_file = '/flywheel/v0/config.json'
    from_manifest = not os.path.exists(config_file)
    if from_manifest:
        config_file = '/flywheel/v0/manifest.json'
        log.info('Loading default config from manifest!')

    with open(config_file, 'r') as jsonfile:
        config = json.load(jsonfile)['config']
    log.info(pprint.pformat(config))

    option_names = ('convert_montage', 'convert_nifti', 'convert_png')
    if from_manifest:
        # In the manifest each option is a descriptor dict; the value to use
        # is stored under its 'default' key.
        return tuple(config[name]['default'] for name in option_names)
    return tuple(config[name] for name in option_names)


def _output_filetype(filename):
    """Return the metadata 'type' for an output file name, or None if unrecognised."""
    suffix_types = (
        ('.nii.gz', 'nifti'),
        ('bvec', 'bvec'),
        ('bval', 'bval'),
        ('montage.zip', 'montage'),
        ('.png', 'screenshot'),
    )
    for suffix, ftype in suffix_types:
        if filename.endswith(suffix):
            return ftype
    return None


def dicom_convert(fp, classification, modality, outbase):
    """
    Attempts multiple types of conversion on dicom files.

    Attempts to create a nifti for all files, except screen shots.
    Also attempts to create montages of all files. Afterwards every file found
    in the output directory is described in a '.metadata.json' written there.

    :param fp: path to the input DICOM archive.
    :param classification: classification to attach to each output file; not
        set on outputs when falsy.
    :param modality: modality to attach to each output file; 'MR' is assumed
        when falsy.
    :param outbase: output path prefix; its directory is the output directory.
    :returns: list accumulated from the values returned by ``scidata.write``.

    Exits the process with status 0 when the input is a screenshot and
    convert_png is disabled.
    """
    convert_montage, convert_nifti, convert_png = _load_conversion_flags()

    # CONVERSION
    log.info(' Converting dicom file %s...' % fp)
    ds = scidata.parse(fp, filetype='dicom', ignore_json=True, load_data=True)
    log.info(' Loaded and parsed.')

    final_results = []
    if ds.scan_type != 'screenshot':
        if convert_montage:
            log.info(' Performing montage conversion...')
            final_results += scidata.write(ds, ds.data, outbase=outbase + '.montage', filetype='montage', voxel_order='LPS')  # always LPS
        if convert_nifti:
            log.info(' Performing NIfTI conversion...')
            final_results += scidata.write(ds, ds.data, outbase=outbase, filetype='nifti')  # no reorder
        else:
            log.warning(' NIfTI files will not be generated - convert_nifti=False!')
    elif convert_png:
        log.info(' Performing screenshot conversion to png...')
        final_results += scidata.write(ds, ds.data, outbase=outbase + '.screenshot', filetype='png')
    else:
        log.warning(' Screenshot acquisition detected, but convert_png=False. Nothing to do! Exiting(0)')
        os.sys.exit(0)

    # METADATA
    try:
        bids_info = _get_bids_info(fp)
    except Exception as err:
        # BIDS info is optional, so a failure must not abort the conversion;
        # `Exception` (not a bare except) lets SystemExit/KeyboardInterrupt through.
        bids_info = {}
        log.warning("Could not generate bids info.")
        log.debug(' BIDS info generation failed with: %s' % err)

    outdir = os.path.dirname(outbase)
    files = []
    for fname in os.listdir(outdir):
        fdict = {'name': fname}

        # Set filetype
        ftype = _output_filetype(fname)
        if ftype:
            fdict['type'] = ftype

        # Set classification and modality
        if classification:
            fdict['classification'] = classification
        else:
            log.info(' No classification was passed in! Not setting on outputs.')
        if modality:
            fdict['modality'] = modality
        else:
            log.info(' No modality was passed in! Assuming MR.')
            fdict['modality'] = 'MR'

        # Sidecar info is only attached to the image and its diffusion tables.
        if bids_info and ftype in ('nifti', 'bval', 'bvec'):
            fdict['info'] = bids_info

        files.append(fdict)

    metadata = {}
    if files:
        metadata['acquisition'] = {'files': files}

    # NOTE(review): the metadata file is written even when no outputs exist
    # (as an empty object) -- confirm this matches the intended nesting.
    with open(os.path.join(outdir, '.metadata.json'), 'w') as metafile:
        json.dump(metadata, metafile)

    if metadata:
        log.debug(' Metadata output:')
        log.info(pprint.pformat(metadata))

    return final_results
if __name__ == '__main__':
    # Run dcm-convert on the input dicom file described by the gear config.
    log.info(' Job start: %s' % datetime.datetime.utcnow())

    OUTDIR = '/flywheel/v0/output'
    CONFIG_FILE_PATH = '/flywheel/v0/config.json'

    ############################################################################
    # Grab Config

    with open(CONFIG_FILE_PATH) as config_file:
        config = json.load(config_file)

    # Grab file info and top-level info from config
    dicom_input = config['inputs']['dicom']
    dicom_file_path = dicom_input['location']['path']
    dicom_file_name = dicom_input['location']['name']
    # `in` / `.get` instead of dict.has_key, which does not exist on Python 3.
    output_name = config['config'].get('output_name', '')
    convert_mux = config['config']['convert_mux']
    config_obj = dicom_input['object']

    # Read a DICOM from the DICOM archive
    dcm = _get_dicom_info_from_dicom(dicom_file_path)
    if not dcm:
        # Nothing could be parsed (the helper hands back an empty placeholder).
        # Use an empty mapping so the .get() lookups below fall back to their
        # defaults instead of raising AttributeError.
        dcm = {}

    # Grab dicom-info from previous classifier RUN
    if 'classification' in config_obj:
        classification = config_obj['classification']
    else:
        log.info(' No classification was found in the config.')
        classification = []

    if 'modality' in config_obj:
        modality = config_obj['modality']
    else:
        log.info(' No modality was found in the config. Assigning value from DICOM')
        modality = dcm.get('Modality', 'MR')

    ############################################################################
    # Check series description for 'mux' string
    # Determine by pulse sequence

    if not convert_mux:
        do_exit = False
        try:
            psd = str(dcm[0x019, 0x109c].value)
        except Exception:
            # PSD element not readable: fall back to the series description.
            series_descrip = dcm.get('SeriesDescription', '')
            if series_descrip:
                if series_descrip.find('mux') != -1:
                    log.info(' MUX string found in sereis description. Conversion will not continue! Exit(0). If you would like to convert this dataset re-run with convert_mux=true')
                    do_exit = True
            else:
                log.warning(' PSD or series description could not be checked. Proceeding!')
        else:
            if psd.startswith('muxarcepi'):
                log.info(' This dataset was captured using a MUX PSD (%s). Conversion will not continue! Exit(0). If you would like to convert this dataset re-run with convert_mux=true' % psd)
                do_exit = True
        if do_exit:
            os.sys.exit(0)

    ############################################################################
    # Set output name

    exam_num = str(dcm.get('StudyID', ''))
    series_num = str(dcm.get('SeriesNumber', ''))
    acq_num = str(dcm.get('AcquisitionNumber', ''))

    # Prefer the config input, then exam_num_series_num, then input DICOM file name
    if output_name:
        output_basename = output_name.split('.nii')[0]
    elif exam_num and series_num and acq_num:
        output_basename = exam_num + '_' + series_num + '_' + acq_num
    else:
        # Use the input file name, stripping off the zip, dcm, or dicom ext.
        log.info(' Using input filename to generate output_basename.')
        output_basename = dicom_file_name.split('.zip')[0].split('.dcm')[0].split('.dicom')[0]

    output_basename = os.path.join(OUTDIR, output_basename)
    log.info(' Output base name set to %s' % output_basename)

    ############################################################################
    # RUN the conversion

    results = dicom_convert(dicom_file_path, classification, modality, output_basename)

    log.info(' Job stop: %s' % datetime.datetime.utcnow())

    # Check for results; the job exits 0 either way, only the message differs.
    if results:
        log.info(' generated %s' % ', '.join(results))
    else:
        log.info(' No results were generated - check your config!')
    os.sys.exit(0)