Last active
July 28, 2024 15:44
-
-
Save dneuman/76fa42cf8c2655517433a591a6a2d905 to your computer and use it in GitHub Desktop.
Weighted Moving Average Smoother in Python using Pandas and Numpy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import pandas as pd | |
| def Hanning(size): | |
| w = np.hanning(size+2) | |
| w = np.array(w[1:-1]) # remove zeros at endpoints | |
| return (w / max(w)) | |
| def WeightedMovingAverage(fs, size, pad=True, winType=Hanning, wts=None): | |
| """Apply a weighted moving average on the supplied series. | |
| Parameters | |
| ---------- | |
| s : pandas.Series | |
| data to be averaged | |
| size : integer | |
| how wide a window to use | |
| pad : Boolean (optional, default = True) | |
| flag determining whether to pad beginning and end of data with a | |
| weighted average of the last `size` points. This provides better | |
| smoothing at the beginning and end of the line, but it tends to have | |
| zero slope. | |
| winType : Function (optional, default = Hanning) | |
| Window function that takes an integer (window size) and returns a list | |
| of weights to be applied to the data. The default is Hanning, a | |
| weighted cosine with non-zero endpoints. Other possible windows are: | |
| * np.bartlett (triangular with endpoints of 0) | |
| * np.blackman (3 cosines creating taper) | |
| * np.hamming (weighted cosine) | |
| * np.hanning (weighted cosine with endpoints of 0) | |
| * Triangle (triangle with non-zero enpoints, and option to | |
| clip top of triangle) | |
| wts : list (optional, default = None) | |
| List of weights to use. `size` becomes the length of wts. Use this | |
| option to provide a custom weighting function. The length of wts | |
| should be odd, but this is not enforced. | |
| Returns | |
| ------- | |
| Pandas Series containing smoothed data | |
| Notes | |
| ----- | |
| Defaults to using a Hanning window for weights, centered on | |
| each point. For points near the beginning or end of data, special | |
| processing is required that isn't in built-in functions. | |
| Any rows with no value (nan) are dropped from series, and that reduced | |
| series is returned. This series will have fewer members than what was | |
| given, and may cause problems with mismatched indexes. | |
| """ | |
| def SetLimits(i, hw): | |
| # i: current data location where window is centred | |
| # hw: half window width | |
| ds = max(0, (i-hw)) # data start | |
| de = min(n-1, (i+hw)) # data end | |
| ws = hw - (i - ds) # window start | |
| we = hw + (de - i) # window end | |
| return ds, de, ws, we | |
| s = fs.dropna() | |
| if type(wts) == type(None): | |
| size += (size+1) % 2 # make odd | |
| window = winType(size) | |
| window /= window.sum() # normalize window | |
| else: | |
| window = wts / wts.sum() | |
| size = len(wts) | |
| n = len(s) | |
| hw = int(size / 2) # half window width | |
| # convolve has boundary effects when there is no overlap with the window | |
| # Begining and end of 'a' must be adjusted to compensate. | |
| # np.average() effectively scales the weights for the different sizes. | |
| if pad: # pad the data with reflected values | |
| # create padded beginning | |
| y = np.zeros(n+2*hw) | |
| for i in range(hw): | |
| y[i] = s.iloc[hw-i] | |
| for i in range(hw): | |
| y[i+n+hw] = s.iloc[n-i-1] | |
| for i in range(n): | |
| y[i+hw] = s.iloc[i] | |
| yc = np.convolve(y, window, mode='same') | |
| a = pd.Series(yc[hw:n+hw], | |
| index=s.index, | |
| name=s.name) | |
| else: # clip window as available data decreases | |
| a = pd.Series(np.convolve(s, window, mode='same'), | |
| index=s.index, | |
| name=s.name) | |
| for i in range(hw+1): # fix the start | |
| (ds, de, ws, we) = SetLimits(i, hw) | |
| a.iloc[i] = np.average(s.iloc[ds:de], weights=window[ws:we]) | |
| for i in range(n-hw-1, n): # fix the end | |
| (ds, de, ws, we) = SetLimits(i, hw) | |
| a.iloc[i] = np.average(s.iloc[ds:de], weights=window[ws:we]) | |
| return a |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment