Source code for popcor.base_lifters.stereo_lifter

from abc import ABC, abstractmethod

import numpy as np
import scipy.sparse as sp
from poly_matrix.poly_matrix import PolyMatrix

from popcor.utils.geometry import (
    generate_random_pose,
    get_C_r_from_theta,
    get_noisy_pose,
    get_pose_errors_from_theta,
    get_T,
    get_theta_from_C_r,
)

from .state_lifter import StateLifter

NOISE = 1.0  # default measurement noise (std of the simulated pixel measurements)


SOLVER_KWARGS = dict(
    min_gradient_norm=1e-6, max_iterations=10000, min_step_size=1e-10, verbosity=1
)


class StereoLifter(StateLifter, ABC):
    """StereoLifter is a general lifter class for the stereo localization problem,
    supporting both 2D and 3D cases.

    See :class:`~popcor.examples.Stereo2DLifter` for 2D and
    :class:`~popcor.examples.Stereo3DLifter` for 3D.
    """

    NORMALIZE = True

    # Levels that were experimented with for creating a tight relaxation.
    LEVELS = [
        "no",
        "u@u",  # ...
        "u2",
        "u@r",
        "uuT",
        "urT",
        "uxT",
    ]
    PARAM_LEVELS = ["no", "p", "ppT"]
    LEVEL_NAMES = {
        "no": "$\\boldsymbol{u}_n$",
        "urT": "$\\boldsymbol{u}\\boldsymbol{t}^\\top_n$",
        "uxT": "$\\boldsymbol{u}\\boldsymbol{x}^\\top_n$",
    }
    EPS_ERROR = 1e-7  # for constraints test. default is 1e-8

    def __init__(
        self, n_landmarks, d, level="no", param_level="no", variable_list=None
    ):
        self.y_ = None
        self.n_landmarks = n_landmarks
        self.landmarks_ = None  # will be initialized on first access
        super().__init__(
            d=d,
            level=level,
            param_level=param_level,
            variable_list=variable_list,
            n_parameters=n_landmarks,
        )

    @property
    @abstractmethod
    def M_matrix(self):
        raise NotImplementedError("Inheriting class must initialize M_matrix.")

    def get_all_variables(self):
        return [[self.HOM, "x"] + [f"z_{i}" for i in range(self.n_landmarks)]]

    def get_level_dims(self, n=1):
        """
        :param n: number of landmarks to consider
        """
        return {
            "no": 0,
            "u@u": n,  # ...
            "u2": n * self.d,
            "u@r": n,
            "uuT": n * self.d**2,
            "urT": n * self.d**2,
            "uxT": n * (self.d * (self.d + self.d**2)),
        }

    @property
    def landmarks(self):
        if self.landmarks_ is None:
            landmarks = self.generate_random_landmarks(self.theta)
            self.landmarks_ = landmarks
        return self.landmarks_

    def generate_random_landmarks(self, theta=None):
        if theta is not None:
            C, r = get_C_r_from_theta(theta, self.d)
            if self.d == 3:
                # sample left u, v coordinates in left image, and compute landmark coordinates from that.
                fu, cu, b = self.M_matrix[0, [0, 2, 3]]
                fv, cv = self.M_matrix[1, [1, 2]]
                u = np.random.uniform(0, cu * 2, self.n_landmarks)
                v = np.random.uniform(0, cv * 2, self.n_landmarks)
                z = np.random.uniform(0, 5, self.n_landmarks)
                x = 1 / fu * (z * (u - cu) - b)
                y = 1 / fv * z * (v - cv)
                points_cam = np.c_[x, y, z]  # N x 3
            else:
                # sample left u in left image, and compute landmark coordinates from that.
                fu, cu, b = self.M_matrix[0, :]
                u = np.random.uniform(0, cu * 2, self.n_landmarks)
                y = np.random.uniform(1, 5, self.n_landmarks)
                x = 1 / fu * (y * (u - cu) - b)
                points_cam = np.c_[x, y]
            # transform points from camera to world
            return (C.T @ (points_cam.T - r[:, None])).T
        else:
            return np.random.rand(self.n_landmarks, self.d)

    def sample_parameters(self, theta: np.ndarray | None = None) -> dict:
        landmarks = self.generate_random_landmarks(theta=self.theta)
        return self.sample_parameters_landmarks(landmarks)

    def get_parameters(self, var_subset=None):
        return self.get_p(param_subset=var_subset)

    @property
    def VARIABLE_LIST(self):
        return [
            [self.HOM, "x"],
            [self.HOM, "z_0"],
            [self.HOM, "x", "z_0"],
            [self.HOM, "z_0", "z_1"],  # should achieve tightness here
        ]

    @property
    def param_dict(self):
        return self.param_dict_landmarks

    @property
    def var_dict(self):
        level_dim = self.get_level_dims()[self.level]
        if self.var_dict_ is None:
            self.var_dict_ = {self.HOM: 1}
            self.var_dict.update({"x": self.d**2 + self.d})
            self.var_dict.update(
                {f"z_{k}": self.d + level_dim for k in range(self.n_landmarks)}
            )
        return self.var_dict_

    def get_x(self, theta=None, parameters=None, var_subset=None):
        """
        :param var_subset: list of variables to include in x vector. Set to None for all.
        """
        if theta is None:
            theta = self.theta
        if parameters is None:
            parameters = self.parameters
        if var_subset is None:
            var_subset = self.var_dict.keys()

        # TODO(FD) below is a bit hacky, these two variables should not both be called theta.
        # theta is either (x, y, alpha) or (x, y, z, a1, a2, a3)
        C, r = get_C_r_from_theta(theta, self.d)

        if (self.param_level != "no") and (len(parameters) > 1):
            landmarks = parameters
        else:
            landmarks = {
                f"p_{i}": self.landmarks[i, :] for i in range(self.landmarks.shape[0])
            }

        x_data = []
        for key in var_subset:
            if key == self.HOM:
                x_data.append(1.0)
            elif key == "x":
                x_data += list(r) + list(C.flatten("C"))  # row-wise flatten
            elif "z" in key:
                j = int(key.split("_")[-1])
                pj = landmarks[f"p_{j}"][: self.d]
                zj = C[self.d - 1, :] @ pj + r[self.d - 1]
                u = 1 / zj * np.r_[C[: self.d - 1, :] @ pj + r[: self.d - 1], 1]
                x_data += list(u)

                if self.level == "no":
                    continue
                elif self.level == "u2":
                    x_data += list(u**2)
                elif self.level == "u@u":
                    x_data += [u @ u]
                elif self.level == "u@r":
                    x_data += [u @ r]
                elif self.level == "uuT":
                    x_data += list(np.outer(u, u).flatten())
                elif self.level == "urT":
                    # this works
                    x_data += list(np.outer(u, r).flatten())
                elif self.level == "uxT":
                    x = np.r_[r, C.flatten("C")]
                    x_data += list(np.outer(u, x).flatten())
        dim_x = self.get_dim_x(var_subset=var_subset)
        assert len(x_data) == dim_x
        return np.array(x_data)

    def get_A_known(self, var_dict=None, output_poly=False, add_redundant=False):
        """
        T = | cx' tx |
            | cy' ty |
            | cz' tz |
            | 0 0 0 1 |

        Let pj be the j-th landmark coordinate. Then:

            [xj]   [cx @ pj + tx]
            [yj] = [cy @ pj + ty]
            [zj]   [cz @ pj + tz]

        Let u be the substitution variable, which has d-1 elements. Then we want to
        enforce that:

            u_xj = 1/zj * xj  ->  u_xj * zj = xj  ->  (cz @ pj + tz) * u_xj - (cx @ pj + tx) = 0
            u_yj = 1/zj * yj  ->  u_yj * zj = yj  ->  same as above
            u_zj = 1/zj       ->  u_zj * zj = 1   ->  u_zj * (cz @ pj + tz) - 1 = 0

        Writing things as homogeneous constraints (h is the homogenization variable):

                -----1x-------   --2x---   -- 3 --   --4--
            a1) cz @ pj * u_xj + tz*u_xj - cx @ pj - h * tx = 0
            a2) cz @ pj * u_yj + tz*u_yj - cy @ pj - h * ty = 0
                ------1z-------  --2z---
            a3) cz @ pj * u_zj + tz*u_zj - h*h = 0
        """
        print("not using known stereo templates because they depend on the landmarks.")
        return []

        # x contains: [c1, c2, c3, t]
        # z contains: [u_xj, u_yj, u_zj, H.O.T.]
        if self.d == 2:
            x = self.get_x()
            _, tx, tz, cx1, cx2, cz1, cz2, u_xj, u_zj, *_ = x
            cz = np.array([cz1, cz2])
            cx = np.array([cx1, cx2])
            pj = self.landmarks[0]
            assert abs(cz @ pj * u_xj + tz * u_xj - cx @ pj - tx) < 1e-10
            assert abs(u_zj * cz @ pj + u_zj * tz - 1) < 1e-10
        elif self.d == 3:
            x = self.get_x()
            # fmt: off
            (_, tx, ty, tz,
             cx1, cx2, cx3,
             cy1, cy2, cy3,
             cz1, cz2, cz3,
             u_x1, u_y1, u_z1, *_) = x
            # fmt: on
            p1 = self.landmarks[0]
            assert (
                abs(u_z1 * (cx1 * p1[0] + cx2 * p1[1] + cx3 * p1[2]) + u_z1 * tx - u_x1)
                < 1e-10
            )
            assert (
                abs(u_z1 * (cy1 * p1[0] + cy2 * p1[1] + cy3 * p1[2]) + u_z1 * ty - u_y1)
                < 1e-10
            )
            assert (
                abs(u_z1 * (cz1 * p1[0] + cz2 * p1[1] + cz3 * p1[2]) + u_z1 * tz - 1)
                < 1e-10
            )

        if var_dict is None:
            var_dict = self.var_dict
        A_known = []
        z_dim = self.get_level_dims()[self.level]

        if "x" not in var_dict or self.HOM not in var_dict:
            return A_known

        landmarks = [j for j in range(self.n_landmarks) if f"z_{j}" in var_dict]
        for j in landmarks:
            # one complete constraint has x, z_j and h.
            pj = self.landmarks[j]

            for i in range(self.d):
                A = PolyMatrix()
                #     -----1i-------   --2i---  -- 3 --   --4---
                # a1) cz @ pj * u_xj + tz*u_xj - cx @ pj - h * tx = 0
                # a2) cz @ pj * u_yj + tz*u_yj - cy @ pj - h * ty = 0
                #     ------1i------- --2i---
                # a3) cz @ pj * u_zj + tz*u_zj - h*h = 0

                # --- 1i ---
                fill_mat = np.zeros((self.d + self.d**2, self.d + z_dim))
                # chooses cz of x, and u_xj, u_yj or u_zj of z
                fill_mat[-self.d :, i] = pj

                # --- 2i --- tz * u_ij
                # chooses tz of x, and u_ij of z
                fill_mat[self.d - 1, i] = 1.0
                A["x", f"z_{j}"] = fill_mat

                if i < self.d - 1:  # u, (v)
                    fill_mat = np.zeros((self.d + self.d**2, 1))
                    # chooses ci of x
                    fill_mat[(i + 1) * self.d : (i + 2) * self.d, 0] = -pj
                    # chooses ti of x
                    fill_mat[i, 0] = -1
                    A["x", self.HOM] = fill_mat
                elif i == self.d - 1:  # z
                    A[self.HOM, self.HOM] = -0  # 2.0

                if output_poly:
                    A_known.append(A)
                else:
                    A_known.append(A.get_matrix(var_dict))
        self.test_constraints(A_known)
        return A_known

    def sample_theta(self):
        return generate_random_pose(d=self.d).flatten()

    def simulate_y(self, noise: float | None = None):
        if noise is None:
            noise = NOISE
        T = get_T(theta=self.theta, d=self.d)
        y_sim = np.zeros((self.n_landmarks, self.M_matrix.shape[0]))
        for j in range(self.n_landmarks):
            y_gt = T @ np.r_[self.landmarks[j], 1.0]

            # in 2d: y_gt[1]
            # in 3d: y_gt[2]
            y_gt /= y_gt[self.d - 1]
            y_gt = self.M_matrix @ y_gt
            y_sim[j, :] = y_gt + np.random.normal(loc=0, scale=noise, size=len(y_gt))
        return y_sim

    def get_Q(
        self,
        noise: float | None = None,
        output_poly: bool = False,
        use_cliques: list = [],
    ) -> PolyMatrix | sp.csr_matrix | sp.csc_matrix:
        if self.y_ is None:
            if noise is None:
                noise = NOISE
            self.y_ = self.simulate_y(noise=noise)

        Q = self.get_Q_from_y(self.y_, output_poly=output_poly, use_cliques=use_cliques)
        return Q

    def get_Q_from_y(
        self, y, output_poly=False, use_cliques=[]
    ) -> PolyMatrix | sp.csr_matrix | sp.csc_matrix:
        """
        The least squares problem reads

            min_T  sum_{n=0}^{N-1} || y_n - Mtilde @ z_n ||^2

        where the first d elements of z_n correspond to u, and Mtilde contains the
        first d-1 columns and the last column of M.

        Mtilde is thus of shape 2*(d-1) by dim_z, where dim_z = d + dL (dL being the
        dimension of the additional Lasserre variables). Each y_n is of length 2*(d-1),
        corresponding to the measured pixel values in the left and right image.
        """
        from poly_matrix.least_squares_problem import LeastSquaresProblem

        if len(use_cliques):
            js = use_cliques
        else:
            js = range(y.shape[0])

        # when using lifting (level=urT), then we have
        # in 2d: M_tilde is 2 by 6, with first 2 columns: M[:, [0, 2]]
        # in 3d: M_tilde is 4 by 12, with first 3 columns: M[:, [0, 1, 3]]
        M_tilde = np.zeros((len(y[0]), self.var_dict["z_0"]))
        M_tilde[:, : self.d] = self.M_matrix[:, list(range(self.d - 1)) + [self.d]]

        # in 2d: M[:, 1]
        # in 3d: M[:, 2]
        m = self.M_matrix[:, self.d - 1]

        ls_problem = LeastSquaresProblem()
        for j in js:
            ls_problem.add_residual({self.HOM: (y[j] - m), f"z_{j}": -M_tilde})
        if output_poly:
            Q = ls_problem.get_Q()
        else:
            Q = ls_problem.get_Q().get_matrix(self.var_dict)
        if self.NORMALIZE:
            Q /= self.n_landmarks * self.d

        # sanity check
        x = self.get_x()

        # sanity checks. Below is the best conditioned because we don't have to
        # compute B.T @ B, which can contain very large values.
        B = ls_problem.get_B_matrix(self.var_dict)
        errors = B @ x
        cost_test = errors.T @ errors
        if self.NORMALIZE:
            cost_test /= self.n_landmarks * self.d

        if output_poly:
            assert isinstance(Q, PolyMatrix)
            cost_Q = x.T @ Q.get_matrix(self.var_dict, output_type="csr") @ x
        else:
            cost_Q = x.T @ Q @ x
        assert abs(cost_test - cost_Q) < 1e-6, (cost_test, cost_Q)

        if not len(use_cliques):
            cost_raw = self.get_cost(self.theta, y)
            assert abs(cost_test - cost_raw) < 1e-6, (cost_test, cost_raw)

        assert isinstance(Q, (PolyMatrix, sp.csr_matrix, sp.csc_matrix)), type(Q)
        return Q

    def get_theta(self, x):
        return x[1 : 1 + self.d + self.d**2]

    def get_vec_around_gt(self, delta: float = 0):
        if delta == 0:
            return self.theta
        C, r = get_C_r_from_theta(self.theta, self.d)
        if self.d == 2:
            return super().get_vec_around_gt(delta=delta)
        else:
            return get_noisy_pose(C, r, delta)

    def get_C_cw(self, theta=None):
        C_cw, __ = get_C_r_from_theta(theta, self.d)
        return C_cw

    def get_position(self, theta=None):
        C_cw, r_wc_c = get_C_r_from_theta(theta, self.d)
        return (-C_cw.T @ r_wc_c)[None, :]

    def get_error(self, theta_hat, error_type="MSE"):
        return get_pose_errors_from_theta(theta_hat, self.theta, self.d)[error_type]

    def local_solver_manopt(self, t0, y, W=None, verbose=False, method="CG", **kwargs):
        """Alternative solver using Pymanopt. By default, :ref:`StateLifter.local_solver` is used."""
        import pymanopt
        from pymanopt.manifolds import Euclidean, Product, SpecialOrthogonalGroup

        if method == "CG":
            from pymanopt.optimizers import ConjugateGradient as Optimizer  # fastest
        elif method == "SD":
            from pymanopt.optimizers import SteepestDescent as Optimizer  # slow
        elif method == "TR":
            from pymanopt.optimizers import TrustRegions as Optimizer  # okay
        else:
            raise ValueError(method)

        solver_kwargs = SOLVER_KWARGS
        solver_kwargs.update(kwargs)
        if verbose:
            solver_kwargs["verbosity"] = 2
        else:
            solver_kwargs["verbosity"] = 1

        manifold = Product((SpecialOrthogonalGroup(self.d, k=1), Euclidean(self.d)))

        if W is None:
            W = np.eye(4) if self.d == 3 else np.eye(2)

        @pymanopt.function.autograd(manifold)
        def cost(R, t):
            cost = 0
            for i in range(self.n_landmarks):
                pi_cam = np.concatenate([R @ self.landmarks[i] + t, [1]], axis=0)  # type: ignore
                y_gt = self.M_matrix @ (pi_cam / pi_cam[self.d - 1])
                residual = y[i] - y_gt
                cost += residual.T @ W @ residual
            if self.NORMALIZE:
                return cost / (self.n_landmarks * self.d)
            return cost

        euclidean_gradient = None  # let Pymanopt use automatic differentiation
        problem = pymanopt.Problem(
            manifold, cost, euclidean_gradient=euclidean_gradient
        )
        optimizer = Optimizer(**solver_kwargs)  # type: ignore

        R_0, t_0 = get_C_r_from_theta(t0[: self.d + self.d**2], self.d)
        res = optimizer.run(problem, initial_point=(R_0, t_0))
        R, t = res.point
        theta_hat = get_theta_from_C_r(R, t)
        return theta_hat, res.stopping_criterion, res.cost

    def __repr__(self):
        level_str = str(self.level).replace(".", "-")
        return f"stereo{self.d}d_{level_str}_{self.param_level}"
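

# Illustrative usage sketch (not part of the original module): it exercises a concrete
# subclass such as popcor.examples.Stereo3DLifter, which the class docstring above
# references, end-to-end: simulate pixel measurements, build the quadratic cost matrix
# Q, and evaluate the cost at the ground-truth pose. The constructor arguments shown
# here are an assumption based on StereoLifter.__init__; the actual subclass signature
# may differ.
if __name__ == "__main__":
    from popcor.examples import Stereo3DLifter

    lifter = Stereo3DLifter(n_landmarks=5, level="urT")  # assumed signature
    y = lifter.simulate_y(noise=0.1)  # noisy pixel measurements (left and right image)
    Q = lifter.get_Q_from_y(y)  # quadratic cost matrix over the lifted variables
    x = lifter.get_x()  # lifted vector at the ground-truth pose
    print("cost at ground truth:", x.T @ Q @ x)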