Created
August 8, 2025 14:35
-
-
Save btcross26/7637fb90c6375799457791718b10f8af to your computer and use it in GitHub Desktop.
BMM
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
from scipy.special import logsumexp
class BernoulliMixture:
    """Mixture of multivariate Bernoulli distributions fit via EM.

    Each of the ``n_components`` components k has a mean vector ``mu[k]``
    of per-feature Bernoulli parameters and a mixing weight ``pi[k]``.
    Fitting alternates the standard E-step (posterior responsibilities)
    and M-step (closed-form updates of ``mu`` and ``pi``), working in
    log-space via ``logsumexp`` for numerical stability.

    Parameters
    ----------
    n_components : int
        Number of mixture components K.
    max_iter : int
        Maximum number of EM iterations.
    tol : float, default 1e-3
        Convergence threshold on the change in mean log-likelihood.
    """

    def __init__(self, n_components, max_iter, tol=1e-3):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol

    def fit(self, x):
        """Run EM on binary data ``x`` of shape (n_samples, n_features)."""
        self.x = x
        self.init_params()
        log_bernoullis = self.get_log_bernoullis(self.x)
        self.old_logL = self.get_log_likelihood(log_bernoullis)
        for step in range(self.max_iter):
            if step > 0:
                self.old_logL = self.logL
            # E-step: responsibilities under the current parameters.
            self.gamma = self.get_responsibilities(log_bernoullis)
            self.remember_params()
            # M-step: closed-form updates.
            self.get_Neff()
            self.get_mu()
            self.get_pi()
            # Recompute the log-likelihood with the updated parameters.
            log_bernoullis = self.get_log_bernoullis(self.x)
            self.logL = self.get_log_likelihood(log_bernoullis)
            if np.isnan(self.logL):
                # Numerical failure: roll back to the last valid parameters.
                self.reset_params()
                print(self.logL)
                break
            # BUG FIX: ``tol`` was stored but never used, so EM always ran
            # all ``max_iter`` iterations. Stop once the improvement in the
            # mean log-likelihood falls below the threshold.
            if abs(self.logL - self.old_logL) < self.tol:
                break

    def reset_params(self):
        """Restore the parameters saved by :meth:`remember_params`."""
        self.mu = self.old_mu.copy()
        self.pi = self.old_pi.copy()
        self.gamma = self.old_gamma.copy()
        self.get_Neff()
        log_bernoullis = self.get_log_bernoullis(self.x)
        self.logL = self.get_log_likelihood(log_bernoullis)

    def remember_params(self):
        """Snapshot mu/pi/gamma so a failed update can be rolled back."""
        self.old_mu = self.mu.copy()
        self.old_pi = self.pi.copy()
        self.old_gamma = self.gamma.copy()

    def init_params(self):
        """Initialize pi uniformly and mu from a fixed-seed uniform draw."""
        self.n_samples = self.x.shape[0]
        self.n_features = self.x.shape[1]
        self.pi = 1 / self.n_components * np.ones(self.n_components)
        # Fixed seed keeps fitting deterministic across runs.
        self.mu = np.random.RandomState(seed=0).uniform(
            low=0.25, high=0.75, size=(self.n_components, self.n_features)
        )
        self.normalize_mu()

    def normalize_mu(self):
        """Scale each component's mu so its features sum to one."""
        sum_over_features = np.sum(self.mu, axis=1)
        for k in range(self.n_components):
            self.mu[k, :] /= sum_over_features[k]

    def get_responsibilities(self, log_bernoullis):
        """E-step: posterior p(component k | sample i), shape (n, K)."""
        gamma = np.zeros(shape=(log_bernoullis.shape[0], self.n_components))
        # Normalizer computed in log-space to avoid underflow.
        Z = logsumexp(np.log(self.pi[None, :]) + log_bernoullis, axis=1)
        for k in range(self.n_components):
            gamma[:, k] = np.exp(np.log(self.pi[k]) + log_bernoullis[:, k] - Z)
        return gamma

    def get_log_bernoullis(self, x):
        """Per-sample, per-component Bernoulli log-probabilities, shape (n, K)."""
        log_bernoullis = self.get_save_single(x, self.mu)
        # The (1-x, 1-mu) call covers the failure term and also guards
        # against mu close to 1 via the same clipping.
        log_bernoullis += self.get_save_single(1 - x, 1 - self.mu)
        return log_bernoullis

    def get_save_single(self, x, mu):
        """Compute x @ log(mu).T with mu clipped away from zero.

        BUG FIX: the original clipped per-column using
        ``np.max(mu, axis=0) <= 1e-15``, so a single component's
        ``mu[k, d] == 0`` still produced ``log(0) = -inf`` whenever
        another component's value in that column exceeded the threshold,
        leading to the NaN log-likelihoods that ``reset_params`` exists
        to recover from. Clip elementwise instead.
        """
        mu_safe = np.where(mu <= 1e-15, 1e-15, mu)
        return np.tensordot(x, np.log(mu_safe), (1, 1))

    def get_Neff(self):
        """Effective number of samples per component, shape (K,)."""
        self.Neff = np.sum(self.gamma, axis=0)

    def get_mu(self):
        """M-step update: responsibility-weighted mean of the data."""
        self.mu = np.einsum('ik,id -> kd', self.gamma, self.x) / self.Neff[:, None]

    def get_pi(self):
        """M-step update: mixing weights from effective counts."""
        self.pi = self.Neff / self.n_samples

    def predict(self, x):
        """Hard-assign each sample to its most responsible component."""
        log_bernoullis = self.get_log_bernoullis(x)
        gamma = self.get_responsibilities(log_bernoullis)
        return np.argmax(gamma, axis=1)

    def get_sample_log_likelihood(self, log_bernoullis):
        """Per-sample log-likelihood under the mixture, shape (n,)."""
        return logsumexp(np.log(self.pi[None, :]) + log_bernoullis, axis=1)

    def get_log_likelihood(self, log_bernoullis):
        """Mean per-sample log-likelihood (the EM objective tracked in fit)."""
        return np.mean(self.get_sample_log_likelihood(log_bernoullis))

    def score(self, x):
        """Mean log-likelihood of ``x`` under the fitted model."""
        log_bernoullis = self.get_log_bernoullis(x)
        return self.get_log_likelihood(log_bernoullis)

    def score_samples(self, x):
        """Per-sample log-likelihoods of ``x`` under the fitted model."""
        log_bernoullis = self.get_log_bernoullis(x)
        return self.get_sample_log_likelihood(log_bernoullis)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment