-
Notifications
You must be signed in to change notification settings - Fork 1
/
hdf5_to_calfits.py
226 lines (188 loc) · 9.11 KB
/
hdf5_to_calfits.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 UPennEoR
# Licensed under the 2-clause BSD License
from __future__ import print_function, division, absolute_import
from pyuvdata import UVData, UVCal
import numpy as np
import argparse
import os
import h5py
import copy
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import hera_cal
ap = argparse.ArgumentParser(description="Convert CASA gain solutions in hdf5 files to .calfits files")
ap.add_argument("--fname", type=str, help="name of calfits file", required=True)
ap.add_argument("--uv_file", type=str, help="name of uvfile to apply calibration to", required=True)
ap.add_argument("--kcal", type=str, default=None, help="name of file containing delay solutions")
ap.add_argument("--bcal", type=str, default=None, help="name of file containing bandpass solutions")
ap.add_argument("--acal", type=str, default=None, help="name of file containing absolute scale reference spectrum")
ap.add_argument("--overwrite", default=False, action="store_true", help="overwrite output file if it exists")
ap.add_argument("--multiply_gains", default=False, action="store_true", help="change gain convention from divide to multiply")
ap.add_argument("--smooth_ratio", default=False, action="store_true", help="smooth sim/data ratio with a low-order polynomial")
ap.add_argument("--plot_ratio", default=False, action="store_true", help="plot comparison of unsmoothed to smoothed ratio")
def main(ap):
# read in UVData object
uv_file = args.uv_file
uvd = UVData()
uvd.read(uv_file)
# get metadata
antpos, ants = uvd.get_ENU_antpos(center=True, pick_data_ants=True)
ants = list(ants)
freqs = np.unique(uvd.freq_array[0, :])
times = np.unique(uvd.time_array)
Nants = len(ants)
Nfreqs = len(freqs)
Ntimes = len(times)
# assume we have linear polarizations x and y for gain solutions, in that order
jones_array = ['Jxx', 'Jyy']
Njones = len(jones_array)
# start with unity gains
gains = np.ones((Nants, Nfreqs, 1, Njones), dtype=np.complex)
flags = np.zeros((Nants, Nfreqs, 1, Njones), dtype=np.bool)
# make sure the user specified at least one gain solution
if args.kcal is None and args.bcal is None and args.acal is None:
raise ValueError("at least one calibration type must be specified")
# read in K-gain file
kfile = args.kcal
if kfile is not None:
delays = np.zeros_like(ants, dtype=np.float)
with h5py.File(kfile, 'r') as f:
delay_ants = f["/Data/delay_ants"][()].tolist()
delays = f["/Data/delays"][()].T
delay_flags = f["/Data/delay_flags"][()].T
# convert from ns -> s
delays *= 1e-9
# reorder antennas
delays = np.array(map(lambda a: delays[delay_ants.index(a), :] if a in delay_ants else 0., ants))
delay_flags = np.array(map(lambda a: delay_flags[delay_ants.index(a), :] if a in delay_ants else True, ants))
# convert delays into complex gains; freqs has units of Hz
delay_gains = np.exp(-2j * np.pi * np.einsum('a,bc->bac', freqs - freqs.min(), delays))[:, :, np.newaxis, :]
delay_gain_flags = delay_flags[:, np.newaxis, :]
delay_gain_flags = np.repeat(delay_gain_flags, Nfreqs, axis=1)[:, :, np.newaxis, :]
assert(np.allclose(delay_gains[0, :, 0, 0], np.exp(-2j * np.pi * (freqs - freqs.min()) * delays[0, 0])))
assert(np.allclose(delay_gains[0, :, 0, 1], np.exp(-2j * np.pi * (freqs - freqs.min()) * delays[0, 1])))
assert(np.allclose(delay_gains[-1, :, 0, 0], np.exp(-2j * np.pi * (freqs - freqs.min()) * delays[-1, 0])))
assert(np.allclose(delay_gains[-1, :, 0, 1], np.exp(-2j * np.pi * (freqs - freqs.min()) * delays[-1, 1])))
# make sure we have the right shape
right_shape = (Nants, Nfreqs, 1, Njones)
if delay_gains.shape != right_shape:
raise ValueError("bandpass gains are not the right shape; expecting {}, got {}".format(right_shape, delay_gains.shape))
# multiply into gains
gains *= delay_gains
# add flags
flags += delay_gain_flags
# read in B-gain file
bfile = args.bcal
if bfile is not None:
with h5py.File(bfile, 'r') as f:
# casa saves bandpasses as (Njones, Nfreqs, Nants)
# we want to transpose them to (Nants, Nfreqs, Njones)
bp_gains = np.swapaxes(f["/Data/bp"][()], 0, 2)
bp_ants = f["/Data/bp_ants"][()].tolist()
bp_freqs = f["/Data/bp_freqs"][()]
bp_flags = np.swapaxes(f["/Data/bp_flags"][()], 0, 2)
# get number of frequencies
bp_Nfreqs = len(bp_freqs)
# reorder antennas
bp_gains = np.array([bp_gains[bp_ants.index(a), :, :] if a in bp_ants
else np.ones((bp_Nfreqs, Njones), dtype=np.complex) for a in ants])
bp_flags = np.array([bp_flags[bp_ants.index(a), :, :] if a in bp_ants
else np.ones((bp_Nfreqs, Njones), dtype=np.bool) for a in ants])
# get gains and flags into the right shape
bp_gains = bp_gains[:, :, np.newaxis, :]
bp_flags = bp_flags[:, :, np.newaxis, :]
# make sure we have the right shape
right_shape = (Nants, Nfreqs, 1, Njones)
if bp_gains.shape != right_shape:
raise ValueError("bandpass gains are not the right shape; expecting {}, got {}".format(right_shape, bp_gains.shape))
# multipy into gains
gains *= bp_gains.conj()
# add flags
flags += bp_flags
# read in overall amplitude spectrum
afile = args.acal
if afile is not None:
filename, ext = os.path.splitext(afile)
if ext == '.npz':
f = np.load(afile)
amp = f["SpectrumScale"]
elif ext == '.h5' or ext == '.hdf5':
# assume spectrum is stored in hdf5 format
with h5py.File(afile, 'r') as f:
amp = f["/Data/spectrum_scale"][()]
else:
raise ValueError("unrecognized filetype for abscale spectrum")
# optionally smooth the data
if args.smooth_ratio:
# save copy of original ratio
amp_orig = copy.deepcopy(amp)
# use same frequencies as data, but in MHz
f_array = freqs / 1e6
# define cutoff channels for where to compute polynomial fit
# ignore lowest and highest 100 channels
freq_low = 100
freq_hi = Nfreqs - 100
# perform fit on inverse; clean up inf values
amp = 1. / amp
amp = np.where(np.abs(amp) == np.inf, 0., amp)
# generate 3rd order polynomial and interpolate
p = np.poly1d(np.polyfit(f_array[freq_low:freq_hi], amp[freq_low:freq_hi], 5))
amp = p(f_array)
# clean up potential NaNs and undo inversion
amp = np.where(np.isnan(amp), 0., amp)
amp = 1. / amp
if args.plot_ratio:
# make comparison plot
f = plt.figure()
ax = plt.gca()
ax.plot(f_array, amp_orig, label='raw data')
ax.plot(f_array, amp, label='smoothed')
ax.set_xlim((f_array[freq_low], f_array[freq_hi]))
leg = ax.legend(loc=0)
output = "ratio_comparison.png"
print("Saving {}...".format(output))
f.savefig(output)
plt.close(f)
# turn it into the right shape
amp = np.stack((amp,) * Njones).T
amp = np.stack((amp,) * Nants)
right_shape = (Nants, Nfreqs, Njones)
if amp.shape != (Nants, Nfreqs, Njones):
raise ValueError("amp shape is {}; was expecting {}".format(amp.shape, right_shape))
assert(np.allclose(amp[0, :, 0], amp[1, :, 0]))
assert(np.allclose(amp[0, :, 0], amp[0, :, 1]))
# add an extra dummy time axis
amp = amp[:, :, np.newaxis, :]
# multiply into the gains; we take the square root so that g_i * g_j^* gives the original
# also, some frequencies have values of inf, so we need to handle those separately
sqrt_amp = np.where(amp == np.inf, np.inf, np.sqrt(amp))
gains *= sqrt_amp
# adjust flags to account for inf values
amp_flags = np.where(amp == np.inf, True, False).astype(np.bool)
flags += amp_flags
# make the gains and flags the right number of time samples
gains = np.repeat(gains, Ntimes, axis=2)
flags = np.repeat(flags, Ntimes, axis=2)
# make into calfits
fname = args.fname
overwrite = args.overwrite
if args.multiply_gains:
gain_convention = 'multiply'
else:
gain_convention = 'divide'
gain_dict = {}
flag_dict = {}
for i, ant in enumerate(ants):
for j, pol in enumerate(jones_array):
gain_dict[(ant, pol)] = gains[i, :, :, j].T
flag_dict[(ant, pol)] = flags[i, :, :, j].T
uvc = hera_cal.io.write_cal(fname, gain_dict, freqs, times, flags=flag_dict,
overwrite=overwrite, gain_convention=gain_convention)
return
if __name__ == '__main__':
# parse args
args = ap.parse_args()
main(ap)