Source code for pynbody.bridge

"""
Tools for connecting different simulation snaphots

The bridge module allows you to take a subview (e.g. a halo) from one snapshot and 'push' it into the other. This is
especially useful if the two snapshots are different time outputs of the same simulation, or closely-related
simulations (e.g. one DMO and one hydro).

Once connected, bridge called on a specific subset of particles in output1 will trace these particles back (or
forward) to the output2, enabling observing a change in their properties, such as position, temperature, etc.

For an introduction on how to use bridges, see the :doc:`tutorial </tutorials/bridge>`.

"""

from __future__ import annotations

import abc
import typing
import warnings
import weakref

import numpy as np

from .. import family, util
from . import _bridge

if typing.TYPE_CHECKING:
    from .. import snapshot


[docs] class AbstractBridge(abc.ABC): """The abstract base class for bridges between two snapshots. For more information see the module documentation for :mod:`pynbody.bridge`, or the :doc:`bridge tutorial </tutorials/bridge>`. """
[docs] def __init__(self, start: snapshot.SimSnap, end: snapshot.SimSnap): self._start = weakref.ref(start) self._end = weakref.ref(end)
@abc.abstractmethod def __call__(self, s: snapshot.SubSnap) -> snapshot.SubSnap: """Map from a ``SubSnap`` at one end of the bridge, to the corresponding ``SubSnap`` at the other end""" def _get_ends(self): start = self._start() end = self._end() if start is None or end is None: raise RuntimeError("Stale reference to start or endpoint") return start, end
[docs] def match_halos(self, halos_1, halos_2, /, threshold=0.5, use_family=None, fill_value=-1, use_halo_indexes=False): """Given halo catalogues, identify the likely halo number in the second catalogue for each halo in the first. For example, if a bridge ``b`` links snapshot ``f1`` (high redshift) to ``f2`` (low redshift) and we perform: >>> h1 = f1.halos() >>> h2 = f2.halos() >>> cat = b.match_halos(h1, h2) then ``cat`` is now a dictionary such that f1.halos()[i] is the major progenitor for``f2.halos()[cat[i]]``, assuming ``cat[i]`` is positive. Halos which cannot be matched because they have too few particles in common give the result ``fill_value``, default -1. This is determined by the given threshold fraction of particles in common (specified by ``threshold``, default 50%) Parameters ---------- halos_1 : pynbody.halo.HaloCatalogue The HaloCatalogue for the first snapshot. halos_2 : pynbody.halo.HaloCatalogue The HaloCatalogue for the second snapshot. threshold : float The minimum fraction of particles in common for a match to be considered. Default is 0.5. use_family : str Only match particles of this family. Default is None, in which case all particles are matched. Setting this to a family name can be useful if matching between two different simulations where the relationship between DM particles is known, but perhaps the relationship between gas particles is not (e.g. a Ramses simulation where actually the gas 'particles' are cells) use_halo_indexes : bool If True, instead of returning a dictionary mapping to halo numbers, return a numpy array that matches halo IDs (which are zero-based indexes into the full list of halos; for more information see the nomenclature guide in the documentation for :class:`~pynbody.halo.HaloCatalogue`. The default is False, so that halo numbers are used throughout. fill_value : int The value to use for halos that cannot be matched. Default is -1. """ self._check_compatible_halo_catalogues(halos_1, halos_2) particles_in_common_matrix = self.count_particles_in_common(halos_1, halos_2, use_family=use_family) highest_commonality_index = particles_in_common_matrix.argmax(axis=1) highest_commonality = particles_in_common_matrix[np.arange(particles_in_common_matrix.shape[0]), highest_commonality_index] # return nan for zero division with np.errstate(divide='ignore', invalid='ignore'): frac_commonality = highest_commonality / particles_in_common_matrix.sum(axis=1) frac_commonality[~np.isfinite(frac_commonality)] = 0 invalid_matches = frac_commonality < threshold if use_halo_indexes: result = highest_commonality_index result[invalid_matches] = fill_value return result else: mapper_1 = halos_1.number_mapper mapper_2 = halos_2.number_mapper sources = mapper_1.index_to_number(np.arange(len(halos_1))) destinations = mapper_2.index_to_number(highest_commonality_index) destinations[invalid_matches] = fill_value return dict(zip(sources, destinations))
def _check_compatible_halo_catalogues(self, halos_1, halos_2): if halos_1.base.ancestor is not self._start().ancestor or halos_2.base.ancestor is not self._end().ancestor: raise ValueError("The HaloCatalogues must be based on the correct snapshots")
[docs] def fuzzy_match_halos(self, halos_1, halos_2, /, threshold=0.01, use_family=None, use_halo_indexes=False) -> dict[int, list[tuple[int, float]]]: """Given halo catalogues, return possible matches within the second catalogue for each halo in the first. For details about the parameters to this function, see the documentation for :func:`match_catalog`. Parameters ---------- halos_1 : pynbody.halo.HaloCatalogue The HaloCatalogue for the source snapshot halos_2 : pynbody.halo.HaloCatalogue The HaloCatalogue for the destination snapshot threshold : float The minimum fraction of particles in common for a match to be considered. Default is 0.01. use_family : str Only match particles of this family. Default is None, in which case all particles are matched. use_halo_indexes : bool If True, instead of returning halo numbers, return halo indexes. Default is False. Returns ------- dict[int, list[tuple[int, float]]] Maps from halo number in the first catalogue to a list of tuples. Each tuple contains the halo number in the second catalogue and the fraction of particles in the first halo that are in the second halo. If ``use_halo_indexes`` is True, the halo numbers are replaced by halo indexes. """ self._check_compatible_halo_catalogues(halos_1, halos_2) def map_index_to_output(index, for_halos): if use_halo_indexes: return index else: return for_halos.number_mapper.index_to_number(index) particles_in_common_matrix = self.count_particles_in_common(halos_1, halos_2, use_family=use_family) output = {} for source_index, row in enumerate(particles_in_common_matrix): if source_index >= len(halos_1): # count_particles_in_common returns a square matrix, so we might run over the end of the first catalogue break this_row_matches = [] row_sum = row.sum() if row_sum > 0: frac_particles_transferred = row / row_sum above_threshold = np.where(frac_particles_transferred > threshold)[0] above_threshold = above_threshold[np.argsort(frac_particles_transferred[above_threshold])[::-1]] for column in above_threshold: this_row_matches.append((map_index_to_output(column, halos_2), frac_particles_transferred[column])) output[map_index_to_output(source_index, halos_1)] = this_row_matches return output
[docs] def count_particles_in_common(self, halos_1, halos_2, /, max_num_halos=None, use_family=None) -> np.ndarray: """Return a matrix with the number of particles transferred from ``groups_1`` to groups_2. Normally, :func:`match_catalog` (or :func:`fuzzy_match_catalog`) are easier to use, but this routine provides the maximal information. .. warning:: This routine returns results in terms of halo indexes, rather than halo numbers. This is because halo indexes are guaranteed to be continuous starting at zero, while halo numbers may have gaps. If you need to convert from halo indexes to halo numbers, you can use :meth:`~pynbody.halo.HaloCatalogue.number_mapper` attribute of :class:`~pynbody.halo.HaloCatalogue`. Alternatively, use the :func:`match_catalog` method which returns results in terms of halo numbers. Parameters ---------- halos_1 : pynbody.halo.HaloCatalogue The HaloCatalogue for the first snapshot (the one at the start of the bridge). halos_2 : pynbody.halo.HaloCatalogue The HaloCatalogue for the second snapshot (the one at the end of the bridge). max_num_halos : int, optional The maximum number of halos use_family : str Only match particles of this family. Default is None, in which case all particles are matched. Returns ------- numpy.ndarray A matrix with the number of particles transferred from each halo in the first catalogue to each halo in the second. The size of the matrix is determined by the maximum number of halos in either catalogue, or by the value of ``max_num_halos`` if specified. """ self._check_compatible_halo_catalogues(halos_1, halos_2) start, end = self._get_ends() if use_family: end = end[use_family] start = start[use_family] restricted_start_particles = self( self(start)) # map back and forth to get only particles that are held in common restricted_end_particles = self( restricted_start_particles) # map back to start in case a reordering is required restriction_end_indices = restricted_end_particles.get_index_list(end.ancestor) restriction_start_indices = restricted_start_particles.get_index_list(start.ancestor) assert len(restriction_end_indices) == len(restriction_start_indices), \ ("Internal consistency failure in catalog_transfer_matrix: particles supposedly " "common to both simulations have two different lengths") # Need to account for the fact that get_group_array(only_family) will return an array of length Nfamily # while restriction_start_indices are global indeces to the ancestor snapshot, # which can start anywhere depending on the family ordering ==> Need to offset them back to start at 0. restriction_start_indices -= start.ancestor._get_family_slice(use_family).start restriction_end_indices -= end.ancestor._get_family_slice(use_family).start g1 = halos_1.get_group_array(family=use_family, use_index=True)[restriction_start_indices] g2 = halos_2.get_group_array(family=use_family, use_index=True)[restriction_end_indices] if max_num_halos is None: max_num_halos = max(len(halos_1), len(halos_2)) transfer_matrix = _bridge.match(g1, g2, 0, max_num_halos) return transfer_matrix
[docs] @util.deprecated("match_catalog is deprecated; use match_halos instead") def match_catalog(self, min_index=1, max_index=30, threshold=0.5, groups_1=None, groups_2=None, use_family=None): """Deprecated alternative to :func:`match_halos`. Use that method instead.""" fuzzy_matches = self.fuzzy_match_catalog(min_index, max_index, threshold, groups_1, groups_2, use_family) identification = np.zeros(max_index + 1, dtype=int) for i, row in enumerate(fuzzy_matches): if len(row) > 0: identification[i] = row[0][0] elif i < min_index: identification[i] = -2 else: identification[i] = -1 return identification
[docs] @util.deprecated("fuzzy_match_catalog is deprecated; use fuzzy_match_halos instead") def fuzzy_match_catalog(self, min_index=1, max_index=30, threshold=0.01, groups_1=None, groups_2=None, use_family=None, only_family=None): """Deprecated alternative to :func:`fuzzy_match_halos`. Use that method instead.""" transfer_matrix = self.catalog_transfer_matrix(min_index, max_index, groups_1, groups_2, use_family, only_family) output = [[]] * min_index for row in transfer_matrix: this_row_matches = [] if row.sum() > 0: frac_particles_transferred = np.array(row, dtype=float) / row.sum() above_threshold = np.where(frac_particles_transferred > threshold)[0] above_threshold = above_threshold[np.argsort(frac_particles_transferred[above_threshold])[::-1]] for column in above_threshold: this_row_matches.append((column + min_index, frac_particles_transferred[column])) output.append(this_row_matches) return output
[docs] @util.deprecated("catalog_transfer_matrix is deprecated; use count_particles_in_common instead") def catalog_transfer_matrix(self, min_index=1, max_index=30, groups_1=None, groups_2=None, use_family=None, only_family=None): """Deprecated interface to :func:`count_particles_in_common`. Use that method instead.""" # the 'index' naming of the parameter is misleading -- in pynbody v1, the distinction between number and # index had not been drawn. In fact, by pynbody v2 nomenclature, this function accepts halo numbers, so # we rename internally to prevent confusion in the compatibility code below. min_number = min_index max_number = max_index del min_index, max_index if only_family is not None: if use_family != only_family and use_family is not None: raise ValueError("use_family and only_family must be the same if both specified") use_family = only_family start, end = self._get_ends() if groups_1 is None: groups_1 = start.halos() else: assert groups_1.base.ancestor is start.ancestor if groups_2 is None: groups_2 = end.halos() else: assert groups_2.base.ancestor is end.ancestor max_number_1 = min(max_number, max(groups_1.keys())) max_number_2 = min(max_number, max(groups_2.keys())) indices_to_use_1 = groups_1.number_mapper.number_to_index(np.arange(min_number, max_number_1 + 1)) indices_to_use_2 = groups_2.number_mapper.number_to_index(np.arange(min_number, max_number_2 + 1)) max_index = max(indices_to_use_1.max(), indices_to_use_2.max()) transfer_matrix = self.count_particles_in_common(groups_1, groups_2, max_num_halos=max_index, use_family=use_family) transfer_matrix_restricted = transfer_matrix[indices_to_use_1, :][:, indices_to_use_2] return transfer_matrix_restricted
[docs] class OneToOneBridge(AbstractBridge): """Connects two snapshots with identical particle numbers and file layout. Particle ``i`` in the start point is the same as particle ``i`` in the end point. """
[docs] def __init__(self, start: snapshot.SimSnap, end: snapshot.SimSnap): if len(start) != len(end): raise ValueError("OneToOneBridge requires snapshots of the same length") super().__init__(start, end)
def __call__(self, s): start, end = self._get_ends() if s.is_descendant(start): return end[s.get_index_list(start)] elif s.is_descendant(end): return start[s.get_index_list(end)] else: raise RuntimeError("Not a subview of either end of the bridge")
[docs] class OrderBridge(AbstractBridge): """Connects to snapshots that both have arrays of identity integers (``iord`` or similar) to identify particles. Particles ``i_start`` and ``i_end`` are defined to be the same if and only if ``start[order_array][i_start] == start[order_array][i_end]``. """
[docs] def __init__(self, start, end, order_array="iord", monotonic=True, allow_family_change=False, only_families=None): """Initialise the ``OrderBridge`` .. versionchanged:: 2.3.0 The ``only_families`` parameter was added to allow bridging only between specific families. This is especially helpful where the order array is not defined for all families, such as in some RAMSES simulations. Parameters ---------- start : snapshot.SimSnap The start point of the bridge end : snapshot.SimSnap The end point of the bridge order_array : str The name of the array that is used to identify particles. Default is ``iord``. monotonic : bool If ``True``, the order_array must be monotonically increasing in both ends of the bridge. Note that this is not checked for you. If ``False``, the bridging is slower but this is the failsafe option. allow_family_change : bool If ``True``, the bridge will allow particles to change family going from one end to the other of the bridge. Otherwise, it is assumed that the family of a particle is conserved. only_families : list of str or family.Family, optional If specified, only particles in these families will be considered for the bridge. This is useful if the order array is not defined for all families. """ super().__init__(start, end) self._order_array = order_array self.monotonic = monotonic self.allow_family_change = allow_family_change if only_families is not None: self._only_families = [family.get_family(f) for f in only_families] else: self._only_families = None
def __call__(self, s): start, end = self._get_ends() if s.is_descendant(start): from_ = start to_ = end elif s.is_descendant(end): from_ = end to_ = start else: raise RuntimeError("Not a subview of either end of the bridge") if self._only_families is None: iord_to = self._get_iord_array(to_) iord_from = self._get_iord_array(s) output_index = self._get_particle_indices_from_source_and_target_iords(iord_from, iord_to) else: output_index = [] for f in s.families(): if f in self._only_families: iord_from = self._get_iord_array(s[f]) iord_to = self._get_iord_array(to_[f]) family_offset = to_._get_family_slice(f).start output_index_this_fam = self._get_particle_indices_from_source_and_target_iords(iord_from, iord_to) output_index.append(output_index_this_fam + family_offset) if len(output_index) == 0: output_index = np.array([], dtype=np.int64) else: output_index = np.concatenate(output_index) if self.allow_family_change: new_family_index = to_._family_index()[output_index] # stable sort by family: output_index = output_index[np.lexsort((new_family_index,))] return to_[output_index] def _get_iord_array(self, snap): return np.asarray(snap[self._order_array]).view(np.ndarray) def _get_particle_indices_from_source_and_target_iords(self, selected_iords_in_source_simulation, all_iords_in_target_simulation): if not self.monotonic: iord_map_to = np.argsort(all_iords_in_target_simulation) iord_map_from = np.argsort(selected_iords_in_source_simulation) all_iords_in_target_simulation = all_iords_in_target_simulation[iord_map_to] selected_iords_in_source_simulation = selected_iords_in_source_simulation[iord_map_from] output_index, found_match = _bridge.bridge(all_iords_in_target_simulation, selected_iords_in_source_simulation) if not self.monotonic: output_index = iord_map_to[output_index[np.argsort(iord_map_from)][found_match]] else: output_index = output_index[found_match] return output_index
[docs] class RamsesBugOrderBridge(OrderBridge):
[docs] def __init__(self, start, end, order_array="iord", monotonic=False, allow_family_change=False, only_families=None): """A special case of OrderBridge for tracking between Ramses snapshots affected by int32 iord truncation bug. In this bug, the iord array is truncated to `int32` when transmitted from one CPU to another. We first truncate all iords to `int32`, then use heuristics to try to disambiguate collisions that occur due to this bit loss. The heuristics are that star particles must always map onto star particles, and DM particles must map onto DM particles of the same level. """ if only_families is not None: families = [family.get_family(f) for f in only_families] else: families = start.families() for fam_identifier in families: fam = family.get_family(fam_identifier) use_level_hashing = fam == family.dm start[fam]['pynbody_iord_recreation'] = self._make_new_iord_array(start[fam], order_array, use_level_hashing) end[fam]['pynbody_iord_recreation'] = self._make_new_iord_array(end[fam], order_array, use_level_hashing) if monotonic is not False: warnings.warn("RamsesBugOrderBridge does not support monotonic iord arrays; setting monotonic to False") super().__init__(start, end, 'pynbody_iord_recreation', False, allow_family_change, only_families)
@classmethod def _make_new_iord_array(cls, snapshot, order_array_name, use_level_hashing=False): new_order_array = snapshot[order_array_name].astype(np.int32).astype(np.int64) if use_level_hashing: level_guess = np.log2(snapshot['mass']).astype(np.int64) level_guess -= level_guess.max() # we put each level onto its own high-order bits, in the hope this resolves most collisions. new_order_array += (1 + level_guess) * 2 ** 32 # Check that the newly created iords are all unique if len(np.unique(new_order_array)) != len(new_order_array): warnings.warn( "Failed to resolve all conflicts when recreating the iord array in RamsesBugOrderBridge. " "This is likely due to having non-unique iords in one of the two ends of the bridge" "which cannot be mapped out to unique iords on the other end.") return new_order_array
[docs] def bridge_factory(a: snapshot.SimSnap, b: snapshot.SimSnap) -> AbstractBridge: """Create a bridge connecting the two specified snapshots. This function will determine the best type of :ref:`Bridge` to construct between the two snapshots, and return it. It is called by :func:`pynbody.snapshot.simsnap.SimSnap.bridge`. For more information see :doc:`the bridge tutorial </tutorials/bridge>`. """ not_sure_error = "Don't know how to automatically bridge between two simulations of different formats. " \ "You will need to create your bridge manually by instantiating either the OneToOneBridge or " \ "OrderBridge class appropriately." from ..snapshot import gadget, gadgethdf, nchilada, ramses, tipsy a_top = a.ancestor b_top = b.ancestor if type(a_top) is not type(b_top): raise RuntimeError(not_sure_error) if (isinstance(a_top, tipsy.TipsySnap) or isinstance(a_top, nchilada.NchiladaSnap)): if "iord" in a_top.loadable_keys(): return OrderBridge(a_top, b_top, monotonic=True) else: return OneToOneBridge(a_top, b_top) elif isinstance(a_top, gadget.GadgetSnap) or isinstance(a_top, gadgethdf.GadgetHDFSnap): return OrderBridge(a_top, b_top, monotonic=False, allow_family_change=True) elif isinstance(a_top, ramses.RamsesSnap): if len(a.gas) > 0 or len(b.gas) > 0: raise RuntimeError("Cannot bridge AMR gas cells") if a_top.has_potential_negative_iords_bug or b_top.has_potential_negative_iords_bug: warnings.warn("Due to the unexpected presence of negative iord values, one of your snapshots has been identified " "as potentially affected by a bug in RAMSES which leads to bit-loss in the iords. Pynbody will attempt " "to correct for this bug. However if you intentionally have negative iords in your snapshot, this is not " "what you want. In such cases, please reload your file with `pynbody.load(..., " "negative_iords_on_purpose=True)`. For more background see: " "https://github.com/pynbody/pynbody/pull/914, https://github.com/pynbody/pynbody/pull/961") return RamsesBugOrderBridge(a_top, b_top, monotonic=False, only_families=["dm", "star"]) return OrderBridge(a_top, b_top, monotonic=False, only_families=["dm", "star"]) else: raise RuntimeError(not_sure_error)