Skip to content

Instantly share code, notes, and snippets.

@dhhagan
Created March 16, 2016 19:58
Show Gist options
  • Select an option

  • Save dhhagan/eb9dffe08c85da547115 to your computer and use it in GitHub Desktop.

Select an option

Save dhhagan/eb9dffe08c85da547115 to your computer and use it in GitHub Desktop.
opc.py
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
import seaborn as sns
import warnings
class DataError(TypeError):
    """Error for malformed input data (wrong type/shape).

    NOTE(review): defined but never raised in the visible code — the
    constructor validates with bare ``assert`` statements instead.
    """
    pass
class CalculationError(Exception):
    """Error for failed derived-quantity calculations.

    NOTE(review): defined but never raised in the visible code.
    """
    pass
class Opc(object):
    '''
    Generic optical particle counter (OPC) data container.

    Builds the number (dN), surface-area (dS) and volume (dV) size
    distributions, their dDp- and dlogDp-normalized forms, and per-bin
    areas, from raw per-bin number concentrations.

    Keyword arguments:
        data    : pandas DataFrame of number concentrations, one column per size bin
        bins    : list or ndarray of bin boundary diameters (length = n columns + 1)
        density : particle density used for mass conversions (default 1.0)
        divisor : scaling factor used in the CMD computation (default 100.)

    Raises:
        DataError : if ``data``/``bins`` are not of the expected type/shape.
    '''
    def __init__(self, **kwargs):
        self.density = kwargs.get('density', 1.0)
        self.dN = kwargs.get('data')
        self.bins = kwargs.get('bins', [])
        self.divisor = kwargs.get('divisor', 100.)
        # Validate inputs with real exceptions: asserts vanish under `python -O`,
        # and the file already defines DataError for exactly this purpose.
        if not isinstance(self.dN, pd.DataFrame):
            raise DataError("Your data is not a pandas DataFrame")
        if not isinstance(self.bins, (list, np.ndarray)):
            raise DataError("bins must be either a list or numpy array")
        if len(self.bins) != len(self.dN.columns) + 1:
            raise DataError("You must have 1 more size bin than you do data columns")
        # Name the index so drop_duplicates below can key on it
        self.dN.index.rename('index', inplace = True)
        # Zeros are treated as missing; drop rows that are entirely empty
        self.dN = self.dN.replace(0, np.nan).dropna(how = 'all')
        # Make sure there are no duplicate rows (keyed on the index)
        self.dN = self.dN.reset_index().drop_duplicates('index').set_index('index')
        # One fewer midpoint diameter than bin boundaries
        self.numbins = len(self.bins) - 1
        # Midpoint-diameter table; also sets self.Dp, dDp, logDp, dlogDp
        self.datum = self.calc_dp()
        # Surface area and volume factors per midpoint diameter (S+P Eqn. 8.10)
        self.s_factor = self.Dp ** 2 * math.pi
        self.v_factor = self.Dp ** 3 * (math.pi / 6.)
        # Product of dN and the multiplicity factors
        self.dS = self.dN.mul(self.s_factor)
        self.dV = self.dN.mul(self.v_factor)
        # Differential distributions, normalized by linear bin width dDp
        self.dNdDp = self.dN.div(self.dDp)
        self.dSdDp = self.dS.div(self.dDp)
        self.dVdDp = self.dV.div(self.dDp)
        # log-diameter distributions: n(logDp) = n(Dp) * ln(10) * Dp
        # NOTE(review): 2.3025 is a truncated ln(10) (= 2.302585...); kept as-is
        # to preserve existing numeric output — confirm before tightening.
        self.dNdlogDp = self.dNdDp.mul(2.3025).mul(self.Dp)
        self.dSdlogDp = self.dSdDp.mul(2.3025).mul(self.Dp)
        self.dVdlogDp = self.dVdDp.mul(2.3025).mul(self.Dp)
        # Area in each bin on a logDp plot; dlogDp = log10(upper) - log10(lower)
        self.dN_area = self.dNdlogDp.mul(self.dlogDp)
        self.dS_area = self.dSdlogDp.mul(self.dlogDp)
        self.dV_area = self.dVdlogDp.mul(self.dlogDp)
        # Dump all of the data into a Panel.
        # NOTE(review): pd.Panel was deprecated in pandas 0.20 and removed in 1.0 —
        # this class requires an old pandas; migrating self.data to a dict of
        # DataFrames (and updating plot1) is the long-term fix.
        self.data = pd.Panel({
            'dN': self.dN,
            'dS': self.dS,
            'dV': self.dV,
            'dNdDp': self.dNdDp,
            'dSdDp': self.dSdDp,
            'dVdDp': self.dVdDp,
            'dNdlogDp': self.dNdlogDp,
            'dSdlogDp': self.dSdlogDp,
            'dVdlogDp': self.dVdlogDp,
            'dN_area': self.dN_area,
            'dS_area': self.dS_area,
            'dV_area': self.dV_area,
        }).transpose(1, 2, 0)
        # Summary statistics (ntot, mean, var, cmd, gsd, cmd_SA) per sample
        self.stats = self.calculate_statistics()
