"""
snapshot
========
This module implements the :class:`~pynbody.snapshot.SimSnap` class which manages and stores snapshot data.
It also implements the :class:`~pynbody.snapshot.SubSnap` class (and relatives) which
represent different views of an existing :class:`~pynbody.snapshot.SimSnap`.
"""
from .. import array
from .. import family
from .. import util
from .. import filt
from .. import units
from .. import config
from .. import simdict
from .. import dependencytracker
from ..units import has_units
import numpy as np
import copy
import weakref
import sys
import hashlib
import time
import warnings
import threading
import re
import gc
import traceback
import logging
from functools import reduce
logger = logging.getLogger('pynbody.snapshot')
[docs]class SimSnap(object):
"""The class for managing simulation snapshots.
For most purposes, SimSnaps should be initialized through
:func:`~pynbody.load` or :func:`~pynbody.new`.
For a basic tutorial explaining how to load a file as a SimSnap
see :doc:`tutorials/data_access`.
*Getting arrays or subsnaps*
Once a :class:`SimSnap` object ``f`` is instantiated, it can
be used in various ways. The most common operation is to
access something with the code ``f[x]``. Depending on the
type of ``x``, various behaviours result:
- If ``x`` is a string, the array named by ``x`` is returned. If
no such array exists, the framework attempts to load or
derive an array of that name (in that order). If this is
unsuccessful, a `KeyError` is raised.
- If ``x`` is a python `slice` (e.g. ``f[5:100:3]``) or an array of
integers (e.g. ``f[[1,5,100,200]]``) a subsnap containing only the
mentioned particles is returned.
See :doc:`tutorials/data_access` for more information.
- If ``x`` is a numpy array of booleans, it is interpreted as a mask and
a subsnap containing only those particles for which x[i] is True.
This means that f[condition] is a shortcut for f[np.where(condition)].
- If ``x`` is a :class:`pynbody.filt.Filter` object, a subsnap
containing only the particles which pass the filter condition
is returned.
See :doc:`tutorials/data_access` for more information.
- If ``x`` is a :class:`pynbody.family.Family` object, a subsnap
containing only the particles in that family is returned. In practice
for most code it is more convenient to write e.g. ``f.dm`` in place of
the equivalent syntax f[pynbody.family.dm].
*Getting metadata*
The property `filename` gives the filename of a snapshot.
There is also a `properties` dictionary which
contains further metadata about the snapshot. See :ref:`subsnaps`.
"""
# Per-class registries: subclasses register their derived quantities,
# decorators and loadable-key information against their own class object.
_derived_quantity_registry = {}

_decorator_registry = {}

_loadable_keys_registry = {}

# Attribute names handled by the persistence machinery (see __getattr__ /
# __setattr__): they are stored on the ancestor snapshot and shared between
# equivalent SubSnaps.
_persistent = ["kdtree", "_immediate_cache", "_kdtree_derived_smoothing"]

# The following will be objects common to a SimSnap and all its SubSnaps
_inherited = ["_immediate_cache_lock",
              "lazy_off", "lazy_derive_off", "lazy_load_off", "auto_propagate_off",
              "properties", "_derived_array_names", "_family_derived_array_names",
              "_dependency_tracker", "immediate_mode", "delay_promotion"]

# These 3D arrays get four views automatically created, one reflecting the
# full Nx3 data, the others reflecting Nx1 slices of it
#
# TO DO: This should probably be read in from a config file
_split_arrays = {'pos': ('x', 'y', 'z'),
                 'vel': ('vx', 'vy', 'vz')}
@classmethod
def _array_name_1D_to_ND(self, name):
"""Map a 1D array name to a corresponding 3D array name, or return None
if no such mapping is possible.
e.g. 'vy' -> 'vel'; 'acc_z' -> 'acc'; 'mass' -> None"""
for k, v in self._split_arrays.items():
if name in v:
return k
generic_match = re.findall("^(.+)_[xyz]$", name)
if len(generic_match) == 1 and generic_match[0] not in self._split_arrays:
return generic_match[0]
return None
@classmethod
def _array_name_ND_to_1D(self, array_name):
"""Give the 3D array names derived from a 3D array.
This routine makes no attempt to establish whether the array
name passed in should indeed be a 3D array. It just returns
the 1D slice names on the assumption that it is. This is an
important distinction between this procedure and the reverse
mapping as implemented by _array_name_1D_to_ND."""
if array_name in self._split_arrays:
array_name_1D = self._split_arrays[array_name]
else:
array_name_1D = [array_name + "_" + i for i in ('x', 'y', 'z')]
return array_name_1D
def _array_name_implies_ND_slice(self, array_name):
"""Returns True if, at best guess, the array name corresponds to a 1D slice
of a ND array, on the basis of names alone.
This routine first looks at special cases (pos -> x,y,z for example),
then looks for generic names such as acc_x - however this would only be
considered a "match" for a ND subslice if 'acc' is in loadable_keys().
"""
for v in self._split_arrays.values():
if array_name in v:
return True
generic_match = re.findall("^(.+)_[xyz]$", array_name)
loadable_keys = self.loadable_keys()
keys = list(self.keys())
if len(generic_match) == 1 and generic_match[0] not in self._split_arrays:
return generic_match[0] in loadable_keys or generic_match[0] in keys
return False
def __init__(self):
    """Initialize an empty, zero-length SimSnap.

    For most purposes SimSnaps should instead be initialized through
    :func:`~pynbody.load` or :func:`~pynbody.new`.
    """
    # Simulation-level arrays, keyed by name
    self._arrays = {}
    self._num_particles = 0
    # Maps family -> slice into the particle index range
    self._family_slice = {}
    # Maps array name -> {family: array} for family-level arrays
    self._family_arrays = {}
    # Names of arrays produced by the lazy-derivation machinery
    self._derived_array_names = []
    self._family_derived_array_names = {}
    for i in family._registry:
        self._family_derived_array_names[i] = []

    # Target unit basis for automatic conversion (set by physical_units)
    self._autoconvert = None

    self._dependency_tracker = dependencytracker.DependencyTracker()
    self._immediate_cache_lock = threading.RLock()

    # Storage backing the _persistent attribute mechanism (see __getattr__)
    self._persistent_objects = {}

    self._unifamily = None

    # If True, when new arrays are created they are in shared memory by
    # default
    self._shared_arrays = False

    self.lazy_off = util.ExecutionControl()
    # use 'with lazy_off :' blocks to disable all hidden/lazy behaviour

    self.lazy_derive_off = util.ExecutionControl()
    # use 'with lazy_derive_off : ' blocks to disable lazy-derivation

    self.lazy_load_off = util.ExecutionControl()
    # use 'with lazy_load_off : ' blocks to disable lazy-loading

    self.auto_propagate_off = util.ExecutionControl()
    # use 'with auto_propagate_off : ' blocks to disable auto-flagging changes
    # (i.e. prevent lazy-evaluated arrays from auto-re-evaluating when their
    # dependencies change)

    self.immediate_mode = util.ExecutionControl()
    # use 'with immediate_mode: ' to always return actual numpy arrays, rather
    # than IndexedSubArrays which point to sub-parts of numpy arrays
    self.immediate_mode.on_exit = lambda: self._clear_immediate_mode()

    self.delay_promotion = util.ExecutionControl()
    # use 'with delay_promotion: ' to prevent any family arrays being promoted
    # into simulation arrays (which can cause confusion because the array returned
    # from create_family_array might have properties you don't expect)
    self.delay_promotion.on_exit = lambda: self._delayed_array_promotions()

    self.__delayed_promotions = []

    self.properties = simdict.SimDict({})
    self._file_units_system = []
############################################
# THE BASICS: SIMPLE INFORMATION
############################################
@property
def filename(self):
    """The filename this snapshot was loaded from (set by the loader)."""
    return self._filename
def __len__(self):
    # Total number of particles in this snapshot (or view)
    return self._num_particles
def __repr__(self):
if self._filename != "":
return "<SimSnap \"" + self._filename + "\" len=" + str(len(self)) + ">"
else:
return "<SimSnap len=" + str(len(self)) + ">"
def families(self):
    """Return the particle families which have representatives in this SimSnap.
    The families are ordered by their appearance in the snapshot."""
    # Collect (start offset, family) pairs for non-empty families, then
    # order by where each family's slice begins in the file.
    present = []
    for fam in family._registry:
        fam_slice = self._get_family_slice(fam)
        if fam_slice.start != fam_slice.stop:
            present.append((fam_slice.start, fam))
    present.sort(key=lambda pair: pair[0])
    return [fam for _, fam in present]
############################################
# THE BASICS: GETTING AND SETTING
############################################
def __getitem__(self, i):
    """Return either a specific array or a subview of this simulation. See
    the class documentation (:class:`SimSnap`) for more information.

    Fixes: ``np.issubdtype(np.bool, i.dtype)`` relied on the ``np.bool``
    alias, removed in NumPy >= 1.24, and had the arguments reversed; the
    scalar-integer branch now accepts any ``np.integer`` width rather than
    only int32/int64.
    """
    if isinstance(i, str):
        return self._get_array_with_lazy_actions(i)
    elif isinstance(i, slice):
        return SubSnap(self, i)
    elif isinstance(i, family.Family):
        return FamilySubSnap(self, i)
    elif isinstance(i, np.ndarray) and np.issubdtype(i.dtype, np.bool_):
        # Boolean mask: subsnap of particles where the mask is True
        return self._get_subsnap_from_mask_array(i)
    elif isinstance(i, (list, tuple, np.ndarray, filt.Filter)):
        return IndexedSubSnap(self, i)
    elif isinstance(i, (int, np.integer)):
        # Single particle index
        return IndexedSubSnap(self, (i,))

    raise TypeError
def __setitem__(self, name, item):
    """Set the contents of an array in this snapshot.

    *name* may also be a (name, index) pair, in which case only the given
    index/slice of the existing array is assigned."""
    if self.is_derived_array(name) and not self.auto_propagate_off:
        raise RuntimeError("Derived array is not writable")

    if isinstance(name, tuple) or isinstance(name, list):
        # (name, index) form: assign into a sub-part of the array
        index = name[1]
        name = name[0]
    else:
        index = None

    self._assert_not_family_array(name)

    if isinstance(item, array.SimArray):
        ax = item
    else:
        ax = np.asanyarray(item).view(array.SimArray)

    if name not in list(self.keys()):
        # Array needs to be created. We do this through the
        # private _create_array method, so that if we are operating
        # within a particle-specific subview we automatically create
        # a particle-specific array
        try:
            # Per-particle dimensionality from the first element
            ndim = len(ax[0])
        except TypeError:
            # Scalar elements -> 1D array
            ndim = 1
        except IndexError:
            # Zero-length input: fall back to the shape of the source
            ndim = ax.shape[-1] if len(ax.shape) > 1 else 1

        # The dtype will be the same as an existing family array if
        # one exists, or the dtype of the source array we are copying
        dtype = self._get_preferred_dtype(name)
        if dtype is None:
            dtype = getattr(item, 'dtype', None)

        self._create_array(name, ndim, dtype=dtype)

    # Copy in contents if the contents isn't actually pointing to
    # the same data (which will be the case following operations like
    # += etc, since these call __setitem__).
    self._set_array(name, ax, index)
def __delitem__(self, name):
if name in self._family_arrays:
# mustn't have simulation-level array of this name
assert name not in self._arrays
del self._family_arrays[name]
for v in self._family_derived_array_names.values():
if name in v:
del v[v.index(name)]
else:
del self._arrays[name]
if name in self._derived_array_names:
del self._derived_array_names[
self._derived_array_names.index(name)]
def _get_subsnap_from_mask_array(self,mask_array):
if len(mask_array.shape) > 1 or mask_array.shape[0] > len(self):
raise ValueError("Incorrect shape for masking array")
else:
return self[np.where(mask_array)]
def _get_array_with_lazy_actions(self, name):
    """Return the named array, lazy-loading or lazy-deriving it if necessary.

    If the array is already in memory it is returned directly (after
    notifying the dependency tracker). Otherwise any obscuring family-level
    array is resolved first, then loading and derivation are attempted in
    that order, subject to the lazy_* execution-control flags."""
    if name in list(self.keys()):
        # Already in memory; record the access for dependency tracking
        self._dependency_tracker.touching(name)
        return self._get_array(name)

    with self._dependency_tracker.calculating(name):
        # A family-level array of the same name would hide a global one
        self.__resolve_obscuring_family_array(name)

        if not self.lazy_off:
            if not self.lazy_load_off:
                self.__load_if_required(name)
            if not self.lazy_derive_off:
                self.__derive_if_required(name)

    return self._get_array(name)
def __load_if_required(self, name):
    """Attempt a lazy-load of *name* unless it is already in memory."""
    if name in self.keys():
        return
    try:
        self.__load_array_and_perform_postprocessing(name)
    except IOError:
        # Not loadable; the caller may still be able to derive it
        pass
def __derive_if_required(self, name):
    """Attempt to lazily derive *name* unless it is already in memory."""
    if name in self.keys():
        return
    self._derive_array(name)
def __resolve_obscuring_family_array(self, name):
    """Deal with a family-level array that would hide a global array *name*.

    Derived family-level arrays are simply discarded (they can be
    recomputed). Otherwise, an attempt is made to load the array for the
    families which do not yet have it. If the name still refers to a
    family-level array after this, a KeyError explains the situation."""
    if name in self.family_keys():
        self.__remove_family_array_if_derived(name)

    if name in self.family_keys():
        self.__load_remaining_families_if_loadable(name)

    if name in self.family_keys():
        in_fam, out_fam = self.__get_included_and_excluded_families_for_array(name)
        raise KeyError("""%r is a family-level array for %s. To use it over the whole simulation you need either to delete it first, or create it separately for %s.""" % (
            name, in_fam, out_fam))
def __get_included_and_excluded_families_for_array(self, name):
    """Split self.families() into those that do / do not have array *name*."""
    included, excluded = [], []
    for fam in self.families():
        target = included if name in self[fam] else excluded
        target.append(fam)
    return included, excluded
def __remove_family_array_if_derived(self, name):
    """Discard *name* from the ancestor when it is derived (recomputable)."""
    if not self.is_derived_array(name):
        return
    del self.ancestor[name]
def __load_remaining_families_if_loadable(self, name):
    """Best-effort load of *name* for the families that do not yet have it.

    IOError from the loader is deliberately swallowed: failure to complete
    the array is handled by the caller."""
    _, missing = self.__get_included_and_excluded_families_for_array(name)
    try:
        for fam in missing:
            self.__load_array_and_perform_postprocessing(name, fam=fam)
    except IOError:
        pass
def _get_persist(self, hash, name):
try:
return self._persistent_objects[hash][name]
except:
return None
def _set_persist(self, hash, name, obj=None):
if hash not in self._persistent_objects:
self._persistent_objects[hash] = {}
self._persistent_objects[hash][name] = obj
def _clear_immediate_mode(self):
for k, v in self._persistent_objects.items():
if '_immediate_cache' in v:
del v['_immediate_cache']
def __getattr__(self, name):
    """This function overrides the behaviour of f.X where f is a SimSnap object.

    It serves two purposes; first, it provides the family-handling behaviour
    which makes f.dm equivalent to f[pynbody.family.dm]. Second, it implements
    persistent objects -- properties which are shared between two equivalent SubSnaps."""
    # NB: only invoked when normal attribute lookup has already failed.
    if name in SimSnap._persistent:
        # Persistent attributes are stored on the ancestor, keyed by the
        # view's inclusion hash so equivalent views share them
        obj = self.ancestor._get_persist(self._inclusion_hash, name)
        if obj:
            return obj

    try:
        # Family-name access, e.g. f.dm -> f[family.dm]
        return self[family.get_family(name)]
    except ValueError:
        # Not a family name either -- fall through to AttributeError
        pass

    raise AttributeError("%r object has no attribute %r" % (
        type(self).__name__, name))
def __setattr__(self, name, val):
    """This function overrides the behaviour of setting f.X where f is a SimSnap object.

    It serves two purposes; first it prevents overwriting of family names (so you can't
    write to, for instance, f.dm). Second, it implements persistent objects -- properties
    which are shared between two equivalent SubSnaps."""
    if name in family.family_names():
        raise AttributeError("Cannot assign family name " + name)

    if name in SimSnap._persistent:
        # Persistent attributes live on the ancestor, keyed by view identity
        self.ancestor._set_persist(self._inclusion_hash, name, val)
    else:
        # Everything else is a plain instance attribute
        return object.__setattr__(self, name, val)
def __delattr__(self, name):
    """This function allows persistent objects (as shared between two equivalent SubSnaps)
    to be permanently deleted."""
    if name in SimSnap._persistent:
        obj = self.ancestor._get_persist(self._inclusion_hash, name)
        if obj:
            # Clear the shared slot, then also remove any instance-level
            # attribute of the same name, ignoring its absence
            self.ancestor._set_persist(self._inclusion_hash, name, None)
            try:
                object.__delattr__(self, name)
            except AttributeError:
                pass
            return
    object.__delattr__(self, name)
############################################
# DICTIONARY EMULATION FUNCTIONS
############################################
[docs] def keys(self):
"""Return the directly accessible array names (in memory)"""
return list(self._arrays.keys())
[docs] def has_key(self, name):
"""Returns True if the array name is accessible (in memory)"""
return name in list(self.keys())
def values(self):
    """Returns a list of the actual arrays in memory"""
    return [self[name] for name in self.keys()]
def items(self):
    """Returns a list of tuples describing the array
    names and their contents in memory"""
    return [(name, self[name]) for name in self.keys()]
def get(self, key, alternative=None):
    """Standard python get method, returns self[key] if
    key in self else alternative"""
    try:
        result = self[key]
    except KeyError:
        result = alternative
    return result
def iterkeys(self):
for k in list(self.keys()):
yield k
__iter__ = iterkeys
def itervalues(self):
for k in self:
yield self[k]
def iteritems(self):
for k in self:
yield (k, self[k])
############################################
# DICTIONARY-LIKE FUNCTIONS
# (not in the normal interface for dictionaries,
# but serving similar purposes)
############################################
def has_family_key(self, name):
    """Returns True if the array name is accessible (in memory) for at least one family"""
    # Membership in the family-level array registry is sufficient
    return name in self.family_keys()
def loadable_keys(self, fam=None):
    """Returns a list of arrays which can be lazy-loaded from
    an auxiliary file.

    The base class has nothing to load; file-format subclasses override
    this. *fam*, when given, restricts the query to a single family."""
    return []
[docs] def derivable_keys(self):
"""Returns a list of arrays which can be lazy-evaluated."""
res = []
for cl in type(self).__mro__:
if cl in self._derived_quantity_registry:
res += list(self._derived_quantity_registry[cl].keys())
return res
[docs] def all_keys(self):
"""Returns a list of all arrays that can be either lazy-evaluated
or lazy loaded from an auxiliary file."""
return self.derivable_keys() + self.loadable_keys()
[docs] def family_keys(self, fam=None):
"""Return list of arrays which are not accessible from this
view, but can be accessed from family-specific sub-views.
If *fam* is not None, only those keys applying to the specific
family will be returned (equivalent to self.fam.keys())."""
if fam is not None:
return [x for x in self._family_arrays if fam in self._family_arrays[x]]
else:
return list(self._family_arrays.keys())
############################################
# ANCESTRY FUNCTIONS
############################################
[docs] def is_ancestor(self, other):
"""Returns true if other is a subview of self"""
if other is self:
return True
elif hasattr(other, 'base'):
return self.is_ancestor(other.base)
else:
return False
def is_descendant(self, other):
    """Returns true if self is a subview of other"""
    # Delegate to the other snapshot's ancestry check
    return other.is_ancestor(self)
@property
def ancestor(self):
    """The original SimSnap from which this view is derived (potentially self)"""
    # Follow .base links iteratively to the root snapshot
    snap = self
    while hasattr(snap, 'base'):
        snap = snap.base
    return snap
[docs] def get_index_list(self, relative_to, of_particles=None):
"""Get a list specifying the index of the particles in this view relative
to the ancestor *relative_to*, such that relative_to[get_index_list(relative_to)]==self."""
# Implementation for base snapshot
if self is not relative_to:
raise RuntimeError("Not a descendant of the specified simulation")
if of_particles is None:
of_particles = np.arange(len(self))
return of_particles
############################################
# SET-LIKE OPERATIONS FOR SUBSNAPS
############################################
def intersect(self, other, op=np.intersect1d):
    """Returns the set intersection of this simulation view with another view
    of the same simulation"""
    root = self.ancestor
    # Both views must descend from the same base snapshot
    if not root.is_ancestor(other):
        raise RuntimeError("Parentage is not suitable")

    mine = self.get_index_list(root)
    theirs = other.get_index_list(root)
    return root[op(mine, theirs)]
def union(self, other):
    """Returns the set union of this simulation view with another view
    of the same simulation"""
    # Same ancestry checks as intersect, using the union operator
    return self.intersect(other, op=np.union1d)
def setdiff(self, other):
    """Returns the set difference of this simulation view with another view
    of the same simulation"""
    # Particles in self but not in other
    return self.intersect(other, op=np.setdiff1d)
############################################
# UNIT MANIPULATION
############################################
[docs] def conversion_context(self):
"""Return a dictionary containing a (scalefactor) and h
(Hubble constant in canonical units) for this snapshot, ready for
passing into unit conversion functions."""
d = {}
wanted = ['a', 'h']
for x in wanted:
if x in self.properties:
d[x] = self.properties[x]
return d
def _override_units_system(self):
    """Look for and process a text file with a custom units system for this snapshot.

    The text file should be named <filename>.units and contain unit
    specifications, one-per-line, e.g.

       pos: kpc a
       vel: km s^-1
       mass: Msol

    This override functionality needs to be explicitly called by a subclass
    after it has initialised its best guess at the units.

    Fix: the units file handle was never closed (leaked, and leaked even on
    a parse error); it is now managed by a ``with`` block.
    """
    try:
        f = open(self.filename + ".units", "r")
    except IOError:
        # No override file present: keep the subclass's guessed units
        return

    name_mapping = {'pos': 'distance', 'vel': 'velocity'}
    units_dict = {}

    with f:  # guarantees the handle closes even if parsing raises
        for line in f:
            if line.startswith("#"):
                continue
            if ":" not in line:
                raise IOError(
                    "Unknown format for units file %r" % (self.filename + ".units"))
            t, u = map(str.strip, line.split(":"))
            # 'pos'/'vel' are aliases for the canonical dimension names
            t = name_mapping.get(t, t)
            units_dict[t] = u

    self.set_units_system(**units_dict)
def set_units_system(self, velocity=None, distance=None, mass=None, temperature=None):
    """Set the unit system for the snapshot by specifying any or
    all of `velocity`, `distance`, `mass` and `temperature`
    units. The units can be given as strings or as pynbody `Unit`
    objects.

    If any of the units are not specified and a previous
    `file_units_system` does not exist, the defaults are used.
    """
    from .. import config_parser
    import configparser

    # if the units system doesn't exist (if this is a new snapshot), create
    # one
    if len(self._file_units_system) < 3:
        warnings.warn("Previous unit system incomplete -- using defaults")
        self._file_units_system = [
            units.Unit(x) for x in ('G', '1 kpc', '1e10 Msol')]
    else:
        # we want to change the base units -- so convert to original
        # units first and then set all arrays to new unit system
        self.original_units()

    # if any are missing, work them out from what we already have:
    if velocity is None:
        velocity = self.infer_original_units('km s^-1')

    if distance is None:
        distance = self.infer_original_units('kpc')

    if mass is None:
        mass = self.infer_original_units('Msol')

    if temperature is None:
        temperature = self.infer_original_units('K')

    new_units = []
    for x in [velocity, distance, mass, temperature]:
        if x is not None:
            new_units.append(units.Unit(x))

    self._file_units_system = new_units

    # set new units for all known arrays
    for arr_name in list(self.keys()):
        arr = self[arr_name]
        # if the array has units, then use the current units, else
        # check if a default dimension for this array exists in
        # the configuration
        if arr.units != units.NoUnit():
            ref_unit = arr.units
        else:
            try:
                ref_unit = config_parser.get(
                    'default-array-dimensions', arr_name)
            except configparser.NoOptionError:
                # give up -- no applicable dimension found
                continue

        arr.set_units_like(ref_unit)
def original_units(self):
    """Converts all arrays' units to be consistent with the units of
    the original file."""
    # persistent=False so that future lazy-loaded arrays are left in
    # their on-disk units rather than being auto-converted
    self.physical_units(distance=self.infer_original_units('km'),
                        velocity=self.infer_original_units('km s^-1'),
                        mass=self.infer_original_units('Msol'), persistent=False)
def _autoconvert_array_unit(self, ar, dims=None, ucut=3):
    """Given an array ar, convert its units such that the new units span
    dims[:ucut]. dims[ucut:] are evaluated in the conversion (so will be things like
    a, h etc).

    If dims is None, use the internal autoconvert state to perform the conversion."""
    if dims is None:
        dims = self.ancestor._autoconvert
    if dims is None:
        # No autoconversion has been requested; leave the array alone
        return
    if ar.units is not None:
        try:
            d = ar.units.dimensional_project(dims)
        except units.UnitsException:
            # Array's dimensions cannot be expressed in the target basis
            return

        # Assemble the target unit from the basis raised to the projected powers
        new_unit = reduce(lambda x, y: x * y, [
                          a ** b for a, b in zip(dims, d[:ucut])])
        if new_unit != ar.units:
            logger.info("Converting %s units from %s to %s" %
                        (ar.name, ar.units, new_unit))
            ar.convert_units(new_unit)
def physical_units(self, distance='kpc', velocity='km s^-1', mass='Msol', persistent=True):
    """
    Converts all array's units to be consistent with the
    distance, velocity, mass basis units specified.

    Base units can be specified using keywords.

    **Optional Keywords**:

       *distance*: string (default = 'kpc')

       *velocity*: string (default = 'km s^-1')

       *mass*: string (default = 'Msol')

       *persistent*: boolean (default = True); apply units change to future lazy-loaded arrays if True

    Fixes: the local variable ``all`` shadowed the builtin of the same
    name; a no-op ``global config`` declaration (``config`` was never
    assigned) has been removed.
    """
    dims = [units.Unit(x) for x in (distance, velocity, mass, 'a', 'h')]

    # Gather every in-memory array: simulation-level plus family-level
    all_arrays = list(self._arrays.values())
    for fam_arrays in self._family_arrays.values():
        all_arrays += list(fam_arrays.values())

    for ar in all_arrays:
        # Convert the underlying base array so every view agrees
        self._autoconvert_array_unit(ar.ancestor, dims)

    # Convert unit-bearing properties (e.g. boxsize) into the new basis
    for k in list(self.properties.keys()):
        v = self.properties[k]
        if isinstance(v, units.UnitBase):
            try:
                new_unit = v.dimensional_project(dims)
            except units.UnitsException:
                continue
            new_unit = reduce(lambda x, y: x * y, [
                              a ** b for a, b in zip(dims, new_unit[:3])])
            new_unit *= v.ratio(new_unit, **self.conversion_context())
            self.properties[k] = new_unit

    if persistent:
        # Remember the basis so future lazy-loaded arrays are converted too
        self._autoconvert = dims
    else:
        self._autoconvert = None
def infer_original_units(self, dimensions):
    """Given a unit (or string) `dimensions`, returns a unit with the same
    physical dimensions which is in the unit schema of the current file."""
    dimensions = units.Unit(dimensions)
    # Project onto the file's unit basis plus the cosmological factors a, h
    d = dimensions.dimensional_project(
        self._file_units_system + ["a", "h"])
    # Multiply out the basis units raised to the projected powers
    new_unit = reduce(lambda x, y: x * y, [
                      a ** b for a, b in zip(self._file_units_system, d)])
    return new_unit
