Source code for skysurvey.survey.healpix

"""
This module provides HEALPix-based functions for handling survey observations on the sky.
"""

from .core import BaseSurvey

import pandas
import numpy as np
import healpy as hp
import warnings


[docs] def get_ipix_in_range(nside, ra_range=None, dec_range=None, in_rad=False): """Get the healpix pixel index (ipix) that are with a given ra and dec range. Parameters ---------- nside : int Healpix nside. ra_range, dec_range: 2d-array, None, optional Min and max to define a coordinate range to be considered. None means no limit. in_rad: bool, optional Are the ra and dec coordinates in radian (True) or degree (False). Returns ------- list List of healpix pixel index ipix. """ npix = hp.nside2npix(nside) pixs = np.arange(npix) # list of all healpix pixels if ra_range is None and dec_range is None: return pixs ras,decs = hp.pix2ang(nside, pixs) ras = (np.pi/2-ras) # only dec range if ra_range is None: if not in_rad: dec_range = np.multiply(dec_range, np.pi/180) # works if list given return pixs[(decs>=dec_range[0]) & (decs<=dec_range[1])] # only ra range if dec_range is None: if not in_rad: ra_range = np.multiply(ra_range, np.pi/180) # works if list given return pixs[(ras>=ra_range[0]) & (ras<=ra_range[1])] # both if not in_rad: ra_range = np.multiply(ra_range, np.pi/180) # works if list given dec_range = np.multiply(dec_range, np.pi/180) # works if list given return pixs[(ras>=ra_range[0]) & (ras<=ra_range[1]) & (decs>=dec_range[0]) & (decs<=dec_range[1])]
# ================== # # # # Healpix # # # # ================== #
[docs] class HealpixSurvey( BaseSurvey ): """ The `HealpixSurvey` class. Parameters ---------- nside : int healpix nside parameter data: `pandas.DataFrame` observing data. See also -------- :func:`from_data`: loads the instance given observing data. :func:`from_random`: generate random observing data and loads the instance. """ def __init__(self, nside, data=None): """ Initialize the HealpixSurvey class.""" super().__init__(data) self._nside = nside
[docs] @classmethod def from_data(cls, nside, data): """ Load an instance given survey data and healpix size (nside). Parameters ---------- nside : int healpix nside parameter data: pandas.DataFrame observing data. Returns ------- instance See also -------- :func:`from_random`: generate random observing data and loads the instance. """ return cls(nside=nside, data=data)
[docs] @classmethod def from_random(cls, nside, size, bands, mjd_range, skynoise_range, ra_range=None, dec_range=None, rng=None, **kwargs): """ Load an instance with random observing data. Parameters ---------- nside : int healpix nside parameter size: int number of observations to draw bands: list of str list of bands that should be drawn. mjd_range: list or array min and max mjd for the random drawing. skynoise_range: list or array min and max skynoise for the random drawing. ra_range, dec_range: 2d-array, None min and max to define a coordinate range to be considered. None means no limit. rng : None, int, (Bit)Generator, optional seed for the random number generator. (doc adapted from numpy's `np.random.default_rng` docstring. See that documentation for details.) If None, an unpredictable entropy will be pulled from the OS. If an `int`, (>0), it will set the initial `BitGenerator` state. If a `(Bit)Generator`, it will be returned as a `Generator` unaltered. **kwargs: goes to the ``draw_random()`` method Returns ------- `HealpixSurvey` """ this = cls(nside=nside) this.draw_random(size, bands, mjd_range, skynoise_range, ra_range=ra_range, dec_range=dec_range, inplace=True, rng=rng, **kwargs) return this
[docs] @classmethod def from_pointings(cls, nside, data, footprint=None, moc=None, rakey="ra", deckey="dec", backend="polars", use_pyarrow_extension_array=False, **kwargs): """ Loads an instance given observing poitings of a survey. This loads an ``polygon.PolygonSurvey`` using from_pointing and converts that into an healpix using the ``to_healpix()`` method. Parameters ---------- nside : int healpix nside parameter data: `pandas.DataFrame` or dict observing data, must contain the rakey and deckey columns. footprint: `shapely.geometry` footprint in the sky of the observing camera moc: `mocpy.MOC` MOC representation of the observing camera rakey: str name of the R.A. column (in deg) deckey: str name of the Declination column (in deg) backend: str which backend to use to merge the data (speed issue): - `polars` (fastest): requires polars installed -> converted to pandas at the end - `pandas` (classic): the normal way - `dask` (lazy): as persisted dask.dataframe is returned use_pyarrow_extension_array: bool = ignored in backend != 'polars' or polars_to_pandas is not True = should the pandas dataframe be based on numpy array (slow to load but faster then) or based on pyarrow array (like in polars) ; faster but numpy.asarray will be used by pandas when need (which will then slow things down). **kwargs: goes to ``polygon.PolygonSurvey.from_pointings`` Returns ------- instance """ from .polygon import PolygonSurvey # Create a generic polygon survey polysurvey = PolygonSurvey.from_pointings(data, footprint=footprint, moc=moc, rakey=rakey, deckey=deckey, **kwargs) # convert it to healpix return polysurvey.to_healpix(nside, backend=backend, pass_data=True, polars_to_pandas=True, use_pyarrow_extension_array=use_pyarrow_extension_array)
# ============== # # Methods # # ============== #
[docs] def get_field_area(self): """ Area (deg**2) of a healpy pixel. """ return hp.nside2pixarea(self.nside, degrees = True)
[docs] def get_observed_area(self, min_obs=1): """ Get the observed area (in deg**2). A healpix is consider observed if present more than min_obs time. Parameters ---------- min_obs: int minimum number of observations to consider a field as observed. Returns ------- float """ if min_obs <=1: # 0 or 1 the same nfields = self.data["fieldid"].nunique() else: nobs = self.data["fieldid"].value_counts() nfields = len(nobs[nobs>min_obs]) return self.get_field_area() * nfields
[docs] def get_polygons(self, observed_fields=False, as_vertices=False, origin=180): """Get a list of polygons. Parameters ---------- observed_fields: bool, optional Should this be limited to observed fields? as_vertices: bool, optional Should this returns a list of shapely.geometry.Polygon (False) or its vertices (shape N [fields], 2 [ra, dec], 4[corners]). origin: float, optional Origin of the R.A. coordinate (center of image). Returns ------- list (as_vertices) - list of polygon - list of vertices """ if observed_fields: fieldid = self.data[self.fieldids.name].unique() else: fieldid = self.fieldids corners = hp.boundaries(nside=self.nside, pix=fieldid) corners = np.moveaxis(corners,1,2) ang = np.asarray([hp.vec2ang(corners_, lonlat=True) for corners_ in corners]) ang[:,0,:] = (origin-ang[:,0,:])%360 # set back origin if as_vertices: return ang from shapely import geometry polygons = [geometry.Polygon(ang_.T) for ang_ in ang] return polygons
[docs] def get_skyarea(self, as_multipolygon=True, buffer=0.01): """Get multipolygon (or list) of field geometries. Parameters ---------- as_multipolygon: bool, optional If True, returns a multipolygon. Otherwise, returns unary_union of polygons buffer: float, None, optional buffer (in deg) around the polygon. This helps joining edges and reduces the number of isolated sky-pixels which may artificially slow down computation Returns ------- `shapely.geometry.MultiPolygon` or list """ from shapely import ops, geometry ps = self.get_polygons(observed_fields=True, as_vertices=False) if as_multipolygon: skyarea = geometry.MultiPolygon(ps) else: skyarea = ops.unary_union(ps) if buffer is not None: skyarea = skyarea.buffer(buffer) return skyarea
# ------- # # core # # ------- #
[docs] def radec_to_fieldid(self, radec, origin=180, observed_fields=False): """Get the fieldid associated to the given coordinates. Parameters ---------- radec: `pandas.DataFrame` or 2d array Coordinates in degree. origin: float, optional Value of the central R.A. observed_fields: bool, optional Should this be limited to fields actually observed? This is ignored is self.data is None. Returns ------- `pandas.DataFrame` """ if type(radec) is pandas.DataFrame: ra = np.asarray(radec["ra"].values, dtype="float") dec = np.asarray(radec["dec"].values, dtype="float") index = radec.index.copy() if index.name is None: index.name = "index_radec" else: ra, dec = np.atleast_1d(radec) ra = np.atleast_1d(ra) dec = np.atleast_1d(dec) index = pandas.Index(np.arange( len(ra) ), name="index_radec") fields = hp.ang2pix(self.nside, (90 - dec) * np.pi/180, (origin-ra) * np.pi/180) df = pandas.DataFrame(fields, columns = [self.fieldids.name], index=index) if observed_fields: observed_fields = self.data[self.fieldids.name].unique() df = df[df[self.fieldids.name].isin(observed_fields)] return df
[docs] def get_field_centroid(self, origin=180): """Get the centroid of the fields. Parameters ---------- origin: float, optional Origin of the ra coordinates. Returns ------- (array, array) ra, dec """ dec, ra = np.asarray(hp.pix2ang(self.nside, self.fieldids))*180/np.pi dec = 90-dec ra = (origin-ra)%360 return ra, dec
# ------- # # draw # # ------- #
[docs] def draw_random(self, size, bands, mjd_range, skynoise_range, gain_range=1, zp_range=25, ra_range=None, dec_range=None, inplace=False, nside=None, rng=None, **kwargs): """ Draw observations. Parameters ---------- size: int number of observations to draw bands: list of str list of bands that should be drawn. ra_range, dec_range: 2d-array, None min and max to define a coordinate range to be considered. None means no limit. skynoise_range, gain_range, zp_range: 2d-array, float, int range to be considered. If float or int, this value will always be used. otherwise, uniform distribution between the range assumed. inplace: bool shall this method replace the current self.data or return a new instance of the class with the generated observing data. nside: int = ignore if inplace is set to True = provide a new healpix nside parameters. Returns ------- class instance or None see the inplace option. See also -------- :func:`from_random`: generate random observing data and loads the instance. set_data: set the observing data to the instance. """ if nside is None: # don't change nside nside = self.nside elif inplace: # change nside warnings.warn("Cannot change nside with inplace=True, a copy (inplace=False) is returned.") inplace = False data = self._draw_random(nside, size, bands, mjd_range, skynoise_range, ra_range=ra_range, dec_range=dec_range, gain_range=gain_range, zp_range=zp_range, rng=rng, **kwargs) if not inplace: return self.__class__.from_data(nside=nside, data=data) self.set_data(data)
# ----------- # # PLOTTER # # ----------- #
[docs] def show(self, stat='size', column=None, title=None, data=None, vmin=None, vmax=None, seed=None, **kwargs): """ Shows the sky coverage using `healpy.mollview`. Parameters ---------- stat: str element to be passed to groupby.agg() could be e.g.: 'mean', 'std' etc. If stat = 'size', this returns data["fieldid"].value_counts() (slightly faster than groupby("fieldid").size()). columns: str column of the dataframe the stat should be applied to. = ignored if stat='size' = title: str title of the healpy.mollview plot. (`healpy.mollview` option) data: `pandas.DataFrame`, None data you want this to be applied to. if None, a copy of self.data is used. = leave to None if unsure = Returns ------- None See also -------- :func:`get_fieldstat`: get observing statistics for the fields """ if data is None: if self.data is None: rng = np.random.default_rng(seed=seed) data = rng.uniform(size=self.nfields) else: data = self.get_fieldstat(stat=stat, columns=column, incl_zeros=True, fillna=np.nan, data=data) else: if type(data) is dict: data = pandas.Series(data) if type(data) is pandas.Series: data = data.reindex(self.fieldids).values if vmin is not None: data[data<vmin] = vmin if vmax is not None: data[data>vmax] = vmax return hp.mollview(data, title=title, **kwargs)
# ============== # # Static Methods # # ============== # @staticmethod def _draw_random(nside, size, bands, mjd_range, skynoise_range, gain_range=1, zp_range=[27,30], ra_range=None, dec_range=None, rng=None): """Draw observations | internal. Parameters ---------- nside : int Healpix nside parameter. size: int Number of observations to draw. bands: list of str List of bands that should be drawn. mjd_range: list or array Min and max mjd for the random drawing. skynoise_range: list or array Min and max skynoise for the random drawing. gain_range: list or array, optional Min and max gain for the random drawing. zp_range: list or array, optional Min and max zp for the random drawing. ra_range, dec_range: 2d-array, None, optional Min and max to define a coordinate range to be considered. None means no limit. rng : None, int, (Bit)Generator, optional seed for the random number generator. (doc adapted from numpy's `np.random.default_rng` docstring. See that documentation for details.) If None, an unpredictable entropy will be pulled from the OS. If an ``int``, (>0), it will set the initial `BitGenerator` state. If a `(Bit)Generator`, it will be returned as a `Generator` unaltered. Returns ------- `pandas.DataFrame` A DataFrame with the drawn observations. """ rng = np.random.default_rng(rng) # np.resize(1, 2) -> [1,1] mjd = rng.uniform(*np.resize(mjd_range,2), size=size) band = rng.choice(bands, size=size) skynoise = rng.uniform(*np.resize(skynoise_range, 2), size=size) gain = rng.uniform(*np.resize(gain_range, 2), size=size) zp = rng.uniform(*np.resize(zp_range, 2), size=size) # = coords # no radec limit if ra_range is None and dec_range is None: npix = hp.nside2npix(nside) ipix = rng.uniform(0, npix, size=size) else: ipix_ok = get_ipix_in_range(nside, ra_range=ra_range, dec_range=dec_range) ipix = rng.choice(ipix_ok, size=size) # data sorted by mjd data = pandas.DataFrame(zip(mjd, band, skynoise, gain, zp, ipix), columns=["mjd","band","skynoise", "gain", "zp","fieldid"] ).sort_values("mjd" ).reset_index(drop=False) # don't need to know the creation order return data # ============== # # Properties # # ============== # @property def nside(self): """ Healpix nside parameter (defines the 'fields' size and number). """ return self._nside @property def nfields(self): """ Number of fields (shortcut to npix). """ return self.npix @property def npix(self): """ Number of healpix pixels. """ if not hasattr(self, "_npix") or self._npix is None: self._npix = hp.nside2npix(self.nside) return self._npix @property def fieldids(self): """ Id of the individual fields. """ fieldids = np.arange( self.npix ) # use pandas.index for self consistency with polygon.survey return pandas.Index(fieldids, name="fieldid")
[docs] def metadata(self): """Pandas Series containing meta data information. Returns ------- pandas.Series """ meta = super().metadata meta["nside"] = self.nside return meta