# Source code for pynbody.array.shared

"""Support for numpy arrays in shared memory.

.. seealso::
    There is information about using shared arrays to create parallel workflows in
    :ref:`using_shared_arrays`.

"""
import atexit
import functools
import os
import platform
import random
import signal
import time
import weakref
from functools import reduce
from typing import Optional

import numpy as np

# Platform detection and imports
_IS_WINDOWS = platform.system() == 'Windows'

if _IS_WINDOWS:
    from . import win_detail as _platform_detail
else:
    from . import posix_detail as _platform_detail

from ...configuration import config_parser
from .. import SimArray

# Names of the shared memory blocks created by this process that have not yet been unlinked.
# make_shared_array(create=True) appends to this list, SharedMemorySimArray.__del__ removes entries,
# and delete_dangling_shared_memory unlinks whatever is left and resets the list.
_owned_shared_memory_names = []

# Random generator used to invent shared memory names; created lazily by _ensure_rng / _create_rng.
_rng: Optional[random.Random] = None
# PID of the process in which _rng was seeded (None until _create_rng has run). _ensure_rng compares
# this against os.getpid() so that a different process re-seeds instead of reusing the same generator.
_rng_is_for_pid = None

def _create_rng():
    """Create and seed the random number generator used for generating unique shared memory names.

    The generator is stored in the module-level ``_rng`` and the PID it was seeded on is recorded in
    ``_rng_is_for_pid``.

    The seed combines the process ID and the current time in milliseconds. The two values are joined
    into a string rather than added together: with a sum, process ``p`` at time ``t`` and process
    ``p + 1`` at time ``t - 1`` would receive the same seed and therefore generate the same sequence
    of names.

    Seeding alone cannot protect against a generator being inherited when multiprocessing forks, so
    _ensure_rng checks what PID the rng was created on.
    """

    global _rng, _rng_is_for_pid
    pid = os.getpid()
    _rng = random.Random()
    # A string seed keeps the pid and the timestamp as separate fields, so distinct
    # (pid, time) pairs can never alias onto the same seed value.
    _rng.seed(f"{pid}:{int(time.time() * 1000)}")
    _rng_is_for_pid = pid

def _ensure_rng():
    """Make sure a name-generating RNG exists and that it was seeded by the current process.

    A fresh generator is created if none exists yet, or if the existing one was seeded under a
    different PID.
    """
    seeded_by_this_process = _rng is not None and _rng_is_for_pid == os.getpid()
    if not seeded_by_this_process:
        _create_rng()

class SharedArrayNotFound(OSError):
    """An :class:`OSError` subclass for reporting that a shared array could not be found."""

class SharedMemorySimArray(SimArray):
    """A simulation array that is backed onto shared memory.

    Instances are produced by :func:`make_shared_array`, which fills in the bookkeeping attributes
    below after viewing the shared buffer as this class.
    """
    __slots__ = ['_shared_fname', '_shared_owner', '_shared_mem_ref']
    _shared_fname: str  # name of the shared memory block backing the array
    _shared_owner: bool  # True if this process created the block (and so must unlink it)
    _shared_mem_ref: object  # on Windows, keeps the platform memory object alive

    def __del__(self):
        """Unlink the underlying shared memory if this array owns it and it is still registered."""
        # Views do not necessarily carry the bookkeeping attributes, hence the defensive lookups.
        if not (hasattr(self, '_shared_fname') and getattr(self, '_shared_owner', False)):
            return

        fname = self._shared_fname
        if fname in _owned_shared_memory_names:
            _owned_shared_memory_names.remove(fname)
            # If the name was no longer listed, delete_dangling_shared_memory has presumably
            # already unlinked it, so there is nothing left to do in that case.
            try:
                _platform_detail.unlink_shared_memory(fname)
            except Exception:
                # Best effort only: a finalizer has no caller that could handle a failure.
                pass