def _default_units_for(self, array_name):
    """Attempt to construct and return the units for the named array
    on disk, using what we know about the purpose of arrays (in config.ini)
    and the original unit system (via infer_original_units)."""
    # A 1D slice inherits the dimensions of its parent ND array
    array_name = self._array_name_1D_to_ND(array_name) or array_name
    default = units._default_units.get(array_name, None)
    if default is None:
        return None
    return self.infer_original_units(default)
def halos(self, *args, **kwargs):
    """Tries to instantiate a halo catalogue object for the given
    snapshot, using the first available method (as defined in the
    configuration files)."""
    from .. import halo

    # First pass: catalogue classes that can load existing halo files.
    # TypeError from a _can_load signature mismatch means "not applicable".
    for c in config['halo-class-priority']:
        try:
            if c._can_load(self, *args, **kwargs):
                return c(self, *args, **kwargs)
        except TypeError:
            pass

    # Second pass: catalogue classes that can run a halo finder themselves
    for c in config['halo-class-priority']:
        try:
            if c._can_run(self, *args, **kwargs):
                return c(self, *args, **kwargs)
        except TypeError:
            pass

    raise RuntimeError("No halo catalogue found for %r" % str(self))
def bridge(self, other):
    """Tries to construct a bridge function between this SimSnap
    and another one.

    This function calls :func:`pynbody.bridge.bridge_factory`. For
    more information see :ref:`bridge-tutorial`, or the reference
    documentation for :py:mod:`pynbody.bridge`.
    """
    # Import deferred to avoid a circular dependency at module load
    from .. import bridge
    return bridge.bridge_factory(self, other)
def load_copy(self):
    """Tries to load a copy of this snapshot, using partial loading to select
    only a subset of particles corresponding to a given SubSnap"""
    if getattr(self.ancestor, 'partial_load', False):
        # A partial-loaded snapshot no longer maps cleanly back to the file
        raise NotImplementedError("Cannot load a copy of data that was itself partial-loaded")
    # Re-load from disk, taking only the particles in this view
    return load(self.ancestor.filename, take=self.get_index_list(self.ancestor))
############################################
# HELPER FUNCTIONS FOR LAZY LOADING
############################################
def _load_array(self, array_name, fam=None):
    """This function is called by the framework to load an array
    from disk and should be overloaded by child classes.

    If *fam* is not None, the array should be loaded only for the
    specified family.
    """
    # Base implementation: no lazy-loading support
    raise IOError("No lazy-loading implemented")
def __load_array_and_perform_postprocessing(self, array_name, fam=None):
    """Calls _load_array for the appropriate subclass, but also attempts to convert
    units of anything that gets loaded and automatically loads the whole ND array
    if this is a subview of an ND array"""
    # Requesting a 1D slice (e.g. 'vx') loads the parent ND array instead
    array_name = self._array_name_1D_to_ND(array_name) or array_name

    # keep a record of every array in existence before load (in case it
    # triggers loading more than we expected, e.g. coupled pos/vel fields
    # etc)
    anc = self.ancestor

    pre_keys = set(anc.keys())

    # the following function builds a dictionary mapping families to a set of the
    # named arrays defined for them.
    fk = lambda: dict([(fami, set([k for k in list(anc._family_arrays.keys()) if fami in anc._family_arrays[k]]))
                       for fami in family._registry])
    pre_fam_keys = fk()

    with self.delay_promotion:
        # delayed promotion is required here, otherwise units get messed up when
        # a simulation array gets promoted mid-way through our loading process.
        #
        # see the gadget unit test, test_unit_persistence
        if fam is not None:
            self._load_array(array_name, fam)
        else:
            try:
                # Try a whole-simulation load first...
                self._load_array(array_name, fam)
            except IOError:
                # ...falling back to loading family-by-family
                for fam_x in self.families():
                    self._load_array(array_name, fam_x)

        # Find out what was loaded
        new_keys = set(anc.keys()) - pre_keys
        new_fam_keys = fk()
        for fami in new_fam_keys:
            new_fam_keys[fami] = new_fam_keys[fami] - pre_fam_keys[fami]

        # If the loader hasn't given units already, try to determine the defaults
        # Then, attempt to convert what was loaded into friendly units
        for v in new_keys:
            if not units.has_units(anc[v]):
                anc[v].units = anc._default_units_for(v)
            anc._autoconvert_array_unit(anc[v])
        for f, vals in new_fam_keys.items():
            for v in vals:
                if not units.has_units(anc[f][v]):
                    anc[f][v].units = anc._default_units_for(v)
                anc._autoconvert_array_unit(anc[f][v])
############################################
# VECTOR TRANSFORMATIONS OF THE SNAPSHOT
############################################
def transform(self, matrix):
    """Apply the given 3x3 *matrix* to the snapshot via the
    :mod:`pynbody.transformation` machinery."""
    # Import deferred to avoid a circular dependency at module load
    from .. import transformation
    return transformation.transform(self, matrix)
def _transform(self, matrix):
    """Transforms the snapshot according to the 3x3 matrix given."""
    for x in list(self.keys()):
        ar = self[x]
        # Apply the matrix to every Nx3 (vector) array in memory
        if len(ar.shape) == 2 and ar.shape[1] == 3:
            self[x] = np.dot(matrix, ar.transpose()).transpose()
def rotate_x(self, angle):
    """Rotates the snapshot about the current x-axis by 'angle' degrees.

    Fix: build the rotation with ``np.array`` rather than the deprecated
    ``np.matrix``; ``np.dot`` behaves identically for 2-D arrays."""
    angle *= np.pi / 180
    return self.transform(np.array([[1, 0, 0],
                                    [0, np.cos(angle), -np.sin(angle)],
                                    [0, np.sin(angle), np.cos(angle)]]))
def rotate_y(self, angle):
    """Rotates the snapshot about the current y-axis by 'angle' degrees.

    Fix: build the rotation with ``np.array`` rather than the deprecated
    ``np.matrix``; ``np.dot`` behaves identically for 2-D arrays."""
    angle *= np.pi / 180
    return self.transform(np.array([[np.cos(angle), 0, np.sin(angle)],
                                    [0, 1, 0],
                                    [-np.sin(angle), 0, np.cos(angle)]]))
def rotate_z(self, angle):
    """Rotates the snapshot about the current z-axis by 'angle' degrees.

    Fix: build the rotation with ``np.array`` rather than the deprecated
    ``np.matrix``; ``np.dot`` behaves identically for 2-D arrays."""
    angle *= np.pi / 180
    return self.transform(np.array([[np.cos(angle), -np.sin(angle), 0],
                                    [np.sin(angle), np.cos(angle), 0],
                                    [0, 0, 1]]))
def wrap(self, boxsize=None, convention='center'):
    """Wraps the positions of the particles in the box to lie between
    [-boxsize/2, boxsize/2].

    If no boxsize is specified, self.properties["boxsize"] is used."""
    if boxsize is None:
        boxsize = self.properties["boxsize"]

    if isinstance(boxsize, units.UnitBase):
        # Express the box length in the same units as the positions
        boxsize = float(boxsize.ratio(self["pos"].units,
                                      **self.conversion_context()))

    # Lower/upper wrap thresholds for each supported convention
    if convention == 'center':
        lower, upper = -boxsize / 2, boxsize / 2
    elif convention == 'upper':
        lower, upper = 0, boxsize
    else:
        raise ValueError("Unknown wrapping convention")

    for coord in "x", "y", "z":
        self[coord][np.where(self[coord] < lower)] += boxsize
        self[coord][np.where(self[coord] > upper)] -= boxsize
############################################
# WRITING FUNCTIONS
############################################
def write(self, fmt=None, filename=None, **kwargs):
    """Write this snapshot to disk.

    *fmt*, if given, is a snapshot class whose ``_write`` routine will be
    used; otherwise the snapshot's own format is assumed. *filename*
    defaults to the file the snapshot came from (synthetic names such as
    "<created>" are refused)."""
    if filename is None and "<" in self.filename:
        # A synthetic filename cannot be written back to disk
        raise RuntimeError(
            'Cannot infer a filename; please provide one (use obj.write(filename="filename"))')

    if fmt is None:
        if not hasattr(self, "_write"):
            raise RuntimeError(
                'Cannot infer a file format; please provide one (e.g. use obj.write(filename="filename", fmt=pynbody.tipsy.TipsySnap)')

        self._write(self, filename, **kwargs)
    else:
        fmt._write(self, filename, **kwargs)
def write_array(self, array_name, fam=None, overwrite=False, **kwargs):
    """
    Write out the array with the specified name.

    Some of the functionality is available via the
    :func:`pynbody.array.SimArray.write` method, which calls the
    present function with appropriate arguments.

    **Input**

    *array_name* - the name of the array to write

    **Optional Keywords**

    *fam* (None) - Write out only one family; or provide a list to
    write out a set of families.

    *overwrite* (False) - must be set to True to allow writing over
    data that is already loadable from disk.
    """
    # Determine whether this is a write or an update
    if fam is None:
        fam = self.families()

    # It's an update if we're not fully replacing the file on
    # disk, i.e. there exists a family f in self.families() but
    # not in fam for which array_name is loadable
    is_update = any([array_name in self[
                    f].loadable_keys() and f not in fam for f in self.families()])

    if not hasattr(self, "_write_array"):
        raise IOError(
            "The underlying file format class does not support writing individual arrays back to disk")

    if is_update and not hasattr(self, "_update_array"):
        raise IOError(
            "The underlying file format class does not support updating arrays on disk")

    # It's an overwrite if we're writing over something loadable
    is_overwriting = any([array_name in self[
                         f].loadable_keys() for f in fam])

    if is_overwriting and not overwrite:
        # User didn't specifically say overwriting is OK
        raise IOError(
            "This operation would overwrite existing data on disk. Call again setting overwrite=True if you want to enable this behaviour.")

    if is_update:
        self._update_array(array_name, fam=fam, **kwargs)
    else:
        self._write_array(self, array_name, fam=fam, **kwargs)
