Source code for skysurvey.tools.projection

"""
This module provides spatial utility functions for projecting camera footprints onto the sky and matching coordinates to survey fields.
"""

import warnings
import numpy as np
import pandas
import geopandas

from shapely import geometry

_DEG2RA = np.pi / 180 # compute once.

[docs] def radecmodel_to_skysurface(radecmodel, ntrial=2e5, frac=True): """ Compute the sky area covered by points drawn from a `ModelDAG` model in RA/Dec space. This function samples points from a `ModelDAG` model, projects them onto a unit sphere, and computes the convex hull of the projected points to estimate the sky area. The area can be returned as a fraction of the total sky (4π steradians) or in steradians. Parameters ---------- radecmodel : object A `ModelDAG`-compatible model that generates RA/Dec points. ntrial : int, optional Number of points to sample from the model. Default is 2e5. Tests suggest 2e5 is good at 0.01%. frac : bool, optional If True, return the area as a fraction of the total sky (4π steradians). If False, return the area in steradians. Default is True. Returns ------- float The sky area covered by the sampled points. If `frac=True`, the value is a fraction of the total sky. If `frac=False`, the value is in steradians. Notes ----- - The RA/Dec points are converted to radians and projected using `sin(dec)` to account for spherical geometry. - The convex hull of the projected points is computed to estimate the sky area. - The total sky area is 4π steradians, which corresponds to 41253 square degrees. Examples -------- >>> from modeldag import ModelDAG >>> radecmodel = ... # Your ModelDAG-compatible RA/Dec model >>> area_frac = radecmodel_to_skysurface(radecmodel, ntrial=1e5, frac=True) >>> print(f"Fraction of sky covered: {area_frac:.4f}") """ from modeldag import ModelDAG import geopandas as gpd mdag = ModelDAG({"radec": radecmodel}) data = mdag.draw( int(ntrial) ) # account for projection delta_ra*delta_sindec data["sin_dec_rad"] = np.sin(data["dec"]*_DEG2RA) data["ra_rad"] = data["ra"]*_DEG2RA # get the (projected) skyarea # in radian projected_skyarea = gpd.GeoDataFrame(geometry=gpd.points_from_xy(data["ra_rad"], data["sin_dec_rad"]) ).union_all().convex_hull if frac: return projected_skyarea.area / (4*np.pi) return projected_skyarea.area # steradian
[docs] def project_to_radec(verts_or_polygon, ra, dec): """ Project a geometry (or its vertices) to given ra, dec coordinates. Parameters ---------- verts_or_polygon: shapely.Polygon or 2d-array geometry or vertices representing the camera footprint in the sky if vertices, the format is: x, y = vertices ra: float or array poiting(s) R.A. dec: float or array poiting(s) declination Returns ------- list If input are vertices, returns a list of new verticies. If input are geometries, returns a list of new geometries. """ if isinstance(verts_or_polygon, geometry.Polygon): # polygon as_polygon = True fra, fdec = np.asarray(verts_or_polygon.exterior.xy) else: as_polygon = False fra, fdec = np.asarray(verts_or_polygon) ra = np.atleast_1d(ra) dec = np.atleast_1d(dec) ra_, dec_ = np.squeeze(rot_xz_sph((fra/np.cos(fdec*np.pi/180))[:,None], fdec[:,None], dec) ) ra_ += ra pointings = np.asarray([ra_, dec_]).T if as_polygon: return [geometry.Polygon(p) for p in pointings] return pointings
[docs] def spatialjoin_radec_to_fields(radec, fields, how="inner", predicate="intersects", index_radec="index_radec", allow_dask=True, **kwargs): """ Join the radec coordinates with the fields. Parameters ---------- radec: DataFrame or 2d-array coordinates of the points. - DataFrame: must have the "ra" and "dec" columns. This will use the DataFrame's index are data index. - 2d array (shape N,2): returned index will be 'range(len(ra))' fields : [`geopandas.geoserie`, `geopandas.geodataframe` or dict] Fields contains the fieldid and fields shapes. Several forms are accepted: - dict: {fieldid: 2d-array, fieldid: 2d-array ...} here, the 2d-array are the field's vertices. - geoserie: geopandas.GeoSeries with index as fieldid and geometry as field's vertices. - geodataframe: geopandas.GeoDataFrame with the 'fieldid' column and geometry as field's vertices. Returns ------- `GeoDataFrame` (`geometry.sjoin` result) """ # -------- # # Coords # # -------- # if type(radec) in [np.ndarray, list, tuple]: inshape = np.shape(radec) if inshape[-1] != 2: raise ValueError(f"shape of radec must be (N, 2), {inshape} given.") radec = pandas.DataFrame(np.atleast_2d(radec), columns=["ra","dec"]) # Points to be considered geoarray = geopandas.points_from_xy(*radec[["ra","dec"]].values.T) geopoints = geopandas.GeoDataFrame({index_radec:radec.index}, geometry=geoarray) # -------- # # Fields # # -------- # # goes from dict to geoseries (more natural) fields = parse_fields(fields) # -------- # # Joining # # -------- # # This goes linearly as size of fields if len(fields)>30_000 and allow_dask: try: import dask_geopandas except ImportError: pass # no more warnings, we will deal with it. else: if isinstance(type(fields.index), pandas.MultiIndex): # not supported fields = dask_geopandas.from_geopandas(fields, npartitions=10) geopoints = dask_geopandas.from_geopandas(geopoints, npartitions=10) else: warnings.warn("cannot use dask_geopandas with MultiIndex fields dataframe") sjoined = geopoints.sjoin(fields, how="inner", predicate="intersects", **kwargs) if "dask" in str( type(sjoined) ): sjoined = sjoined.compute() # multi-index if type(fields.index) is pandas.MultiIndex: sjoined = sjoined.rename({f"index_right{i}":name for i, name in enumerate(fields.index.names)}, axis=1) else: sjoined = sjoined.rename({"index_right": fields.index.name}, axis=1) return sjoined
[docs] def parse_fields(fields): """ Read various formats for fields and returns it as a `geodataframe`. Parameters ---------- fields : [`geopandas.geoserie`, `geopandas.geodataframe` or dict] Fields contains the fieldid and fields shapes. Several forms are accepted: - dict: {fieldid: 2d-array or regions, fieldid: 2d-array or regions ...} here, the 2d-array are the field's vertices or a astropy/ds9 regions - `geoserie`: `geopandas.GeoSeries` with index as fieldid and geometry as field's vertices. - `geodataframe`: `geopandas.GeoDataFrame` with the 'fieldid' column and geometry as field's vertices. Returns ------- `GeoDataFrame` (`geometry.sjoin` result) Examples -------- provide a dict of ds9 regions >>> fields = {450:"box(50,30, 3,4,0)", 541:"ellipse(190,-10,1.5,1,50)"} >>> geodf = parse_fields(fields) """ if type(fields) is dict: values = fields.values() indexes = fields.keys() # dict of array goes to shapely.Geometry as expected by geopandas test_kind = type( values.__iter__().__next__() ) # check the first if test_kind in [np.ndarray, list, tuple]: values = [geometry.Polygon(v) for v in values] if test_kind is str or "regions.shapes" in str(test_kind): values = [regions_to_shapely(v) for v in values] fields = geopandas.GeoSeries(values, index = indexes) if type(fields) is geopandas.geoseries.GeoSeries: fields = geopandas.GeoDataFrame({"fieldid":fields.index}, geometry=fields.values) elif type(fields) is not geopandas.geodataframe.GeoDataFrame: raise ValueError("cannot parse the format of the input 'fields' variable. Should be dict, GeoSeries or GeoPandas") return fields
[docs] def regions_to_shapely(region): """ Converts astropy Region into a shapely geometry. Parameters ---------- region: str or Regions (see astropy-regions.readthedocs.io) - If str, it is assumed to be in the dr9 ircs format, e.g.: ``region = "box(40.0, 50.0, 5.0, 4.0, 0.0)"``. - If Regions, region will be converted into the str format, using ``region = region.serialize("ds9").strip().split("\\n")[-1]``. The following format have been implemented: - box - circle - ellipse - polygon Returns ------- Shapely's Geometry the geometry will depend on the input regions. Raises ------ NotImplementedError if the format is not recognised. Examples -------- >>> shapely_ellipse = regions_to_shapely('ellipse(54,43.4, 4, 2,-10)') >>> shapely_rotated_rectangle = regions_to_shapely('box(-30,0.4, 4, 2,80)') """ import shapely if "regions.shapes" in str(type(region)): # Regions format -> dr9 icrs format region = region.serialize("ds9").strip().split("\n")[-1] tregion = type(region) if tregion is not str: raise ValueError(f"cannot parse the input region format ; {tregion} given") # it works, let's parse it. which, params = region.replace(")","").split("(") params = np.asarray(params.split(","), dtype="float") # Box, if which == "box": # rectangle centerx, centery, width, height, angle = params minx, miny, maxx, maxy = centerx-width, centery-height, centerx+width, centery+height geom = geometry.box(minx, miny, maxx, maxy, ccw=True) if angle != 0: geom = shapely.affinity.rotate(geom, angle) # Cercle elif which == "circle": centerx, centery, radius = params geom = geometry.Point(centerx, centery).buffer(radius) # Ellipse elif which == "ellipse": centerx, centery, a, b, theta = params # unity circle geom = geometry.Point(centerx, centery).buffer(1) geom = shapely.affinity.scale(geom, a,b) if theta != 0: geom = shapely.affinity.rotate(geom, theta) # Ellipse elif which == "polygon": params = (params + 180) %360 - 180 coords = params.reshape(int(len(params)/2),2) geom = geometry.Polygon(coords) else: raise NotImplementedError(f"the {which} form not implemented. box, circle, ellpse and polygon are.") # shapely's geometry return geom
# # Projection coordinates. #
[docs] def cart2sph(vec): """ Converts cartesian [x,y,z] to spherical [r, theta, phi] coordinates (in degrees). Parameters ---------- vec: array x, y, z Returns ------- array [r, theta, phi] """ x, y ,z = vec v = np.sqrt(x**2 + y**2 + z**2) return np.asarray([v, (np.arctan2(y,x) / _DEG2RA + 180) % 360 - 180, np.arcsin(z/v) / _DEG2RA])
[docs] def sph2cart(vec): """ Converts spherical coordinates [r, theta, phi] to cartesian coordinates [x,y,z]. Parameters ---------- vec: array r, theta, phi ; angles in degrees Returns ------- array [x, y, z] """ v, l, b = vec[0], np.asarray(vec[1])*_DEG2RA, np.asarray(vec[2])*_DEG2RA # noqa: E741 return np.asarray([v*np.cos(b)*np.cos(l), v*np.cos(b)*np.sin(l), v*np.sin(b)])
[docs] def rot_xz(vec, theta): """ Rotates cartesian vector v [x,y,z] by angle theta around axis (0,1,0). Parameters ---------- vec: array x, y, z theta: float angle in degree Returns ------- array rotated x, y, z """ return [vec[0]*np.cos(theta*_DEG2RA) - vec[2]*np.sin(theta*_DEG2RA), vec[1][None,:], vec[2]*np.cos(theta*_DEG2RA) + vec[0]*np.sin(theta*_DEG2RA)]
[docs] def rot_xz_sph(l, b, theta): # noqa: E741 """ Rotate spherical coordinate (l,b = theta, phi) by angle theta around axis (0,1,0) (calls does to :func:`rot_xz` and :func:`cart2sph`). Parameters ---------- l, b: float spherical coordinate theta: float angle in degree Returns ------- array [r, theta, phi] """ v_rot = rot_xz( sph2cart([1,l,b]), theta) return cart2sph(v_rot)[1:]