def calc_dp(self):
''' Calculate the midpoint diameters and logdp values '''
c = np.array([self.bins[1:], np.roll(self.bins, 1)[1:]]).T
columns = ['bin{}'.format(i) for i in range(self.numbins)]
self.dDp = np.array([(x[0] - x[1]) for x in c])
self.Dp = np.array([np.mean(x) for x in c])
self.logDp = np.array([math.log10(x) for x in self.Dp])
self.dlogDp = np.array([(np.log10(x[0]) - np.log10(x[1])) for x in c])
datum = pd.DataFrame.from_dict({
'bin': columns,
'Dp': self.Dp,
'logDp': self.logDp,
'dlogDp': self.dlogDp,
'dDp': self.dDp,
'boundary': self.bins[0:-1]
})
datum.index = datum['bin']
del datum['bin']
return datum
def integrated_pm(self):
# DO THIS FOR THE WHOLE DANG THING!
''' Returns a DataFrame '''
# find the bin that pertains that contains the largest value we want!
pm1bin = self.datum.query("boundary < 1.0").index[-1]
pm25bin = self.datum.query("boundary < 2.5").index[-1]
pm10bin = self.datum.query("boundary < 10.").index[-1]
# Make bound estimates (Hi and Lo)
Dp_lo = self.bins[:-1]
Dp_hi = self.bins[1:]
v_factor_lo = Dp_lo ** 3 * (math.pi / 6.)
v_factor_hi = Dp_hi ** 3 * (math.pi / 6.)
dV_lo = self.dN.mul(v_factor_lo)
dV_hi = self.dN.mul(v_factor_hi)
vol_lo = (dV_lo * self.density).T
vol_hi = (dV_hi * self.density).T
pm1 = self.dV_area.T[:pm1bin].T.sum(axis = 1) * self.density
pm25 = self.dV_area.T[:pm25bin].T.sum(axis = 1) * self.density
pm10 = self.dV_area.T[:pm10bin].T.sum(axis = 1) * self.density
pmtot = self.dV_area.sum(axis = 1) * self.density
df = pd.DataFrame({
'pm1_lo': vol_lo[:pm1bin].sum(),
'pm1': pm1,
'pm1_hi': vol_hi[:pm1bin].sum(),
'pm25_lo': vol_lo[:pm25bin].sum(),
'pm25': pm25,
'pm25_hi': vol_hi[:pm25bin].sum(),
'pm10_lo': vol_lo[:pm10bin].sum(),
'pm10': pm10,
'pm10_hi': vol_hi[:pm10bin].sum(),
'pmtot': pmtot
})
return df
def simple_pm(self):
'''
Calculate the PM by multiplying the Number concentration by the volume factor for a generic particle in that size bin
multiplied by the density and then summed across the bins.
Returns mass per bin.
'''
volume = (self.dV * self.density).T
pm1bin = self.datum.query("boundary < 1.0").index[-1]
pm25bin = self.datum.query("boundary < 2.5").index[-1]
pm10bin = self.datum.query("boundary < 10.").index[-1]
pm = pd.DataFrame({
'pm1': volume[:pm1bin].sum(),
'pm2_5': volume[:pm25bin].sum(),
'pm10': volume[:pm10bin].sum()
})
return pm
def _variance(self, row):
'''
Calculate the value of (each bin - arithmetic mean) ** 2 as a factor and then
multiply by the number concentration in that bin before multiplying by 1/Ntot
'''
ntot = row.sum()
mean = row.mul(self.Dp).sum() / ntot
return row.mul((self.Dp - mean) ** 2).sum() / ntot
def calculate_statistics(self):
'''
Calculate the CMD/Geomean and Geometric Stdev for each datapoint
'''
stats = pd.DataFrame()
stats['ntot'] = self.dN.sum(axis = 1)
stats['mean'] = self.dN.mul(self.Dp).sum(axis = 1) / stats['ntot']
stats['var'] = self.dN.apply(self._variance, axis = 1)
# The divide N by 10. trick deals with large numbers on 32-bit architecture
stats['cmd'] = (self.dN.apply(lambda x: self.Dp ** (x / self.divisor),
axis = 1).replace(0, np.nan).prod(axis = 1)).pow(1. / (stats['ntot'] / self.divisor))
# Calculating GSD is much more of a pain..
for i, row in stats.iterrows():
nrow = self.dN["{}".format(i)]
try:
stats.loc[i, 'gsd'] = math.pow(10, np.sqrt(nrow.mul((self.logDp - np.log10(row['cmd'])) ** 2).sum(axis = 1) / (row['ntot'] - 1)))
except Exception as e:
stats.loc[i, 'gsd'] = np.nan
warnings.warn("Index {}\n{}".format(i, e))
# Adding for chris (Dec 2015) Calculated using S+P 8.50
stats['cmd_SA'] = stats.apply(lambda row: np.exp(np.log(row['cmd']) + 2 * np.log(row['gsd']) ** 2) , axis = 1)
return stats
# --- Plotting ---
def plot1(self, row = 0):
    '''
    Plot the normalized number, surface-area and volume distributions
    for a single sample, selected positionally by `row`.

    Draws one bar per size bin (left edge at the lower bin boundary,
    width = dDp) on three stacked symlog-x axes, shows the figure, and
    returns (fig, ax) for further customization.
    '''
    # BUG FIX: `.ix` was deprecated and removed from pandas; select the
    # sample positionally with .iloc instead. (Also dropped an `rc` dict
    # that was built but never applied.)
    data = self.data.iloc[row]
    with sns.axes_style('dark'):
        fig, ax = plt.subplots(3, sharex = True, figsize = (12, 9))
        ax[0].set_title('Normalized Aerosol Concentration')
        # Raw strings so LaTeX backslashes are not parsed as string escapes
        ax[0].set_ylabel(r"$n_N\;(D_p)$ [$\mu m^{-1} cm^{-3}$]", fontsize = 14)
        ax[1].set_ylabel(r"$n_S\; (D_p)$ [$\mu m \;cm^{-3}$]", fontsize = 14)
        ax[2].set_ylabel(r"$n_V\;(D_p)$ [$\mu m^2 cm^{-3}$]", fontsize = 14)
        ax[2].set_xlabel(r"$D_p$ [$\mu m$]")
        for count, (i, r) in enumerate(data.iterrows()):
            ax[0].bar(self.bins[count], r['dNdDp'], self.datum['dDp'][i], alpha = 0.75)
            ax[1].bar(self.bins[count], r['dSdDp'], self.datum['dDp'][i], alpha = 0.75)
            ax[2].bar(self.bins[count], r['dVdDp'], self.datum['dDp'][i], alpha = 0.75)
        # symlog handles the sub-micron decades without breaking at zero
        for axis in ax:
            axis.set_xscale('symlog')
        ax[2].xaxis.set_major_formatter(ScalarFormatter())
        plt.tight_layout()
        plt.show()
    return fig, ax