def make_shared_array(dims, dtype, zeros=False, fname=None, create=True, offset = None, strides = None,
                      num_bytes=None) -> SharedMemorySimArray:
    """Create or reconstruct an array of dimensions *dims* with the given numpy *dtype*, backed on shared memory.

    If *create* is True, a new shared memory array is created. If *create* is False, the shared memory array is
    opened (and *fname* must be specified).

    Parameters
    ----------

    dims: tuple | int
        The dimensions of the numpy array to create. Any sized sequence is accepted and converted to a tuple.

    dtype:
        The data type to use

    zeros:
        If True, zero the array; otherwise leave uninitialised

    fname: str, optional
        The shared memory name to use. If None, and *create* is True, a random name will be generated.

    create: bool
        Whether to create the shared array, or to open existing shared memory. If the latter, the *fname* must be
        specified and the caller is responsible for making sure the dtype and dims match the original array.

    offset: int, optional
        The offset into the shared memory to use. This is only valid with create=False

    strides: tuple, optional
        The strides to use. This is only valid with create=False

    num_bytes: int, optional
        The number of bytes to allocate for / reference from the shared memory. If not specified, it is calculated
        as the product of the dimensions and the size of the dtype.

    Returns
    -------

    SharedMemorySimArray
        A view onto the shared memory with the requested shape and dtype.

    Raises
    ------

    ValueError
        If *create* is False and *fname* is None, or if *offset* or *strides* is given while *create* is True.

    """
    # Validate the argument combination before touching any global state.
    if create:
        if offset is not None:
            raise ValueError("Offset only valid when opening an existing shared array")
        if strides is not None:
            raise ValueError("Strides only valid when opening an existing shared array")
    elif fname is None:
        raise ValueError("When opening an existing shared array, fname must be specified")

    if fname is None:
        _ensure_rng()
        fname = "pynbody-" + "".join(_rng.choice('abcdefghijklmnopqrstuvwxyz') for _ in range(10))

    # Normalise to a tuple so that lists (or arrays) of dimensions behave like tuples below.
    dims = tuple(dims) if hasattr(dims, '__len__') else (dims,)

    # A zero-length leading dimension is handled by mapping one dummy row and slicing it away at the end.
    zero_size = len(dims) > 0 and dims[0] == 0
    if zero_size:
        dims = (1,) + dims[1:]

    # Multiply as Python integers: these are arbitrary precision, whereas a product formed through
    # numpy scalars can wrap around where numpy's default integer is 32 bits wide.
    size = reduce(lambda total, extent: total * int(extent), dims, 1)
    if num_bytes is None:
        num_bytes = size * np.dtype(dtype).itemsize

    if create:
        _register_sigterm_handler()
        # Platform-specific shared memory creation
        mapped_mem = _platform_detail.create_shared_memory(fname, num_bytes)
        _owned_shared_memory_names.append(fname)
    else:
        mapped_mem = _platform_detail.open_shared_memory(fname, num_bytes)

    if offset is None:
        offset = 0

    ret_ar = np.frombuffer(mapped_mem, dtype=dtype, count=size, offset=offset)
    ret_ar = ret_ar.reshape(dims).view(SharedMemorySimArray)
    ret_ar._shared_fname = fname
    ret_ar._shared_owner = create

    # Keep a reference to the shared memory object to prevent cleanup while array exists
    if _IS_WINDOWS:
        mem_info = _platform_detail.get_shared_memory_info(fname)
        if mem_info:
            ret_ar._shared_mem_ref = mem_info['memory']

    if strides:
        ret_ar.strides = strides

    if zero_size:
        ret_ar = ret_ar[1:]

    if zeros:
        ret_ar[:] = 0

    return ret_ar
@atexit.register
def delete_dangling_shared_memory():
    """Ensures that all shared memory has been cleaned up.

    Every name still recorded as owned by this process is unlinked, the record is reset, and the
    platform layer is asked to clean up anything it is still holding. The function is registered
    to run at interpreter exit and is also invoked by the SIGTERM handler.
    """
    global _owned_shared_memory_names

    for fname in _owned_shared_memory_names:
        try:
            _platform_detail.unlink_shared_memory(fname)
        except Exception:
            # Best effort: a failure to unlink one block must not stop the remaining
            # blocks from being cleaned up.
            pass

    _owned_shared_memory_names = []
    _platform_detail.cleanup_all_shared_memory()
# True once _sigterm_handler has been installed for SIGTERM by this module.
_sigterm_handler_is_registered = False


def _sigterm_handler(signum, frame):
    """SIGTERM handler: unlink any shared memory this process still owns."""
    # NOTE(review): the handler returns normally, so once it is installed a SIGTERM no longer
    # terminates the process by itself - confirm whether that is intended.
    delete_dangling_shared_memory()


