Source code for pynbody.derived

"""
derived
=======

Holds procedures for creating new arrays from existing ones, e.g. for
getting the radial position. For more information see :ref:`derived`.

"""

from . import snapshot
from .snapshot import SimSnap
from . import array
from . import analysis
from . import sph
from . import config
from . import units
import numpy as np
import sys
import logging
import time
import functools

logger = logging.getLogger('pynbody.derived')


[docs]@SimSnap.derived_quantity def r(self): """Radial position""" return ((self['pos'] ** 2).sum(axis=1)) ** (1, 2)
[docs]@SimSnap.derived_quantity def rxy(self): """Cylindrical radius in the x-y plane""" return ((self['pos'][:, 0:2] ** 2).sum(axis=1)) ** (1, 2)
[docs]@SimSnap.derived_quantity def vr(self): """Radial velocity""" return (self['pos'] * self['vel']).sum(axis=1) / self['r']
[docs]@SimSnap.derived_quantity def v2(self): """Squared velocity""" return (self['vel'] ** 2).sum(axis=1)
[docs]@SimSnap.derived_quantity def vt(self): """Tangential velocity""" return np.sqrt(self['v2'] - self['vr'] ** 2)
[docs]@SimSnap.derived_quantity def ke(self): """Specific kinetic energy""" return 0.5 * (self['vel'] ** 2).sum(axis=1)
[docs]@SimSnap.derived_quantity def te(self): """Specific total energy""" return self['ke'] + self['phi']
[docs]@SimSnap.derived_quantity def j(self): """Specific angular momentum""" angmom = np.cross(self['pos'], self['vel']).view(array.SimArray) angmom.units = self['pos'].units * self['vel'].units return angmom
[docs]@SimSnap.derived_quantity def j2(self): """Square of the specific angular momentum""" return (self['j'] ** 2).sum(axis=1)
[docs]@SimSnap.derived_quantity def jz(self): """z-component of the angular momentum""" return self['j'][:, 2]
[docs]@SimSnap.derived_quantity def vrxy(self): """Cylindrical radial velocity in the x-y plane""" return (self['pos'][:, 0:2] * self['vel'][:, 0:2]).sum(axis=1) / self['rxy']
[docs]@SimSnap.derived_quantity def vcxy(self): """Cylindrical tangential velocity in the x-y plane""" f = (self['x'] * self['vy'] - self['y'] * self['vx']) / self['rxy'] f[np.where(f != f)] = 0 return f
[docs]@SimSnap.derived_quantity def vphi(self): """Azimuthal velocity (synonym for vcxy)""" return self['vcxy']
[docs]@SimSnap.derived_quantity def vtheta(self): """Velocity projected to polar direction""" return (np.cos(self['az']) * np.cos(self['theta']) * self['vx'] + np.sin(self['az']) * np.cos(self['theta']) * self['vy'] - np.sin(self['theta']) * self['vy'])
[docs]@SimSnap.derived_quantity def v_mean(self): """SPH-smoothed mean velocity""" from . import sph sph.build_tree(self) nsmooth = config['sph']['smooth-particles'] logger.info( 'Calculating mean velocity with %d nearest neighbours' % nsmooth) sm = array.SimArray(np.empty_like(self['vel']), self['vel'].units) self.kdtree.set_array_ref('rho',self['rho']) self.kdtree.set_array_ref('smooth',self['smooth']) self.kdtree.set_array_ref('mass',self['mass']) self.kdtree.set_array_ref('qty',self['vel']) self.kdtree.set_array_ref('qty_sm',sm) start = time.time() self.kdtree.populate('qty_mean',nsmooth) end = time.time() logger.info('Mean velocity done in %5.3g s' % (end - start)) return sm
[docs]@SimSnap.derived_quantity def v_disp(self): """SPH-smoothed local velocity dispersion""" from . import sph sph.build_tree(self) nsmooth = config['sph']['smooth-particles'] self['rho'] logger.info( 'Calculating velocity dispersion with %d nearest neighbours' % nsmooth) sm = array.SimArray(np.empty(len(self['vel']),dtype=self['vel'].dtype), self['vel'].units) self.kdtree.set_array_ref('rho',self['rho']) self.kdtree.set_array_ref('smooth',self['smooth']) self.kdtree.set_array_ref('mass',self['mass']) self.kdtree.set_array_ref('qty',self['vel']) self.kdtree.set_array_ref('qty_sm',sm) start = time.time() self.kdtree.populate('qty_disp',nsmooth) end = time.time() logger.info('Velocity dispersion done in %5.3g s' % (end - start)) return sm
[docs]@SimSnap.derived_quantity def age(self): """Stellar age determined from formation time and current snapshot time""" return self.properties['time'].in_units(self['tform'].units, **self.conversion_context()) - self['tform']
bands_available = ['u', 'b', 'v', 'r', 'i', 'j', 'h', 'k', 'U', 'B', 'V', 'R', 'I', 'J', 'H', 'K'] def lum_den_template(band, s): val = (10 ** (-0.4 * s[band + "_mag"])) * s['rho'] / s['mass'] val.units = s['rho'].units/s['mass'].units return val for band in bands_available: X = lambda s, b=str(band): analysis.luminosity.calc_mags(s, band=b) X.__name__ = band + "_mag" X.__doc__ = band + " magnitude from analysis.luminosity.calc_mags""" SimSnap.derived_quantity(X) lum_den = functools.partial(lum_den_template,band) lum_den.__name__ = band + "_lum_den" lum_den.__doc__ = "Luminosity density in astronomy-friendly units: 10^(-0.4 %s_mag) per unit volume. " \ "" \ "The magnitude is taken from analysis.luminosity.calc_mags."%band SimSnap.derived_quantity(lum_den)
[docs]@SimSnap.derived_quantity def theta(self): """Angle from the z axis, from [0:2pi]""" return np.arccos(self['z'] / self['r'])
[docs]@SimSnap.derived_quantity def alt(self): """Angle from the horizon, from [-pi/2:pi/2]""" return np.pi / 2 - self['theta']
[docs]@SimSnap.derived_quantity def az(self): """Angle in the xy plane from the x axis, from [-pi:pi]""" return np.arctan2(self['y'], self['x'])
[docs]@SimSnap.derived_quantity def cs(self): """Sound speed""" return np.sqrt(5.0 / 3.0 * units.k * self['temp'] / self['mu'] / units.m_p)
[docs]@SimSnap.derived_quantity def mu(sim, t0=None): """mean molecular mass, i.e. the mean atomic mass per particle""" try: x = sim["HI"] + 2 * sim["HII"] + sim["HeI"] + \ 2 * sim["HeII"] + 3 * sim["HeIII"] x = x**-1 except KeyError: x = np.empty(len(sim)).view(array.SimArray) if t0 is None: t0 = sim['temp'] x[np.where(t0 >= 1e4)[0]] = 0.59 x[np.where(t0 < 1e4)[0]] = 1.3 x.units = units.Unit("1") return x
[docs]@SimSnap.derived_quantity def p(sim): """Pressure""" p = sim["u"] * sim["rho"] * (2. / 3) p.convert_units("Pa") return p
[docs]@SimSnap.derived_quantity def u(self): """Gas internal energy derived from temperature""" gamma = 5. / 3 return self['temp'] * units.k / (self['mu'] * units.m_p * (gamma - 1))
[docs]@SimSnap.derived_quantity def temp(self): """Gas temperature derived from internal energy""" gamma = 5. / 3 mu_est = np.ones(len(self)) for i in range(5): temp = (self['u'] * units.m_p / units.k) * (mu_est * (gamma - 1)) temp.convert_units("K") mu_est = mu(self, temp) return temp
[docs]@SimSnap.derived_quantity def zeldovich_offset(self): """The position offset in the current snapshot according to the Zel'dovich approximation applied to the current velocities. (Only useful in the generation or analysis of initial conditions.)""" from . import analysis bdot_by_b = analysis.cosmology.rate_linear_growth( self, unit='km Mpc^-1 s^-1') / analysis.cosmology.linear_growth_factor(self) a = self.properties['a'] offset = self['vel'] / (a * bdot_by_b) offset.units = self['vel'].units / units.Unit('km Mpc^-1 s^-1 a^-1') return offset
[docs]@SimSnap.derived_quantity def aform(self): """The expansion factor at the time specified by the tform array.""" from . import analysis z = analysis.cosmology.redshift(self, self['tform']) a = 1. / (1. + z) return a
[docs]@SimSnap.derived_quantity def tform(self): """The time of the specified expansion factor in the aform""" from . import analysis t = analysis.cosmology.age(self, 1./self['aform'] - 1.) return t