"""
This module defines the `PolygonSurvey` class, a survey type where fields are represented as shapely polygons projected onto the sky.
"""
import pandas
import numpy as np
import geopandas
from shapely import geometry
import warnings
from .core import BaseSurvey
from ..tools.projection import spatialjoin_radec_to_fields, parse_fields, project_to_radec
# ================== #
# #
# Polygon #
# #
# ================== #
[docs]
class PolygonSurvey( BaseSurvey ):
""" The `PolygonSurvey` class.
Parameters
----------
data: `pandas.DataFrame`
observing data.
fields: `geopandas.GeoDataFrame`
field definitions.
_DEFAULT_FIELDS : `geopandas.GeoDataFrame` or None
The default field definitions for this survey type.
Subclasses should override this to provide specific survey grids.
"""
_DEFAULT_FIELDS = None
def __init__(self, data=None, fields=None):
"""
Initialize the PolygonSurvey class."""
if fields is None:
if self._DEFAULT_FIELDS is None:
raise NotImplementedError("No default fields known for this class. No fields given")
fields = self._DEFAULT_FIELDS
self._fields = self._parse_fields(fields)
super().__init__(data)
[docs]
@classmethod
def from_pointings(cls, data, footprint=None, moc=None, rakey="ra", deckey="dec"):
"""
Load an instance from pointings.
Parameters
----------
data: `pandas.DataFrame` or dict
observing data, must contain the rakey and deckey columns.
footprint: `shapely.geometry`
footprint in the sky of the observing camera
moc: `mocpy.MOC`
MOC representation of the observing camera
rakey: str
name of the R.A. column (in deg)
deckey: str
name of the Declination column (in deg)
Returns
-------
`PolygonSurvey`
"""
if type(data) is dict:
data = pandas.DataFrame.from_dict(data).copy()
if footprint is not None:
field_pointings = project_to_radec(footprint, ra=data[rakey], dec=data[deckey])
elif moc is not None:
skycoords = moc.get_boundaries()
ra, dec = [], []
for skycoord in skycoords:
ra.append(skycoord.ra.deg)
dec.append(skycoord.dec.deg)
ra = np.concatenate(ra)
dec = np.concatenate(dec)
footprint = np.vstack((ra, dec))
pointings = project_to_radec(footprint, ra=data[rakey], dec=data[deckey])
field_pointings = [geometry.Polygon(p) for p in pointings]
fields = geopandas.GeoDataFrame(geometry=field_pointings, index=data.index.copy(),)
fields.index.name = "fieldid"
data["fieldid"] = fields.index.copy()
return cls(data=data, fields=fields)
[docs]
@classmethod
def from_random(cls, size,
bands, mjd_range, skynoise_range,
fields=None, **kwargs):
"""
Load an instance with random observing data.
Parameters
----------
size: int
number of observations to draw
bands: list of str
list of bands that should be drawn.
mjd_range: list or array
min and max mjd for the random drawing.
skynoise_range: list or array
min and max skynoise for the random drawing.
fields: `geopandas.GeoDataFrame`
field definitions.
**kwargs goes to the ``draw_random()`` method
Returns
-------
`PolygonSurvey`
"""
this = cls(fields=fields)
this.draw_random(size, bands,
mjd_range, skynoise_range,
inplace=True, **kwargs)
return this
# ============== #
# Methods #
# ============== #
[docs]
def get_fields(self, observed=True):
"""
Get the fields.
Parameters
----------
observed: bool
if True, return only the observed fields.
Returns
-------
`geopandas.GeoDataFrame`
"""
if observed:
if len(self.fieldids.names)==1: # Index
observed_fields = self.data[ self.fieldids.name ].unique()
fields = self.fields.loc[observed_fields].copy()
else: # MultiIndex
observed_fields = self.data[self.fieldids.names].drop_duplicates(ignore_index=True)
observed_fields = observed_fields.set_index(self.fieldids.names)
fields = self.fields.loc[(observed_fields.index.levels[0], observed_fields.index.levels[1]),]
else:
fields = self.fields.copy()
return fields
[docs]
def get_observed_area(self, nside=200):
"""Measure the observed area.
This uses healpy for accuracy.
Parameters
----------
nside: int, optional
Healpix nside.
Returns
-------
float
Area in deg2.
"""
hsurvey = self.to_healpix(nside=nside, pass_data=False)
return hsurvey.get_observed_area()# min_obs=min_obs) # not correct with pass_data=False yet.
[docs]
def to_healpix(self, nside, pass_data=True, backend="polars",
polars_to_pandas=True,
use_pyarrow_extension_array=False):
"""Convert the current polygon survey into a healpix survey.
Parameters
----------
nside: int
Healpix nside.
pass_data: bool, optional
Should the returned survey have the full data of just the fieldid matching?
backend: str, optional
Which backend to use to merge the data (speed issue):
- `polars` (fastest): requires polars installed -> converted to pandas at the end
- `pandas` (classic): the normal way
- `dask` (lazy): as persisted dask.dataframe is returned
polars_to_pandas: bool, optional
= ignored if backend != 'polars' =
Should the dataframe be converted into a pandas.DataFrame or say a polars.DataFrame
(using the to_pandas() option).
use_pyarrow_extension_array: bool, optional
= ignored in backend != 'polars' or polars_to_pandas is not True =
Should the pandas dataframe be based on numpy array (slow to load but faster then)
or based on pyarrow array (like in polars) ; faster but numpy.asarray will be
used by pandas when need (which will then slow things down).
Returns
-------
`HealpixSurvey`
"""
from .healpix import HealpixSurvey
hpsurvey = HealpixSurvey(nside)
ra, dec = hpsurvey.get_field_centroid()
# make sure to typing follows
dtype_fieldid = "int16" if nside < 50 else "int32"
fields = self.radec_to_fieldid( np.asarray([ra,dec]).T
).reset_index(names=["fieldid_hp"]
).astype({**self.data.dtypes.loc[self.fieldids.names].to_dict(),
**{"fieldid_hp":dtype_fieldid}}
)
# limit to what has been observed
if self.data is not None and self.fieldids.names in self.data:
fields = fields[fields[self.fieldids.names].isin(self.data[self.fieldids.names].unique())]
# What kind of data are passed
if not pass_data: # minimal
datain = fields
else: # all, but how ?
#
# Checking what is possible
#
if backend == "polars":
try:
import polars as pl
except ImportError:
# warnings.warn("You do not have polars installed. conda/pip install polars. falling back to pandas backend")
backend = "pandas"
if backend == "dask":
try:
import dask.dataframe as dd
except ImportError:
warnings.warn("You do not have dask installed. conda/pip install dask. falling back to pandas backend")
backend = "pandas"
# now let's merge
if backend == "pandas": # classic
datain = self.data.merge(fields, on=self.fieldids.names)
elif backend == "polars": # fast
p_fields = pl.from_pandas(fields)
p_data = pl.from_pandas(self.data)
datain = p_data.join(p_fields, on=self.fieldids.names)
if polars_to_pandas:
datain = datain.to_pandas(use_pyarrow_extension_array=use_pyarrow_extension_array)
elif backend == "dask": # dask
d_data = dd.from_pandas(self.data, chunksize=1_000_000)
# match types
d_fields = dd.from_pandas(fields, chunksize=1_000_000)
datain = d_data.merge(d_fields, on=self.fieldids.names).persist()
else:
raise NotImplementedError(f"Input backend {backend} is not implemented .")
# dask.dataframe.rename has no axis option but columns or index
name_mapping = {"fieldid":"fieldid_survey", "fieldid_hp":"fieldid"}
if "pandas" in str( type(datain) ): # pandas.DataFrame
datain = datain.rename(name_mapping, axis=1)
elif "polars" in str( type(datain) ): # polars.DataFrame
datain = datain.rename(name_mapping)
elif "dask" in str( type(datain) ): # dask.DataFrame
datain = datain.rename(columns=name_mapping)
else:
raise ValueError("cannot parse the returned DataFrame (yo")
hpsurvey.set_data(datain)
return hpsurvey
# ------- #
# core #
# ------- #
[docs]
def radec_to_fieldid(self, radec, observed_fields=False):
"""Get the fieldid associated to the given coordinates.
Parameters
----------
radec: `pandas.DataFrame` or 2d array
Coordinates in degree.
observed_fields: bool, optional
Should this be limited to fields actually observed?
This is ignored is ``self.data`` is None.
Returns
-------
`pandas.DataFrame`
"""
if type(radec) in [np.ndarray, list, tuple]:
inshape = np.shape(radec)
if inshape[-1] != 2:
raise ValueError(f"shape of radec must be (N, 2), {inshape} given.")
radec = pandas.DataFrame(radec, columns=["ra","dec"])
fields = self.get_fields(observed=observed_fields)
_keyindex = 'index_radec'
projection = spatialjoin_radec_to_fields(radec, fields,
index_radec=_keyindex,
).sort_values(_keyindex
).set_index(_keyindex)[self.fields.index.names]
return projection
# ------- #
# draw #
# ------- #
[docs]
def draw_random(self, size,
bands, mjd_range, skynoise_range,
gain_range=1, zp_range=25,
inplace=False, fieldids=None,
**kwargs):
"""
Draw random observations.
Parameters
----------
size: int
number of observations to draw
bands: list of str
list of bands that should be drawn.
mjd_range: list or array
min and max mjd for the random drawing.
skynoise_range: list or array
min and max skynoise for the random drawing.
gain_range: list or array
min and max gain for the random drawing.
zp_range: list or array
min and max zp for the random drawing.
inplace: bool
if True, the data are stored in the instance.
Otherwise, a new instance is returned.
fieldids: list
list of fieldids to draw from.
**kwargs goes to ``_draw_random``
Returns
-------
`PolygonSurvey` or None
"""
if fieldids is None:
fieldids = self.fieldids
data = self._draw_random(fieldids, size, bands, mjd_range, skynoise_range,
gain_range=gain_range, zp_range=zp_range,
**kwargs)
if not inplace:
return self.__class__.from_data(data=data, fields=self.fields)
self.set_data(data)
# ----------- #
# PLOTTER #
# ----------- #
[docs]
def show(self, stat='size', column=None, title=None, data=None, origin = 180,
vmin=None, vmax=None, cmap="tab10",
autoscale=False,
grid=True, **kwargs):
"""Show the sky coverage.
Parameters
----------
stat: str, optional
Statistic to plot.
column: str, optional
Column to use for the statistic.
title: str, optional
Title of the plot.
data: pandas.DataFrame, optional
Data to plot.
origin: float, optional
Origin of the ra coordinates.
vmin, vmax: float, optional
Min and max values for the colorbar.
cmap: str, optional
Colormap to use.
autoscale: bool, optional
If True, autoscale the plot.
grid: bool, optional
If True, show the grid.
**kwargs
Goes to `matplotlib.collections.PolyCollection`.
Returns
-------
`matplotlib.figure`
"""
import matplotlib.pyplot as plt
from matplotlib.collections import PolyCollection
import cartopy.crs as ccrs
if data is None and self.data is not None and len(self.data)>0:
data = self.get_fieldstat(stat=stat, columns=column, incl_zeros=True,
fillna=np.nan, data=data)
geodf = self.fields.copy()
xy = np.stack(geodf["geometry"].apply(lambda x: ((np.asarray(x.exterior.xy)).T) ).values)
# correct edge effects
flag_egde = np.any(np.diff(xy, axis=1)>300, axis=1)[:,0]
xy[flag_egde] = ((xy[flag_egde] + origin)%360 - origin)
geodf["xy"] = list(xy)
rng = np.random.default_rng()
data = rng.uniform(size=len(geodf))
if vmin is None:
vmin = np.nanmin(data)
if vmax is None:
vmax = np.nanmax(data)
geodf["value"] = (data-vmin)/(vmax-vmin)
# figure
rect = [0.15,0.22,0.75,0.75]
fig = plt.figure(figsize=[7,5])
ax = fig.add_axes(rect, projection=ccrs.Mollweide())
ax.set_global() # not sure why we need that
prop = dict(edgecolor="0.7", lw=1, alpha=0.5,
transform = ccrs.PlateCarree(central_longitude=origin)
)
# color
cmap = plt.get_cmap(cmap)
#
coll = PolyCollection(geodf["xy"], array=geodf["value"],
cmap=cmap, **prop)
ax.add_collection(coll)
if grid:
ax.gridlines()
if autoscale:
ax.autoscale()
return fig
# ============== #
# Static Methods #
# ============== #
@staticmethod
def _parse_fields(fields):
"""
Parse the fields.
Parameters
----------
fields: `geopandas.GeoDataFrame`
field definitions.
Returns
-------
`geopandas.GeoDataFrame`
"""
return parse_fields(fields)
@staticmethod
def _draw_random(fieldids, size,
bands,
mjd_range, skynoise_range,
gain_range=1,
zp_range=[27,30],
rng=None):
"""
Draw random observations.
Parameters
----------
fieldids: list
list of fieldids to draw from.
size: int
number of observations to draw
bands: list of str
list of bands that should be drawn.
mjd_range: list or array
min and max mjd for the random drawing.
skynoise_range: list or array
min and max skynoise for the random drawing.
gain_range: list or array
min and max gain for the random drawing.
zp_range: list or array
min and max zp for the random drawing.
rng : None, int, `(Bit)Generator`, optional
seed for the random number generator.
(doc adapted from numpy's `np.random.default_rng` docstring.
See that documentation for details.)
If None, an unpredictable entropy will be pulled from the OS.
If an ``int``, (>0), it will set the initial `BitGenerator` state.
If a `(Bit)Generator`, it will be returned as a `Generator` unaltered.
Returns
-------
`pandas.DataFrame`
"""
rng = np.random.default_rng()
# np.resize(1, 2) -> [1,1]
mjd = rng.uniform(*np.resize(mjd_range,2), size=size)
band = rng.choice(bands, size=size)
skynoise = rng.uniform(*np.resize(skynoise_range, 2), size=size)
gain = rng.uniform(*np.resize(gain_range, 2), size=size)
zp = rng.uniform(*np.resize(zp_range, 2), size=size)
# = coords
fieldid = rng.choice(fieldids, size=size)
# data sorted by mjd
data = pandas.DataFrame(zip(mjd, band, skynoise, gain, zp, fieldid),
columns=["mjd","band","skynoise", "gain", "zp","fieldid"]
).sort_values("mjd"
).reset_index(drop=False) # don't need to know the creation order
return data
# ============== #
# Properties #
# ============== #
@property
def fieldids(self):
"""List of fields id."""
if self.fields is None:
return None
return self.fields.index
@property
def nfields(self):
"""Number of fields."""
if self.fields is None:
return None
return len(self.fields)