# Standard library
import sys
import abc
import pickle
from copy import deepcopy
# Third-party
import numpy as np
from astropy.io import fits
from astropy import constants
from astropy import units as u
from astropy.convolution import convolve_fft
from fast_histogram import histogram2d
# Project
from ..util import check_units, check_random_state
from ..filters import FilterSystem, get_filter_names, get_filter_properties
from ..source import Source
__all__ = ['IdealObservation', 'ArtObservation', 'IdealImager', 'ArtImager']
class Observation(metaclass=abc.ABCMeta):
def to_pickle(self, file_name):
"""Pickle observation object."""
pkl_file = open(file_name, 'wb')
pickle.dump(self, pkl_file)
pkl_file.close()
@staticmethod
def from_pickle(file_name):
"""Load pickle of observation object."""
pkl_file = open(file_name, 'rb')
data = pickle.load(pkl_file)
pkl_file.close()
return data
def to_fits(self, file_name, overwrite=True, image_type='image', **kwargs):
"""
Write image to fits file.
Parameters
----------
file_name : str
Name of fits file.
overwrite : str, optional
If True (default), overwrite the image if it exists.
image_type : str, optional
Attribute name of the image to be written.
"""
fits.writeto(file_name,
getattr(self, image_type),
overwrite=overwrite,
**kwargs)
def copy(self):
"""Create deep copy of observation object."""
return deepcopy(self)
[docs]class IdealObservation(Observation):
"""
Return object for the ideal imager.
Parameters
----------
image : `~numpy.ndarray`
The ideal mock image.
bandpass : str
Filter of the observation.
zpt : float
The magnitude zero point.
"""
def __init__(self, image, zpt, bandpass):
self.image = image
self.zpt = zpt
self.bandpass = bandpass
[docs]class ArtObservation(Observation):
"""
Return object for the artificial imager.
Parameters
----------
raw_counts : `~numpy.ndarray`
Raw count image, including Poisson noise.
src_counts : `~numpy.ndarray`
Source count image before Poission noise is added.
sky_counts : float
Counts per pixel from the sky.
var_image : `~numpy.ndarray`
Variance image.
calibration : float
Calibration factor that converts counts to calibrated flux units.
bandpass : str
Filter of the observation.
zpt : float
The magnitude zero point.
exptime : `~astropy.units.Quantity`
The exposure time of the mock observation.
"""
def __init__(self, raw_counts, src_counts, sky_counts, image,
var_image, calibration, zpt, bandpass, exptime, mag_error):
self.raw_counts = raw_counts
self.src_counts = src_counts
self.sky_counts = sky_counts
self.image = image
self.var_image = var_image
self.calibration = calibration
self.zpt = zpt
self.bandpass = bandpass
self.exptime = exptime
self.mag_error = mag_error
mAB_0 = 48.6
def fnu_from_AB_mag(mag):
"""
Convert AB magnitude into flux density fnu in cgs units.
"""
fnu = 10.**((mag + mAB_0)/(-2.5))
return fnu*u.erg/u.s/u.Hz/u.cm**2
class Imager(metaclass=abc.ABCMeta):
"""Base class for Imager objects."""
def _check_source(self, source):
"""Verify that input object is a Source object."""
if Source not in source.__class__.__mro__:
raise Exception(f'{type(src)} is not a valid Source object.')
def _check_bandpass(self, bandpass):
"""Verify bandpass is the loaded filter list."""
if bandpass not in self.filters:
raise Exception(f'{bandpass} filter properties were not provided.')
def inject_stars(self, x, y, signal, xy_dim, mask=None):
"""
Inject sources into image.
Parameters
----------
x : `~numpy.ndarray`
Source x coordinates.
y : `~numpy.ndarray`
Source y coordinates.
signal : `~numpy.ndarray`
The signal to inject into image (e.g., flux, counts, etc...).
xy_dim : int or list-like
The dimensions of mock image in xy units. If `int` is given, it is
assumed to be both the x and y dimensions: (xy_dim, xy_dim).
mask : `~numpy.ndarray`, optional
Boolean mask that is set to True for stars you want to inject.
If None, all stars will be injected.
Returns
-------
image : `~numpy.ndarray`
The mock image *before* psf convolution.
"""
bins = tuple(np.asarray(xy_dim).astype(int))
hist_range = [[0, bins[0] - 1], [0, bins[1]- 1]]
if mask is not None:
_x = x[mask]
_y = y[mask]
_s = signal[mask]
else:
_x, _y, _s = x, y, signal
image = histogram2d(_x, _y, bins=bins, weights=_s, range=hist_range).T
return image
def apply_seeing(self, image, psf=None, boundary='wrap'):
"""
Convolve mock image with psf.
Parameters
----------
image : `~numpy.ndarray`
The mock image.
psf : `~numpy.ndarray` or None, optional
The point-spread function. If None, will return the input image.
boundary : {'fill', 'wrap'}, optional
A flag indicating how to handle boundaries:
* 'fill': set values outside the array boundary to fill_value
(default)
* 'wrap': periodic boundary
The `None` and 'extend' parameters are not supported for FFT-based
convolution.
Returns
-------
image : `~numpy.ndarray`
The psf-convolved image. If `psf` is not `None`, the original image
is returned.
"""
if psf is not None:
image = convolve_fft(image , psf,
boundary=boundary,
normalize_kernel=True)
return image
def inject_smooth_model(self, image, source, bandpass, zpt):
"""Inject smooth model if it exists."""
return NotImplementedError()
def observe(self, bandpass, psf, **kwargs):
"""Mock observe source."""
return NotImplementedError()
[docs]class IdealImager(Imager):
"""
Ideal imager for making noise-free images.
.. note::
Stars are injected in the center of pixels even if subpixel coordinates
are given. If you need subpixel precision, we recommend creating
upsampled mock images and downsampling to the desired resolution.
"""
[docs] def inject_smooth_model(self, image, source, bandpass, zpt):
"""
Inject smooth component of the star system into image if it exists.
Parameters
----------
image : `~numpy.ndarray`
The artificial image.
source : `~artpop.source.Source`
The artificial source.
bandpass : str
Observational bandpass of the mock observation.
zpt : float
Photometric zero point.
Returns
-------
image : `~numpy.ndarray`
The artificial image with the smooth model injected into it. If no
smooth model exists, the original image will be returned.
"""
if hasattr(source, 'smooth_model'):
if source.smooth_model is None:
image = image
else:
mag = source.sp.mag_integrated_component(bandpass)
amp, amp_image, name = source.mag_to_image_amplitude(mag, zpt)
setattr(source.smooth_model, name, amp_image)
yy, xx = np.mgrid[:image.shape[0], :image.shape[1]]
image = image + source.smooth_model(xx, yy)
else:
image = image
return image
[docs] def observe(self, source, bandpass, psf=None, zpt=27, mask=None, **kwargs):
"""
Make ideal observation.
Parameters
----------
source : `~artpop.source.Source`
Source to mock observer.
bandpass : str
Filter of observation. Must be a filter in the given
photometric system(s).
psf : `~numpy.ndarray` or None, optional
The point-spread function. If None, will not psf-convolve image.
zpt : float, optional
The magnitude zero point of the mock image.
mask : `~numpy.ndarray`, optional
Boolean mask that is set to True for stars you want to inject.
If None, all stars will be injected.
Returns
-------
observation : `~artpop.image.IdealObservation`
Observation object for the ideal imager.
"""
self._check_source(source)
flux = 10**(0.4 * (zpt - source.mags[bandpass]))
image = self.inject_stars(source.x, source.y, flux,
source.xy_dim, mask)
image = self.inject_smooth_model(image, source, bandpass, zpt)
image = self.apply_seeing(image, psf, **kwargs)
observation = IdealObservation(image=image, bandpass=bandpass, zpt=zpt)
return observation
[docs]class ArtImager(Imager):
"""
Imager for making fully artificial images.
.. note::
Stars are injected in the center of pixels even if subpixel coordinates
are given. If you need subpixel precision, we recommend creating
upsampled mock images and downsampling to the desired resolution.
.. note::
If you use a `phot_system` and its pre-calculated filter properties, the
conversion from magnitude to counts assumes AB magnitudes.
Parameters
----------
phot_system : str or list-like, optional
Name of the photometric system(s). Use this option if you are using
magnitudes from one of the systems with pre-calculated filter
properties included with ArtPop. See `~artpop.phot_system_list`.
diameter : float or `~astropy.units.Quantity`, optional
Diameter of the telescope aperture. If a float is given, the units
will be assumed to be meters.
read_noise : float, optional
RMS of Gaussian read noise. Set to zero by default.
efficiency : float, optional
Efficiency factor (e.g., to account for CCD efficiency, atmospheric
transmission, etc). It is set to one by default.
dlam : dict of floats or dict of ~astropy.units.Quantity`, optional
Dictionary containing the filter widths. Used for converting magnitudes
into counts. Filter names should be the keys and the widths the values.
If the wavelengths are given as floats, angstroms will be assumed.
lam_eff : dict of floats or dict of ~astropy.units.Quantity`, optional
Dictionary containing the filter effective wavelengths. Used for
converting magnitudes into counts. Filter names should be the keys
and the widths should be the values. If the wavelengths are given as
floats, angstroms will be assumed.
filter_system : `~artpop.FilterSystem`, optional
Filter system object with filter curves for the filters you use to
create artificial images. This object calculates `dlam` and `lam_eff`
for converting magnitudes into counts.
zpt_inst : dictionary, optional
Instrumental zero points. Format should be a dictionary with the filter
names as keys and the zero points as values. If not None, this alone
sets the magnitude associated with 1 count per second. This parameter
takes precedence; parameters such as the diameter will be ignored.
random_state : `None`, int, list of ints, or `~numpy.random.RandomState`
If `None`, return the `~numpy.random.RandomState` singleton used by
``numpy.random``. If `int`, return a new `~numpy.random.RandomState`
instance seeded with the `int`. If `~numpy.random.RandomState`,
return it. Otherwise raise ``ValueError``.
"""
def __init__(self, phot_system=None, diameter=10, read_noise=0.0,
efficiency=1.0, dlam=None, lam_eff=None, filter_system=None,
zpt_inst=None, random_state=None):
self.efficiency = efficiency
self.read_noise = read_noise
self.diameter = check_units(diameter, 'm')
self.rng = check_random_state(random_state)
self.dlam = None
self.lam_eff = None
self.filters = None
self.zpt_inst = None
if phot_system is not None:
self.dlam = {}
self.lam_eff = {}
self.phot_system = phot_system
self.filters = get_filter_names(phot_system)
props = get_filter_properties()
for filt in self.filters:
select = props['bandpass'] == filt
self.dlam[filt] = props[select]['dlam'][0]
self.lam_eff[filt] = props[select]['lam_eff'][0]
elif zpt_inst is not None:
self.filters = list(zpt_inst.keys())
self.zpt_inst = zpt_inst
elif dlam is not None:
assert lam_eff is not None, 'must give both dlam and lam_eff'
self.filters = list(dlam.keys())
self.dlam = dlam
self.lam_eff = lam_eff
elif filter_system is not None:
assert type(filter_system) == FilterSystem
self.filter_system = filter_system
self.filters = filter_system.filter_names
for filt in self.filters:
self.dlam[filt] = filter_system.dlam(filt).value
self.lam_eff[filt] = filter_system.lam_eff(filt).value
else:
msg = 'must give phot_system or (dlam, lam_eff) or filter_system.'
raise Exception('In order to convert mags to counts, you ' + msg)
if self.zpt_inst is None:
for filt in self.filters:
self.dlam[filt] = check_units(self.dlam[filt], 'angstrom')
self.lam_eff[filt] = check_units(
self.lam_eff[filt], 'angstrom')
@property
def area(self):
"""Area of aperture."""
r = self.diameter / 2
return np.pi * r**2
[docs] def mag_to_counts(self, mags, bandpass, exptime):
"""
Convert magnitudes to counts.
Parameters
----------
mags : `~numpy.ndarray` or `~astropy.table.Column`
AB magnitudes to be converted into counts.
bandpass : str
Filter of observation. Must be a filter in the given
photometric system(s).
exptime : float or `~astropy.units.Quantity`
Exposure time. If float is given, the units are assumed to
be `~astropy.units.second`.
Returns
-------
counts : `~numpy.ndarray`
The counts associated with each magnitude.
"""
exptime = check_units(exptime, 's')
if self.zpt_inst is None:
fnu = fnu_from_AB_mag(mags)
dlam = self.dlam[bandpass]
lam_eff = self.lam_eff[bandpass]
photon_flux = (dlam / lam_eff) * fnu / constants.h.to('erg s')
counts = photon_flux * self.area.to('cm2') * exptime.to('s')
counts = counts.decompose()
assert counts.unit == u.dimensionless_unscaled
counts *= self.efficiency
counts = counts.value
else:
counts = 10**(0.4 * (self.zpt_inst[bandpass] - mags))
counts *= exptime.to('s').value
return counts
[docs] def sb_to_counts_per_pixel(self, sb, bandpass, exptime, pixel_scale):
"""
Convert a constant surface brightness into counts per pixel.
Parameters
----------
sb : float
Surface brightness in units of `~astropy.units.mag` per square
`~astropy.units.arcsec`.
bandpass : str
Filter of observation. Must be a filter in the given
photometric system(s).
exptime : float or `~astropy.units.Quantity`
Exposure time. If float is given, the units are assumed to
be `~astropy.units.second`.
pixel_scale : `~astropy.units.Quantity`
Pixel scale.
Returns
-------
counts_per_pixel : float
Counts per pixel associated with the given surface brightness.
"""
exptime = check_units(exptime, 's')
if self.zpt_inst is None:
fnu_per_square_arcsec = fnu_from_AB_mag(sb) / u.arcsec**2
dlam = self.dlam[bandpass]
lam_eff = self.lam_eff[bandpass]
pixel_scale = pixel_scale.to('arcsec / pixel')
E_lam = (constants.h * constants.c / lam_eff).decompose().to('erg')
flam_per_square_arcsec = fnu_per_square_arcsec *\
constants.c.to('angstrom/s') / lam_eff**2
flam_per_pixel = flam_per_square_arcsec * pixel_scale**2
photon_flux_per_sq_pixel = (flam_per_pixel * dlam / E_lam).\
decompose().to('1/(cm2*pix2*s)')
counts_per_pixel = photon_flux_per_sq_pixel * exptime.to('s')
counts_per_pixel *= self.area.to('cm2') * u.pixel**2
assert counts_per_pixel.unit == u.dimensionless_unscaled
counts_per_pixel *= self.efficiency
counts_per_pixel = counts_per_pixel.value
else:
counts_per_pixel = 10**(0.4 * (self.zpt_inst[bandpass] - sb))
counts_per_pixel *= exptime.to('s').value
return counts_per_pixel
[docs] def calibration(self, bandpass, exptime, zpt=27.0):
"""
Calculate the calibration factor, which converts counts into
calibrated flux units.
Parameters
----------
bandpass : str
Filter of observation. Must be a filter in the given
photometric system(s).
exptime : float or `~astropy.units.Quantity`
Exposure time. If float is given, the units are assumed to
be `~astropy.units.second`.
zpt : float, optional
The magnitude zero point of the mock image.
Returns
-------
cali_factor : float
The calibration factor.
"""
if self.zpt_inst is None:
dlam_over_lam = self.dlam[bandpass] / self.lam_eff[bandpass]
lam_factor = (dlam_over_lam / constants.h.cgs).value
cali_factor = 10**(0.4 * zpt) * 10**(0.4 * mAB_0) / lam_factor
cali_factor /= exptime.to('s').value
cali_factor /= self.area.to('cm2').value
cali_factor /= self.efficiency
else:
cali_factor = 10**(0.4 * (zpt - self.zpt_inst[bandpass]))
cali_factor /= exptime.to('s').value
return cali_factor
[docs] def inject_smooth_model(self, image, source, bandpass, exptime, zpt):
"""
Inject smooth component of the star system into image if it exists.
Parameters
----------
image : `~numpy.ndarray`
The artificial image.
source : `~artpop.source.Source`
The artificial source.
bandpass : str
Observational bandpass of the mock observation.
exptime : float or `~astropy.units.Quantity`
Exposure time. If `float` is given, the units are assumed to
be `~astropy.units.second`.
zpt : float
Photometric zero point.
Returns
-------
image : `~numpy.ndarray`
The artificial image with the smooth model injected into it. If no
smooth model exists, the original image will be returned.
"""
if hasattr(source, 'smooth_model'):
if source.smooth_model is None:
image = image
else:
mag = source.sp.mag_integrated_component(bandpass)
amp, _, name = source.mag_to_image_amplitude(mag, zpt)
amp_counts = self.sb_to_counts_per_pixel(
amp, bandpass, exptime, source.pixel_scale)
setattr(source.smooth_model, name, amp_counts)
yy, xx = np.mgrid[:image.shape[0], :image.shape[1]]
image = image + source.smooth_model(xx, yy)
else:
image = image
return image
[docs] def observe(self, source, bandpass, exptime, sky_sb=None, psf=None,
zpt=27.0, mask=None, **kwargs):
"""
Make artificial observation.
Parameters
----------
source : `~artpop.source.Source`
Artificial source to be observed.
bandpass : str
Filter of observation. Must be a filter in the given
photometric system(s).
exptime : float or `~astropy.units.Quantity`
Exposure time. If `float` is given, the units are assumed to
be `~astropy.units.second`.
sky_sb : float or None, optional
Constant surface brightness of the sky. If None is given, then no
sky noise will be added.
psf : `~numpy.ndarray` or None, optional
The point-spread function. If None, will not psf-convolve image.
zpt : float, optional
The magnitude zero point of the mock image.
mask : `~numpy.ndarray`, optional
Boolean mask that is set to True for stars you want to inject.
If None, all stars will be injected.
Returns
-------
observation : `~artpop.image.ArtObservation`
Observation object for the artificial imager.
"""
self._check_bandpass(bandpass)
self._check_source(source)
exptime = check_units(exptime, 's')
counts = self.mag_to_counts(source.mags[bandpass], bandpass, exptime)
src_counts = self.inject_stars(source.x, source.y,
counts, source.xy_dim, mask)
src_counts = self.inject_smooth_model(src_counts, source,
bandpass, exptime, zpt)
src_counts = self.apply_seeing(src_counts, psf, **kwargs)
if sky_sb is not None:
sky_counts = self.sb_to_counts_per_pixel(
sky_sb, bandpass, exptime, source.pixel_scale)
n_s = np.sqrt(self.read_noise**2 + sky_counts + counts) / counts
mag_error = 2.5 * np.log10(1 + n_s)
else:
src_counts[src_counts < 0] = 0
sky_counts = 0
mag_error = None
# HACK: Use Normal distribution for Windows. Poisson sampling
# breaks in Windows due to long ints being 32 bit.
if sys.platform =='win32':
_c = src_counts + sky_counts
raw_counts = self.rng.normal(loc=_c, scale=np.sqrt(_c))
else:
raw_counts = self.rng.poisson(src_counts + sky_counts)
if self.read_noise > 0.0:
rn = self.rng.normal(scale=self.read_noise, size=src_counts.shape)
raw_counts = raw_counts + rn
cali = self.calibration(bandpass, exptime, zpt)
image_cali = (raw_counts - sky_counts) * cali
var = raw_counts + self.read_noise**2
observation = ArtObservation(
raw_counts=raw_counts,
src_counts=src_counts,
sky_counts=sky_counts,
image=image_cali,
var_image=var,
calibration=cali,
zpt=zpt,
bandpass=bandpass,
exptime=exptime,
mag_error=mag_error
)
return observation