Source code for skysurvey.lightcurves

"""
This module provides functions to create lightcurves by applying a transient model 
to survey observing logs. (used by dataset.realize_lightcurves).
"""


import copy
import pandas
import numpy as np
import sncosmo


[docs] def get_obsdata(template, observations, parameters, zpsys="ab", incl_error=True, discard_bands=False, trim_observations=False, phase_range=None): """ Get observed data using `sncosmo.realize_lcs()`. Parameters ---------- template: `sncosmo.Model` an sncosmo model from which we can draw observations observations: `pandas.DataFrame` Dataframe containing the observing infortation. requested entries: TBD parameters: `pandas.DataFrame` Dataframe containing the target parameters information. These depend on you model. incl_error: bool should the returned flux contain a random gaussian scatter drawn from the flux_err ? If False, lightcurve flux are "perfect". discard_bands: bool If True, if the sncosmo model is not defined in a given observeing band, the observation is discarded altogether, to prevent `sncosmo.realize_lcs()` from crashing. This only works for bands that are too blue for now. Returns ------- MultiIndex DataFrame all the observations for all targets See also -------- ``DataSet.from_targets_and_survey``: generate a DataSet from target and survey's object """ # if missing, we assume to work in a 'zpsys' system. if "zpsys" not in observations: observations["zpsys"] = zpsys dataobs = observations[["band", "mjd", "zp", "zpsys", "gain", "skynoise"]] # should this discard band not covered by sncosmo ? if not discard_bands: # realize lightcurve list_of_observations = realize_lightcurves(dataobs, template, parameters, scatter=incl_error, trim_observations=trim_observations, phase_range=phase_range) return list_of_observations # could be None else: # we create each lightcurves per bands. bands = np.unique(observations['band']) lcs = [] for band in bands: masked_dataobs = dataobs[dataobs['band']==band] z_max = sncosmo.get_bandpass(band).minwave()/template.minwave()-1 masked_parameters = parameters[parameters['z'] < z_max] list_of_observations = realize_lightcurves(masked_dataobs, template, masked_parameters, scatter=incl_error, trim_observations=trim_observations, phase_range=phase_range) if list_of_observations is not None and len(list_of_observations) > 0: lcs.append( list_of_observations ) return pandas.concat(lcs)
def _get_obsdata_(data, **kwargs): """ Internal method to simplify ``get_obsdata`` using single input (for map). Parameters ---------- data: list 3 entries: template: `sncosmo.Model` an sncosmo model from which we can draw observations observations: `pandas.DataFrame` Dataframe containing the observing infortation. requested entries: TBD parameters: `pandas.DataFrame` Dataframe containing the target parameters information. These depend on you model. **kwargs : goes to ``get_obsdata`` Returns ------- MultiIndex DataFrame all the observations for all targets See also -------- ``DataSet.from_targets_and_survey``: generate a DataSet from target and survey's object """ return get_obsdata(*data, **kwargs) # ====================== # # # # Realize Lightcurves # # # # ====================== #
[docs] def realize_lightcurves(observations, model, parameters, trim_observations=False, phase_range=None, scatter=True, rng=None): """Realize data for a set of SNe given a set of observations. Note: adapted from `sncosmo.realize_lcs`, but: - replacing astropy.Table by pandas.DataFrame - removing ability to use aliases. (no time lost in that) - removing thresh (no time lost in that) Parameters ---------- observations : `pandas.DataFrame` Table of observations. Must contain the following column names: ``band``, ``mjd``, ``zp``, ``zpsys``, ``gain``, ``skynoise``. model : `sncosmo.Model` The model to use in the simulation. parameters : `pandas.DataFrame` List of parameters to feed to the model for realizing each light curve. trim_observations : bool, optional If True, only observations with times between ``model.mintime()`` and ``model.maxtime()`` are included in result table for each SN. Default is False. phase_range: list, None, optional If given, only observations within the given rest-frame phase range will be considered. scatter : bool, optional If True, the ``flux`` value of the realized data is calculated by adding a random number drawn from a Normal Distribution with a standard deviation equal to the ``fluxerror`` of the observation to the bandflux value of the observation calculated from model. Default is True. rng : None, int, `(Bit)Generator`, optional seed for the random number generator. (doc adapted from numpy's `np.random.default_rng` docstring. See that documentation for details.) If None, an unpredictable entropy will be pulled from the OS. If an ``int``, (>0), it will set the initial `BitGenerator` state. If a `(Bit)Generator`, it will be returned as a `Generator` unaltered. Returns ------- sne : list of `pandas.DataFrame` Table of realized data for each item in ``params``. Notes ----- ``skynoise`` is the image background contribution to the flux measurement error (in units corresponding to the specified zeropoint and zeropoint system). To get the error on a given measurement, ``skynoise`` is added in quadrature to the photon noise from the source. It is left up to the user to calculate ``skynoise`` as they see fit as the details depend on how photometry is done and possibly how the PSF is is modeled. As a simple example, assuming a Gaussian PSF, and perfect PSF photometry, ``skynoise`` would be ``4 * pi * sigma_PSF * sigma_pixel`` where ``sigma_PSF`` is the standard deviation of the PSF in pixels and ``sigma_pixel`` is the background noise in a single pixel in counts. """ lcs = [] indexes = [] # Copy model so we don't mess up the user's model. model = copy.copy(model) # loops over targets for target_index, param in parameters.iterrows(): # update the model to the current parameters model.set( **param.to_dict() ) # Select times for output that fall within tmin amd tmax of the model if trim_observations: # {min/max}time includes redshift phase dilatation. snobs = observations[ observations['mjd'].between(model.mintime(), model.maxtime()) ] else: snobs = observations if phase_range is not None: # copy made before phase_range_obsframe = np.asarray( phase_range ) * (1 + model.get("z")) + model.get("t0") snobs = observations[ observations['mjd'].between(*phase_range_obsframe) ] # explicitly detect no observations and add an empty table if len(snobs) == 0: continue flux = model.bandflux(snobs['band'], snobs['mjd'], zp=snobs['zp'], zpsys=snobs['zpsys']) fluxerr = np.sqrt(snobs['skynoise']**2 + np.abs(flux) / snobs['gain']) # Scatter fluxes by the fluxerr # np.atleast_1d is necessary here because of an apparent bug in # np.random.normal: when the inputs are both length 1 arrays, # the output is a Python float! if scatter: rng = np.random.default_rng(rng) flux = np.atleast_1d( rng.normal(flux, fluxerr) ) # output data = snobs.merge(pandas.DataFrame({"flux": flux, "fluxerr": fluxerr}, index=snobs.index), left_index=True, right_index=True) lcs.append( data ) indexes.append( target_index ) if len(lcs)==0: return None return pandas.concat(lcs, keys=pandas.Index(indexes, name=parameters.index.name) )