Source code for brainlit.map_neurons.map_neurons
from logging import root
from brainlit.algorithms.trace_analysis.fit_spline import (
GeometricGraph,
compute_parameterization,
CubicHermiteChain,
)
import typing
import numpy as np
import copy
import networkx as nx
from scipy.interpolate import (
BSpline,
splev,
CubicHermiteSpline,
RegularGridInterpolator,
)
import h5py
from tqdm import tqdm
import scipy.io as io
[docs]class DiffeomorphismAction:
"""Interface for differentiable mappings e.g. transformations that register a brain image to an atlas."""
[docs] def evaluate(self, position: np.array) -> np.array:
"""Evaluate the mapping at specified positions.
Args:
position (np.array): Coordinates in original space.
Returns:
np.array: Transformed coordinates.
"""
pass
[docs] def D(self, position: np.array, deriv: np.array, order: int = 1) -> np.array:
"""Evaluate the mapping on a set of derivatives at specified positions.
Args:
position (np.array): Coordinates in the original space.
deriv (np.array): Derivatives at the respective positions
order (int, optional): Derivative order. Defaults to 1.
Returns:
np.array: Transformed derivatives.
"""
pass
[docs]class Diffeomorphism_Transform(DiffeomorphismAction):
"""Object that takes a discrete vector field and creates a continuous one which can also give derivative (Jacobian) information.
Implements DiffeomorphismAction which is an interface to transform points and tangent vectors.
"""
def __init__(self, points, values):
"""Create diffeomorphism from a set of coordinates and vectors at those coordinate positions.
Args:
points (list): 1-D arrays representing the cooridates of a regular grid with shapes (m1,),(m2,)(m3,). Should be symmetrical around zero and have length ratios equal to that of the 10um ARA: m1:m2:m3=132:80:114
values (np.array): Vector field values at grid, shape (m1,m2,m3,3).
"""
self.scale_factor = 6550 / points[0][-1]
points_scaled = [self.scale_factor * p for p in points]
self.og_coords = np.meshgrid(*points_scaled)
self.F = RegularGridInterpolator([points[0], points[1], points[2]], values)
[docs] def evaluate(self, position):
"""Evaluate the new coordinate at the given position.
Args:
position (np.array): Original coordinate location. Shape (3,).
Returns:
np.array: New coordinate location under diffeomorphism. Shape (3,).
"""
position = position / self.scale_factor
return self.F(position) * self.scale_factor
[docs] def Jacobian(self, pos: np.array) -> np.array:
"""Evaluate the diffeomorphism's Jacobian at the given position.
Args:
position (np.array): Coordinate location. Shape (3,).
Returns:
np.array: Jacobian matrix of diffeomorphism at given location. Shape (3,3,).
"""
step = 1
J = np.zeros((3, 3))
J[:, 0] = (
self.evaluate([pos[0] + step, pos[1], pos[2]])
- self.evaluate([pos[0], pos[1], pos[2]])
) / step
J[:, 1] = (
self.evaluate([pos[0], pos[1] + step, pos[2]])
- self.evaluate([pos[0], pos[1], pos[2]])
) / step
J[:, 2] = (
self.evaluate([pos[0], pos[1], pos[2] + step])
- self.evaluate([pos[0], pos[1], pos[2]])
) / step
return J
[docs] def D(
self, positions: np.array, derivs: np.array, order: int = 1, verbose=False
) -> np.array:
"""Compute transformed derivatives of mapping at given positions. Only for the non-affine component.
Args:
position (np.array): nx3 positions at which to compute derivatives.
deriv (np.array): nx3 derivatives at the respective positions.
order (int, optional): Order of derivative (must be 1). Defaults to 1.
Raises:
ValueError: Only derivative order 1 is allowed here.
Returns:
np.array: Transformed derivatives
"""
if order != 1:
raise ValueError(f"Argument order must be 1, not {order}")
transformed_deriv = derivs.copy()
for i, (pos, d) in enumerate(
tqdm(
zip(positions, derivs),
desc="Computing transformed derivatives...",
disable=not verbose,
total=len(positions),
)
):
J = (self.Jacobian(pos),)
transformed_deriv[i, :] = np.squeeze(np.matmul(J, d).T)
return transformed_deriv
[docs]class CloudReg_Transform(DiffeomorphismAction):
"""Object that can read mat files from CloudReg, and compute transformations on points and Jacobians.
Implements DiffeomorphismAction which is an interface to transform points and tangent vectors.
"""
def __init__(self, vpath: str, Apath: str, direction: str = "atlas"):
"""Compute transformation from CloudReg mat files
Args:
vpath (str): path to mat file.
Apath (str): path to mat file with affine transformation.
direction (str, optional): direction of transformation, only target to atlas space is implemented so far. Defaults to "atlas".
"""
# not: transformation files go from template space to target space
f = h5py.File(vpath, "r")
self.vtx = np.array(f.get("vtx"))
self.vty = np.array(f.get("vty"))
self.vtz = np.array(f.get("vtz"))
f = io.loadmat(Apath)
A = f["A"]
self.A = A
self.B = np.linalg.inv(A)
self.direction = direction
self._integrate(direction=direction)
[docs] def apply_affine(self, position: np.array) -> np.array:
"""Apply affine transformation in the transformation of positions in target space to atlas space.
Args:
position (np.array): nx3 array with positions in target space.
Returns:
np.array: positions after affine transformation was applied.
"""
# transformation direction: atlas
A = self.B
transformed_position = (
A[:3, :3] @ position.T + np.expand_dims(A[:3, 3], axis=0).T
)
transformed_position = transformed_position.T
return transformed_position
def _integrate(
self,
velocity_voxel_size: list = [100.0, 100.0, 100.0],
direction: str = "atlas",
):
"""Integrate velocity field in order to compute diffeomorphsm mapping. Translated from https://github.com/neurodata/CloudReg/blob/master/cloudreg/registration/transform_points.m
Integration is done in the direction to allow mapping from target to atlas space.
Args:
velocity_voxel_size (list, optional): Voxel resolution of trarnsformation. Defaults to [100.0, 100.0, 100.0].
direction (str, optional): direction of transformation, only target to atlas space is implemented so far. Defaults to "atlas".
Raises:
NotImplementedError: direction must be to atlas space.
ValueError: invalid value for direction
"""
vtx = self.vtx
vty = self.vty
vtz = self.vtz
nT = vtx.shape[0]
dt = 1 / nT
nxV = [vtx.shape[2], vtx.shape[3], vtx.shape[1]]
dxV = velocity_voxel_size # units: microns/voxel
xV = np.arange(0, nxV[0]) * dxV[0] # units: microns
yV = np.arange(0, nxV[1]) * dxV[1]
zV = np.arange(0, nxV[2]) * dxV[2]
xV = xV - np.mean(xV)
yV = yV - np.mean(yV)
zV = zV - np.mean(zV)
# *V variables represent a static grid
[XV, YV, ZV] = np.meshgrid(xV, yV, zV, indexing="ij") # units: microns
# reshape to match matlab
XV = np.swapaxes(XV, 0, 1)
YV = np.swapaxes(YV, 0, 1)
ZV = np.swapaxes(ZV, 0, 1)
if direction == "atlas":
timesteps = np.arange(0, nT, 1)
indicator = -1
elif direction == "target":
timesteps = np.arange(nT - 1, -1, -1)
indicator = 1
raise NotImplementedError(
f"Cannot integrate from atlas to target space yet."
)
else:
raise ValueError(
f"direction argument must be atlas or target, not {direction}"
)
# trans variables aggregates the cumulative displacement from the originial grid coordinates
transx = XV # units: microns
transy = YV
transz = ZV
vtx = np.swapaxes(vtx, 0, 3)
vtx = np.swapaxes(vtx, 1, 2)
vty = np.swapaxes(vty, 0, 3)
vty = np.swapaxes(vty, 1, 2)
vtz = np.swapaxes(vtz, 0, 3)
vtz = np.swapaxes(vtz, 1, 2)
for t in tqdm(timesteps, desc="integrating velocity field", disable=False):
# Deform the static grid at a certain time point
Xs = XV + indicator * vtx[:, :, :, t] * dt
Ys = YV + indicator * vty[:, :, :, t] * dt
Zs = ZV + indicator * vtz[:, :, :, t] * dt
F = RegularGridInterpolator(
(yV, xV, zV),
transx - XV,
method="linear",
bounds_error=False,
fill_value=None,
)
XYZs = np.reshape(np.stack((Ys, Xs, Zs), axis=-1), newshape=(-1, 3))
# Add the newest timestep of displacement (Xs) to the cumulative displacement
transx = np.reshape(F(XYZs), Xs.shape) + Xs
F = RegularGridInterpolator(
(yV, xV, zV),
transy - YV,
method="linear",
bounds_error=False,
fill_value=None,
)
transy = np.reshape(F(XYZs), Ys.shape) + Ys
F = RegularGridInterpolator(
(yV, xV, zV),
transz - ZV,
method="linear",
bounds_error=False,
fill_value=None,
)
transz = np.reshape(F(XYZs), Zs.shape) + Zs
Fx = RegularGridInterpolator(
(yV, xV, zV),
transx - XV,
method="linear",
bounds_error=False,
fill_value=None,
)
Fy = RegularGridInterpolator(
(yV, xV, zV),
transy - YV,
method="linear",
bounds_error=False,
fill_value=None,
)
Fz = RegularGridInterpolator(
(yV, xV, zV),
transz - ZV,
method="linear",
bounds_error=False,
fill_value=None,
)
self.og_coords = [ZV, XV, YV]
self.diffeomorphism = (Fx, Fy, Fz)
[docs] def evaluate(self, position: np.array) -> np.array:
"""Apply non-affine component of mapping to positions, in direction of self.direction (default from target to atlas).
Args:
position (np.array): Positions at which to compute mappings.
Returns:
np.array: Mappings of the input.
"""
Fx = self.diffeomorphism[0]
Fy = self.diffeomorphism[1]
Fz = self.diffeomorphism[2]
transformed_positionx = Fx(position[:, [1, 0, 2]]) + position[:, 0]
transformed_positiony = Fy(position[:, [1, 0, 2]]) + position[:, 1]
transformed_positionz = Fz(position[:, [1, 0, 2]]) + position[:, 2]
transformed_position = np.stack(
(transformed_positionx, transformed_positiony, transformed_positionz),
axis=1,
)
return transformed_position
[docs] def Jacobian(self, pos: np.array) -> np.array:
"""Compute Jacobian of transformation at a given point.
Args:
pos (np.array): Coordinate in space.
Returns:
np.array: Jacobian at that coordinate
"""
step = 1
Fx = self.diffeomorphism[0]
Fy = self.diffeomorphism[1]
Fz = self.diffeomorphism[2]
J = np.zeros((3, 3))
J[0, 0] = (Fx([pos[1], pos[0] + step, pos[2]]) - Fx(pos[[1, 0, 2]])) / step + 1
J[0, 1] = (Fx([pos[1] + step, pos[0], pos[2]]) - Fx(pos[[1, 0, 2]])) / step
J[0, 2] = (Fx([pos[1], pos[0], pos[2] + step]) - Fx(pos[[1, 0, 2]])) / step
J[1, 0] = (Fy([pos[1], pos[0] + step, pos[2]]) - Fy(pos[[1, 0, 2]])) / step
J[1, 1] = (Fy([pos[1] + step, pos[0], pos[2]]) - Fy(pos[[1, 0, 2]])) / step + 1
J[1, 2] = (Fy([pos[1], pos[0], pos[2] + step]) - Fy(pos[[1, 0, 2]])) / step
J[2, 0] = (Fz([pos[1], pos[0] + step, pos[2]]) - Fz(pos[[1, 0, 2]])) / step
J[2, 1] = (Fz([pos[1] + step, pos[0], pos[2]]) - Fz(pos[[1, 0, 2]])) / step
J[2, 2] = (Fz([pos[1], pos[0], pos[2] + step]) - Fz(pos[[1, 0, 2]])) / step + 1
return J
[docs] def D(self, position: np.array, deriv: np.array, order: int = 1) -> np.array:
"""Compute transformed derivatives of mapping at given positions. Only for the non-affine component.
Args:
position (np.array): nx3 positions at which to compute derivatives.
deriv (np.array): nx3 derivatives at the respective positions.
order (int, optional): Order of derivative (must be 1). Defaults to 1.
Raises:
ValueError: Only derivative order 1 is allowed here.
Returns:
np.array: Transformed derivatives
"""
if order != 1:
raise ValueError(f"Argument order must be 1, not {order}")
transformed_deriv = deriv.copy()
for i, (pos, d) in enumerate(zip(position, deriv)):
J = self.Jacobian(pos)
transformed_deriv[i, :] = np.matmul(J, d).T
return transformed_deriv
def compute_derivs(
us: np.array,
tck: tuple = None,
positions: np.array = None,
deriv_method: str = "difference",
) -> np.array:
"""Estimate derivatives of a sequence of points. Derivatives can be estimated in three ways:
- For curves parameterized by scipy's spline API, spline estimation uses scipy's derivative computation
- For a sequence of points, we use the finite-difference method from:
Sundqvist, H., & Veronis, G. (1970). A simple finite‐difference grid with non‐constant intervals. Tellus, 22(1), 26-31.
- one-sided derivatives are derived from the piecewise linear interpolation.
Args:
us (np.array): Parameter values (in form returned by scipy.interpolate.splprep).
tck (tuple): Knots, bspline coefficients, and degree of spline (in form returned by scipy.interpolate.splprep).
positions (np.array): nx3 array of positions (for use by difference method).
deriv_method (str, optional): Method to use (from list above), spline for scipy.interpolate.splev, difference for the finite difference method, two-sided for one-sided derivatives from linear interpolation. Defaults to "difference".
Raises:
ValueError: If the wrong combination of arguments/deriv_method is used.
ValueError: If derivative method is unrecognized.
Returns:
np.array: Derivative estimates at specified positions, or tuple of np.array for two-sided option.
"""
if deriv_method == "spline":
if tck is None or positions is not None:
raise ValueError(
f"When using spline method, positions should be None and tck should not"
)
derivs = np.array(splev(us, tck, der=1)).T
return derivs
elif deriv_method == "difference":
if tck is not None or positions is None:
raise ValueError(
f"When using difference method, tck should be None and positions should not"
)
# Sundqvist & Veronis 1970
f_im1 = positions[:-2, :]
f_i = positions[1:-1, :]
f_ip1 = positions[2:, :]
hs = np.diff(us)
h_im1 = np.expand_dims(hs[:-1], axis=1)
h_i = np.expand_dims(hs[1:], axis=1)
if len(us) >= 3:
diffs = (
f_ip1
- np.multiply((1 - np.divide(h_i, h_im1) ** 2), f_i)
- np.multiply(np.divide(h_i, h_im1) ** 2, f_im1)
)
diffs = np.concatenate(
(
[positions[1, :] - positions[0, :]],
diffs,
[positions[-1, :] - positions[-2, :]],
),
axis=0,
)
elif len(us) == 2:
diffs = np.array(
[positions[1, :] - positions[0, :], positions[-1, :] - positions[-2, :]]
)
norms = np.linalg.norm(diffs, axis=1)
derivs = np.divide(diffs, np.array([norms]).T)
return derivs
elif deriv_method == "two-sided":
f_im1 = positions[:-1, :]
f_i = positions[1:, :]
left_derivs = f_i - f_im1
norms = np.linalg.norm(left_derivs, axis=1)
left_derivs = np.divide(left_derivs, np.array([norms]).T)
right_derivs = f_i - f_im1
norms = np.linalg.norm(right_derivs, axis=1)
right_derivs = np.divide(right_derivs, np.array([norms]).T)
return left_derivs, right_derivs
else:
raise ValueError(f"Invalid deriv_method argument: {deriv_method}")
def transform_geometricgraph(
G_transformed: GeometricGraph,
Phi: DiffeomorphismAction,
deriv_method: str = "two-sided",
derivs: np.array = None,
) -> GeometricGraph:
"""Apply a diffeomorphism to a GeometricGraph object.
Args:
G_transformed (GeometricGraph): Object that will be transformed.
Phi (DiffeomorphismAction): Diffeomorphism that will define the transformation.
deriv_method (str, optional): Choice of derivative estimation to use. Defaults to None.
derivs (np.array, optional): nx3 array of derivative values associated with nodes on the GemoetricGraph. Only applicable if GeometricGraph has a single branch. Defaults to None.
Raises:
NotImplementedError: This method transforms GemoetricGraphs composed of BSplines to GemoetricGraphs composed of CubicHermite splines.
ValueError: If derivs argument is given but GemoetricGraph has multiple branches.
ValueError: If splines were not computed for the GemoetricGraph yet.
Returns:
GeometricGraph: Transformed GeometricGraph
"""
if G_transformed.spline_type is not BSpline:
raise NotImplementedError("Can only transform bsplines")
if derivs is not None and len(G_transformed.spline_tree.nodes) > 1:
raise ValueError("Manually gave derivatives for tree with multiple branches")
if not G_transformed.spline_tree:
raise ValueError(
"Must compute spline tree before running this function - fit_spline_tree_invariant()"
)
spline_tree = G_transformed.spline_tree
# process in reverse dfs order to ensure parents are processed after
reverse_dfs = list(reversed(list(nx.topological_sort(spline_tree))))
if deriv_method in ["spline", "difference"]:
for node_n, node in enumerate(reverse_dfs):
path = spline_tree.nodes[node]["path"]
tck, us = spline_tree.nodes[node]["spline"]
positions = np.array(splev(us, tck, der=0)).T
if derivs is None or node_n > 0:
derivs = compute_derivs(
us=us, positions=positions, deriv_method=deriv_method
)
transformed_positions = Phi.evaluate(positions)
transformed_us = compute_parameterization(transformed_positions)
transformed_derivs = Phi.D(positions, derivs, order=1)
norms = np.linalg.norm(transformed_derivs, axis=1)
transformed_derivs = np.divide(transformed_derivs, np.array([norms]).T)
chspline = CubicHermiteSpline(
transformed_us, transformed_positions, transformed_derivs
)
spline_tree.nodes[node]["spline"] = chspline, transformed_us
for trace_node, position, deriv in zip(
path, transformed_positions, transformed_derivs
):
G_transformed.nodes[trace_node]["loc"] = position
G_transformed.nodes[trace_node]["deriv"] = deriv
elif deriv_method == "two-sided":
for node_n, node in enumerate(reverse_dfs):
path = spline_tree.nodes[node]["path"]
tck, us = spline_tree.nodes[node]["spline"]
positions = np.array(splev(us, tck, der=0)).T
# here, left derivs are the derivatives on the left side of the segment (i.e. the right derivative of the point on the left)
if derivs is None:
left_derivs, right_derivs = compute_derivs(
us=us, positions=positions, deriv_method=deriv_method
)
else:
left_derivs = derivs[0]
right_derivs = derivs[1]
transformed_positions = Phi.evaluate(positions)
transformed_us = compute_parameterization(transformed_positions)
transformed_left_derivs = Phi.D(positions[:-1, :], left_derivs, order=1)
norms = np.linalg.norm(transformed_left_derivs, axis=1)
transformed_left_derivs = np.divide(
transformed_left_derivs, np.array([norms]).T
)
transformed_right_derivs = Phi.D(positions[1:, :], right_derivs, order=1)
norms = np.linalg.norm(transformed_right_derivs, axis=1)
transformed_right_derivs = np.divide(
transformed_right_derivs, np.array([norms]).T
)
chspline = CubicHermiteChain(
transformed_us,
transformed_positions,
transformed_left_derivs,
transformed_right_derivs,
)
spline_tree.nodes[node]["spline"] = chspline, transformed_us
for i, (trace_node, position) in enumerate(
zip(path, transformed_positions)
):
G_transformed.nodes[trace_node]["loc"] = position
# here, left deriv is the left side derivative
if i > 0:
G_transformed.nodes[trace_node][
"left_deriv"
] = transformed_right_derivs[i - 1, :]
if i < len(path) - 1:
G_transformed.nodes[trace_node][
"right_deriv"
] = transformed_left_derivs[i, :]
return G_transformed