"""
.. module:: convergence
:platform: Unix
:synopsis: This module implements the tools to compute topological statistics on 2D convergence maps: measure the power spectrum, counting peaks, measure the minkowski functionals; handling of masks is also provided
.. moduleauthor:: Andrea Petri <[email protected]>
"""
from __future__ import division
from operator import mul
from functools import reduce
from extern import _topology
import numpy as np
#FFT engines
from numpy.fft import rfft2
from scipy.ndimage import filters
#Units
from astropy.units import deg,arcsec,rad,quantity
#FITS
from astropy.io import fits
try:
import matplotlib.pyplot as plt
matplotlib = True
except ImportError:
matplotlib = False
################################################
########Spin0 class#############################
################################################
class Spin0(object):
def __init__(self,data,angle,masked=False,**kwargs):
#Sanity check
assert angle.unit.physical_type in ["angle","length"]
assert data.shape[0]==data.shape[1],"The map must be a square!!"
#Convert to double precision for calculation accuracy
if data.dtype==np.float:
self.data = data
else:
self.data = data.astype(np.float)
self.side_angle = angle
self.resolution = self.side_angle / self.data.shape[0]
if self.side_angle.unit.physical_type=="angle":
self.resolution = self.resolution.to(arcsec)
self.lmin = 2.0*np.pi/self.side_angle.to(rad).value
self.lmax = np.sqrt(2)*np.pi/self.resolution.to(rad).value
self._masked = masked
#Extra keyword arguments
self._extra_attributes = kwargs.keys()
@property
def info(self):
"""
Displays some of the information stored in the map (mainly resolution)
"""
print("Pixels on a side: {0}".format(self.data.shape[0]))
print("Pixel size: {0}".format(self.resolution))
print("Total angular size: {0}".format(self.side_angle))
print("lmin={0:.1e} ; lmax={1:.1e}".format(self.lmin,self.lmax))
@classmethod
def load(cls,filename,format=None,**kwargs):
"""
This class method allows to read the map from a data file, in various formats
:param filename: name of the file in which the map is saved
:type filename: str.
:param format: the format of the file in which the map is saved (can be a callable too); if None, it's detected automatically from the filename
:type format: str. or callable
:param kwargs: the keyword arguments are passed to the format (if callable)
:type kwargs: dict.
:returns: Spin0 instance with the loaded map
"""
if format is None:
extension = filename.split(".")[-1]
if extension in ["fit","fits"]:
format="fits"
else:
raise IOError("File format not recognized from extension '{0}', please specify it manually".format(extension))
if format=="fits":
hdu = fits.open(filename)
data = hdu[0].data
angle = hdu[0].header["ANGLE"] * deg
hdu.close()
else:
angle,data = format(filename,**kwargs)
return cls(data,angle)
def save(self,filename,format=None,double_precision=False):
"""
Saves the map to an external file, of which the format can be specified (only fits implemented so far)
:param filename: name of the file on which to save the plane
:type filename: str.
:param format: format of the file, only FITS implemented so far; if None, it's detected automatically from the filename
:type format: str.
:param double_precision: if True saves the Plane in double precision
:type double_precision: bool.
"""
if format is None:
extension = filename.split(".")[-1]
if extension in ["fit","fits"]:
format="fits"
else:
raise IOError("File format not recognized from extension '{0}', please specify it manually".format(extension))
if format=="fits":
if double_precision:
hdu = fits.PrimaryHDU(self.data)
else:
hdu = fits.PrimaryHDU(self.data.astype(np.float32))
hdu.header["ANGLE"] = (self.side_angle.to(deg).value,"angle of the map in degrees")
hdulist = fits.HDUList([hdu])
hdulist.writeto(filename,clobber=True)
else:
raise ValueError("Format {0} not implemented yet!!".format(format))
def setAngularUnits(self,unit):
"""
Convert the angular units of the map to the desired unit
:param unit: astropy unit instance to which to perform the conversion
:type unit: astropy units
"""
#Sanity check
assert unit.physical_type=="angle"
self.side_angle = self.side_angle.to(unit)
def mean(self):
"""
Computes the mean value of the pixels, taking into account eventual masking
:returns: float.
"""
if not self._masked:
return self.data.mean()
else:
if not hasattr(self,"_full_mask"):
self.maskBoundaries()
return self.data[self._full_mask].mean()
def std(self):
"""
Computes the standard deviation of the pixels, taking into account eventual masking
:returns: float.
"""
if not self._masked:
return self.data.std()
else:
if not hasattr(self,"_full_mask"):
self.maskBoundaries()
return self.data[self._full_mask].std()
def getValues(self,x,y):
"""
Extract the map values at the requested (x,y) positions; this is implemented using the numpy fast indexing routines, so the formats of x and y must follow the numpy advanced indexing rules. Periodic boundary conditions are enforced
:param x: x coordinates at which to extract the map values (if unitless these are interpreted as radians)
:type x: numpy array or quantity
:param y: y coordinates at which to extract the map values (if unitless these are interpreted as radians)
:type y: numpy array or quantity
:returns: numpy array with the map values at the specified positions, with the same shape as x and y
:raises: IndexError if the formats of x and y are not the proper ones
"""
assert isinstance(x,np.ndarray) and isinstance(y,np.ndarray)
#x coordinates
if type(x)==quantity.Quantity:
assert x.unit.physical_type=="angle"
j = np.mod(((x / self.resolution).decompose().value).astype(np.int32),self.data.shape[1])
else:
j = np.mod((x / self.resolution.to(rad).value).astype(np.int32),self.data.shape[1])
#y coordinates
if type(y)==quantity.Quantity:
assert y.unit.physical_type=="angle"
i = np.mod(((y / self.resolution).decompose().value).astype(np.int32),self.data.shape[0])
else:
i = np.mod((y / self.resolution.to(rad).value).astype(np.int32),self.data.shape[0])
#Return the map values at the specified coordinates
return self.data[i,j]
def cutRegion(self,extent):
"""
Cut a subset of the map, with the same resolution (warning! tested on square cuts only!)
:param extent: region in the map to select (xmin,xmax,ymin,ymax), must have units
:type extent: array with units
:returns: new ConvergenceMap instance that encloses the selected region
"""
xmin,xmax,ymin,ymax = extent
assert (xmax-xmin)==(ymax-ymin),"Only square cuts implemented so far!"
x = np.arange(xmin.value,xmax.value,self.resolution.to(extent.unit).value)
y = np.arange(ymin.value,ymax.value,self.resolution.to(extent.unit).value)
#Initialize the meshgrid
xx,yy = np.meshgrid(x,y) * extent.unit
return self.__class__(data=self.getValues(xx,yy),angle=(self.resolution*xx.shape[0]).to(extent.unit))
def visualize(self,fig=None,ax=None,colorbar=False,**kwargs):
"""
Visualize the convergence map; the kwargs are passed to imshow
"""
if not matplotlib:
raise ImportError("matplotlib is not installed, cannot visualize!")
#Instantiate figure
if (fig is None) or (ax is None):
self.fig,self.ax = plt.subplots()
else:
self.fig = fig
self.ax = ax
#Plot the map
ax0 = self.ax.imshow(self.data,origin="lower",interpolation="nearest",extent=[0,self.side_angle.value,0,self.side_angle.value],**kwargs)
self.ax.set_xlabel(r"$x$({0})".format(self.side_angle.unit.to_string()))
self.ax.set_ylabel(r"$y$({0})".format(self.side_angle.unit.to_string()))
#Add a colorbar maybe
if colorbar:
plt.colorbar(ax0,ax=self.ax)
def savefig(self,filename):
"""
Saves the map visualization to an external file
:param filename: name of the file on which to save the map
:type filename: str.
"""
self.fig.savefig(filename)
def mask(self,mask_profile,inplace=False):
"""
Applies a mask to the convergence map: all masked pixels are given a nan value because they cannot be used in the calculations
:param mask_profile: profile of the mask, must be an array of 1 byte intergers that are either 0 (if the pixel is masked) or 1 (if the pixel is not masked). Must be of the same shape as the original map
:type mask_profile: array. or ConvergenceMap instance
:param inplace: if True the masking is performed in place and the original map is lost, otherwise a new instance of ConvergenceMap with the masked map is returned
:returns: the masked convergence map if inplace is False, otherwise a float corresponding to the masked fraction of the map
"""
if inplace:
new_map = self
else:
new_map = self.__class__(self.data.copy(),self.side_angle)
if isinstance(mask_profile,np.ndarray):
assert mask_profile.dtype == np.int8
assert len(mask_profile[mask_profile==0]) + len(mask_profile[mask_profile==1]) == reduce(mul,mask_profile.shape),"The mask must be made of 0 and 1 only!"
assert mask_profile.shape == self.data.shape
new_map.data[mask_profile==0] = np.nan
elif isinstance(mask_profile,self.__class__):
assert mask_profile.side_angle == self.side_angle
assert len(mask_profile.data[mask_profile.data==0]) + len(mask_profile.data[mask_profile.data==1]) == reduce(mul,mask_profile.data.shape),"The mask must be made of 0 and 1 only!"
assert mask_profile.data.shape == self.data.shape
new_map.data[mask_profile.data==0] = np.nan
else:
raise TypeError("Mask type not supported")
new_map._masked = True
new_map._mask = ~np.isnan(new_map.data)
new_map._masked_fraction = 1.0 - new_map._mask.sum() / reduce(mul,new_map.data.shape)
#Recompute gradients
if (hasattr(new_map,"gradient_x") or hasattr(new_map,"gradient_y")):
new_map.gradient()
if (hasattr(new_map,"hessian_xx") or hasattr(new_map,"hessian_yy") or hasattr(new_map,"hessian_xy")):
new_map.hessian()
#Return
if inplace:
return new_map._masked_fraction
else:
return new_map
def maskBoundaries(self):
"""
Computes the mask boundaries defined in the following way: a boundary is a region where the convergence value is defined, but the gradients are not defined.
:returns: float. : perimeter/area ratio of the mask boundaries
"""
if not self._masked:
print("The map is not masked!!")
return None
#First check that the instance has the gradient and hessian attributes; if not, compute them
if not (hasattr(self,"gradient_x") and hasattr(self,"gradient_y")):
self.gradient()
if not (hasattr(self,"hessian_xx") and hasattr(self,"hessian_yy") and hasattr(self,"hessian_xy")):
self.hessian()
#Check where gradient starts to have problems
nan_gradient_pixels = np.isnan(self.gradient_x) + np.isnan(self.gradient_y)
self._gradient_boundary = ~self._mask ^ nan_gradient_pixels
#Check where the hessian has alsp problems
nan_gradient_pixels = nan_gradient_pixels + np.isnan(self.hessian_xx) + np.isnan(self.hessian_yy) + np.isnan(self.hessian_xy)
self._hessian_boundary = ~self._mask ^ nan_gradient_pixels
#Create attribute that holds the full mask (including gradients)
self._full_mask = self._mask * (~nan_gradient_pixels)
assert self._full_mask.sum() < self._mask.sum()
#Compute perimeter/area of the mask
perimeter_area = self._hessian_boundary.sum() / (~self._full_mask).sum()
#Return
return perimeter_area
@property
def maskedFraction(self):
"""
Computes the masked fraction of the map
:returns: float, the masked fraction of the map
"""
if not self._masked:
return 0.0
else:
return self._masked_fraction
@property
def boundary(self):
"""
Computes the boundaries of the masked regions, defined as the regions in which the convergence is still well defined but the first and second derivatives are not
:returns: array of bool. of the same shape as the map, with True values along the boundaries
"""
if not hasattr(self,"_hessian_boundary"):
self.maskBoundaries()
return self._hessian_boundary
def gradient(self,x=None,y=None,save=True):
"""
Computes the gradient of the map and sets the gradient_x,gradient_y attributes accordingly
:param x: optional, x positions at which to evaluate the gradient
:type x: array with units
:param y: optional, y positions at which to evaluate the gradient
:type y: array with units
:param save: if True saves the gradient as attrubutes
:type save: bool.
:returns: tuple -- (gradient_x,gradient_y)
>>> test_map = ConvergenceMap.load("map.fit")
>>> gx,gy = test_map.gradient()
"""
if (x is not None) and (y is not None):
assert x.shape==y.shape,"x and y must have the same shape!"
#x coordinates
if type(x)==quantity.Quantity:
assert x.unit.physical_type==self.side_angle.unit.physical_type
j = np.mod(((x / self.resolution).decompose().value).astype(np.int32),self.data.shape[1])
else:
j = np.mod((x / self.resolution.to(rad).value).astype(np.int32),self.data.shape[1])
#y coordinates
if type(y)==quantity.Quantity:
assert y.unit.physical_type==self.side_angle.unit.physical_type
i = np.mod(((y / self.resolution).decompose().value).astype(np.int32),self.data.shape[0])
else:
i = np.mod((y / self.resolution.to(rad).value).astype(np.int32),self.data.shape[0])
else:
i = None
j = None
#Call the C backend
gradient_x,gradient_y = _topology.gradient(self.data,j,i)
#Return the gradients
if (x is not None) and (y is not None):
return gradient_x.reshape(x.shape),gradient_y.reshape(x.shape)
else:
if save:
self.gradient_x = gradient_x
self.gradient_y = gradient_y
return gradient_x,gradient_y
def hessian(self,x=None,y=None,save=True):
"""
Computes the hessian of the map and sets the hessian_xx,hessian_yy,hessian_xy attributes accordingly
:param x: optional, x positions at which to evaluate the hessian
:type x: array with units
:param y: optional, y positions at which to evaluate the hessian
:type y: array with units
:param save: if True saves the gradient as attrubutes
:type save: bool.
:returns: tuple -- (hessian_xx,hessian_yy,hessian_xy)
>>> test_map = ConvergenceMap.load("map.fit")
>>> hxx,hyy,hxy = test_map.hessian()
"""
if (x is not None) and (y is not None):
assert x.shape==y.shape,"x and y must have the same shape!"
#x coordinates
if type(x)==quantity.Quantity:
assert x.unit.physical_type==self.side_angle.unit.physical_type
j = np.mod(((x / self.resolution).decompose().value).astype(np.int32),self.data.shape[1])
else:
j = np.mod((x / self.resolution.to(rad).value).astype(np.int32),self.data.shape[1])
#y coordinates
if type(y)==quantity.Quantity:
assert y.unit.physical_type==self.side_angle.unit.physical_type
i = np.mod(((y / self.resolution).decompose().value).astype(np.int32),self.data.shape[0])
else:
i = np.mod((y / self.resolution.to(rad).value).astype(np.int32),self.data.shape[0])
else:
i = None
j = None
#Call the C backend
hessian_xx,hessian_yy,hessian_xy = _topology.hessian(self.data,j,i)
#Return the hessian
if (x is not None) and (y is not None):
return hessian_xx.reshape(x.shape),hessian_yy.reshape(x.shape),hessian_xy.reshape(x.shape)
else:
if save:
self.hessian_xx = hessian_xx
self.hessian_yy = hessian_yy
self.hessian_xy = hessian_xy
return hessian_xx,hessian_yy,hessian_xy
def pdf(self,thresholds,norm=False):
"""
Computes the one point probability distribution function of the convergence map
:param thresholds: thresholds extremes that define the binning of the pdf
:type thresholds: array
:param norm: normalization; if set to a True, interprets the thresholds array as units of sigma (the map standard deviation)
:type norm: bool.
:returns: tuple -- (threshold midpoints -- array, pdf normalized at the midpoints -- array)
:raises: AssertionError if thresholds array is not provided
>>> test_map = ConvergenceMap.load("map.fit")
>>> thresholds = np.arange(map.data.min(),map.data.max(),0.05)
>>> nu,p = test_map.pdf(thresholds)
"""
assert thresholds is not None
midpoints = 0.5 * (thresholds[:-1] + thresholds[1:])
if norm:
sigma = self.data.std()
else:
sigma = 1.0
#Compute the histogram
if self._masked:
hist,bin_edges = np.histogram(self.data[self._mask],bins=thresholds*sigma,density=True)
else:
hist,bin_edges = np.histogram(self.data,bins=thresholds*sigma,density=True)
#Return
return midpoints,hist*sigma
def peakCount(self,thresholds,norm=False):
"""
Counts the peaks in the map
:param thresholds: thresholds extremes that define the binning of the peak histogram
:type thresholds: array
:param norm: normalization; if set to a True, interprets the thresholds array as units of sigma (the map standard deviation)
:type norm: bool.
:returns: tuple -- (threshold midpoints -- array, differential peak counts at the midpoints -- array)
:raises: AssertionError if thresholds array is not provided
>>> test_map = ConvergenceMap.load("map.fit")
>>> thresholds = np.arange(map.data.min(),map.data.max(),0.05)
>>> nu,peaks = test_map.peakCount(thresholds)
"""
assert thresholds is not None
midpoints = 0.5 * (thresholds[:-1] + thresholds[1:])
#Check if the map is masked
if self._masked:
if not hasattr(self,"_full_mask"):
self.maskBoundaries()
mask_profile = self._full_mask
else:
mask_profile = None
#Decide if normalizing thresholds
if norm:
if self._masked:
sigma = self.data[self._full_mask].std()
else:
sigma = self.data.std()
else:
sigma = 1.0
return midpoints,_topology.peakCount(self.data,mask_profile,thresholds,sigma)
def locatePeaks(self,thresholds,norm=False):
"""
Locate the peaks in the map
:param thresholds: thresholds extremes that define the binning of the peak histogram
:type thresholds: array
:param norm: normalization; if set to a True, interprets the thresholds array as units of sigma (the map standard deviation)
:type norm: bool.
:returns: tuple -- (peak height -- array, peak locations -- array with units)
:raises: AssertionError if thresholds array is not provided
>>> test_map = ConvergenceMap.load("map.fit")
>>> thresholds = np.arange(map.data.min(),map.data.max(),0.05)
>>> height,positions = test_map.locatePeaks(thresholds)
"""
assert thresholds is not None
midpoints = 0.5 * (thresholds[:-1] + thresholds[1:])
#Check if the map is masked
if self._masked:
if not hasattr(self,"_full_mask"):
self.maskBoundaries()
mask_profile = self._full_mask
else:
mask_profile = None
#Decide if normalizing thresholds
if norm:
if self._masked:
sigma = self.data[self._full_mask].std()
else:
sigma = self.data.std()
else:
sigma = 1.0
#Count the number of pixels between the selected thresholds
relevant_pixels = ((self.data>=thresholds[0]*sigma) * (self.data<=thresholds[-1]*sigma)).sum()
#Return the result of the C backend call, scaled to the proper units
peak_values,peak_locations = _topology.peakLocations(self.data,mask_profile,thresholds,sigma,relevant_pixels)
return peak_values,(peak_locations*self.resolution).to(self.side_angle.unit)
def minkowskiFunctionals(self,thresholds,norm=False):
"""
Measures the three Minkowski functionals (area,perimeter and genus characteristic) of the specified map excursion sets
:param thresholds: thresholds that define the excursion sets to consider
:type thresholds: array
:param norm: normalization; if set to a True, interprets the thresholds array as units of sigma (the map standard deviation)
:type norm: bool.
:returns: tuple -- (nu -- array, V0 -- array, V1 -- array, V2 -- array) nu are the bins midpoints and V are the Minkowski functionals
:raises: AssertionError if thresholds array is not provided
>>> test_map = ConvergenceMap.load("map.fit")
>>> thresholds = np.arange(-2.0,2.0,0.2)
>>> nu,V0,V1,V2 = test_map.minkowskiFunctionals(thresholds,norm=True)
"""
assert thresholds is not None
midpoints = 0.5 * (thresholds[:-1] + thresholds[1:])
#Check if the map is masked
if self._masked:
if not hasattr(self,"_full_mask"):
self.maskBoundaries()
mask_profile = self._full_mask
else:
mask_profile = None
#Decide if normalize thresholds or not
if norm:
if self._masked:
sigma = self.data[self._full_mask].std()
else:
sigma = self.data.std()
else:
sigma = 1.0
#Check if gradient and hessian attributes are available; if not, compute them
if not (hasattr(self,"gradient_x") and hasattr(self,"gradient_y")):
self.gradient()
if not (hasattr(self,"hessian_xx") and hasattr(self,"hessian_yy") and hasattr(self,"hessian_xy")):
self.hessian()
#Compute the Minkowski functionals and return them as tuple
v0,v1,v2 = _topology.minkowski(self.data,mask_profile,self.gradient_x,self.gradient_y,self.hessian_xx,self.hessian_yy,self.hessian_xy,thresholds,sigma)
return midpoints,v0,v1,v2
def moments(self,connected=False,dimensionless=False):
"""
Measures the first nine moments of the convergence map (two quadratic, three cubic and four quartic)
:param connected: if set to True returns only the connected part of the moments
:type connected: bool.
:param dimensionless: if set to True returns the dimensionless moments, normalized by the appropriate powers of the variance
:type dimensionless: bool.
:returns: array -- (sigma0,sigma1,S0,S1,S2,K0,K1,K2,K3)
>>> test_map = ConvergenceMap.load("map.fit")
>>> var0,var1,sk0,sk1,sk2,kur0,kur1,kur2,kur3 = test_map.moments()
>>> sk0,sk1,sk2 = test_map.moments(dimensionless=True)[2:5]
>>> kur0,kur1,kur2,kur3 = test_map.moments(connected=True,dimensionless=True)[5:]
"""
#First check that the instance has the gradient and hessian attributes; if not, compute them
if not (hasattr(self,"gradient_x") and hasattr(self,"gradient_y")):
self.gradient()
if not (hasattr(self,"hessian_xx") and hasattr(self,"hessian_yy") and hasattr(self,"hessian_xy")):
self.hessian()
#Decide if using the full map or only the unmasked region
if self._masked:
if not hasattr(self,"_full_mask"):
self.maskBoundaries()
data = self.data[self._full_mask]
gradient_x = self.gradient_x[self._full_mask]
gradient_y = self.gradient_y[self._full_mask]
hessian_xx = self.hessian_xx[self._full_mask]
hessian_yy = self.hessian_yy[self._full_mask]
hessian_xy = self.hessian_xy[self._full_mask]
else:
data = self.data
gradient_x = self.gradient_x
gradient_y = self.gradient_y
hessian_xx = self.hessian_xx
hessian_yy = self.hessian_yy
hessian_xy = self.hessian_xy
#Quadratic moments
sigma0 = data.std()
sigma1 = np.sqrt((gradient_x**2 + gradient_y**2).mean())
#Cubic moments
S0 = (data**3).mean()
S1 = ((data**2)*(hessian_xx + hessian_yy)).mean()
S2 = ((gradient_x**2 + gradient_y**2)*(hessian_xx + hessian_yy)).mean()
#Quartic moments
K0 = (data**4).mean()
K1 = ((data**3) * (hessian_xx + hessian_yy)).mean()
K2 = ((data) * (gradient_x**2 + gradient_y**2) * (hessian_xx + hessian_yy)).mean()
K3 = ((gradient_x**2 + gradient_y**2)**2).mean()
#Compute connected moments (only quartic affected)
if connected:
K0 -= 3 * sigma0**4
K1 += 3 * sigma0**2 * sigma1**2
K2 += sigma1**4
K3 -= 2 * sigma1**4
#Normalize moments to make them dimensionless
if dimensionless:
S0 /= sigma0**3
S1 /= (sigma0 * sigma1**2)
S2 *= (sigma0 / sigma1**4)
K0 /= sigma0**4
K1 /= (sigma0**2 * sigma1**2)
K2 /= sigma1**4
K3 /= sigma1**4
sigma0 /= sigma0
sigma1 /= sigma1
#Return the array
return np.array([sigma0,sigma1,S0,S1,S2,K0,K1,K2,K3])
def powerSpectrum(self,l_edges):
"""
Measures the power spectrum of the convergence map at the multipole moments specified in the input
:param l_edges: Multipole bin edges
:type l_edges: array
:returns: tuple -- (l -- array,Pl -- array) = (multipole moments, power spectrum at multipole moments)
:raises: AssertionError if l_edges are not provided
>>> test_map = ConvergenceMap.load("map.fit")
>>> l_edges = np.arange(200.0,5000.0,200.0)
>>> l,Pl = test_map.powerSpectrum(l_edges)
"""
assert not self._masked,"Power spectrum calculation for masked maps is not allowed yet!"
assert l_edges is not None
if self.side_angle.unit.physical_type=="length":
raise NotImplementedError("Power spectrum measurement not implemented yet if side physical unit is length!")
l = 0.5*(l_edges[:-1] + l_edges[1:])
#Calculate the Fourier transform of the map with numpy FFT
ft_map = rfft2(self.data)
#Compute the power spectrum with the C backend implementation
power_spectrum = _topology.rfft2_azimuthal(ft_map,ft_map,self.side_angle.to(deg).value,l_edges)
#Output the power spectrum
return l,power_spectrum
def cross(self,other,statistic="power_spectrum",**kwargs):
"""
Measures a cross statistic between maps
:param other: The other convergence map
:type other: ConvergenceMap instance
:param statistic: the cross statistic to measure (default is 'power_spectrum')
:type statistic: string or callable
:param kwargs: the keyword arguments are passed to the statistic (when callable)
:type kwargs: dict.
:returns: tuple -- (l -- array,Pl -- array) = (multipole moments, cross power spectrum at multipole moments) if the statistic is the power spectrum, otherwise whatever statistic() returns on call
:raises: AssertionError if the other map has not the same shape as the input one
>>> test_map = ConvergenceMap.load("map.fit",format=load_fits_default_convergence)
>>> other_map = ConvergenceMap.load("map2.fit",format=load_fits_default_convergence)
>>> l_edges = np.arange(200.0,5000.0,200.0)
>>> l,Pl = test_map.cross(other_map,l_edges=l_edges)
"""
#The other map must be of the same type as this one
assert isinstance(other,self.__class__)
if statistic=="power_spectrum":
assert not (self._masked or other._masked),"Power spectrum calculation for masked maps is not allowed yet!"
if self.side_angle.unit.physical_type=="length":
raise NotImplementedError("Power spectrum measurement not implemented yet if side physical unit is length!")
assert "l_edges" in kwargs.keys()
l_edges = kwargs["l_edges"]
assert l_edges is not None
l = 0.5*(l_edges[:-1] + l_edges[1:])
assert self.side_angle == other.side_angle
assert self.data.shape == other.data.shape
#Calculate the Fourier transform of the maps with numpy FFTs
ft_map1 = rfft2(self.data)
ft_map2 = rfft2(other.data)
#Compute the cross power spectrum with the C backend implementation
cross_power_spectrum = _topology.rfft2_azimuthal(ft_map1,ft_map2,self.side_angle.to(deg).value,l_edges)
#Output the cross power spectrum
return l,cross_power_spectrum
else:
return statistic(self,other,**kwargs)
def smooth(self,scale_angle,kind="gaussian",inplace=False,**kwargs):
"""
Performs a smoothing operation on the convergence map
:param scale_angle: size of the smoothing kernel (must have units)
:type scale_angle: float.
:param kind: type of smoothing to be performed (only implemented gaussian so far)
:type kind: str.
:param inplace: if set to True performs the smoothing in place overwriting the old convergence map
:type inplace: bool.
:param kwargs: the keyword arguments are passed to the filter function
:type kwargs: dict.
:returns: ConvergenceMap instance (or None if inplace is True)
"""
assert not self._masked,"You cannot smooth a masked convergence map!!"
assert kind == "gaussian","Only gaussian smoothing implemented!!"
assert scale_angle.unit.physical_type==self.side_angle.unit.physical_type
#Compute the smoothing scale in pixel units
smoothing_scale_pixel = (scale_angle * self.data.shape[0] / (self.side_angle)).decompose().value
#Perform the smoothing
smoothed_data = filters.gaussian_filter(self.data,smoothing_scale_pixel,**kwargs)
#Return the result
if inplace:
self.data = smoothed_data
#If gradient attributes are present, recompute them
if (hasattr(self,"gradient_x") or hasattr(self,"gradient_y")):
self.gradient()
if (hasattr(self,"hessian_xx") or hasattr(self,"hessian_yy") or hasattr(self,"hessian_xy")):
self.hessian()
return None
else:
#Copy the extra attributes as well
kwargs = dict()
for attribute in self._extra_attributes:
kwargs[attribute] = getattr(self,attribute)
return self.__class__(smoothed_data,self.side_angle,masked=self._masked,**kwargs)
def __add__(self,rhs):
"""
Defines addition operator between ConvergenceMap instances; the convergence values are summed
:returns: ConvergenceMap instance with the result of the sum
:raises: AssertionError if the operation cannot be performed
"""
if isinstance(rhs,self.__class__):
assert self.side_angle == rhs.side_angle
assert self.data.shape == rhs.data.shape
new_data = self.data + rhs.data
elif type(rhs) == np.float:
new_data = self.data + rhs
elif type(rhs) == np.ndarray:
assert rhs.shape == self.data.shape
new_data = self.data + rhs
else:
raise TypeError("The right hand side cannot be added!!")
#Copy the extra attributes as well
kwargs = dict()
for attribute in self._extra_attributes:
kwargs[attribute] = getattr(self,attribute)
return self.__class__(new_data,self.side_angle,masked=self._masked,**kwargs)
def __mul__(self,rhs):
"""
Defines the multiplication operator between ConvergenceMap instances; the convergence values are multiplied (for example the rhs can be a mask...)
:returns: ConvergenceMap instances with the result of the multiplication
:raises: AssertionError if the operation cannot be performed
"""
if isinstance(rhs,self.__class__):
assert self.side_angle == rhs.side_angle
assert self.data.shape == rhs.data.shape
new_data = self.data * rhs.data
elif type(rhs) == np.float:
new_data = self.data * rhs
elif type(rhs) == np.ndarray:
assert rhs.shape == self.data.shape
new_data = self.data * rhs
else:
raise TypeError("Cannot multiply by the right hand side!!")
#Copy the extra attributes as well
kwargs = dict()
for attribute in self._extra_attributes:
kwargs[attribute] = getattr(self,attribute)
return self.__class__(new_data,self.side_angle,masked=self._masked,**kwargs)
###############################################
##########ConvergenceMap class#################
###############################################
[docs]class ConvergenceMap(Spin0):
"""
A class that handles 2D convergence maps and allows to compute their topological descriptors (power spectrum, peak counts, minkowski functionals)
>>> from lenstools import ConvergenceMap
>>> from lenstools.defaults import load_fits_default_convergence
>>> test_map = ConvergenceMap.load("map.fit",format=load_fits_default_convergence)
>>> imshow(test_map.data)
"""
###############################################
###################Mask class##################
###############################################
[docs]class Mask(ConvergenceMap):
"""
A class that handles the convergence masked regions
"""
def __init__(self,data,angle,masked=False):
super(Mask,self).__init__(data,angle,masked)
assert len(self.data[self.data==0]) + len(self.data[self.data==1]) == reduce(mul,self.data.shape),"The mask must be made of 0 and 1 only!"
self._masked_fraction = len(self.data[self.data==0]) / reduce(mul,self.data.shape)
@property
def maskedFraction(self):
"""
Computes the fraction of masked pixels
"""
return self._masked_fraction
def maskBoundaries(self):
raise AttributeError("Can only call this method on a ConvergenceMap instance!")
@property
def boundary(self):
raise AttributeError("This property is only defined on ConvergenceMap instances!")