def _register_sigterm_handler():
    """Registers a handler to clean up shared memory in the event of a SIGTERM signal.

    The handler is installed at most once. Python only allows signal handlers to be set from the
    main thread; when called from any other thread this function does nothing and leaves the
    handler unregistered, so that a later call from the main thread can still install it.
    """
    global _sigterm_handler_is_registered

    if _sigterm_handler_is_registered:
        return

    try:
        # On Windows, SIGTERM handling is limited, but we'll still register it
        signal.signal(signal.SIGTERM, _sigterm_handler)
    except ValueError:
        # signal.signal raises ValueError outside the main thread. Cleanup at interpreter exit is
        # still provided by the atexit registration, so creating an array should not fail here.
        return

    _sigterm_handler_is_registered = True
def get_num_shared_arrays_owned():
    """Return how many shared arrays are currently owned by this process.

    A shared array counts as owned only if this process is responsible for unlinking it on exit.
    """
    owned_names = _owned_shared_memory_names
    return len(owned_names)
class _deconstructed_shared_array(tuple):
    """Marker tuple type holding the description of a shared array.

    The fields are, in order: dtype, shape, shared memory name, ownership flag, byte offset
    into the underlying buffer, strides, and the size in bytes of the underlying buffer.
    """


def _shared_array_deconstruct(ar, transfer_ownership=False):
    """Describe a shared-memory-backed array in a form that is cheap to pass between processes.

    With *transfer_ownership* set, responsibility for deleting the underlying memory (if this
    process holds it) is also handed to the reconstructing process.

    New code should use :func:`pack` instead."""

    assert isinstance(ar, SimArray)

    # Follow the chain of views back to the array that sits directly on the shared buffer.
    root = ar
    while isinstance(root.base, SimArray):
        root = root.base

    assert isinstance(root, SharedMemorySimArray), "Cannot prepare an array for shared use unless it was created in shared memory"

    ownership_out = transfer_ownership and root._shared_owner
    if transfer_ownership:
        root._shared_owner = False

    # Byte distance between the first element of the view and the start of the root array.
    view_address = ar.__array_interface__['data'][0]
    root_address = root.__array_interface__['data'][0]

    description = (
        ar.dtype,
        ar.shape,
        root._shared_fname,
        ownership_out,
        view_address - root_address,
        ar.strides,
        root.nbytes,
    )
    return _deconstructed_shared_array(description)


def _shared_array_reconstruct(X):
    """Open the shared memory described by *X* and return the matching array view."""
    dtype, dims, fname, ownership, offset, strides, num_bytes = X

    assert not ownership # transferring ownership not actually supported in current implementation

    return make_shared_array(dims, dtype, fname=fname, create=False,
                             offset=offset, strides=strides, num_bytes=num_bytes)


def _recursive_shared_array_deconstruct(input, transfer_ownership=False) :
    """Walk through *input*, replacing each shared memory array by a transferable reference.

    A bare array is deconstructed directly; otherwise a new list is returned, with nested lists and
    tuples processed recursively and all other items passed through unchanged.

    New code should use :func:`pack` instead."""

    if isinstance(input, SimArray):
        return _shared_array_deconstruct(input, transfer_ownership)

    def convert(element):
        if isinstance(element, SimArray):
            return _shared_array_deconstruct(element, transfer_ownership)
        if isinstance(element, (list, tuple)):
            return _recursive_shared_array_deconstruct(element, transfer_ownership)
        return element

    return [convert(element) for element in input]


def _recursive_shared_array_reconstruct(input):
    """Walk through *input*, rebuilding shared memory arrays from their transferable references.

    A bare reference is reconstructed directly; otherwise a new list is returned, with nested lists
    and tuples processed recursively and all other items passed through unchanged.

    New code should use :func:`unpack` instead."""

    if isinstance(input, _deconstructed_shared_array):
        return _shared_array_reconstruct(input)

    def convert(element):
        # The marker type is itself a tuple, so it has to be tested before the generic case.
        if isinstance(element, _deconstructed_shared_array):
            return _shared_array_reconstruct(element)
        if isinstance(element, (list, tuple)):
            return _recursive_shared_array_reconstruct(element)
        return element

    return [convert(element) for element in input]


class RemoteKeyboardInterrupt(Exception):
    """Raised by :func:`shared_array_remote` wrappers in place of a ``KeyboardInterrupt``.

    :func:`remote_map` turns it back into a ``KeyboardInterrupt``.
    """
