"""
Tools for connecting different simulation snaphots
The bridge module allows you to take a subview (e.g. a halo) from one snapshot and 'push' it into the other. This is
especially useful if the two snapshots are different time outputs of the same simulation, or closely-related
simulations (e.g. one DMO and one hydro).
Once connected, bridge called on a specific subset of particles in output1 will trace these particles back (or
forward) to the output2, enabling observing a change in their properties, such as position, temperature, etc.
For an introduction on how to use bridges, see the :doc:`tutorial </tutorials/bridge>`.
"""
from __future__ import annotations
import abc
import typing
import weakref
import numpy as np
from .. import util
from . import _bridge
if typing.TYPE_CHECKING:
from .. import snapshot
[docs]
class AbstractBridge(abc.ABC):
"""The abstract base class for bridges between two snapshots.
For more information see the module documentation for :mod:`pynbody.bridge`, or the
:doc:`bridge tutorial </tutorials/bridge>`.
"""
[docs]
def __init__(self, start: snapshot.SimSnap, end: snapshot.SimSnap):
self._start = weakref.ref(start)
self._end = weakref.ref(end)
@abc.abstractmethod
def __call__(self, s: snapshot.SubSnap) -> snapshot.SubSnap:
"""Map from a ``SubSnap`` at one end of the bridge, to the corresponding ``SubSnap`` at the other end"""
def _get_ends(self):
start = self._start()
end = self._end()
if start is None or end is None:
raise RuntimeError("Stale reference to start or endpoint")
return start, end
[docs]
def match_halos(self, halos_1, halos_2, /, threshold=0.5, use_family=None, fill_value=-1, use_halo_indexes=False):
"""Given halo catalogues, identify the likely halo number in the second catalogue for each halo in the first.
For example, if a bridge ``b`` links snapshot ``f1`` (high redshift) to ``f2`` (low redshift) and we perform:
>>> h1 = f1.halos()
>>> h2 = f2.halos()
>>> cat = b.match_halos(h1, h2)
then ``cat`` is now a dictionary such that f1.halos()[i] is the major progenitor for``f2.halos()[cat[i]]``,
assuming ``cat[i]`` is positive.
Halos which cannot be matched because they have too few particles in common give the result ``fill_value``,
default -1. This is determined by the given threshold fraction of particles in common (specified by
``threshold``, default 50%)
Parameters
----------
halos_1 : pynbody.halo.HaloCatalogue
The HaloCatalogue for the first snapshot.
halos_2 : pynbody.halo.HaloCatalogue
The HaloCatalogue for the second snapshot.
threshold : float
The minimum fraction of particles in common for a match to be considered. Default is 0.5.
use_family : str
Only match particles of this family. Default is None, in which case all particles are matched.
Setting this to a family name can be useful if matching between two different simulations where the
relationship between DM particles is known, but perhaps the relationship between gas particles is not
(e.g. a Ramses simulation where actually the gas 'particles' are cells)
use_halo_indexes : bool
If True, instead of returning a dictionary mapping to halo numbers, return a numpy array that matches
halo IDs (which are zero-based indexes into the full list of halos; for more information see
the nomenclature guide in the documentation for :class:`~pynbody.halo.HaloCatalogue`. The default is
False, so that halo numbers are used throughout.
fill_value : int
The value to use for halos that cannot be matched. Default is -1.
"""
self._check_compatible_halo_catalogues(halos_1, halos_2)
particles_in_common_matrix = self.count_particles_in_common(halos_1, halos_2, use_family=use_family)
highest_commonality_index = particles_in_common_matrix.argmax(axis=1)
highest_commonality = particles_in_common_matrix[np.arange(particles_in_common_matrix.shape[0]),
highest_commonality_index]
# return nan for zero division
with np.errstate(divide='ignore', invalid='ignore'):
frac_commonality = highest_commonality/particles_in_common_matrix.sum(axis=1)
frac_commonality[~np.isfinite(frac_commonality)] = 0
invalid_matches = frac_commonality<threshold
if use_halo_indexes:
result = highest_commonality_index
result[invalid_matches] = fill_value
return result
else:
mapper_1 = halos_1.number_mapper
mapper_2 = halos_2.number_mapper
sources = mapper_1.index_to_number(np.arange(len(halos_1)))
destinations = mapper_2.index_to_number(highest_commonality_index)
destinations[invalid_matches] = fill_value
return dict(zip(sources, destinations))
def _check_compatible_halo_catalogues(self, halos_1, halos_2):
if halos_1.base.ancestor is not self._start().ancestor or halos_2.base.ancestor is not self._end().ancestor:
raise ValueError("The HaloCatalogues must be based on the correct snapshots")
[docs]
def fuzzy_match_halos(self, halos_1, halos_2, /, threshold=0.01, use_family=None,
use_halo_indexes=False) -> dict[int, list[tuple[int, float]]]:
"""Given halo catalogues, return possible matches within the second catalogue for each halo in the first.
For details about the parameters to this function, see the documentation for :func:`match_catalog`.
Parameters
----------
halos_1 : pynbody.halo.HaloCatalogue
The HaloCatalogue for the source snapshot
halos_2 : pynbody.halo.HaloCatalogue
The HaloCatalogue for the destination snapshot
threshold : float
The minimum fraction of particles in common for a match to be considered. Default is 0.01.
use_family : str
Only match particles of this family. Default is None, in which case all particles are matched.
use_halo_indexes : bool
If True, instead of returning halo numbers, return halo indexes. Default is False.
Returns
-------
dict[int, list[tuple[int, float]]]
Maps from halo number in the first catalogue to a list of tuples. Each tuple contains the halo number in the
second catalogue and the fraction of particles in the first halo that are in the second halo. If
``use_halo_indexes`` is True, the halo numbers are replaced by halo indexes.
"""
self._check_compatible_halo_catalogues(halos_1, halos_2)
def map_index_to_output(index, for_halos):
if use_halo_indexes:
return index
else:
return for_halos.number_mapper.index_to_number(index)
particles_in_common_matrix = self.count_particles_in_common(halos_1, halos_2, use_family=use_family)
output = {}
for source_index, row in enumerate(particles_in_common_matrix):
if source_index >= len(halos_1):
# count_particles_in_common returns a square matrix, so we might run over the end of the first catalogue
break
this_row_matches = []
row_sum = row.sum()
if row_sum > 0:
frac_particles_transferred = row / row_sum
above_threshold = np.where(frac_particles_transferred > threshold)[0]
above_threshold = above_threshold[np.argsort(frac_particles_transferred[above_threshold])[::-1]]
for column in above_threshold:
this_row_matches.append((map_index_to_output(column, halos_2), frac_particles_transferred[column]))
output[map_index_to_output(source_index, halos_1)] = this_row_matches
return output
[docs]
def count_particles_in_common(self, halos_1, halos_2, /, max_num_halos=None, use_family=None) -> np.ndarray:
"""Return a matrix with the number of particles transferred from ``groups_1`` to groups_2.
Normally, :func:`match_catalog` (or :func:`fuzzy_match_catalog`) are easier to use, but this routine
provides the maximal information.
.. warning::
This routine returns results in terms of halo indexes, rather than halo numbers. This is because halo
indexes are guaranteed to be continuous starting at zero, while halo numbers may have gaps. If you need
to convert from halo indexes to halo numbers, you can use :meth:`~pynbody.halo.HaloCatalogue.number_mapper`
attribute of :class:`~pynbody.halo.HaloCatalogue`. Alternatively, use the :func:`match_catalog` method
which returns results in terms of halo numbers.
Parameters
----------
halos_1 : pynbody.halo.HaloCatalogue
The HaloCatalogue for the first snapshot (the one at the start of the bridge).
halos_2 : pynbody.halo.HaloCatalogue
The HaloCatalogue for the second snapshot (the one at the end of the bridge).
max_num_halos : int, optional
The maximum number of halos
use_family : str
Only match particles of this family. Default is None, in which case all particles are matched.
Returns
-------
numpy.ndarray
A matrix with the number of particles transferred from each halo in the first catalogue to each halo in the
second. The size of the matrix is determined by the maximum number of halos in either catalogue, or
by the value of ``max_num_halos`` if specified.
"""
self._check_compatible_halo_catalogues(halos_1, halos_2)
start, end = self._get_ends()
if use_family:
end = end[use_family]
start = start[use_family]
restricted_start_particles = self(self(start)) # map back and forth to get only particles that are held in common
restricted_end_particles = self(restricted_start_particles) # map back to start in case a reordering is required
restriction_end_indices = restricted_end_particles.get_index_list(end.ancestor)
restriction_start_indices = restricted_start_particles.get_index_list(start.ancestor)
assert len(restriction_end_indices) == len(restriction_start_indices), \
("Internal consistency failure in catalog_transfer_matrix: particles supposedly "
"common to both simulations have two different lengths")
# Need to account for the fact that get_group_array(only_family) will return an array of length Nfamily
# while restriction_start_indices are global indeces to the ancestor snapshot,
# which can start anywhere depending on the family ordering ==> Need to offset them back to start at 0.
restriction_start_indices -= start.ancestor._get_family_slice(use_family).start
restriction_end_indices -= end.ancestor._get_family_slice(use_family).start
g1 = halos_1.get_group_array(family=use_family, use_index=True)[restriction_start_indices]
g2 = halos_2.get_group_array(family=use_family, use_index=True)[restriction_end_indices]
if max_num_halos is None:
max_num_halos = max(len(halos_1), len(halos_2))
transfer_matrix = _bridge.match(g1, g2, 0, max_num_halos)
return transfer_matrix
[docs]
@util.deprecated("match_catalog is deprecated; use match_halos instead")
def match_catalog(self, min_index=1, max_index=30, threshold=0.5, groups_1 = None, groups_2 = None,
use_family = None):
"""Deprecated alternative to :func:`match_halos`. Use that method instead."""
fuzzy_matches = self.fuzzy_match_catalog(min_index, max_index, threshold, groups_1, groups_2, use_family)
identification = np.zeros(max_index+1,dtype=int)
for i,row in enumerate(fuzzy_matches):
if len(row)>0:
identification[i] = row[0][0]
elif i<min_index:
identification[i] = -2
else:
identification[i] = -1
return identification
[docs]
@util.deprecated("fuzzy_match_catalog is deprecated; use fuzzy_match_halos instead")
def fuzzy_match_catalog(self, min_index=1, max_index=30, threshold=0.01,
groups_1 = None, groups_2 = None, use_family=None, only_family=None):
"""Deprecated alternative to :func:`fuzzy_match_halos`. Use that method instead."""
transfer_matrix = self.catalog_transfer_matrix(min_index,max_index,groups_1,groups_2,use_family,only_family)
output = [[]]*min_index
for row in transfer_matrix:
this_row_matches = []
if row.sum()>0:
frac_particles_transferred = np.array(row,dtype=float)/row.sum()
above_threshold = np.where(frac_particles_transferred>threshold)[0]
above_threshold = above_threshold[np.argsort(frac_particles_transferred[above_threshold])[::-1]]
for column in above_threshold:
this_row_matches.append((column+min_index, frac_particles_transferred[column]))
output.append(this_row_matches)
return output
[docs]
@util.deprecated("catalog_transfer_matrix is deprecated; use count_particles_in_common instead")
def catalog_transfer_matrix(self, min_index=1, max_index=30, groups_1=None, groups_2=None,
use_family=None, only_family=None):
"""Deprecated interface to :func:`count_particles_in_common`. Use that method instead."""
# the 'index' naming of the parameter is misleading -- in pynbody v1, the distinction between number and
# index had not been drawn. In fact, by pynbody v2 nomenclature, this function accepts halo numbers, so
# we rename internally to prevent confusion in the compatibility code below.
min_number = min_index
max_number = max_index
del min_index, max_index
if only_family is not None:
if use_family!=only_family and use_family is not None:
raise ValueError("use_family and only_family must be the same if both specified")
use_family = only_family
start, end = self._get_ends()
if groups_1 is None:
groups_1 = start.halos()
else:
assert groups_1.base.ancestor is start.ancestor
if groups_2 is None:
groups_2 = end.halos()
else:
assert groups_2.base.ancestor is end.ancestor
max_number_1 = min(max_number, max(groups_1.keys()))
max_number_2 = min(max_number, max(groups_2.keys()))
indices_to_use_1 = groups_1.number_mapper.number_to_index(np.arange(min_number, max_number_1+1))
indices_to_use_2 = groups_2.number_mapper.number_to_index(np.arange(min_number, max_number_2+1))
max_index = max(indices_to_use_1.max(), indices_to_use_2.max())
transfer_matrix = self.count_particles_in_common(groups_1, groups_2,
max_num_halos=max_index, use_family=use_family)
transfer_matrix_restricted = transfer_matrix[indices_to_use_1, :][:, indices_to_use_2]
return transfer_matrix_restricted
[docs]
class OneToOneBridge(AbstractBridge):
"""Connects two snapshots with identical particle numbers and file layout.
Particle ``i`` in the start point is the same as particle ``i`` in the end point.
"""
[docs]
def __init__(self, start: snapshot.SimSnap, end: snapshot.SimSnap):
if len(start) != len(end):
raise ValueError("OneToOneBridge requires snapshots of the same length")
super().__init__(start, end)
def __call__(self, s):
start, end = self._get_ends()
if s.is_descendant(start):
return end[s.get_index_list(start)]
elif s.is_descendant(end):
return start[s.get_index_list(end)]
else:
raise RuntimeError("Not a subview of either end of the bridge")
[docs]
class OrderBridge(AbstractBridge):
"""Connects to snapshots that both have arrays of identity integers (``iord`` or similar) to identify particles.
Particles ``i_start`` and ``i_end`` are defined to be the same if and only if
``start[order_array][i_start] == start[order_array][i_end]``.
"""
[docs]
def __init__(self, start, end, order_array="iord", monotonic=True, allow_family_change=False):
"""Initialise the ``OrderBridge``
Parameters
----------
start : snapshot.SimSnap
The start point of the bridge
end : snapshot.SimSnap
The end point of the bridge
order_array : str
The name of the array that is used to identify particles. Default is ``iord``.
monotonic : bool
If ``True``, the order_array must be monotonically increasing in both ends of the bridge.
Note that this is not checked for you. If ``False``, the bridging is slower but this is the failsafe option.
allow_family_change : bool
If ``True``, the bridge will allow particles to change family going from one end to the other of the
bridge. Otherwise, it is assumed that the family of a particle is conserved.
"""
super().__init__(start, end)
self._order_array = order_array
self.monotonic = monotonic
self.allow_family_change = allow_family_change
def __call__(self, s):
start, end = self._get_ends()
if s.is_descendant(start):
from_ = start
to_ = end
elif s.is_descendant(end):
from_ = end
to_ = start
else:
raise RuntimeError("Not a subview of either end of the bridge")
iord_to = np.asarray(to_[self._order_array]).view(np.ndarray)
iord_from = np.asarray(s[self._order_array]).view(np.ndarray)
if not self.monotonic:
iord_map_to = np.argsort(iord_to)
iord_map_from = np.argsort(iord_from)
iord_to = iord_to[iord_map_to]
iord_from = iord_from[iord_map_from]
output_index, found_match = _bridge.bridge(iord_to, iord_from)
if not self.monotonic:
output_index = iord_map_to[output_index[np.argsort(iord_map_from)][found_match]]
else:
output_index = output_index[found_match]
if self.allow_family_change:
new_family_index = to_._family_index()[output_index]
# stable sort by family:
output_index = output_index[np.lexsort((new_family_index,))]
return to_[output_index]
[docs]
def bridge_factory(a: snapshot.SimSnap, b: snapshot.SimSnap) -> AbstractBridge:
"""Create a bridge connecting the two specified snapshots.
This function will determine the best type of :ref:`Bridge` to construct between the two snapshots, and return it.
It is called by :func:`pynbody.snapshot.simsnap.SimSnap.bridge`.
For more information see :doc:`the bridge tutorial </tutorials/bridge>`.
"""
not_sure_error = "Don't know how to automatically bridge between two simulations of different formats. "\
"You will need to create your bridge manually by instantiating either the OneToOneBridge or "\
"OrderBridge class appropriately."
from ..snapshot import gadget, gadgethdf, nchilada, ramses, tipsy
a_top = a.ancestor
b_top = b.ancestor
if type(a_top) is not type(b_top):
raise RuntimeError(not_sure_error)
if (isinstance(a_top, tipsy.TipsySnap) or isinstance(a_top, nchilada.NchiladaSnap)):
if "iord" in a_top.loadable_keys():
return OrderBridge(a_top, b_top, monotonic=True)
else:
return OneToOneBridge(a_top, b_top)
elif isinstance(a_top, gadget.GadgetSnap) or isinstance(a_top, gadgethdf.GadgetHDFSnap):
return OrderBridge(a_top, b_top, monotonic=False, allow_family_change=True)
elif isinstance(a_top, ramses.RamsesSnap):
if len(a.gas) > 0 or len(b.gas) > 0:
raise RuntimeError("Cannot bridge AMR gas cells")
return OrderBridge(a_top, b_top, monotonic=False)
else:
raise RuntimeError(not_sure_error)