Last active
August 19, 2025 08:52
-
-
Save tam17aki/46a94316af9a3fdf220fd315d1d220e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| """A demonstration of frequency estimation using the MUSIC algorithm. | |
| Copyright (C) 2025 by Akira TAMAMORI | |
| Permission is hereby granted, free of charge, to any person obtaining a copy | |
| of this software and associated documentation files (the "Software"), to deal | |
| in the Software without restriction, including without limitation the rights | |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| copies of the Software, and to permit persons to whom the Software is | |
| furnished to do so, subject to the following conditions: | |
| The above copyright notice and this permission notice shall be included in all | |
| copies or substantial portions of the Software. | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| SOFTWARE. | |
| """ | |
| import argparse | |
| import warnings | |
| import numpy as np | |
| import numpy.typing as npt | |
| from scipy.linalg import eigh | |
| from scipy.signal import find_peaks | |
| def synthesize_sinusoids( | |
| fs: float, | |
| duration: float, | |
| freqs: npt.NDArray[np.float64], | |
| amp_range: tuple[float, float] = (0.2, 1.2), | |
| rng: np.random.Generator | None = None, | |
| ) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]: | |
| """Generate a clean signal from multiple sinusoids with random amps and phases. | |
| Args: | |
| fs (float): Sampling frequency in Hz. | |
| duration (float): Signal duration in seconds. | |
| freqs (np.ndarray): Array of sinusoidal frequencies in Hz. | |
| amp_range (tuple[float, float]): Min and max amplitude range. | |
| rng (np.random.Generator, optional): Random generator for reproducibility. | |
| Returns: | |
| tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| - clean_signal: Sum of sinusoids (float64). | |
| - amps: Amplitudes of each sinusoid. | |
| - phases: Phases of each sinusoid in radian. | |
| """ | |
| if rng is None: | |
| rng = np.random.default_rng() | |
| t = np.linspace(0, duration, int(fs * duration), endpoint=False) | |
| amps = rng.uniform(amp_range[0], amp_range[1], freqs.size).astype(np.float64) | |
| phases = rng.uniform(-np.pi, np.pi, freqs.size).astype(np.float64) | |
| components = [ | |
| a * np.cos(2 * np.pi * f * t + p) for f, a, p in zip(freqs, amps, phases) | |
| ] | |
| clean_signal = np.asarray(np.sum(components, axis=0, dtype=np.float64)) | |
| return clean_signal, amps, phases | |
| def add_awgn( | |
| signal: npt.NDArray[np.float64], | |
| snr_db: float, | |
| rng: np.random.Generator | None = None, | |
| ) -> npt.NDArray[np.float64]: | |
| """Add Additive White Gaussian Noise (AWGN) to a given signal. | |
| Args: | |
| signal (np.ndarray): Input clean signal. | |
| snr_db (float): Target signal-to-noise ratio in dB. | |
| rng (np.random.Generator, optional): Random generator. | |
| Returns: | |
| np.ndarray: Noisy signal with specified SNR. | |
| """ | |
| if rng is None: | |
| rng = np.random.default_rng() | |
| signal_power = np.var(signal) | |
| noise_power = signal_power / (10 ** (snr_db / 10)) | |
| noise = rng.normal(0.0, np.sqrt(noise_power), signal.size) | |
| return signal + noise | |
| def generate_test_signal( | |
| fs: float, | |
| duration: float, | |
| f_true: npt.NDArray[np.float64], | |
| snr_db: float, | |
| amp_range: tuple[float, float] = (0.2, 1.2), | |
| ) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]: | |
| """Generate a noisy test signal consisting of multiple sinusoids and AWGN. | |
| Args: | |
| fs (float): Sampling frequency in Hz. | |
| duration (float): Signal duration in seconds. | |
| f_true (np.ndarray): Array of sinusoidal frequencies in Hz. | |
| snr_db (float): Target signal-to-noise ratio in dB. | |
| amp_range (tuple[float, float]): Min and max amplitude range. | |
| Returns: | |
| tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| - noisy_signal (np.ndarray of float64): Generated test signal. | |
| - amps (np.ndarray of float64): Random amplitudes assigned to each sinusoid. | |
| - phases (np.ndarray of float64): Random phases assigned to each sinusoid. | |
| """ | |
| clean_signal, amps, phases = synthesize_sinusoids(fs, duration, f_true, amp_range) | |
| noisy_signal = add_awgn(clean_signal, snr_db) | |
| return noisy_signal, amps, phases | |
| def _build_covariance_matrix( | |
| frame: npt.NDArray[np.complex128], subspace_dim: int | |
| ) -> npt.NDArray[np.complex128]: | |
| """Build the Forward-Backward averaged covariance matrix.""" | |
| n_samples = frame.size | |
| # K (number of snapshots) -> n_snapshots | |
| n_snapshots = n_samples - subspace_dim + 1 | |
| # --- Forward Covariance Matrix Calculation --- | |
| # X (data matrix) -> data_matrix_x | |
| hankel_matrix = np.zeros((subspace_dim, n_snapshots), dtype=np.complex128) | |
| for i in range(n_snapshots): | |
| hankel_matrix[:, i] = frame[i : i + subspace_dim] | |
| # R_f (Forward covariance) -> cov_matrix_f | |
| cov_matrix_f = (hankel_matrix @ hankel_matrix.conj().T) / n_snapshots | |
| # --- Backward Covariance Matrix Calculation --- | |
| # J (exchange matrix) -> exchange_matrix | |
| exchange_matrix = np.ascontiguousarray(np.fliplr(np.eye(subspace_dim))) | |
| # R_b (Backward covariance) -> cov_matrix_b | |
| cov_matrix_b = exchange_matrix @ cov_matrix_f.conj() @ exchange_matrix | |
| # --- Averaging --- | |
| # R_fb (Forward-Backward covariance) -> cov_matrix_fb | |
| cov_matrix_fb = (cov_matrix_f + cov_matrix_b) / 2.0 | |
| return cov_matrix_fb.astype(np.complex128) | |
| def _estimate_noise_subspace( | |
| frame: npt.NDArray[np.complex128], subspace_dim: int, model_order: int | |
| ) -> npt.NDArray[np.complex128] | None: | |
| """Build the covariance matrix and estimates the noise subspace.""" | |
| # 1. Build the covariance matrix (using the FB method) | |
| cov_matrix = _build_covariance_matrix(frame, subspace_dim) | |
| # 2. Eigenvalue decomposition | |
| try: | |
| _, eigenvectors = eigh(cov_matrix) | |
| except np.linalg.LinAlgError: | |
| warnings.warn("Eigenvalue decomposition on covariance matrix failed.") | |
| return None | |
| # The noise subspace is the set of vectors corresponding to the smaller eigenvalues | |
| # Since it is in ascending order, select (subspace_dim - model_order) vectors | |
| # from the beginning | |
| n_noise_vectors = subspace_dim - model_order | |
| noise_subspace: npt.NDArray[np.complex128] = eigenvectors[:, :n_noise_vectors] | |
| return noise_subspace | |
| def _calculate_music_spectrum( | |
| fs: float, | |
| subspace_dim: int, | |
| noise_subspace: npt.NDArray[np.complex128], | |
| n_grid_points: int, | |
| ) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: | |
| """Calculate the MUSIC pseudospectrum over a frequency grid.""" | |
| freq_grid = np.linspace(0, fs / 2, num=n_grid_points, dtype=np.float64) | |
| music_spectrum = np.zeros(freq_grid.size) | |
| # G*G^H only needs to be calculated once | |
| projector_onto_noise = noise_subspace @ noise_subspace.conj().T | |
| for i, f in enumerate(freq_grid): | |
| omega = 2 * np.pi * f / fs | |
| # Calculate a steering vector a(ω) | |
| steering_vector = np.exp(-1j * omega * np.arange(subspace_dim)) | |
| # Calculate the denominator a^H * (G*G^H) * a | |
| steering_vector_h = steering_vector.conj() | |
| denominator = steering_vector_h @ projector_onto_noise @ steering_vector | |
| # Add a small value to avoid division by zero | |
| music_spectrum[i] = 1 / (np.abs(denominator) + 1e-12) | |
| return freq_grid, music_spectrum | |
| def _find_music_peaks( | |
| freq_grid: npt.NDArray[np.float64], | |
| music_spectrum: npt.NDArray[np.float64], | |
| n_real_sinusoids: int, | |
| ) -> npt.NDArray[np.float64]: | |
| """Find the N strongest peaks from the MUSIC spectrum.""" | |
| # 1. Find all "local maxima" as peak candidates without using prominence. | |
| # Ignores extremely small noise floor fluctuations. | |
| all_peaks, _ = find_peaks(music_spectrum, height=np.mean(music_spectrum)) | |
| all_peaks = np.array(all_peaks, dtype=np.int64) | |
| if all_peaks.size < n_real_sinusoids: | |
| return freq_grid[all_peaks] if all_peaks.size > 0 else np.array([]) | |
| # 2. From all the peak candidates found, select N peaks | |
| # with the highest spectral values. | |
| strongest_peak_indices = all_peaks[ | |
| np.argsort(music_spectrum[all_peaks])[-n_real_sinusoids:] | |
| ] | |
| estimated_freqs = freq_grid[strongest_peak_indices] | |
| return np.sort(estimated_freqs) | |
| def estimate_frequencies_music( | |
| frame: npt.NDArray[np.complex128], | |
| fs: float, | |
| n_real_sinusoids: int, | |
| n_grid_points: int = 2048, | |
| ) -> npt.NDArray[np.float64]: | |
| """Estimate frequencies of multiple non-damped sinusoids using MUSIC. | |
| Args: | |
| frame (np.ndarray): | |
| One-dimensional input signal frame (time-domain samples), dtype=complex128. | |
| fs (float): | |
| Sampling frequency in Hz. | |
| n_real_sinusoids (int): | |
| Number of sinusoidal components to estimate. | |
| n_grid_points (int): | |
| Number of frequency grid points for MUSIC spectrum. | |
| Returns: | |
| np.ndarray: | |
| Array of estimated frequencies in Hz (dtype=float64). | |
| Returns an empty array if estimation fails. | |
| """ | |
| model_order = 2 * n_real_sinusoids | |
| n_samples = frame.size | |
| subspace_dim = n_samples // 3 | |
| if subspace_dim <= model_order: | |
| warnings.warn("Invalid subspace dimension for ESPRIT. Returning empty result.") | |
| return np.array([]) | |
| # 1. Estimate the noise subspace | |
| noise_subspace = _estimate_noise_subspace(frame, subspace_dim, model_order) | |
| if noise_subspace is None: | |
| warnings.warn("Failed to estimate noise subspace. Returning empty result.") | |
| return np.array([]) | |
| # 2. Calculate the MUSIC spectrum | |
| freq_grid, music_spectrum = _calculate_music_spectrum( | |
| fs, subspace_dim, noise_subspace, n_grid_points | |
| ) | |
| # 3. Detecting peaks from a spectrum | |
| estimated_freqs = _find_music_peaks(freq_grid, music_spectrum, n_real_sinusoids) | |
| return estimated_freqs | |
| def parse_args() -> argparse.Namespace: | |
| """Parse command-line arguments for MUSIC demo.""" | |
| parser = argparse.ArgumentParser( | |
| description="Frequency estimation demo using MUSIC algorithm." | |
| ) | |
| parser.add_argument( | |
| "--fs", | |
| type=float, | |
| default=44100.0, | |
| help="Sampling frequency in Hz (default: 44100)", | |
| ) | |
| parser.add_argument( | |
| "--duration", | |
| type=float, | |
| default=0.04, | |
| help="Signal duration in seconds (default: 0.04)", | |
| ) | |
| parser.add_argument( | |
| "--snr_db", | |
| type=float, | |
| default=10.0, | |
| help="Signal-to-noise ratio in dB (default: 10)", | |
| ) | |
| parser.add_argument( | |
| "--f_true", | |
| type=float, | |
| nargs="+", | |
| default=[440.0, 460.0, 480.0], | |
| help="List of true frequencies in Hz (space separated). Default: 440 460 480", | |
| ) | |
| parser.add_argument( | |
| "--amp_range", | |
| type=float, | |
| nargs=2, | |
| default=[0.2, 1.2], | |
| metavar=("AMP_MIN", "AMP_MAX"), | |
| help="Amplitude range for sinusoid generation (default: 0.2 1.2)", | |
| ) | |
| parser.add_argument( | |
| "--n_grid_points", | |
| type=int, | |
| default=2048, | |
| help="Number of frequency grid points for MUSIC spectrum (default: 2048)", | |
| ) | |
| return parser.parse_args() | |
| def main() -> None: | |
| """Perform demonstration.""" | |
| args = parse_args() | |
| fs = args.fs | |
| duration = args.duration | |
| snr_db = args.snr_db | |
| f_true = np.array(args.f_true, dtype=np.float64) | |
| amp_range = tuple(args.amp_range) | |
| n_grid_points = args.n_grid_points | |
| n_sinusoids = f_true.size | |
| noisy_signal, amps, phases = generate_test_signal( | |
| fs, duration, f_true, snr_db, amp_range | |
| ) | |
| print("--- Experiment Setup ---") | |
| print(f"Sampling Frequency: {fs} Hz") | |
| print(f"Signal Duration: {duration*1000:.0f} ms") | |
| print(f"True Frequencies: {f_true} Hz") | |
| print(f"Amplitudes: {amps}") | |
| print(f"Phases: {phases} rad") | |
| print(f"SNR: {snr_db} dB") | |
| print(f"Number of Grid Points: {n_grid_points}\n") | |
| print("--- Running MUSIC ---") | |
| signal_complex = noisy_signal.astype(np.complex128) | |
| music_freqs = estimate_frequencies_music( | |
| signal_complex, fs, n_sinusoids, n_grid_points | |
| ) | |
| print(f"Estimated (MUSIC): {music_freqs}") | |
| if music_freqs.size == n_sinusoids: | |
| print(f"Errors (MUSIC): {music_freqs - f_true}\n") | |
| else: | |
| print( | |
| f"Warning: Only found {music_freqs.size} peaks, " | |
| + f"but expected {n_sinusoids}." | |
| ) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment