Source code for pysac.analysis.tube3D.tvtk_tube_functions

# -*- coding: utf-8 -*-
"""
This submodule provides routines to generate and analyse "Flux Surfaces" as
described in (Mumford et. al. 2014).
Flux Surfaces are created by the tracing of a closed loop of fieldlines,
from which a surface is reconstructed by creating polygons between the 
pesudo parallel streamlines.
"""

import numpy as np
from tvtk.api import tvtk

__all__ = ['move_seeds', 'make_circle_seeds', 'create_flux_surface', 
           'update_flux_surface', 'make_poly_norms', 'norms_sanity_check',
           'get_surface_vectors', 'interpolate_scalars', 'interpolate_vectors',
           'update_interpolated_vectors', 'update_interpolated_scalars',
           'get_surface_velocity_comp', 'get_the_line', 'update_the_line',
           'get_surface_indexes', 'PolyDataWriter', 'write_step', 'write_flux',
           'write_wave_flux', 'read_step', 'get_data']

[docs]def move_seeds(seeds, vfield, dt): """ Move a list of seeds based on a velocity field. .. warning:: WARNING: THIS IS HARD CODED FOR GRID SIZE! Parameters ---------- seeds: tvtk.PolyData Old seed points vfield: mayavi.sources.array_source.ArraySource object The velocity field dt: float The time step betweent the current and the previous step. Returns ------- seeds_arr: ndarray New Seed points """ v_seed = tvtk.ProbeFilter(source=vfield,input=seeds) v_seed.update() int_vels = np.array(v_seed.output.point_data.vectors)[:,:2]/(15.625*1e3) seed_arr = np.array(seeds.points) seed_arr[:,:2] += int_vels * dt #seeds.points = seed_arr return seed_arr
[docs]def make_circle_seeds(n, r, **domain): """ Generate an array of n seeds evenly spaced in a circle at radius r. Parameters ---------- n: integer Number of Seeds to Create r: float Radius of the Circle in grid points **domain: Dict kwargs specifiying the properties of the domain. Returns ------- surf_seeds: tvtk.PolyData vtkPolyData containg point data with the seed locations. Needs: xmax, ymax, zmax """ xc = domain['xmax']/2 yc = domain['ymax']/2 ti = 0 surf_seeds = [] for theta in np.linspace(0, 2 * np.pi, n, endpoint=False): surf_seeds.append([r * np.cos(theta + 0.5 * ti) + xc, r * np.sin(theta + 0.5 * ti) + yc, domain['zmax']]) surf_seeds_arr = np.array(surf_seeds) surf_seeds = tvtk.PolyData() surf_seeds.points = surf_seeds_arr return surf_seeds
[docs]def create_flux_surface(bfield, surf_seeds): """ Create a flux surface from an array of seeds and a tvtk vector field. Parameters ---------- bfield: tvtk.ImageData The vector field to use for streamline traceing surf_seeds: numpy.ndarray The array of seed points to start the fieldline tracing from Returns ------- surf_field_lines: tvtk.StreamTracer instance The fieldline tracer with the fieldlines stored inside it. surface: tvtk.RuledSurfaceFilter instance The surface built from the StreamTracer instance """ #Make a streamline instance with the bfield surf_field_lines = tvtk.StreamTracer() surf_field_lines.input = bfield surf_field_lines.source = surf_seeds surf_field_lines.integrator = tvtk.RungeKutta4() surf_field_lines.maximum_propagation = 1000 surf_field_lines.integration_direction = 'backward' surf_field_lines.update() #Create surface from 'parallel' lines surface = tvtk.RuledSurfaceFilter() surface.input = surf_field_lines.output surface.close_surface = True surface.pass_lines = True surface.offset = 0 surface.distance_factor = 30 surface.ruled_mode = 'point_walk' # surface.ruled_mode = 'resample' # surface.resolution = (10,1) surface.update() return surf_field_lines, surface
[docs]def update_flux_surface(surf_seeds, surf_field_lines, surface): """ Update the flux surface streamlines and surface. """ surf_field_lines.update() surface.update()
[docs]def make_poly_norms(poly_data): """ Extract the normal vectors from a PolyData instance (A surface). Parameters ---------- poly_data: tvtk.PolyData instance The poly data to extract normal vectors from Returns ------- poly_norms: tvtk.PolyDataNormals instance The normal vectors """ poly_norms = tvtk.PolyDataNormals() poly_norms.input = poly_data poly_norms.compute_point_normals = True poly_norms.flip_normals = False poly_norms.update() return poly_norms
[docs]def norms_sanity_check(poly_norms): """ Check that the normals are pointing radially outwards. ..warning:: THIS IS HARD CODED to grid size and surface size Parameters ---------- poly_norms: tvtk.PolyDataNormals instance The normals to check Returns ------- poly_normals: tvtk.PolyDataNormals instance The same normals but flipped if needed """ norm1 = poly_norms.output.point_data.normals[1000] norm_sanity = np.dot(norm1, np.array(poly_norms.input.points.get_point(1000))- np.array([63,63,poly_norms.input.points.get_point(1000)[2]])) if norm_sanity < 0: poly_norms.flip_normals = not(poly_norms.flip_normals) poly_norms.update() passfail = False else: passfail = True return passfail, poly_norms
[docs]def get_surface_vectors(poly_norms, surf_bfield): """ Calculate the vector normal, vertically parallel and Azimuthally around the surface cont""" # Update the Normals poly_norms.update() passfail, poly_norms = norms_sanity_check(poly_norms) # print "pass norm check?", passfail normals = np.array(poly_norms.output.point_data.normals) parallels = surf_bfield / np.sqrt(np.sum(surf_bfield**2,axis=1))[:, np.newaxis] torsionals = np.cross(normals,parallels) torsionals /= np.sqrt(np.sum(torsionals**2,axis=1))[:, np.newaxis] return normals, torsionals, parallels
[docs]def interpolate_scalars(image_data, poly_data): """ Interpolate a imagedata scalars to a set points in polydata""" surface_probe_filter = tvtk.ProbeFilter(source=image_data, input=poly_data) surface_probe_filter.update() # Calculate Vperp, Vpar, Vphi surface_scalars = np.array(surface_probe_filter.output.point_data.scalars) return surface_probe_filter, surface_scalars
[docs]def interpolate_vectors(image_data, poly_data): """ Interpolate a imagedata vectors to a set points in polydata""" surface_probe_filter = tvtk.ProbeFilter(source=image_data, input=poly_data) surface_probe_filter.update() # Calculate Vperp, Vpar, Vphi surface_vectors = np.array(surface_probe_filter.output.point_data.vectors) return surface_probe_filter, surface_vectors
[docs]def update_interpolated_vectors(poly_data, surface_probe_filter): if poly_data: surface_probe_filter.input = poly_data surface_probe_filter.update() # Calculate Vperp, Vpar, Vphi surface_vectors = np.array(surface_probe_filter.output.point_data.vectors) return surface_vectors
[docs]def update_interpolated_scalars(poly_data, surface_probe_filter): if poly_data: surface_probe_filter.input = poly_data surface_probe_filter.update() # Calculate Vperp, Vpar, Vphi surface_scalars = np.array(surface_probe_filter.output.point_data.scalars) return surface_scalars
[docs]def get_surface_velocity_comp(surface_velocities, normals, torsionals, parallels): vperp = np.zeros(len(surface_velocities)) vpar = np.zeros(len(surface_velocities)) vphi = np.zeros(len(surface_velocities)) for i in xrange(0,len(surface_velocities)): vperp[i] = np.dot(normals[i],surface_velocities[i]) vpar[i] = np.dot(parallels[i],surface_velocities[i]) vphi[i] = np.dot(torsionals[i],surface_velocities[i]) return vperp, vpar, vphi
[docs]def get_the_line(bfield, surf_seeds, n): """Generate the vertical line on the surface""" the_line = tvtk.StreamTracer(input=bfield, source=tvtk.PolyData( points=np.array( [surf_seeds.points.get_point(n),[0,0,0]] ) ) ) the_line.integrator = tvtk.RungeKutta4() the_line.maximum_propagation = 1000 the_line.integration_direction = 'backward' the_line.update() return the_line
[docs]def update_the_line(the_line, surf_seeds, seed, length): """ Updates the TD line at each time step, while making sure the length is fixed""" the_line.source.points = np.array([surf_seeds.get_point(seed), [0.0, 0.0, 0.0]]) the_line.update() N = len(the_line.output.points) if N < length: print len(the_line.output.points) for i in list(the_line.output.points)[N-1:]: the_line.output.points.append(i) if N > length: print len(the_line.output.points) the_line.output.points = list(the_line.output.points)[:length] return the_line
[docs]def get_surface_indexes(surf_poly,the_line): point_locator = tvtk.PointLocator(data_set=surf_poly) surf_line_index = [] surf_line_points = [] for point in the_line.output.points: surf_line_index.append(point_locator.find_closest_point(point)) surf_line_points.append(surf_poly.points.get_point(surf_line_index[-1])) return surf_line_index, surf_line_points
[docs]class PolyDataWriter(object): """ This class allows you to write tvtk polydata objects to a file, with as many or as few associated PointData arrays as you wish. Parameters ---------- """ def __init__(self, filename, polydata): self.poly_out = polydata self.filename = filename
[docs] def add_point_data(self, vectors=None, scalars=None, vector_name=None, scalar_name=None): """ Add a vector comonent and a associated scalar """ #Error Checking: if vectors is not None or scalars is not None: raise ValueError("Need to specify Vectors or scalars") if vectors is not None and vector_name is None: raise ValueError("If vectors is specified a name must be specified") if scalars is not None and scalar_name is None: raise ValueError("If scalars is specified a name must be specified") pd_par = tvtk.PointData(scalars=scalars,vectors=vectors) pd_par.scalars.name = scalar_name pd_par.vectors.name = vector_name self.poly_out.point_data.add_array(pd_par.scalars) self.poly_out.point_data.add_array(pd_par.vectors)
[docs] def add_array(self, **kwargs): """ Add any number of arrays via keyword arguments. Examples -------- Add one scalar >>> writer = PolyDataWriter(filename, polydata) >>> writer.add_array(myscalar=myarray) >>> writer.write """ for name, array in kwargs.items(): pd_par = tvtk.PointData(scalars=array) pd_par.scalars.name = name self.poly_out.point_data.add_array(pd_par.scalars)
[docs] def write(self): w = tvtk.XMLPolyDataWriter(input=self.poly_out,file_name=self.filename) w.write()
[docs]def write_step(file_name, surface, normals, parallels, torsionals, vperp, vpar, vphi): """ Write out the surface vectors and velocity compnents. """ writer = PolyDataWriter(file_name, surface.output) writer.add_point_data(scalars=vpar, vectors=parallels, scalar_name="vpar", vector_name="par") writer.add_point_data(scalars=vperp,vectors=normals, scalar_name="vperp", vector_name="perp") writer.add_point_data(scalars=vphi, vectors=torsionals, scalar_name="vphi", vector_name="phi") writer.write()
[docs]def write_flux(file_name, surface, surface_density, surface_va, surface_beta, surface_cs, Fpar, Fperp, Fphi): pd_density = tvtk.PointData(scalars=surface_density) pd_density.scalars.name = "surface_density" pd_va = tvtk.PointData(scalars=surface_va) pd_va.scalars.name = "surface_va" pd_beta = tvtk.PointData(scalars=surface_beta) pd_beta.scalars.name = "surface_beta" pd_cs = tvtk.PointData(scalars=surface_cs) pd_cs.scalars.name = "surface_cs" pd_Fpar = tvtk.PointData(scalars=Fpar) pd_Fpar.scalars.name = "Fpar" pd_Fperp = tvtk.PointData(scalars=Fperp) pd_Fperp.scalars.name = "Fperp" pd_Fphi = tvtk.PointData(scalars=Fphi) pd_Fphi.scalars.name = "Fphi" poly_out = surface poly_out.point_data.add_array(pd_density.scalars) poly_out.point_data.add_array(pd_va.scalars) poly_out.point_data.add_array(pd_beta.scalars) poly_out.point_data.add_array(pd_cs.scalars) poly_out.point_data.add_array(pd_Fpar.scalars) poly_out.point_data.add_array(pd_Fperp.scalars) poly_out.point_data.add_array(pd_Fphi.scalars) w = tvtk.XMLPolyDataWriter(input=poly_out,file_name=file_name) w.write()
[docs]def write_wave_flux(file_name, surface_poly, parallels, normals, torsionals, Fwpar, Fwperp, Fwphi): pd_Fwpar = tvtk.PointData(scalars=Fwpar, vectors=parallels) pd_Fwpar.scalars.name = "Fwpar" pd_Fwpar.vectors.name = "par" pd_Fwperp = tvtk.PointData(scalars=Fwperp, vectors=normals) pd_Fwperp.scalars.name = "Fwperp" pd_Fwperp.vectors.name = "perp" pd_Fwphi = tvtk.PointData(scalars=Fwphi, vectors=torsionals) pd_Fwphi.scalars.name = "Fwphi" pd_Fwphi.vectors.name = "phi" poly_out = surface_poly poly_out.point_data.add_array(pd_Fwpar.scalars) poly_out.point_data.add_array(pd_Fwperp.scalars) poly_out.point_data.add_array(pd_Fwphi.scalars) w = tvtk.XMLPolyDataWriter(input=poly_out,file_name=file_name) w.write()
[docs]def read_step(filename): """ Read back in a saved surface file""" r = tvtk.XMLPolyDataReader(file_name=filename) r.update() return r.output
[docs]def get_data(poly_out,name): names = {} #Extract varibles from file for i in xrange(0,poly_out.point_data.number_of_arrays): names.update({poly_out.point_data.get_array_name(i):i}) data = np.array(poly_out.point_data.get_array(names[name])) return data