############################################
# LOW-LEVEL ARRAY MANIPULATION
############################################
def _get_preferred_dtype(self, array_name):
"""Return the 'preferred' numpy datatype for a named array.
This is mainly useful when creating family arrays for new families, to be
sure the datatype chosen matches"""
if hasattr(self, 'base'):
return self.base._get_preferred_dtype(array_name)
elif array_name in list(self.keys()):
return self[array_name].dtype
elif array_name in self.family_keys():
return self._family_arrays[array_name][list(self._family_arrays[array_name].keys())[0]].dtype
else:
return None
    def _create_array(self, array_name, ndim=1, dtype=None, zeros=True, derived=False, shared=None):
        """Create a single snapshot-level array of dimension len(self) x ndim,
        with a given numpy dtype.

        *kwargs*:

          - *ndim*: the number of dimensions for each particle
          - *dtype*: a numpy datatype for the new array
          - *zeros*: if True, zeros the array (which takes a bit of time);
            otherwise the array is uninitialized
          - *derived*: if True, this new array will be flagged as a derived
            array which makes it read-only
          - *shared*: if True, the array will be built on top of a
            shared-memory array to make it possible to access from another
            process; if None, the snapshot default (_shared_arrays) is used
        """
        # Does this actually correspond to a slice into a 3D array?
        NDname = self._array_name_1D_to_ND(array_name)
        if NDname:
            # NOTE(review): *shared* is not forwarded on this recursive call,
            # so the promoted 3D array always uses the default sharing mode --
            # confirm whether that is intentional.
            self._create_array(
                NDname, ndim=3, dtype=dtype, zeros=zeros, derived=derived)
            return

        if ndim == 1:
            dims = self._num_particles
        else:
            dims = (self._num_particles, ndim)

        if shared is None:
            shared = self._shared_arrays

        new_array = array._array_factory(dims, dtype, zeros, shared)
        new_array._sim = weakref.ref(self)
        new_array._name = array_name
        new_array.family = None

        self._arrays[array_name] = new_array

        if derived:
            # track derived status so the array can be made read-only and
            # invalidated when its dependencies change
            if array_name not in self._derived_array_names:
                self._derived_array_names.append(array_name)

        if ndim == 3:
            # register each column of the 3D array as its own named 1D view
            array_name_1D = self._array_name_ND_to_1D(array_name)

            for i, a in enumerate(array_name_1D):
                self._arrays[a] = new_array[:, i]
                self._arrays[a]._name = a
    def _create_family_array(self, array_name, family, ndim=1, dtype=None, derived=False, shared=None):
        """Create a single array of dimension len(self.<family.name>) x ndim,
        with a given numpy dtype, belonging to the specified family. For
        arguments other than *family*, see the documentation for
        :func:`~pynbody.snapshot.SimSnap._create_array`.

        Warning: Do not assume that the family array will be available after
        calling this function, because it might be a 'completion' of existing
        family arrays, at which point the routine will actually be creating
        a simulation-level array, e.g.

        sim._create_family_array('bla', dm)
        sim._create_family_array('bla', star)
        'bla' in sim.family_keys() # -> True
        'bla' in sim.keys() # -> False
        sim._create_family_array('bla', gas)
        'bla' in sim.keys() # -> True
        'bla' in sim.family_keys() # -> False

        sim[gas]['bla'] *is* guaranteed to exist, however, it just might
        be a view on a simulation-length array.
        """
        # 1D slice of a 3D array? Create the 3D parent array instead.
        NDname = self._array_name_1D_to_ND(array_name)
        if NDname:
            self._create_family_array(
                NDname, family, ndim=3, dtype=dtype, derived=derived)
            return

        self_families = self.families()

        if len(self_families) == 1 and family in self_families:
            # If the file has only one family, just go ahead and create
            # a normal array
            self._create_array(
                array_name, ndim=ndim, dtype=dtype, derived=derived)
            return

        if ndim == 1:
            dims = self[family]._num_particles
        else:
            dims = (self[family]._num_particles, ndim)

        # Determine what families already have an array of this name
        fams = []
        dtx = None
        try:
            fams = list(self._family_arrays[array_name].keys())
            dtx = self._family_arrays[array_name][fams[0]].dtype
        except KeyError:
            pass

        fams.append(family)

        if dtype is not None and dtx is not None and dtype != dtx:
            # We insist on the data types being the same for, e.g. sim.gas['my_prop'] and sim.star['my_prop']
            # This makes promotion to simulation-level arrays possible.
            raise ValueError("Requested data type %r is not consistent with existing data type %r for family array %r" % (
                str(dtype), str(dtx), array_name))

        if all([x in fams for x in self_families]):
            # If, once we created this array, *all* families would have
            # this array, just create a simulation-level array
            if self._promote_family_array(array_name, ndim=ndim, derived=derived, shared=shared) is not None:
                return None

        # if we get here, either the array cannot be promoted to simulation level, or that would
        # not be appropriate, so actually go ahead and create the family array

        if shared is None:
            shared = self._shared_arrays
        new_ar = array._array_factory(dims, dtype, False, shared)
        new_ar._sim = weakref.ref(self)
        new_ar._name = array_name
        new_ar.family = family

        def sfa(n, v):
            # store family array *v* under name *n* for this family,
            # creating the per-name dictionary on first use
            try:
                self._family_arrays[n][family] = v
            except KeyError:
                self._family_arrays[n] = dict({family: v})

        sfa(array_name, new_ar)
        if derived:
            if array_name not in self._family_derived_array_names[family]:
                self._family_derived_array_names[family].append(array_name)

        if ndim == 3:
            # also register named 1D views onto each column of the 3D array
            array_name_1D = self._array_name_ND_to_1D(array_name)
            for i, a in enumerate(array_name_1D):
                sfa(a, new_ar[:, i])
                self._family_arrays[a][family]._name = a
def _del_family_array(self, array_name, family):
"""Delete the array with the specified name for the specified family"""
del self._family_arrays[array_name][family]
if len(self._family_arrays[array_name]) == 0:
del self._family_arrays[array_name]
derive_track = self._family_derived_array_names[family]
if array_name in derive_track:
del derive_track[derive_track.index(array_name)]
def _get_from_immediate_cache(self, name, fn):
"""Retrieves the named numpy array from the immediate cache associated
with this snapshot. If the array does not exist in the immediate
cache, function fn is called with no arguments and must generate
it."""
with self._immediate_cache_lock:
if not hasattr(self, '_immediate_cache'):
self._immediate_cache = [{}]
cache = self._immediate_cache[0]
hx = hash(name)
if hx not in cache:
cache[hx] = fn()
return cache[hx]
    def _get_array(self, name, index=None, always_writable=False):
        """Get the array of the specified *name*, optionally
        for only the particles specified by *index*.

        If *always_writable* is True, the returned array is
        writable. Otherwise, it is still normally writable, but
        not if the array is flagged as derived by the framework."""
        x = self._arrays[name]
        if x.derived and not always_writable:
            # hand back a read-only view so derived data cannot be edited
            x = x.view()
            x.flags['WRITEABLE'] = False

        if index is not None:
            if type(index) is slice:
                ret = x[index]
            else:
                # non-slice (fancy) indexing goes through IndexedSimArray
                ret = array.IndexedSimArray(x, index)

            ret.family = None
            return ret
        else:
            return x
    def _get_family_array(self, name, fam, index=None, always_writable=False):
        """Get the family-level array with specified *name* for the family *fam*,
        optionally for only the particles specified by *index* (relative to the
        family slice).

        If *always_writable* is True, the returned array is writable. Otherwise
        it is still normally writable, but not if the array is flagged as derived
        by the framework.

        Raises KeyError if no family-level array *name* exists for *fam*.
        """
        try:
            x = self._family_arrays[name][fam]
        except KeyError:
            raise KeyError("No array " + name + " for family " + fam.name)

        if x.derived and not always_writable:
            # read-only view so derived data cannot be edited directly
            x = x.view()
            x.flags['WRITEABLE'] = False

        if index is not None:
            if type(index) is slice:
                x = x[index]
            else:
                if _subarray_immediate_mode or self.immediate_mode:
                    # immediate mode: cache a concrete copy of the
                    # fancy-indexed subarray
                    x = self._get_from_immediate_cache(name,
                                                       lambda: x[index])

                else:
                    x = array.IndexedSimArray(x, index)
        return x
    def _set_array(self, name, value, index=None):
        """Update the contents of the snapshot-level array *name* to that
        specified by *value*. If *index* is not None, update only that
        subarray."""
        util.set_array_if_not_same(self._arrays[name], value, index)
    def _set_family_array(self, name, family, value, index=None):
        """Update the contents of the family-level array *name* for *family*
        to that specified by *value*. If *index* is not None, update only
        that subarray."""
        util.set_array_if_not_same(self._family_arrays[name][family],
                                   value, index)
