diff --git a/newsfragments/645.feature b/newsfragments/645.feature new file mode 100644 index 000000000..fb38a5121 --- /dev/null +++ b/newsfragments/645.feature @@ -0,0 +1 @@ +Add partial support for the Rigaku Oxford Diffraction file format. diff --git a/src/dxtbx/format/FormatROD.py b/src/dxtbx/format/FormatROD.py new file mode 100644 index 000000000..417f99f1a --- /dev/null +++ b/src/dxtbx/format/FormatROD.py @@ -0,0 +1,459 @@ +"""Support for the Rigaku Oxford Diffraction image format + +NB: Rigaku datasets may use a non-zero-padded image incremental serial number. +At present, this is not compatible with assumptions in dxtbx. In order to +import these datasets, the images should be renumbered first. + +See https://github.com/cctbx/dxtbx/issues/646 for details.""" + +# Takanori Nakane took David Waterman's code to parse headers from +# https://github.com/cctbx/cctbx_project/commit/b95467f3b2a70a37eeb820ea294128a32551700c +# and heavily modified it. The original commit in the cctbx_project repository is orphan now. +from __future__ import annotations + +__author__ = "David Waterman, Takanori Nakane" +__copyright__ = ( + "Copyright 2018 United Kingdom Research and Innovation & 2022 Takanori Nakane" +) +__license__ = "BSD 3-clause" + +import re +import struct + +import numpy as np + +from scitbx.array_family import flex + +from dxtbx.format.Format import Format + + +class FormatROD(Format): + """An image reading class for Rigaku Oxford Diffraction images. + + The Rigaku Oxford Diffraction format is used by CrysAlis(Pro). + """ + + def __init__(self, image_file, **kwargs): + """Initialise the image structure from the given file.""" + + from dxtbx import IncorrectFormatError + + if not self.understand(image_file): + raise IncorrectFormatError(self, image_file) + + Format.__init__(self, image_file, **kwargs) + + @staticmethod + def understand(image_file): + with FormatROD.open_file(image_file, "rb") as f: + try: + hdr = f.read(256).decode("ascii") + except UnicodeDecodeError: + return False + + lines = hdr.splitlines() + if len(lines) < 2: + return False + + vers = lines[0].split() + if len(vers) < 2 or vers[0] != "OD" or vers[1] != "SAPPHIRE": + return False + + compression = lines[1].split("=") + if compression[0] != "COMPRESSION": + return False + + return True + + @staticmethod + def _read_ascii_header(image_file): + """Read the ASCII header comprising the first 256 bytes of the file""" + + hd = {} + with FormatROD.open_file(image_file, "rb") as f: + hdr = f.read(256).decode("ascii") + lines = hdr.splitlines() + + vers = lines[0].split() + if len(vers) < 2 or vers[0] != "OD" or vers[1] != "SAPPHIRE": + raise ValueError("Wrong header format") + hd["version"] = float(vers[-1]) + + compression = lines[1].split("=") + if compression[0] != "COMPRESSION": + raise ValueError("Wrong header format") + hd["compression"] = compression[1] + + # Extract definitions from the 3rd - 5th line + defn = re.compile(r"([A-Z]+=[ 0-9]+)") + for line in lines[2:5]: + sizes = defn.findall(line) + for s in sizes: + n, v = s.split("=") + hd[n] = int(v) + + hd["time"] = lines[5].split("TIME=")[-1].strip("\x1a").rstrip() + + return hd + + @staticmethod + def _read_binary_header( + image_file, + offset=256, + nbytes=5120, + general_nbytes=512, + special_nbytes=768, + km4gonio_nbytes=1024, + statistics_nbytes=512, + history_nbytes=2048, + ): + """Read the most relevant parameters from the binary header""" + + with FormatROD.open_file(image_file, "rb") as f: + # General section + f.seek(offset) + bin_x, bin_y = struct.unpack("> 4) & 15) + ipos += 1 + # ipos_bit = ipos * 8 + + for i in range(2): + nbit = nbits[i] + # 0: 0 + # 1: 1 or 0 + # 2: -1, 0, 1, 2 + # 3: -3, -2, -1, 0, 1, 2, 3, 4 (= b111 -3 = 7 - 3) + zero_at = 0 + if nbit > 1: + zero_at = (1 << (nbit - 1)) - 1 + + v = 0 + for j in range(nbit): + v |= linedata[ipos] << (8 * j) + ipos += 1 + + mask = (1 << nbit) - 1 + for j in range(BLOCKSIZE): + ret[opos] = ((v >> (nbit * j)) & mask) - zero_at + opos += 1 + + for i in range(opos - BLOCKSIZE * 2, opos): + offset = ret[i] + + # surprisingly, this IF is very slow! + if offset >= SHORT_OVERFLOW_SIGNED: + if offset >= LONG_OVERFLOW_SIGNED: + offset = linedata[ipos : (ipos + 4)].view(np.int32)[0] + ipos += 4 + else: + offset = linedata[ipos : (ipos + 2)].view(np.int16)[0] + ipos += 2 + + ret[i] = offset + ret[i - 1] + + for i in range(nrest): + px = linedata[ipos] + ipos += 1 + if px < SHORT_OVERFLOW: + ret[opos] = ret[opos - 1] + px - 127 + elif px == LONG_OVERFLOW: + ret[opos] = ( + ret[opos - 1] + linedata[ipos : (ipos + 4)].view(np.int32)[0] + ) + ipos += 4 + else: + ret[opos] = ( + ret[opos - 1] + linedata[ipos : (ipos + 2)].view(np.int16)[0] + ) + ipos += 2 + opos += 1 + + return ret + + +if __name__ == "__main__": + import sys + + for arg in sys.argv[1:]: + if FormatROD.understand(arg): + reader = FormatROD(arg) + print(reader._txt_header) + print(reader._bin_header) + else: + print("Unsupported format")