Source code for pynbody.analysis.halo

"""
Functions for calculating crucial properties of halos, such as their center, shape, and virial radius.

Also contains functions for transforming the simulation so that a given halo is centered.


"""

import functools
import logging
import math
import operator
import warnings

import numpy as np

from .. import array, config, filt, transformation, units, util
from . import _com, cosmology, profile

logger = logging.getLogger('pynbody.analysis.halo')


[docs] def shrink_sphere_center(sim, r=None, shrink_factor=0.7, min_particles=100, num_threads = None, particles_for_velocity = 0, families_for_velocity = ['dm', 'star']): """ Return the center according to the shrinking-sphere method of Power et al (2003) Most users will want to use the higher-level :func:`center` function, which actually performs the centering operation. This function is a lower-level interface that calculates the center but does not move the particles. Parameters ---------- sim : SimSnap The simulation snapshot to center r : float | str, optional Initial search radius. If None, a rough estimate is used. shrink_factor : float, optional The amount to shrink the search radius by on each iteration min_particles : int, optional Minimum number of particles within the search radius. When this number is reached, the search is complete. num_threads : int, optional Number of threads to use for the calculation. If None, the number of threads is taken from the configuration. particles_for_velocity : int, optional If > min_particles, a velocity centre is calculated when the number of particles falls below this threshold, and returned. families_for_velocity : list, optional The families to use for the velocity centering. Default is ['dm', 'star'], because gas particles may be involved in violent outflows making them a risky choice for velocity centering. Returns ------- com : SimArray The center of mass of the final sphere vel : SimArray The center of mass velocity of the final sphere. Only returned if particles_for_velocity > min_particles. """ if num_threads is None: num_threads = config['number_of_threads'] if r is None: # use rough estimate for a maximum radius # results will be insensitive to the exact value chosen r = (sim["x"].max() - sim["x"].min()) / 2 elif isinstance(r, str) or issubclass(r.__class__, units.UnitBase): if isinstance(r, str): r = units.Unit(r) r = r.in_units(sim['pos'].units, **sim.conversion_context()) mass = np.asarray(sim['mass'], dtype='double') pos = np.asarray(sim['pos'], dtype='double') R = _com.shrink_sphere_center(pos, mass, min_particles, particles_for_velocity, shrink_factor, r, num_threads) com, final_radius, velocity_radius = R logger.info("Radius for velocity measurement = %s", velocity_radius) logger.info("Final SSC=%s", com) com_to_return = array.SimArray(com, sim['pos'].units) if particles_for_velocity > min_particles: fam_filter = functools.reduce(operator.or_, (filt.FamilyFilter(f) for f in families_for_velocity)) final_sphere = sim[filt.Sphere(velocity_radius, com) & fam_filter] logger.info("Particles in sphere = %d", len(final_sphere)) if len(final_sphere) == 0: warnings.warn("Final sphere is empty; cannot return a velocity. This probably implies something is " "wrong with the position centre too.", RuntimeWarning) return com_to_return, np.array([0., 0., 0.]) else: vel = final_sphere.mean_by_mass('vel') logger.info("Final velocity=%s", vel) return com_to_return, vel else: return com_to_return
[docs] def virial_radius(sim, cen=None, overden=178, r_max=None, rho_def='matter'): """Calculate the virial radius of the halo centered on the given coordinates. The default is here defined by the sphere centered on cen which contains a mean density of overden * rho_M_0 * (1+z)^3. Parameters ---------- sim : SimSnap The simulation snapshot for which to calculate the virial radius. cen : array_like, optional The center of the halo. If None, the halo is assumed to be already centered. overden : float, optional The overdensity of the halo. Default is 178. r_max : float, optional The maximum radius to search for the virial radius. If None, the maximum radius of any particle in *sim* is used rho_def : str, optional The density definition to use. Default is 'matter', which uses the matter density at the redshift of the simulation. Alternatively, 'critical' can be used for the critical density at this redshift. Returns ------- float The virial radius of the halo in the position units of *sim*. """ if r_max is None: r_max = (sim["x"].max() - sim["x"].min()) else: if cen is not None: sim = sim[filt.Sphere(r_max, cen)] else: sim = sim[filt.Sphere(r_max)] r_min = 0.0 if rho_def == 'matter': ref_density = sim.properties["omegaM0"] * cosmology.rho_crit(sim, z=0) * (1.0 + sim.properties["z"]) ** 3 elif rho_def == 'critical': ref_density = cosmology.rho_crit(sim, z=sim.properties["z"]) else: raise ValueError(rho_def + "is not a valid definition for the reference density") target_rho = overden * ref_density logger.info("target_rho=%s", target_rho) if cen is not None: transform = sim.translate(-np.asanyarray(cen)) else: transform = transformation.NullTransformation(sim) with transform: sim = sim[filt.Sphere(r_max)] with sim.immediate_mode: mass_ar = np.asarray(sim['mass']) r_ar = np.asarray(sim['r']) rho = lambda r: util.sum_if_lt(mass_ar,r_ar,r)/(4. * math.pi * (r ** 3) / 3) result = util.bisect(r_min, r_max, lambda r: target_rho - rho(r), epsilon=0, eta=1.e-3 * target_rho) return result
def _potential_minimum(sim): i = sim["phi"].argmin() return sim["pos"][i].copy()
[docs] def hybrid_center(sim, r='3 kpc', **kwargs): """Determine the center of the halo by finding the shrink-sphere-center near the potential minimum Most users will want to use the general :func:`center` function, which actually performs the centering operation. This function is a lower-level interface that calculates the center but does not move the particles. Parameters ---------- sim : SimSnap The simulation snapshot of which to find the center r : float | str, optional Radius from the potential minimum to search for the center. Default is 3 kpc. Remaining parameters are passed onto :func:`shrink_sphere_center`. Returns ------- com : SimArray The center of mass of the final sphere vel : SimArray The center of mass velocity of the final sphere. Only returned if particles_for_velocity > min_particles. """ try: cen_a = _potential_minimum(sim) except KeyError: cen_a = center_of_mass(sim) return shrink_sphere_center(sim[filt.Sphere(r, cen_a)], **kwargs)
[docs] def vel_center(sim, cen_size="1 kpc", return_cen=False, move_all=True, **kwargs): """Recenter the snapshot on the center of mass velocity inside a sphere of specified radius Attempts to use the star particles, falling back to gas or dark matter if necessary. Parameters ---------- sim : SimSnap The simulation snapshot to center cen_size : str or float, optional The size of the sphere to use for the velocity centering. Default is 1 kpc. return_cen : bool, optional If True, only return the velocity center without actually moving the snapshot. Default is False. move_all : bool, optional If True (default), move the entire snapshot. Otherwise only move the particles in the halo passed in. retcen : bool, optional Deprecated alias for return_cen Returns ------- Transformation | SimArray Normally, a transformation object that can be used to revert the transformation. However, if return_cen is True, a SimArray containing the velocity center coordinates is returned instead, and the snapshot is not transformed. """ if "retcen" in kwargs: return_cen = kwargs.pop("retcen") warnings.warn("The 'retcen' keyword is deprecated. Use 'return_cen' instead.", DeprecationWarning) logger.info("Finding halo velocity center...") if move_all: target = sim.ancestor else: target = sim cen = sim.star[filt.Sphere(cen_size)] if len(cen) < 5: # fall-back to DM cen = sim.dm[filt.Sphere(cen_size)] if len(cen) < 5: # fall-back to gas cen = sim.gas[filt.Sphere(cen_size)] if len(cen) < 5: # very weird snapshot, or mis-centering! raise ValueError("Insufficient particles around center to get velocity") vcen = (cen['vel'].transpose() * cen['mass']).sum(axis=1) / \ cen['mass'].sum() vcen.units = cen['vel'].units logger.info("vcen=%s", vcen) if return_cen: return vcen else: return target.offset_velocity(-vcen)
[docs] def center(sim, mode=None, return_cen=False, with_velocity=True, cen_size="1 kpc", cen_num_particles=10000, move_all=True, wrap=False, **kwargs): """Transform the ancestor snapshot so that the provided snapshot is centred The centering scheme is determined by the ``mode`` keyword. As well as the position, the velocity can also be centred. The following centring modes are available: * *pot*: potential minimum * *com*: center of mass * *ssc*: shrink sphere center * *hyb*: for most halos, returns the same as ssc, but works faster by starting iteration near potential minimum Before the main centring routine is called, the snapshot is translated so that the halo is already near the origin. The box is then wrapped so that halos on the edge of the box are handled correctly. Parameters ---------- sim : SimSnap The simulation snapshot from which to derive a centre. The ancestor snapshot is then transformed. mode : str or function, optional The method to use to determine the centre. If None, the default is taken from the configuration. Accepted values are discussed above. A function returning the centre, or a pair of centres (position and velocity) can also be passed. cen_size : str or float, optional The size of the sphere to use for the velocity centering. Default is 1 kpc. Note that this is only used if velocity centring is requested but the underlying method does not return a velocity centre. For example, if using the 'ssc' method, the cen_num_particles keyword should be used instead. cen_num_particles : int, optional The number of particles to use for the velocity centering. Default is 5000. This is passed to the 'ssc' method, which then finds the sphere with approximately this number of particles in it for the velocity centering. with_velocity: bool, optional If True, also center the velocity. Default is True. return_cen: bool, optional If True, only return the center without actually centering the snapshot. Default is False. move_all: bool, optional If True (default), move the entire snapshot. Otherwise only move the particles in the halo/subsnap passed into this function. vel: bool, optional Deprecated alias for with_velocity. Default is True. retcen: bool, optional If True, only return the center without centering the snapshot. Default is False. Returns ------- Transformation | SimArray Normally, a transformation object that can be used to revert the transformation. However, if return_cen is True, a SimArray containing the center coordinates is returned instead, and the snapshot is not transformed. """ if 'vel' in kwargs: warnings.warn("The 'vel' keyword is deprecated. Use 'with_velocity' instead.", DeprecationWarning) with_velocity = kwargs.pop('vel') if 'retcen' in kwargs: warnings.warn("The 'retcen' keyword is deprecated. Use 'return_cen' instead.", DeprecationWarning) return_cen = kwargs.pop('retcen') global config if mode is None: mode = config['centering-scheme'] try: fn = {'pot': _potential_minimum, 'com': lambda s : s.mean_by_mass('pos'), 'ssc': functools.partial(shrink_sphere_center, particles_for_velocity=cen_num_particles), 'hyb': hybrid_center}[mode] except KeyError: fn = mode if move_all: target = sim.ancestor else: target = sim if wrap: # centre on something within the halo and wrap initial_offset = -sim['pos'][0] transform = target.translate(initial_offset) target.wrap() else: transform = transformation.NullTransformation(target) initial_offset = np.array([0., 0., 0.]) try: centre = fn(sim, **kwargs) if len(centre) == 2: # implies we have a velocity centre as well centre, vel_centre = centre else: vel_centre = None if return_cen: transform.revert() return centre - initial_offset transform = transform.translate(-centre) if with_velocity: if vel_centre is None : vel_centre = vel_center(sim, cen_size=cen_size, return_cen=True) logger.info("vel_centre=%s", vel_centre) transform = transform.offset_velocity(-vel_centre) except: transform.revert() raise return transform
[docs] @util.deprecated("halo_shape is deprecated. Use shape instead.") def halo_shape(sim, N=100, rin=None, rout=None, bins='equal'): """Deprecated wrapper around :func:`shape`, for backwards compatibility. The halo must be pre-centred, e.g. using :func:`center`. Caution is advised when assigning large number of bins and radial ranges with many particles, as the algorithm becomes very slow. Parameters ---------- N : int The number of homeoidal shells to consider. Shells with few particles will take longer to fit. rin : float The minimum radial bin in units of sim['pos']. By default this is taken as rout/1000. Note that this applies to axis a, so particles within this radius may still be included within homeoidal shells. rout : float The maximum radial bin in units of sim['pos']. By default this is taken as the largest radial value in the halo particle distribution. bins : str The spacing scheme for the homeoidal shell bins. 'equal' initialises radial bins with equal numbers of particles, with the exception of the final bin which will accomodate remainders. This number is not necessarily maintained during fitting. 'log' and 'lin' initialise bins with logarithmic and linear radial spacing. Returns ------- rbin : SimArray The radial bins used for the fitting. ba : array The axial ratio b/a as a function of radius. ca : array The axial ratio c/a as a function of radius. angle : array The angle of the a-direction with respect to the x-axis as a function of radius. Es : array The rotation matrices for each shell. """ angle = lambda E: np.arccos(abs(E[:,0,0])) rbin, axis_lengths, num_particles, rotation_matrices = shape(sim.dm, nbins=N, rmin=rin, rmax=rout, bins=bins) ba = axis_lengths[:, 1] / axis_lengths[:, 0] ca = axis_lengths[:, 2] / axis_lengths[:, 0] return rbin, ba.view(np.ndarray), ca.view(np.ndarray), angle(rotation_matrices), rotation_matrices
[docs] def shape(sim, nbins=100, rmin=None, rmax=None, bins='equal', ndim=3, max_iterations=10, tol=1e-3, justify=False): """Calculates the shape of the provided particles in homeoidal shells, over a range of nbins radii. Homeoidal shells maintain a fixed area (ndim=2) or volume (ndim=3). Note that all provided particles are used in calculating the shape, so e.g. to measure dark matter halo shape from a halo with baryons, you should pass only the dark matter particles. The simulation must be pre-centred, e.g. using :func:`center`. The algorithm is sensitive to substructure, which should ideally be removed. Caution is advised when assigning large number of bins and radial ranges with many particles, as the algorithm becomes very slow. Parameters ---------- nbins : int The number of homeoidal shells to consider. Shells with few particles will take longer to fit. rmin : float The minimum radial bin in units of sim['pos']. By default this is taken as rout/1000. Note that this applies to axis a, so particles within this radius may still be included within homeoidal shells. rmax : float The maximum radial bin in units of sim['pos']. By default this is taken as the largest radial value in the halo particle distribution. bins : str The spacing scheme for the homeoidal shell bins. 'equal' initialises radial bins with equal numbers of particles, with the exception of the final bin which will accomodate remainders. This number is not necessarily maintained during fitting. 'log' and 'lin' initialise bins with logarithmic and linear radial spacing. ndim : int The number of dimensions to consider; either 2 or 3 (default). If ndim=2, the shape is calculated in the x-y plane. If using ndim=2, you may wish to make a cut in the z direction before passing the particles to this routine (e.g. using :class:`pynbody.filt.BandPass`). max_iterations : int The maximum number of shape calculations (default 10). Fewer iterations will result in a speed-up, but with a bias towards spheroidal results. tol : float Convergence criterion for the shape calculation. Convergence is achieved when the axial ratios have a fractional change <=tol between iterations. justify : bool Align the rotation matrix directions such that they point in a single consistent direction aligned with the overall halo shape. This can be useful if working with slerps. Returns ------- rbin : SimArray The radial bins used for the fitting axis_lengths : SimArray A nbins x ndim array containing the axis lengths of the ellipsoids in each shell num_particles : np.ndarray The number of particles within each bin rotation_matrices : np.ndarray The rotation matrices for each shell """ # Sanitise inputs: if (rmax == None): rmax = sim['r'].max() if (rmin == None): rmin = rmax / 1E3 assert ndim in [2, 3] assert max_iterations > 0 assert tol > 0 assert rmin >= 0 assert rmax > rmin assert nbins > 0 if ndim == 2: assert np.sum((sim['rxy'] >= rmin) & (sim['rxy'] < rmax)) > nbins * 2 elif ndim == 3: assert np.sum((sim['r'] >= rmin) & (sim['r'] < rmax)) > nbins * 2 if bins not in ['equal', 'log', 'lin']: bins = 'equal' # Handy 90 degree rotation matrices: Rx = np.array([[1, 0, 0], [0, 0, -1], [0, 1, 0]]) Ry = np.array([[0, 0, 1], [0, 1, 0], [-1, 0, 0]]) Rz = np.array([[0, -1, 0], [1, 0, 0], [0, 0, 1]]) # -----------------------------FUNCTIONS----------------------------- sn = lambda r, N: np.append([r[i * int(len(r) / N):(1 + i) * int(len(r) / N)][0] \ for i in range(N)], r[-1]) # General equation for an ellipse/ellipsoid: def Ellipsoid(pos, a, R): x = np.dot(R.T, pos.T) return np.sum(np.divide(x.T, a) ** 2, axis=1) # Define moment of inertia tensor: def MoI(r, m, ndim=3): return np.array([[np.sum(m * r[:, i] * r[:, j]) for j in range(ndim)] for i in range(ndim)]) # Calculate the shape in a single shell: def shell_shape(r, pos, mass, a, R, r_range, ndim=3): # Find contents of homoeoidal shell: mult = r_range / np.mean(a) in_shell = (r > min(a) * mult[0]) & (r < max(a) * mult[1]) pos, mass = pos[in_shell], mass[in_shell] inner = Ellipsoid(pos, a * mult[0], R) outer = Ellipsoid(pos, a * mult[1], R) in_ellipse = (inner > 1) & (outer < 1) ellipse_pos, ellipse_mass = pos[in_ellipse], mass[in_ellipse] # End if there is no data in range: if not len(ellipse_mass): return a, R, np.sum(in_ellipse) # Calculate shape tensor & diagonalise: D = list(np.linalg.eigh(MoI(ellipse_pos, ellipse_mass, ndim) / np.sum(ellipse_mass))) # Rescale axis ratios to maintain constant ellipsoidal volume: R2 = np.array(D[1]) a2 = np.sqrt(abs(D[0]) * ndim) div = (np.prod(a) / np.prod(a2)) ** (1 / float(ndim)) a2 *= div return a2, R2, np.sum(in_ellipse) # Re-align rotation matrix: def realign(R, a, ndim): if ndim == 3: if a[0] > a[1] > a[2] < a[0]: pass # abc elif a[0] > a[1] < a[2] < a[0]: R = np.dot(R, Rx) # acb elif a[0] < a[1] > a[2] < a[0]: R = np.dot(R, Rz) # bac elif a[0] < a[1] > a[2] > a[0]: R = np.dot(np.dot(R, Rx), Ry) # bca elif a[0] > a[1] < a[2] > a[0]: R = np.dot(np.dot(R, Rx), Rz) # cab elif a[0] < a[1] < a[2] > a[0]: R = np.dot(R, Ry) # cba elif ndim == 2: if a[0] > a[1]: pass # ab elif a[0] < a[1]: R = np.dot(R, Rz[:2, :2]) # ba return R # Calculate the angle between two vectors: def angle(a, b): return np.arccos(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) # Flip x,y,z axes of R2 if they provide a better alignment with R1. def flip_axes(R1, R2): for i in range(len(R1)): if angle(R1[:, i], -R2[:, i]) < angle(R1[:, i], R2[:, i]): R2[:, i] *= -1 return R2 # -----------------------------FUNCTIONS----------------------------- # Set up binning: r = np.array(sim['r']) if ndim == 3 else np.array(sim['rxy']) pos = np.array(sim['pos'])[:, :ndim] mass = np.array(sim['mass']) if (bins == 'equal'): # Bins contain equal number of particles full_bins = sn(np.sort(r[(r >= rmin) & (r <= rmax)]), nbins * 2) bin_edges = full_bins[0:nbins * 2 + 1:2] rbins = full_bins[1:nbins * 2 + 1:2] elif (bins == 'log'): # Bins are logarithmically spaced bin_edges = np.logspace(np.log10(rmin), np.log10(rmax), nbins + 1) rbins = np.sqrt(bin_edges[:-1] * bin_edges[1:]) elif (bins == 'lin'): # Bins are linearly spaced bin_edges = np.linspace(rmin, rmax, nbins + 1) rbins = 0.5 * (bin_edges[:-1] + bin_edges[1:]) # Initialise the shape arrays: rbins = array.SimArray(rbins, sim['pos'].units) axis_lengths = array.SimArray(np.zeros([nbins, ndim]), sim['pos'].units) N_in_bin = np.zeros(nbins).astype('int') rotations = [0] * nbins # Loop over all radial bins: for i in range(nbins): # Initial spherical shell: a = np.ones(ndim) * rbins[i] a2 = np.zeros(ndim) a2[0] = np.inf R = np.identity(ndim) # Iterate shape estimate until a convergence criterion is met: iteration_counter = 0 while ((np.abs(a[1] / a[0] - np.sort(a2)[-2] / max(a2)) > tol) & \ (np.abs(a[-1] / a[0] - min(a2) / max(a2)) > tol)) & \ (iteration_counter < max_iterations): a2 = a.copy() a, R, N = shell_shape(r, pos, mass, a, R, bin_edges[[i, i + 1]], ndim) iteration_counter += 1 # Adjust orientation to match axis ratio order: R = realign(R, a, ndim) # Ensure consistent coordinate system: if np.sign(np.linalg.det(R)) == -1: R[:, 1] *= -1 # Update profile arrays: a = np.flip(np.sort(a)) axis_lengths[i], rotations[i], N_in_bin[i] = a, R, N # Ensure the axis vectors point in a consistent direction: if justify: _, _, _, R_global = shape(sim, nbins=1, rmin=rmin, rmax=rmax, ndim=ndim) rotations = np.array([flip_axes(R_global, i) for i in rotations]) return rbins, np.squeeze(axis_lengths.T).T, N_in_bin, np.squeeze(rotations)