def _create_arrays(self, array_list, ndim=1, dtype=None, zeros=True):
"""Create a set of arrays *array_list* of dimension len(self) x ndim, with
a given numpy dtype."""
for array in array_list:
self._create_array(array, ndim, dtype, zeros)
def _get_family_slice(self, fam):
"""Turn a specified Family object into a concrete slice which describes
which particles in this SimSnap belong to that family."""
try:
return self._family_slice[fam]
except KeyError:
return slice(0, 0)
def _family_index(self):
"""Return an array giving the family number of each particle in this snapshot,
something like 0,0,0,0,1,1,2,2,2, ... where 0 means self.families()[0] etc"""
if hasattr(self, "_family_index_cached"):
return self._family_index_cached
ind = np.empty((len(self),), dtype='int8')
for i, f in enumerate(self.ancestor.families()):
ind[self._get_family_slice(f)] = i
self._family_index_cached = ind
return ind
def _assert_not_family_array(self, name):
"""Raises a ValueError if the specified array name is connected to
a family-specific array"""
if name in self.family_keys():
raise KeyError("Array " + name + " is a family-level property")
    def _delayed_array_promotions(self):
        """Called automatically to catch up with pending array promotions
        that were queued while promotion was delayed. Each queued entry is
        an argument list for :meth:`_promote_family_array`."""
        for x in self.__delayed_promotions:
            self._promote_family_array(*x)

        self.__delayed_promotions = []
    def _promote_family_array(self, name, ndim=1, dtype=None, derived=False, shared=None):
        """Create a simulation-level array (if it does not exist) with
        the specified name. Copy in any data from family-level arrays
        of the same name.

        Returns the simulation-level array, or None when the promotion has
        been queued because ``delay_promotion`` is in effect."""
        if ndim == 1 and self._array_name_1D_to_ND(name):
            # promote the parent 3D array rather than the 1D slice
            return self._promote_family_array(self._array_name_1D_to_ND(name), 3, dtype)

        if self.delay_promotion:
            # if array isn't already scheduled for promotion, do so now
            if not any([x[0] == name for x in self.__delayed_promotions]):
                self.__delayed_promotions.append(
                    [name, ndim, dtype, derived, shared])
            return None

        if dtype is None:
            # infer the dtype from an existing family copy, warning when the
            # family copies disagree among themselves
            try:
                x = list(self._family_arrays[name].keys())[0]
                dtype = self._family_arrays[name][x].dtype
                for x in list(self._family_arrays[name].values()):
                    if x.dtype != dtype:
                        warnings.warn("Data types of family arrays do not match; assuming " + str(
                            dtype), RuntimeWarning)

            except IndexError:
                pass

        # derived status of the promoted array: only derived if *all*
        # contributing family arrays are derived (and *derived* was requested)
        dmap = [name in self._family_derived_array_names[
            i] for i in self._family_arrays[name]]
        some_derived = any(dmap)
        all_derived = all(dmap)

        if derived:
            some_derived = True
        if not derived:
            all_derived = False

        if name not in self._arrays:
            self._create_array(
                name, ndim=ndim, dtype=dtype, derived=all_derived, shared=shared)
        try:
            for fam in self._family_arrays[name]:
                if has_units(self._family_arrays[name][fam]) and not has_units(self._arrays[name]):
                    self._arrays[name].units = self._family_arrays[
                        name][fam].units
                    # inherits the units from the first dimensional family array found.
                    # Note that future copies, once the units are set, invoke the correct conversion
                    # and raise a UnitsException if such a conversion is
                    # impossible.

                try:
                    self._arrays[name][self._get_family_slice(
                        fam)] = self._family_arrays[name][fam]
                except units.UnitsException:
                    # There is a problem getting everything into the same units. The trouble is
                    # that having got here if we let the exception propagate, we're going to
                    # end up with the SimSnap in an inconsistent state. So force the copy
                    # ignoring the units and raise a warning
                    warnings.warn(
                        "When conjoining family arrays to create a snapshot level array, the units could not be unified. You will now have a snapshot-level array %r with inconsistent unit information" % name)
                    self._arrays[name].base[self._get_family_slice(
                        fam)] = self._family_arrays[name][fam].base

            # the family-level copies are now redundant: the data lives at
            # simulation level (the 1D views of a 3D array go with it)
            del self._family_arrays[name]
            if ndim == 3:
                for v in self._array_name_ND_to_1D(name):
                    del self._family_arrays[v]

            gc.collect()

        except KeyError:
            pass

        if some_derived:
            if all_derived:
                self._derived_array_names.append(name)
            else:
                warnings.warn(
                    "Conjoining derived and non-derived arrays. Assuming result is non-derived, so no further updates will be made.", RuntimeWarning)
                for v in self._family_derived_array_names.values():
                    if name in v:
                        del v[v.index(name)]

        return self._arrays[name]
############################################
# DERIVED ARRAY SYSTEM
############################################
    @classmethod
    def derived_quantity(cl, fn):
        """Decorator: register *fn* as deriving the array named after it for
        snapshots of class *cl*. The function is flagged non-stable
        (``__stable__ = False``), so arrays it produces are treated as
        derived (read-only, invalidated when dependencies change)."""
        if cl not in SimSnap._derived_quantity_registry:
            SimSnap._derived_quantity_registry[cl] = {}
        SimSnap._derived_quantity_registry[cl][fn.__name__] = fn
        fn.__stable__ = False
        return fn
    @classmethod
    def stable_derived_quantity(cl, fn):
        """Decorator: like :meth:`derived_quantity`, but flags *fn* as stable
        (``__stable__ = True``): the resulting array is derived once and is
        not marked as a derived (auto-updating, read-only) array."""
        if cl not in SimSnap._derived_quantity_registry:
            SimSnap._derived_quantity_registry[cl] = {}
        SimSnap._derived_quantity_registry[cl][fn.__name__] = fn
        fn.__stable__ = True
        return fn
def _find_deriving_function(self, name):
for cl in type(self).__mro__:
if cl in self._derived_quantity_registry \
and name in self._derived_quantity_registry[cl]:
return self._derived_quantity_registry[cl][name]
else:
return None
def _derive_array(self, name, fam=None):
"""Calculate and store, for this SnapShot, the derivable array 'name'.
If *fam* is not None, derive only for the specified family.
This searches the registry of @X.derived_quantity functions
for all X in the inheritance path of the current class.
"""
global config
calculated = False
fn = self._find_deriving_function(name)
if fn:
logger.info("Deriving array %s" % name)
with self.auto_propagate_off:
if fam is None:
result = fn(self)
ndim = result.shape[-1] if len(
result.shape) > 1 else 1
self._create_array(
name, ndim, dtype=result.dtype, derived=not fn.__stable__)
write_array = self._get_array(
name, always_writable=True)
else:
result = fn(self[fam])
ndim = result.shape[-1] if len(
result.shape) > 1 else 1
# check if a family array already exists with a different dtype
# if so, cast the result to the existing dtype
# numpy version < 1.7 does not support doing this in-place
if self._get_preferred_dtype(name) != result.dtype \
and self._get_preferred_dtype(name) is not None:
if int(np.version.version.split('.')[1]) > 6 :
result = result.astype(self._get_preferred_dtype(name),copy=False)
else :
result = result.astype(self._get_preferred_dtype(name))
self[fam]._create_array(
name, ndim, dtype=result.dtype, derived=not fn.__stable__)
write_array = self[fam]._get_array(
name, always_writable=True)
self.ancestor._autoconvert_array_unit(result)
write_array[:] = result
if units.has_units(result):
write_array.units = result.units
    def _dirty(self, name):
        """Declare a given array as changed, so deleting any derived
        quantities which depend on it.

        Changing 'pos' additionally drops any cached kdtree held in the
        ancestor's persistent objects. Propagation recurses through the
        dependency tracker unless auto-propagation is switched off."""
        name = self._array_name_1D_to_ND(name) or name
        if name=='pos':
            for v in self.ancestor._persistent_objects.values():
                if 'kdtree' in v:
                    del v['kdtree']

        if not self.auto_propagate_off:
            for d_ar in self._dependency_tracker.get_dependents(name):
                if d_ar in self or self.has_family_key(d_ar):
                    if self.is_derived_array(d_ar):
                        # delete the stale derived array and recurse into
                        # anything that depended on it in turn
                        del self[d_ar]
                        self._dirty(d_ar)
[docs] def is_derived_array(self, name, fam=None):
"""Returns True if the array or family array of given name is
auto-derived (and therefore read-only)."""
fam = fam or self._unifamily
if fam:
return (name in self._family_derived_array_names[fam]) or name in self._derived_array_names
elif name in list(self.keys()):
return name in self._derived_array_names
elif name in self.family_keys():
return all([name in self._family_derived_array_names[i] for i in self._family_arrays[name]])
else:
return False
[docs] def unlink_array(self, name):
"""If the named array is auto-derived, this destroys the link so that
the array becomes editable but no longer auto-updates."""
if self.is_derived_array(name):
if name in self.family_keys():
for fam in self._family_arrays[name]:
track = self._family_derived_array_names[fam]
if name in track:
del track[track.index(name)]
else:
del self._derived_array_names[
self._derived_array_names.index(name)]
else:
raise RuntimeError("Not a derived array")
############################################
# CONVENIENCE FUNCTIONS
############################################
    def mean_by_mass(self, name):
        """Calculate the mass-weighted mean of the specified array.

        The result carries the units of the input array; transposition is
        used so that multi-dimensional arrays (e.g. 'vel') are weighted
        per-particle along axis 0."""
        m = np.asanyarray(self["mass"])
        ret = array.SimArray(
            (self[name].transpose() * m).transpose().mean(axis=0) / m.mean(), self[name].units)

        return ret
