Source code for popcor.utils.common

import itertools

import numpy as np
import scipy.sparse as sp


[docs] def upper_triangular(p): """Given vector, get the half kronecker product.""" return np.outer(p, p)[np.triu_indices(len(p))]
[docs] def diag_indices(n): """Given the half kronecker product, return diagonal elements""" z = np.empty((n, n)) z[np.triu_indices(n)] = range(int(n * (n + 1) / 2)) return np.diag(z).astype(int)
def get_aggregate_sparsity(matrix_list_sparse): agg_ii = [] agg_jj = [] for i, A_sparse in enumerate(matrix_list_sparse): assert isinstance(A_sparse, sp.spmatrix) ii, jj = A_sparse.nonzero() # type: ignore agg_ii += list(ii) agg_jj += list(jj) return sp.csr_matrix(([1.0] * len(agg_ii), (agg_ii, agg_jj)), A_sparse.shape)
[docs] def unravel_multi_index_triu(flat_indices, shape): """Equivalent of np.multi_index_triu, but using only the upper-triangular part of matrix.""" i_upper = [] j_upper = [] # for 4 x 4, this would give [4, 7, 9, 11] cutoffs = np.cumsum(list(range(1, shape[0] + 1))[::-1]) for idx in flat_indices: i = np.where(idx < cutoffs)[0][0] if i == 0: j = idx else: j = idx - cutoffs[i - 1] + i i_upper.append(i) j_upper.append(j) return np.array(i_upper), np.array(j_upper)
[docs] def ravel_multi_index_triu(index_tuple, shape): """Equivalent of np.multi_index_triu, but using only the upper-triangular part of matrix.""" ii, jj = index_tuple triu_mask = jj >= ii i_upper = ii[triu_mask] j_upper = jj[triu_mask] flat_indices = [] for i, j in zip(i_upper, j_upper): # for i == 0: idx = j # for i == 1: idx = shape[0] + j # for i == 2: idx = shape[0] + shape[0]-1 + j idx = np.sum(range(shape[0] - i, shape[0])) + j flat_indices.append(idx) return flat_indices
[docs] def create_symmetric(vec, eps_sparse, correct=False, sparse=False): """Create a symmetric matrix from the vectorized elements of the upper half""" def get_dim_x(len_vec): return int(0.5 * (-1 + np.sqrt(1 + 8 * len_vec))) try: # vec is dense len_vec = len(vec) dim_x = get_dim_x(len_vec) triu = np.triu_indices(n=dim_x) mask = np.abs(vec) > eps_sparse triu_i_nnz = triu[0][mask] triu_j_nnz = triu[1][mask] vec_nnz = vec[mask] except Exception: # vec is sparse len_vec = vec.shape[1] dim_x = get_dim_x(len_vec) vec.data[np.abs(vec.data) < eps_sparse] = 0 vec.eliminate_zeros() ii, jj = vec.nonzero() # vec is 1 x jj triu_i_nnz, triu_j_nnz = unravel_multi_index_triu(jj, (dim_x, dim_x)) vec_nnz = np.array(vec[ii, jj]).flatten() # assert dim_x == self.get_dim_x(var_dict) if sparse: offdiag = triu_i_nnz != triu_j_nnz diag = triu_i_nnz == triu_j_nnz triu_i = triu_i_nnz[offdiag] triu_j = triu_j_nnz[offdiag] diag_i = triu_i_nnz[diag] if correct: # divide off-diagonal elements by sqrt(2) vec_nnz_off = vec_nnz[offdiag] / np.sqrt(2) else: vec_nnz_off = vec_nnz[offdiag] vec_nnz_diag = vec_nnz[diag] Ai = sp.csr_array( ( np.r_[vec_nnz_diag, vec_nnz_off, vec_nnz_off], (np.r_[diag_i, triu_i, triu_j], np.r_[diag_i, triu_j, triu_i]), ), (dim_x, dim_x), dtype=float, ) else: Ai = np.zeros((dim_x, dim_x)) if correct: # divide all elements by sqrt(2) Ai[triu_i_nnz, triu_j_nnz] = vec_nnz / np.sqrt(2) Ai[triu_j_nnz, triu_i_nnz] = vec_nnz / np.sqrt(2) # undo operation for diagonal Ai[range(dim_x), range(dim_x)] *= np.sqrt(2) else: Ai[triu_i_nnz, triu_j_nnz] = vec_nnz Ai[triu_j_nnz, triu_i_nnz] = vec_nnz return Ai
[docs] def get_vec(mat, correct=True, sparse=False) -> np.ndarray | sp.csr_matrix | None: """Convert NxN Symmetric matrix to (N+1)N/2 vectorized version that preserves inner product. :param mat: (spmatrix or ndarray) symmetric matrix :return: ndarray """ from copy import deepcopy mat = deepcopy(mat) if correct: if isinstance(mat, sp.csc_matrix): ii, jj = mat.nonzero() mat[ii, jj] *= np.sqrt(2.0) diag = ii == jj mat[ii[diag], jj[diag]] /= np.sqrt(2) # type: ignore else: mat *= np.sqrt(2.0) mat[range(mat.shape[0]), range(mat.shape[0])] /= np.sqrt(2) if sparse: assert isinstance(mat, sp.csc_matrix) ii, jj = mat.nonzero() if len(ii) == 0: # got an empty matrix -- this can happen depending on the parameter values. return None triu_mask = jj >= ii flat_indices = ravel_multi_index_triu([ii[triu_mask], jj[triu_mask]], mat.shape) # type: ignore data = np.array(mat[ii[triu_mask], jj[triu_mask]]).flatten() # type: ignore vec_size = int(mat.shape[0] * (mat.shape[0] + 1) / 2) # type: ignore return sp.csr_matrix( (data, ([0] * len(flat_indices), flat_indices)), (1, vec_size) ) else: return np.array(mat[np.triu_indices(n=mat.shape[0])]).flatten() # type: ignore
def get_labels(p, zi, zj, var_dict): labels = [] size_i = var_dict[zi] size_j = var_dict[zj] if zi == zj: # only upper diagonal for i == j key_pairs = itertools.combinations_with_replacement(range(size_i), 2) else: key_pairs = itertools.product(range(size_i), range(size_j)) for i, j in key_pairs: label = f"{p}-" label += f"{zi}:{i}." if size_i > 1 else f"{zi}." label += f"{zj}:{j}" if size_j > 1 else f"{zj}" labels.append(label) return labels