This is a port of the LISI algorithm from harmony in python, directly from Rcpp.
It should yield identical results as the python version while showing similar performance as the Rcpp version, thanks to the power of numba JIT.
This is a port of the LISI algorithm from harmony in python, directly from Rcpp.
It should yield identical results as the python version while showing similar performance as the Rcpp version, thanks to the power of numba JIT.
| import numba | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.neighbors import NearestNeighbors | |
| # Transpiled from | |
| # https://github.com/immunogenomics/LISI/blob/master/src/utils.cpp | |
| @numba.jit(nopython=True) | |
| def Hbeta( | |
| D: np.ndarray, beta: float, P: np.ndarray, idx: int | |
| ) -> tuple[float, np.ndarray]: | |
| """\ | |
| Shannon entropy and probability distribution | |
| """ | |
| P = np.exp(-D[:, idx] * beta) | |
| P_sum = np.sum(P) | |
| if P_sum == 0: | |
| return 0.0, np.zeros_like(P) | |
| else: | |
| H = np.log(P_sum) + beta * np.sum(D[:, idx] * P) / P_sum | |
| P /= P_sum | |
| return H, P | |
| @numba.jit(nopython=True) | |
| def compute_simpson_index( | |
| D: np.ndarray, | |
| knn_idx: np.ndarray, | |
| batch_labels: np.ndarray, | |
| n_batches: int, | |
| perplexity: float, | |
| tol: float = 1e-5, | |
| ) -> np.ndarray: | |
| n = D.shape[1] | |
| P = np.zeros(D.shape[0]) | |
| simpson = np.zeros(n) | |
| log_U = np.log(perplexity) | |
| for i in range(n): | |
| beta = 1.0 | |
| betamin = -np.inf | |
| betamax = np.inf | |
| H, P = Hbeta(D, beta, P, i) | |
| Hdiff = H - log_U | |
| tries = 0 | |
| while np.abs(Hdiff) > tol and tries < 50: | |
| if Hdiff > 0: | |
| betamin = beta | |
| if not np.isfinite(betamax): | |
| beta = beta * 2 | |
| else: | |
| beta = (beta + betamax) / 2 | |
| else: | |
| betamax = beta | |
| if not np.isfinite(betamin): | |
| beta = beta / 2 | |
| else: | |
| beta = (beta + betamin) / 2 | |
| H, P = Hbeta(D, beta, P, i) | |
| Hdiff = H - log_U | |
| tries += 1 | |
| if H == 0: | |
| simpson[i] = -1 | |
| continue | |
| # Compute Simpson's index | |
| for b in range(n_batches): | |
| q = np.where(batch_labels[knn_idx[:, i]] == b)[0] | |
| if q.size > 0: | |
| P_sum = np.sum(P[q]) | |
| simpson[i] += P_sum**2 | |
| return simpson | |
| # Transpiled from | |
| # https://github.com/immunogenomics/LISI/blob/master/R/utils.R | |
| def compute_lisi( | |
| X: np.ndarray, | |
| metadata: pd.DataFrame, | |
| label_colnames: list[str], | |
| perplexity: float = 30, | |
| n_jobs: int = -1, | |
| ) -> pd.DataFrame: | |
| assert ( | |
| X.shape[0] == metadata.shape[0] | |
| ), f"X and metadata must have the same number of cells, got {X.shape[0]} != {metadata.shape[0]}" | |
| assert len(label_colnames) > 0, "At least one label column name must be provided" | |
| assert all( | |
| [col in metadata.columns for col in label_colnames] | |
| ), f"Columns {set(label_colnames) - set(metadata.columns)} not found in metadata" | |
| n_neighbors = round(perplexity * 3) | |
| neighbors = NearestNeighbors( | |
| n_neighbors=n_neighbors, algorithm="kd_tree", n_jobs=n_jobs | |
| ) | |
| neighbors = neighbors.fit(X) | |
| distances, indices = neighbors.kneighbors(X) | |
| # Don't count yourself in your own neighborhood | |
| indices = indices[:, 1:] | |
| distances = distances[:, 1:] | |
| lisi_values = [] | |
| for column in label_colnames: | |
| # prevent passing python objects | |
| batch_labels, label_map = metadata[column].factorize() | |
| n_batches = len(label_map) | |
| simpson = compute_simpson_index( | |
| distances.T, indices.T, batch_labels, n_batches, perplexity | |
| ) | |
| lisi_values.append(1 / simpson) | |
| lisi_values = np.stack(lisi_values, axis=1) | |
| return pd.DataFrame(lisi_values, columns=label_colnames, index=metadata.index) |