"""
Methods for describing parts of files to load.
This module provides generalized logic for getting parts of sequential data off disk.
It is for internal use. If you want to write a loader that supports partial loading,
it will make it a lot easier.
The steps for loading particle data are as follows
1. Set up a description of the particles you have on disk. This is a dictionary
mapping a family type to a slice, e.g.
.. code-block:: python
on_disk = {pynbody.family.dm: slice(0,100), pynbody.family.gas: slice(100, 150)}
describes a file with 100 dark matter particles followed by 50 gas particles.
2. Create a :class:`LoadControl` object.
.. code-block:: python
load_control = pynbody.chunk.LoadControl(on_disk, chunk_size, take)
Here,
* on_disk is the dictionary you set up in the last step.
* chunk_size is the maximum number of particles you are willing to load off
disk at once these will have to be stored in a temporary array, so you don't
want it to be too large; but also reading a small number of large chunks is
more efficient, so you don't want it to be too small. No careful experimentation
has been done with this, but chunk_sizes of around 10000 seem to work OK.
* take describes what to load in. Currently this is either ``None`` (= load the whole file)
or a list of ids (= load the specified particles). However this may be expanded
in future to a more comprehensive syntax. The idea is your code will not have to
change when this happens, and will automatically support more advanced partial loading
specifications.
3. Load your particle data. The :class:`LoadControl` object has an ``iterate`` method. This returns
step-by-step instructions that take you through the file, specifying what to keep and what
to throw away in a simple-to-use fashion. See the help for :func:`LoadControl.iterate` for details
on how to implement this final step.
"""
from __future__ import annotations
import copy
from typing import TYPE_CHECKING, Callable, Iterable, Iterator
import numpy as np
import pynbody.util.indexing_tricks
from .. import util
if TYPE_CHECKING:
from .. import family
[docs]
class LoadControl:
"""LoadControl provides the logic required for partial loading.
See the documentation for :mod:`pynbody.chunk` for more information."""
[docs]
def __init__(self, family_slice: dict[family.Family, slice], max_chunk: int, clauses: np.ndarray | None):
"""Initialize a LoadControl object.
*Inputs:*
*family_slice*: a dictionary of family slices describing the contiguous
layout of families on disk
*max_chunk*: the guaranteed maximum chunk of data to load in a single
read operation. Larger values are likely more efficient, but also require
bigger temporary buffers in your reader code.
*clauses*: a description of the type of partial loading to implement. If None, all data is loaded.
Otherwise, currently the only supported option is a numpy array of particle ids to load.
"""
self._disk_family_slice = family_slice
self._generate_family_order()
# generate simulation-level ID list
if hasattr(clauses, "__len__"):
self._ids = np.asarray(clauses, dtype=np.int64)
else:
self._ids = None # no partial loading!
self.generate_family_id_lists()
self._generate_mem_slice()
self.mem_num_particles = self.mem_family_slice[
self._ordered_families[-1]].stop
self.disk_num_particles = self._disk_family_slice[
self._ordered_families[-1]].stop
self._generate_chunks(max_chunk)
@staticmethod
def _scan_for_next_stop(ids, offset_start, id_maximum):
from . import scan
return scan.scan_for_next_stop(ids, offset_start, id_maximum)
def generate_family_id_lists(self):
if self._ids is None:
self._family_ids = None
return
self._family_ids = {}
offset = 0
stop = 0
for fam in self._ordered_families:
sl = self._disk_family_slice[fam]
self._family_ids[fam] = self._ids[
(self._ids >= sl.start) * (self._ids < sl.stop)] - sl.start
def _generate_family_order(self):
famlist = []
for fam, sl in self._disk_family_slice.items():
famlist.append((fam, sl.start, sl.stop))
famlist.sort(key=lambda x: x[1])
famlist.sort(key=lambda x: x[2])
self._ordered_families = [x[0] for x in famlist]
def _generate_mem_slice(self):
if self._ids is None:
self.mem_family_slice = self._disk_family_slice
return
self.mem_family_slice = {}
stop = 0
for current_family in self._ordered_families:
start = stop
stop = stop + len(self._family_ids[current_family])
self.mem_family_slice[current_family] = slice(start, stop)
def _generate_null_chunks(self, max_chunk):
"""Generate internal chunk map in the special case that we are loading all data.
See also :func:`_generate_chunks` for the general case.
"""
self._family_chunks = {}
for current_family in self._ordered_families:
self._family_chunks[current_family] = []
disk_sl = self._disk_family_slice[current_family]
for i0 in range(0, disk_sl.stop - disk_sl.start, max_chunk):
nread = min(disk_sl.stop - disk_sl.start - i0, max_chunk)
buf_sl = slice(0, nread)
mem_sl = slice(i0, i0 + nread)
self._family_chunks[current_family].append(
(nread, buf_sl, mem_sl))
def _generate_chunks(self, max_chunk):
"""Generate internal chunk map
This must satisfy the requirements that:
* maximum chunk size does not exceed *max_chunk*
* chunks do not cross family boundaries
"""
if self._ids is None:
self._generate_null_chunks(max_chunk)
return
self._family_chunks = {}
for current_family in self._ordered_families:
self._family_chunks[current_family] = []
disk_sl = self._disk_family_slice[current_family]
ids = self._family_ids[current_family]
i = 0
disk_ptr = 0
mem_ptr = 0
while disk_ptr < disk_sl.stop - disk_sl.start:
disk_ptr_end = disk_ptr + \
min(disk_sl.stop - disk_sl.start - disk_ptr, max_chunk - 1)
j = self._scan_for_next_stop(ids, i, disk_ptr_end - 1)
nread_disk = disk_ptr_end - disk_ptr
assert (ids[i:j] < disk_ptr_end).all()
if i != j:
mem_slice = slice(mem_ptr, mem_ptr + j - i)
else:
mem_slice = None
disk_mask = ids[i:j] - disk_ptr
mem_ptr = mem_ptr + j - i
i = j
disk_ptr = disk_ptr_end
self._family_chunks[current_family].append(
(nread_disk, disk_mask, mem_slice))
[docs]
def iterate_with_interrupts(self, families_on_disk: list[family.Family], families_in_memory: list[family.Family],
disk_interrupt_points: Iterable[int], disk_interrupt_fn: Callable,
multiskip: bool = False):
"""Yields instructions for loading an array with the specified families, breaking at specified file offsets
Performs the same function as :func:`iterate` but additionally takes a list of exact file offsets
*disk_interrupt_points* at which to interrupt the loading process and call a user-specified function
*disk_interrupt_fn*.
Parameters
----------
disk_interrupt_points: Iterable
List (or other iterable) of disk offsets at which to call the interrupt function, in ascending order
disk_interrupt_fn: Callable
Function which takes the file offset as an argument, and is called precisely at the point that the disk
interrupt point is reached
See :func:`iterate` for other parameters.
"""
fpos = 0
i = 0
next_dip = disk_interrupt_points[i]
for nread_disk, disk_slice, mem_slice in self.iterate(families_on_disk, families_in_memory, multiskip):
while next_dip and fpos + nread_disk > next_dip:
# an interrupt falls in the middle of our slice
# work out what to read first
len_pre = next_dip - fpos
if disk_slice is not None:
d_slice_pre = pynbody.util.indexing_tricks.concatenate_indexing(
slice(0, len_pre), disk_slice)
len_m_pre = pynbody.util.indexing_tricks.indexing_length(d_slice_pre)
m_slice_pre = pynbody.util.indexing_tricks.concatenate_indexing(
mem_slice, slice(0, len_m_pre))
# work out what to read second
len_post = nread_disk - len_pre
d_slice_post = copy.copy(pynbody.util.indexing_tricks.concatenate_indexing(
disk_slice, slice(len_m_pre, pynbody.util.indexing_tricks.indexing_length(mem_slice))))
# the copy above is necessary to ensure we don't end up inadvertently modifying
# list of offsets somewhere else
# that's the disk slice relative to having read the whole thing continuously.
# Offset to reflect what we've missed.
if isinstance(d_slice_post, slice):
d_slice_post = slice(
d_slice_post.start - len_pre, d_slice_post.stop - len_pre, d_slice_post.step)
else:
d_slice_post -= len_pre
len_m_post = pynbody.util.indexing_tricks.indexing_length(d_slice_post)
m_slice_post = pynbody.util.indexing_tricks.concatenate_indexing(
mem_slice, slice(len_m_pre, len_m_pre + len_m_post))
if pynbody.util.indexing_tricks.indexing_length(d_slice_post) == 0:
d_slice_post = None
m_slice_post = None
if pynbody.util.indexing_tricks.indexing_length(d_slice_pre) == 0:
d_slice_pre = None
m_slice_pre = None
else:
d_slice_pre = None
m_slice_pre = None
len_post = nread_disk - len_pre
d_slice_post = None
m_slice_post = None
yield len_pre, d_slice_pre, m_slice_pre
fpos += len_pre
disk_interrupt_fn(disk_interrupt_points[i])
# prepare for next interrupt
i += 1
if len(disk_interrupt_points) > i:
next_dip = disk_interrupt_points[i]
else:
next_dip = None
# update 'input' to reflect post reading slice (may still need further
# decomposition)
nread_disk, disk_slice, mem_slice = len_post, d_slice_post, m_slice_post
yield nread_disk, disk_slice, mem_slice
fpos += nread_disk
[docs]
def iterate(self, families_on_disk: list[family.Family], families_in_memory: list[family.Family],
multiskip: bool = False) -> Iterator[tuple[int, slice | None, slice | None]]:
"""Yields step-by-step instructions for partial-loading an array with the specified families.
A typical read loop should be as follows:
.. code-block:: python
for readlen, buffer_index, memory_index in ctl.iterate(fams_on_disk, fams_in_mem) :
data = read_entries(count=readlen)
if memory_index is not None :
target_array[memory_index] = data[buffer_index]
Obviously this can be optimized, for instance to skip through
file data when memory_index is None rather than read and discard it.
Parameters
----------
families_on_disk : list
List of families for which the array exists on disk
families_in_memory : list
List of families for which we want to read the array into memory
multiskip : bool
If True, skip commands (i.e. entries with buffer_index=None)
can have readlen greater than the block length
Yields
------
readlen : int
Number of entries to read from disk
buffer_index : slice | None
Slice to read from the resulting buffer, or None if this particular read is to be ignored (skipped)
memory_index : slice | None
Slice to write into memory, or None if ``buffer_index`` is None
"""
mem_offset = 0
skip_accumulation = 0
for current_family in self._ordered_families:
if skip_accumulation > 0:
yield skip_accumulation, None, None
skip_accumulation = 0
if current_family not in families_on_disk:
assert current_family not in families_in_memory
else:
if current_family in families_in_memory:
for nread_disk, disk_mask, mem_slice in self._family_chunks[current_family]:
if mem_slice is None:
if multiskip:
skip_accumulation += nread_disk
else:
yield nread_disk, None, None
else:
if skip_accumulation > 0:
yield skip_accumulation, None, None
skip_accumulation = 0
mem_slice_offset = slice(
mem_slice.start + mem_offset, mem_slice.stop + mem_offset)
yield nread_disk, disk_mask, mem_slice_offset
mem_fs = self.mem_family_slice[current_family]
mem_offset += mem_fs.stop - mem_fs.start
else:
for nread_disk, disk_mask, mem_slice in self._family_chunks[current_family]:
yield nread_disk, None, None