############################################
# SNAPSHOT DECORATION
############################################
    @classmethod
    def decorator(cl, fn):
        """Register *fn* to be called with the snapshot as its only argument
        whenever a snapshot of class *cl* (or a subclass) is decorated via
        :meth:`_decorate`. Returns *fn* so it can be used as a decorator."""
        if cl not in SimSnap._decorator_registry:
            SimSnap._decorator_registry[cl] = []
        SimSnap._decorator_registry[cl].append(fn)
        return fn
def _decorate(self):
for cl in type(self).__mro__:
if cl in self._decorator_registry:
for fn in self._decorator_registry[cl]:
fn(self)
############################################
# HASHING AND EQUALITY TESTING
############################################
@property
def _inclusion_hash(self):
try:
rval = self.__inclusion_hash
except AttributeError:
try:
index_list = self.get_index_list(self.ancestor)
hash = hashlib.md5(index_list.data)
self.__inclusion_hash = hash.digest()
except:
logging.warn(
"Encountered a problem while calculating your inclusion hash. %s" % traceback.format_exc())
rval = self.__inclusion_hash
return rval
    def __hash__(self):
        # Combine the identity of the ancestor snapshot with this view's
        # inclusion hash, so two views selecting the same particles of the
        # same ancestor hash equal.
        return hash((object.__hash__(self.ancestor), self._inclusion_hash))
def __eq__(self, other):
"""Equality test for Snapshots. Returns true if both sides of the
== operator point to the same data."""
if self is other:
return True
return hash(self) == hash(other)
############################################
# COPYING
############################################
    def __deepcopy__(self, memo=None):
        """Deep-copy by creating a fresh snapshot via :func:`new` with
        matching family sizes, then copying across arrays, properties and
        the file unit system. Family-level derived arrays are skipped
        (they can be re-derived on demand)."""
        create_args = {}
        for fam in family._registry:
            sl = self._get_family_slice(fam)
            if sl.start != sl.stop:
                create_args[fam.name] = len(self[fam])
        new_snap = new(**create_args)

        # ordering fix: force the new snapshot's family slices to match ours,
        # since new() does not guarantee family ordering
        for k in copy.copy(list(new_snap._family_slice.keys())):
            new_snap._family_slice[k] = copy.copy(self._get_family_slice(k))

        # copy simulation-level arrays
        for k in list(self.keys()):
            new_snap[k] = self[k]

        # copy family-level arrays, skipping derived ones
        for k in self.family_keys():
            for fam in family._registry:
                if len(self[fam]) > 0:
                    self_fam = self[fam]
                    if k in list(self_fam.keys()) and not self_fam.is_derived_array(k):
                        new_snap[fam][k] = self_fam[k]

        new_snap.properties = copy.deepcopy(self.properties, memo)
        new_snap._file_units_system = copy.deepcopy(self._file_units_system, memo)

        return new_snap
# Module-level switch consulted by the _get_array/_get_family_array paths:
# when True, indexed sub-array access always goes through the immediate
# cache (concrete copies) instead of lazy indexed views. This is mainly a
# bug-testing / efficiency-checking mode -- shouldn't be necessary normally.
_subarray_immediate_mode = False
class SubSnap(SimSnap):
    """Represent a sub-view of a SimSnap, initialized by specifying a
    slice. Arrays accessed through __getitem__ are automatically
    sub-viewed using the given slice."""

    def __init__(self, base, _slice):
        """Set up a view of *base* restricted to *_slice*, which must be a
        python slice; negative or open-ended bounds are normalized to
        start-relative form. Raises TypeError for any other type."""
        self.base = base
        self._file_units_system = base._file_units_system
        self._unifamily = base._unifamily
        self._inherit()

        if isinstance(_slice, slice):
            # Various slice logic later (in particular taking
            # subsnaps-of-subsnaps) requires having positive
            # (i.e. start-relative) slices, so if we have been passed a
            # negative (end-relative) index, fix that now.
            if _slice.start is None:
                _slice = slice(0, _slice.stop, _slice.step)
            if _slice.start < 0:
                _slice = slice(len(
                    base) + _slice.start, _slice.stop, _slice.step)
            if _slice.stop is None or _slice.stop > len(base):
                _slice = slice(_slice.start, len(base), _slice.step)
            if _slice.stop < 0:
                _slice = slice(_slice.start, len(
                    base) + _slice.stop, _slice.step)

            self._slice = _slice

            descriptor = "[" + str(_slice.start) + ":" + str(_slice.stop)
            if _slice.step is not None:
                descriptor += ":" + str(_slice.step)
            descriptor += "]"

        else:
            raise TypeError("Unknown SubSnap slice type")

        self._num_particles = util.indexing_length(_slice)
        self._descriptor = descriptor

    def _inherit(self):
        # copy the attributes listed in _inherited straight off the base
        for x in self._inherited:
            setattr(self, x, getattr(self.base, x))

    def _get_array(self, name, index=None, always_writable=False):
        # In immediate mode, cache a concrete copy of the sub-view; otherwise
        # compose our slice with the caller's index and defer to the base.
        if _subarray_immediate_mode or self.immediate_mode:
            return self._get_from_immediate_cache(name,
                                                  lambda: self.base._get_array(
                                                      name, None, always_writable)[self._slice])

        else:
            ret = self.base._get_array(name, util.concatenate_indexing(
                self._slice, index), always_writable)
            ret.family = self._unifamily
            return ret

    def _set_array(self, name, value, index=None):
        self.base._set_array(
            name, value, util.concatenate_indexing(self._slice, index))

    def _get_family_array(self, name, fam, index=None, always_writable=False):
        # translate our slice into coordinates relative to the family slice
        base_family_slice = self.base._get_family_slice(fam)
        sl = util.relative_slice(base_family_slice,
                                 util.intersect_slices(self._slice, base_family_slice, len(self.base)))
        sl = util.concatenate_indexing(sl, index)
        if _subarray_immediate_mode or self.immediate_mode:
            return self._get_from_immediate_cache((name, fam),
                                                  lambda: self.base._get_family_array(
                                                      name, fam, None, always_writable)[sl])
        else:
            return self.base._get_family_array(name, fam, sl, always_writable)

    def _set_family_array(self, name, family, value, index=None):
        fslice = self._get_family_slice(family)
        self.base._set_family_array(
            name, family, value, util.concatenate_indexing(fslice, index))

    def _promote_family_array(self, *args, **kwargs):
        # NOTE(review): the base method's return value is discarded, so this
        # override always returns None -- confirm no caller relies on it.
        self.base._promote_family_array(*args, **kwargs)

    def __delitem__(self, name):
        # is this the right behaviour?
        raise RuntimeError("Arrays can only be deleted from the base snapshot")

    def _del_family_array(self, name, family):
        # is this the right behaviour?
        raise RuntimeError("Arrays can only be deleted from the base snapshot")

    @property
    def _filename(self):
        # e.g. "sim.tipsy[0:100]" -- the base filename plus our descriptor
        return self.base._filename + ":" + self._descriptor

    def keys(self):
        return list(self.base.keys())

    def loadable_keys(self, fam=None):
        if self._unifamily:
            return self.base.loadable_keys(self._unifamily)
        else:
            return self.base.loadable_keys(fam)

    def derivable_keys(self):
        return self.base.derivable_keys()

    def infer_original_units(self, *args):
        """Return the units on disk for a quantity with the specified dimensions"""
        return self.base.infer_original_units(*args)

    def _get_family_slice(self, fam):
        # the family slice, expressed relative to this sub-view
        sl = util.relative_slice(self._slice,
                                 util.intersect_slices(self._slice, self.base._get_family_slice(fam), len(self.base)))
        return sl

    def _load_array(self, array_name, fam=None, **kwargs):
        self.base._load_array(array_name, fam)

    def write_array(self, array_name, fam=None, **kwargs):
        """Write out the named array; only permitted when this view covers
        an entire simulation array or an entire family."""
        fam = fam or self._unifamily
        if not fam or self._get_family_slice(fam) != slice(0, len(self)):
            raise IOError(
                "Array writing is available for entire simulation arrays or family-level arrays, but not for arbitrary subarrays")

        self.base.write_array(array_name, fam=fam, **kwargs)

    def _derive_array(self, array_name, fam=None):
        self.base._derive_array(array_name, fam)

    def family_keys(self, fam=None):
        return self.base.family_keys(fam)

    def _create_array(self, *args, **kwargs):
        self.base._create_array(*args, **kwargs)

    def _create_family_array(self, *args, **kwargs):
        self.base._create_family_array(*args, **kwargs)

    def physical_units(self, *args, **kwargs):
        self.base.physical_units(*args, **kwargs)

    def is_derived_array(self, v, fam=None):
        # NOTE(review): *fam* is accepted but not forwarded to the base --
        # confirm whether family-specific queries should pass it through.
        return self.base.is_derived_array(v)

    def unlink_array(self, name):
        self.base.unlink_array(name)

    def get_index_list(self, relative_to, of_particles=None):
        """Map particle indices in this view (*of_particles*, default all)
        to indices in *relative_to*, which must be this view itself or an
        ancestor reachable through the base chain."""
        if of_particles is None:
            of_particles = np.arange(len(self))

        if relative_to is self:
            return of_particles

        return self.base.get_index_list(relative_to, util.concatenate_indexing(self._slice, of_particles))
