Skip to content

Commit

Permalink
fixes to pixelization for undersampled spectra.
Browse files Browse the repository at this point in the history
  • Loading branch information
bd-j committed Jun 18, 2024
1 parent 9599c70 commit f9cc18a
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 39 deletions.
7 changes: 5 additions & 2 deletions prospect/observation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-

from .observation import Photometry, Spectrum, Lines
from .observation import Observation
from .observation import Photometry, Spectrum, Lines, UndersampledSpectrum
from .observation import from_oldstyle, from_serial

__all__ = ["Observation", "Photometry", "Spectrum", "Lines",
__all__ = ["Observation",
"Photometry", "Spectrum", "Lines",
"UndersampledSpectrum",
"from_oldstyle", "from_serial"]
69 changes: 32 additions & 37 deletions prospect/observation/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from ..likelihood.noise_model import NoiseModel


__all__ = ["Observation", "Spectrum", "Photometry", "Lines",
__all__ = ["Observation",
"Spectrum", "Photometry", "Lines",
"UndersampledSpectrum",
"from_oldstyle", "from_serial", "obstypes"]


Expand Down Expand Up @@ -351,35 +353,50 @@ def pad_wavelength_array(self):
self.padded_resolution = np.interp(self.padded_wavelength, self.wavelength, self.resolution)

def _smooth_lsf_fft(self, inwave, influx, outwave, sigma):

# construct cdf of 'resolution elements' as a fn of wavelength
dw = np.gradient(outwave)
sigma_per_pixel = (dw / sigma)
cdf = np.cumsum(sigma_per_pixel)
cdf /= cdf.max()
# check: do we need this?

# Get number of resolution elements: is this the best way to do this?
# Can't we just use unnormalized cdf above
x_per_pixel = np.gradient(cdf)
x_per_sigma = np.nanmedian(x_per_pixel / sigma_per_pixel)
pix_per_sigma = 1
N = pix_per_sigma / x_per_sigma
nx = int(2**np.ceil(np.log2(N)))

# now evenly sample in the x coordinate
x = np.linspace(0, 1, nx)
dx = 1.0 / nx
x = np.linspace(cdf[0], 1, nx)
dx = (1.0 - cdf[0]) / nx
# convert x back to wavelength, and get model at these wavelengths
lam = np.interp(x, cdf, outwave)
newflux = np.interp(lam, inwave, influx)

# smooth flux sampled at constant resolution
# could replace this with np.conv()
flux_conv = smooth_fft(dx, newflux, x_per_sigma)
outflux = np.interp(outwave, lam, flux_conv)

# sample at wavelengths of pixels
outflux = self._pixelize(outwave, lam, flux_conv)
return outflux

def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0):
def _pixelize(self, outwave, inwave, influx):
# could do this with an FFT in pixel space using a sinc
return np.interp(outwave, inwave, influx)

def instrumental_smoothing(self, model_wave_obsframe, model_flux, zred=0, libres=0):
"""Smooth a spectrum by the instrumental resolution, optionally
accounting (in quadrature) the intrinsic library resolution.
Parameters
----------
wave_obs : ndarray of shape (N_pix_model,)
model_wave_obsframe : ndarray of shape (N_pix_model,)
Observed frame wavelengths, in units of AA for the *model*
influx : ndarray of shape (N_pix_model,)
model_flux : ndarray of shape (N_pix_model,)
Flux array corresponding to the observed frame wavelengths
libres : float or ndarray of shape (N_pix_model,)
Expand All @@ -396,21 +413,21 @@ def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0):
``influx`` is simply interpolated onto the wavelength grid
"""
if self.wavelength is None:
return influx
return model_flux
if self.resolution is None:
return np.interp(self.wavelength, wave_obs, influx)
return np.interp(self.wavelength, model_wave_obsframe, model_flux)

# interpolate library resolution onto the instrumental wavelength grid
Klib = np.interp(self.padded_wavelength, wave_obs, libres)
Klib = np.interp(self.padded_wavelength, model_wave_obsframe, libres)
assert np.all(self.padded_resolution >= Klib), "data higher resolution than library"

# quadrature difference of instrumental and library resolution
Kdelta = np.sqrt(self.padded_resolution**2 - Klib**2)
Kdelta_lambda = Kdelta / CKMS * self.padded_wavelength

# Smooth by the difference kernel
outspec_padded = self._smooth_lsf_fft(wave_obs,
influx,
outspec_padded = self._smooth_lsf_fft(model_wave_obsframe,
model_flux,
self.padded_wavelength,
Kdelta_lambda)

Expand All @@ -419,30 +436,8 @@ def instrumental_smoothing(self, wave_obs, influx, zred=0, libres=0):

class UndersampledSpectrum(Spectrum):

def _smooth_lsf_fft(self, inwave, influx, outwave, sigma):
raise NotImplementedError
# TODO does this need to be changed if outwave is undersampled?
# TODO testing
dw = np.gradient(outwave)
sigma_per_pixel = (dw / sigma)
cdf = np.cumsum(sigma_per_pixel)
cdf /= cdf.max()
# check: do we need this?
x_per_pixel = np.gradient(cdf)
x_per_sigma = np.nanmedian(x_per_pixel / sigma_per_pixel)
pix_per_sigma = 1
N = pix_per_sigma / x_per_sigma
nx = int(2**np.ceil(np.log2(N)))
# now evenly sample in the x coordinate
x = np.linspace(0, 1, nx)
dx = 1.0 / nx
# convert x to wave
lam = np.interp(x, cdf, outwave)
newflux = np.interp(lam, inwave, influx)
flux_conv = smooth_fft(dx, newflux, x_per_sigma)
# TODO - does this do the right thing regarding edge/center of pixels?
outflux = rebin(outwave, lam, flux_conv)
return outflux
def _pixelize(self, outwave, inwave, influx):
return rebin(outwave, inwave, influx)


class Lines(Spectrum):
Expand Down
81 changes: 81 additions & 0 deletions tests/tests_undersampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
#import pytest

from prospect.sources import CSPSpecBasis
from prospect.models import SpecModel, templates
from prospect.observation import Spectrum, UndersampledSpectrum

#@pytest.fixture(scope="module")
def build_sps():
sps = CSPSpecBasis(zcontinuous=1)
return sps


def build_model(add_neb=False, sigma_v=200):
model_params = templates.TemplateLibrary["ssp"]
model_params.update(templates.TemplateLibrary["spectral_smoothing"])
model_params["sigma_smooth"] = dict(N=1, isfree=False, init=sigma_v)
if add_neb:
model_params.update(templates.TemplateLibrary["nebular"])
model_params["nebemlineinspec"]["init"] = np.array([False])
model_params["eline_sigma"] = dict(N=1, isfree=False, init=sigma_v)

model_params["tage"]["init"] = 0.2

return SpecModel(model_params)


def build_obs(undersampling=4):

wmin, wmax = 4000, 7000
fwhm = 5 # instrumental LSF FWHM in AA
dl_s = fwhm / 3 # well sampled spectrum
dl_u = fwhm / 2 * undersampling # horribly undersampled spectrum

wave = np.arange(wmin, wmax, dl_s)
resolution = (fwhm/2.355) / wave * 2.998e5 # in km/s
full = Spectrum(wavelength=wave.copy(),
flux=np.ones(len(wave)),
uncertainty=np.ones(len(wave)) / 10,
resolution=resolution,
mask=slice(None),
name="Oversampled")
wave = np.arange(wmin, wmax, dl_u)
resolution = (fwhm/2.355) / wave * 2.998e5 # in km/s
under = UndersampledSpectrum(wavelength=wave.copy(),
flux=np.ones(len(wave)),
uncertainty=np.ones(len(wave)) / 10,
resolution=resolution,
mask=slice(None),
name="Undersampled")

obslist = [full, under]
[obs.rectify() for obs in obslist]
return obslist


#def test_undersample(build_sps, plot=False):
if __name__ == "__main__":
plot = True

sps = build_sps()
obslist = build_obs()
model = build_model(add_neb=True)

preds, x = model.predict(model.theta, observations=obslist, sps=sps)

# TODO: turn this plot into an actual test
if plot:
import matplotlib.pyplot as pl
fig, ax = pl.subplots()
ax.plot(model.observed_wave(model._wave), model._smooth_spec, label="intrinsic")
for p, o in zip(preds, obslist):
if o.kind == "photometry":
ax.plot(o.wavelength, p, "o")
else:
ax.step(o.wavelength, p, where="mid", label=o.name)

ax.set_xlim(o.wavelength.min(), o.wavelength.max())

0 comments on commit f9cc18a

Please sign in to comment.