"""
Ionisation fraction estimations from grids of *cloudy* models
This module provides a way to estimate the equilibrium ionisation fractions of various ions in a gas particle based on
a grid of *cloudy* models. The grid is defined by redshift, temperature and density, and the ionisation fractions are
interpolated from this grid. The grid can be generated by running *cloudy* with a given radiation table and parameters,
or loaded from a pre-existing file.
.. note::
*Cloudy* can be downloaded from https://www.nublado.org/. You will need to compile it yourself if you want to
calculate new tables. The ``hm12`` and ``fg20`` tables provided with pynbody are calculated using *cloudy* version
23.01.
.. warning::
There are a number of limitations to the ionisation fractions calculated using this method. The most important are:
* The ionisation fractions are calculated assuming the recombination and ionisation rates are in equilibrium.
* There is no correction for self-shielding or for local sources of ionising radiation (with the exception of the
deprecated ``v1_duffy_shielded`` table).
* The ionisation fractions are calculated assuming a fixed metallicity of 1/10th solar.
* The UV backgrounds assumed may not be the same as used in any particular simulation.
If ionisation fractions are a crucial part of your scientific analysis, be sure to understand these limitations
and consider using a more sophisticated method e.g. radiative transfer post-processing or in the live simulation.
Information about available tables is provided in the function :func:`use_custom_ion_table`. The default table is
``hm12``, which is calculated based on the Haardt & Madau 2012 ionising background.
"""
import abc
import logging
import os
import subprocess
import h5py
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from .. import util
logger = logging.getLogger('pynbody.analysis.ionfrac')
def _cloudy_output_line_to_dictionary(line):
"""Process a single line from the cloudy ionisation output.
Turn its raw output into a dictionary of ionisation fractions, e.g. HeI: 0.1, HeII: 0.2, HeIII: 0.8.
The ionisation fractions for a given element sum to one.
"""
element_symbols = {
"Hydrogen": "H",
"Helium": "He",
"Lithium": "Li",
"Beryllium": "Be",
"Boron": "B",
"Carbon": "C",
"Nitrogen": "N",
"Oxygen": "O",
"Fluorine": "F",
"Neon": "Ne",
"Sodium": "Na",
"Magnesium": "Mg",
"Aluminium": "Al",
"Silicon": "Si",
"Phosphorus": "P",
"Sulphur": "S",
"Chlorine": "Cl",
"Argon": "Ar",
"Potassium": "K",
"Calcium": "Ca",
"Scandium": "Sc",
"Titanium": "Ti",
"Vanadium": "V",
"Chromium": "Cr",
"Manganese": "Mn",
"Iron": "Fe",
"Cobalt": "Co",
"Nickel": "Ni",
"Copper": "Cu",
"Zinc": "Zn"
}
ion_stages = ["I", "II", "III", "IV", "V", "VI", "VII", "VIII", "IX", "X", "XI", "XII",
"XIII", "XIV", "XV", "XVI", "XVII"]
element_name = line[1:11].strip()
ion_fracs = []
for i in range(17):
this_ion = line[11 + i * 7:18 + i * 7].strip()
try:
ion_fracs.append(10.**float(this_ion))
except ValueError:
break
if element_name not in element_symbols:
return {}
element_symbol = element_symbols[element_name]
if element_symbol == "H":
# Hydrogen has a special case of outputting molecular hydrogen
ion_stages[2] = "2"
ion_fracs = np.asarray(ion_fracs)
ion_fracs /= ion_fracs.sum() # correct any rounding errors
return {element_symbol + ion_stage: float(ion_frac) for ion_stage, ion_frac in zip(ion_stages, ion_fracs)}
def _run_cloudy(redshift, log_temp, log_den, table, cloudy_path, metallicity=0.1):
"""
Run cloudy and return the output ionisation fractions
"""
template = """title pynbody_grid_run
cmb z={redshift}
table {table} z = {redshift}
hden {log_hden}
metals {metallicity}
constant temperature {temperature}
stop zone 1
"""
# correct from total density to hydrogen density on the assumption of 1/10th solar metallicity
Y_over_X = 0.245/0.755 # primordial helium over hydrogen mass fraction
Z_over_X = metallicity * 0.0187 # metals mass fraction, https://www.aanda.org/articles/aa/pdf/2024/01/aa46928-23.pdf
X =1./(1+Y_over_X+Z_over_X)
input = template.format(redshift=redshift, log_hden=log_den + np.log10(X), temperature=10**log_temp, table=table,
metallicity=metallicity)
# cloudy is fussy about its input -- remove any indentation and replace newlines with '\n'
input = '\n'.join([x.strip() for x in input.split('\n')])
# Start the subprocess
process = subprocess.Popen(
[cloudy_path], # Replace with your command and arguments
stdin=subprocess.PIPE, # Allows writing to stdin
stdout=subprocess.PIPE, # Allows reading from stdout
stderr=subprocess.PIPE, # Capture stderr (optional)
text=True # Ensures that communication is in string format (instead of bytes)
)
process.stdin.write(input)
process.stdin.flush() # Flush the input to ensure it is sent
stdout, stderr = process.communicate() # Waits for the process to complete and fetches output
# search for "Log10 Mean Ionisation" in the output
table_start_line_number = None
out_lines = stdout.split('\n')
for i, line in enumerate(out_lines):
if "Log10 Mean Ionisation (over radius)" in line:
table_start_line_number = i
break
if table_start_line_number is None:
raise ValueError("Could not find ionisation table in cloudy output")
result = {}
for i in range(0, 29):
result.update(_cloudy_output_line_to_dictionary(
out_lines[table_start_line_number + i]
))
return result
def _run_cloudy_task_wrapper(args):
"""Wrapper for _run_cloudy that can be called by multiprocessing.Pool"""
return _run_cloudy(*args)
[docs]
class IonFractionTableBase(abc.ABC):
"""Abstract base class for ionization fraction tables"""
[docs]
@abc.abstractmethod
def __init__(self):
pass
[docs]
@abc.abstractmethod
def calculate(self, simulation, ion='ovi'):
"""Calculate the ion fraction for a given ion in the gas particles of a simulation.
Parameters
----------
simulation : pynbody.snapshot.SimSnap
The simulation snapshot to calculate the ion fractions for. The gas particles must have 'rho' and 'temp'
fields.
ion : str
The name of the ion to calculate the fraction for. The default is 'ovi'. Case insensitive.
Returns
-------
array-like
The ion fraction for each gas particle in the simulation, according to the table.
"""
pass
def _clamp_values(self, array, vmin, vmax):
"""Modify the array in place to clamp values to the range [vmin, vmax]"""
np.clip(array, vmin, vmax, out=array)
[docs]
class IonFractionTable(IonFractionTableBase):
"""Class for calculating ion fractions from a grid of *cloudy* models.
Rather than use this class directly, for many uses it is simpler to use :func:`calculate` which will use the
currently loaded or default table.
"""
[docs]
def __init__(self, redshift_values, log_temp_values, log_den_values, tables):
"""Initialise an ion fraction table from raw data.
Most users will instead want to use :meth:`load` to load a pre-existing table, or :meth:`from_cloudy` to
generate a new table by executing cloudy.
Parameters
----------
redshift_values : array_like
Redshift values at which the tables are defined
log_temp_values : array_like
Log10 temperature values at which the tables are defined
log_den_values : array_like
Log10 density values at which the tables are defined
tables : dict
Dictionary of tables, with keys being ion names and values being 3D numpy arrays of ion fraction values.
The shape of each array should be (len(redshift_values), len(log_temp_values), len(log_den_values)).
"""
self._redshift_values = redshift_values
self._log_temp_values = log_temp_values
self._log_den_values = log_den_values
self._tables = {k.upper(): v for k, v in tables.items()}
self._interpolators = {ion: RegularGridInterpolator(
(self._redshift_values, self._log_temp_values, self._log_den_values),
np.log10(np.maximum(self._tables[ion], np.nextafter(0.0, 1.0)))
) for ion in self._tables.keys()}
[docs]
def calculate(self, simulation, ion='ovi'):
"""Calculate the ionisation fraction for the gas particles of a given simulation
Values are interpolated from the (rho,T) table at the appropriate redshift, and any values outside the
range of the table are clamped to the nearest valid value.
Parameters
----------
simulation : pynbody.snapshot.SimSnap
The simulation snapshot to calculate the ion fractions for. The gas particles must have 'rho' and 'temp'
fields.
ion : str
The name of the ion to calculate the fraction for, e.g. HI, MgII, OVI etc. The only molecular fraction
available is H2. The default is 'ovi'. Case insensitive.
Returns
-------
array-like
The ion fraction for each gas particle in the simulation, according to the table.
"""
den_values = np.log10(simulation.gas['rho'].in_units('m_p cm^-3')).view(np.ndarray)
temp_values = np.log10(simulation.gas['temp'].in_units('K')).view(np.ndarray)
redshift_values = np.repeat(simulation.properties['z'], len(simulation.gas))
self._clamp_values(temp_values, np.min(self._log_temp_values), np.max(self._log_temp_values))
self._clamp_values(den_values, np.min(self._log_den_values), np.max(self._log_den_values))
return 10 ** self._interpolators[ion.upper()]((redshift_values, temp_values, den_values))
[docs]
def save(self, filename):
"""Save the table to a numpy .npz file"""
np.savez(filename, redshift_values=self._redshift_values, log_temp_values=self._log_temp_values,
log_den_values=self._log_den_values, **self._tables)
[docs]
def plot(self, ion='ovi', redshift=0.0):
"""Use matplotlib to plot the ion fraction table for a given ion at a given redshift"""
import matplotlib.pyplot as plt
plt.imshow(self._tables[ion.upper()][np.searchsorted(self._redshift_values, redshift)][::-1],
extent=(self._log_den_values[0], self._log_den_values[-1],
self._log_temp_values[0], self._log_temp_values[-1]),
aspect='auto')
plt.xlabel('log10(Density/$m_p$ cm$^{-3}$)')
plt.ylabel('log10(Temperature/K)')
plt.title(ion + ' ion fraction at z=' + str(redshift))
[docs]
@classmethod
def load(cls, filename):
"""Load a table from a numpy .npz file, generated using :meth:`save`
If the file is not found, it is assumed to be a pynbody-provided table. If such a built-in table exists,
the path is modified automatically to point at it. If it does not exist, an attempt is made to download it
from a zenodo repository."""
if not os.path.exists(filename):
if not os.path.exists(cls._table_to_path(filename)):
cls._download_ionfracs(filename)
# this will raise an exception if the download fails, so now we can try again:
filename = cls._table_to_path(filename)
tables = np.load(filename)
return cls(tables['redshift_values'], tables['log_temp_values'], tables['log_den_values'],
{k: tables[k] for k in tables.files
if k not in ['redshift_values', 'log_temp_values', 'log_den_values']})
@classmethod
def _table_to_path(cls, name):
return os.path.join(os.path.dirname(__file__), name + '.npz')
@classmethod
def _download_ionfracs(cls, name):
"""Download an ion fraction table from the pynbody data repository"""
import subprocess
logger.warning("Downloading ion fraction table %s" % name)
url = f"https://zenodo.org/record/13833051/files/{name}.npz?download=1"
filename = cls._table_to_path(name)
# ideally we'd use urllib for this but on macos it fails with a certificate error
subprocess.run(["wget", "-O", filename, url], check=True)
[docs]
@classmethod
def from_cloudy(cls, cloudy_path,
table='hm12',
redshift_range = (0, 15), num_redshifts = 10,
log_temp_range = (2.0, 8.0), num_temps = 10,
log_den_range = (-8.0, 2.0), num_dens = 10):
"""Generate a table by running *cloudy* with the specified ionising radiation table and parameters.
This can take a long time, but the resulting table can then be saved using the :meth:`save` method and reused
by calling :meth:`load`. The grid is computed in parallel using the default number of processors detected
by ``multiprocessing.Pool``. A progress bar is displayed using ``tqdm``.
Parameters
----------
cloudy_path : str
Path to the *cloudy* executable
table : str
Name of the *cloudy* radiation table to use. The default is 'hm12'.
redshift_range : tuple
Minimum and maximum redshift values to use
num_redshifts : int
Number of redshift values to use. These are spaced equally in log(1+z).
log_temp_range : tuple
Minimum and maximum log10 temperature values to use
num_temps : int
Number of temperature values to use, spaced equally in log space
log_den_range : tuple
Minimum and maximum log10 density values to use
num_dens : int
Number of density values to use, spaced equally in log space
"""
tables = {}
# space redshifts z equally in log(1+z):
redshift_values = np.exp(
np.linspace(np.log(1 + redshift_range[0]), np.log(1 + redshift_range[1]), num_redshifts)) - 1.0
log_temp_values = np.linspace(log_temp_range[0], log_temp_range[1], num_temps)
log_den_values = np.linspace(log_den_range[0], log_den_range[1], num_dens)
from multiprocessing import Pool
from tqdm import tqdm
with Pool() as pool:
tasks = [(redshift, log_temp, log_den, table, cloudy_path)
for redshift in redshift_values for log_temp in log_temp_values for log_den in log_den_values]
results = list(tqdm(pool.imap(_run_cloudy_task_wrapper, tasks), total=len(tasks)))
for task_index, (redshift, log_temp, log_den, _, _) in enumerate(tasks):
result = results[task_index]
for ion in result.keys():
if ion not in tables:
tables[ion] = np.zeros((len(redshift_values), len(log_temp_values), len(log_den_values)))
redshift_index = np.searchsorted(redshift_values, redshift, side='left')
temp_index = np.searchsorted(log_temp_values, log_temp, side='left')
den_index = np.searchsorted(log_den_values, log_den, side='left')
tables[ion][redshift_index, temp_index, den_index] = result[ion]
return cls(redshift_values, log_temp_values, log_den_values, tables)
[docs]
class V1IonFractionTable(IonFractionTableBase):
"""Calculates ion fractions from an archived pynbody v1 table"""
[docs]
def __init__(self, filename=None):
if filename is None:
filename = os.path.join(os.path.dirname(__file__), "ionfracs.npz")
if os.path.exists(filename):
# import data
logger.info("Loading %s" % filename)
self._table = np.load(filename)
else:
raise OSError("ionfracs.npz (Ion Fraction table) not found")
[docs]
def calculate(self, simulation, ion='ovi'):
x_vals = self._table['redshiftvals'].view(np.ndarray)
y_vals = self._table['tempvals'].view(np.ndarray)
z_vals = self._table['denvals'].view(np.ndarray)
vals = self._table[ion + 'if'].view(np.ndarray)
return self._calculate_with_table(simulation, x_vals, y_vals, z_vals, vals)
def _get_sim_values(self, simulation, variable):
if variable=='z':
result = np.zeros(len(simulation.gas))
result[:] = simulation.properties['z']
elif variable=='temp':
result = np.log10(simulation.gas['temp']).view(np.ndarray)
elif variable=='rho':
result = np.log10(simulation.gas['rho'].in_units('m_p cm^-3')).view(np.ndarray)
else:
raise ValueError("Unknown variable: " + variable)
return result
def _calculate_with_table(self, simulation, x_vals, y_vals, z_vals, vals,
x_is='z', y_is='temp', z_is='rho'):
x = self._get_sim_values(simulation, x_is)
y = self._get_sim_values(simulation, y_is)
z = self._get_sim_values(simulation, z_is)
self._clamp_values(x, np.min(x_vals), np.max(x_vals))
self._clamp_values(y, np.min(y_vals), np.max(y_vals))
self._clamp_values(z, np.min(z_vals), np.max(z_vals))
# interpolate
interpolator = RegularGridInterpolator((x_vals, y_vals, z_vals), vals)
result_array = interpolator(np.array([x, y, z]).T)
return 10 ** result_array
[docs]
class V1DuffyIonFractionTable(V1IonFractionTable):
"""Calculates HI ion fractions using Alan Duffy's archived pynbody v1 table with self-shielding.
Only HI fractions are available in this table. It is not recommended for new work."""
[docs]
def __init__(self, selfshield = False):
"""Initialise the table
If selfshield is True, self-shielding from Duffy et al 2012 is applied."""
filename = os.path.join(os.path.dirname(__file__), "h1.hdf5")
if os.path.exists(filename):
logger.info("Loading %s" % filename)
self._table = h5py.File(filename, 'r')
else:
raise FileNotFoundError("h1.hdf5 (HI Fraction table) not found")
self._selfshield = selfshield
[docs]
def calculate(self, simulation, ion='ovi'):
if ion.lower()!='hi':
raise ValueError("This table only contains HI fractions")
hi = self._calculate_with_table(simulation, np.asarray(self._table['logd']), np.asarray(self._table['logt']),
np.asarray(self._table['redshift']), np.log10(self._table['ionbal']),
'rho', 'temp', 'z')
if self._selfshield:
# NB this is currently untested and only retained for (probable) backward compatibility
# However, it looks like it sets HI fraction to zero in high density, low T regions; is that right?
## Selfshield criteria from Duffy et al 2012a for EoS gas
hi[simulation.gas['OnEquationOfState'] == 1.] = 0.
hi[(simulation.gas['p'].in_units('K k cm**-3') > 150.)
& (simulation.gas['temp'].in_units('K') < 10.**(4.5))] = 0.
return hi
_ion_table = None
_default_ion_table = 'hm12'
[docs]
class IonTableContext(util.SettingControl):
"""Context manager for temporarily using a custom ionisation fraction table"""
[docs]
def __init__(self, ion_table):
super().__init__(globals(), "_ion_table", ion_table)
[docs]
def get_current_ion_table() -> IonFractionTableBase:
"""Get the currently loaded ionisation table. If none is loaded, the default is loaded and returned.
Returns
-------
IonFractionTableBase
The currently loaded ionisation table.
"""
global _ion_table
if _ion_table is None:
use_custom_ion_table(_default_ion_table)
return _ion_table
[docs]
def use_custom_ion_table(path_or_table):
"""Select an ionisation table to use for subsequent calculations.
The specified table will be used for all subsequent calls to :func:`calculate`. A context manager is returned,
so you can use this function in a ``with`` block to temporarily use a custom table, i.e.:
>>> with use_custom_ion_table('FG20'):
... civ_fg20 = calculate(sim, 'CIV')
>>> civ_hm12 = calculate(sim, 'CIV')
Here the first calculation uses the FG20 table, while the second uses the HM12 table. However, you do not
need to use a context manager if you want to use the custom table indefinitely.
Available tables are:
* ``hm12``: calculated using cloudy 23.01 with the Haardt & Madau (2012) background for redshifts between 0 and 15,
i.e. uses the HM12 radiation density tables shipped with cloudy.
* ``fg20``: calculated by replacing the HM12 background in cloudy with the table by Claude-Andre Faucher-Giguere.
Unfortunately this involves some hacking due to the architecture of cloudy. To reproduce, one needs to download
the FG20 table from https://galaxies.northwestern.edu/uvb-fg20/, and follow the instructions in the readme file.
The table is calculated for redshifts between 0 and 10.
* ``v1``: gives results calculated by Greg Stinson for pynbody v1. It is retained only for backwards compatibility
and we do not recommend using it for new work.
* ``v1_duffy``: gives results for hydrogen only, calculated by Alan Duffy for pynbody v1. It is retained only for
backwards compatibility and we do not recommend using it for new work.
* ``v1_duffy_shielded``: gives results for hydrogen only, calculated by Alan Duffy for pynbody v1, with self-shielding
prescription turned on. It is retained only for backwards compatibility and we do not recommend using it for new
work.
Parameters
----------
path_or_table : str or IonFractionTableBase
If a string, the name of the table to use. If an instance of IonFractionTableBase, the table to use.
Built-in tables are 'v1', 'v1_duffy', 'v1_duffy_shielded', 'hm12', 'fg20' and are case-insensitive.
See above for the origins of these tables.
Returns
-------
IonTableContext
A context manager that can be used to control the lifetime of the table. See above for usage guidance.
"""
if isinstance(path_or_table, IonFractionTableBase):
table = path_or_table
elif path_or_table.lower() == 'v1':
table = V1IonFractionTable()
elif path_or_table.lower() == 'v1_duffy':
table = V1DuffyIonFractionTable()
elif path_or_table.lower() == 'v1_duffy_shielded':
table = V1DuffyIonFractionTable(selfshield=True)
else:
table = IonFractionTable.load(path_or_table)
return IonTableContext(table)
[docs]
def calculate(sim, ion='OVI'):
"""Calculate the fractions for the specified ion in the given simulation.
Uses the currently loaded ion fraction table. If no table is loaded, the default is used. See
:func:`use_custom_ion_table` for how to load a custom table.
For important notes on the way that ion fractions are estimated, see the module documentation
(:mod:`pynbody.analysis.ionfrac`).
Parameters
----------
sim : pynbody.snapshot.SimSnap
The simulation snapshot to calculate the ion fractions for. The gas particles must have 'rho' and 'temp'
fields.
ion : str
The name of the ion to calculate the fraction for, e.g. "HI", "CIV", "MgII" etc. Molecular hydrogen is "H2".
The default is 'OVI'. Case insensitive.
Returns
-------
array-like
The ion fraction for each gas particle in the simulation, according to the table. This is defined as the
number density of the specified ion divided by the number density of the element it is derived from. As
such, the ion fractions for a particular element sum to 1.
"""
return get_current_ion_table().calculate(sim, ion)