diff --git a/prospect/observation/__init__.py b/prospect/observation/__init__.py index ebf9c627..ff5cefa8 100644 --- a/prospect/observation/__init__.py +++ b/prospect/observation/__init__.py @@ -1,7 +1,10 @@ # -*- coding: utf-8 -*- -from .observation import Photometry, Spectrum, Lines +from .observation import Observation +from .observation import Photometry, Spectrum, Lines, UndersampledSpectrum from .observation import from_oldstyle, from_serial -__all__ = ["Observation", "Photometry", "Spectrum", "Lines", +__all__ = ["Observation", + "Photometry", "Spectrum", "Lines", + "UndersampledSpectrum", "from_oldstyle", "from_serial"] diff --git a/prospect/observation/observation.py b/prospect/observation/observation.py index 81dce927..5f993d72 100644 --- a/prospect/observation/observation.py +++ b/prospect/observation/observation.py @@ -10,7 +10,9 @@ from ..likelihood.noise_model import NoiseModel -__all__ = ["Observation", "Spectrum", "Photometry", "Lines", +__all__ = ["Observation", + "Spectrum", "Photometry", "Lines", + "UndersampledSpectrum", "from_oldstyle", "from_serial", "obstypes"] @@ -351,35 +353,50 @@ def pad_wavelength_array(self): self.padded_resolution = np.interp(self.padded_wavelength, self.wavelength, self.resolution) def _smooth_lsf_fft(self, inwave, influx, outwave, sigma): + + # construct cdf of 'resolution elements' as a fn of wavelength dw = np.gradient(outwave) sigma_per_pixel = (dw / sigma) cdf = np.cumsum(sigma_per_pixel) cdf /= cdf.max() - # check: do we need this? + + # Get number of resolution elements: is this the best way to do this? + # Can't we just use unnormalized cdf above x_per_pixel = np.gradient(cdf) x_per_sigma = np.nanmedian(x_per_pixel / sigma_per_pixel) pix_per_sigma = 1 N = pix_per_sigma / x_per_sigma nx = int(2**np.ceil(np.log2(N))) + # now evenly sample in the x coordinate - x = np.linspace(0, 1, nx) - dx = 1.0 / nx + x = np.linspace(cdf[0], 1, nx) + dx = (1.0 - cdf[0]) / nx + # convert x back to wavelength, and get model at these wavelengths lam = np.interp(x, cdf, outwave) newflux = np.interp(lam, inwave, influx) + + # smooth flux sampled at constant resolution + # could replace this with np.conv() flux_conv = smooth_fft(dx, newflux, x_per_sigma) - outflux = np.interp(outwave, lam, flux_conv) + + # sample at wavelengths of pixels + outflux = self._pixelize(outwave, lam, flux_conv) return outflux - def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0): + def _pixelize(self, outwave, inwave, influx): + # could do this with an FFT in pixel space using a sinc + return np.interp(outwave, inwave, influx) + + def instrumental_smoothing(self, model_wave_obsframe, model_flux, zred=0, libres=0): """Smooth a spectrum by the instrumental resolution, optionally accounting (in quadrature) the intrinsic library resolution. Parameters ---------- - wave_obs : ndarray of shape (N_pix_model,) + model_wave_obsframe : ndarray of shape (N_pix_model,) Observed frame wavelengths, in units of AA for the *model* - influx : ndarray of shape (N_pix_model,) + model_flux : ndarray of shape (N_pix_model,) Flux array corresponding to the observed frame wavelengths libres : float or ndarray of shape (N_pix_model,) @@ -396,12 +413,12 @@ def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0): ``influx`` is simply interpolated onto the wavelength grid """ if self.wavelength is None: - return influx + return model_flux if self.resolution is None: - return np.interp(self.wavelength, wave_obs, influx) + return np.interp(self.wavelength, model_wave_obsframe, model_flux) # interpolate library resolution onto the instrumental wavelength grid - Klib = np.interp(self.padded_wavelength, wave_obs, libres) + Klib = np.interp(self.padded_wavelength, model_wave_obsframe, libres) assert np.all(self.padded_resolution >= Klib), "data higher resolution than library" # quadrature difference of instrumental and library resolution @@ -409,8 +426,8 @@ def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0): Kdelta_lambda = Kdelta / CKMS * self.padded_wavelength # Smooth by the difference kernel - outspec_padded = self._smooth_lsf_fft(wave_obs, - influx, + outspec_padded = self._smooth_lsf_fft(model_wave_obsframe, + model_flux, self.padded_wavelength, Kdelta_lambda) @@ -419,30 +436,8 @@ def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0): class UndersampledSpectrum(Spectrum): - def _smooth_lsf_fft(self, inwave, influx, outwave, sigma): - raise NotImplementedError - # TODO does this need to be changed if outwave is undersampled? - # TODO testing - dw = np.gradient(outwave) - sigma_per_pixel = (dw / sigma) - cdf = np.cumsum(sigma_per_pixel) - cdf /= cdf.max() - # check: do we need this? - x_per_pixel = np.gradient(cdf) - x_per_sigma = np.nanmedian(x_per_pixel / sigma_per_pixel) - pix_per_sigma = 1 - N = pix_per_sigma / x_per_sigma - nx = int(2**np.ceil(np.log2(N))) - # now evenly sample in the x coordinate - x = np.linspace(0, 1, nx) - dx = 1.0 / nx - # convert x to wave - lam = np.interp(x, cdf, outwave) - newflux = np.interp(lam, inwave, influx) - flux_conv = smooth_fft(dx, newflux, x_per_sigma) - # TODO - does this do the right thing regarding edge/center of pixels? - outflux = rebin(outwave, lam, flux_conv) - return outflux + def _pixelize(self, outwave, inwave, influx): + return rebin(outwave, inwave, influx) class Lines(Spectrum): diff --git a/tests/tests_undersampling.py b/tests/tests_undersampling.py new file mode 100644 index 00000000..4a6e7432 --- /dev/null +++ b/tests/tests_undersampling.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import numpy as np +#import pytest + +from prospect.sources import CSPSpecBasis +from prospect.models import SpecModel, templates +from prospect.observation import Spectrum, UndersampledSpectrum + +#@pytest.fixture(scope="module") +def build_sps(): + sps = CSPSpecBasis(zcontinuous=1) + return sps + + +def build_model(add_neb=False, sigma_v=200): + model_params = templates.TemplateLibrary["ssp"] + model_params.update(templates.TemplateLibrary["spectral_smoothing"]) + model_params["sigma_smooth"] = dict(N=1, isfree=False, init=sigma_v) + if add_neb: + model_params.update(templates.TemplateLibrary["nebular"]) + model_params["nebemlineinspec"]["init"] = np.array([False]) + model_params["eline_sigma"] = dict(N=1, isfree=False, init=sigma_v) + + model_params["tage"]["init"] = 0.2 + + return SpecModel(model_params) + + +def build_obs(undersampling=4): + + wmin, wmax = 4000, 7000 + fwhm = 5 # instrumental LSF FWHM in AA + dl_s = fwhm / 3 # well sampled spectrum + dl_u = fwhm / 2 * undersampling # horribly undersampled spectrum + + wave = np.arange(wmin, wmax, dl_s) + resolution = (fwhm/2.355) / wave * 2.998e5 # in km/s + full = Spectrum(wavelength=wave.copy(), + flux=np.ones(len(wave)), + uncertainty=np.ones(len(wave)) / 10, + resolution=resolution, + mask=slice(None), + name="Oversampled") + wave = np.arange(wmin, wmax, dl_u) + resolution = (fwhm/2.355) / wave * 2.998e5 # in km/s + under = UndersampledSpectrum(wavelength=wave.copy(), + flux=np.ones(len(wave)), + uncertainty=np.ones(len(wave)) / 10, + resolution=resolution, + mask=slice(None), + name="Undersampled") + + obslist = [full, under] + [obs.rectify() for obs in obslist] + return obslist + + +#def test_undersample(build_sps, plot=False): +if __name__ == "__main__": + plot = True + + sps = build_sps() + obslist = build_obs() + model = build_model(add_neb=True) + + preds, x = model.predict(model.theta, observations=obslist, sps=sps) + + # TODO: turn this plot into an actual test + if plot: + import matplotlib.pyplot as pl + fig, ax = pl.subplots() + ax.plot(model.observed_wave(model._wave), model._smooth_spec, label="intrinsic") + for p, o in zip(preds, obslist): + if o.kind == "photometry": + ax.plot(o.wavelength, p, "o") + else: + ax.step(o.wavelength, p, where="mid", label=o.name) + + ax.set_xlim(o.wavelength.min(), o.wavelength.max()) \ No newline at end of file