Source code for pynbody.halo.number_array

"""Support for generic halo catalogues based on an array of halo numbers for each particle."""

import numpy as np

from . import HaloCatalogue
from .details.number_mapping import create_halo_number_mapper
from .details.particle_indices import HaloParticleIndices


[docs] class HaloNumberCatalogue(HaloCatalogue): """A generic catalogue using an array of halo numbers, one per particle. This is the output format used by SKID, for example. """
[docs] def __init__(self, sim, array='grp', ignore=None, **kwargs): """Construct a GrpCatalogue, extracting halos based on a simulation-wide integer array with their numbers *sim* - the SimSnap for which the halos will be constructed *array* - the name of the array which should be present, loadable or derivable across the simulation *ignore* - a special value indicating "no halo", or None if no such special value is defined """ sim[array] # noqa - trigger lazy-loading and/or kick up a fuss if unavailable self._array = array self._ignore = ignore self._halo_numbers = np.unique(sim[array]) number_mapper = create_halo_number_mapper(self._trim_array_for_ignore(self._halo_numbers)) HaloCatalogue.__init__(self, sim, number_mapper=number_mapper)
def _trim_array_for_ignore(self, array): assert len(array) == len(self._halo_numbers) if self._ignore is None: return array if self._ignore == self._halo_numbers[0]: return array[1:] elif self._ignore == self._halo_numbers[-1]: return array[:-1] else: raise ValueError( "ignore must be either the smallest or largest value in the array") def _get_all_particle_indices(self): halo_number_per_particle = self.base[self._array] particle_index_list = np.argsort(halo_number_per_particle, kind='mergesort') start = np.searchsorted(halo_number_per_particle[particle_index_list], self._halo_numbers) stop = np.concatenate((start[1:], [len(particle_index_list)])) particle_index_list_boundaries = self._trim_array_for_ignore( np.hstack((start[:, np.newaxis], stop[:, np.newaxis])) ) return HaloParticleIndices(particle_ids = particle_index_list, boundaries = particle_index_list_boundaries) def _get_particle_indices_one_halo(self, halo_number): if halo_number == self._ignore: self._no_such_halo(halo_number) array = np.where(self.base[self._array] == halo_number)[0] if len(array) == 0: self._no_such_halo(halo_number) return array def _no_such_halo(self, i): raise KeyError(f"No such halo {i}")
[docs] def get_group_array(self, family=None, use_index=False, fill_value=-1): if family is not None: result = self.base[family][self._array] else: result = self.base[self._array] if use_index: result = np.copy(result) ignored = result == self._ignore not_ignored = ~ignored result[ignored] = fill_value result[not_ignored] = self.number_mapper.number_to_index(result[not_ignored]) return result
@classmethod def _can_load(cls, sim, arr_name='grp'): if (arr_name in sim.loadable_keys()) or (arr_name in list(sim.keys())) : return True else: return False
[docs] class AmigaGrpCatalogue(HaloNumberCatalogue): """A catalogue of halos using Alyson Brooks' post-processed AHF output (turned into a SKID-like array)"""
[docs] def __init__(self, sim): super().__init__(sim, array='amiga.grp')
@classmethod def _can_load(cls, sim, arr_name='amiga.grp'): return super()._can_load(sim, arr_name)