import os
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import numpy as np
from lmfit import Parameters
from lmfit.models import GaussianModel, LinearModel
from astropy.constants import c
from fitelp.label_tools import line_label
import fitelp.constants as constants
constants.init()
def vel_to_wave(restWave, vel, flux, fluxError=None, delta=False):
if delta is True:
wave = (vel / c.to('km/s').value) * restWave
else:
wave = (vel / c.to('km/s').value) * restWave + restWave
flux = flux / (restWave / c.to('km/s').value)
if fluxError is not None:
fluxError = fluxError / (restWave / c.to('km/s').value)
return vel, flux, fluxError
else:
return wave, flux
def wave_to_vel(restWave, wave, flux, fluxError=None, delta=False):
if delta is True:
vel = (wave / restWave) * c.to('km/s').value
else:
vel = ((wave - restWave) / restWave) * c.to('km/s').value
flux = flux * (restWave / c.to('km/s').value)
if fluxError is not None:
fluxError = fluxError * (restWave / c.to('km/s').value)
return vel, flux, fluxError
else:
return vel, flux
class FittingProfile(object):
def __init__(self, wave, flux, restWave, lineName, zone, rp, fluxError=None, xAxis='vel', initVals='vel'):
"""The input vel and flux must be limited to a single emission line profile"""
self.flux = flux
self.fluxError = fluxError
self.restWave = restWave
self.lineName = lineName
self.zone = zone
self.weights = self._weights()
self.rp = rp
self.xAxis = xAxis
self.initVals = initVals
if xAxis == 'vel':
if fluxError is None:
vel, self.flux = wave_to_vel(restWave, wave, flux)
else:
vel, self.flux, self.fluxError = wave_to_vel(restWave, wave, flux, fluxError)
self.x = vel
else:
self.x = wave
self.linGaussParams = Parameters()
def _weights(self):
if self.fluxError is None:
return None
else:
fluxErrorCR = self.fluxError# - self.continuum
return 1./fluxErrorCR
def _get_amplitude(self, numOfComponents, modelFit):
amplitudeTotal = 0.
for i in range(numOfComponents):
amplitudeTotal = amplitudeTotal + modelFit.best_values['g%d_amplitude' % (i+1)]
print("Amplitude Total is %f" % amplitudeTotal)
return amplitudeTotal
def _gaussian_component(self, pars, prefix, c, s, a, limits):
"""Fits a gaussian with given parameters.
pars is the lmfit Parameters for the fit, prefix is the label of the gaussian, c is the center, s is sigma,
a is amplitude. Returns the Gaussian model"""
varyCentre = True
varySigma = True
varyAmp = True
if limits['c'] is False:
varyCentre = False
cMin, cMax = -np.inf, np.inf
elif type(limits['c']) is tuple:
cMin = limits['c'][0]
cMax = limits['c'][1]
else:
cMin = c - c*limits['c']
cMax = c + c*limits['c']
if limits['s'] is False:
varySigma = False
sMin, sMax = -np.inf, np.inf
elif type(limits['s']) is tuple:
sMin = limits['s'][0]
sMax = limits['s'][1]
else:
sMin = s - s * limits['s']
sMax = s + s * limits['s']
if limits['a'] is False:
varyAmp = False
aMin, aMax = -np.inf, np.inf
elif type(limits['a']) is tuple:
aMin = limits['a'][0]
aMax = limits['a'][1]
else:
aMin = a - a * limits['a']
aMax = a + a * limits['a']
g = GaussianModel(prefix=prefix)
pars.update(g.make_params())
if isinstance(c, str):
pars[prefix + 'center'].set(expr=c, min=cMin, max=cMax, vary=varyCentre)
else:
pars[prefix + 'center'].set(c, min=cMin, max=cMax, vary=varyCentre)
if isinstance(s, str):
pars[prefix + 'sigma'].set(expr=s, min=sMin, max=sMax, vary=varySigma)
else:
pars[prefix + 'sigma'].set(s, min=sMin, max=sMax, vary=varySigma)
if isinstance(a, str):
pars[prefix + 'amplitude'].set(expr=a, min=aMin, max=aMax, vary=varyAmp)
else:
pars[prefix + 'amplitude'].set(a, min=aMin, max=aMax, vary=varyAmp)
return g
def multiple_close_emission_lines(self, lineNames, cListInit, sListInit, lS, lI):
"""All lists should be the same length"""
gList = []
# Assume initial parameters are in velocity
lin = LinearModel(prefix='lin_')
self.linGaussParams = lin.guess(self.flux, x=self.x)
self.linGaussParams.update(lin.make_params())
self.linGaussParams['lin_slope'].set(lS, vary=True)
self.linGaussParams['lin_intercept'].set(lI, vary=True)
for j, lineName in enumerate(lineNames):
numComps = self.rp.emProfiles[lineName]['numComps']
restWave = self.rp.emProfiles[lineName]['restWavelength']
copyFrom = self.rp.emProfiles[lineName]['copyFrom']
if copyFrom is not None:
copyFromRestWave = self.rp.emProfiles[copyFrom]['restWavelength']
cList = ['g{0}{1}_center*{2}'.format(copyFrom.replace('-', ''), (i + 1), (restWave / copyFromRestWave)) for i in range(numComps)]
sList = ['g{0}{1}_sigma'.format(copyFrom.replace('-', ''), i + 1) for i in range(numComps)]
if type(self.rp.emProfiles[lineName]['ampList']) is list:
aList = self.rp.emProfiles[lineName]['ampList']
if self.xAxis == 'vel':
aList = vel_to_wave(restWave, vel=0, flux=np.array(aList))[1]
else:
ampRatio = self.rp.emProfiles[lineName]['ampList']
aList = ['g{0}{1}_amplitude*{2}'.format(copyFrom.replace('-', ''), i + 1, ampRatio) for i in range(numComps)]
else:
cList = vel_to_wave(restWave, vel=np.array(cListInit), flux=0)[0]
sList = vel_to_wave(restWave, vel=np.array(sListInit), flux=0, delta=True)[0]
aListInit = self.rp.emProfiles[lineName]['ampList']
aList = vel_to_wave(restWave, vel=0, flux=np.array(aListInit))[1]
limits = self.rp.emProfiles[lineName]['compLimits']
for i in range(numComps):
if type(limits['c']) is list:
cLimit = limits['c'][i]
else:
cLimit = limits['c']
if type(limits['s']) is list:
sLimit = limits['s'][i]
else:
sLimit = limits['s']
if type(limits['a']) is list:
aLimit = limits['a'][i]
else:
aLimit = limits['a']
lims = {'c': cLimit, 's': sLimit, 'a': aLimit}
if len(lineNames) == 1:
prefix = 'g{0}_'.format(i + 1)
else:
prefix = 'g{0}{1}_'.format(lineName.replace('-', ''), i + 1)
gList.append(self._gaussian_component(self.linGaussParams, prefix, cList[i], sList[i], aList[i], lims))
gList = np.array(gList)
mod = lin + gList.sum()
init = mod.eval(self.linGaussParams, x=self.x)
out = mod.fit(self.flux, self.linGaussParams, x=self.x, weights=self.weights)
f = open(os.path.join(constants.OUTPUT_DIR, self.rp.regionName, "{0}_Log.txt".format(self.rp.regionName)), "a")
print("######## %s %s Linear and Multi-gaussian Model ##########\n" % (self.rp.regionName, self.lineName))
print(out.fit_report())
f.write("######## %s %s Linear and Multi-gaussian Model ##########\n" % (self.rp.regionName, self.lineName))
f.write(out.fit_report())
f.close()
components = out.eval_components()
if not hasattr(self.rp, 'plotResiduals'):
self.rp.plotResiduals = True
numComps = self.rp.emProfiles[lineName]['numComps']
self.plot_emission_line(numComps, components, out, self.rp.plotResiduals, lineNames, init=init, scaleFlux=self.rp.scaleFlux)
return out, components
def lin_and_multi_gaussian(self, numOfComponents, cList, sList, aList, lS, lI, limits):
"""All lists should be the same length"""
gList = []
if self.xAxis == 'wave' and self.initVals == 'vel':
cList = vel_to_wave(self.restWave, vel=np.array(cList), flux=0)[0]
sList = vel_to_wave(self.restWave, vel=np.array(sList), flux=0, delta=True)[0]
aList = vel_to_wave(self.restWave, vel=0, flux=np.array(aList))[1]
elif self.xAxis == 'vel' and self.initVals == 'wave':
cList = wave_to_vel(self.restWave, wave=np.array(cList), flux=0)[0]
sList = wave_to_vel(self.restWave, wave=np.array(sList), flux=0, delta=True)[0]
aList = wave_to_vel(self.restWave, wave=0, flux=np.array(aList))[1]
lin = LinearModel(prefix='lin_')
self.linGaussParams = lin.guess(self.flux, x=self.x)
self.linGaussParams.update(lin.make_params())
self.linGaussParams['lin_slope'].set(lS, vary=True)
self.linGaussParams['lin_intercept'].set(lI, vary=True)
for i in range(numOfComponents):
if type(limits['c']) is list:
cLimit = limits['c'][i]
else:
cLimit = limits['c']
if type(limits['s']) is list:
sLimit = limits['s'][i]
else:
sLimit = limits['s']
if type(limits['a']) is list:
aLimit = limits['a'][i]
else:
aLimit = limits['a']
lims = {'c': cLimit, 's': sLimit, 'a': aLimit}
prefix = 'g{0}_'.format(i+1)
gList.append(self._gaussian_component(self.linGaussParams, prefix, cList[i], sList[i], aList[i], lims))
gList = np.array(gList)
mod = lin + gList.sum()
init = mod.eval(self.linGaussParams, x=self.x)
out = mod.fit(self.flux, self.linGaussParams, x=self.x, weights=self.weights)
f = open(os.path.join(constants.OUTPUT_DIR, self.rp.regionName, "{0}_Log.txt".format(self.rp.regionName)), "a")
print("######## %s %s Linear and Multi-gaussian Model ##########\n" % (self.rp.regionName, self.lineName))
print(out.fit_report())
f.write("######## %s %s Linear and Multi-gaussian Model ##########\n" % (self.rp.regionName, self.lineName))
f.write(out.fit_report())
f.close()
components = out.eval_components()
if not hasattr(self.rp, 'plotResiduals'):
self.rp.plotResiduals = True
self.plot_emission_line(numOfComponents, components, out, self.rp.plotResiduals, init=init, scaleFlux=self.rp.scaleFlux)
self._get_amplitude(numOfComponents, out)
return out, components
def plot_emission_line(self, numOfComponents, components, out, plotResiduals=True, lineNames=None, init=None, scaleFlux=1e14):
ion, lambdaZero = line_label(self.lineName, self.restWave)
fig = plt.figure("%s %s %s" % (self.rp.regionName, ion, lambdaZero))
if plotResiduals is True:
frame1 = fig.add_axes((.1, .3, .8, .6))
plt.title("%s %s" % (ion, lambdaZero))
if self.xAxis == 'wave':
x = self.x
xLabel = constants.WAVE_AXIS_LABEL
yLabel = constants.FluxUnitsLabels(scaleFlux).FLUX_WAVE_AXIS_LABEL
elif self.xAxis == 'vel':
if hasattr(self.rp, 'showSystemicVelocity') and self.rp.showSystemicVelocity is True:
x = self.x - self.rp.systemicVelocity
xLabel = constants.DELTA_VEL_AXIS_LABEL
else:
x = self.x
xLabel = constants.VEL_AXIS_LABEL
if hasattr(self.rp, 'rp.plottingXRange') and self.rp.plottingXRange is not None:
plt.xlim(self.rp.plottingXRange)
yLabel = constants.FluxUnitsLabels(scaleFlux).FLUX_VEL_AXIS_LABEL
else:
raise Exception("Invalid xAxis argument. Must be either 'wave' or 'vel'. ")
plt.plot(x, self.flux, label='Data')
for i in range(numOfComponents):
labelComp = self.rp.componentLabels
if lineNames is None:
plt.plot(x, components['g%d_' % (i+1)]+components['lin_'], color=self.rp.componentColours[i], linestyle=':', label=labelComp[i])
else:
for j, lineName in enumerate(lineNames):
plt.plot(x, components['g{0}{1}_'.format(lineName.replace('-', ''), i + 1)] + components['lin_'], color=self.rp.componentColours[i], linestyle=':', label=labelComp[i])
# plt.plot(x, components['lin_'], label='lin_')
plt.plot(x, out.best_fit, color='black', linestyle='--', label='Fit')
# plt.plot(x, init, label='init')
plt.legend(loc='upper left')
plt.ylabel(yLabel)
if plotResiduals is True:
frame1 = plt.gca()
frame1.axes.get_xaxis().set_visible(False)
frame2 = fig.add_axes((.1, .1, .8, .2))
plt.plot(x, self.flux - out.best_fit)
plt.axhline(y=0, linestyle='--', color='black')
plt.ylabel('Residuals')
# plt.locator_params(axis='y', nbins=3)
# nbins = len(frame2.get_yticklabels())
frame2.yaxis.set_major_locator(MaxNLocator(nbins=3, prune='upper'))
plt.xlabel(xLabel)
plt.savefig(os.path.join(constants.OUTPUT_DIR, self.rp.regionName, self.lineName + " {0} Component Linear-Gaussian Model".format(numOfComponents)), bbox_inches='tight')
[docs]def plot_profiles(lineNames, rp, nameForComps='', title='', sortedIndex=None, plotAllComps=False, xAxis='vel', logscale=False, ymin=None):
try:
plt.figure(title)
ax = plt.subplot(1, 1, 1)
plt.title(title) # Recombination Emission Lines")
if xAxis == 'wave':
xLabel = constants.WAVE_AXIS_LABEL
yLabel = constants.FluxUnitsLabels(rp.scaleFlux).FLUX_WAVE_AXIS_LABEL
elif xAxis == 'vel':
if hasattr(rp, 'showSystemicVelocity') and rp.showSystemicVelocity is True:
xLabel = constants.DELTA_VEL_AXIS_LABEL
else:
xLabel = constants.VEL_AXIS_LABEL
if hasattr(rp, 'rp.plottingXRange'):
plt.xlim(rp.plottingXRange)
yLabel = constants.FluxUnitsLabels(rp.scaleFlux).FLUX_VEL_AXIS_LABEL
else:
raise Exception("Invalid xAxis argument. Must be either 'wave' or 'vel'. ")
plt.xlabel(xLabel)
plt.ylabel(yLabel)
for i in range(len(lineNames)):
name, x, flux, mod, col, comps, lab = rp.emProfiles[lineNames[i]]['plotInfo']
if xAxis == 'vel' and hasattr(rp, 'showSystemicVelocity') and rp.showSystemicVelocity is True:
x = x - rp.systemicVelocity
ax.plot(x, flux, color=col, label=lab)
ax.plot(x, mod, color=col, linestyle='--')
if plotAllComps is True:
for idx in range(rp.emProfiles[lineNames[i]]['numComps']):
plt.plot(x, comps['g%d_' % (idx + 1)] + comps['lin_'], color=rp.componentColours[idx], linestyle=':')
else:
if name == nameForComps:
for idx in range(rp.emProfiles[lineNames[i]]['numComps']):
plt.plot(x, comps['g%d_' % (idx + 1)] + comps['lin_'], color=rp.componentColours[idx], linestyle=':')
if sortedIndex is not None:
handles, labels = ax.get_legend_handles_labels()
handles2 = [handles[idx] for idx in sortedIndex]
labels2 = [labels[idx] for idx in sortedIndex]
ax.legend(handles2, labels2)
else:
ax.legend()
if logscale is True:
ax.set_yscale('log')
if ymin is not None:
ax.set_ylim(bottom=ymin)
plt.savefig(os.path.join(constants.OUTPUT_DIR, rp.regionName, title.strip(' ') + '.png'), bbox_inches='tight')
except KeyError:
print("SOME IONS IN {0} HAVE NOT BEEN DEFINED.".format(lineNames))