Source code for skysurvey.target.core

"""
The basic `Target` and `Transient` object are defined in this module.
Pre-defined transients (like SNIa) inherit from Transient and new object you want to define shall.
"""

import warnings
import numpy as np
import pandas

from copy import deepcopy
from tqdm import tqdm
from astropy import cosmology, time
from astropy.utils.decorators import classproperty

from ..tools.utils import parse_skyarea


[docs] class Target( object ): """ Base class for targets. This class provides a framework for representing astronomical targets, including their models, templates, and cosmological parameters. Parameters ---------- _KIND : str, optional The kind of target, by default "unknow" _TEMPLATE : object, optional The template for the target, by default None _MODEL : dict, optional The model for the target, by default None _MAGSYS : str The photometric magnitude system. Defaults to "ab". _PEAK_ABSMAG_BAND : str The bandpass in which the peak absolute magnitude is defined. Defaults to "bessellb". _AMPLITUDE_NAME : str The name of the parameter used to scale the model flux. Defaults to "amplitude". _COSMOLOGY : astropy.cosmology, optional The cosmology to use, by default cosmology.Planck18 See Also -------- ``from_setting``: loads an instance given model parameters (dict) """ _KIND = "unknow" _TEMPLATE = None _MODEL = None # dict config # Params to set peak amplitude _MAGSYS = "ab" _PEAK_ABSMAG_BAND = "bessellb" _AMPLITUDE_NAME = "amplitude" # - Cosmo _COSMOLOGY = cosmology.Planck18 def __init__(self): pass # def __repr__(self): # """ String representation of the instance. """ # return self.__str__() # def __str__(self): # """ String representation of the instance. """ # return self.__class__
[docs] @classmethod def from_setting(cls, setting, **kwargs): """ Load the target from a setting dictionary. .. note:: Not implemented yet. Parameters ---------- setting : dict Dictionary containing the model parameters. **kwargs Additional keyword arguments. Returns ------- Target The loaded target. """ raise NotImplementedError("from_setting is not Implemented ")
[docs] @classmethod def from_data(cls, data, template=None, model=None, **kwargs): """Load the instance given existing data. This means that the model will be ignored as data will not be generated but input. Parameters ---------- data : `pandas.DataFrame` DataFrame containing (at least) the template parameters. template : str, `sncosmo.Source`, `sncosmo.Model` or ``skysurvey.Template``, optional The template source. If a string is given, it is assumed to be a `sncosmo` model name. By default None. model : dict, optional Defines how template parameters are drawn and how they are connected. The model will update the default `cls._MODEL` if any. If None, `cls._MODEL` is used as default. By default None. Returns ------- Target The loaded target. See Also -------- ``from_draw``: loads the instance from a random draw of targets given the model """ init_kwargs, kwargs = cls._parse_init_kwargs_(**kwargs) this = cls(**init_kwargs) if template is not None: this.set_template(template, rate_update=False) if model is not None: this.update_model(**model, rate_update=False) # will update any model entry. if template is not None and model is not None: this._update_rate_in_model_() this.set_data(data) return this
[docs] @classmethod def from_draw(cls, size=None, model=None, template=None, zmax=None, tstart=None, tstop=None, zmin=0, nyears=None, skyarea=None, rate=None, effect=None, cosmology=None, verbose=False, set_amplitude=False, **kwargs): """Load the instance from a random draw of targets given the model. kwargs may hold specific transient options like: - magabs for TSTransient(){self._additional_input} (see class.__init__). Parameters ---------- size : int, optional Number of target you want to sample. If None, 1 is assumed. Ignored if `nyears` is given. By default None. model : dict, optional Defines how template parameters are drawn and how they are connected. The model will update the default `cls._MODEL` if any. If None, `cls._MODEL` is used as default. By default None. template : str, optional Name of the template (`sncosmo.Model(source)`). If None, `cls._TEMPLATE` is used as default. By default None. zmax : float, optional Maximum redshift to be simulated. By default None. tstart : float, str, optional Starting time of the simulation. If a string is given, it is converted to mjd. By default None. tstop : float, str, optional Ending time of the simulation. If a string is given, it is converted to mjd. If `tstart` and `nyears` are both given, `tstop` will be overwritten by `tstart + 365.25 * nyears`. By default None. zmin : float, optional Minimum redshift to be simulated. By default 0. nyears : float, optional If given, `nyears` will set: - `size`: it will be the number of target expected up to `zmax` in the given number of years. This uses `get_rate(zmax)`. - `tstop`: `tstart + 365.25 * nyears` By default None. skyarea : None, str, geometry, optional Sky area to be considered. - str: 'full' (equivalent to None), ['extra-galactic', not implemented yet] - geometry: shapely.Geometry - None: full sky By default None. rate : float, callable, optional The transient rate. If a float is given, it is assumed to be the number of targets per Gpc3. If a callable is given, it is supposed to be a function of z that returns the volumetric rate as a function of wavelength. effect : [type], optional [description]. By default None. cosmology: None, astropy.Cosmology, optional specify the cosmology to be used. set_amplitude: bool should the amplitude of the template be set at this stage ? **kwargs Goes to ``self.update_model_parameter()``. Returns ------- Target The loaded target with data, model and template loaded. See Also -------- ``from_setting``: loads an instance given model parameters (dict) """ init_kwargs, kwargs = cls._parse_init_kwargs_(**kwargs) this = cls(**init_kwargs) # backward compatibility if cosmology is not None: this.set_cosmology(cosmology) if template is not None: this.set_template(template) if rate is not None: this.set_rate(rate) if model is not None: this.update_model(**model, rate_update=False) # will update any model entry. if effect is not None: this.add_effect(effect) # may update the model entry. if kwargs: this.update_model_parameter(**kwargs, rate_update=False) # cleaning rate automatic feeding in model this._update_rate_in_model_() _ = this.draw( size=size, zmin=zmin, zmax=zmax, tstart=tstart, tstop=tstop, nyears=nyears, skyarea=skyarea, inplace=True, # creates self.data verbose=verbose, set_amplitude=set_amplitude ) return this
@classmethod def _parse_init_kwargs_(cls, **kwargs): """ Trick to add specific subclass kwargs into the init. """ # first kwargs/dict => init # second kwargs => rest (template) return {}, kwargs
[docs] def set_cosmology(self, cosmology): """Set the cosmology to be used across the target. Parameters ---------- cosmology: astropy.Cosmology the cosmology to be used. """ self._cosmology = cosmology
# ------------- # # Template # # ------------- #
[docs] def set_template(self, template, rate_update=False): """Set the template. .. note:: It is unlikely you want to set this directly. Parameters ---------- template : str, `sncosmo.Source`, `sncosmo.Model` or ``skysurvey.Template`` This will reset ``self.template`` to the new template source. rate_update : bool, optional [description], by default False See Also -------- ``from_draw``: load the instance by a random draw generation. ``from_setting``: loads an instance given model parameters """ from ..template import parse_template self._template = parse_template(template) if rate_update: warnings.warn("rate_update in set_template is not implemented. If you see this message, contact Mickael")
[docs] def get_template(self, index=None, as_model=False, data=None, set_magabs=False, **kwargs): """Get a template (`sncosmo.Model`). Parameters ---------- index : int, optional Index of a target (see `self.data.index`) to set the template parameters to that of the target. If None, the default `sncosmo.Model` parameters will be used. By default None. as_model : bool, optional should this return the sncosmo.Model (True) or the skysurvey.Template (for info sncosmo.Model => skysurvey.Template.sncosmo_model) data: `pandas.DataFrame`, None, optional which data should be used to set the parameter of the template. Ignored if index is None. set_magabs: bool, optional should the peal magnitude of the template be set to magabs ? **kwargs Goes to ``self.template.get()`` and passed to `sncosmo.Model`. Returns ------- ``skysurvey.Template`` or `sncosmo.Model` An instance of the template (or its associated `sncosmo.Model`). (see ``as_model``) See Also -------- ``get_target_template``: get a template set to the target parameters. ``get_template_parameters``: get the template parameters for the given target """ if data is None: data = self.data if index is not None: prop = self.get_template_parameters(index, data=data).to_dict() kwargs = prop | kwargs _ = kwargs.pop(self.amplitude_name, None) sncosmo_model = self.template.get(**kwargs) if index is not None and set_magabs: peak_absmag = data.loc[index, "magabs"] peak_absmag_band = self.peak_absmag_band peak_absmag_magsys = self.magsys sncosmo_model.set_source_peakabsmag( absmag=peak_absmag, band=peak_absmag_band, magsys=peak_absmag_magsys, cosmo=self.cosmology ) if not as_model: from ..template import Template return Template.from_sncosmo(sncosmo_model) return sncosmo_model
[docs] def get_target_template(self, index, as_model=False, **kwargs): """Get a template set to the target parameters. This is a shortcut to `get_template(index=index, **kwargs)`. Parameters ---------- index : int Index of a target (see `self.data.index`) to set the template parameters to that of the target. as_model : bool, optional should this return the `sncosmo.Model` (True) or the ``skysurvey.Template`` (for info `sncosmo.Model` => ``skysurvey.Template.sncosmo_model``) **kwargs Goes to ``self.template.get()`` and passed to `sncosmo.Model`. Returns ------- ``skysurvey.Template`` or `sncosmo.Model` An instance of the template (or its associated `sncosmo.Model`). (see as_model) See Also -------- ``get_template``: get a template instance (`sncosmo.Model`) ``get_template_parameters``: get the template parameters for the given target """ return self.get_template(index=index, as_model=as_model, **kwargs)
[docs] def get_target_flux(self, index, band, phase, zp=None, zpsys=None, restframe=True): """Flux through the given bandpass(es) at the given time(s). Default return value is flux in photons / s / cm^2. If `zp` and `zpsys` are given, flux(es) are scaled to the requested zeropoints. Parameters ---------- index : int Index of a target (see `self.data.index`) to set the template parameters to that of the target. band : str or list_like Name(s) of Bandpass(es) in registry. phase : float or list_like Phase in day. zp : float or list_like, optional If given, zeropoint to scale flux to (must also supply `zpsys`). If not given, flux is not scaled. By default None. zpsys : str or list_like, optional Name of a magnitude system in the registry, specifying the system that `zp` is in. By default None. restframe : bool, optional Is phase given in restframe? By default True. Returns ------- float or `numpy.ndarray` Flux in photons / s /cm^2, unless `zp` and `zpsys` are given, in which case flux is scaled so that it corresponds to the requested zeropoint. Return value is `float` if all input parameters are scalars, `numpy.ndarray` otherwise. """ sncosmo_model = self.get_target_template(index).sncosmo_model phase_obs = phase if not restframe else phase*(1+self.data.loc[index]["z"]) return sncosmo_model.bandflux(band, sncosmo_model.get('t0')+phase_obs, zp=zp, zpsys=zpsys)
[docs] def get_target_peakmag(self, index, band, magsys="ab"): """Peak magnitude through the given bandpass(es) at the given time(s). Parameters ---------- index : int Index of a target (see `self.data.index`) to set the template parameters to that of the target. band : str or list_like Name(s) of Bandpass(es) in registry. magsys : str or list_like, optional Name(s) of `sncosmo.MagSystem` in registry. By default "ab". Returns ------- float Magnitude at peak for the given band. """ sncosmo_model = self.get_template(index=index, set_magabs=True, as_model=True) return sncosmo_model.bandmag(band, magsys, sncosmo_model.get("t0") + sncosmo_model._source.peakphase(band))
[docs] def get_target_mag(self, index, band, phase, magsys="ab", restframe=True): """Magnitude through the given bandpass(es) at the given time(s). Parameters ---------- index : int Index of a target (see `self.data.index`) to set the template parameters to that of the target. band : str or list_like Name(s) of Bandpass(es) in registry. phase : float or list_like Phase in day. magsys : str or list_like, optional Name(s) of `sncosmo.MagSystem` in registry. By default "ab". restframe : bool, optional Is phase given in restframe? By default True. Returns ------- float or `numpy.ndarray` Magnitude for each item in time, band, magsys. The return value is a float if all parameters are not interables. The return value is an `numpy.ndarray` if any are interable. """ sncosmo_model = self.get_template(index=index, set_magabs=True, as_model=True) phase_obs = phase if not restframe else phase*(1+self.data.loc[index]["z"]) return sncosmo_model.bandmag(band=band, time=sncosmo_model.get('t0')+phase_obs, magsys=magsys)
[docs] def clone_target_change_entry(self, index, name, values, as_dataframe=False): """Get a clone of the given target at the given redshifts. This: 1. copies the index entries, 2. sets the `name` to the input `values` 3. redraw the model starting from `name` (creating a new dataframe) 4. (optional) sets a new instance with the updated dataframe Parameters ---------- index : int Index of a target (see `self.data.index`). name : str Name of the entry to change. values : list, array New values for this entry. as_dataframe : bool, optional Should this return the created new dataframe (True) or a new instance (False). By default False. Returns ------- `Target` or DataFrame The cloned target or the new dataframe. """ dd = self.data.loc[index].to_frame().T dd.loc[index, name] = np.atleast_1d(values) dd = dd.explode(name) # dd[name] = dd[name].convert_dtypes() data = self.model.redraw_from(name, dd, incl_name=False) if as_dataframe: return data return self.__class__.from_data(data, model=self.model.model, template=self.template)
# -------------- # # Getter # # -------------- #
[docs] def get_template_parameters(self, index=None, data=None): """Get the template parameters for the given target. This method selects from `self.data` the parameters that actually are parameters of the template (and disregards the rest). Parameters ---------- index : int, optional Index of a target (see ``self.data.index``) to get the template parameters from that target only. By default None. Returns ------- `pandas.DataFrame` or `pandas.Series` The template parameters. See Also -------- ``template_parameter``: parameters of the template (`sncosmo.Model`) | argument ``get_template``: get a template instance (`sncosmo.Model`) """ if data is None: data = self.data known = self.get_template_columns(data=data) prop = data[known] if index is not None: return prop.loc[index] return prop
[docs] def get_template_columns(self, data=None): """Get the data columns that are template parameters. Returns ------- `pandas.Index` The template columns. """ if data is None: data = self.data return data.columns[np.isin(data.columns, self.template_parameters)]
# -------------- # # Apply # # -------------- #
[docs] def apply_gaussian_noise(self, errmodel, data=None): """Apply gaussian noise to current entries. Parameters ---------- errmodel : dict Dict that will feed a `ModelDAG`. The format is `{x: {func:, kwargs:{}}}`. This will draw `x_err` following the this formula and will update `x` assuming `x_true` for the original `x` and `x_err` for the given `x` drawn here. You can refeer to the original `x` using `'@x_true'` in the func kwargs. data : None, optional Original dataframe to be noisified. If None `self.data` is used. By default None. Returns ------- `Target` or DataFrame - `self` if data is None - `DataFrame` otherwise. Examples -------- >>> import skysurvey >>> from scipy import stats >>> errmodel = {"x1": {"func": stats.lognorm.rvs, "kwargs":{"s":0.6, "loc":0.001, "scale":0.15}}, ... "c": {"func": stats.lognorm.rvs, "kwargs":{"s":0.7, "loc":0.03, "scale":0.01}}, ... "magobs": {"func": stats.lognorm.rvs, "kwargs":{"s":0.9, "loc":0.03, "scale":0.01}}, ... } >>> snia = skysurvey.SNeIa.from_draw(1000) >>> snia = snia.apply_gaussian_noise(errmodel, data=snia.data) """ from modeldag.tools import apply_gaussian_noise if data is None: data = self.data as_dataframe = False else: as_dataframe = True new_data = apply_gaussian_noise(errmodel, data=data) if as_dataframe: return new_data return self.__class__.from_data(new_data, model=self.model.model, template=self.template)
# -------------- # # Converts # # -------------- #
[docs] def magabs_to_magobs(self, z, magabs, cosmology=None): """Convert absolute magnitude into observed magnitude. This is done given the (cosmological) redshift and a cosmology. Parameters ---------- z : float, array-like Cosmological redshift. magabs : float, array-like Absolute magnitude. cosmology: `astropy.Cosmology`, None specify the cosmology to use to convert observed- to absolute-magnitude. If None, self.cosmology is used. *Careful* with specifying the cosmology, in a self consistant why. Returns ------- array-like Array of observed magnitude (`distmod(z) + magabs`). """ if cosmology is None: cosmology = self.cosmology return self._magabs_to_magobs(z, magabs, cosmology=cosmology)
@staticmethod def _magabs_to_magobs(z, magabs, cosmology): """Convert absolute magnitude into observed magnitude. This is an internal method. Parameters ---------- z : float, array-like Cosmological redshift. magabs : float, array-like Absolute magnitude. cosmology : `astropy.Cosmology` Cosmology to use. Returns ------- array-like Array of observed magnitude (`distmod(z) + magabs`). """ return cosmology.distmod(np.asarray(z, dtype="float32")).value + magabs # -------------- # # Model # # -------------- #
[docs] def set_model(self, model, rate_update=True): """Set the target model. The model defines what template parameters to draw and how they are connected. .. note:: It is unlikely you need to use that directly. Parameters ---------- model : dict or `ModelDAG` Model that will be used to draw the `Target` parameter. rate_update : bool, optional Should this check for rate options and feedin `rate=self.rate`? By default True. Returns ------- None See Also -------- ``from_setting``: loads an instance given model parameters (dict) ``from_draw``: loads and draw random data. """ from modeldag import ModelDAG if type( model ) is dict: model = ModelDAG(model, self) self._model = model if rate_update: self._update_rate_in_model_()
[docs] def set_data(self, data, incl_template=True): """Attach data to this instance. Parameters ---------- data : `pandas.DataFrame` DataFrame containing (at least) the template parameters. incl_template : bool, optional If data does not contain the template column should this add it? By default True. Returns ------- None """ if "template" not in data and incl_template: if self.template is None: templatename = "unknown" else: templatename = self.template_source.name data["template"] = templatename self._data = data
[docs] def get_model(self, **kwargs): """Get a copy of the model (dict). You can change the model you get (not the current model) using the kwargs. Parameters ---------- **kwargs Can change the model entry parameters for istance, `t0: {"low":0, "high":10}` will update `model["t0"]["param"] = ...` Returns ------- dict A copy of the model (with param potentially updated). See Also -------- ``update_model``: change the current model (not just the one you get) ``get_model_parameter``: access the model parameters. """ return self.model.get_model(**kwargs)
[docs] def get_model_parameter(self, entry, key, default=None, model=None): """Access a parameter of the model. Parameters ---------- entry : str Name of the variable as given by the model dict. key : str Name of the parameters. default : any, optional Value returned if the parameter is not found. By default None. model : modelDAG, optional Get the parameter of this model instead of `self.model`. Use with caution. By default None. Returns ------- any Value of the entry parameter. Examples -------- >>> self.get_model_parameter('redshift', 'zmax', None) """ if model is None: model = self.model return model.model[entry]["kwargs"].get(key, default)
[docs] def update_model_parameter(self, rate_update=True, **kwargs): """Change the kwargs entry of a model.""" # use copy to avoid classmethod issues for k, v in kwargs.items(): self.model.model[k]["kwargs"] = self.model.model[k].get("kwargs",{}) | v if rate_update: self._update_rate_in_model_()
[docs] def update_model(self, rate_update=True, **kwargs): """Change the given entries of the model. Parameters ---------- rate_update : bool, optional [description], by default True **kwargs Will update any model entry (or create a new one at the end). Examples -------- Changing the `b` entry function and make it depends on `a` >>> rng = np.random.default_rng() >>> self.update_model(b={"func":rng.normal, "kwargs":{"loc":"@a", "scale":1}}) """ new_model = self.model.model | kwargs _ = self.set_model(new_model, rate_update=rate_update)
def _update_rate_in_model_(self, warn_if_more=1): """Update the rate in the model.""" keys = self.model.get_func_with_args("rate") if len(keys)>warn_if_more: warnings.warn(f"more than {warn_if_more} entries have 'rate' in their options ({keys=})") self.update_model_parameter(**{k: {"rate": self.rate} for k in keys}, rate_update=False)
[docs] def add_effect(self, effect, model=None, data=None, overwrite=False, **kwargs): """Add an effect to the target affecting how spectra or lightcurve are generated. This changes the template, using ``self.template.add_effect()``, and changes the target's model if ``effect.model`` is set. Parameters ---------- effect : dict, ``skysurvey.effect.Effect`` Effect that should be used to change the target. e.g. ``mw_ebv = skysurvey.effect.Effect.from_name('mw')`` These format are accepted: - dict: ``{effect: sncosmo.Effect, "name": str, "frame": str, (model: optionel)}`` - ``skysurvey.effect.Effect`` model : dict, optional Defines how the data will be drawn. This updates ``self.model``. By default None. data : `pandas.DataFrame`, optional Value that will be added to the data to capture the effect (if any). If data and model are given, model is not used. By default None. overwrite : bool, optional [description], by default False **kwargs: goes to ``self.data.merge(data, **kwargs)`` if data is given. Ignored otherwise. Returns ------- None """ if type(effect) is dict: from .. import Effect effect = Effect(**effect) # update the model if model is not None: effect._model = model if effect.model is not None: self.update_model(**effect.model, rate_update=False) # update the data if data is not None: if self.data is None: warnings.warn("no current data. cannot merge. input effect 'data' is ignored") else: new_data = self.data.merge(data, **kwargs) self.set_data(new_data) elif effect.model is not None and self.data is not None: # if not self.data, this will be drawn along with the data on time. keys_to_draw = list(effect.model.keys()) if not overwrite and np.any([k in self.data for k in keys_to_draw]): warnings.warn(f"some or all of {keys_to_draw} are already in self.data. Set overwrite to True to overwrite them. Data unchanged.") else: new_data = self.model.redraw_from(keys_to_draw, self.data) self.set_data(new_data) # update the template from this effect _ = self.template.add_effect(effect)
# -------------- # # Plotter # # -------------- #
[docs] def show_scatter(self, xkey, ykey, ckey=None, ax=None, fig=None, index=None, data=None, colorbar=True, bins=None, bcolor="0.6", err_suffix="_err", **kwargs): """Show a scatter plot of the data. Parameters ---------- xkey : str The key for the x-axis data. ykey : str The key for the y-axis data. ckey : str, optional The key for the color-axis data. By default None. ax : `matplotlib.axes.Axes`, optional The axes on which to plot. By default None. fig : `matplotlib.figure.Figure`, optional The figure on which to plot. By default None. index : int, optional The index of the data to plot. By default None. data : `pandas.DataFrame`, optional The data to plot. By default None. colorbar : bool, optional Whether to show a colorbar. By default True. bins : int, optional The number of bins to use for the histogram. By default None. bcolor : str, optional The color of the bins. By default "0.6". err_suffix : str, optional The suffix for the error columns. By default "_err". **kwargs Additional keyword arguments to pass to `ax.scatter`. Returns ------- `matplotlib.figure.Figure` The figure containing the plot. """ import matplotlib.pyplot as plt # ------- # # Data # # ------- # if data is None: data = self.data if index is None else self.data.loc[index] xvalue = data[xkey] yvalue = data[ykey] cvalue = None if ckey is None else data[ckey] # ------- # # axis # # ------- # if ax is None: if fig is None: import matplotlib.pyplot as plt fig = plt.figure(figsize=[7,4]) ax = fig.add_subplot(111) else: fig = ax.figure # scatter prop = {**dict(zorder=3), **kwargs} sc = ax.scatter(xvalue, yvalue, c=cvalue, **prop) # errorbar if f"{xkey}{err_suffix}" in data or f"{ykey}{err_suffix}" in data: xerr = data.get(f"{xkey}{err_suffix}") yerr = data.get(f"{ykey}{err_suffix}") zorder = prop.pop("zorder") - 1 _ = ax.errorbar(xvalue, yvalue, xerr=xerr, yerr=yerr, ls="None", marker="None", zorder=zorder, ecolor="0.7") if cvalue is not None and colorbar: fig.colorbar(sc, ax=ax) if bins is not None: from matplotlib.colors import to_rgba binned = pandas.cut(xvalue, bins) # defines the bins # Add them to a copy of the dataframe along with the y-data data_tmp = data[[ykey]].copy() data_tmp["xbins"] = binned # compute the binned mean, std and size (err_mean = std/sqrt(size-1) gbins = data_tmp.groupby("xbins")[ykey].agg(["mean","std", "size"]).reset_index() # get the bin centroid bincentroid = gbins["xbins"].apply(lambda x: x.mid) # and show the bins ax.errorbar(bincentroid.values, gbins["mean"], yerr=gbins["std"]/np.sqrt(gbins["size"]-1), ls="None", marker="s", mfc=to_rgba(bcolor, 0.8), mec=bcolor, zorder=9, ms=7, ecolor=bcolor) return fig
# =============== # # Draw Methods # # =============== #
[docs] def draw(self, size=None, zmax=None, zmin=0, tstart=None, tstop=None, nyears=None, skyarea=None, inplace=False, model=None, verbose=False, set_amplitude=False, **kwargs): """Draw the parameter model (using ``self.model.draw()``). Parameters ---------- size : int, optional Number of target you want to draw. Ignored is `nyears` is not None. By default None. zmax : float, optional Maximum redshift to be simulated. By default None. zmin : int, optional Minimum redshift to be simulated. By default 0. tstart : float, optional Starting time of the simulation. By default None. tstop : float, optional Ending time of the simulation. If `tstart` and `nyears` are both given, `tstop` will be overwritten by `tstart + 365.25 * nyears`. By default None. nyears : float, optional If given, `nyears` will set: - `size`: it will be the number of target expected up to `zmax` in the given number of years. This uses `get_rate(zmax)`. - `tstop`: `tstart + 365.25 * nyears` By default None. skyarea : None, str, geometry, optional Sky area to be considered. - str: 'full' (equivalent to None), 'extra-galactic' - geometry: shapely.Geometry - None: full sky By default None. inplace : bool, optional Sets `self.data` to the newly drawn dataframe. By default False. model : [type], optional [description]. By default None. set_amplitude: bool should the template amplitude be computed. Returns ------- DataFrame The simulated dataframe. """ # # Drawn model # if model is None: drawn_model = self.model # a modelDAG else: from modeldag import ModelDAG current_model_dict = self.model.model drawn_model = ModelDAG( current_model_dict | model, obj=self) # => tstart, tstop format if type(tstart) is str: tstart = time.Time(tstart).mjd elif type(tstart) is time.Time: tstart = tstart.mjd if type(tstop) is str: tstop = time.Time(tstop).mjd elif type(tstop) is time.Time: tstop = tstop.mjd # => nyears and times if nyears is None and (tstart is not None and tstop is not None): nyears = (tstop-tstart)/365.25 if nyears is not None and (tstart is not None and tstop is None): tstop = tstart + nyears*365.25 if nyears is not None and (tstart is None and tstop is not None): tstart = tstop - nyears*365.25 if nyears is None and size is None: raise ValueError(" You must provide either nyears or size") if nyears is not None and size is not None: nyears = None # its job is done. # # Redshift # # zmax # -> get forward entries that have 'zmax' as parameters key_redshift = drawn_model.get_func_with_args("zmax") for zkey in key_redshift: if zmax is not None: kwargs.setdefault(zkey, {}).update({"zmax": zmax}) elif nyears is not None: zmax = self.get_model_parameter(zkey, "zmax", None, model=drawn_model) # zmin # -> get forward entries that have 'zmin' as parameters key_redshift = drawn_model.get_func_with_args("zmin") for zkey in key_redshift: # note: Why condition "on redshift" ? if zmin is not None and "redshift" in self.model.model: kwargs.setdefault(zkey, {}).update({"zmin": zmin}) elif nyears is not None: zmin = self.get_model_parameter(zkey, "zmin", None, model=drawn_model) if tstop is not None: if type( tstop ) is str: tstop = time.Time(tstop).mjd kwargs.setdefault("t0", {}).update({"high": tstop}) # # time range # if tstart is not None: if type( tstart ) is str: tstart = time.Time(tstart).mjd kwargs.setdefault("t0",{}).update({"low": tstart}) if tstop is None and nyears is None: # do 1 year by default kwargs.setdefault("t0",{}).update({"high": tstart+365.25}) # tstart is None, then what ? elif tstop is not None and nyears is not None: tstart = tstop - 365.25*nyears # fixed later elif nyears is not None: tstart = self.get_model_parameter("t0", "low", None, model=drawn_model) # # Sky area # skyarea = parse_skyarea(skyarea) # shapely.geometry or skyarea if skyarea is not None: param_affected = drawn_model.get_func_with_args("skyarea") if "radec" in drawn_model.model.keys() and "radec" not in param_affected: warnings.warn("radec in model, skyarea given, but the radec func does not accept skyarea.") if len(param_affected) ==0: warnings.warn("skyarea given but no model have skyarea as parameters. This is ignored.") for k in param_affected: kwargs.setdefault(k, {}).update({"skyarea": skyarea}) # # Size # # skyarea affect get_rate if nyears is not None: from .rates import get_ntargets from ..tools.projection import radecmodel_to_skysurface if "radec" in drawn_model.model.keys(): # radec model radec_model = deepcopy(drawn_model.model["radec"]) # as updated by requested kwargs radec_model["kwargs"] |= kwargs.get("radec", {}) f_area = radecmodel_to_skysurface( radec_model ) else: if skyarea is not None: warnings.warn("skyarea given, but no radec not in model | *nyears* will not account for skyarea.") f_area = 1 # redefine timing given nyears kwargs.setdefault("t0", {}).update({"low": tstart, "high": tstart + 365.25*nyears}) if zmin is None: zmin = 0 # get_ntargets is full sky. f_area corrects that. ntarget_per_year = get_ntargets(zmax, rate=self.rate, zmin=zmin, zstep=1e-4, astype="float", cosmology=self.cosmology) size = int(ntarget_per_year * nyears * f_area) # actually draw the data data = drawn_model.draw(size=size, **kwargs) # patch the missing `amplitude` back to .data if set_amplitude: amplitudes = np.zeros( len(data) ) for i in tqdm(range(size)) if verbose else range(size): sncosmo_model_i = self.get_template(index=i, as_model=True, data=data, set_magabs=True) amplitude = sncosmo_model_i.get(self.amplitude_name) amplitudes[i] = amplitude data[self.amplitude_name] = amplitudes # shall data be attached to the object? if inplace: # lower precision data = data.astype( {k: str(v).replace("64","32") for k, v in data.dtypes.to_dict().items()}) self.set_data(data) # since this is inplace, let's update stored model kwargs self.update_model_parameter(**kwargs) return data
# ============== # # Properties # # ============== # @classproperty def amplitude_name(self): """The name of the amplitude parameter.""" if not hasattr(self, "_amplitude_name"): self._amplitude_name = self._AMPLITUDE_NAME return self._amplitude_name @classproperty def peak_absmag_band(self): """The band used to set the peak absolute magnitude.""" if not hasattr(self, "_peak_absmag_band"): self._peak_absmag_band = self._PEAK_ABSMAG_BAND return self._peak_absmag_band @classproperty def magsys(self): """The magnitude system used for the peak absolute magnitude.""" if not hasattr(self, "_magsys"): self._magsys = self._MAGSYS return self._magsys @classproperty def kind(self): """The kind of target.""" if not hasattr(self,"_kind"): self._kind = self._KIND return self._kind @property def cosmology(self): """The cosmology to use.""" if not hasattr(self, "_cosmology") or self._cosmology is None: self.set_cosmology( self._COSMOLOGY ) return self._cosmology # model @property def model(self): """The model of the target.""" if not hasattr(self, "_model") or self._model is None: from copy import deepcopy self.set_model( deepcopy(self._MODEL) if self._MODEL is not None else {} ) return self._model @property def data(self): """The data of the target.""" if not hasattr(self,"_data"): return None return self._data # template @property def template(self): """The template of the target.""" if not hasattr(self,"_template") or self._template is None: self.set_template(self._TEMPLATE) return self._template @property def template_source(self): """The source of the template.""" return self.template.source @property def template_parameters(self): """The parameters of the template.""" return self.template.parameters @property def template_effect_parameters(self): """The effect parameters of the template.""" return self.template.effect_parameters
[docs] class Transient( Target ): """ A transient target. This class inherits from `Target` and adds a rate parameter. Parameters ---------- _RATE : float, optional The rate of the transient, by default None """ # - Transient _RATE = None # ============== # # Methods # # ============== # # Rates
[docs] def set_rate(self, float_or_func): """Set the transient rate. Parameters ---------- float_or_func : float or callable If a float is given, it is assumed to be the number of targets per Gpc3. If a callable is given, it is supposed to be a function of z that returns the volumetric rate as a function of wavelength. """ if callable(float_or_func): self._rate = float_or_func else: self._rate = float(float_or_func)
[docs] def draw_redshift(self, zmax, zmin=0, zstep=1e-4, size=None, rate=None, **kwargs): """Draw redshift based on the rate (see `get_rate()`). Parameters ---------- zmax : float Maximum redshift. zmin : float, optional Minimum redshift. By default 0. zstep : float, optional Redshift step. By default 1e-4. size : int, optional Number of redshifts to draw. By default None. rate : float, callable, optional The transient rate. If None, `self.rate` is used. By default None. If a float is given, it is assumed to be the number of targets per Gpc3. If a callable is given, it is supposed to be a function of z that returns the volumetric rate as a function of wavelength. **kwargs Additional keyword arguments to pass to ``draw_redshift``. Returns ------- array The drawn redshifts. """ from .rates import draw_redshift if rate is None: rate = self.rate return draw_redshift(size=size, rate=rate, zmax=zmax, zmin=zmin, zstep=zstep, cosmology=self.cosmology, **kwargs)
# ------- # # GETTER # # ------- #
[docs] def get_rate(self, z, rate=None, **kwargs): """Get the number of target (per year) up to the given redshift. Parameters ---------- z : float Redshift. rate : float, callable, optional If None, `self.rate` is used. If a float is given, it is assumed to be the number of targets per Gpc3. If a callable is given, it is supposed to be a function of z that returns the volumetric rate as a function of wavelength. **kwargs Goes to the rate function (if a function, not a number). Returns ------- int The number of targets. See Also -------- ``draw_redshift``: draws redshifts from rate distribution. """ from .rates import get_rate if rate is None: rate = self.rate return get_rate(z, rate=rate, **kwargs)
[docs] def get_lightcurve(self, band, times, sncosmo_model=None, index=None, in_mag=False, zp=25, zpsys="ab", **kwargs): """Get the transient lightcurve. Parameters ---------- band : str, list Name of the band (should be known by sncosmo) or list of. times : float, list Time of the observations. sncosmo_model : sncosmo.Model, optional The sncosmo model to use. By default None. index : int, optional The index of the target. By default None. in_mag : bool, optional If True, the lightcurve is returned in magnitude. By default False. zp : float, optional The zeropoint to use. By default 25. zpsys : str, optional The zeropoint system to use. By default "ab". **kwargs Additional keyword arguments to pass to `self.template.get_lightcurve`. Returns ------- ndarray 1 lightcurve per band. """ # get the template if index is not None: if sncosmo_model is None: sncosmo_model = self.get_template(index=index, as_model=True, set_magabs=True) else: prop = self.get_template_parameters(index).to_dict() kwargs = prop | kwargs return self.template.get_lightcurve(band, times, sncosmo_model=sncosmo_model, in_mag=in_mag, zp=zp, zpsys=zpsys, **kwargs)
[docs] def get_spectrum(self, time, lbdas, as_phase=True, sncosmo_model=None, index=None, **kwargs): """Get the transient spectrum at the given phase (time). Parameters ---------- time : float or list_like Time(s) in days. If `None` (default), the times corresponding to the native phases of the model are used. lbdas : float or list_like Wavelength(s) in Angstroms. If `None` (default), the native wavelengths of the model are used. as_phase : bool, optional Is the given time a phase? (`as_phase=True`) or a actual time (False). By default True. sncosmo_model : [type], optional [description]. By default None. index : [type], optional [description]. By default None. Returns ------- flux : float or `numpy.ndarray` Spectral flux density values in ergs / s / cm^2 / Angstrom. See Also -------- ``get_lightcurve``: get the transient lightcurve """ prop = {} # get the template if index is not None: if sncosmo_model is None: sncosmo_model = self.get_template(index=index, as_model=True, set_magabs=True) else: prop = self.get_template_parameters(index).to_dict() kwargs = prop | kwargs return self.template.get_spectrum(time, lbdas, sncosmo_model=sncosmo_model, as_phase=as_phase, **kwargs)
# ------------ # # Show LC # # ------------ #
[docs] def show_lightcurve(self, band, index, params=None, ax=None, fig=None, colors=None, phase_range=None, npoints=500, zp=25, zpsys="ab", format_time=True, t0_format="mjd", in_mag=False, invert_mag=True, **kwargs): """Show the lightcurve. Parameters ---------- band : str The band to show. index : int The index of the target. params : dict, optional Parameters to pass to `get_target_template`. By default None. ax : `matplotlib.axes.Axes`, optional The axes to show the lightcurve on. By default None. fig : `matplotlib.figure.Figure`, optional The figure to show the lightcurve on. By default None. colors : list, optional The colors to use for the lightcurve. By default None. phase_range : list, optional The phase range to show. By default None. npoints : int, optional The number of points to show. By default 500. zp : float, optional The zero point to use. By default 25. zpsys : str, optional The zero point system to use. By default "ab". format_time : bool, optional Whether to format the time. By default True. t0_format : str, optional The format of the time. By default "mjd". in_mag : bool, optional Whether to show the magnitude. By default False. invert_mag : bool, optional Whether to invert the magnitude. By default True. **kwargs Additional keyword arguments to pass to ``template.show_lightcurve``. Returns ------- `matplotlib.figure.Figure` The figure containing the plot. """ # get the template if params is None: params = {} template = self.get_target_template(index, set_magabs=True, **params) return template.show_lightcurve(band, params=params, ax=ax, fig=fig, colors=colors, phase_range=phase_range, npoints=npoints, zp=zp, zpsys=zpsys, format_time=format_time, t0_format=t0_format, in_mag=in_mag, invert_mag=invert_mag, **kwargs)
# ============== # # Properties # # ============== # # Rate @property def rate(self): """Rate of the transient. If float, it is assumed to be the volumetric rate in Gpc-3 / yr-1. If a callable is given, it is supposed to be a function of z that returns the volumetric rate as a function of wavelength. """ if not hasattr(self,"_rate"): self.set_rate( self._RATE ) # default return self._rate