"""
Implements loading of nchilada files.
"""
import os
import struct
import warnings
import xml.dom.minidom
import numpy as np
from .. import chunk, family, units
from . import SimSnap, namemapper
_name_map, _rev_name_map = namemapper.setup_name_maps('nchilada-name-mapping')
_translate_array_name = namemapper.name_map_function(_name_map, _rev_name_map)
_type_codes = list(map(np.dtype, [None, 'int8', 'uint8', 'int16', 'uint16',
'int32', 'uint32', 'int64', 'uint64',
'float32', 'float64']))
_max_buf = 1024 * 512
[docs]
class NchiladaSnap(SimSnap):
"""Implements loading of nchilada files."""
def _load_header(self, f):
# Used to use xdrlib, but that is deprecated in Python 3.11
# The following code just follows the old xdrlib implementation
buf= f.read(28)
pos= 0
# Read int, walrus assignment updates pos
assert struct.unpack('>l',buf[pos:(pos:=pos+4)])[0] == 1062053
# Read double, walrus assignment updates pos
t= struct.unpack('>d',buf[pos:(pos:=pos+8)])[0]
# Read four ints, walrus assignment updates pos
highword, nbod, ndim, code = (
struct.unpack('>l',buf[pos:(pos:=pos+4)])[0] for i in range(4)
)
return t, nbod, ndim, _type_codes[code]
def _update_loadable_keys(self):
self._loadable_keys_registry = {}
d = self._loadable_keys_registry
fams = self._dom_sim.getElementsByTagName('family')
for f in sorted(fams, key=lambda x: str(x)):
fam_name = f.attributes['name'].value
our_fam = family.get_family(fam_name)
d_f = {}
for a in f.getElementsByTagName('attribute'):
our_name = _translate_array_name(
a.attributes['name'].value, reverse=True)
filename = os.path.join(
self._filename, a.attributes['link'].value)
d_f[our_name] = filename
d[our_fam] = d_f
def _setup_slices(self, take=None):
disk_family_slice = {}
i = 0
# for each family, find an array (any array) to determine length and
# set up a logical map of particles on disk
for f in sorted(self._loadable_keys_registry.keys()):
ars = self._loadable_keys_registry[f]
with open(list(ars.values())[0], 'rb') as tf:
header_time, nbod, _, _ = self._load_header(tf)
disk_family_slice[f] = slice(i, i + nbod)
i += nbod
self.properties['a'] = header_time
self._load_control = chunk.LoadControl(
disk_family_slice, _max_buf, take)
self._family_slice = self._load_control.mem_family_slice
self._num_particles = self._load_control.mem_num_particles
[docs]
def __init__(self, filename, **kwargs):
"""Load a nchilada file.
Parameters
----------
filename : str
The path to the nchilada output to load. Nchilada outputs are directories containing a description.xml
file and a number of binary files.
take : np.ndarray, optional
The array of particles to load. If not specified, all particles are loaded.
"""
super().__init__()
must_have_paramfile = kwargs.get('must_have_paramfile', False)
take = kwargs.get('take', None)
self._dom_sim = xml.dom.minidom.parse(
os.path.join(filename, "description.xml")).getElementsByTagName('simulation')[0]
self._filename = filename
self._update_loadable_keys()
self._setup_slices(take=take)
self._paramfilename = kwargs.get('paramfile', None)
self._decorate()
if 'dKpcUnit' not in self._paramfile:
if must_have_paramfile:
raise RuntimeError("Could not find .param file for this run. Place it in the run's directory or parent directory.")
else:
warnings.warn(
"No readable param file in the run directory or parent directory: using defaults.", RuntimeWarning)
self._file_units_system = [
units.Unit(x) for x in ('G', '1 kpc', '1e10 Msol')]
[docs]
def loadable_keys(self, fam=None):
if fam is not None:
return list(self._loadable_keys_registry[fam].keys())
else:
loadable = None
for f in self._loadable_keys_registry.values():
if loadable is None:
loadable = set(f.keys())
else:
loadable = loadable.intersection(iter(f.keys()))
return list(loadable)
def _open_file_for_array(self, fam, array_name):
fname = self._loadable_keys_registry[fam].get(array_name, None)
if not fname:
raise OSError("No such array on disk")
f = open(fname, 'rb')
return f
def _attempt_load_all_families(self, array_name):
fams = self.families()
universal_dtype = None
universal_ndim = None
if fams==[]:
return
for fam in fams:
# this will raise an IOError propagating upwards if any family doesn't have the appropriate array
with self._open_file_for_array(fam, array_name) as f:
_, nbod, ndim, dtype = self._load_header(f)
if universal_dtype is not None:
if ndim!=universal_ndim:
raise OSError("Mismatching dimensions for array")
if dtype!=universal_dtype:
raise OSError("Mismatching data type for array")
universal_ndim, universal_dtype = ndim, dtype
self._create_array(array_name,universal_ndim,universal_dtype,False)
for fam in fams:
self._load_array(array_name, fam)
def _load_array(self, array_name, fam=None):
if fam is None:
self._attempt_load_all_families(array_name)
return
f = self._open_file_for_array(fam, array_name)
_, nbod, ndim, dtype = self._load_header(f)
if array_name not in list(self.keys()):
self._create_family_array(array_name, fam, ndim=ndim,dtype=dtype)
r = self[fam][array_name]
if units.has_units(r):
r.convert_units(self._default_units_for(array_name))
else:
r.set_default_units()
disk_dtype = dtype.newbyteorder('>')
buf_shape = _max_buf
if ndim > 1:
buf_shape = (_max_buf, ndim)
b = np.empty(buf_shape)
# skip over min and max values (see issue #211)
np.fromfile(f, dtype=disk_dtype, count=2 * ndim)
for readlen, buf_index, mem_index in self._load_control.iterate(fam, fam):
b = np.fromfile(f, dtype=disk_dtype, count=readlen * ndim)
if ndim > 1:
b = b.reshape((readlen, ndim))
if mem_index is not None:
r[mem_index] = b[buf_index]
f.close()
@classmethod
def _can_load(cls, f):
return os.path.isdir(f) and os.path.exists(os.path.join(f, "description.xml"))