def __repr__(self):
return "OPC"
class Grimm(Opc):
    ''' Opc instance specific to the Grimm instrument '''
    def __init__(self, **kwargs):
        # Bin boundary diameters for the Grimm OPC according to the documentation [um]
        bin_boundaries = np.array([0.25, 0.28, 0.3, 0.35, 0.4, 0.45, 0.5, 0.58, 0.65, 0.7,
                                   0.8, 1.0, 1.3, 1.6, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0, 6.5,
                                   7.5, 8.5, 10.0, 12.5, 15.0, 17.5, 20.0, 25.0, 30.0, 32.0, 35.0])
        # BUG FIX: super(self.__class__, self) recurses infinitely if this class
        # is ever subclassed; name the class explicitly (py2-compatible form).
        super(Grimm, self).__init__(bins = bin_boundaries, **kwargs)
class Alphasense(Opc):
    ''' Opc instance specific to the Alphasense OPC-N2 '''
    def __init__(self, **kwargs):
        # Bin boundary diameters for the OPC-N2 [um]
        bin_boundaries = np.array([0.38, 0.54, 0.78, 1.05, 1.34, 1.59, 2.07, 3., 4., 5.,
                                   6.5, 8., 10., 12., 14., 16., 17.5])
        # BUG FIX: super(self.__class__, self) recurses infinitely if this class
        # is ever subclassed; name the class explicitly (py2-compatible form).
        super(Alphasense, self).__init__(bins = bin_boundaries, **kwargs)

    def __repr__(self):
        """Instrument-specific printable tag."""
        return "Alphasense OPC-N2"
class Smps(Opc):
    ''' Opc instance specific to an SMPS '''
    def __init__(self, **kwargs):
        # SMPS diameters are in nm, so Dp**N terms get huge; the larger divisor
        # keeps the CMD computation finite (see Opc.calculate_statistics).
        # BUG FIX: super(self.__class__, self) recurses infinitely if this class
        # is ever subclassed; name the class explicitly (py2-compatible form).
        super(Smps, self).__init__(divisor = 1.0e12, **kwargs)

    def integrated_pm(self, Dp = 1.):
        '''
        Integrate the volume distribution up to cutoff diameter `Dp`.

        Returns a DataFrame with 'pm' (mass below the cutoff) and
        'pmtot' (total mass) columns, indexed like the data.
        '''
        # Find the last bin whose lower boundary is below the cutoff.
        # BUG FIX: narrowed the bare `except:` — an empty query result is the
        # only expected failure, raising IndexError on .index[-1].
        try:
            pmbin = self.datum.query("boundary < {}".format(Dp)).index[-1]
        except IndexError:
            # NOTE(review): falls back to the LAST bin (i.e. integrates
            # everything) when no bin is below the cutoff — confirm intent.
            pmbin = self.datum.index[-1]
        # 1e-9 converts from nm3 mass /cm3 air -> ug / m3 air
        pm = self.dV_area.T[:pmbin].T.sum(axis = 1) * self.density #*1e-9
        pmtot = self.dV_area.sum(axis = 1) * self.density #* 1e-9
        return pd.DataFrame({
            'pm': pm,
            'pmtot': pmtot
        })

    def __repr__(self):
        """Instrument-specific printable tag."""
        return "SMPS"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment