Source code for skysurvey.tools.utils

"""
This module provides utility functions for drawing sky coordinates, and applying observational noise.
"""

import pandas
import healpy as hp
import numpy as np

from astropy.cosmology import Planck15
from scipy.stats import rv_discrete
from scipy.interpolate import InterpolatedUnivariateSpline as Spline1d
from shapely import geometry

try:
    from ligo.skymap.bayestar import rasterize 
    from ligo.skymap.io import read_sky_map
    import ligo.skymap.distance as ligodist
    LIGO_SKYMAP_IMPORTED = True
except ImportError:
    LIGO_SKYMAP_IMPORTED = False

[docs] def get_skynoise_from_maglimit(maglim, zp=30): """ Get the noise associated to the 5-sigma limit magnitude. Parameters ---------- maglim : float 5-sigma limiting magnitude. zp : float, optional Zero point. Default is 30. Returns ------- float Sky noise. """ flux_5sigma = 10**(-0.4*(maglim - zp)) skynoise = flux_5sigma/5. return skynoise
# ================= # # Noise Generator # # ================= #
[docs] def build_covariance(as_dataframe=False, **kwargs): """ Convert kwargs into covariance matrix. Parameters ---------- as_dataframe: bool should this return a np.array (False) or build a dataframe from it ? (True) kwargs: the format is the following: {'{key1}': err, '{key2}': err, 'cov_{key1}{key2}': covariance, # or 'cov_{key2}{key1}' both are looked for } Returns ------- array or dataframe, param_names See `as_dataframe`. """ param_names, errors = np.stack([[key,val] for key, val in kwargs.items() if not key.startswith("cov")]).T cov_diag = np.diag(np.asarray(errors, dtype="float")**2) # not sure I need this for loop though... for i, ikey in enumerate(param_names): for j, jkey in enumerate(param_names): if j==i: continue cov_diag[i,j] = kwargs.get(f"cov_{ikey}{jkey}", kwargs.get(f"cov_{jkey}{ikey}", 0)) if as_dataframe: cov_diag = pandas.DataFrame(cov_diag, index=param_names, columns=param_names) return cov_diag, param_names
[docs] def apply_gaussian_noise(target_or_data, seed=None, **kwargs): """ Apply random gaussian noise to the target. Pass the entries error and covariance as kwargs following this format: {'{key1}': err, '{key2}': err, 'cov_{key1}{key2}': covariance, # or 'cov_{key2}{key1}' both are looked for } Parameters ---------- target_or_data: ``skysurvey.Target`` or `pandas.DataFrame` a target (of child of) or directly it's `target.data` This will affect what is returned. Returns ------- target or dataframe According to input. """ import pandas if type(target_or_data) is pandas.DataFrame: data = target_or_data target = None else: target = target_or_data data = target.data # create the covariance matrix covmatrix, names = build_covariance(**kwargs) # create the noise rng = np.random.default_rng(seed=seed) noise = rng.multivariate_normal( np.zeros( len(names)), covmatrix, size=( len(target.data), ) ) # create the noisy data form datanoisy = data.copy() datanoisy[[f"{k}_true" for k in names]] = datanoisy[names] # set truth datanoisy[names] += pandas.DataFrame(noise, index=data.index, columns=names) # affect data # store the input noise information info = pandas.DataFrame(data=np.atleast_2d(list(kwargs.values())), columns=list(kwargs.keys())) info= info.reindex(data.index, method="ffill") info.rename({k:f"{k}_err" for k in names}, axis=1, inplace=True) # to finally get the new data. newdata = datanoisy.merge(info, left_index=True, right_index=True) # returns a target or a dataframe according to input if target is not None: return target.__class__.from_data(newdata) return newdata
[docs] def random_radec(size=None, skyarea=None, ra_range=[0, 360], dec_range=[-90,90], rng=None): """ Draw the sky positions. Parameters ---------- size: int, None number of draw ra_range: 2d-array = ignored if skyarea given = right-accension boundaries (min, max) dec_range: 2d-array = ignored if skyarea given = declination boundaries skyarea: `shapely.geometry.(Multi)Polyon` Area to consider. Default is None. If skyarea is given, ra_range, dec_range is ignored. rng : None, int, `(Bit)Generator`, optional seed for the random number generator. (doc adapted from numpy's `np.random.default_rng` docstring. See that documentation for details.) If None, an unpredictable entropy will be pulled from the OS. If an ``int``, (>0), it will set the initial `BitGenerator` state. If a `(Bit)Generator`, it will be returned as a `Generator` unaltered. Returns ------- 2d-array list of ra, list of dec. """ rng = np.random.default_rng(rng) # => MultiPolygon if type(skyarea) is geometry.MultiPolygon: radecs = [random_radec(size=size, ra_range=ra_range, dec_range=dec_range, skyarea=skyarea_, rng=rng) for skyarea_ in skyarea.geoms] # at this stage they are already in. ra = np.concatenate([radec_[0] for radec_ in radecs]) dec = np.concatenate([radec_[1] for radec_ in radecs]) # never twice the same indexes = rng.choice(np.arange(len(ra)), size=size, replace=False) ra, dec = ra[indexes], dec[indexes] # limit to exact input request return ra, dec # => Polygon or no skyarea if skyarea is not None: # change the ra_range default_skyrea = geometry.Polygon([ [ra_range[0], dec_range[0]], [ra_range[0], dec_range[1]], [ra_range[1], dec_range[1]], [ra_range[1], dec_range[0]]]) skyarea = default_skyrea.intersection(skyarea) size_to_draw = size*4 ramin, decmin, ramax, decmax = skyarea.bounds ra_range = [ramin, ramax] dec_range = [decmin, decmax] else: size_to_draw = size # => Draw RA, Dec dec_sin_range = np.sin(np.asarray(dec_range)*np.pi/180) ra = rng.uniform(*ra_range, size=size_to_draw) dec = np.arcsin( rng.uniform(*dec_sin_range, size=size_to_draw) ) / (np.pi/180) if skyarea is not None: from shapely.vectorized import contains flag = contains(skyarea, ra, dec) ra, dec = ra[flag], dec[flag] # those in the polygon indexes = rng.choice(np.arange( len(ra) ), size=size, replace=False) # never twice the same ra, dec = ra[indexes], dec[indexes] return ra, dec
[docs] def surface_of_skyarea(skyarea, incl_projection=True): """ Convert input skyarea into deg**2. Parameters ---------- skyarea: `shapely.geometry.(Multi)Polyon` Area to consider. incl_projection : bool, optional If True, correct for spherical sky projection before computing the area, returning the true solid angle. If False, return the raw area in flat Ra/Dec space. Default is True. Returns ------- float Area in deg**2 of the input skyarea, with or without sky projection correction. """ if type(skyarea) is str and skyarea != "full": return None if "shapely" in str(type(skyarea)): if not incl_projection: return skyarea.area else: # create a new skyarea deformed. ra, dec = np.asarray(skyarea.exterior.xy) dec = np.sin(dec / 180*np.pi) * 180/np.pi # keeps degree return geometry.Polygon( np.vstack([ra, dec]).T).area return skyarea
[docs] def parse_skyarea(skyarea): """ Pass through the skyarea as a shapely geometry. Parameters ---------- skyarea: `shapely.geometry.(Multi)Polyon` Area to consider. Returns ------- `shapely.geometry.(Multi)Polygon` The input skyarea unchanged. """ if type(skyarea) is str and skyarea != "full": return None return skyarea
[docs] def random_radecz_skymap(size=None,skymap={}, filename=None, do_3d=True, nside=512, ra_range=None,dec_range=None, zcmb_range=None, cosmo=Planck15, batch_size=1000, rng=None): """ Draw random (RA, Dec, redshift) coordinates from a 3D gravitational wave sky map. Parameters ---------- size : int Number of samples to draw. Default is None. skymap : dict or astropy Table, optional Pre-loaded sky map in moc format (as returned by `ligo.skymap.io.read_sky_map`). Ignored if `filename` is provided. filename : str, optional Path to a LIGO/Virgo sky map fits file. If provided, the sky map is loaded from this file. Default is None. do_3d : bool, optional If True, load the sky map with 3D distance information. Default is True. nside : int, optional HEALPix resolution parameter used to rasterize the sky map. Default is 512. ra_range : 2-element array, optional Right ascension range [min, max] in degrees to restrict the draw. Pixels outside this range have their probability set to 0. Default is None. dec_range : 2-element array, optional Declination range [min, max] in degrees to restrict the draw. Pixels outside this range have their probability set to 0. Default is None. zcmb_range : 2-element array, optional Redshift range [zmin, zmax] to restrict the distance sampling. If None, no redshift restriction is applied. Default is None. cosmo : `astropy.cosmology`, optional Cosmology used to convert luminosity distances to redshifts. Default is Planck15. batch_size : int, optional Number of pixels drawn per batch (to avoid memory issues for large `size`). Default is 1000. rng : None, int, or `(Bit)Generator`, optional Seed for the random number generator. If None, an unpredictable entropy will be pulled from the OS. If an int (>0), it sets the initial BitGenerator state. If a (Bit)Generator, it is returned unaltered. Returns ------- ra : array Right ascension of the sampled positions, in degrees. dec : array Declination of the sampled positions, in degrees. zs : array Redshifts of the sampled positions. """ if not LIGO_SKYMAP_IMPORTED: raise ImportError("ligo.skymap could not be imported. Please make sure it is installed.") if filename is not None: if do_3d: skymap = read_sky_map(filename, moc=True, distances=True) if "PROBDENSITY_SAMPLES" in skymap.columns: skymap.remove_columns( [ f"{name}_SAMPLES" for name in [ "PROBDENSITY", "DISTMU", "DISTSIGMA", "DISTNORM", ] ] ) else: skymap = read_sky_map(filename, moc=True, distances=False) skymap_raster = rasterize( skymap, order=hp.nside2order(nside) ) if "DISTMU" in skymap_raster.columns: ( skymap_raster["DISTMEAN"], skymap_raster["DISTSTD"], mom_norm, ) = ligodist.parameters_to_moments( skymap_raster["DISTMU"], skymap_raster["DISTSIGMA"], ) prob = skymap_raster["PROB"] prob[~np.isfinite(skymap_raster["DISTMU"])] = 0. prob[skymap_raster["DISTMU"] < 0.] = 0. prob[prob < 0.] = 0. npix = len(prob) nside = hp.npix2nside(npix) theta, phi = hp.pix2ang(nside, np.arange(npix)) ra_map = np.rad2deg(phi) dec_map = np.rad2deg(0.5*np.pi - theta) if ra_range is not None: idx = np.where((ra_map < ra_range[0]) | (ra_map > ra_range[1]))[0] prob[idx] = 0.0 if dec_range is not None: idx = np.where((dec_map < dec_range[0]) | (dec_map > dec_range[1]))[0] prob[idx] = 0.0 prob = prob / np.sum(prob) idx = np.where(prob<0)[0] distn = rv_discrete(values=(np.arange(npix), prob)) ipix = distn.rvs(size=min(size, batch_size)) while len(ipix) < size: ipix = np.append(ipix, distn.rvs(size=min(size-len(ipix), batch_size))) ra, dec = hp.pix2ang(nside, ipix, lonlat=True) # If no zcmb_range provided set the upper limit to 1e9 Mpc (z >> 1000) if zcmb_range is not None: z_tmp = np.linspace(zcmb_range[0], zcmb_range[1], 1000) dist_range = [cosmo.luminosity_distance(zcmb_range[0]).value, cosmo.luminosity_distance(zcmb_range[1]).value] else: dist_range = [0, 1e9] z_tmp = np.linspace(0, 10, 1000) z_d = Spline1d(cosmo.luminosity_distance(z_tmp).value, z_tmp) dists = -np.ones(size) dists_in_range = np.zeros(size, dtype=bool) rng = np.random.default_rng(rng) while not np.all(dists_in_range): ipix_tmp = ipix[~dists_in_range] dists[~dists_in_range] = (skymap_raster["DISTMEAN"][ipix_tmp] + skymap_raster["DISTSTD"][ipix_tmp] * rng.normal(size=np.sum(~dists_in_range))) dists_in_range = (dists > dist_range[0]) & (dists < dist_range[1]) zs = z_d(dists) return ra, dec, zs