Source code for pynbody.util.hdf_vds
"""Toolset for creating a virtual dataset from multiple HDF5 files."""
from __future__ import annotations
import atexit
import os
import tempfile
import weakref
from typing import Iterable
import h5py
[docs]
class TempHDF5File(h5py.File):
"""HDF5 file that auto-deletes on close or program exit."""
[docs]
def __init__(self, path, *args, **kwargs):
# Create temporary file
self._temp_path = path
# Initialize h5py.File with temp path
super().__init__(self._temp_path, *args, **kwargs)
# Register cleanup handlers
self._cleanup_registered = True
atexit.register(self._cleanup)
self._weakref = weakref.ref(self, lambda ref: self._cleanup())
def _cleanup(self):
"""Remove temporary file if it exists."""
if hasattr(self, '_cleanup_registered') and self._cleanup_registered:
self._cleanup_registered = False
if os.path.exists(self._temp_path):
try:
os.unlink(self._temp_path)
except OSError:
pass # File might already be deleted
[docs]
def close(self):
"""Close file and delete temp file."""
super().close()
self._cleanup()
@property
def temp_path(self):
"""Get the temporary file path."""
return self._temp_path
[docs]
class HdfVdsMaker:
"""Tool for creating a virtual dataset from multiple HDF5 files."""
[docs]
def __init__(self, hdf_files: list[h5py.File | str]):
self._files = []
for f in hdf_files:
if isinstance(f, h5py.File):
self._files.append(f)
else:
self._files.append(h5py.File(f, 'r'))
[docs]
def concatenation_keys(self) -> Iterable[str]:
"""Returns all keys to concatenate as VDS"""
# TODO - make this general
return['Subhalos', 'SubhaloParticles', 'NestedSubhalos']
[docs]
def copy_keys(self) -> Iterable[str]:
"""Returns all keys to copy from the first file into the VDS.
Examples of copy keys are headers or one-off arrays"""
# TODO - make this general
return ['SnapshotId']
[docs]
def make_hdf_vfile(self, filepath: str) -> h5py.File:
"""Create an HDF file with virtual datasets combining the datasets in the input files."""
with h5py.File(name=filepath, mode='w') as hdf_vfile:
for k in self.concatenation_keys():
self.write_single_vds(k, hdf_vfile)
for k in self.copy_keys():
self.write_single_vds(k, hdf_vfile, first_only=True)
return hdf_vfile
[docs]
def write_single_vds(self, key: str, target_hdf_file: h5py.File, first_only: bool=False):
"""Write a single virtual dataset to the target HDF file."""
shape = None
sources = []
slices = []
dtype = None
offset = 0
files = [self._files[0]] if first_only else self._files
for f in files:
source_dataset: h5py.Dataset = f[key]
if shape is None:
dtype = source_dataset.dtype
shape = source_dataset.shape
else:
if dtype != source_dataset.dtype:
raise ValueError(f"The dtypes of array {key} are inconsistent between files")
if source_dataset.shape[1:] != shape[1:]:
raise ValueError(f"The shapes of array {key} are inconsistent between files")
shape = (source_dataset.shape[0] + shape[0],)+shape[1:]
sources.append(h5py.VirtualSource(source_dataset))
slices.append(slice(offset, offset+len(source_dataset)))
offset+=len(source_dataset)
layout = h5py.VirtualLayout(shape=shape, dtype=dtype)
for slice_, vsource in zip(slices, sources):
layout[slice_] = vsource
target_hdf_file.create_virtual_dataset(key, layout)
[docs]
def get_temporary_hdf_vfile(self) -> h5py.File:
"""Create the HDF file with virtual datasets in a temporary directory, such that it is deleted on closure"""
# An ideal solution is to make a file then unlink it while keep it open, but Windows doesn't like that.
# Instead, we create a temporary file then use a wrapper around HDF5 that deletes it on closure.
self._temp_fd, self._temp_path = tempfile.mkstemp(suffix='.h5')
os.close(self._temp_fd)
hdf_vfile = h5py.File(name=self._temp_path, mode='w')
for k in self.concatenation_keys():
self.write_single_vds(k, hdf_vfile)
for k in self.copy_keys():
self.write_single_vds(k, hdf_vfile, first_only=True)
hdf_vfile.close()
# Windows seems to have a problem with reading when these files are still open, so close them
for f in self._files:
f.close()
return TempHDF5File(self._temp_path, mode='r')