Source code for pynbody.snapshot.pkdgravhdf
import configparser
import logging
import warnings
from .. import config_parser, family, units
from .gadgethdf import GadgetHDFSnap, _GadgetHdfMultiFileManager
logger = logging.getLogger('pynbody.snapshot.pkdgravhdf')
try:
import h5py
except ImportError:
h5py = None
_pkd_default_type_map = {}
for x in family.family_names():
try:
_pkd_default_type_map[family.get_family(x)] = \
[q.strip() for q in config_parser.get('pkdgrav3hdf-type-mapping', x).split(",")]
except configparser.NoOptionError:
pass
_pkd_all_hdf_particle_groups = []
for hdf_groups in _pkd_default_type_map.values():
for hdf_group in hdf_groups:
_pkd_all_hdf_particle_groups.append(hdf_group)
class _PkdgravHdfMultiFileManager(_GadgetHdfMultiFileManager) :
_nfiles_groupname = "Header"
_nfiles_attrname = "NumFilesPerSnapshot"
_namemapper_config_section = "pkdgrav3hdf-name-mapping"
def _make_filename_for_cpu(self, filename, n):
return filename + f".{n}"
def get_cosmo_attrs(self):
return self[0].parent['Cosmology'].attrs
[docs]
class PkdgravHDFSnap(GadgetHDFSnap):
# PKDGRAV3 creates files by default with the format
# {name}.{step:05d}.{n}
# where `n>=0` is the index of the file when doing parallel writting.
# It does not contain any ".hdf5" extension in the file names.
_multifile_manager_class = _PkdgravHdfMultiFileManager
_readable_hdf5_test_group = "Header"
_readable_hdf5_test_attr = "Header", "PKDGRAV version"
def _get_units_from_hdf_attr(self, hdfattrs) :
# pkdgrav does not store units as attributes
return units.NoUnit()
def _all_hdf_groups(self):
for hdf_family_name in _pkd_all_hdf_particle_groups:
yield from self._hdf_files.iter_particle_groups_with_name(hdf_family_name)
@classmethod
def _guess_file_ending(cls, f):
return f.with_suffix(f.suffix + ".0")
def _init_unit_information(self):
try:
atr = self._hdf_files.get_unit_attrs()
except KeyError:
warnings.warn("No unit information found in PkdgravHDF file",
RuntimeWarning)
return {}, {}
vel_unit = atr['KmPerSecUnit'] * 1e5 * units.cm / units.s
dist_unit = atr['KpcUnit'] * units.kpc.in_units("cm") * units.cm
mass_unit = atr['MsolUnit'] * units.Msol.in_units("g") * units.g
time_unit = atr['SecUnit'] * units.s
# Create a dictionary for the units, this will come in handy later
unitvar = {'U_V': vel_unit, 'U_L': dist_unit, 'U_M': mass_unit,
'U_T': time_unit, '[K]': units.K,
'SEC_PER_YEAR': units.yr, 'SOLAR_MASS': units.Msol}
# Last two units are to catch occasional arrays like StarFormationRate
# which don't follow the patter of U_ units unfortunately
cgsvar = {'U_M': 'g', 'SOLAR_MASS': 'g', 'U_T': 's',
'SEC_PER_YEAR': 's', 'U_V': 'cm s**-1', 'U_L': 'cm', '[K]': 'K'}
self._hdf_cgsvar = cgsvar
self._hdf_unitvar = unitvar
cosmo = 'HubbleParam' in list(self._get_hdf_parameter_attrs().keys())
if cosmo:
dist_unit *= units.a
vel_unit *= units.a
self._file_units_system = [units.Unit(x) for x in [
vel_unit, dist_unit, mass_unit, "K"]]
def _get_hdf_cosmo_attrs(self):
return self._hdf_files.get_cosmo_attrs()
def _init_properties(self):
atr = self._get_hdf_header_attrs()
# Some attributes may be stored in the Cosmology header
cosmo_atr = self._get_hdf_cosmo_attrs()
cosmo_run = cosmo_atr['Cosmological run']
if cosmo_run:
self.properties['z'] = atr['Redshift']
self.properties['a'] = 1. / (1 + atr['Redshift'])
self.properties['eps'] = float(atr['Softening']) * self._hdf_unitvar['U_L']
# Not all omegas need to be specified in the attributes
try:
self.properties['omegaB0'] = cosmo_atr['Omega_b']
except KeyError:
pass
self.properties['omegaM0'] = cosmo_atr['Omega_m']
self.properties['omegaL0'] = cosmo_atr['Omega_lambda']
self.properties['boxsize'] = atr['BoxSize'] / \
cosmo_atr['HubbleParam'] * units.Mpc * units.a
self.properties['h'] = cosmo_atr['HubbleParam']
else:
self.properties['z'] = 0
self.properties['a'] = 1
self.properties['eps'] = 0
self.properties['time'] = atr['Time'] * self._hdf_unitvar['U_T']
for s, value in self._get_hdf_header_attrs().items():
if s not in ['Time', 'Omega_m', 'Omega_b', 'Omega_lambda',
'BoxSize', 'HubbleParam']:
self.properties[s] = value
for s, value in self._get_hdf_cosmo_attrs().items():
if s not in ['Time', 'Omega_m', 'Omega_b', 'Omega_lambda',
'BoxSize', 'HubbleParam']:
self.properties[s] = value