class IndexedSubSnap(SubSnap):
    """Represents a subset of the simulation particles according
    to an index array.

    Parameters
    ----------
    base : SimSnap object
        The base snapshot
    index_array : integer array or None
        The indices of the elements that define the sub snapshot. Set to None to use iord-based instead.
    iord_array : integer array or None
        The iord of the elements that define the sub snapshot. Set to None to use index-based instead.
        This may be computationally expensive. See note below.

    Notes
    -----
    `index_array` and `iord_array` arguments are mutually exclusive.
    In the case of `iord_array`, a sorting operation is required that may take
    a significant time and require O(N) memory.
    """

    def __init__(self, base, index_array=None, iord_array=None):
        self._descriptor = "indexed"
        self.base = base
        self._inherit()
        self._unifamily = base._unifamily
        self._file_units_system = base._file_units_system

        if index_array is None and iord_array is None:
            raise ValueError(
                "Cannot define a subsnap without an index_array or iord_array.")
        if index_array is not None and iord_array is not None:
            # bug fix: the message previously read "without both and ..."
            raise ValueError(
                "Cannot define a subsnap with both an index_array and an iord_array.")

        if iord_array is not None:
            index_array = self._iord_to_index(iord_array)

        if isinstance(index_array, filt.Filter):
            self._descriptor = index_array._descriptor
            index_array = index_array.where(base)[0]

        elif isinstance(index_array, tuple):
            # np.where-style output: unwrap the single index array
            if isinstance(index_array[0], np.ndarray):
                index_array = index_array[0]
            else:
                index_array = np.array(index_array)
        else:
            index_array = np.asarray(index_array)

        findex = base._family_index()[index_array]

        # Check the family index array is monotonically increasing
        # If not, the family slices cannot be implemented
        if not np.all(np.diff(findex) >= 0):
            raise ValueError(
                "Families must retain the same ordering in the SubSnap")

        self._slice = index_array
        self._family_slice = {}
        self._family_indices = {}
        self._num_particles = len(index_array)

        # Find the locations of the family slices
        for i, fam in enumerate(self.ancestor.families()):
            ids = np.where(findex == i)[0]
            if len(ids) > 0:
                new_slice = slice(ids.min(), ids.max() + 1)
                self._family_slice[fam] = new_slice
                self._family_indices[fam] = np.asarray(index_array[
                    new_slice]) - base._get_family_slice(fam).start

    def _iord_to_index(self, iord):
        """Map iord values to indices into the base snapshot.

        Requires an argsort of base['iord'] (O(N log N)), cached on the base
        as 'iord_argsort', plus a binary search (O(M log N)) per call with
        M = len(iord) and N = len(self.base)."""
        if 'iord_argsort' not in self.base:
            self.base['iord_argsort'] = np.argsort(self.base['iord'])

        # Find index of particles using a search sort
        iord_base = self.base['iord']
        iord_argsort = self.base['iord_argsort']
        index_array = iord_argsort[np.searchsorted(iord_base, iord, sorter=iord_argsort)]

        # TODO: custom search sort to prevent this check
        # Check that the iord match
        assert np.all(iord_base[index_array] == iord)
        return index_array

    def _get_family_slice(self, fam):
        # A bit messy: jump out the SubSnap inheritance chain
        # and call SimSnap method directly...
        return SimSnap._get_family_slice(self, fam)

    def _get_family_array(self, name, fam, index=None, always_writable=False):
        sl = self._family_indices.get(fam, slice(0, 0))
        sl = util.concatenate_indexing(sl, index)

        return self.base._get_family_array(name, fam, sl, always_writable)

    def _set_family_array(self, name, family, value, index=None):
        self.base._set_family_array(name, family, value,
                                    util.concatenate_indexing(self._family_indices[family], index))

    def _create_array(self, *args, **kwargs):
        self.base._create_array(*args, **kwargs)
class FamilySubSnap(SubSnap):
    """Represents a one-family portion of a parent snap object"""

    def __init__(self, base, fam):
        self.base = base
        self._inherit()

        self._slice = base._get_family_slice(fam)
        self._unifamily = fam
        self._descriptor = ":" + fam.name
        # Use the slice attributes to find sub array length
        self._num_particles = self._slice.stop - self._slice.start
        self._file_units_system = base._file_units_system

    def __delitem__(self, name):
        """Delete the named family-level array. Global arrays cannot be
        deleted through a family view (ValueError)."""
        if name in list(self.base.keys()):
            raise ValueError(
                "Cannot delete global simulation property from sub-view")
        elif name in self.base.family_keys(self._unifamily):
            self.base._del_family_array(name, self._unifamily)
        # NOTE(review): a name found in neither place falls through silently
        # (no KeyError is raised) -- confirm this is intentional.

    def keys(self):
        """Return the union of global array names and this family's own
        array names."""
        global_keys = list(self.base.keys())
        family_keys = self.base.family_keys(self._unifamily)
        return list(set(global_keys).union(family_keys))

    def family_keys(self, fam=None):
        # We now define there to be no family-specific subproperties,
        # because all properties can be accessed through standard
        # __setitem__, __getitem__ methods
        return []

    def _get_family_slice(self, fam):
        # this view *is* the family, so the slice is everything (or nothing
        # for any other family)
        if fam is self._unifamily:
            return slice(0, len(self))
        else:
            return slice(0, 0)

    def _get_array(self, name, index=None, always_writable=False):
        # fall back to the family-level array when no global array exists
        try:
            return SubSnap._get_array(self, name, index, always_writable)
        except KeyError:
            return self.base._get_family_array(name, self._unifamily, index, always_writable)

    def _create_array(self, array_name, ndim=1, dtype=None, zeros=True, derived=False, shared=None):
        # Array creation now maps into family-array creation in the parent
        # (note: *zeros* is accepted for interface compatibility but is not
        # forwarded to the base)
        self.base._create_family_array(
            array_name, self._unifamily, ndim, dtype, derived, shared)

    def _set_array(self, name, value, index=None):
        # globals are set through the base with our slice applied; anything
        # else is treated as a family-level array
        if name in list(self.base.keys()):
            self.base._set_array(
                name, value, util.concatenate_indexing(self._slice, index))
        else:
            self.base._set_family_array(name, self._unifamily, value, index)

    def _create_family_array(self, array_name, family, ndim, dtype, derived, shared):
        self.base._create_family_array(
            array_name, family, ndim, dtype, derived, shared)

    def _promote_family_array(self, *args, **kwargs):
        # promotion is managed by the base snapshot; nothing to do here
        pass

    def _load_array(self, array_name, fam=None, **kwargs):
        # only honour requests for our own family (or unspecified family)
        if fam is self._unifamily or fam is None:
            self.base._load_array(array_name, self._unifamily)

    def _derive_array(self, array_name, fam=None):
        if fam is self._unifamily or fam is None:
            self.base._derive_array(array_name, self._unifamily)
def load(filename, *args, **kwargs):
    """Loads a file using the appropriate class, returning a SimSnap
    instance. Backends are tried in the configured priority order; raises
    IOError if none of them recognises the file."""
    for loader_class in config['snap-class-priority']:
        if not loader_class._can_load(filename):
            continue
        logger.info("Loading using backend %s" % str(loader_class))
        return loader_class(filename, *args, **kwargs)

    raise IOError(
        "File %r: format not understood or does not exist" % filename)
def new(n_particles=0, order=None, **families):
    """Create a blank SimSnap, with the specified number of particles.

    Position, velocity and mass arrays are created and filled
    with zeros.

    By default all particles are taken to be dark matter.
    To specify otherwise, pass in keyword arguments specifying
    the number of particles for each family, e.g.

    f = new(dm=50, star=25, gas=25)

    The order in which the different families appear in the snapshot
    is unspecified unless you add an 'order' argument:

    f = new(dm=50, star=25, gas=25, order='star,gas,dm')

    guarantees the stars, then gas, then dark matter particles appear
    in sequence.
    """
    if not families:
        families = {'dm': n_particles}

    family_sizes = []
    total = 0

    if order is None:
        for name, count in list(families.items()):
            assert isinstance(count, int)
            family_sizes.append((family.get_family(name), count))
            total += count
    else:
        for name in order.split(","):
            count = families[name]
            assert isinstance(count, int)
            family_sizes.append((family.get_family(name), count))
            total += count

    snap = SimSnap()
    snap._num_particles = total
    snap._filename = "<created>"

    # pos/vel are 3D; mass is a scalar per particle
    snap._create_arrays(["pos", "vel"], 3)
    snap._create_arrays(["mass"], 1)

    offset = 0
    for fam, count in family_sizes:
        snap._family_slice[fam] = slice(offset, offset + count)
        offset += count

    snap._decorate()
    return snap
def _get_snap_classes():
    """Return the list of snapshot backend classes.

    The format modules are imported inside the function rather than at file
    scope -- presumably to avoid circular imports at package load time
    (confirm against the package's __init__)."""
    from . import gadgethdf
    from . import nchilada
    from . import gadget
    from . import tipsy
    from . import ramses
    from . import grafic
    from . import ascii

    _snap_classes = [gadgethdf.GadgetHDFSnap, gadgethdf.SubFindHDFSnap, gadgethdf.EagleLikeHDFSnap,
                     nchilada.NchiladaSnap, gadget.GadgetSnap,
                     tipsy.TipsySnap, ramses.RamsesSnap, grafic.GrafICSnap,
                     ascii.AsciiSnap]

    return _snap_classes