def shared_array_remote(fn):
    """A decorator for functions that are expected to run on a 'remote' process, accepting shared memory inputs.

    The decorator reconstructs any shared memory arrays that have been packed into a reference by
    :func:`remote_map`.

    The wrapped function takes a single sequence whose first element is the marker string inserted by
    :func:`remote_map`; the remaining elements are passed to *fn* as positional arguments. SIGINT is
    set to its default action while *fn* runs and to "ignore" again afterwards, whether *fn* returns
    or raises. A ``KeyboardInterrupt`` is re-raised as :class:`RemoteKeyboardInterrupt`.
    """

    @functools.wraps(fn)
    def new_fn(args, **kwargs):
        try:
            assert hasattr(
                args, '__len__'), "Function must be called from remote_map to use shared arrays"
            assert args[0] == '__pynbody_remote_array__', "Function must be called from remote_map to use shared arrays"
            args = _recursive_shared_array_reconstruct(args)
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            try:
                output = fn(*args[1:], **kwargs)
            finally:
                # Restore the "ignore" disposition even when fn raises, so that an ordinary
                # exception does not leave this process with SIGINT at its default action.
                signal.signal(signal.SIGINT, signal.SIG_IGN)
            return _recursive_shared_array_deconstruct([output], True)[0]
        except KeyboardInterrupt:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            raise RemoteKeyboardInterrupt()

    # Marker checked by remote_map to make sure the function has been wrapped.
    new_fn.__pynbody_remote_array__ = True
    return new_fn
def remote_map(pool, fn, *iterables):
    """Equivalent to pool.map, but turns any shared memory arrays into a reference that can be passed between processes.

    The function *fn* must be wrapped with the :func:`shared_array_remote` decorator for this to work correctly.

    Parameters
    ----------

    pool : multiprocessing.Pool
        The pool to use for parallel processing

    fn : function
        The function to apply to each element of the iterable. This function must be wrapped with
        :func:`shared_array_remote` to use shared arrays.

    *iterables : iterable
        The iterables to pass to the function. If more than one iterable is passed, the function is called with
        arguments from each iterable in turn, stopping at the end of the shortest one. Shared arrays are only
        turned into references when they appear inside lists or tuples; other kinds of iterable are consumed
        as they are.

    Returns
    -------

    list
        The results of applying the function to each element of the iterable. If the function returns shared arrays,
        these are transferred back to the parent process and returned fully reconstructed.

    """

    assert getattr(fn, '__pynbody_remote_array__', False), "Function must be wrapped with shared_array_remote to use shared arrays"

    iterables_deconstructed = _recursive_shared_array_deconstruct(iterables)

    # Prefix every argument tuple with the marker that shared_array_remote checks for. Building the
    # tuples directly from zip avoids needing len() of the first iterable, so iterators work too.
    marker = '__pynbody_remote_array__'
    tasks = [(marker, *task_args) for task_args in zip(*iterables_deconstructed)]

    try:
        results = pool.map(fn, tasks)
    except RemoteKeyboardInterrupt:
        raise KeyboardInterrupt

    return _recursive_shared_array_reconstruct(results)
def pack(array, transfer_ownership=False):
    """Convert an array backed onto shared memory into a description that can be sent to another process

    Parameters
    ----------

    array : SimArray
        The array to pack. It has to be a shared memory array created via :func:`make_shared_array`, or a view
        of such an array. Snapshots load arrays into shared memory only if you have called
        :func:`pynbody.snapshot.simsnap.SimSnap.enable_shared_arrays` first.

    transfer_ownership : bool
        If True, the receiving process will take over responsibility for cleaning up the shared memory.

    Returns
    -------

    array_description : object
        A description of the array that can be passed between processes (e.g. using pickle to turn it into a
        short string that can be sent via a pipe).

    """
    array_description = _recursive_shared_array_deconstruct(array, transfer_ownership)
    return array_description
def unpack(array_description):
    """Rebuild an array backed onto shared memory from the description returned by :func:`pack`.

    Parameters
    ----------

    array_description : object
        A description of the array that has been passed in from another process, where :func:`pack` was called.

    Returns
    -------

    array : SimArray
        A view on the same shared memory array that was passed in to :func:`pack`.

    """
    array = _recursive_shared_array_reconstruct(array_description)
    return array