Source code for pynbody.snapshot.ascii
"""
Supports loading of ASCII snapshots. The format is a simple text file with a header line defining columns.
"""
import os
import pathlib
import numpy as np
from .. import chunk, family, units
from . import SimSnap
# Chunk-size limit handed to chunk.LoadControl when the file is read in pieces
# (512 * 1024 = 524288). Presumably counted in particles (rows) per chunk --
# TODO confirm against chunk.LoadControl.
_max_buf = 512 * 1024
[docs]
class AsciiSnap(SimSnap):
    """Supports loading of ASCII snapshots.

    The format is a simple text file with a header line defining columns,
    followed by one whitespace-separated row of numbers per particle.

    For an ASCII snapshot to be identified automatically using
    :func:`pynbody.load`, the file must have a `.txt` extension.

    All particles are assumed to be dark matter.
    """

    def _setup_slices(self, num_lines, take=None):
        """Create the load control and record the in-memory particle layout.

        Parameters
        ----------
        num_lines :
            Number of particle rows on disk; all are assigned to the dark
            matter family.
        take :
            Forwarded unchanged to :class:`chunk.LoadControl`; presumably an
            optional selection of particles to load -- verify against
            ``chunk.LoadControl``.
        """
        disk_family_slice = {family.dm: slice(0, num_lines)}
        self._load_control = chunk.LoadControl(
            disk_family_slice, _max_buf, take)
        self._family_slice = self._load_control.mem_family_slice
        self._num_particles = self._load_control.mem_num_particles

    def __init__(self, filename, take=None, header=None):
        """Scan *filename* to find the column names and count the particles.

        Parameters
        ----------
        filename :
            Path of the text file to read.
        take :
            Forwarded to :meth:`_setup_slices`.
        header :
            Whitespace-separated column names. When omitted (or empty), the
            first line of the file is used as the header and is skipped when
            array data is read later on.

        Raises
        ------
        OSError
            If no header was supplied and the file contains no lines at all,
            so the column names cannot be determined.
        """
        super().__init__()
        num_particles = 0
        # Remember what the caller passed: _load_arrays uses this to decide
        # whether the first line of the file is a header that must be skipped.
        self._header = header

        with open(filename) as f:
            for line in f:
                if not header:
                    header = line
                elif line.strip():
                    # Only rows that carry data count as particles. The reader
                    # in _load_arrays treats all whitespace (including blank
                    # lines) as a separator, so counting blank lines would
                    # promise more rows than can actually be parsed.
                    num_particles += 1

        if header is None:
            raise OSError(
                f"ASCII snapshot {filename!r} is empty: no header line found")

        self._loadable_keys = header.split()
        self._filename = filename
        self._file_units_system = [units.Unit("G"), units.Unit("kpc"), units.Unit("Msol")]
        self._setup_slices(num_particles, take=take)
        self._decorate()

    def loadable_keys(self, fam=None):
        """Return the list of column names taken from the header.

        The *fam* argument is accepted for interface compatibility and
        ignored; the same list is returned regardless of family.
        """
        return self._loadable_keys

    def _load_arrays(self, array_name_list):
        """Read the named columns from disk into freshly created 1D arrays.

        Parameters
        ----------
        array_name_list :
            Column names to load; each must be present in the header
            (otherwise the ``list.index`` lookup raises ``ValueError``).
        """
        with open(self._filename) as f:
            # The header lives in the file only if the caller did not pass one.
            if not self._header:
                f.readline()

            for array_name in array_name_list:
                self._create_array(array_name, ndim=1)

            targets = [self[array_name] for array_name in array_name_list]
            cols = [self._loadable_keys.index(array_name)
                    for array_name in array_name_list]
            ncols = len(self._loadable_keys)

            for readlen, buf_index, mem_index in self._load_control.iterate(
                    [family.dm], [family.dm]):
                # Every chunk is parsed, even when nothing from it is kept
                # (mem_index is None), so that the file position advances to
                # the next chunk. sep=' ' makes numpy parse text and treat any
                # run of whitespace, newlines included, as the separator.
                chunk_data = np.fromfile(
                    f, count=readlen * ncols, sep=' ').reshape((-1, ncols))

                if mem_index is not None:
                    for target, col in zip(targets, cols):
                        target[mem_index] = chunk_data[buf_index, col]

    def _load_array(self, array_name, fam=None):
        """Load *array_name* from disk at snapshot level.

        If the name is not itself a column, it is expanded through
        ``_array_name_ND_to_1D`` (presumably into its 1D component names --
        verify against ``SimSnap``) and every component must be a column.

        Raises
        ------
        OSError
            If *fam* is given (family-level loading is unsupported), or if
            any required column is missing from the file.
        """
        if fam is not None:
            raise OSError("Arrays only loadable at snapshot, not family level")

        ars = [array_name]
        if array_name not in self._loadable_keys:
            ars = self._array_name_ND_to_1D(array_name)
            for array_name_i in ars:
                if array_name_i not in self._loadable_keys:
                    raise OSError("No such array on disk")

        self._load_arrays(ars)

    @classmethod
    def _can_load(cls, f: pathlib.Path):
        """Return True if *f* exists and has a ``.txt`` suffix."""
        return f.exists() and f.suffix == '.txt'