Last active
August 17, 2025 09:19
-
-
Save tam17aki/c5e96f670ec3f561afb3f0c761291f04 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| """A demonstration of Frequency Estimation Based on WLS-SCDE for a dumped sinusoid. | |
| Copyright (C) 2025 by Akira TAMAMORI | |
| Permission is hereby granted, free of charge, to any person obtaining a copy | |
| of this software and associated documentation files (the "Software"), to deal | |
| in the Software without restriction, including without limitation the rights | |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| copies of the Software, and to permit persons to whom the Software is | |
| furnished to do so, subject to the following conditions: | |
| The above copyright notice and this permission notice shall be included in all | |
| copies or substantial portions of the Software. | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| SOFTWARE. | |
| """ | |
| from typing import NamedTuple | |
| import numpy as np | |
| import numpy.typing as npt | |
| from scipy.signal.windows import get_window | |
| class DampedSinusoidParameter(NamedTuple): | |
| """Represents the parameters of a single damped sinusoid.""" | |
| fo: float # fundamental frequency | |
| epsilon: float # dumping coefficient | |
| amp: float # amplitude | |
| phase: float # initial phase | |
| def calculate_observables( | |
| frame: npt.NDArray[np.float64], fs: float | |
| ) -> tuple[ | |
| npt.NDArray[np.complex128], | |
| npt.NDArray[np.complex128], | |
| npt.NDArray[np.complex128], | |
| npt.NDArray[np.float64], | |
| ]: | |
| """Calculates the observables (gw, hw, kw) from a signal frame. | |
| These are Fourier coefficients weighted by a window function and its derivatives. | |
| """ | |
| n_samples = len(frame) | |
| fft_size = 1 << (n_samples - 1).bit_length() | |
| # Use a window function whose value and derivatives are zero at the boundaries. | |
| p_t = get_window("blackmanharris", n_samples) | |
| dt = 1.0 / fs | |
| dp_t = np.gradient(p_t, dt) | |
| d2p_t = np.gradient(dp_t, dt) | |
| # Calculate Fourier coefficients for the signal weighted by p(t), p'(t), and p''(t). | |
| gwk_full = np.fft.rfft(frame * p_t, n=fft_size) | |
| hwk_full = np.fft.rfft(frame * dp_t, n=fft_size) | |
| kwk_full = np.fft.rfft(frame * d2p_t, n=fft_size) | |
| freq_bins_hz = np.fft.rfftfreq(fft_size, 1 / fs).astype(np.float64) | |
| omega_k_full = 2 * np.pi * freq_bins_hz | |
| return gwk_full, hwk_full, kwk_full, omega_k_full | |
| def select_bins_for_damped_scde( | |
| gwk_full: npt.NDArray[np.complex128], | |
| omega_k_full: npt.NDArray[np.float64], | |
| fs: float, | |
| num_bins: int = 7, | |
| ) -> npt.NDArray[np.int64]: | |
| """Selects a set of frequency bins around the spectral peak.""" | |
| freq_bins_hz = omega_k_full / (2 * np.pi) | |
| mag_spec = np.abs(gwk_full) | |
| lower_bound = 50 | |
| search_range = (freq_bins_hz > lower_bound) & (freq_bins_hz < fs / 2 * 0.95) | |
| if not np.any(search_range): | |
| return np.array([], dtype=int) | |
| search_indices = np.where(search_range)[0] | |
| mag_spec_searched = mag_spec[search_indices] | |
| center_peak_local_idx = np.argmax(mag_spec_searched) | |
| half_bins = num_bins // 2 | |
| start_local_idx = np.maximum(0, center_peak_local_idx - half_bins) | |
| end_local_idx = start_local_idx + num_bins | |
| if end_local_idx > len(mag_spec_searched): | |
| end_local_idx = len(mag_spec_searched) | |
| start_local_idx = max(0, end_local_idx - num_bins) | |
| selected_local_indices = np.arange(start_local_idx, end_local_idx) | |
| return search_indices[selected_local_indices] | |
| def solve_complex_a_sq_wls( | |
| gwk: npt.NDArray[np.complex128], | |
| hwk: npt.NDArray[np.complex128], | |
| kwk: npt.NDArray[np.complex128], | |
| omega_k: npt.NDArray[np.float64], | |
| ) -> np.complex128 | None: | |
| """Solves for the complex squared frequency using Weighted Least Squares.""" | |
| if len(gwk) < 3: | |
| return None | |
| # Equation: (gwk) * ã² = (ωk^2 * gwk + 2jωk * hwk - kwk) | |
| coeff = gwk | |
| b = omega_k**2 * gwk + 2j * omega_k * hwk - kwk | |
| # Weight W is the power |gwk|^2 | |
| w = np.abs(gwk) ** 2 | |
| # Solve the weighted normal equation: (A^{H}WA)x = (A^{H}Wb) | |
| coeff_h = coeff.conj() | |
| lhs = np.sum(coeff_h * w * coeff) | |
| rhs = np.sum(coeff_h * w * b) | |
| condition = 1e-12 | |
| if np.abs(lhs) < condition: | |
| return None | |
| complex_freq: np.complex128 = rhs / lhs | |
| return complex_freq | |
| def separate_params_from_complex_a_sq( | |
| complex_a_sq: np.complex128, peak_omega: np.float64 | |
| ) -> tuple[np.float64, np.float64]: | |
| """Separates frequency (a) and damping (ε) from the complex a^2.""" | |
| real_part = complex_a_sq.real | |
| imag_part = complex_a_sq.imag | |
| # Im(ã^2) ≈ 2ωε | |
| omega_bound = 1e-6 | |
| if peak_omega < omega_bound: | |
| epsilon = np.float64(0.0) | |
| else: | |
| epsilon = imag_part / (2 * peak_omega) | |
| # Re(ã^2) ≈ a^2 + ε^2 | |
| a_sq = real_part - epsilon**2 | |
| if a_sq < 0: | |
| return np.float64(np.float64(np.nan)), np.float64(np.float64(np.nan)) | |
| a: np.float64 = np.sqrt(a_sq) | |
| return a, epsilon | |
| def estimate_damped_sinusoid( | |
| frame: npt.NDArray[np.float64], fs: float | |
| ) -> tuple[np.float64, np.float64]: | |
| """Estimates the frequency (fo) and damping factor (ε) of a single damped sinusoid. | |
| This function is a refactored version with separated responsibilities. | |
| Args: | |
| frame (np.ndarray): The input signal frame. | |
| fs (int): The sampling frequency in Hz. | |
| Returns: | |
| tuple[float, float]: The estimated (fo, ε) pair. | |
| Returns (np.float64(np.nan), np.float64(np.nan)) | |
| if estimation fails. | |
| """ | |
| lower_freq = 50 | |
| var_bound = 1e-10 | |
| if len(frame) < lower_freq or np.var(frame) < var_bound: | |
| return np.float64(np.nan), np.float64(np.nan) | |
| # Step 1: Calculate the necessary observables (gw, hw, kw) from the signal. | |
| gwk_full, hwk_full, kwk_full, omega_k_full = calculate_observables(frame, fs) | |
| # Step 2: Select the most informative frequency bins around the spectral peak. | |
| selected_indices = select_bins_for_damped_scde( | |
| gwk_full, omega_k_full, fs, num_bins=7 | |
| ) | |
| if selected_indices.size < 3: | |
| return np.float64(np.nan), np.float64(np.nan) | |
| # Step 3: Solve for the complex squared frequency using WLS. | |
| complex_a_sq = solve_complex_a_sq_wls( | |
| gwk=gwk_full[selected_indices], | |
| hwk=hwk_full[selected_indices], | |
| kwk=kwk_full[selected_indices], | |
| omega_k=omega_k_full[selected_indices], | |
| ) | |
| if complex_a_sq is None: | |
| return np.float64(np.nan), np.float64(np.nan) | |
| # Step 4: Separate the physical parameters (a, ε) from the complex solution. | |
| index: np.int64 = selected_indices[np.argmax(np.abs(gwk_full[selected_indices]))] | |
| peak_omega = omega_k_full[index] | |
| a, epsilon = separate_params_from_complex_a_sq(complex_a_sq, peak_omega) | |
| if np.isnan(a): | |
| return np.float64(np.nan), np.float64(np.nan) | |
| # Convert angular frequency to Hz and perform final validation. | |
| fo_hz = a / (2 * np.pi) | |
| if not lower_freq < fo_hz < fs / 2: | |
| return np.float64(np.nan), np.float64(np.nan) | |
| return fo_hz, epsilon | |
| def synthesize_damped_sinusoid( | |
| params: DampedSinusoidParameter, n_samples: int, fs: float | |
| ) -> npt.NDArray[np.float64]: | |
| """Re-synthesize a damped sinusoid from its parameter object.""" | |
| t = np.arange(n_samples) / fs | |
| signal: npt.NDArray[np.float64] = ( | |
| params.amp | |
| * np.exp(-params.epsilon * t) | |
| * np.cos(2 * np.pi * params.fo * t + params.phase) | |
| ) | |
| return signal | |
| def estimate_amp_phase_damped( | |
| signal: npt.NDArray[np.float64], fs: float, fo: np.float64, epsilon: np.float64 | |
| ) -> tuple[np.float64, np.float64]: | |
| """Estimate the amplitude and phase of a damped sinusoid with known fo and ε.""" | |
| t = np.arange(len(signal)) / fs | |
| # model : y = c*exp(-εt)cos(2π fo t) - s*exp(-εt)sin(2π fo t) | |
| # The unknowns are c = Acos(φ) and s = Asin(φ). | |
| exp_decay = np.exp(-epsilon * t) | |
| cos_term = exp_decay * np.cos(2 * np.pi * fo * t) | |
| sin_term = -exp_decay * np.sin(2 * np.pi * fo * t) | |
| a = np.vstack([cos_term, sin_term]).T | |
| try: | |
| coeffs, _, _, _ = np.linalg.lstsq(a, signal, rcond=None) | |
| c, s = coeffs[0], coeffs[1] | |
| except np.linalg.LinAlgError: | |
| return np.float64(0.0), np.float64(0.0) | |
| # Calculate amplitude and phase from c and s | |
| amp: np.float64 = np.sqrt(c**2 + s**2) | |
| phase: np.float64 = np.arctan2(s, c) | |
| return amp, phase | |
| def estimate_multiple_damped_sinusoids( | |
| frame: npt.NDArray[np.float64], fs: float, n_sinusoids: int | |
| ) -> list[DampedSinusoidParameter]: | |
| """Estimate parameters of multiple damped sinusoids using iterative cancellation. | |
| Args: | |
| frame (np.ndarray): The input signal frame. | |
| fs (int): The sampling frequency in Hz. | |
| n_sinusoids (int): Number of Sinusoids. | |
| Returns: | |
| list[dict]: list of Estimated parameters of each sound source. | |
| (fo, epsilon, amp, phase) | |
| """ | |
| estimated_params: list[DampedSinusoidParameter] = [] | |
| residual_signal = np.copy(frame) # Copy without destroying the original signal | |
| for i in range(n_sinusoids): | |
| print(f"--- Iteration {i+1}/{n_sinusoids} ---") | |
| # 1. Estimate the parameters of the strongest single damped sinusoid | |
| # from the current residual signal. | |
| # Note: In this step, we expect to find the most energetic components. | |
| fo, epsilon = estimate_damped_sinusoid(residual_signal, fs) | |
| if np.isnan(fo): | |
| print(" No more components found. Stopping.") | |
| break # a more reliable ingredient won't be found | |
| # 2. Use the estimated frequency fo to accurately estimate | |
| # the amplitude and phase of that component. | |
| # Ideally, we would like to estimate the amplitude and | |
| # phase more accurately, taking into account the attenuation term. | |
| amp, phase = estimate_amp_phase_damped(residual_signal, fs, fo, epsilon) | |
| print(f" Component found: fo={fo:.2f} Hz, eps={epsilon:.2f}, amp={amp:.2f}") | |
| # 3. Save estimated parameters to a list. | |
| params = DampedSinusoidParameter( | |
| float(fo), float(epsilon), float(amp), float(phase) | |
| ) | |
| estimated_params.append(params) | |
| # 4. Recombining the estimated component signal. | |
| sinusoid_to_remove = synthesize_damped_sinusoid(params, len(frame), float(fs)) | |
| # 5. Remove the estimated component from the residual signal. | |
| residual_signal -= sinusoid_to_remove | |
| return estimated_params | |
| def generate_test_signal( | |
| fs: float, | |
| duration: float, | |
| n_sinusoids: int, | |
| true_params: list[DampedSinusoidParameter], | |
| noise_level: float = 0.01, | |
| ) -> npt.NDArray[np.float64]: | |
| """Generates the test signal for the experiment.""" | |
| n_samples = int(fs * duration) | |
| test_signal = np.zeros(n_samples) | |
| for params in true_params: | |
| test_signal += synthesize_damped_sinusoid(params, n_samples, fs) | |
| test_signal += np.random.randn(n_samples) * noise_level | |
| print( | |
| f"Generated a {duration}s signal with {n_sinusoids} damped sinusoids " | |
| + f"(noise level: {noise_level})." | |
| ) | |
| return test_signal | |
| def match_results( | |
| estimated_params: list[DampedSinusoidParameter], | |
| true_params: list[DampedSinusoidParameter], | |
| ) -> list[DampedSinusoidParameter | None]: | |
| """Matches estimated parameters with true parameters.""" | |
| matched_true_params: list[DampedSinusoidParameter | None] = [None] * len( | |
| estimated_params | |
| ) | |
| remaining_true_params = list(true_params) | |
| for i, est_p in enumerate(estimated_params): | |
| best_match_idx = -1 | |
| min_freq_diff = float("inf") | |
| for j, true_p in enumerate(remaining_true_params): | |
| diff = np.abs(est_p.fo - true_p.fo) | |
| if diff < min_freq_diff: | |
| min_freq_diff = diff | |
| best_match_idx = j | |
| if best_match_idx != -1: | |
| matched_true_params[i] = remaining_true_params.pop(best_match_idx) | |
| return matched_true_params | |
| def print_results( | |
| estimated_params: list[DampedSinusoidParameter], | |
| matched_true_params: list[DampedSinusoidParameter | None], | |
| ) -> None: | |
| """Prints the estimation results in a formatted table.""" | |
| print("----------------------------------------------------------------------") | |
| print( | |
| "| Component | True F0 (Hz) | Est. F0 (Hz) | True Eps | Est. Eps | " | |
| + "True Amp | Est. Amp |" | |
| ) | |
| print("----------------------------------------------------------------------") | |
| if not estimated_params: | |
| print("No components were estimated.") | |
| else: | |
| for i, (est_p, true_p) in enumerate(zip(estimated_params, matched_true_params)): | |
| if true_p is None: | |
| print( | |
| f"| Est. {i+1} | --- | {est_p.fo:-12.2f} " | |
| + f"| --- | {est_p.epsilon:-8.2f} | --- " | |
| + f"| {est_p.amp:-8.2f} | (Unmatched)" | |
| ) | |
| continue | |
| print( | |
| f"| Comp. {i+1} | {true_p.fo:-12.2f} | {est_p.fo:-12.2f} " | |
| + f"| {true_p.epsilon:-8.2f} | {est_p.epsilon:-8.2f} " | |
| + f"| {true_p.amp:-8.2f} | {est_p.amp:-8.2f} |" | |
| ) | |
| print("----------------------------------------------------------------------") | |
| def main() -> None: | |
| """Perform demonstration.""" | |
| fs = 44100 | |
| duration = 1.0 # sec | |
| true_params = [ | |
| DampedSinusoidParameter(220, 5.0, 1.0, np.pi / 4), | |
| DampedSinusoidParameter(350, 8.0, 0.8, np.pi / 3), | |
| ] | |
| n_sinusoids = len(true_params) | |
| print("--- Generating test signal ---") | |
| test_signal = generate_test_signal(fs, duration, n_sinusoids, true_params) | |
| print("\n--- Running Iterative Cancellation Method ---") | |
| estimated_params = estimate_multiple_damped_sinusoids(test_signal, fs, n_sinusoids) | |
| print("\n--- Estimation Result ---") | |
| matched_true_params = match_results(estimated_params, true_params) | |
| print_results(estimated_params, matched_true_params) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment