Source code for pynbody.sph

"""

sph
===

pynbody SPH rendering module.

This module encompasses Kernel objects, which return C fragments from which
a final C code to perform the rendering is derived.

For most users, the function of interest will be :func:`~pynbody.sph.render_image`.

"""

import numpy as np
import scipy
import scipy.ndimage
import math
import time
import sys
import threading
import copy
import logging
import time
logger = logging.getLogger('pynbody.sph')

from . import _render
from .. import snapshot, array, config, units, util, config_parser, backcompat

try:
    from . import kdtree
except ImportError:
    raise ImportError("Pynbody cannot import the kdtree subpackage. This can be caused when you try to import pynbody directly from the installation folder. Try changing to another folder before launching python")
import os




def _get_threaded_image():
    return config_parser.getboolean('sph', 'threaded-image') and config['number_of_threads']

_threaded_image = _get_threaded_image()
_approximate_image = config_parser.getboolean('sph', 'approximate-fast-images')

def _exception_catcher(call_fn, exception_list, *args):
    try:
        call_fn(*args)
    except Exception as e:
        exception_list.append(sys.exc_info())

def _thread_map(func, *args):
    threads = []
    exceptions = []
    for arg_this in zip(*args):
        arg_ec_this = [func,exceptions]+list(arg_this)
        threads.append(threading.Thread(target=_exception_catcher, args=arg_ec_this))
        threads[-1].start()
    for t in threads:
        while t.is_alive():
            # just calling t.join() with no timeout can make it harder to
            # debug deadlocks!
            t.join(1.0)

    if len(exceptions)>0:
        # Here we re-raise the exception that was actually generated in a thread
        t,obj,trace= exceptions[0]
        raise t(obj).with_traceback(trace)

def _kernel_suitable_for_denoise(kernel):
    if type(kernel) is not Kernel:
        # e.g. does not work properly with Kernel2D
        return False
    else:
        return True

def _auto_denoise(sim, kernel):
    """Returns True if pynbody thinks denoise flag should be on for best
    results with this simulation."""

    if not _kernel_suitable_for_denoise(kernel):
        return False
    elif isinstance(sim.ancestor,snapshot.ramses.RamsesSnap):
        return True
    else:
        return False

def build_tree(sim):
    if hasattr(sim, 'kdtree') is False:
        # n.b. getting the following arrays through the full framework is
        # not possible because it can cause a deadlock if the build_tree
        # has been triggered by getting an array in the calling thread.
        boxsize = sim.properties.get('boxsize',None)
        if boxsize:
            if units.is_unit_like(boxsize):
                boxsize = float(boxsize.in_units(sim['pos'].units))
        else:
            boxsize = -1.0 # represents infinite box
        sim.kdtree = kdtree.KDTree(sim['pos'], sim['mass'],
                        leafsize=config['sph']['tree-leafsize'],
                        boxsize=boxsize)


def _tree_decomposition(obj):
    return [obj[i::_get_threaded_smooth()] for i in range(_get_threaded_smooth())]


def _get_tree_objects(sim):
    return list(map(getattr, _tree_decomposition(sim), ['kdtree'] * _get_threaded_smooth()))


def build_tree_or_trees(sim):

    if hasattr(sim, 'kdtree'):
        return

    logger.info('Building tree with leafsize=%d' % config['sph']['tree-leafsize'])

    start = backcompat.clock()
    build_tree(sim)
    end = backcompat.clock()

    logger.info('Tree build done in %5.3g s' % (end - start))


@snapshot.SimSnap.stable_derived_quantity
def smooth(self):
    build_tree_or_trees(self)

    logger.info('Smoothing with %d nearest neighbours' %
                config['sph']['smooth-particles'])

    sm = array.SimArray(np.empty(len(self['pos'])), self['pos'].units,
                       dtype=self['pos'].dtype)

    start = time.time()
    self.kdtree.set_array_ref('smooth',sm)
    self.kdtree.populate('hsm', config['sph']['smooth-particles'])
    end = time.time()

    logger.info('Smoothing done in %5.3gs' % (end - start))
    self._kdtree_derived_smoothing = True
    return sm

def _get_smooth_array_ensuring_compatibility(self):
    # On-disk smoothing information may conflict; KDTree assumes the number of nearest neighbours
    # is rigidly adhered to. Thus we must use our own self-consistent smoothing.
    if 'smooth' in self:
        if not getattr(self,'_kdtree_derived_smoothing',False):
            smooth_ar = smooth(self)
        else:
            smooth_ar = self['smooth']
    else:
        self['smooth'] = smooth_ar = smooth(self)
    return smooth_ar

@snapshot.SimSnap.stable_derived_quantity
def rho(self):
    build_tree_or_trees(self)


    logger.info('Calculating SPH density')
    rho = array.SimArray(
        np.empty(len(self['pos'])), self['mass'].units / self['pos'].units ** 3,
        dtype=self['pos'].dtype)

    
    start = time.time()


    self.kdtree.set_array_ref('smooth',_get_smooth_array_ensuring_compatibility(self))
    self.kdtree.set_array_ref('mass',self['mass'])
    self.kdtree.set_array_ref('rho',rho)

    self.kdtree.populate('rho', config['sph']['smooth-particles'])

    end = time.time()
    logger.info('Density calculation done in %5.3g s' % (end - start))

    return rho


class Kernel(object):

    def __init__(self):
        self.h_power = 3
        # Return the power of the smoothing length which appears in
        # the denominator of the expression for the general kernel.
        # Will be 3 for 3D kernels, 2 for 2D kernels.

        self.max_d = 2
        # The maximum value of the displacement over the smoothing for
        # which the kernel is non-zero

        self.safe = threading.Lock()

    def get_samples(self, dtype=np.float32):
        sys.stdout.flush()
        with self.safe:
            if not hasattr(self, "_samples"):
                sample_pts = np.arange(0, 4.01, 0.02)
                self._samples = np.array(
                    [self.get_value(x ** 0.5) for x in sample_pts], dtype=dtype)
        return self._samples

    def get_value(self, d, h=1):
        """Get the value of the kernel for a given smoothing length."""
        # Default : spline kernel
        if d < 1:
            f = 1. - (3. / 2) * d ** 2 + (3. / 4.) * d ** 3
        elif d < 2:
            f = 0.25 * (2. - d) ** 3
        else:
            f = 0

        return f / (math.pi * h ** 3)


class Kernel2D(Kernel):

    def __init__(self, k_orig=Kernel()):
        self.h_power = 2
        self.max_d = k_orig.max_d
        self.k_orig = k_orig
        self.safe = threading.Lock()

    def get_value(self, d, h=1):
        import scipy.integrate as integrate
        import numpy as np
        return 2 * integrate.quad(lambda z: self.k_orig.get_value(np.sqrt(z ** 2 + d ** 2), h), 0, h)[0]


class TopHatKernel(object):

    def __init__(self):
        self.h_power = 3
        self.max_d = 2

    def get_c_code(self):
        code = """#define KERNEL1(d,h) (d<%d *h)?%.5e/(h*h*h):0
        #define KERNEL(dx,dy,dz,h) KERNEL1(sqrt((dx)*(dx)+(dy)*(dy)+(dz)*(dz)),h)
        #define Z_CONDITION(dz,h) abs(dz)<(%d*h)
        #define MAX_D_OVER_H %d""" % (self.max_d, 3. / (math.pi * 4 * self.max_d ** self.h_power), self.max_d, self.max_d)
        return code


[docs]def render_spherical_image(snap, qty='rho', nside=8, distance=10.0, kernel=Kernel(), kstep=0.5, denoise=None, out_units=None, threaded=None): """Render an SPH image on a spherical surface. Requires healpy libraries. **Keyword arguments:** *qty* ('rho'): The name of the simulation array to render *nside* (8): The healpix nside resolution to use (must be power of 2) *distance* (10.0): The distance of the shell (for 3D kernels) or maximum distance of the skewers (2D kernels) *kernel*: The Kernel object to use (defaults to 3D spline kernel) *kstep* (0.5): The sampling distance when projecting onto the spherical surface in units of the smoothing length *denoise* (False): if True, divide through by an estimate of the discreteness noise. The returned image is then not strictly an SPH estimate, but this option can be useful to reduce noise. *threaded*: if False, render on a single core. Otherwise, the number of threads to use. Defaults to a value specified in your configuration files. *Currently multi-threaded rendering is slower than single-threaded because healpy does not release the gil*. """ if denoise is None: denoise = _auto_denoise(snap, kernel) if denoise and not _kernel_suitable_for_denoise(kernel): raise ValueError("Denoising not supported with this kernel type. Re-run with denoise=False") renderer = _render_spherical_image if threaded is None: threaded = _get_threaded_image() if threaded: im = _threaded_render_image( renderer, snap, qty, nside, distance, kernel, kstep, denoise, out_units, num_threads=threaded) else: im = renderer( snap, qty, nside, distance, kernel, kstep, denoise, out_units) return im
def _render_spherical_image(snap, qty='rho', nside=8, distance=10.0, kernel=Kernel(), kstep=0.5, denoise=None, out_units=None, __threaded=False, snap_slice=None): import healpy as hp if denoise is None: denoise = _auto_denoise(snap, kernel) if denoise and not _kernel_suitable_for_denoise(kernel): raise ValueError("Denoising not supported with this kernel type. Re-run with denoise=False") if out_units is not None: conv_ratio = (snap[qty].units * snap['mass'].units / (snap['rho'].units * snap['smooth'].units ** kernel.h_power)).ratio(out_units, **snap.conversion_context()) if snap_slice is None: snap_slice = slice(len(snap)) with snap.immediate_mode: D, h, pos, mass, rho, qtyar = [snap[x].view( np.ndarray)[snap_slice] for x in ('r', 'smooth', 'pos', 'mass', 'rho', qty)] ds = np.arange(kstep, kernel.max_d + kstep / 2, kstep) weights = np.zeros_like(ds) for i, d1 in enumerate(ds): d0 = d1 - kstep # work out int_d0^d1 x K(x), then set our discretized kernel to # match that dvals = np.arange(d0, d1, 0.05) ivals = list(map(kernel.get_value, dvals)) ivals *= dvals integ = ivals.sum() * 0.05 weights[i] = 2 * integ / (d1 ** 2 - d0 ** 2) weights[:-1] -= weights[1:] if kernel.h_power == 3: ind = np.where(np.abs(D - distance) < h * kernel.max_d)[0] # angular radius subtended by the intersection of the boundary # of the SPH particle with the boundary surface of the calculation: rad = np.arctan(np.sqrt( h[ind, np.newaxis] ** 2 - (D[ind, np.newaxis] - distance) ** 2) / distance) elif kernel.h_power == 2: ind = np.where(D < distance)[0] # angular radius taken at distance of particle: rad = np.arctan( h[ind, np.newaxis] * ds[np.newaxis, :] / D[ind, np.newaxis]) else: raise ValueError("render_spherical_image doesn't know how to handle this kernel") im, im2 = _render.render_spherical_image_core( rho, mass, qtyar, pos, D, h, ind, ds, weights, nside) im = im.view(array.SimArray) if denoise: im /= im2 im.units = snap[qty].units * snap["mass"].units / \ snap["rho"].units / snap["smooth"].units ** (kernel.h_power) im.sim = snap if out_units is not None: im.convert_units(out_units) return im def _threaded_render_image(fn, s, *args, **kwargs): """ Render an SPH image using multiple threads. The arguments are exactly the same as those to render_image, but additionally you can specify the number of threads using the keyword argument *num_threads*. The default is given by your configuration file, probably 4. It should probably match the number of cores on your machine. """ with s.immediate_mode: num_threads = kwargs['num_threads'] del kwargs['num_threads'] verbose = kwargs.get('verbose', True) kwargs['__threaded'] = True # will pass into render_image ts = [] # isolate each output in its own list so we can # sum them in a predictable order. This prevents # FP-accuracy introducing random noise at each # rendering, but of course doesn't really fix the # underlying issue that FP errors can build to percent # level errors outputs = [[] for i in range(num_threads)] if verbose: logger.info("Rendering image on %d threads..." % num_threads) for i in range(num_threads): kwargs_local = copy.copy(kwargs) kwargs_local['snap_slice'] = slice(i, None, num_threads) args_local = [outputs[i], s] + list(args) ts.append(threading.Thread( target=_render_image_bridge(fn), args=args_local, kwargs=kwargs_local)) ts[-1].start() for t in ts: t.join() # Each output is a 1-element list with a numpy array. Sum them. if any([len(o)==0 for o in outputs]): raise RuntimeError("There was a problem with the multi-threaded image render. Try running again with threaded=False to debug the underlying error.") return sum([o[0] for o in outputs]) def _interpolated_renderer(fn, levels): """ Render an SPH image using interpolation to speed up rendering where smoothing lengths are large. """ if levels == 1: return fn def render_fn(*args, **kwargs): kwargs['smooth_range'] = (0, 2) kwargs['res_downgrade'] = 1 sub = 1 base = fn(*args, **kwargs) kwargs['smooth_range'] = (1, 2) for i in range(1, levels): sub *= 2 if i == levels - 1: kwargs['smooth_range'] = (1, 100000) kwargs['res_downgrade'] = sub new_im = fn(*args, **kwargs) zoom = [float(x)/y for x,y in zip(base.shape, new_im.shape)] base += scipy.ndimage.interpolation.zoom(new_im, zoom, order=1) return base return render_fn def _render_image_bridge(fn): """Helper function for threaded_render_image; do not call directly""" def bridge(*args, **kwargs): output_list = args[0] X = fn(*args[1:], **kwargs) output_list.append(X) return bridge
[docs]def render_image(snap, qty='rho', x2=100, nx=500, y2=None, ny=None, x1=None, y1=None, z_plane=0.0, out_units=None, xy_units=None, kernel=Kernel(), z_camera=None, smooth='smooth', smooth_in_pixels=False, force_quiet=False, approximate_fast=_approximate_image, threaded=None, denoise=None): """ Render an SPH image using a typical (mass/rho)-weighted 'scatter' scheme. **Keyword arguments:** *qty* ('rho'): The name of the array within the simulation to render *x2* (100.0): The x-coordinate of the right edge of the image *nx* (500): The number of pixels wide to make the image *y2*: The y-coordinate of the upper edge of the image (default x2, or if ny is specified, x2*ny/nx) *ny* (nx): The number of pixels tall to make the image *x1* (-x2): The x-coordinate of the left edge of the image *y1* (-y2): The y-coordinate of the lower edge of the image *z_plane* (0.0): The z-coordinate of the plane of the image *out_units* (no conversion): The units to convert the output image into *xy_units*: The units for the x and y axes *kernel*: The Kernel object to use (default Kernel(), a 3D spline kernel) *z_camera*: If this is set, a perspective image is rendered, assuming the kernel is suitable (i.e. is a projecting kernel). The camera is at the specified z coordinate looking towards -ve z, and each pixel represents a line-of-sight radially outwards from the camera. The width then specifies the width of the image in the z=0 plane. Particles too close to the camera are also excluded. *smooth*: The name of the array which contains the smoothing lengths (default 'smooth') *smooth_in_pixels*: If True, the smoothing array contains the smoothing length in image pixels, rather than in real distance units (default False) *approximate_fast*: if True, render high smoothing length particles at progressively lower resolution, resample and sum *denoise*: if True, divide through by an estimate of the discreteness noise. The returned image is then not strictly an SPH estimate, but this option can be useful to reduce noise especially when rendering AMR grids which often introduce problematic edge effects. *verbose*: if True, all text output suppressed *threaded*: if False (or None), render on a single core. Otherwise, the number of threads to use (defaults to a value specified in your configuration files). """ if denoise is None: denoise = _auto_denoise(snap, kernel) if denoise and not _kernel_suitable_for_denoise(kernel): raise ValueError("Denoising not supported with this kernel type. Re-run with denoise=False") if approximate_fast: base_renderer = _interpolated_renderer( _render_image, int(np.floor(np.log2(nx / 20)))) else: base_renderer = _render_image if threaded is None: threaded = _get_threaded_image() if threaded: im = _threaded_render_image(base_renderer, snap, qty, x2, nx, y2, ny, x1, y1, z_plane, out_units, xy_units, kernel, z_camera, smooth, smooth_in_pixels, True, num_threads=threaded) else: im = base_renderer(snap, qty, x2, nx, y2, ny, x1, y1, z_plane, out_units, xy_units, kernel, z_camera, smooth, smooth_in_pixels, False) if denoise: # call self to render a 'flat field' snap['__denoise_one'] = 1 im2 = render_image(snap, '__denoise_one', x2, nx, y2, ny, x1, y1, z_plane, None, xy_units, kernel, z_camera, smooth, smooth_in_pixels, force_quiet, approximate_fast, threaded, False) del snap.ancestor['__denoise_one'] im2 = im / im2 im2.units = im.units return im2 else: return im
def _render_image(snap, qty, x2, nx, y2, ny, x1, y1, z_plane, out_units, xy_units, kernel, z_camera, smooth, smooth_in_pixels, force_quiet, smooth_range=None, res_downgrade=None, snap_slice=None, __threaded=False): """The single-threaded image rendering core function. External calls should be made to the render_image function.""" import os import os.path global config verbose = config["verbose"] and not force_quiet snap_proxy = {} # cache the arrays and take a slice of them if we've been asked to for arname in 'x', 'y', 'z', 'pos', smooth, qty, 'rho', 'mass': snap_proxy[arname] = snap[arname] if snap_slice is not None: snap_proxy[arname] = snap_proxy[arname][snap_slice] if 'boxsize' in snap.properties: boxsize = snap.properties['boxsize'].in_units(snap_proxy['x'].units,**snap.conversion_context()) else: boxsize = None in_time = time.time() if y2 is None: if ny is not None: y2 = x2 * float(ny) / nx else: y2 = x2 if ny is None: ny = nx if x1 is None: x1 = -x2 if y1 is None: y1 = -y2 if res_downgrade is not None: # calculate original resolution dx = float(x2 - x1) / nx dy = float(y2 - y1) / ny # degrade resolution nx //= res_downgrade ny //= res_downgrade # shift boundaries (since x1, x2 etc refer to centres of pixels, # not edges, but we want the *edges* to remain invariant) sx = dx * float(res_downgrade - 1) / 2 sy = dy * float(res_downgrade - 1) / 2 x1 -= sx y1 -= sy x2 += sx y2 += sy x1, x2, y1, y2, z1 = [float(q) for q in (x1, x2, y1, y2, z_plane)] if smooth_range is not None: smooth_lo = float(smooth_range[0]) smooth_hi = float(smooth_range[1]) else: smooth_lo = 0.0 smooth_hi = 100000.0 nx = int(nx + .5) ny = int(ny + .5) result = np.zeros((ny, nx), dtype=np.float32) n_part = len(snap) if xy_units is None: xy_units = snap_proxy['x'].units x = snap_proxy['x'].in_units(xy_units) y = snap_proxy['y'].in_units(xy_units) z = snap_proxy['z'].in_units(xy_units) sm = snap_proxy[smooth] if sm.units != x.units and not smooth_in_pixels: sm = sm.in_units(x.units) qty_s = qty qty = snap_proxy[qty] mass = snap_proxy['mass'] rho = snap_proxy['rho'] if out_units is not None: # Calculate the ratio now so we don't waste time calculating # the image only to throw a UnitsException later conv_ratio = (qty.units * mass.units / (rho.units * sm.units ** kernel.h_power)).ratio(out_units, **snap.conversion_context()) if z_camera is None: z_camera = 0.0 if boxsize: # work out the tile offsets required to make the image wrap num_repeats = int(round(x2/boxsize))+1 repeat_array = np.linspace(-num_repeats*boxsize,num_repeats*boxsize,num_repeats*2+1) else: repeat_array = [0.0] result = _render.render_image(nx, ny, x, y, z, sm, x1, x2, y1, y2, z_camera, 0.0, qty, mass, rho, smooth_lo, smooth_hi, kernel, repeat_array, repeat_array) result = result.view(array.SimArray) # The weighting works such that there is a factor of (M_u/rho_u)h_u^3 # where M-u, rho_u and h_u are mass, density and smoothing units # respectively. This is dimensionless, but may not be 1 if the units # have been changed since load-time. if out_units is None: result *= (snap_proxy['mass'].units / (snap_proxy['rho'].units)).ratio( snap_proxy['x'].units ** 3, **snap_proxy['x'].conversion_context()) # The following will be the units of outputs after the above conversion # is applied result.units = snap_proxy[qty_s].units * \ snap_proxy['x'].units ** (3 - kernel.h_power) else: result *= conv_ratio result.units = out_units result.sim = snap return result
[docs]def to_3d_grid(snap, qty='rho', nx=None, ny=None, nz=None, x2=None, out_units=None, xy_units=None, kernel=Kernel(), smooth='smooth', approximate_fast=_approximate_image, threaded=None, snap_slice=None, denoise=None): """ Project SPH onto a grid using a typical (mass/rho)-weighted 'scatter' scheme. **Keyword arguments:** *qty* ('rho'): The name of the array within the simulation to render *nx* (x2-x1 / soft): The number of pixels wide to make the grid *ny* (nx): The number of pixels tall to make the grid *nz* (nx): The number of pixels deep to make the grid *out_units* (no conversion): The units to convert the output grid into *xy_units*: The units for the x and y axes *kernel*: The Kernel object to use (default Kernel(), a 3D spline kernel) *smooth*: The name of the array which contains the smoothing lengths (default 'smooth') *denoise*: if True, divide through by an estimate of the discreteness noise. The returned image is then not strictly an SPH estimate, but this option can be useful to reduce noise especially when rendering AMR grids which often introduce problematic edge effects. """ import os import os.path global config if denoise is None: denoise = _auto_denoise(snap, kernel) if denoise and not _kernel_suitable_for_denoise(kernel): raise ValueError("Denoising not supported with this kernel type. Re-run with denoise=False") in_time = time.time() if x2 is None: x1 = np.min(snap['x']) x2 = np.max(snap['x']) y1 = np.min(snap['y']) y2 = np.max(snap['y']) z1 = np.min(snap['z']) z2 = np.max(snap['z']) else: x1 = -x2 y1 = -x2 z1 = -x2 z2 = x2 y2 = x2 if nx is None: nx = np.ceil((x2 - x1) / np.min(snap['eps'])) if ny is None: ny = nx if nz is None: nz = nx x1, x2, y1, y2, z1, z2 = [float(q) for q in (x1, x2, y1, y2, z1, z2)] nx, ny, nz = [int(q) for q in (nx, ny, nz)] if approximate_fast: renderer = _interpolated_renderer( _to_3d_grid, int(np.floor(np.log2(nx / 20)))) else: renderer = _to_3d_grid if threaded is None: threaded = _get_threaded_image() if threaded: im = _threaded_render_image(renderer, snap, qty, nx, ny, nz, x1, x2, y1, y2, z1, z2, out_units, xy_units, kernel, smooth, num_threads=threaded) else: im = renderer(snap, qty, nx, ny, nz, x1, x2, y1, y2, z1, z2, out_units, xy_units, kernel, smooth, False) logger.info("Render done at %.2f s" % (time.time() - in_time)) if denoise: # call self to render a 'flat field' snap['__one'] = 1 im2 = to_3d_grid(snap, '__one', nx, ny, nz, x2, None, xy_units, kernel, smooth, approximate_fast, threaded, snap_slice, False) del snap.ancestor['__one'] im2 = im / im2 im2.units = im.units return im2 else: return im
def _to_3d_grid(snap, qty, nx, ny, nz, x1, x2, y1, y2, z1, z2, out_units, xy_units, kernel, smooth, __threaded=False, res_downgrade=None, snap_slice=None, smooth_range=None): snap_proxy = {} # cache the arrays and take a slice of them if we've been asked to for arname in 'x', 'y', 'z', 'pos', smooth, qty, 'rho', 'mass': snap_proxy[arname] = snap[arname] if snap_slice is not None: snap_proxy[arname] = snap_proxy[arname][snap_slice] if res_downgrade is not None: dx = float(x2 - x1) / nx dy = float(y2 - y1) / ny dz = float(z2 - z1) / nz nx //= res_downgrade ny //= res_downgrade nz //= res_downgrade # shift boundaries (see _render_image above for explanation) sx, sy, sz = [ d_i * float(res_downgrade - 1) / 2 for d_i in [dx, dy, dz]] x1 -= sx y1 -= sy z1 -= sz x2 += sx y2 += sy z2 += sz result = np.zeros((nx, ny, nz), dtype=np.float32) n_part = len(snap) if xy_units is None: xy_units = snap_proxy['x'].units x = snap_proxy['x'].in_units(xy_units) y = snap_proxy['y'].in_units(xy_units) z = snap_proxy['z'].in_units(xy_units) sm = snap_proxy[smooth] if sm.units != x.units: sm = sm.in_units(x.units) qty_s = qty qty = snap_proxy[qty] mass = snap_proxy['mass'] rho = snap_proxy['rho'] if out_units is not None: # Calculate the ratio now so we don't waste time calculating # the image only to throw a UnitsException later conv_ratio = (qty.units * mass.units / (rho.units * sm.units ** kernel.h_power)).ratio(out_units, **x.conversion_context()) if smooth_range is not None: smooth_lo = float(smooth_range[0]) smooth_hi = float(smooth_range[1]) else: smooth_lo = 0.0 smooth_hi = 100000.0 logger.info("Gridding particles") result = _render.to_3d_grid(nx,ny,nz,x,y,z,sm,x1,x2,y1,y2,z1,z2, qty,mass,rho,smooth_lo,smooth_hi,kernel) result = result.view(array.SimArray) # The weighting works such that there is a factor of (M_u/rho_u)h_u^3 # where M_u, rho_u and h_u are mass, density and smoothing units # respectively. This is dimensionless, but may not be 1 if the units # have been changed since load-time. if out_units is None: result *= (snap_proxy['mass'].units / (snap_proxy['rho'].units)).ratio( snap_proxy['x'].units ** 3, **snap_proxy['x'].conversion_context()) # The following will be the units of outputs after the above conversion # is applied result.units = snap_proxy[qty_s].units * \ snap_proxy['x'].units ** (3 - kernel.h_power) else: result *= conv_ratio result.units = out_units result.sim = snap return result
[docs]def spectra(snap, qty='rho', x1=0.0, y1=0.0, v2=400, nvel=200, v1=None, element='H', ion='I', xy_units=units.Unit('kpc'), vel_units = units.Unit('km s^-1'), smooth='smooth', __threaded=False) : """ Render an SPH spectrum using a (mass/rho)-weighted 'scatter' scheme of all the particles that have a smoothing length within 2 h_sm of the position. **Keyword arguments:** *qty* ('rho'): The name of the array within the simulation to render *x1* (0.0): The x-coordinate of the line of sight. *y1* (0.0): The y-coordinate of the line of sight. *v1* (-400.0): The minimum velocity of the spectrum *v2* (400.0): The maximum velocity of the spectrum *nvel* (500): The number of resolution elements in spectrum *xy_units* ('kpc'): The units for the x and y axes *smooth*: The name of the array which contains the smoothing lengths (default 'smooth') """ import os, os.path global config if config["tracktime"] : import time in_time = time.time() kernel=Kernel2D() if v1 is None: v1 = -v2 dvel = (v2 - v1) / nvel v1, v2, dvel, nvel = [float(q) for q in (v1,v2,dvel,nvel)] vels = np.arange(v1+0.5*dvel, v2, dvel) tau = np.zeros((nvel),dtype=np.float32) n_part = len(snap) if xy_units is None : xy_units = snap['x'].units x = snap['x'].in_units(xy_units) - x1 y = snap['y'].in_units(xy_units) - y1 vz = snap['vz'].in_units(vel_units) temp = snap['temp'].in_units(units.Unit('K')) sm = snap[smooth] if sm.units!=x.units : sm = sm.in_units(x.units) nucleons = {'H':1, 'He':4, 'Li':6, 'Ne':10, 'C':12, 'N':14, 'O':16, 'Mg':24, 'Si':28, 'S':32, 'Ca':40, 'Fe':56} nnucleons = nucleons[element] qty_s = qty qty = snap[qty] mass = snap['mass'] rho = snap['rho'] conv_ratio = (qty.units*mass.units/(rho.units*sm.units**kernel.h_power)).ratio(str(nnucleons)+' m_p cm^-2', **x.conversion_context()) try : kernel.safe.acquire(True) code = kernel.get_c_code() finally : kernel.safe.release() if __threaded : code+="#define THREAD 1\n" code+=file(os.path.join(os.path.dirname(__file__),'sph_spectra.c')).read() # before inlining, the views on the arrays must be standard np.ndarray # otherwise the normal numpy macros are not generated x,y,vz,temp,sm,qty, mass, rho = [q.view(np.ndarray) for q in (x,y,vz,temp,sm,qty, mass, rho)] if config['verbose']: print("Constructing SPH spectrum", file=sys.stderr) if config["tracktime"] : print("Beginning SPH render at %.2f s"%(time.time()-in_time), file=sys.stderr) #import pdb; pdb.set_trace() util.threadsafe_inline( code, ['tau', 'nvel', 'x', 'y', 'vz', 'temp', 'sm', 'v1', 'v2', 'nnucleons','qty', 'mass', 'rho'],verbose=2) if config["tracktime"] : print("Render done at %.2f s"%(time.time()-in_time), file=sys.stderr) mass_e = 9.10938188e-28 e = 4.803206e-10 c = 2.99792458e10 pi = 3.14159267 tauconst = pi*e*e / mass_e / c / np.sqrt(pi) oscwav0 = 1031.9261*0.13250*1e-8 tau = tauconst*oscwav0*tau*conv_ratio #tau = tau*conv_ratio print("tauconst: %g oscwav0: %g"%(tauconst,oscwav0)) print("tauconst*oscwav0: %g"%(tauconst*oscwav0)) print("conv_ratio: %g"%conv_ratio) print("max(N): %g"%(np.max(tau))) tau = tau.view(array.SimArray) tau.sim = snap return vels, tau