Source code for randomfield.lensing

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Utilities for weak lensing calculations.
"""
from __future__ import print_function, division

import numpy as np

import scipy.interpolate
import scipy.integrate
import scipy.special

import astropy.constants
import astropy.units

import powertools


[docs]def calculate_lensing_weights(cosmology, z, DC=None, DA=None, scaled_by_h=True): """ Calculate the geometric weights for lensing. We adopt the following convention for the lensing weight function: .. math:: \omega_E(D, D_{src}) = \\frac{3}{2} (H_0/c)^2 \Omega_m (1 + z(D)) \\frac{D_A(D_{src}-D)}{D_A(D_{src})} \Theta(D_{src}-D) where :math:`D` and :math:`D_{src}` are the comoving distance to the lensing mass and source galaxy, respectively, and :math:`z` is the lensing mass redshift. The weight dimensions are inverse length squared, in units of either Mpc/h or Mpc (depending on the ``scaled_by_h`` parameter). The combination: .. math:: W_{EE}(D, D_{src}) = \omega_E(D, D_{src})^2 D_A(D)^3 determines the geometric weight (i.e., independent of the 2D wavenumber :math:`\ell`) of a mass at :math:`D` lensing a source at :math:`D_{src}`. The limiting values are: .. math:: W_{EE}(0, D_{src}) = W_{EE}(D_{src}, D_{src}) = 0 and the function is broadly peaked with a maximum near :math:`z_{lens} = z_{src}/2` for realistic cosmologies. Use :func:`calculate_shear_power` to calculate the lensing shear power associated with the returned weight functions. Parameters ---------- cosmology : instance of :class:`astropy.cosmology.FLRW` Background cosmology that specifies the values of Omega_k, Omega_m and (if ``scaled_by_h`` is False) H0 to use. If ``DA`` is not provided, the cosmology is also used to calculate ``DA`` values from the input ``DC`` values. z : numpy array 1D array of redshifts to use for the grid of source and lens positions in the calculated weights. Values must be increasing but do not need to be equally spaced. DC : numpy array, optional 1D array of :meth:`comoving distances <astropy.cosmology.FLRW.comoving_distance>` along the line of sight corresponding to each redshift. Units should be either Mpc (when ``scaled_by_h`` is False) or Mpc/h (``scaled_by_h`` is True). Will be calculated from the cosmology if not specified. DA : numpy array, optional 1D array of :meth:`comoving transverse distances <astropy.cosmology.FLRW.comoving_transverse_distance>` corresponding to each redshift. Units should be either Mpc (when ``scaled_by_h`` is False) or Mpc/h (``scaled_by_h`` is True). Will be calculated from the cosmology if not specified. scaled_by_h : bool, optional Specifies the units of the input ``DC`` and ``DA`` arrays and the output weights. When True, inputs must be in Mpc/h and the result is in h/Mpc. When False, inputs must be in Mpc and the result is in 1/Mpc. Returns ------- numpy.ndarray Two dimensional array with shape (nz, nz) where nz is the number of input redshifts. The value ``out[i,j]`` gives the weight function for a source at redshift ``z[i]`` as a function of lensing redshift ``z[j]`` with dimensions of inverse length (h/Mpc or 1/Mpc, depending on the value of ``scaled_by_h``). Note that all values with j > i are zero, since they correspond to sources in front of the lensing mass. """ try: assert len(z.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Redshifts must be a 1D array.') if not np.array_equal(np.unique(z), z) or z[0] < 0: raise ValueError('Redshifts must be increasing values >= 0.') n = z.size # Calculate distances if necessary. try: scale = cosmology.h if scaled_by_h else 1. if DC is None: DC = (cosmology.comoving_distance(z) .to(astropy.units.Mpc).value * scale) if DA is None: DA = (cosmology.comoving_transverse_distance(z) .to(astropy.units.Mpc).value * scale) except AttributeError: raise ValueError('Invalid cosmology: cannot calculate distances.') if DC.shape != z.shape: raise ValueError('Shapes not compatible: z and DC.') if DA.shape != z.shape: raise ValueError('Shapes not compatible: z and DA.') # Calculate the dimensioned curvature constant K in units of (Mpc/h)**-2. clight_km_s = astropy.constants.c.to( astropy.units.km / astropy.units.s).value H0 = 100. if scaled_by_h else 100. * cosmology.h K = -cosmology.Ok0 * (H0 / clight_km_s)**2 # Tabulate cosK(D), which is dimensionless. if K < 0: sqrt_abs_K = np.sqrt(-K) cosK = np.cosh(sqrt_abs_K * DC) elif K > 0: sqrt_K = np.sqrt(K) cosK = np.cos(sqrt_K * DC) else: cosK = np.ones_like(DA, dtype=float) # Tabulate DA(D) / DA(Dsrc) on an n x n grid, using the value 1 when D=Dsrc. DAratio = np.empty((n, n), dtype=float) DAratio[0] = 1. DAratio[1:] = DA / DA[1:, np.newaxis] # Tabulate q(D,Dsrc) * DA(D) = cosK(D) - cosK(Dsrc) * DA(D) / DA(Dsrc) # on an n x n grid, in units of (Mpc/h)**-1. weights = cosK - cosK[:,np.newaxis] * DAratio # Zero entries < 0 where D > Dsrc. weights[weights < 0] = 0 # Multiply by 1 + z(D) weights *= 1 + z # Multiply by the constant 3/2 (H0/c)**2 Omega_m with units of # (h/Mpc)**2 or (1/Mpc)**2, depending on scaled_by_h. weights *= 1.5 * (H0 / clight_km_s)**2 * cosmology.Om0 return weights
[docs]def tabulate_3D_variances(ell, DA, growth, power): """ Tabulate 3D matter-power contributions to shear power variances. This function is defined as: .. math:: V(\ell, D_A) = \\frac{\pi}{\ell} G(D)^2 \Delta^2_\delta(k=\ell/D_A(D), z=0) where :math:`D` is the comoving distance corresponding to the comoving transverse distance :math:`D_A` and :math:`\ell` is a 2D wavenumber. Use :func:`calculate_shear_power` to calculate the lensing shear power associated with the returned 3D matter-power contributions. Parameters ---------- ell : numpy.ndarray 1D array of 2D wavenumbers where shear power variances should be tabulated. Values must be positive and increasing, but do not need to be equally spaced. DA : numpy.ndarray 1D array of :meth:`comoving transverse distances <astropy.cosmology.FLRW.comoving_transverse_distance>` :math:`D_A(z)` where shear power variances should be tabulated. Values must be positive and increasing, but do not need to be equally spaced. Values can be in either Mpc/h or Mpc, but must be consistent with the power spectrum normalization. growth : numpy.ndarray 1D array of growth function values :math:`G(z)` corresponding to each input :math:`DA(z)` value. Must have the same number of elements as DA. Can be calculated using :func:`randomfield.cosmotools.get_growth_function`. power : structured numpy.ndarray Power spectrum to use, which meets the criteria tested by :func:`randomfield.powertools.validate_power`. Can be calculated using :func:`randomfield.cosmotools.calculate_power` if the optional ``classy`` package is installed. Values of k and P(k) can either be in Mpc/h or Mpc units, but must be consistent with the values of DA. Returns ------- numpy.ndarray Two dimensional array with shape (nell, nDA) where nell = len(ell) and nDA = len(DA). The value ``out[i,j]`` gives the contribution to the shear variance :math:`\Delta^2_{EE}` at 2D wavenumber ``ell[i]`` from lensing by mass inhomogeneities at comoving transverse distance ``DA[j]``. The output is dimensionless. """ try: assert len(ell.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Wavenumbers ell must be a 1D array.') if not np.array_equal(np.unique(ell), ell) or ell[0] <= 0: raise ValueError('Wavenumbers ell must be increasing values > 0.') try: assert len(DA.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Distances DA must be a 1D array.') if not np.array_equal(np.unique(DA), DA) or DA[0] <= 0: raise ValueError('Distances DA must be increasing values > 0.') try: assert DA.shape == growth.shape except (AssertionError, AttributeError): raise ValueError('Growth array must match DA array dimensions.') # Build an interpolator for the dimensionless function # Delta**2(k) = k**3/(2*pi**2) * P(k) that is linear in log10(k). power = powertools.validate_power(power) k_grid = power['k'] Delta2_grid = k_grid**3 / (2 * np.pi**2) * power['Pk'] Delta2 = scipy.interpolate.interp1d(np.log10(k_grid), Delta2_grid, kind='linear', copy=False) # Tabulate a 2D array of k values for each (ell, DA). log10k_of_DA = np.log10(ell[:, np.newaxis] / DA) # Tabulate pi/ell * Delta**2(k) * G(DA)**2 values on this grid. return (np.pi / ell[:, np.newaxis]) * Delta2(log10k_of_DA) * growth**2
[docs]def calculate_shear_power(DC, DA, weights, variances, mode='shear-shear-auto'): """ Calculate the shear power spectrum as a function of source position. The result is given as (X = E,g): .. math:: \Delta^2_{EX}(\ell) = \\frac{\ell^2}{2\pi} C_{EX}(\ell) and calculated as a convolution of the input weight functions :math:`\omega_E(D, D_{src})` and 3D variances :math:`V(\ell, D_A)`. There are three possible calculations, depending on the ``mode`` parameter. Shear-shear auto power is calculated as: .. math:: \Delta^2_{EE}(D_{src}, \ell) = \int_{D_{min}}^{D_{src}} \omega_E(D, D_{src})^2 D_A(D)^3 V(\ell, D_A(D)) dD Shear-shear cross power is calculated as: .. math:: \Delta^2_{EE}(D_1, D_2, \ell) = \int_{D_{min}}^{\min(D_1,D_2)} \omega_E(D, D_1) \omega_E(D, D_2) D_A(D)^3 V(\ell, D_A(D)) dD Shear-galaxy cross power is calculated as: .. math:: \Delta^2_{Eg}(D_{src}, D_g, \ell) = \omega_E(D_g, D_{src}) D_A(D_g)^3 V(\ell, D_A(D_g)) Note that no bias factor is included in the shear-galaxy calculation, so results should be interpreted as :math:`\Delta^2_{Eg} / b_g`. The convolution integrals above are estimated using :func:`Simpson's rule <scipy.integrate.simps>` and finer grids will generally yield more accurate results. Note that the integrals above are truncated at :math:`D_{min} > 0` equal to ``DC[0]``, in order to set a finite upper bound on the 3D wavenumber :math:`k = \ell/D_A`. This truncation procedure effectively ignores any lensing by mass inhomogeneities closer than ``DC[0]``. Reducing :math:`D_{min}` will increase the accuracy of the calculation but also requires either increasing the range of 3D wavenumbers :math:`k` or decreasing the range of 2D wavenumbers :math:`\ell` used to calculate :math:`V(\ell, D_A(D))`. It is the caller's responsibility to ensure that the inputs are all calculated on consistent grids and with consistent units (Mpc or Mpc/h). Parameters ---------- DC : numpy.ndarray 1D array of :meth:`comoving distances <astropy.cosmology.FLRW.comoving_distance>` along the line of sight corresponding to each redshift. Units can be either Mpc or Mpc/h, but must be consistent with how the weights and variances were calculated. DA : numpy.ndarray 1D array of :meth:`comoving transverse distances <astropy.cosmology.FLRW.comoving_transverse_distance>` corresponding to each redshift. Units can be in either Mpc/h or Mpc, but must be consistent with DC and with how the weights and variances were calculated. weights : numpy.ndarray 2D array of geometric lensing weights :math:`\omega_E(D, D_{src})`, normally obtained by calling :func:`calculate_lensing_weights`. Units (Mpc or Mpc/h) must be consistent with those used for ``DC`` and ``variances``. The shape must be (nDC, nDC) where nDC = len(DC). variances : numpy.ndarray 2D array of 3D matter power contributions :math:`V(\ell, D_A)` to the shear variance, normally obtained by calling :func:`tabulate_3D_variances`. Must be tabulated using units that are consistent with those used for ``DC`` and ``weights``. The shape must be (nell, nDC) where nDC = len(DC). mode : str, optional Must be one of 'shear-shear-auto', 'shear-shear-cross', or 'shear-galaxy-cross'. Specifies the type of power spectrum that is calculated and determines the shape of the result. Returns ------- numpy.ndarray For 'shear-shear-auto', the result is a 2D array of lensing power spectra with shape (nDC, nell) where nDC = len(DC) and nell = weights.shape[0]. The value ``out[i,n]`` gives :math:`\Delta^2_{EE}` as a function of ``ell[n]`` for a lensed source at distance ``DC[i]`` (where ``ell`` is the array of 2D wavenumbers used to :func:`tabulate the input variances <tabulate_3D_variances>`). For 'shear-shear-cross', the result is a 3D array of lensing power cross spectra with shape (nDC, nDC, nell). The value ``out[i,j,n]`` gives :math:`\Delta^2_{EE}` as a function of ``ell[n]`` for lensed sources at distances ``DC[i]`` and ``DC[j]``. The result is symmetric in ``i`` and ``j``, and the diagonal ``i = j`` equals the 'shear-shear-auto' result. For 'shear-galaxy-cross', the result is a 3D array of galaxy-lensing cross spectra with shape (nDC, nDC, nell). The value ``out[i,j,n]`` gives :math:`\Delta^2_{Eg}` as a function of ``ell[n]`` for a lensed source at distance ``DC[i]`` and a galaxy at distance ``DC[j]``. """ modes = ('shear-shear-auto', 'shear-shear-cross', 'shear-galaxy-cross') if mode not in modes: raise ValueError('Invalid mode. Pick one of: {0}.' .format(','.join(modes))) try: assert len(DC.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Distances DC must be a 1D array.') if not np.array_equal(np.unique(DC), DC) or DC[0] <= 0: raise ValueError('Distances DC must be increasing values > 0.') nDC = DC.size try: assert len(DA.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Distances DA must be a 1D array.') if DA.size != DC.size: raise ValueError('Distance arrays DC and DA must have same size.') try: assert len(weights.shape) == 2 except (AssertionError, AttributeError): raise ValueError('Weights must be a 2D array.') if weights.shape != (nDC, nDC): raise ValueError('Weights have wrong shape. Expected ({0}, {1}).' .format(nDC, nDC)) try: assert len(variances.shape) == 2 except (AssertionError, AttributeError): raise ValueError('Variances must be a 2D array.') if variances.shape[1] != nDC: raise ValueError('Variances has wrong second dimension. Expected {0}.' .format(nDC)) nell = variances.shape[0] if mode == 'shear-shear-auto': # Initialize the result array. Delta2 = np.empty((nDC, nell), dtype=float) # Loop over lensed source positions. for i in range(nDC): integrand = weights[i]**2 * DA**3 * variances Delta2[i] = scipy.integrate.simps(y=integrand, x=DC, axis=-1) elif mode == 'shear-shear-cross': # Initialize the result array. Delta2 = np.empty((nDC, nDC, nell), dtype=float) # Loop over lensed source positions. for i in range(nDC): for j in range(i+1): integrand = weights[i] * weights[j] * DA**3 * variances Delta2[i, j] = scipy.integrate.simps(y=integrand, x=DC, axis=-1) if i > j: Delta2[j, i] = Delta2[i, j] elif mode == 'shear-galaxy-cross': # Initialize the result array. Delta2 = np.empty((nDC, nDC, nell), dtype=float) # Loop over lensed source positions. for i in range(nDC): Delta2[i] = np.transpose(weights[i] * DA**3 * variances) return Delta2
[docs]def calculate_correlation_function(Delta2, ell, theta, order=0): """ Transform a function of ell into a function of angular separation. The transform is defined as: .. math:: \\xi(\Delta\\theta) = (-1)^{\\nu/2} \int_{\ell_{min}}^{\ell_{max}} \Delta^2(\ell) J_\\nu(\ell \Delta\\theta) \\frac{d\ell}{\ell} where the order :math:`\\nu` should be 0 and 4 for the shear-shear correlations :math:`\\xi_+` and :math:`\\xi_-`, respectively, and 2 for the shear-galaxy correlation :math:`\\xi_{Eg}`. The result is only approximate since the integral above is truncated at the limits :math:`\\ell_{min}` and :math:`\\ell_{max}` corresponding to ``ell[0]`` and ``ell[-1]``, respectively. The resulting finite integral is estimated using :func:`Simpson's rule <scipy.integrate.simps>` (in :math:`\log\ell`) and a finer grid in ``ell`` will generally yield more accurate results. Parameters ---------- Delta2 : numpy.ndarray 3D array of cross power spectra :math:`\Delta^2(z_1,z_2,\ell)`. ell : numpy.ndarray 1D array of 2D wavenumbers :math:`\ell` where the input cross power spectra are tabulated. dtheta : numpy.ndarray 1D array of 2D angular separations :math:`\Delta\\theta` where the output cross correlations should be tabulated. order : int Order :math:`\\nu` of the Bessel function :math:`J_{\\nu}` to use for the transform. Must be 0, 2, or 4. Returns ------- numpy.ndarray Array of cross correlations :math:`\\xi(z_1,z_2,\Delta\\theta)`. The result is only calculated for :math:`z_1 \ge z_2`. If order equals 0 or 4, the result is symmetrized. Otherwise, values for :math:`z_1 < z_2` are returned as zero. """ if order not in (0, 2, 4): raise ValueError('Invalid order. Choose from 0, 2, 4.') try: assert len(ell.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Wavenumbers ell must be a 1D array.') if not np.array_equal(np.unique(ell), ell) or ell[0] <= 0: raise ValueError('Wavenumbers ell must be increasing values > 0.') num_ell = ell.size try: assert len(Delta2.shape) == 3 except (AssertionError, AttributeError): raise ValueError('Delta2 must be a 3D array.') num_z = len(Delta2) if Delta2.shape != (num_z, num_z, num_ell): raise ValueError('Delta2 has wrong shape {0}. Expected {1}.' .format(Delta2.shape, (num_z, num_z, num_ell))) try: assert len(theta.shape) == 1 except (AssertionError, AttributeError): raise ValueError('Separations theta must be a 1D array.') if not np.array_equal(np.unique(theta), theta) or theta[0] <= 0: raise ValueError('Separations theta must be increasing values > 0.') num_theta = theta.size theta_min = 2 * np.pi / ell[-1] theta_max = 2 * np.pi / ell[0] if theta[0] < theta_min or theta[-1] > theta_max: raise ValueError( 'Maximum allowed theta coverage is [{0:.3f}, {1:.3f}] rad' .format(theta_min, theta_max)) # Calculate the kernel as a 2D array where # kernel[i,j] = Jn(ell[j] * theta[i]) kernel = scipy.special.jv(order, ell * theta[:, np.newaxis]) # Tabulate values of log10(ell), which are the integration abscissas. log_ell = np.log10(ell) xi_shape = (num_z, num_z, num_theta) if order == 2: xi = np.zeros(xi_shape, dtype=float) else: xi = np.empty(xi_shape, dtype=float) for i in range(num_z): for j in range(i+1): integrand = Delta2[i, j] * kernel xi[i, j] = scipy.integrate.simps(y=integrand, x=log_ell, axis=-1) if order != 2 and i > j: xi[j, i] = xi[i, j] if order == 2: # Use a unfunc to ensure this happens in place. xi = np.negative(xi, out=xi) return xi