"""
Filters are used to define subsets of simulations, especially (but not exclusively) spatial sub-regions.
The basic idea is that a :class:`Filter` object stores the abstract definition of the subset, and then it can be
called with a simulation to return a boolean array indicating which particles are in the subset. The implementation
of filters is designed to be as efficient as possible, and in many cases the selection is done in C with OpenMP
parallelisation. Additionally, if a simulation has a :class:`pynbody.kdtree.KDTree` built (via
:meth:`pynbody.snapshot.simsnap.SimSnap.build_tree`), then the selection can be done using the KDTree, which is considerably
faster for very large simulations.
Filters can be combined using the logical operators `&`, `|` and `~` to create more complex selections.
For a friendly introduction, see :doc:`/tutorials/filters`.
"""
from __future__ import annotations
import pickle
from typing import TYPE_CHECKING
import numpy as np
from numpy.typing import ArrayLike
from .. import family, units
from . import geometry_selection
if TYPE_CHECKING:
from .. import snapshot
[docs]
class Filter:
"""Base class for all filters. Filters are callables that take simulations as input and return a boolean mask"""
[docs]
def where(self, sim: snapshot.SimSnap):
"""Return the indices of particles that are in the filter.
This is a convenience method that is equivalent to np.where(f(sim)) but may be more efficient for some filters.
"""
return np.where(self(sim))
def __call__(self, sim: snapshot.SimSnap):
"""Return a boolean mask indicating which particles are in the filter."""
return np.ones(len(sim), dtype=bool)
def __and__(self, f2):
return And(self, f2)
def __invert__(self):
return Not(self)
def __or__(self, f2):
return Or(self, f2)
def __repr__(self):
return "Filter()"
def __hash__(self):
return hash(pickle.dumps(self))
def __eq__(self, other):
if type(self) is not type(other):
return False
for k, v in self.__dict__.items():
if k not in other.__dict__:
return False
else:
equal = other.__dict__[k]==v
if isinstance(equal, np.ndarray):
equal = equal.all()
if not equal:
return False
return True
def _get_wrap_in_position_units(self, sim):
"""Helper method to get the simulation box wrap in units of the position array.
If the boxsize is undefined, returns -1.0"""
sim_ancestor = sim.ancestor
if 'boxsize' in sim.properties:
wrap = sim_ancestor.properties['boxsize']
if units.is_unit_like(wrap):
wrap = float(wrap.in_units(sim_ancestor['pos'].units,
**sim_ancestor.conversion_context()))
else:
wrap = -1.0
return wrap
[docs]
def cubic_cell_intersection(self, centroids):
"""Compute the intersection with cubic cells with the specified centroids.
This is currently used by the swift loader to figure out which cells to load"""
raise NotImplementedError("Cell intersections are not implemented for this filter")
@classmethod
def _get_boxsize_and_delta_from_centroids(self, centroids):
"""Helper for calculating cubic cell intersections"""
deltax = (centroids[1] - centroids[0]).max()
# check we are interpreting this right: the ((deltax+max-min)/deltax)^3
# should be the number of cells
ncells = len(centroids)
maxpos = centroids.max()
minpos = centroids.min()
boxsize = (deltax + maxpos - minpos)
ncells_from_geometry = np.round(boxsize / deltax) ** 3
assert ncells == ncells_from_geometry, "Geometry of cells doesn't match expectations"
return boxsize, deltax
[docs]
class FamilyFilter(Filter):
"""A filter that selects particles based on their family."""
[docs]
def __init__(self, family_):
self.family = family.get_family(family_, False)
def __repr__(self):
return "FamilyFilter("+self.family.name+")"
def __call__(self, sim):
slice_ = sim._get_family_slice(self.family)
flags = np.zeros(len(sim), dtype=bool)
flags[slice_] = True
return flags
[docs]
class And(Filter):
"""A filter that selects particles that are in both of two other filters.
You can construct this filter conveniently using the ``&`` operator, i.e.
>>> f = f1 & f2
returns a filter that selects particles that are in both ``f1`` and ``f2``.
"""
[docs]
def __init__(self, f1, f2):
self.f1 = f1
self.f2 = f2
def __call__(self, sim):
return self.f1(sim) * self.f2(sim)
def __repr__(self):
return "(" + repr(self.f1) + " & " + repr(self.f2) + ")"
[docs]
def cubic_cell_intersection(self, centroids):
return self.f1.cubic_cell_intersection(centroids) & \
self.f2.cubic_cell_intersection(centroids)
[docs]
class Or(Filter):
"""A filter that selects particles that are in either of two other filters.
You can construct this filter conveniently using the ``|`` operator, i.e.
>>> f = f1 | f2
returns a filter that selects particles that are in either ``f1`` or ``f2``."""
[docs]
def __init__(self, f1, f2):
self.f1 = f1
self.f2 = f2
def __call__(self, sim):
return self.f1(sim) + self.f2(sim)
def __repr__(self):
return "(" + repr(self.f1) + " | " + repr(self.f2) + ")"
[docs]
def cubic_cell_intersection(self, centroids):
return self.f1.cubic_cell_intersection(centroids) | \
self.f2.cubic_cell_intersection(centroids)
[docs]
class Not(Filter):
"""A filter that selects particles that are not in another filter.
You can construct this filter conveniently using the ``~`` operator, i.e.
>>> f = ~f1
returns a filter that selects particles that are not in ``f1``.
"""
[docs]
def __init__(self, f):
self.f = f
def __call__(self, sim):
x = self.f(sim)
return np.logical_not(x)
def __repr__(self):
return "~" + repr(self.f)
[docs]
def cubic_cell_intersection(self, centroids):
return ~self.f1.cubic_cell_intersection(centroids)
[docs]
class Sphere(Filter):
"""
A filter that selects particles within `radius` of the point `cen`.
"""
[docs]
def __init__(self, radius: float | str | units.UnitBase, cen: ArrayLike = (0, 0, 0)):
"""Create a sphere filter.
Parameters
----------
radius :
The radius of the sphere. If a string, it is interpreted as a unit string.
cen :
The centre of the sphere. If a :class:`pynbody.snapshot.simsnap.SimArray`, units can be provided and
will be correctly accounted for.
"""
self.cen = np.asarray(cen)
if self.cen.shape != (3,):
raise ValueError("Centre must be length 3 array")
if isinstance(radius, str):
radius = units.Unit(radius)
self.radius = radius
def __call__(self, sim):
with sim.immediate_mode:
pos = sim['pos']
cen, radius = self._get_cen_and_radius_as_float(pos)
wrap = self._get_wrap_in_position_units(sim)
return geometry_selection.selection(np.asarray(pos),'sphere',(cen[0], cen[1], cen[2], radius), wrap)
def _get_cen_and_radius_as_float(self, pos):
radius = self.radius
cen = self.cen
if units.has_units(cen):
cen = cen.in_units(pos.units)
if units.is_unit_like(radius):
radius = float(radius.in_units(pos.units,
**pos.conversion_context()))
return cen, radius
[docs]
def where(self, sim):
if hasattr(sim, "kdtree"):
cen, radius = self._get_cen_and_radius_as_float(sim["pos"])
return (np.sort(sim.kdtree.particles_in_sphere(cen, radius)),)
else:
return super().where(sim)
[docs]
def cubic_cell_intersection(self, centroids):
boxsize, deltax = self._get_boxsize_and_delta_from_centroids(centroids)
# the maximum offset from the cell centre to any corner:
expand_distance = (deltax/2) * np.sqrt(3)
return geometry_selection.selection(np.asarray(centroids), 'sphere',
(self.cen[0], self.cen[1], self.cen[2], self.radius+expand_distance),
boxsize)
def __repr__(self):
if units.is_unit(self.radius):
return f"Sphere('{str(self.radius)}', {repr(self.cen)})"
else:
return f"Sphere({self.radius:.2e}, {repr(self.cen)})"
[docs]
class Cuboid(Filter):
"""A filter that selects particles within a cuboid defined by two opposite corners."""
[docs]
def __init__(self, x1: float | str | units.UnitBase, y1: float | str | units.UnitBase = None,
z1: float | str | units.UnitBase = None, x2: float | str | units.UnitBase = None,
y2: float | str | units.UnitBase = None, z2: float | str | units.UnitBase = None):
"""Create a cuboid filter.
If any of the cube coordinates ``x1``, ``y1``, ``z1``, ``x2``, ``y2``, ``z2`` are not specified they are
determined as ``y1=x1``; ``z1=x1``; ``x2=-x1``; ``y2=-y1``; ``z2=-z1``.
"""
x1, y1, z1, x2, y2, z2 = (
units.Unit(x) if isinstance(x, str) else x for x in (x1, y1, z1, x2, y2, z2))
if y1 is None:
y1 = x1
if z1 is None:
z1 = x1
if x2 is None:
x2 = -x1
if y2 is None:
y2 = -y1
if z2 is None:
z2 = -z1
self.x1, self.y1, self.z1, self.x2, self.y2, self.z2 = x1, y1, z1, x2, y2, z2
def __call__(self, sim):
wrap = self._get_wrap_in_position_units(sim)
return self._get_mask(sim['pos'], self._get_boundaries(sim, wrap), wrap)
def _get_mask(self, pos, boundaries, wrap):
x1, y1, z1, x2, y2, z2 = boundaries
return geometry_selection.selection(pos, 'cube', (x1, y1, z1, x2, y2, z2), wrap).view(dtype=bool)
def _get_boundaries(self, sim, wrap=None):
if wrap is None:
wrap = self._get_wrap_in_position_units(sim)
x1,y1,z1,x2,y2,z2 = (x.in_units(sim["pos"].units, **sim["pos"].conversion_context())
if units.is_unit_like(x) else x
for x in (self.x1, self.y1, self.z1, self.x2, self.y2, self.z2))
if x2 < x1:
x2 += wrap
if y2 < y1:
y2 += wrap
if z2 < z1:
z2 += wrap
if x2 < x1 or y2 < y1 or z2 < z1:
raise ValueError("Cuboid boundaries are not well defined")
return x1, y1, z1, x2, y2, z2
def _get_bounding_sphere(self, cuboid_boundaries):
x1, y1, z1, x2, y2, z2 = cuboid_boundaries
return ((x1 + x2) / 2, (y1 + y2) / 2, (z1 + z2) / 2), np.sqrt(
(x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) / 2
[docs]
def where(self, sim):
if hasattr(sim, "kdtree"):
# KDTree doesn't currently natively find cuboid regions, so we get the bounding sphere
# and the search for the cuboid within that
cuboid_boundaries = self._get_boundaries(sim)
cen, radius = self._get_bounding_sphere(cuboid_boundaries)
in_sphere_particles = sim.kdtree.particles_in_sphere(cen, radius)
pos = sim["pos"][in_sphere_particles]
wrap = self._get_wrap_in_position_units(sim)
mask = self._get_mask(pos, cuboid_boundaries, wrap)
return np.sort(in_sphere_particles[mask]),
else:
return super().where(sim)
[docs]
def cubic_cell_intersection(self, centroids):
boxsize, deltax = self._get_boxsize_and_delta_from_centroids(centroids)
shift = deltax/2
boundaries = (self.x1 - shift, self.y1 - shift, self.z1 - shift,
self.x2 + shift, self.y2 + shift, self.z2 + shift)
return self._get_mask(centroids, boundaries, boxsize)
def __repr__(self):
x1, y1, z1, x2, y2, z2 = ("'%s'" % str(x)
if units.is_unit_like(x) else x
for x in (self.x1, self.y1, self.z1, self.x2, self.y2, self.z2))
return f"Cuboid({x1}, {y1}, {z1}, {x2}, {y2}, {z2})"
[docs]
class Disc(Filter):
"""A filter that selects particles within a disc of specified extent and thickness."""
[docs]
def __init__(self, radius: float | str | units.UnitBase, height: float | str | units.UnitBase,
cen: ArrayLike = (0, 0, 0)):
"""Create a disc filter.
In keeping with other parts of pynbody, the disc is defined in the x-y plane, with the z-axis being the
symmetry axis. This is useful in conjunction with automated disc alignment e.g.
:meth:`pynbody.analysis.angmom.sideon`.
Parameters
----------
radius :
The radius of the disc (in the xy-plane). If a string, it is interpreted as a unit string.
height :
The thickness of the disc (in the z-direction). If a string, it is interpreted as a unit string.
cen :
The centre of the disc. If a :class:`pynbody.snapshot.simsnap.SimArray`, units can be provided and
will be correctly accounted for.
"""
self.cen = np.asarray(cen)
if self.cen.shape != (3,):
raise ValueError("Centre must be length 3 array")
if isinstance(radius, str):
radius = units.Unit(radius)
if isinstance(height, str):
height = units.Unit(height)
self.radius = radius
self.height = height
def __call__(self, sim):
radius = self.radius
height = self.height
if units.is_unit_like(radius):
radius = float(
radius.in_units(sim["pos"].units, **sim["pos"].conversion_context()))
if units.is_unit_like(height):
height = float(
height.in_units(sim["pos"].units, **sim["pos"].conversion_context()))
distance = (((sim["pos"] - self.cen)[:, :2]) ** 2).sum(axis=1)
return (distance < radius ** 2) * (np.abs(sim["z"] - self.cen[2]) < height)
def __repr__(self):
radius = self.radius
height = self.height
radius, height = (
("'%s'" % str(x) if units.is_unit_like(x) else '%.2e' % x) for x in (radius, height))
return f"Disc({radius}, {height}, {repr(self.cen)})"
[docs]
class BandPass(Filter):
"""Selects particles in a bandpass of a named property"""
[docs]
def __init__(self, prop: str, min: float | str | units.UnitBase, max: float | str | units.UnitBase):
"""Create a bandpass filter.
Parameters
----------
prop :
The name of the simulation array to filter on.
min :
The minimum value of the property. If a string, it is interpreted as a unit string.
max :
The maximum value of the property. If a string, it is interpreted as a unit string.
"""
if isinstance(min, str):
min = units.Unit(min)
if isinstance(max, str):
max = units.Unit(max)
self._prop = prop
self._min = min
self._max = max
def __call__(self, sim):
min_ = self._min
max_ = self._max
prop = self._prop
if units.is_unit_like(min_):
min_ = float(
min_.in_units(sim[prop].units, **sim.conversion_context()))
if units.is_unit_like(max_):
max_ = float(
max_.in_units(sim[prop].units, **sim.conversion_context()))
return ((sim[prop] > min_) * (sim[prop] < max_))
def __repr__(self):
min_, max_ = (("'%s'" % str(x) if units.is_unit_like(
x) else '%.2e' % x) for x in (self._min, self._max))
return f"BandPass('{self._prop}', {min_}, {max_})"
[docs]
class HighPass(Filter):
"""Selects particles exceeding a specified threshold of a named property
"""
[docs]
def __init__(self, prop: str, min: float | str | units.UnitBase):
"""Create a high-pass filter.
Parameters
----------
prop :
The name of the simulation array to filter on.
min :
The minimum value of the property. If a string, it is interpreted as a unit string.
"""
if isinstance(min, str):
min = units.Unit(min)
self._prop = prop
self._min = min
def __call__(self, sim):
min_ = self._min
prop = self._prop
if units.is_unit_like(min_):
min_ = float(
min_.in_units(sim[prop].units, **sim.conversion_context()))
return (sim[prop] > min_)
def __repr__(self):
min = ("'%s'" % str(self._min) if units.is_unit_like(
self._min) else '%.2e' % self._min)
return f"HighPass('{self._prop}', {min})"
[docs]
class LowPass(Filter):
"""Selects particles below a specified threshold of a named property
"""
[docs]
def __init__(self, prop: str, max: float | str | units.UnitBase):
"""Create a low-pass filter.
Parameters
----------
prop :
The name of the simulation array to filter on.
max :
The maximum value of the property. If a string, it is interpreted as a unit string.
"""
if isinstance(max, str):
max = units.Unit(max)
self._prop = prop
self._max = max
def __call__(self, sim):
max_ = self._max
prop = self._prop
if units.is_unit_like(max_):
max_ = float(
max_.in_units(sim[prop].units, **sim.conversion_context()))
return (sim[prop] < max_)
def __repr__(self):
max = ("'%s'" % str(self._max) if isinstance(
self._max, units.UnitBase) else '%.2e' % self._max)
return f"LowPass('{self._prop}', {max})"
[docs]
class Annulus(And):
"""A filter that selects particles in between two spheres specified by radii `r1` and `r2` centered on `cen`."""
[docs]
def __init__(self, r1: float | str | units.UnitBase, r2: float | str | units.UnitBase, cen: ArrayLike = (0, 0, 0)):
"""Create an annulus filter.
Parameters
----------
r1 :
The inner radius of the annulus. If a string, it is interpreted as a unit string.
r2 :
The outer radius of the annulus. If a string, it is interpreted as a unit string.
cen :
The centre of the annulus. If a :class:`pynbody.snapshot.simsnap.SimArray`, units can be provided and
will be correctly accounted for.
"""
super().__init__(~Sphere(r1, cen), Sphere(r2, cen))
[docs]
class SolarNeighborhood(And):
"""A filter that selects particles in a disc between 2d radii `r1` and `r2` and thickness `height`.
As for :class:`Disc`, the galaxy disc is defined in the x-y plane, with the z-axis being the symmetry axis.
Default parameters are provided that are approximately the solar neighborhood (coarsely selected).
"""
[docs]
def __init__(self, r1: float | str | units.UnitBase = units.Unit("5 kpc"),
r2: float | str | units.UnitBase = units.Unit("10 kpc"),
height: float | str | units.UnitBase = units.Unit("2 kpc"),
cen: ArrayLike = (0, 0, 0)):
"""Create a solar neighborhood filter.
Parameters
----------
r1 :
The inner radius of the disc. If a string, it is interpreted as a unit string.
r2 :
The outer radius of the disc. If a string, it is interpreted as a unit string.
height :
The thickness of the disc. If a string, it is interpreted as a unit string.
cen :
The centre of the disc. If a :class:`pynbody.snapshot.simsnap.SimArray`, units can be provided and
will be correctly accounted for.
"""
super().__init__(~Disc(r1, height, cen), Disc(r2, height, cen))