Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active August 17, 2025 09:19
Show Gist options
  • Select an option

  • Save tam17aki/c5e96f670ec3f561afb3f0c761291f04 to your computer and use it in GitHub Desktop.

Select an option

Save tam17aki/c5e96f670ec3f561afb3f0c761291f04 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""A demonstration of Frequency Estimation Based on WLS-SCDE for a dumped sinusoid.
Copyright (C) 2025 by Akira TAMAMORI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from typing import NamedTuple
import numpy as np
import numpy.typing as npt
from scipy.signal.windows import get_window
class DampedSinusoidParameter(NamedTuple):
"""Represents the parameters of a single damped sinusoid."""
fo: float # fundamental frequency
epsilon: float # dumping coefficient
amp: float # amplitude
phase: float # initial phase
def calculate_observables(
frame: npt.NDArray[np.float64], fs: float
) -> tuple[
npt.NDArray[np.complex128],
npt.NDArray[np.complex128],
npt.NDArray[np.complex128],
npt.NDArray[np.float64],
]:
"""Calculates the observables (gw, hw, kw) from a signal frame.
These are Fourier coefficients weighted by a window function and its derivatives.
"""
n_samples = len(frame)
fft_size = 1 << (n_samples - 1).bit_length()
# Use a window function whose value and derivatives are zero at the boundaries.
p_t = get_window("blackmanharris", n_samples)
dt = 1.0 / fs
dp_t = np.gradient(p_t, dt)
d2p_t = np.gradient(dp_t, dt)
# Calculate Fourier coefficients for the signal weighted by p(t), p'(t), and p''(t).
gwk_full = np.fft.rfft(frame * p_t, n=fft_size)
hwk_full = np.fft.rfft(frame * dp_t, n=fft_size)
kwk_full = np.fft.rfft(frame * d2p_t, n=fft_size)
freq_bins_hz = np.fft.rfftfreq(fft_size, 1 / fs).astype(np.float64)
omega_k_full = 2 * np.pi * freq_bins_hz
return gwk_full, hwk_full, kwk_full, omega_k_full
def select_bins_for_damped_scde(
gwk_full: npt.NDArray[np.complex128],
omega_k_full: npt.NDArray[np.float64],
fs: float,
num_bins: int = 7,
) -> npt.NDArray[np.int64]:
"""Selects a set of frequency bins around the spectral peak."""
freq_bins_hz = omega_k_full / (2 * np.pi)
mag_spec = np.abs(gwk_full)
lower_bound = 50
search_range = (freq_bins_hz > lower_bound) & (freq_bins_hz < fs / 2 * 0.95)
if not np.any(search_range):
return np.array([], dtype=int)
search_indices = np.where(search_range)[0]
mag_spec_searched = mag_spec[search_indices]
center_peak_local_idx = np.argmax(mag_spec_searched)
half_bins = num_bins // 2
start_local_idx = np.maximum(0, center_peak_local_idx - half_bins)
end_local_idx = start_local_idx + num_bins
if end_local_idx > len(mag_spec_searched):
end_local_idx = len(mag_spec_searched)
start_local_idx = max(0, end_local_idx - num_bins)
selected_local_indices = np.arange(start_local_idx, end_local_idx)
return search_indices[selected_local_indices]
def solve_complex_a_sq_wls(
gwk: npt.NDArray[np.complex128],
hwk: npt.NDArray[np.complex128],
kwk: npt.NDArray[np.complex128],
omega_k: npt.NDArray[np.float64],
) -> np.complex128 | None:
"""Solves for the complex squared frequency using Weighted Least Squares."""
if len(gwk) < 3:
return None
# Equation: (gwk) * ã² = (ωk^2 * gwk + 2jωk * hwk - kwk)
coeff = gwk
b = omega_k**2 * gwk + 2j * omega_k * hwk - kwk
# Weight W is the power |gwk|^2
w = np.abs(gwk) ** 2
# Solve the weighted normal equation: (A^{H}WA)x = (A^{H}Wb)
coeff_h = coeff.conj()
lhs = np.sum(coeff_h * w * coeff)
rhs = np.sum(coeff_h * w * b)
condition = 1e-12
if np.abs(lhs) < condition:
return None
complex_freq: np.complex128 = rhs / lhs
return complex_freq
def separate_params_from_complex_a_sq(
complex_a_sq: np.complex128, peak_omega: np.float64
) -> tuple[np.float64, np.float64]:
"""Separates frequency (a) and damping (ε) from the complex a^2."""
real_part = complex_a_sq.real
imag_part = complex_a_sq.imag
# Im(ã^2) ≈ 2ωε
omega_bound = 1e-6
if peak_omega < omega_bound:
epsilon = np.float64(0.0)
else:
epsilon = imag_part / (2 * peak_omega)
# Re(ã^2) ≈ a^2 + ε^2
a_sq = real_part - epsilon**2
if a_sq < 0:
return np.float64(np.float64(np.nan)), np.float64(np.float64(np.nan))
a: np.float64 = np.sqrt(a_sq)
return a, epsilon
def estimate_damped_sinusoid(
frame: npt.NDArray[np.float64], fs: float
) -> tuple[np.float64, np.float64]:
"""Estimates the frequency (fo) and damping factor (ε) of a single damped sinusoid.
This function is a refactored version with separated responsibilities.
Args:
frame (np.ndarray): The input signal frame.
fs (int): The sampling frequency in Hz.
Returns:
tuple[float, float]: The estimated (fo, ε) pair.
Returns (np.float64(np.nan), np.float64(np.nan))
if estimation fails.
"""
lower_freq = 50
var_bound = 1e-10
if len(frame) < lower_freq or np.var(frame) < var_bound:
return np.float64(np.nan), np.float64(np.nan)
# Step 1: Calculate the necessary observables (gw, hw, kw) from the signal.
gwk_full, hwk_full, kwk_full, omega_k_full = calculate_observables(frame, fs)
# Step 2: Select the most informative frequency bins around the spectral peak.
selected_indices = select_bins_for_damped_scde(
gwk_full, omega_k_full, fs, num_bins=7
)
if selected_indices.size < 3:
return np.float64(np.nan), np.float64(np.nan)
# Step 3: Solve for the complex squared frequency using WLS.
complex_a_sq = solve_complex_a_sq_wls(
gwk=gwk_full[selected_indices],
hwk=hwk_full[selected_indices],
kwk=kwk_full[selected_indices],
omega_k=omega_k_full[selected_indices],
)
if complex_a_sq is None:
return np.float64(np.nan), np.float64(np.nan)
# Step 4: Separate the physical parameters (a, ε) from the complex solution.
index: np.int64 = selected_indices[np.argmax(np.abs(gwk_full[selected_indices]))]
peak_omega = omega_k_full[index]
a, epsilon = separate_params_from_complex_a_sq(complex_a_sq, peak_omega)
if np.isnan(a):
return np.float64(np.nan), np.float64(np.nan)
# Convert angular frequency to Hz and perform final validation.
fo_hz = a / (2 * np.pi)
if not lower_freq < fo_hz < fs / 2:
return np.float64(np.nan), np.float64(np.nan)
return fo_hz, epsilon
def synthesize_damped_sinusoid(
params: DampedSinusoidParameter, n_samples: int, fs: float
) -> npt.NDArray[np.float64]:
"""Re-synthesize a damped sinusoid from its parameter object."""
t = np.arange(n_samples) / fs
signal: npt.NDArray[np.float64] = (
params.amp
* np.exp(-params.epsilon * t)
* np.cos(2 * np.pi * params.fo * t + params.phase)
)
return signal
def estimate_amp_phase_damped(
signal: npt.NDArray[np.float64], fs: float, fo: np.float64, epsilon: np.float64
) -> tuple[np.float64, np.float64]:
"""Estimate the amplitude and phase of a damped sinusoid with known fo and ε."""
t = np.arange(len(signal)) / fs
# model : y = c*exp(-εt)cos(2π fo t) - s*exp(-εt)sin(2π fo t)
# The unknowns are c = Acos(φ) and s = Asin(φ).
exp_decay = np.exp(-epsilon * t)
cos_term = exp_decay * np.cos(2 * np.pi * fo * t)
sin_term = -exp_decay * np.sin(2 * np.pi * fo * t)
a = np.vstack([cos_term, sin_term]).T
try:
coeffs, _, _, _ = np.linalg.lstsq(a, signal, rcond=None)
c, s = coeffs[0], coeffs[1]
except np.linalg.LinAlgError:
return np.float64(0.0), np.float64(0.0)
# Calculate amplitude and phase from c and s
amp: np.float64 = np.sqrt(c**2 + s**2)
phase: np.float64 = np.arctan2(s, c)
return amp, phase
def estimate_multiple_damped_sinusoids(
frame: npt.NDArray[np.float64], fs: float, n_sinusoids: int
) -> list[DampedSinusoidParameter]:
"""Estimate parameters of multiple damped sinusoids using iterative cancellation.
Args:
frame (np.ndarray): The input signal frame.
fs (int): The sampling frequency in Hz.
n_sinusoids (int): Number of Sinusoids.
Returns:
list[dict]: list of Estimated parameters of each sound source.
(fo, epsilon, amp, phase)
"""
estimated_params: list[DampedSinusoidParameter] = []
residual_signal = np.copy(frame) # Copy without destroying the original signal
for i in range(n_sinusoids):
print(f"--- Iteration {i+1}/{n_sinusoids} ---")
# 1. Estimate the parameters of the strongest single damped sinusoid
# from the current residual signal.
# Note: In this step, we expect to find the most energetic components.
fo, epsilon = estimate_damped_sinusoid(residual_signal, fs)
if np.isnan(fo):
print(" No more components found. Stopping.")
break # a more reliable ingredient won't be found
# 2. Use the estimated frequency fo to accurately estimate
# the amplitude and phase of that component.
# Ideally, we would like to estimate the amplitude and
# phase more accurately, taking into account the attenuation term.
amp, phase = estimate_amp_phase_damped(residual_signal, fs, fo, epsilon)
print(f" Component found: fo={fo:.2f} Hz, eps={epsilon:.2f}, amp={amp:.2f}")
# 3. Save estimated parameters to a list.
params = DampedSinusoidParameter(
float(fo), float(epsilon), float(amp), float(phase)
)
estimated_params.append(params)
# 4. Recombining the estimated component signal.
sinusoid_to_remove = synthesize_damped_sinusoid(params, len(frame), float(fs))
# 5. Remove the estimated component from the residual signal.
residual_signal -= sinusoid_to_remove
return estimated_params
def generate_test_signal(
fs: float,
duration: float,
n_sinusoids: int,
true_params: list[DampedSinusoidParameter],
noise_level: float = 0.01,
) -> npt.NDArray[np.float64]:
"""Generates the test signal for the experiment."""
n_samples = int(fs * duration)
test_signal = np.zeros(n_samples)
for params in true_params:
test_signal += synthesize_damped_sinusoid(params, n_samples, fs)
test_signal += np.random.randn(n_samples) * noise_level
print(
f"Generated a {duration}s signal with {n_sinusoids} damped sinusoids "
+ f"(noise level: {noise_level})."
)
return test_signal
def match_results(
estimated_params: list[DampedSinusoidParameter],
true_params: list[DampedSinusoidParameter],
) -> list[DampedSinusoidParameter | None]:
"""Matches estimated parameters with true parameters."""
matched_true_params: list[DampedSinusoidParameter | None] = [None] * len(
estimated_params
)
remaining_true_params = list(true_params)
for i, est_p in enumerate(estimated_params):
best_match_idx = -1
min_freq_diff = float("inf")
for j, true_p in enumerate(remaining_true_params):
diff = np.abs(est_p.fo - true_p.fo)
if diff < min_freq_diff:
min_freq_diff = diff
best_match_idx = j
if best_match_idx != -1:
matched_true_params[i] = remaining_true_params.pop(best_match_idx)
return matched_true_params
def print_results(
estimated_params: list[DampedSinusoidParameter],
matched_true_params: list[DampedSinusoidParameter | None],
) -> None:
"""Prints the estimation results in a formatted table."""
print("----------------------------------------------------------------------")
print(
"| Component | True F0 (Hz) | Est. F0 (Hz) | True Eps | Est. Eps | "
+ "True Amp | Est. Amp |"
)
print("----------------------------------------------------------------------")
if not estimated_params:
print("No components were estimated.")
else:
for i, (est_p, true_p) in enumerate(zip(estimated_params, matched_true_params)):
if true_p is None:
print(
f"| Est. {i+1} | --- | {est_p.fo:-12.2f} "
+ f"| --- | {est_p.epsilon:-8.2f} | --- "
+ f"| {est_p.amp:-8.2f} | (Unmatched)"
)
continue
print(
f"| Comp. {i+1} | {true_p.fo:-12.2f} | {est_p.fo:-12.2f} "
+ f"| {true_p.epsilon:-8.2f} | {est_p.epsilon:-8.2f} "
+ f"| {true_p.amp:-8.2f} | {est_p.amp:-8.2f} |"
)
print("----------------------------------------------------------------------")
def main() -> None:
"""Perform demonstration."""
fs = 44100
duration = 1.0 # sec
true_params = [
DampedSinusoidParameter(220, 5.0, 1.0, np.pi / 4),
DampedSinusoidParameter(350, 8.0, 0.8, np.pi / 3),
]
n_sinusoids = len(true_params)
print("--- Generating test signal ---")
test_signal = generate_test_signal(fs, duration, n_sinusoids, true_params)
print("\n--- Running Iterative Cancellation Method ---")
estimated_params = estimate_multiple_damped_sinusoids(test_signal, fs, n_sinusoids)
print("\n--- Estimation Result ---")
matched_true_params = match_results(estimated_params, true_params)
print_results(estimated_params, matched_true_params)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment