Source code for pynbody.dependencytracker
"""Tools for tracking how derived arrays depend on other arrays.

Users should not need to use this module directly; it is used by :class:`~pynbody.snapshot.simsnap.SimSnap`
to track dependencies between derived arrays.
"""
import threading
class DependencyError(RuntimeError):
    """Raised when a calculation is started for a name already being calculated."""
class _DependencyContext:
    """Context manager that marks one named array as being calculated.

    Entering acquires the tracker's calculation lock and pushes ``name`` onto
    the tracker's calculation stack. Exiting pops it again, records (only when
    the ``with`` body raised no exception) that the calculations still on the
    stack depend on ``name``, and releases the lock.
    """

    def __init__(self, tracker, name):
        """Store the owning tracker and the name being calculated.

        ``tracker`` must provide ``_calculation_lock``,
        ``_current_calculation_stack``, ``_push``, ``_pop`` and
        ``_add_me_to_dependents``.
        """
        self.tracker = tracker
        self.name = name

    def __enter__(self):
        """Acquire the lock and register ``name`` as an ongoing calculation.

        Returns ``None``.

        Raises:
            DependencyError: if ``name`` is already on the calculation stack.
        """
        tracker = self.tracker
        lock = tracker._calculation_lock
        lock.__enter__()
        try:
            if self.name in tracker._current_calculation_stack:
                raise DependencyError("Circular dependency")
            tracker._push(self.name)
        except BaseException:
            # __exit__ is never called when __enter__ fails, so the lock has
            # to be released here or other threads would block forever.
            lock.__exit__(None, None, None)
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Unregister ``name``, record its dependents on success, release the lock.

        Returns ``None``, so an exception raised inside the ``with`` body
        propagates to the caller.
        """
        tracker = self.tracker
        try:
            tracker._pop(self.name)
            # A failed calculation tells us nothing reliable about
            # dependencies, so only record them on a clean exit.
            if exc_type is None:
                tracker._add_me_to_dependents(self.name)
        finally:
            # Release even if the bookkeeping above raised; otherwise the
            # lock acquired in __enter__ would stay held.
            tracker._calculation_lock.__exit__(exc_type, exc_val, exc_tb)
[docs]
class DependencyTracker:
    """Class for tracking dependencies between arrays.

    This class is used by :class:`pynbody.snapshot.simsnap.SimSnap` to track how derived arrays
    depend on other arrays. Users should not need to use this class directly.

    As an example of how this class is used, see the following code.

    >>> my_tracker = DependencyTracker()
    >>> with my_tracker.calculating('my_array'):
    ...     with my_tracker.calculating('my_other_array'):
    ...         my_tracker.touching('source_array')
    >>> my_tracker.get_dependents('my_array')
    set()
    >>> my_tracker.get_dependents('my_other_array')
    {'my_array'}
    >>> sorted(my_tracker.get_dependents('source_array'))
    ['my_array', 'my_other_array']
    >>> with my_tracker.calculating('my_array'):
    ...     with my_tracker.calculating('my_array'):
    ...         pass
    Traceback (most recent call last):
        ...
    pynbody.dependencytracker.DependencyError: Circular dependency

    Note that the class is thread-safe, in the sense that if a second thread starts trying to use it
    while a first thread is already mid-way through a derivation, the second thread blocks until
    the first thread finishes.
    """

    def __init__(self):
        """Create a tracker with no recorded dependencies and no ongoing calculation."""
        # Maps a name to the set of names recorded as depending on it.
        self._dependencies = {}
        # Names currently being calculated, outermost first.
        self._current_calculation_stack = []
        # Re-entrant, so nested calculations in the same thread can re-acquire it.
        self._calculation_lock = threading.RLock()

    def _setup_my_dependencies(self, name):
        """Ensure ``name`` has a (possibly empty) dependents set."""
        self._dependencies.setdefault(name, set())

    def _add_me_to_dependents(self, name):
        """Record every name on the calculation stack as a dependent of ``name``."""
        self._setup_my_dependencies(name)
        self._dependencies[name].update(self._current_calculation_stack)

    def _push(self, name):
        """Mark ``name`` as the innermost ongoing calculation."""
        self._current_calculation_stack.append(name)

    def _pop(self, name):
        """Remove ``name`` from the stack; it must be the innermost calculation."""
        assert self._current_calculation_stack[-1] == name
        del self._current_calculation_stack[-1]

    def calculating(self, name):
        """Return a context manager when calculating a named array."""
        return _DependencyContext(self, name)

    def touching(self, name):
        """Note that any ongoing calculations depend on the named array."""
        with self._calculation_lock:
            self._add_me_to_dependents(name)

    def get_dependents(self, name):
        """Return the set of arrays that are known to depend on the named array.

        The result is a new set (empty if nothing is recorded for ``name``),
        so callers may modify it, or iterate over it while further
        dependencies are being recorded, without affecting the tracker.
        """
        return set(self._dependencies.get(name, ()))