Source code for pyhdust.spectools

# -*- coding:utf-8 -*-

"""PyHdust *spectools* module: spectroscopy tools

Algumas definicoes: para todas as rotinas funcionarem, todos os espectros devem
estar agrupados num mesmo caminho (`path`), em estrutura de
noite/estrelas/espec.

Currently the functions only read ``*.cal.fits`` files. The ``.cal`` suffix means
a header with the following keywords:

    * 'MJD-OBS' or 'MJD' or 'JD' or 'DATE-OBS'
    * 'CRVAL1' + 'CDELT1'

A useful tool for normalizing spectra with Python (not used/imported here):
https://python4esac.github.io/plotting/specnorm.html


:license: GNU GPL v3.0 https://github.com/danmoser/pyhdust/blob/master/LICENSE
"""
from __future__ import print_function
import os as _os
import numpy as _np
import datetime as _dt

# import time as _time
from glob import glob as _glob

# from itertools import product as _iproduct
import pyhdust.phc as _phc
import pyhdust.jdcal as _jdcal
import pyhdust.input as _inp
import pyhdust.stats as _stt
import pyhdust as _hdt
from six import string_types as _strtypes
from shutil import copyfile as _copyfile
import warnings as _warn
import requests as _requests
import astropy.io.fits as _pyfits

# from lmfit import Model as _Model

try:
    # import astropy.coordinates.sky_coordinate.SkyCoord as _SkyCoord
    import matplotlib as _mpl
    import matplotlib.pyplot as _plt
    import matplotlib.patches as _mpatches
    from matplotlib.ticker import MaxNLocator as _MaxNLocator
    import matplotlib.gridspec as _gridspec
    import scipy.interpolate as _interpolate
    from scipy.optimize import curve_fit as _curve_fit
    from scipy.stats import percentileofscore as _pos
    from astropy.modeling import models as _models
    from astropy.modeling import fitting as _fitting
    import pandas as _pd
except ImportError:
    _warn.warn("matplotlib, scipy and/or pandas module not installed!!!")

try:
    import wget as _wget
    import xmltodict as _xmltodict
except ImportError:
    _warn.warn("wget and/or xmltodict module not installed!!!")

# try:
#     import pyqt_fit.nonparam_regression as _smooth
#     from pyqt_fit import npr_methods as _npr_methods
# except ImportError:
#     _warn.warn('pyqt_fit module not installed!!!')

__author__ = "Daniel Moser"
__email__ = "dmfaes@gmail.com"

_outfold = ""


[docs]class Spec(object): """Definicao de classe espectro para conter toda a informacao util para plots e analises. EW in km/s Para configurar uma ou mais linhas: >>> spdtb = Spec() >>> spdtb.lbc == 0 >>> #significa que vetor wl eh vetor velocidades, e nao comprimento de >>> # onda. >>> spdtb.lbc = 6564. >>> spdtb2 = Spec() >>> spdtb2.lbc = 4863. Como usar (hard way): >>> spdtb = Spec() >>> #read spec `flux` and `wl` for a given `lbc` >>> (spdtb.EW, spdtb.EC, spdtb.VR, spdtb.peaksep, spdtb.depthcent,\\ >>> spdtb.F0) = analline(wl, flux, lbc) >>> spdtb.MJD = 1 >>> spdtb.file = file And then: >>> #to record it to the database: >>> spdtb.addspec() Para carregar uma tabela anterior, faca: >>> spdtb = Spec() >>> #(...) read new specs and then join with previous ones >>> spdtb.data = _np.vstack((spdtb.data, _np.loadtxt('hdt/datafile.txt'))) >>> spdtb.metadata = _np.vstack(( spdtb.metadata, \\ >>> _np.loadtxt('hdt/metafile.txt') )) >>> spdtb.updatecount() #to update the counter Ou simplesmente (nome de arquivos default): >>> spdtb.loaddata() """ def __init__( self, wl=None, flux=None, lbc=None, hwidth=1000.0, EW=_np.NaN, EC=_np.NaN, VR=_np.NaN, peaksep=_np.NaN, depthcent=_np.NaN, F0=_np.NaN, dateobs="", MJD=0.0, datereduc="", file="", gaussfit=False, ): self.wl = wl self.flux = flux self.lbc = lbc self.hwidth = hwidth self.EW = EW self.EC = EC self.VR = VR self.peaksep = peaksep self.depthcent = depthcent self.F0 = F0 self.file = file self.datereduc = datereduc self.dateobs = dateobs self.MJD = MJD self.count = 0 self.data = _np.empty(0) self.metadata = _np.empty(0) self.gaussfit = gaussfit
[docs] def reset(self): """Reset the class parameters""" self.wl = None self.flux = None self.EW = _np.NaN self.EC = _np.NaN self.VR = _np.NaN self.peaksep = _np.NaN self.depthcent = _np.NaN self.F0 = _np.NaN self.file = "" self.datereduc = "" self.dateobs = "" self.MJD = 0.0
[docs] def clear(self): """Clear the class parameters""" self.__init__()
[docs] def addspec(self): """Record the class parameters into the database""" self.count += 1 if self.count == 1: self.data = _np.array(self.lastinfo()) self.metadata = _np.array(self.lastmeta()) else: self.data = _np.vstack((self.data, self.lastinfo())) self.metadata = _np.vstack((self.metadata, self.lastmeta()))
# if self.flux != None and self.wl != None and self.lbc != None: # self.savespec()
[docs] def lastinfo(self): """Print the current class parameters (last spec)""" return ( self.MJD, self.EW, self.EC, self.VR, self.peaksep, self.depthcent, self.F0, )
[docs] def lastmeta(self): """Print the current class parameters (last spec)""" return self.MJD, self.dateobs, self.datereduc, self.file
[docs] def savedata( self, datafile=_outfold + "/datafile.txt", metafile=_outfold + "/metafile.txt" ): """Save current table""" header = ["MJD", "EW", "EC", "VR", "peaksep", "depthcent", "F0"] _np.savetxt( datafile, self.data, fmt="%12.6f", header=(len(header) * "{:>12s}").format(*header), ) _np.savetxt(metafile, self.metadata, fmt="%s", delimiter=",") return
[docs] def loaddata( self, datafile=_outfold + "/datafile.txt", metafile=_outfold + "/metafile.txt" ): """Function to load a previous table Usage: >>> spdtb = Spec() >>> spdtb.loaddata() """ self.data = _np.loadtxt(datafile) if _os.path.exists(metafile): self.metadata = _np.genfromtxt(metafile, dtype="str", delimiter=",") self.updatecount() return
def updatecount(self, num=0): if num > 0: self.count = num else: self.count = len(self.data) return
[docs] def loadspec(self, file): """Load a fits file (parameters `wl`, `flux`, `MJD`, `dateobs`, `datareduc` and `file`). Currently, only compatible for standard fits. """ if file.find(".fit") == -1: _warn.warn("# ERROR! `loadspec` unrecognized format!") return ( self.wl, self.flux, self.MJD, self.dateobs, self.datereduc, self.file, ) = loadfits(file) (self.EW, self.EC, self.VR, self.peaksep, self.depthcent, self.F0) = analline( self.wl, self.flux, self.lbc, hwidth=self.hwidth, verb=False, gaussfit=self.gaussfit, ) return
[docs] def plotspec(self, outname=""): """Export current spec into a PNG file.""" if self.wl is None or self.flux is None: _warn.warn("wrong Spec() parameters! {0}".format(self.file)) return if outname == "": path, file = _phc.trimpathname(self.file) outname = _phc.rmext(file) # Normalization: flux = linfit(self.wl, self.flux) wl = self.wl fig = _plt.figure() ax = fig.add_subplot(111) ax.plot(wl, flux) ax.set_ylabel("norm. flux") ax.set_xlabel("wavelength (arb. units)") ax.set_title(outname) _plt.savefig("{0}/{1:.2f}_{2}.png".format(_outfold, self.MJD, outname)) if self.lbc > 0: vels = (self.wl - self.lbc) / self.lbc * _phc.c.cgs * 1e-5 idx = _np.where(_np.abs(vels) <= self.hwidth) flux = linfit(vels[idx], flux[idx]) vels = vels[idx] _plt.clf() ax = fig.add_subplot(111) ax.plot(vels, flux) ax.set_ylabel("norm. flux") ax.set_xlabel("vel. (km/s)") ax.set_title("{0:.2f} {1} {2:.2f}".format(self.MJD, outname, self.lbc)) _plt.savefig( "{0}/{1:.2f}_{2}_{3:.2f}.png".format( _outfold, self.MJD, outname, self.lbc ) ) _plt.close() return
[docs]def shiftfits(fitsfile, newsh=None, verbose=False): """Update FITS spec header for a given shift value.""" imfits = _pyfits.open(fitsfile, mode="update") if "WLSHIFT" in imfits[0].header: if verbose: print( "# WLSHIFT = {0} for {1}".format( imfits[0].header["WLSHIFT"], _phc.trimpathname(fitsfile)[1] ) ) else: if verbose: print( "# No WLSHIFT available for {0}".format(_phc.trimpathname(fitsfile)[1]) ) if newsh is None: newsh = _phc.user_input("Type the new shift: ") if newsh != "": imfits[0].header["WLSHIFT"] = float(newsh) imfits.close() return
[docs]def checkshiftfits(fitslist, lbc=6562.8): """Do *shiftfits* sistematically INPUT: list of files OUTPUT: fits files header updated with WLSHIFT. """ fig, ax = _plt.subplots() for f in fitslist: data = loadfits(f) vel, flx = lineProf(data[0], data[1], lbc=lbc) good = False imfits = _pyfits.open(f) if "WLSHIFT" in imfits[0].header: shift0 = float(imfits[0].header["WLSHIFT"]) else: shift0 = 0.0 shift = 0 while not good: ax.plot([0, 0], [0.7, 1.2], ls="--", color="gray") veli = vel + shift * 3e5 / lbc ax.plot(veli, flx) _plt.show() _plt.draw() ri = _phc.user_input("\n# Is it good?(y/other): ") if ri != "y": try: shift = float(_phc.user_input("Type shift: ")) except ValueError: shift = 0.0 else: good = True ax.cla() if shift != 0: shiftfits(f, newsh=shift + shift0) _plt.close(fig) return
[docs]def loadfits(fitsfile): """load FITS spec Out: wl, flux, MJD, dateobs, datereduc, fitsfile """ imfits = _pyfits.open(fitsfile) flux = imfits[0].data wl = _np.arange(len(flux)) * imfits[0].header["CDELT1"] + imfits[0].header["CRVAL1"] (MJD, dateobs, datereduc) = (0.0, "", "") dtinfo = False if not dtinfo and "MJD-OBS" in imfits[0].header: MJD = float(imfits[0].header["MJD-OBS"]) dtinfo = True if not dtinfo and "MJD" in imfits[0].header: MJD = float(imfits[0].header["MJD"]) dtinfo = True if not dtinfo and "JD" in imfits[0].header: if isinstance(imfits[0].header["JD"], _strtypes): if len(imfits[0].header["JD"]) > 0: MJD = float(imfits[0].header["JD"]) - 2400000.5 dtinfo = True else: MJD = imfits[0].header["JD"] - 2400000.5 dtinfo = True if not dtinfo and "DATE-OBS" in imfits[0].header: if len(imfits[0].header["DATE-OBS"]) > 0: dtobs = imfits[0].header["DATE-OBS"] dtobs, tobs = check_dtobs(dtobs) MJD = _jdcal.gcal2jd(*dtobs)[1] + tobs dtinfo = True if not dtinfo and "FRAME" in imfits[0].header: dtobs = imfits[0].header["FRAME"] dtobs, tobs = check_dtobs(dtobs) MJD = _jdcal.gcal2jd(*dtobs)[1] + tobs dtinfo = True if not dtinfo: MJD = _jdcal.MJD_JD2000 _warn.warn( "No DATE-OBS information is available! {0}\nAssuming " "MJD_JD2000".format(fitsfile) ) if "DATE-OBS" in imfits[0].header: dateobs = imfits[0].header["DATE-OBS"] elif "FRAME" in imfits[0].header: dateobs = imfits[0].header["FRAME"] if "IRAF-TLM" in imfits[0].header: datereduc = imfits[0].header["IRAF-TLM"] elif "DATE" in imfits[0].header: datereduc = imfits[0].header["DATE"] if "WLSHIFT" in imfits[0].header: shift = float(imfits[0].header["WLSHIFT"]) wl += shift imfits.close() return wl, flux, MJD, dateobs, datereduc, fitsfile
[docs]def vac2air(wl): """The IAU standard for conversion from air to vacuum wavelengths is given in Morton (1991, ApJS, 77, 119). For vacuum wavelengths (VAC) in Angstroms, convert to air wavelength (AIR) via: AIR = VAC / (1.0 + 2.735182E-4 + 131.4182 / VAC^2 + 2.76249E8 / VAC^4 ) """ return wl / (1.0 + 2.735182e-4 + 131.4182 / wl**2 + 2.76249e8 / wl**4)
[docs]def air2vac(wl): """The IAU standard for conversion from air to vacuum wavelengths is given in Morton (1991, ApJS, 77, 119). For vacuum wavelengths (VAC) in Angstroms, convert to air wavelength (AIR) via: AIR = VAC / (1.0 + 2.735182E-4 + 131.4182 / VAC^2 + 2.76249E8 / VAC^4 ) Fitting the inverse curve: VAC = AIR / (1.0 - 2.73443407E-4 - 1.31275255E2 / AIR^2 - 2.75708212E8 / AIR^4 ) """ return wl / ( 1.0 - 2.73443407e-04 - 1.31275255e02 / wl**2 - 2.75708212e08 / wl**4 )
[docs]def vel2wl(vel, lbc): """Vel. to wavelength. Vel must be in km/s and output is in `lbc` units.""" wl = (vel / _phc.c.cgs * 1e5 + 1) * lbc return wl
[docs]def wl2vel(wl, lbc): """Wavelength to vel., in km/s. `wl` and `lbc` units must be the same.""" vels = (wl - lbc) / lbc * _phc.c.cgs * 1e-5 return vels
[docs]def hydrogenlinewl(ni, nf): """Generate H line transitions wavelengths in meters for VACUUM Rydberg constant `R` was manually adjusted to fit Halpha and Hbeta lines. """ return (10967850.0 * (1.0 / nf**2 - 1.0 / ni**2)) ** -1.0
[docs]def calcres_R(hwidth=1350, nbins=108): """ (h)Width in km/s. *WARNING*: `width` in HDUST input is only half. To HDUST effective R, multiple the input width by 2 he_re. # R = lbd/Dlbd = _phc.c/Dv = _phc.c*nbins/width # nbins = R*width/_phc.c """ return round(_phc.c.cgs * nbins / hwidth / 1e5)
[docs]def calcres_nbins(R=12000, hwidth=1350): """ (h)Width in km/s. *WARNING*: `width` in HDUST input is only half. To HDUST effective R, multiple the input width by 2 he_re. # R = lbd/Dlbd = _phc.c/Dv = _phc.c*nbins/width # nbins = R*width/_phc.c """ return round(R * hwidth * 1e5 / _phc.c.cgs)
[docs]def lineProf(x, flx, lbc, flxerr=_np.empty(0), hwidth=1000.0, ssize=0.05): """ lineProf() - retorna um array (flx) normalizado e um array x em VELOCIDADES. `lbc` deve fornecido em mesma unidade de x para conversão lambda -> vel. Se vetor x jah esta em vel., usar funcao linfit(). x eh importante, pois y pode ser nao igualmente amostrado. x e y devem estar em ordem crescente. ssize = % do tamanho de y; numero de pontos usados nas extremidades para a media do contínuo. 'ssize' de .5 à 0 (exclusive). OUTPUT: vel (array), flx (array) """ x = (x - lbc) / lbc * _phc.c.cgs * 1e-5 # km/s idx = _np.where(_np.abs(x) <= 1.001 * hwidth) if len(flxerr) == 0: flux = linfit(x[idx], flx[idx], ssize=ssize) # yerr=flxerr, if len(x[idx]) == 0: _warn.warn("Wrong `lbc` in the lineProf function") return x[idx], flux else: flux, flxerr = linfit(x[idx], flx[idx], yerr=flxerr[idx], ssize=ssize) if len(x[idx]) == 0: _warn.warn("Wrong `lbc` in the lineProf function") return x[idx], flux, flxerr
[docs]def linfit(x, y, ssize=0.05, yerr=_np.empty(0)): r""" linfit() - retorna um array (y) normalizado, em posicoes de x x eh importante, pois y pode ser nao igualmente amostrado. x e y devem estar em ordem crescente. ssize = % do tamanho de y; numero de pontos usados nas extremidades para a media do contínuo. 'ssize' de .5 à 0 (exclusive). OUTPUT: y, yerr (if given) .. code:: python #Example: import numpy as np import matplotlib.pyplot as plt import pyhdust.phc as phc import pyhdust.spectools as spt wv = np.linspace(6500, 6600, 101) flx = (np.arange(101)[::-1])/100.+1+phc.normgauss(4, x=wv, xc=6562.79)*5 plt.plot(wv, flx) normflx = spt.linfit(wv, flx) plt.plot(wv, normflx, ls='--') plt.xlabel(r'$\lambda$ ($\AA$)') plt.ylabel('Flux (arb. unit)') .. image:: _static/spt_linfit.png :align: center :width: 500 """ ny = _np.array(y)[:] if ssize < 0 or ssize > 0.5: _warn.warn("Invalid ssize value...", stacklevel=2) ssize = 0 ssize = int(ssize * len(y)) if ssize == 0: ssize = 1 medx0, medx1 = _np.average(x[:ssize]), _np.average(x[-ssize:]) if ssize > 9: medy0, medy1 = _np.median(ny[:ssize]), _np.median(ny[-ssize:]) else: medy0, medy1 = _np.average(ny[:ssize]), _np.average(ny[-ssize:]) new_y = medy0 + (medy1 - medy0) * (x - medx0) / (medx1 - medx0) idx = _np.where(new_y != 0) ny[idx] = ny[idx] / new_y[idx] if len(yerr) == 0.0: return ny else: yerr = yerr / _np.average(new_y) return ny, yerr
[docs]def EWcalc(vels, flux, vw=1000): """ Supoe que o fluxo jah estah normalizado, e vetores ordenados. Devolve o valor EW. """ idx = _np.where(_np.abs(vels) <= vw) outvels = vels[idx] normflux = flux[idx] ew = 0.0 if len(outvels) < 3: # normflux = _np.ones(len(outvels)) return ew for i in range(len(outvels) - 1): dl = outvels[i + 1] - outvels[i] # print(dl) ew += (1.0 - (normflux[i + 1] + normflux[i]) / 2.0) * dl return ew
[docs]def absLineCalc(vels, flux, vw=1000, ssize=0.05): r""" Calculate the line flux (input velocity vector). The `flux` is NON-normalized. ``ssize`` parameter controns the size of flux that will be evaluated at the extreme of the input flux array to determine the continuum level. ``vels = (wv - lbc) / lbc * phc.c.cgs * 1e-5 # km/s`` Output in the same flux units times :math:`\Delta v` (both flux and *v* input units). """ idx = _np.where(_np.abs(vels) <= vw) vels = vels[idx] flux = flux[idx] if ssize < 0 or ssize > 0.5: _warn.warn("Invalid ssize value...", stacklevel=2) ssize = 0 ssize = int(ssize * len(flux)) if ssize == 0: ssize = 1 medx0, medx1 = _np.average(vels[:ssize]), _np.average(vels[-ssize:]) if ssize > 9: medy0, medy1 = _np.median(flux[:ssize]), _np.median(flux[-ssize:]) else: medy0, medy1 = _np.average(flux[:ssize]), _np.average(flux[-ssize:]) new_y = medy0 + (medy1 - medy0) * (vels - medx0) / (medx1 - medx0) base = _np.trapz(new_y, vels) line = _np.trapz(flux, vels) return line - base
[docs]def gauss_fit(x, y, a0=None, x0=None, sig0=None, emission=True, ssize=0.05): """Return the area of a fitting Gaussian.""" if ssize <= 0 or ssize >= 0.5: _warn.warn("Invalid ssize value...", stacklevel=2) ssize = 0 ssize = int(ssize * len(y)) if ssize == 0: ssize = 1 medx0, medx1 = _np.average(x[:ssize]), _np.average(x[-ssize:]) if ssize > 6: medy0, medy1 = _np.median(y[:ssize]), _np.median(y[-ssize:]) else: medy0, medy1 = _np.average(y[:ssize]), _np.average(y[-ssize:]) new_y = medy0 + (medy1 - medy0) * (x - medx0) / (medx1 - medx0) q = 95 func = _np.max if not emission: func = _np.min q = 5 if a0 is None: a0 = _np.abs(_np.percentile(y - new_y, q)) - _np.median(y - new_y) if x0 is None: x0 = x[_np.where(y - new_y == func(y - new_y))] if sig0 is None: sig0 = (_np.max(x) - _np.min(x)) / 10.0 g_init = _models.Gaussian1D(amplitude=a0, mean=x0, stddev=sig0) g_init.bounds["amplitude"] = (0, 2 * a0) # g_init.verblevel = 0 fit_g = _fitting.LevMarLSQFitter() # print(a0, x0, sig0, _np.shape(a0), _np.shape(x0), _np.shape(sig0), # _np.shape(x), _np.shape(y)) g = fit_g(g_init, x, y - new_y) # print(g.parameters[0]) return g, new_y
[docs]def absLineDeb( wv, flux, lb0, lb1, vw=1000, ssize=0.05, a0=None, sig0=None, allout=False ): """Return the area of a fitting Gaussian with debblending.""" lbc = _np.average([lb0, lb1]) vels = (wv - lbc) / lbc * _phc.c.cgs * 1e-5 idx = _np.where(_np.abs(vels) <= vw * (1 + ssize)) x = wv[idx] y = flux[idx] if ssize <= 0 or ssize >= 0.5: _warn.warn("Invalid ssize value...", stacklevel=2) ssize = 0 ssize = int(ssize * len(y)) if ssize == 0: ssize = 1 nfxsig = _np.std(y) emission = True if _np.percentile(y, 5) + nfxsig < 1: emission = False if _np.percentile(y, 95) - 1.5 * nfxsig > 1: emission = True medx0, medx1 = _np.average(x[:ssize]), _np.average(x[-ssize:]) if ssize > 6: medy0, medy1 = _np.median(y[:ssize]), _np.median(y[-ssize:]) else: medy0, medy1 = _np.average(y[:ssize]), _np.average(y[-ssize:]) new_y = medy0 + (medy1 - medy0) * (x - medx0) / (medx1 - medx0) q = 95 if not emission: q = 5 if a0 is None: a0 = _np.abs(_np.percentile(y - new_y, q)) - _np.median(y - new_y) if sig0 is None: sig0 = (_np.max(x) - _np.min(x)) / 10.0 g1 = _models.Gaussian1D(a0, lb0, sig0) g1.bounds["amplitude"] = (0, 2 * a0) g1.bounds["mean"] = (lb0 * 0.9, lb1 * 0.99) g2 = _models.Gaussian1D(a0, lb1, sig0) g1.bounds["amplitude"] = (0, a0) g2.bounds["mean"] = (lb0 * 1.01, lb1 * 1.1) gg_init = g1 + g2 # gg_init.verblevel = 0 fitter = _fitting.SLSQPLSQFitter() gg = fitter(gg_init, x, y - new_y, verblevel=0) # print(gg.parameters[0], gg.parameters[0+3]) if not allout: return ( gg.parameters[0] * gg.parameters[2] * _np.sqrt(2 * _np.pi), gg.parameters[0 + 3] * gg.parameters[2 + 3] * _np.sqrt(2 * _np.pi), ) else: return gg, new_y, idx
[docs]def absLineCalcWave( wv, flux, lbc, vw=1000, ssize=0.05, gauss=False, allout=False, spcas=None ): r""" Calculate the line flux (input velocity vector). The `flux` is NON-normalized. ``ssize`` parameter controns the size of flux that will be evaluated at the extreme of the input flux array to determine the continuum level. ``vels = (wv - lbc) / lbc * phc.c.cgs * 1e-5 # km/s`` Output in the same flux units times :math:`\Delta v` (both flux and *v* input units). """ vels = (wv - lbc) / lbc * _phc.c.cgs * 1e-5 idx = _np.where(_np.abs(vels) <= vw) wv = wv[idx] flux = flux[idx] if not gauss: if ssize <= 0 or ssize >= 0.5: _warn.warn("Invalid ssize value...", stacklevel=2) ssize = 0 ssize = int(ssize * len(flux)) if ssize == 0: ssize = 1 medx0, medx1 = _np.average(wv[:ssize]), _np.average(wv[-ssize:]) if ssize > 6: medy0, medy1 = _np.median(flux[:ssize]), _np.median(flux[-ssize:]) else: medy0, medy1 = (_np.average(flux[:ssize]), _np.average(flux[-ssize:])) new_y = medy0 + (medy1 - medy0) * (wv - medx0) / (medx1 - medx0) if spcas is not None: if spcas == 0: idx = _np.where(wv > 25.95 * 1e4) flux[idx] = new_y[idx] elif spcas == 1: idx = _np.where(wv < 25.95 * 1e4) # print(len(idx[0])) flux[idx] = new_y[idx] base = _np.trapz(new_y, wv) line = _np.trapz(flux, wv) if not allout: return line - base else: return line, base, idx else: # nflx = linfit(wv, flux) nflx = flux nfxsig = _np.std(nflx) emission = True if _np.percentile(nflx, 5) + nfxsig < 1: emission = False if _np.percentile(nflx, 95) - 1.5 * nfxsig > 1: emission = True g, newy = gauss_fit(wv, nflx, emission=emission, ssize=ssize) if not allout: return g.parameters[0] * g.parameters[2] * _np.sqrt(2 * _np.pi) else: return g, newy, idx
[docs]def ECcalc(vels, flux, ssize=0.05, gaussfit=False, doublegf=True): """ Supoe que o fluxo jah estah normalizado, e vetores ordenados. If `gaussfit=False`, the single maximum value is taken. If `gaussfit=True`, then a single (`doublegf=False`) or a double (`doublegf=True`) Gaussian fit is performed over the line profile to determine its maximum. Calcula o topo da emissao da linha, e retorna em que velocidade ela ocorre. """ vels = _np.array(vels) flux = _np.array(flux) # if lncore > 0: # idx = _np.where(_np.abs(vels) < lncore) # vels = vels[idx] # flux = flux[idx] if len(flux) < 5: return _np.NaN, 0.0 if not gaussfit: idx = _np.where(_np.max(flux) == flux) if flux[idx][0] < 1: return _np.NaN, 0.0 if len(idx[0]) > 1: idx = idx[0][0] return flux[idx][0], vels[idx][0] else: # check if there is a peak ssize = int(ssize * len(vels)) if ssize == 0: ssize = 1 contmax = _np.max(_np.append(flux[:ssize], flux[-ssize:])) fluxmax = _np.max(flux) if fluxmax < 1.01 * contmax: return _np.NaN, 0.0 # Define model function to be used to fit to the data above def gauss(x, *p): A, mu, sigma = p return A * _np.exp(-((x - mu) ** 2) / (2.0 * sigma**2)) + 1 # ivc = _np.abs(vels - 0).argmin() if doublegf: i0 = _np.abs(flux[:ivc] - _np.max(flux[:ivc])).argmin() i1 = _np.abs(flux[ivc:] - _np.max(flux[ivc:])).argmin() + ivc try: p0 = [1.0, vels[i0], 40.0] coeff0, tmp = _curve_fit(gauss, vels[:ivc], flux[:ivc], p0=p0) p1 = [1.0, vels[i1], 40.0] coeff1, tmp = _curve_fit(gauss, vels[ivc:], flux[ivc:], p0=p1) ECs = _np.array([coeff0[0] + 1.0, coeff1[0] + 1.0]) EC = _np.max(ECs) idx = _np.where(ECs == EC)[0] # vel = _np.abs(coeff0[1] / 2) + _np.abs(coeff1[1] / 2) if idx == 0: vel = coeff0[1] else: vel = coeff1[1] return EC, vel except ValueError: return _np.NaN, 0.0 else: try: p0 = [1.0, 0, 40.0] coeff0, tmp = _curve_fit(gauss, vels, flux, p0=p0) EC = coeff0[0] + 1.0 return EC, coeff0[1] except ValueError: return _np.NaN, 0.0
[docs]def VRcalc(vels, flux, vw=1000, gaussfit=False, ssize=0.05): """ Calcula o PICO para os dois lados (azul/vermelho) da linha, ajustando a velocidade de repouso (TBD). """ # calcula e aplica correcao de vel. repousp vc = 0.0 vels += vc # faz o teste de tamanho if len(vels) < 5: vw = 0 ew0, ew1 = (_np.NaN, _np.NaN) return ew0, ew1, vc # corta em vw idx = _np.where(_np.abs(vels) <= vw) outvels = vels[idx] normflux = flux[idx] # ivc = _np.abs(outvels - 0).argmin() if not gaussfit: V = _np.max(normflux[:ivc]) R = _np.max(normflux[ivc:]) else: # check if there is a peak ssize = int(ssize * len(vels)) if ssize == 0: ssize = 1 contmax = _np.max(_np.append(flux[:ssize], flux[-ssize:])) fluxmax = _np.max(flux) if fluxmax < 1.01 * contmax: # print('# Bad profile!') return 0, 0, vc # Define model function to be used to fit to the data above def gauss(x, *p): A, mu, sigma = p return A * _np.exp(-((x - mu) ** 2) / (2.0 * sigma**2)) + 1.0 # ivc = _np.abs(vels - 0).argmin() i0 = _np.abs(flux[:ivc] - _np.max(flux[:ivc])).argmin() i1 = _np.abs(flux[ivc:] - _np.max(flux[ivc:])).argmin() + ivc try: p0 = [1.0, vels[i0], 40.0] coeff0, tmp = _curve_fit(gauss, vels[:ivc], flux[:ivc], p0=p0) p1 = [1.0, vels[i1], 40.0] coeff1, tmp = _curve_fit(gauss, vels[ivc:], flux[ivc:], p0=p1) V = coeff0[0] + 1.0 R = coeff1[0] + 1.0 except ValueError: return 1.0, 1.0, vc return V, R, vc
[docs]def PScalc(vels, flux, vc=0.0, ssize=0.05, gaussfit=False): """ Calcula peak_separation `doublegaussfit` = True, do it before and after zero velocity. False, use maximum (default). """ # check if there is a peak ssize = int(ssize * len(vels)) if ssize == 0: ssize = 1 contmax = _np.max(_np.append(flux[:ssize], flux[-ssize:])) fluxmax = _np.max(flux) if fluxmax < 1.01 * contmax: return _np.NaN, _np.NaN vels += vc ivc = _np.abs(vels - 0).argmin() i0 = _np.abs(flux[:ivc] - _np.max(flux[:ivc])).argmin() i1 = _np.abs(flux[ivc:] - _np.max(flux[ivc:])).argmin() + ivc if not gaussfit: return vels[i0], vels[i1] else: # Define model function to be used to fit to the data above def gauss(x, *p): A, mu, sigma = p return A * _np.exp(-((x - mu) ** 2) / (2.0 * sigma**2)) + 1.0 # try: p0 = [1.0, vels[i0], 20.0] coeff0, tmp = _curve_fit(gauss, vels[:ivc], flux[:ivc], p0=p0) p1 = [1.0, vels[i1], 20.0] coeff1, tmp = _curve_fit(gauss, vels[ivc:], flux[ivc:], p0=p1) return coeff0[1], coeff1[1] except ValueError: print("# PScalc error...") # print vels[i0], flux[i0], vels[i1], flux[i1] return 0, 0
[docs]def FWHM(vels, flux, halfmax, vmax=350.0, flxincr=0.01): """Calc. FWHM (Full-Width at Half Maximum) based on the value of the Half Maximum TODO: Gaussfit""" if len(vels) < 5 or len(flux) < 5: _warn.warn("# No valid line profile for FHWM") return _np.NaN vels = _np.array(vels) flux = _np.array(flux) # remove vels bigger than maxvel idx = _np.where(_np.abs(vels) < vmax) vels = vels[idx] flux = flux[idx] difflx = _np.abs(flux - halfmax) # remove diff bigger than hmf*halfmax i = 0 idx = _np.where(difflx < halfmax * flxincr * i) while len(vels[idx]) < 2: i += 1 idx = _np.where(difflx < halfmax * flxincr * i) vels = vels[idx] difflx = difflx[idx] # # difvels: ordered vels based on the flux difference # idx = _np.argsort(difflx) # difvels = vels[idx][:4] # # difvels: ordered vels closest to the 0 vel. idx = _np.argsort(_np.abs(vels)) difvels = vels[idx][:2] return _np.sum(_np.abs(difvels))
[docs]def DCcalc(vels, flux, vmax=None, vc=0.0, ssize=0.05): """ Calculo, na presenca de emissao, da profundidade do reverso central. Se fluxo máximo < 1.01*contínuo, retorna TODO: gauss fit Return flux at `vmax` (maximum considered velocity), and flux at `v0`. Depth of the central reversal is `flux[ivmax] - flux[ivc]`. """ if len(flux) < 5: return _np.NaN, _np.NaN vels += vc ivc = _np.abs(vels - 0).argmin() # check if there is a peak ssize = int(ssize * len(vels)) if ssize == 0: ssize = 1 contmax = _np.max(_np.append(flux[:ssize], flux[-ssize:])) fluxmax = _np.max(flux) if fluxmax < 1.01 * contmax: return flux[ivc], flux[ivc] # if a vmax is not given... if not isinstance(vmax, (int, float)): vmax = _np.abs(flux - _np.max(flux)).argmin() vmax = vels[vmax] ivmax = _np.abs(vels - vmax).argmin() return flux[ivmax], flux[ivc]
[docs]def analline(lbd, flux, lbdc, hwidth=1000, verb=True, gaussfit=False, doublegf=True): """ Return the analysis of a line. Both lbd and flux need to be ordered (a normalization IS FORCED). lbd,lbdc must have the same unit, and width in km/s is required. The line will be cutted so that the total DeltaLambda will be 2*width if `lbdc` <= 0, lbd array is assumed to be a velocity array (in km/s)! | EXAMPLE: Using sed2data. lbc = 0.6565 (halpha), obs = 1 (width==1000) | analline(lbd=sed2data[obs,:,2], flux=sed2data[obs,:,3], lbc=lbc) The EW is the equivalent width in km/s, EC is the Emission over Continuum ratio, VR ratio, peaksep in km/s, FWHM is the Full-Width at Half Maximum (emission as maximum) F0 is the depth of rest wavelength normalized to the continuum OUTPUT: EW, EC, VR, peaksep, FWHM, F0 """ if lbdc > 0: vels = (lbd - lbdc) / lbdc * _phc.c.cgs * 1e-5 else: vels = lbd # check if the file have the desired info. if vels[0] > -hwidth * 0.95 or vels[-1] < hwidth * 0.95: if verb: _warn.warn("spec out of range (wavelength)! Check hwidth!") return _np.NaN, _np.NaN, _np.NaN, _np.NaN, _np.NaN, _np.NaN idx = _np.where(_np.abs(vels) <= hwidth) vels = vels[idx] flux = flux[idx] # Normalization: flux = linfit(vels, flux) # Output: EW = EWcalc(vels, flux, vw=hwidth) EC, velEC = ECcalc(vels, flux, gaussfit=gaussfit, doublegf=doublegf) ew0, ew1, vc = VRcalc(vels, flux, vw=hwidth, gaussfit=gaussfit) if ew1 == 0 or EC is _np.NaN: VR = 1 else: VR = ew0 / ew1 if EC is _np.NaN: peaksep = _np.NaN else: vel0, vel1 = PScalc(vels, flux, gaussfit=gaussfit) peaksep = vel1 - vel0 if peaksep is _np.NaN: EC = peaksep VR = peaksep EC2, F0 = DCcalc(vels, flux, vmax=velEC) # depthcent = EC2 - F0 if EC2 < 1: EC2 = 1.0 fwhm = FWHM(vels, flux, (EC2 + F0) / 2.0, vmax=_np.abs(velEC)) else: fwhm = FWHM(vels, flux, EC / 2, vmax=hwidth) return EW, EC, VR, peaksep, fwhm, F0
[docs]def kurlog(file=None, output=None): """Generate a list of teff and logg present in a Kurucz file. If output is not specified, it is saved as `file`+.log""" if file is None: file = _os.path.join(_hdt.hdtpath(), "refs", "fp00k0.pck") teffs = [] loggs = [] fp = open(file) for i, line in enumerate(fp): if line.find("TEFF") > -1: teffs += [float(line.split()[1])] loggs += [float(line.split()[3])] fp.close() return teffs, loggs
[docs]def kuruczflux(teff, logg, wavrange=None): """Return fluxes from a Kurucz model. Fluxes are in ergs/cm**2/s/hz/ster and wavelength in nm (wavrange must be in nm). As tabelas do Kurucz sao erg/s/sr/cm2/Hz. Entao, tem q multiplicar 4pi para ter o fluxo observado. Abaixo, a conversao das unidades Kurucz para erg/s/cm2/A usuais. # erg/s/sr/cm2/Hz: lK15k, K15k, info = spt.kuruczflux(5777, 3., range=[100,1000]) lK15k*= 1e1 #Ang K15k = 2.99792458E+18*K15k*(lK15k)**-2*4*np.pi #erg/s/cm2/A OUTPUT: wv, flux, info""" kurfile = _os.path.join(_hdt.hdtpath(), "refs", "fp00k0.pck") kurwvlines = 174 - 22 kurflxcol = 10 # wave read = _phc.readrange(kurfile, 22, 22 + kurwvlines) wave = _np.array([val for line in read for val in line.split()], dtype=float) # choose best bestT = _np.inf bestg = _np.inf fp = open(kurfile) for i, line in enumerate(fp): if line.find("TEFF") > -1: readT = float(line.split()[1]) if _np.abs(readT - teff) <= _np.abs(bestT - teff): bestT = readT readg = float(line.split()[3]) if _np.abs(readg - logg) <= _np.abs(bestg - logg): bestg = readg i0 = i + 1 fp.close() best = [bestT, bestg] # read best flux read = _phc.readrange(kurfile, i0, i0 + kurwvlines) flux = _np.array( [ val for line in read for val in ( line[i : i + kurflxcol] for i in range(0, len(line) - 1, kurflxcol) ) ], dtype=float, ) # cut range if wavrange is None: return wave, flux, best else: idx = _np.where((wave > wavrange[0]) & (wave < wavrange[-1])) return wave[idx], flux[idx], best
[docs]def plot_all( fs2list, obsl=None, fmt=["png"], out=None, lbc=0.6564, hwidth=1000.0, solidfiles=True, xax=0, philist=[0], figname=None, nolabels=False, obsidx=False, ): r"""plot_all-like routine ``obsl`` list, in degrees. It will find the closest values. It the find :math:`\Delta\theta > 3^\circ`, a warning message is displayed. The ``obs`` index can be used if ``obsidx = True``. ``solinefiles`` keep solid lines for files (changes only colors), and change line shapes between observers. If ``False``, do the opposite. """ if isinstance(fs2list, _strtypes): fs2list = [fs2list] if not isinstance(obsl, list) and obsl is not None: _warn.warn("Wrong `obsl` format (None or list)", stacklevel=2) return fig = _plt.figure(figsize=(9, 9)) lins, cols = (3, 2) gs = _gridspec.GridSpec(lins, cols) gs.update(hspace=0.25) axt = _plt.subplot(gs[0, 1]) ax0 = _plt.subplot(gs[1, 0]) ax1 = _plt.subplot(gs[1, 1]) ax2 = _plt.subplot(gs[2, 0]) ax3 = _plt.subplot(gs[2, 1]) xtitle = "radial scale" for f in fs2list: m = _inp.HdustMod(f) tfile = _os.path.join(m.proj, m.modn, m.modn + m.suf + "*avg.temp") tfile = _glob(tfile) if len(tfile) > 0: npt, rplus, lev = (0, 0, 0) tfile.sort() tfile = tfile[-1] ( ncr, ncmu, ncphi, nLTE, nNLTE, Rstar, Ra, beta, data, pcr, pcmu, pcphi, ) = _hdt.readtemp(tfile) for phiidx in range(0, len(philist)): icphi = philist[phiidx] x = data[0, :, 0, icphi] if xax == 0: x = _np.log10(x / Rstar - 1.0) xtitle = r"$\log_{10}(r/R_*-1)$" elif xax == 1: x = x / Rstar xtitle = r"$r/R_*$" elif xax == 2: x = 1.0 - Rstar / x xtitle = r"$1-R_*/r$" y = data[3 + lev, :, ncmu / 2 + npt + rplus, icphi] y = y / 1000.0 axt.plot(x, y, "o-") fs2d = _hdt.readfullsed2(f) iobs = range(len(fs2d)) if obsl is not None: if not obsidx: iobs = [ _phc.find_nearest( _np.arccos(fs2d[:, 0, 0]) * 180 / _np.pi, ob, idx=True ) for ob in obsl ] else: iobs = obsl for ob in iobs: obfmt = r"{:.1f}$^\circ$, {:.1f}$^\circ$".format( _np.arccos(fs2d[ob, 0, 0]) * 180 / _np.pi, _np.arccos(fs2d[ob, 0, 1]) ) if solidfiles: pdict = { "color": _phc.cycles(fs2list.index(f)), "dashes": _phc.dashes(iobs.index(ob)), } else: pdict = { "dashes": _phc.dashes(fs2list.index(f)), "color": _phc.cycles(iobs.index(ob)), } ax0.plot( fs2d[ob, :, 2], fs2d[ob, :, 3], label=_os.path.basename(f), **pdict ) ax1.plot(fs2d[ob, :, 2], fs2d[ob, :, 3], label=obfmt, **pdict) ax2.plot(fs2d[ob, :, 2], fs2d[ob, :, 7] * 100, **pdict) ax3.plot( *lineProf(fs2d[ob, :, 2], fs2d[ob, :, 3], lbc=lbc, hwidth=hwidth), **pdict ) axt.set_xlabel(xtitle, labelpad=1) axt.set_ylabel(r"Temperature (10$^3$ K)") ax0.set_xlim([0.37, 1.0]) ax0.autoscale(axis="y", tight=True) ax0.set_yscale("log") ax0.set_xlabel(r"$\mu$m") ax0.set_ylabel(r"$\lambda F_\lambda/F$") ax1.set_xlim([1.0, 100.0]) ax1.autoscale(axis="y", tight=True) ax1.set_xscale("log") ax1.set_yscale("log") ax1.set_xlabel(r"$\mu$m", labelpad=1) ax1.yaxis.tick_right() ax1.yaxis.set_label_position("right") ax1.yaxis.set_ticks_position("both") ax1.set_ylabel(r"$\lambda F_\lambda/F$") ax2.set_xlim([0.37, 0.9]) ax2.autoscale(axis="y", tight=True) ax2.set_xlabel(r"$\mu$m") ax2.set_ylabel("P (%)") ax3.set_xlim([-hwidth, hwidth]) ax3.set_xlabel(r"km/s") ax3.yaxis.tick_right() ax3.yaxis.set_label_position("right") ax3.yaxis.set_ticks_position("both") ax3.set_ylabel("Normalized Flux") if not nolabels: ax1.legend( loc="best", fancybox=True, framealpha=0.5, fontsize=9, labelspacing=0.05 ) if len(fs2list) > 1 and not nolabels: ax0.legend( loc="best", fancybox=True, framealpha=0.5, fontsize=8, labelspacing=0.05 ) _phc.savefig(fig, fmt=fmt, figname=figname) # figname='outname') return
[docs]def splitKurucz(filen, path=None): """ Split atmospheric Kurucz file (e.g., 'ap00k0.dat') into individual models. INPUT: file, path (strings) OUTPUT: *files written """ if path is None: path = _os.getcwd() allk = _np.loadtxt(filen, dtype=str, delimiter="\n") src = _os.path.splitext(_os.path.split(filen)[1])[0] if not _os.path.exists(src): _os.mkdir(src) src = _os.path.join(src, src) for i in range(0, len(allk) - 1): if "EFF" in allk[i]: iref = i teff = int(allk[i].split()[1][:-1]) logg = float(allk[i].split()[3][:-3]) elif "DECK6 72" in allk[i]: allk[i] = allk[i].replace("DECK6 72", "DECK6 71") elif "EFF" in allk[i + 1]: _np.savetxt( src + "tef%05dg%.1f.dat" % (teff, logg), allk[iref : i + 1], fmt="%s" ) _np.savetxt(src + "tef%05dg%.1f.dat" % (teff, logg), allk[iref:], fmt="%s") return
[docs]def writeFits( flx, lbd, extrahead=None, savename=None, verbose=False, path=None, lbdc=None, externhd=None, ): """Write a 1D spectra FITS. | INPUT: flux array, lbd array, extrahead flag+info, save name. | - lbd array: if len(lbd)==2: lbd = [CRVAL1, CDELT1] | else: CDELT1 = (lbd[-1]-lbd[0])/(len(lbd)-1) | CRVAL1 = lbd[0] | WARNING: lbd must be in ANGSTROMS (FITS default). It can also be | velocities. In this case, it must be in km/s and lbdc is given in | ANGSTROM. | - extrahead: matrix (n,2). Example: [['OBJECT','Achernar'], ['COMMENT', | 'value']] `externhd` = copy the header from an external file. OUTPUT: write FITS file. """ if path is None or path == "": path = _os.getcwd() if path[-1] != ["/"]: path += "/" if lbdc is not None: lbd = (lbd / _phc.c.cgs * 1e5 + 1) * lbdc hdu = _pyfits.PrimaryHDU(flx) hdulist = _pyfits.HDUList([hdu]) if externhd is not None: extf = _pyfits.open(externhd) hdulist[0].header = extf[0].header hdulist[0].header["BZERO"] = 0.0 hdulist[0].header["CRVAL1"] = lbd[0] if len(lbd) == 2: hdulist[0].header["CDELT1"] = lbd[1] else: hdulist[0].header["CDELT1"] = (lbd[-1] - lbd[0]) / (len(lbd) - 1) if extrahead is not None: for e in extrahead: hdulist[0].header[e[0]] = e[1] if savename is None: savename = "spec_{0}".format(_phc.dtflag()) if savename.find(".fit") == -1: savename += ".fits" hdu.writeto(path + savename, overwrite=True) if verbose: print("# FITS file {0}{1} saved!".format(path, savename)) return
[docs]def averagespecs(speclist, n=999, path="", objname="OBJECT"): """Average specs taken in the same MJD, in groups of approx. `n` elements. OUTPUT: Files written.""" if len(path) > 0 and path[-1] != "/": path += "/" speclist = _np.array(speclist) obsdates = [] for sp in speclist: data = loadfits(sp) obsdates.append(data[2]) obsdates = _np.array(obsdates) # Sorting things idx = _np.argsort(obsdates) speclist = speclist[idx] obsdates = obsdates[idx] # Same day iMJD = [] for m in obsdates: iMJD.append(divmod(m, 1)[0]) idxMJD = _np.unique(iMJD) # Do the avgs based on the MJD for i in idxMJD: idx = _np.where(iMJD == i) N = len(speclist[idx]) for j in _phc.splitequal(N / n, N): fidx = speclist[idx][j[0] : j[1]] data = loadfits(fidx[0]) wl = data[0] newdate = _np.average(obsdates[idx][j[0] : j[1]]) MJD = int(divmod(newdate, 1)[0]) MJDfrac = int(round(divmod(newdate, 1)[1] * 10000)) fluxes = _np.zeros(len(wl)) for f in fidx: data = loadfits(f) fluxes += _np.interp(wl, data[0], data[1]) flx = fluxes / len(fidx) outname = "alpEri_PUCHEROS_VIS_{0}_{1:04d}_avg.fits".format(MJD, MJDfrac) writeFits( flx, wl, savename=outname, path=path, extrahead=[ ["OBJECT", objname], ["Comment", "Averaged from {0} spectra".format(len(fidx))], ["MJD-OBS", newdate], ], ) return
[docs]def cardelli(lbd, flux, ebv=0.0, Rv=3.1): """ Milky Way Extinction law from Cardelli et al. 1989 `lbd` must be in microns. OUTPUT: Corrected flux. """ x = 1.0 / _np.array(lbd) # CCM x is 1/microns a, b = _np.ndarray(x.shape, x.dtype), _np.ndarray(x.shape, x.dtype) if any((x < 0.3) | (10 < x)): raise ValueError("Some wavelengths outside CCM 89 extinction curve " + "range") irs = (0.3 <= x) & (x <= 1.1) opts = (1.1 <= x) & (x <= 3.3) nuv1s = (3.3 <= x) & (x <= 5.9) nuv2s = (5.9 <= x) & (x <= 8) fuvs = (8 <= x) & (x <= 10) # CCM Infrared a[irs] = 0.574 * x[irs] ** 1.61 b[irs] = -0.527 * x[irs] ** 1.61 # CCM NIR/optical a[opts] = _np.polyval( (0.32999, -0.7753, 0.01979, 0.72085, -0.02427, -0.50447, 0.17699, 1), x[opts] - 1.82, ) b[opts] = _np.polyval( (-2.09002, 5.3026, -0.62251, -5.38434, 1.07233, 2.28305, 1.41338, 0), x[opts] - 1.82, ) # CCM NUV a[nuv1s] = 1.752 - 0.316 * x[nuv1s] - 0.104 / ((x[nuv1s] - 4.67) ** 2 + 0.341) b[nuv1s] = -3.09 + 1.825 * x[nuv1s] + 1.206 / ((x[nuv1s] - 4.62) ** 2 + 0.263) y = x[nuv2s] - 5.9 Fa = -0.04473 * y**2 - 0.009779 * y**3 Fb = -0.2130 * y**2 - 0.1207 * y**3 a[nuv2s] = 1.752 - 0.316 * x[nuv2s] - 0.104 / ((x[nuv2s] - 4.67) ** 2 + 0.341) + Fa b[nuv2s] = -3.09 + 1.825 * x[nuv2s] + 1.206 / ((x[nuv2s] - 4.62) ** 2 + 0.263) + Fb # CCM FUV a[fuvs] = _np.polyval((-0.070, 0.137, -0.628, -1.073), x[fuvs] - 8) b[fuvs] = _np.polyval((0.374, -0.42, 4.257, 13.67), x[fuvs] - 8) AlbAv = a + b / Rv return flux * 10 ** (-AlbAv * Rv * ebv / 2.5)
[docs]def fitzpatrick(wave, flux, ebv, Rv=3.1, LMC2=False, AVGLMC=False): """ Deredden a flux vector using the Fitzpatrick (1999) parameterization Parameters ---------- wave : array Wavelength in Angstrom flux : array Calibrated flux vector, same number of elements as wave. ebv : float, optional Color excess E(B-V). If a positive ebv is supplied, then fluxes will be dereddened rather than reddened. The default is 3.1. AVGLMC : boolean If True, then the default fit parameters c1,c2,c3,c4,gamma,x0 are set to the average values determined for reddening in the general Large Magellanic Cloud (LMC) field by Misselt et al. (1999, ApJ, 515, 128). The default is False. LMC2 : boolean If True, the fit parameters are set to the values determined for the LMC2 field (including 30 Dor) by Misselt et al. Note that neither `AVGLMC` nor `LMC2` will alter the default value of Rv, which is poorly known for the LMC. Returns ------- new_flux : array Dereddened flux vector, same units and number of elements as input flux. Notes ----- .. note:: This function was ported from the IDL Astronomy User's Library. The following five parameters allow the user to customize the adopted extinction curve. For example, see Clayton et al. (2003, ApJ, 588, 871) for examples of these parameters in different interstellar environments. - x0: Centroid of 2200 A bump in microns (default = 4.596) - gamma: Width of 2200 A bump in microns (default =0.99) - c3: Strength of the 2200 A bump (default = 3.23) - c4: FUV curvature (default = 0.41) - c2: Slope of the linear UV extinction component (default = -0.824 + 4.717/R) - c1: Intercept of the linear UV extinction component (default = 2.030 - 3.007*c2 """ # x = 10000./ wave # Convert to inverse microns x = 1.0 / wave # microns curve = x * 0.0 # Set some standard values: x0 = 4.596 gamma = 0.99 c3 = 3.23 c4 = 0.41 c2 = -0.824 + 4.717 / Rv c1 = 2.030 - 3.007 * c2 if LMC2: x0 = 4.626 gamma = 1.05 c4 = 0.42 c3 = 1.92 c2 = 1.31 c1 = -2.16 elif AVGLMC: x0 = 4.596 gamma = 0.91 c4 = 0.64 c3 = 2.73 c2 = 1.11 c1 = -1.28 # Compute UV portion of A(lambda)/E(B-V) curve using FM fitting function # and R-dependent coefficients xcutuv = _np.array([10000.0 / 2700.0]) xspluv = 10000.0 / _np.array([2700.0, 2600.0]) iuv = _np.where(x >= xcutuv)[0] N_UV = len(iuv) iopir = _np.where(x < xcutuv)[0] Nopir = len(iopir) if N_UV > 0: xuv = _np.concatenate((xspluv, x[iuv])) else: xuv = xspluv yuv = c1 + c2 * xuv yuv = yuv + c3 * xuv**2 / ((xuv**2 - x0**2) ** 2 + (xuv * gamma) ** 2) yuv = yuv + c4 * ( 0.5392 * (_np.maximum(xuv, 5.9) - 5.9) ** 2 + 0.05644 * (_np.maximum(xuv, 5.9) - 5.9) ** 3 ) yuv = yuv + Rv yspluv = yuv[0:2] # save spline points if N_UV > 0: curve[iuv] = yuv[2::] # remove spline points # Compute optical portion of A(lambda)/E(B-V) curve # using cubic spline anchored in UV, optical, and IR xsplopir = _np.concatenate( ([0], 10000.0 / _np.array([26500.0, 12200.0, 6000.0, 5470.0, 4670.0, 4110.0])) ) ysplir = _np.array([0.0, 0.26469, 0.82925]) * Rv / 3.1 ysplop = _np.array( ( _np.polyval([-4.22809e-01, 1.00270, 2.13572e-04][::-1], Rv), _np.polyval([-5.13540e-02, 1.00216, -7.35778e-05][::-1], Rv), _np.polyval([7.00127e-01, 1.00184, -3.32598e-05][::-1], Rv), _np.polyval( [1.19456, 1.01707, -5.46959e-03, 7.97809e-04, -4.45636e-05][::-1], Rv ), ) ) ysplopir = _np.concatenate((ysplir, ysplop)) if Nopir > 0: tck = _interpolate.splrep( _np.concatenate((xsplopir, xspluv)), _np.concatenate((ysplopir, yspluv)), s=0, ) curve[iopir] = _interpolate.splev(x[iopir], tck) # Now apply extinction correction to input flux vector curve *= -ebv return flux * 10.0 ** (0.4 * curve)
[docs]def sort_specs(specs, path=None): """Specs in an (N,2) array, where specs[:,0] are the files paths and specs[:,1] the instrument name. Return ordered_specs""" if path is not None: if path[-1] != "/": path += "/" else: path = "" nsp = _np.shape(specs)[0] MJDs = _np.zeros(nsp) specs = _np.array(specs) lims = [_np.inf, -_np.inf] for i in range(nsp): wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(path + specs[i][0]) MJDs[i] = MJD if MJDs[i] < lims[0]: lims[0] = MJDs[i] if MJDs[i] > lims[1]: lims[1] = MJDs[i] return specs[MJDs.argsort()], lims
[docs]def convgaussFunc( wl, flx, lbc, hwidth=1000.0, convgauss=0.0, frac=0.0, ssize=0.05, wlout=False ): """Do a Gaussian convolution of a given Line Profile with a Gaussian. `wl`, `flx`, `lbc`: wavelenght and flux of the spectrum containing the line, and its value. `hwidth`, `ssize`: width to be keeped around the line (km/s), and the region (in percentage) where the continuum level will be evaluted around the selected region. `convgauss`: if bigger then 0., do the convolution. Its values is the sigma of the gaussian conv. profile (in km/s). `frac`: controls the intensity of the convolution. `frac`=0 means pure profile output and `frac`=1 a pure gaussian output with the same EW value. `wlout`: returns a wavelength array instead of a velocity array (standar) OUTPUT: vel/wl, flux (arrays) """ (x, yo) = lineProf(wl, flx, lbc=lbc, hwidth=hwidth + 3 * convgauss, ssize=ssize) y1 = yo y2 = 0.0 if convgauss > 0 and frac > 0: step = _np.abs(_np.min([x[j + 1] - x[j] for j in range(len(x) - 1)])) xn = _np.arange(-hwidth - 3 * convgauss, hwidth + 3 * convgauss + step, step) cf = _phc.normgauss(convgauss, x=xn) yo = _np.interp(xn, x, yo) x = xn y1 = yo * (1 - frac) y2 = _np.convolve(yo * frac, cf / _np.trapz(cf), "same") if wlout: x = (x / _phc.c.cgs * 1e5 + 1) * lbc return x, y1 + y2
# def gaussfold(wl, flx, sig, lbc, hwidth=1000., ssize=.05): # """Translation from gaussfold.pro""" # (x, yo) = lineProf(wl, flx, lbc=lbc, hwidth=hwidth+3*sig, ssize=ssize) # x = (x / _phc.c.cgs * 1e5 + 1) * lbc # lammax = _np.max(x) # lammin = _np.min(x) # dlambda = sig / 17. # interlam = lammin + dlambda * _np.arange( (lammax-lammin)/dlambda+1 ) # interflux = _np.interp( interlam, wl, flx ) # fwhm_pix = sig / dlambda # window = fwhm_pix(17*fwhm_pix).astype(int) # gauss = _phc.psf_gaussian(window, sig=fwhm_pix, norm=True, ndim=1) # fold = _phc.convol # fluxfold = _np.interp( lam, interlam, fold ) # _warn('# Function not implemented!!') # return None
[docs]def cutpastrefspec(ivl, iflx, irefvl, ireflx, hwidth, ssize=0.05): """Cut and paste a given line profile into a reference line profile. Both profiles (with any resolution) must be normalized and given in vel. It was designed to solve the problem of Achernar's Halpha line wings problem and it works like this: given a reference profile (`refvl`, `reflx`), the selected profile will be cutted at the `hwidth` position and them pasted in the corresponding position (and intensity level) of the reference spectrum. OUTPUT: refvl, reflx """ flx = _np.interp(irefvl, ivl, iflx) i0 = _np.abs(irefvl + hwidth).argmin() i1 = _np.abs(irefvl - hwidth).argmin() ssize = int(ssize * len(flx)) if ssize == 0: ssize = 1 refav = ( _np.average(ireflx[i0 - ssize / 2 : i0 + ssize / 2 + 1]) / 2.0 + _np.average(ireflx[i1 - ssize / 2 : i1 + ssize / 2 + 1]) / 2.0 ) av = ( _np.average(flx[i0 - ssize / 2 : i0 + ssize / 2 + 1]) / 2.0 + _np.average(flx[i1 - ssize / 2 : i1 + ssize / 2 + 1]) / 2.0 ) flx += refav - av reflx = _np.array(ireflx).copy() reflx[i0 : i1 + 1] = flx[i0 : i1 + 1] return irefvl, reflx
[docs]def load_specs_fits( speclist, ref, lbc, lncore=None, hwidth=None, gaussfit=False, plotcut=0 ): """Load a list of specs and do the *line core cut & paste* `lncore`: cut and paste hwidth of the line center. It can be None, and must be < hwidth. If hwidth is None, it is assumed to be 1000 km/s. `speclist` : ['path+file.fits', ...] `ref`: reference spectra to do the cut & paste `plotcut`: if plotcut > 0, save the cutted spectra in steps of this variable. OUTPUT: dtb_obs """ if hwidth is None: hwidth = 1000.0 # do core cut? docore = lncore < hwidth if lncore is None: docore = False # load ref refwl, reflx = loadfits(ref[0])[0:2] refvl, reflx = lineProf(refwl, reflx, lbc=lbc) # load specs dtb_obs = Spec(lbc=lbc, hwidth=hwidth, gaussfit=gaussfit) for i in range(_np.shape(speclist)[0]): print(speclist[i]) dtb_obs.loadspec(speclist[i]) vl, flx = lineProf(dtb_obs.wl, dtb_obs.flux, lbc=lbc) if docore: cuted = cutpastrefspec(vl, flx, refvl, reflx, lncore) dtb_obs.flux = cuted[1] dtb_obs.wl = (cuted[0] / _phc.c.cgs * 1e5 + 1) * lbc ( dtb_obs.EW, dtb_obs.EC, dtb_obs.VR, dtb_obs.peaksep, dtb_obs.depthcent, dtb_obs.F0, ) = analline( dtb_obs.wl, dtb_obs.flux, dtb_obs.lbc, hwidth=lncore, verb=False, gaussfit=dtb_obs.gaussfit, ) else: ( dtb_obs.EW, dtb_obs.EC, dtb_obs.VR, dtb_obs.peaksep, dtb_obs.depthcent, dtb_obs.F0, ) = analline( dtb_obs.wl, dtb_obs.flux, dtb_obs.lbc, hwidth=hwidth, verb=False, gaussfit=dtb_obs.gaussfit, ) dtb_obs.addspec() # complementary plot if plotcut > 0 and docore: fig0, ax = _plt.subplots() for i in range(_np.shape(speclist)[0]): dtb_obs.loadspec(speclist[i]) vl, flx = lineProf(dtb_obs.wl, dtb_obs.flux, lbc=lbc) cuted = cutpastrefspec(vl, flx, refvl, reflx, lncore) if i % plotcut == 0: ax.plot(cuted[0], cuted[1]) _phc.savefig(fig0) return dtb_obs
[docs]def plot_spec_info(speclist, dtb_obs, mAEW=False, mgray=None): """Standard plot of the Spec class (EW, E/C, V/R, peak-sep., FWHM, F0) OUTPUT: figure (fig pyplot) """ if mAEW: dtb_obs.data[:, 1] *= 1000 * dtb_obs.lbc / _phc.c.cgs * 1e5 # Legend, Markers and colors idx... instm = list(_np.unique(speclist[:, 1])) # coridx = [ phc.cycles(instm.index(i)) for i in speclist[:, 1]] cores = _phc.gradColor(range(len(instm)), cmapn="inferno") coridx = [cores[instm.index(i)] for i in speclist[:, 1]] coridx = _np.array(coridx) mkidx = [_phc.cycles(instm.index(i), "mk") for i in speclist[:, 1]] mkidx = _np.array(mkidx) # Plots fig = _plt.figure() lins, cols = (7, 1) gssteps = [slice(0, 2), 2, 3, 4, 5, 6] gs = _gridspec.GridSpec(lins, cols) axs = [_plt.subplot(gs[g, :]) for g in gssteps] # EW axs[0].invert_yaxis() axs[-1].set_xlabel("Julian date - 2400000.5") ylabels = [ "EW (m\u00c5)", "E/C", "V/R", ("pk. sep." + "\n" + "(km/s)"), "FWHM" + "\n" + "(km/s)", r"F${\lambda 0}$", ] for i, ax in enumerate(axs): # binned x, y = _phc.bindata(dtb_obs.data[:, 0], dtb_obs.data[:, i + 1]) # yi = _savgol(y, 3, 1) ax.plot(x, y, color="gray", zorder=0) # points for uniquem in set(mkidx): idx = _np.where(mkidx == uniquem) ax.plot( dtb_obs.data[:, 0][idx], dtb_obs.data[:, i + 1][idx], color=coridx[idx][0], marker=uniquem, ls="", ) ax.set_ylabel(ylabels[i]) # xlim = axs[0].get_xlim() axs[2].plot(xlim, [1, 1], ls=":", color="k", zorder=1) for i in range(1, len(axs)): # ax.locator_params(axis='y', nbins=4) axs[i].yaxis.set_major_locator(_MaxNLocator(nbins=4, prune="upper")) if i in [1, 2, 3]: axs[i].get_yticklabels()[-1].set_visible(False) for ax in axs[:-1]: ax.set_xticklabels([]) # Legend for i in range(len(instm)): # axs[0].plot([np.NaN], [np.NaN], label=instm[i], color=phc.cycles(i), # marker=phc.cycles(i, 'mk'), ls='') axs[0].plot( [_np.NaN], [_np.NaN], label=instm[i], color=cores[i], marker=_phc.cycles(i, "mk"), ls="", ) axs[0].legend( loc="best", fancybox=True, framealpha=0.5, fontsize=8, labelspacing=0.05, ncol=2 ) # bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0. fig.subplots_adjust(hspace=0.01) # Gray for ax in axs: ax.set_xlim(xlim) if mgray is not None: ylim = ax.get_ylim() rect = _mpatches.Rectangle( [mgray[0], ylim[0]], mgray[1] - mgray[0], ylim[1] - ylim[0], ec="gray", fc="gray", alpha=0.5, zorder=1, ) ax.add_patch(rect) if len(mgray) == 4: if mgray is not None: ylim = ax.get_ylim() rect = _mpatches.Rectangle( [mgray[2], ylim[0]], mgray[3] - mgray[2], ylim[1] - ylim[0], ec="gray", fc="gray", alpha=0.5, zorder=1, hatch="//", ) ax.add_patch(rect) return fig
# TODO: Check if obsolete
[docs]def normalize_range(lb, spec, a, b): """This function is obsolete and must be removed. Still here for compatibility issues. """ a2 = (spec[b] - spec[a]) / (lb[b] - lb[a]) a1 = spec[a] - a2 * lb[a] return spec / (a1 + a2 * lb)
[docs]def normalize_spec(lb, flx, q=2, diff=0.03, perc=0, nlbp=50): """Normalize a spectrum using the non-parametric regression algorithm of Local Polynomial Kernel (order=``q``). If perc > 0, a "percentile filter" is applyed to the spectrum (divided in nlbp bins). INPUT: lb, flx OUTPUT: norm_flx """ def linear_model(x, *coef): result = 0 for i in range(len(coef)): result += coef[i] * x**i return result if perc <= 0: Initial_guess = [0.0, 0.0] coef1, cov1 = _curve_fit(linear_model, lb, flx, Initial_guess) idx0 = _np.where(flx != 0) ilb = lb[idx0] iflx = flx[idx0] idxi = _np.where(_np.abs(linear_model(ilb, *coef1) / iflx - 1) < diff) xsi = ilb[idxi] ysi = iflx[idxi] else: xsi, ysi = _phc.bindata(lb, flx, nbins=nlbp, perc=perc) xsi = xsi.reshape(-1, 1) Initial_guess = _np.zeros(q + 1) coef2, cov2 = _curve_fit(linear_model, xsi, ysi, Initial_guess) k2 = linear_model(lb, *coef2) return flx / k2
[docs]def renorm(vl, y): """Renormalize ``y`` so that the equivalent width is preserved when the continuum is shifted to 1. """ ext = _np.mean([y[0], y[-1]]) a0 = _np.trapz(y, vl) A = (a0 - _np.trapz(_np.tile(1, len(vl)), vl)) / ( a0 - _np.trapz(_np.tile(ext, len(vl)), vl) ) B = 1 - A * ext return A * y + B
[docs]def normEW(vl, y, area=None): """Normalize ``y`` curve to have a specific area. If ``area is None``, then the normalized equivalent width is preserved. """ if area is None: area = _np.trapz(linfit(vl, y), vl) y0 = linfit(vl, y) - 1 a1 = _np.trapz(y0, vl) a0 = _np.trapz(_np.tile([1], len(vl)), vl) f = (area - a0) / a1 return f * y0 + 1
[docs]def checksubdirs(path, star, lbc, hwidth=1000, showleg=True, plots=False): """ Faz o que tem que fazer. """ if not _os.path.exists("{0}/{1}".format(path, star)): _os.system("mkdir {0}/{1}".format(path, star)) nights = [o for o in _os.listdir(path) if _os.path.isdir("{0}/{1}".format(path, o))] fig = _plt.figure() ax = fig.add_subplot(111) spdtb = Spec() spdtb.lbc = lbc spdtb.hwidth = 1000.0 for night in nights: targets = [ o for o in _os.listdir("%s/%s" % (path, night)) if _os.path.isdir("%s/%s/%s" % (path, night, o)) ] for target in targets: if target.find(star) > -1: scal = _glob("%s/%s/%s/*.cal.fits" % (path, night, target)) if len(scal) > 0: for cal in scal: spdtb.loadspec(cal) spdtb.addspec() if not _np.isnan(spdtb.EW): if plots: spdtb.plotspec() vels = (spdtb.wl - lbc) / lbc * _phc.c.cgs * 1e-5 idx = _np.where(_np.abs(vels) <= hwidth) flux = linfit(vels[idx], spdtb.flux[idx]) vels = vels[idx] leg = spdtb.MJD ax.plot( vels, flux, label=leg, alpha=0.7, color=_phc.colors[ _np.mod(spdtb.count, len(_phc.colors)) ], ) else: print("# Data not reduced for %s at %s!" % (star, night)) ax.set_xlim([-hwidth, hwidth]) ax.set_ylim([-1, 5]) if showleg: legend = _plt.legend(loc=(0.75, 0.05), labelspacing=0.1) _plt.setp(legend.get_texts(), fontsize="small") _plt.savefig("{0}/{1}_at_{2}.png".format(_outfold, star, lbc)) _plt.close() spdtb.savedata( datafile="{0}/{1}.txt".format(_outfold, star), metafile="{0}/meta_{1}.txt".format(_outfold, star), ) return
[docs]def VREWcalc(vels, flux, vw=1000): """ Supoe que o fluxo jah estah normalizado, e vetores ordenad_os. Calcula o ew para os dois lados (azul/vermelho) da linha, ajustando a velocidade de repouso (TBD). """ # calcula e aplica correcao de vel. repousp vc = 0.0 vels += vc # corta em vw, e faz o teste de tamanho if len(vels) < 5: vw = 0 if vw > 0: idx = _np.where(_np.abs(vels) <= vw) outvels = vels[idx] normflux = flux[idx] else: ew0 = 0.0 ew1 = 0.0 return ew0, ew1, vc # ivc = _np.abs(outvels - 0).argmin() ew0 = 0.0 for i in range(0, ivc): dl = outvels[i + 1] - outvels[i] ew0 += (1.0 - (normflux[i + 1] + normflux[i]) / 2.0) * dl ew1 = 0.0 for i in range(ivc, len(outvels) - 1): dl = outvels[i + 1] - outvels[i] ew1 += (1.0 - (normflux[i + 1] + normflux[i]) / 2.0) * dl return ew0, ew1, vc
[docs]def normcontinuum_std(flux, ssize=0.05): """ Assumes that the `flux` vector is normalized. `ssize` is the percentage of the flux vector to be sampled as continuum (0-1.); default=0.05. It returns the standard deviation of the normalized continuum (around 1.0). """ # averaging borders ny = _np.array(flux)[:] if ssize < 0 or ssize > 0.5: _warn.warn("Invalid ssize value...", stacklevel=2) ssize = 0 ssize = int(ssize * len(ny)) if ssize == 0: ssize = 1 if ssize > 1: continuum = _np.concatenate((ny[:ssize], ny[-ssize:])) if _np.abs(1 - _np.average(continuum)) < 0.05: return _stt.mad(continuum) # Whole averaging mp = ssize * 100 pp = ssize * 100 p50 = _pos(ny, 1.0) if p50 > 100 - pp: _warn.warn("The continuum of this spec is too low! <1: " "Is is nomalized?") pp = 100 - p50 elif p50 < mp: _warn.warn("The continuum of this spec is too high! >1: " "Is is nomalized?") mp = p50 p45 = _np.percentile(ny, p50 - mp) p55 = _np.percentile(ny, p50 + pp) continuum = ny[_np.where((ny > p45) & (ny < p55))] return _stt.mad(continuum)
[docs]def plotSpecData( dtb, limits=None, civcfg=[1, "m", 2013, 1, 1], fmt=["png"], ident=None, lims=None, setylim=False, addsuf="", ): """Plot spec class database `vs` MJD e civil date Plot originally done to London, Canada, 2014. INPUT: civcfg = [step, 'd'/'m'/'y', starting year, month, day] `lims` sequence: 'EW', 'E/C', 'V/R', 'Pk. sep. (km/s)', 'E-F0', 'F0' `lims` = [[-2,4+2,2],[1.,1.4+.1,0.1],[.6,1.4+.2,.2],[0,400+100,100], [.30,.45+.05,.05],[0.6,1.20+.2,.2]] If `lims` is defined, `setylim` can be set to True. OUTPUT: Written image.""" if isinstance(dtb, _strtypes): print("# Loading dtb {0}".format(dtb)) dtb = _np.loadtxt(dtb) if ident is not None: idref = _np.unique(ident) ylabels = ["EW", "E/C", "V/R", "Pk. sep. (km/s)", "E-F0", "F0"] fig, ax = _plt.subplots(6, 1, sharex=True, figsize=(9.6, 8)) icolor = "blue" for i in range(1, len(ylabels) + 1): ax[i - 1].plot(*_phc.bindata(dtb[:, 0], dtb[:, i], 20)) for j in range(len(dtb[:, 0])): if ident is not None: idx = _np.where(ident[j] == idref)[0] icolor = _phc.colors[idx] ax[i - 1].plot(dtb[j, 0], dtb[j, i], "o", color=icolor) ax[i - 1].set_ylabel(ylabels[i - 1]) if lims is not None: if lims[i - 1][-1] != 0: ax[i - 1].set_yticks(_np.arange(*lims[i - 1])) if setylim: ax[i - 1].set_ylim([lims[i - 1][0], lims[i - 1][1]]) if ident is not None: for id in idref: idx = _np.where(id == idref)[0] icolor = _phc.colors[idx] ax[0].plot([], [], "o", color=icolor, label=id) ax[0].legend( bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.0, prop={"size": 6} ) if limits is None: # limits = ax[0].get_xlim() limits = [dtb[0, 0], dtb[-1, 0]] else: ax[0].set_xlim(limits) mjd0, mjd1 = limits ax[5].set_xlabel("MJD") ticks = _phc.gentkdates( mjd0, mjd1, civcfg[0], civcfg[1], dtstart=_dt.datetime(civcfg[2], civcfg[3], civcfg[4]).date(), ) mjdticks = [_jdcal.gcal2jd(date.year, date.month, date.day)[1] for date in ticks] # ticks = [dt.datetime(*jdcal.jd2gcal(jdcal.MJD_0, date)[:3]).date() for \ # date in ax[0].get_xticks()] # mjdticks = ax[0].get_xticks() for i in range(1, 6 + 1): ax2 = ax[i - 1].twiny() ax2.set_xlim(limits) ax2.set_xticks(mjdticks) ax2.set_xticklabels(["" for date in ticks]) if i == 1: ax2.set_xlabel("Civil date") ax2.set_xticklabels([date.strftime("%d %b %y") for date in ticks]) _plt.setp(ax2.xaxis.get_majorticklabels(), rotation=45) _plt.subplots_adjust(left=0.13, right=0.8, top=0.88, bottom=0.06, hspace=0.15) for f in fmt: print("SpecQ{1}.{0}".format(f, addsuf)) _plt.savefig("SpecQ{1}.{0}".format(f, addsuf), transparent=True) _plt.close() return
[docs]def din_spec( metadata, lbc=6562.86, hwidth=1500.0, res=50, interv=None, fmt=["png"], outname="din_spec", pxsize=8, vmin=None, vmax=None, avg=True, cmapn="inferno", refspec=None, figsize=None, ): """Plot dynamical specs. from metadata table of the Spec class. `interv` controls the interval between specs (in days). `res` is the resolution in km/s. By default (`avg`=True), the average of spectra in that bin is show. If `avg`=False, the nearest bin-centered (in time) spectra will be shown. if `refspec` is not None, them it will be a difference spectra. """ # Define MJD and bins dates = _np.array(metadata[:, 0], dtype=float) t0 = _np.min(dates) tf = _np.max(dates) if interv is None: interv = _np.linspace(t0, tf, 21) else: interv = _np.arange(t0, tf + interv, interv) dt = interv[1] - interv[0] # Select specs wl0 = _np.arange(-hwidth, hwidth + res, res) # Load refspec, if required baselevel = 1.0 if refspec is not None: wl, flux, tmp, tmp, tmp, tmp = loadfits(refspec) wl, flux = lineProf(wl, flux, lbc=lbc, hwidth=hwidth) refflx = _np.interp(wl0, wl, flux) baselevel = 0 fluxes = _np.zeros((len(wl0), len(interv))) + baselevel for i in range(len(interv)): # method 1 if not avg: date = _phc.find_nearest(dates, interv[i]) if date < interv[i] + dt / 2 and date > interv[i] - dt / 2: j = list(dates).index(date) wl, flux, tmp, tmp, tmp, tmp = loadfits(metadata[j, 3]) wl, flux = lineProf(wl, flux, lbc=lbc, hwidth=hwidth) if refspec is None: fluxes[:, i] = _np.interp(wl0, wl, flux) else: flux = _np.interp(wl0, wl, flux) fluxes[:, i] = flux - refflx # method 2 else: k = 0 for j in range(len(dates)): if dates[j] < interv[i] + dt / 2 and dates[j] > interv[i] - dt / 2: wl, flux, tmp, tmp, tmp, tmp = loadfits(metadata[j, 3]) wl, flux = lineProf(wl, flux, lbc=lbc, hwidth=hwidth) fluxes[:, i] += _np.interp(wl0, wl, flux) k += 1 if k > 0: # fluxes[:,i]/= k wl = vel2wl(wl0, lbc) tmp, fluxes[:, i] = lineProf(wl, fluxes[:, i], lbc=lbc, hwidth=hwidth) if refspec is not None: fluxes[:, i] = fluxes[:, i] - refflx if all(fluxes[:, i] == baselevel): fluxes[:, i] = _np.NaN # Create image img = _np.empty((pxsize * len(interv), len(wl0))) for i in range(len(interv)): img[i * pxsize : (i + 1) * pxsize] = _np.tile(fluxes[:, i], pxsize).reshape( pxsize, len(wl0) ) # Save image if figsize is None: fig, ax = _plt.subplots( figsize=(len(wl0) / 16, pxsize * len(interv) / 16), dpi=80 ) else: fig, ax = _plt.subplots(figsize=figsize) # _plt.figure(figsize=(len(wl0) / 16, pxsize * len(interv) / 16), dpi=80) # print _np.min(img), _np.max(img) cmapn = _plt.get_cmap(cmapn) cmapn.set_bad("k", 1.0) ax.imshow(img, vmin=vmin, vmax=vmax, cmap=cmapn, origin="lower") ax.set_xlabel(r"Velocity (km s$^{-1}$)") ax.set_ylabel(r"Julian Day - 2400000.5") # ax.set_xlim([-hwidth, hwidth]) ax.set_yticks( _np.linspace(pxsize * len(interv) * 0.1, pxsize * len(interv) * 0.9, 8) ) ax.set_yticklabels( [ int(round((tf - t0) * t / (pxsize * len(interv)) + t0)) for t in ax.get_yticks() ], rotation="vertical", ) ax.set_xticklabels( [ int(round(t * 2.0 * hwidth / (len(wl0) - 1) - hwidth)) for t in ax.get_xticks() ] ) # , rotation='vertical') # fig.tight_layout() ax.xaxis.set_tick_params(color="gray", width=1.1) ax.yaxis.set_tick_params(color="gray", width=1.1) fig.gca().invert_yaxis() _phc.savefig(fig, fmt=fmt, figname=outname) return
[docs]def plot_line_str( fig, ax, lbc="", ylabel="", fs=14, xlim=None, dlim=None, cmapn="gnuplot", lfs=10, ylim=None, ): """Line plotting structure""" if lbc != "": ax.set_title(r"$\lambda_c$ = {0:.1f} $\AA$".format(lbc), size=fs) if ylabel != "": ax.set_ylabel(ylabel, size=fs) if xlim is not None: ax.xlims = ax.set_xlim(xlim) if ylim is not None: ax.set_ylim(ylim) ax.set_xlabel(r"Velocity (km s$^{-1}$)", size=fs) # reverse to keep order consistent ax.legend() handles, labels = ax.get_legend_handles_labels() ax.legend( handles[::-1], labels[::-1], loc="upper right", labelspacing=0.1, fancybox=True, framealpha=0.5, fontsize=lfs, ) # loc=(1.05, .01) rect = _mpatches.Rectangle( [0.835, 0.01], 0.15, 0.44, ec="black", fc="white", transform=ax.transAxes, zorder=10, alpha=0.5, ) ax.add_patch(rect) ax3 = fig.add_axes([0.82, 0.12, 0.025, 0.35]) # ax3.set_axis_bgcolor('white') cmap = _plt.get_cmap(cmapn) norm = _mpl.colors.Normalize(vmin=dlim[0], vmax=dlim[1]) cb = _mpl.colorbar.ColorbarBase(ax3, cmap=cmap, norm=norm, orientation="vertical") cb.set_label("MJD", size=fs) fig.subplots_adjust(left=0.1, right=0.95, top=0.94, bottom=0.1) # , hspace=0.3, wspace=.3) return fig, ax
[docs]def spec_time( speclist, lbc=6562.8, ref_spec=("/data/Dropbox/work/" "sci_16-15aeri/alpEri_FEROS_2000AVE.mt"), mod_lbc=0.656461, MJDref=None, mod_ref=( "/data/Dropbox/work/sci_16-15aeri/" "fullsed_mod03_VDDn0_1p4e12_Be_aeri2014.sed2" ), fmt=["png", "pdf"], outname=None, cmapn="inferno", hwidth=1000.0, outpath="", figsize=(5, 7), ysh=0.01, ): """Plot specs over time as suggested by Rivi. ``speclist`` is an array of strings to the path of the `*.fits` files. ``ref_spec`` is a reference `*.fits` and ``mod_ref`` an hdust reference model. They are ignored if the path is not found.is ``ysh`` control the vertical separation of the profiles. """ if outname is None or outname == "": outname = _phc.dtflag() MJDs = [_np.inf, 0] for sp in speclist: wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(sp) if MJD < MJDs[0]: MJDs[0] = MJD if MJD > MJDs[1]: MJDs[1] = MJD if MJDref is None: MJDref = MJDs[0] elif MJDs[0] > MJDref: MJDs[0] = MJDref # Plot extrem = [_np.inf, 0] fig, ax = _plt.subplots() for sp in speclist: wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(sp) vel, flux = lineProf(wl, flux, lbc, hwidth=hwidth) if len(flux) == 0: raise NameError("Wrong lbc in spt.spe") if cmapn is not None: cor = _phc.gradColor( [MJD], min=MJDs[0], max=(MJDs[1] + 0.1 * (MJDs[1] - MJDs[0])), cmapn=cmapn, )[0] else: cor = "k" # print(MJD, MJDs, extrem, ysh, (MJD-MJDs[0])*ysh, flux, sp) ax.plot(vel, flux + (MJD - MJDs[0]) * ysh, color=cor) if _np.max(flux + (MJD - MJDs[0]) * ysh) > extrem[1]: extrem[1] = _np.max(flux + (MJD - MJDs[0]) * ysh) if _np.min(flux + (MJD - MJDs[0]) * ysh) < extrem[0]: extrem[0] = _np.min(flux + (MJD - MJDs[0]) * ysh) # print(extrem) if _os.path.exists(ref_spec): wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(ref_spec) vel, flux = lineProf(wl, flux, lbc, hwidth=hwidth) # ax.text(650., 0.8, 'Reference', horizontalalignment='center', ax.text( 800.0, 0.8, "Reference", horizontalalignment="center", verticalalignment="center", ) # , transform=ax.transAxes) ax.plot(vel, flux + (MJDref - MJDs[0]) * ysh, color="k", ls=":") # print(MJDref, MJDs, ysh, extrem, _np.min(flux), _np.max(flux)) if _np.min(flux + (MJDref - MJDs[0]) * ysh) < extrem[0]: extrem[0] = _np.min(flux + (MJDref - MJDs[0]) * ysh) ax.plot(vel + 5, flux + (57655 - MJDs[0]) * ysh, color="k", ls="--") ax.text( 800, 1.06 + (57655 - MJDs[0]) * ysh, "Reference", horizontalalignment="center", verticalalignment="center", ) print("A!") if _np.max(flux + (57655 - MJDs[0]) * ysh) > extrem[1]: print("B!") extrem[1] = _np.max(flux + (57655 - MJDs[0]) * ysh) if _os.path.exists(mod_ref): s2d = _hdt.readfullsed2(mod_ref) vel, flux = lineProf(s2d[4, :, 2], s2d[4, :, 3], mod_lbc, hwidth=hwidth) ax.plot(vel, flux + (56910 - MJDs[0]) * ysh, color="k", ls="--") ax.text( 800, 1.06 + (56910 - MJDs[0]) * ysh, "model", horizontalalignment="center", verticalalignment="center", ) ax.set_xlabel(r"Velocity (km s$^{-1}$)") ax.set_ylabel(r"Julian Day - 2400000.5") ax.set_ylim(extrem) ax.set_xlim([-hwidth, hwidth]) # ax.set_yticks(_np.arange(56300, 57000+100, 100)) yref = [1.0, 1 + _np.diff(MJDs) * ysh] # yMJDs = _np.arange(56300, 57100, 100) yMJDs = _np.arange(MJDs[0], MJDs[1], 100) ax.set_yticks(list(_phc.renormvals(yMJDs, MJDs, yref))) ax.set_yticklabels(yMJDs, rotation="vertical") fig.set_size_inches(figsize) fig.subplots_adjust(left=0.1, right=0.94, top=0.99, bottom=0.04) ax.minorticks_on() ax3 = ax.twinx() ax3.set_yticks(list(_phc.renormvals(yMJDs, MJDs, yref))) ax3.set_yticklabels([]) ax3.minorticks_on() ax2 = ax.twinx() ax2.spines["right"].set_position(("axes", 1.05)) ax2.set_ylabel("Civil date") # dtminticks = _phc.gentkdates(56201., 57023., 1, 'm') dtminticks = _phc.gentkdates(MJDs[0], MJDs[1], 1, "m") i = 1 # dtticks = _phc.gentkdates(56201., 57023., 3, 'm') dtticks = _phc.gentkdates(MJDs[0], MJDs[1], 3, "m") mjdticks = [_jdcal.gcal2jd(date.year, date.month, date.day)[1] for date in dtticks] while dtticks[0] not in dtminticks: dtminticks = _phc.gentkdates(yMJDs[0] + i, yMJDs[-1], 1, "m") i += 1 minjdticks = [ _jdcal.gcal2jd(date.year, date.month, date.day)[1] for date in dtminticks ] ax2.set_yticks(list(_phc.renormvals(mjdticks, MJDs, yref))) ax2.set_yticks(list(_phc.renormvals(minjdticks, MJDs, yref)), minor=True) xlabs = [date.strftime("%Y-%m-%d") for date in dtticks] # xlabs[1::2] = ['']*len(xlabs[1::2]) ax2.set_yticklabels(xlabs, rotation="vertical") ax2.set_ylim(extrem) ax3.set_ylim(extrem) ax.xaxis.set_tick_params(length=8, width=1.5) ax.xaxis.set_tick_params(length=6, which="minor") ax.yaxis.set_tick_params(length=4, which="minor") ax.yaxis.set_tick_params(length=8, width=1.5) ax2.yaxis.set_tick_params(length=4, which="minor") ax2.yaxis.set_tick_params(length=8, width=1.5) ax3.yaxis.set_tick_params(length=4, which="minor") ax3.yaxis.set_tick_params(length=8, width=1.5) # , fontsize=10) _phc.savefig(fig, figname=outpath + outname, fmt=fmt) return
[docs]def spec_time_Achernar( speclist, lbc=6562.8, fmt=["png", "pdf"], outname=None, cmapn="inferno", hwidth=1000.0, outpath="", figsize=(5, 15), ysh=0.01, ): """Plot specs over time as suggested by Rivi""" if outname is None or outname == "": outname = _phc.dtflag() MJDs = [_np.inf, 0] for sp in speclist: wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(sp) if MJD < MJDs[0]: MJDs[0] = MJD if MJD > MJDs[1]: MJDs[1] = MJD MJDref = 56245 if MJDs[0] > MJDref: MJDs[0] = MJDref # Plot extrem = [_np.inf, 0] fig, ax = _plt.subplots() for sp in speclist: wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(sp) vel, flux = lineProf(wl, flux, lbc, hwidth=hwidth) if len(flux) == 0: raise NameError("Wrong lbc in spt.spe") if cmapn is not None: cor = _phc.gradColor( [MJD], min=MJDs[0], max=(MJDs[1] + 0.1 * (MJDs[1] - MJDs[0])), cmapn=cmapn, )[0] else: cor = "k" # print(MJD, MJDs, extrem, ysh, (MJD-MJDs[0])*ysh, flux, sp) ax.plot(vel, flux + (MJD - MJDs[0]) * ysh, color=cor) if _np.max(flux + (MJD - MJDs[0]) * ysh) > extrem[1]: extrem[1] = _np.max(flux + (MJD - MJDs[0]) * ysh) if _np.min(flux + (MJD - MJDs[0]) * ysh) < extrem[0]: extrem[0] = _np.min(flux + (MJD - MJDs[0]) * ysh) wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits( "/data/Dropbox/work" "/sci_16-15aeri/alpEri_FEROS_2000AVE.mt" ) vel, flux = lineProf(wl, flux, 6561.8, hwidth=hwidth) ax.text( 650.0, 0.8, "photospheric ref.", horizontalalignment="center", verticalalignment="center", ) # , transform=ax.transAxes) ax.plot(vel, flux + (MJDref - MJDs[0]) * ysh, color="k", ls=":") if _np.min(flux + (MJDref - MJDs[0]) * ysh) < extrem[0]: extrem[0] = _np.min(flux + (MJDref - MJDs[0]) * ysh) s2d = _hdt.readfullsed2( "/data/Dropbox/work/sci_16-15aeri/" "fullsed_mod03_VDDn0_1p4e12_Be_aeri2014.sed2" ) vel, flux = lineProf(s2d[4, :, 2], s2d[4, :, 3], 0.656461, hwidth=hwidth) ax.plot(vel, flux + (56910 - MJDs[0]) * ysh, color="k", ls="--") ax.text( 800, 1.06 + (56910 - MJDs[0]) * ysh, "model", horizontalalignment="center", verticalalignment="center", ) ax.set_xlabel(r"Velocity (km s$^{-1}$)") ax.set_ylabel(r"Julian Day - 2400000.5") ax.set_ylim(extrem) ax.set_xlim([-hwidth, hwidth]) # ax.set_yticks(_np.arange(56300, 57000+100, 100)) yref = [1.0, 1 + _np.diff(MJDs) * ysh] yMJDs = _np.arange(56300, 57100, 100) ax.set_yticks(list(_phc.renormvals(yMJDs, MJDs, yref))) ax.set_yticklabels(yMJDs, rotation="vertical") fig.set_size_inches(figsize) fig.subplots_adjust(left=0.1, right=0.94, top=0.99, bottom=0.04) ax.minorticks_on() ax3 = ax.twinx() ax3.set_yticks(list(_phc.renormvals(yMJDs, MJDs, yref))) ax3.set_yticklabels([]) ax3.minorticks_on() ax2 = ax.twinx() ax2.spines["right"].set_position(("axes", 1.05)) ax2.set_ylabel("Civil date") dtminticks = _phc.gentkdates(56201.0, 57023.0, 1, "m") i = 1 dtticks = _phc.gentkdates(56201.0, 57023.0, 3, "m") mjdticks = [_jdcal.gcal2jd(date.year, date.month, date.day)[1] for date in dtticks] while dtticks[0] not in dtminticks: dtminticks = _phc.gentkdates(yMJDs[0] + i, yMJDs[-1], 1, "m") i += 1 minjdticks = [ _jdcal.gcal2jd(date.year, date.month, date.day)[1] for date in dtminticks ] ax2.set_yticks(list(_phc.renormvals(mjdticks, MJDs, yref))) ax2.set_yticks(list(_phc.renormvals(minjdticks, MJDs, yref)), minor=True) xlabs = [date.strftime("%Y-%m-%d") for date in dtticks] # xlabs[1::2] = ['']*len(xlabs[1::2]) ax2.set_yticklabels(xlabs, rotation="vertical") ax2.set_ylim(extrem) ax3.set_ylim(extrem) ax.xaxis.set_tick_params(length=8, width=1.5) ax.xaxis.set_tick_params(length=6, which="minor") ax.yaxis.set_tick_params(length=4, which="minor") ax.yaxis.set_tick_params(length=8, width=1.5) ax2.yaxis.set_tick_params(length=4, which="minor") ax2.yaxis.set_tick_params(length=8, width=1.5) ax3.yaxis.set_tick_params(length=4, which="minor") ax3.yaxis.set_tick_params(length=8, width=1.5) # , fontsize=10) _phc.savefig(fig, figname=outpath + outname, fmt=fmt) return
[docs]def extractfromsplot(file, splot): """Ce = center; Co = core #LcCe, LcCo, lcGW, lcEW, lvCe, lcCo, lvEW, lrCe, LrCo, lrEW """ out = _np.array(10 * [_np.NaN]) readflag = False for line in splot: if line.find("]:") > 0 and readflag: readflag = False if line.find(file) > 0: readflag = True if readflag: info = line.split() # if _re.match("^\d+?\.\d+?$", info[0]) is not None: try: float(info[0]) info = _np.array(info, dtype=float) if info[0] > 6556 and info[0] < 6556 + 4.33: if len(info) == 4: out[6] = float(info[3]) elif len(info) == 7: out[4] = float(info[0]) out[5] = float(info[4]) elif info[0] > 6556 + 4.33 and info[0] < 6556 + 2 * 4.33: if len(info) == 4: out[3] = float(info[3]) elif len(info) == 7: out[0] = float(info[0]) out[1] = float(info[4]) out[2] = float(info[5]) elif info[0] > 6556 + 2 * 4.33 and info[0] < 6556 + 3 * 4.33: if len(info) == 4: out[9] = float(info[3]) elif len(info) == 7: out[7] = float(info[0]) out[8] = float(info[4]) except: pass return out
[docs]def check_dtobs(dtobs): """Check if the dtobs fits the float format. Required for MJD calc.""" if "T" in dtobs: dtobs = dtobs.replace(".", "") tobs, dtobs = dtobs.split("T") if len(tobs) == 10: dtobs, tobs = tobs, dtobs tobs = tobs.split(":") tobs = float(tobs[0]) * 3600 + float(tobs[1]) * 60 + float(tobs[2]) tobs /= 24 * 3600 else: tobs = 0.0 if dtobs[4] == "-": dtobs = dtobs.split("-") elif dtobs[2] == "/": dtobs = dtobs.split("/")[::-1] else: _warn.warn('Wrong "DATE-OBS" in header! {0}'.format(dtobs)) raise SystemExit(1) dtobs = _np.array(dtobs, dtype="int32") return dtobs, tobs
# TODO: Check if obsolete
[docs]def overplotsubdirs(path, star, limits=(6540, 6600), showleg=True): """ Realiza o plot de espectros da estrela `star` dentre do diretorio `path`. Atualmente, faz o plot entre os valores `limits` (Angstroms). Gera os arquivos `path/star/star.log` e `path/star/star_specs.png`. """ # path = _os.getcwd() # star = _phc.user_input('Type the star name: ') # ref0 = 6540 # ref1 = 6600 ref0, ref1 = limits if not _os.path.exists("{0}/{1}".format(path, star)): _os.system("mkdir {0}/{1}".format(path, star)) f0 = open("{0}/{1}/{1}.log".format(path, star), "w") nights = [o for o in _os.listdir(path) if _os.path.isdir("{0}/{1}".format(path, o))] i = 0 for night in nights: targets = [ o for o in _os.listdir("%s/%s" % (path, night)) if _os.path.isdir("%s/%s/%s" % (path, night, o)) ] for target in targets: if target.find(star) > -1: scal = _glob("%s/%s/%s/*.cal.fits" % (path, night, target)) if len(scal) > 0: srv = _glob("%s/%s/%s/*.rv.fits" % (path, night, target)) if len(srv) != len(scal): print("# Specs without dopcor at %s!" % night) srv = scal # legendl += (night,) for cal in scal: imfits = _pyfits.open(cal) spec = imfits[0].data lbda = ( _np.arange(len(spec)) * imfits[0].header["CDELT1"] + imfits[0].header["CRVAL1"] ) # a = _phc.user_input('type to continue: ') if lbda[-1] > 6560: # and flag == '1': min_dif = min(abs(lbda - ref0)) a0 = _np.where(abs(lbda - ref0) == min_dif)[0][0] min_dif = min(abs(lbda - ref1)) a1 = _np.where(abs(lbda - ref1) == min_dif)[0][0] spec = normalize_range(lbda, spec, a0, a1) msg = "{0}, {1}, {2}".format((0.1 * i), night, cal) print(msg) f0.writelines(msg + "\n") try: leg = imfits[0].header["DATE-OBS"] except: leg = imfits[0].header["FRAME"] _plt.plot( lbda, spec, label=leg, alpha=0.7, color=_phc.colors[_np.mod(i, len(_phc.colors))], ) i += 1 else: print("# Data not reduced for %s at %s!" % (star, night)) msg = "{0}, {1}, {2}".format("NC", night, "None") f0.writelines(msg + "\n") if showleg: legend = _plt.legend(loc=(0.75, 0.05), labelspacing=0.1) _plt.setp(legend.get_texts(), fontsize="small") _plt.xlim([ref0, ref1]) _plt.ylim([-1, 5]) # _plt.xlabel('vel. (km/s)') _plt.savefig("{0}/{1}/{1}_specs.png".format(path, star)) _plt.close() f0.close() # # Ha = False # False do HeI 6678 # # for i in range(len(ifits)): # imfits = _pyfits.open(ifits[i]) # print imfits[0].header[3] # specs[i][:len(imfits[0].data)] = imfits[0].data # lbds[i] = _np.arange(len(specs[i]))*imfits[0].header['CDELT1']+ # imfits[0].header['CRVAL1'] # if Ha: # if i == 0: # lbds[i] = (lbds[i]-6561.5)/6561.5*3e5 # else: # lbds[i] = (lbds[i]-6562.8)/6562.8*3e5 # else: # if i == 0: # lbds[i] = (lbds[i]-6676.8)/6676.8*3e5 # else: # lbds[i] = (lbds[i]-6678.)/6678.*3e5 # # a = _np.where( abs(lbds[i]+1000) == min(abs(lbds[i]+1000)) ) # b = _np.where( abs(lbds[i]-1000) == min(abs(lbds[i]-1000)) ) # # specs[i] = normalize_range(lbds[i],specs[i],a,b) # # legendl += [imfits[0].header['DATE-OBS']] # # figure(2) # for i in range(len(specs)): # plot(lbds[i], specs[i], label=legendl[i]) # # legend(legendl, 'lower right') # legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) #'lower right' # xlim([-1000,1000]) # if Ha: # title('Halpha profile from LNA-Janot for Achernar') # ylim([.65,1.1]) # else: # title('HeI 6678 profile from LNA-Janot for Achernar') # ylim([.9,1.05]) # # legend = _plt.legend(legendl, loc=(0.75, .05), labelspacing=0.1) # _plt.setp(legend.get_texts(), fontsize='small') # # xlabel('vel. (km/s)') print("# Plot done!") return
[docs]def diffplotsubdirs(path, star, limits=(6540, 6600)): """ Realiza o plot de espectros da estrela `star` dentre do diretorio `path`. Atualmente, faz o plot entre os valores `limits` (Angstroms). Gera os arquivos `path/star/star.log` e `path/star/star_specs_dif.png`. """ ref0, ref1 = limits if not _os.path.exists("{0}/{1}".format(path, star)): _os.system("mkdir {0}/{1}".format(path, star)) # f0 = open('{0}/{1}/{1}.log'.format(path, star), 'w') nights = [o for o in _os.listdir(path) if _os.path.isdir("{0}/{1}".format(path, o))] i = 0 for night in nights: targets = [ o for o in _os.listdir("%s/%s" % (path, night)) if _os.path.isdir("%s/%s/%s" % (path, night, o)) ] for target in targets: if target.find(star) > -1: scal = _glob("%s/%s/%s/*.cal.fits" % (path, night, target)) if len(scal) > 0: srv = _glob("%s/%s/%s/*.rv.fits" % (path, night, target)) if len(srv) != len(scal): print("# Specs with dopcor at %s!" % night) srv = scal # legendl += (night,) for cal in scal: imfits = _pyfits.open(cal) spec = imfits[0].data lbda = ( _np.arange(len(spec)) * imfits[0].header["CDELT1"] + imfits[0].header["CRVAL1"] ) # a = _phc.user_input('type to continue: ') if lbda[0] > 5500: # and flag == '1': min_dif = min(abs(lbda - ref0)) a0 = _np.where(abs(lbda - ref0) == min_dif)[0][0] min_dif = min(abs(lbda - ref1)) a1 = _np.where(abs(lbda - ref1) == min_dif)[0][0] spec = normalize_range(lbda, spec, a0, a1) + (0.1 * i) # print (0.1 * i) try: leg = imfits[0].header["DATE-OBS"] except: leg = imfits[0].header["FRAME"] _plt.plot( [ref0, ref1], [1 + 0.1 * i, 1 + 0.1 * i], "k--", alpha=0.5, ) _plt.plot(lbda, spec, label=leg, color=_phc.colors[i]) i += 1 else: print("# Data not reduced for %s at %s!" % (star, night)) legend = _plt.legend(loc=(0.75, 0.05), labelspacing=0.1) _plt.setp(legend.get_texts(), fontsize="small") _plt.xlim([ref0, ref1]) # _plt.xlabel('vel. (km/s)') _plt.savefig("{0}/{1}/{1}_specs_dif.png".format(path, star)) # # Ha = False # False do HeI 6678 # # for i in range(len(ifits)): # imfits = _pyfits.open(ifits[i]) # print imfits[0].header[3] # specs[i][:len(imfits[0].data)] = imfits[0].data # lbds[i] = _np.arange(len(specs[i]))*imfits[0].header['CDELT1']+ # imfits[0].header['CRVAL1'] # if Ha: # if i == 0: # lbds[i] = (lbds[i]-6561.5)/6561.5*3e5 # else: # lbds[i] = (lbds[i]-6562.8)/6562.8*3e5 # else: # if i == 0: # lbds[i] = (lbds[i]-6676.8)/6676.8*3e5 # else: # lbds[i] = (lbds[i]-6678.)/6678.*3e5 # # a = _np.where( abs(lbds[i]+1000) == min(abs(lbds[i]+1000)) ) # b = _np.where( abs(lbds[i]-1000) == min(abs(lbds[i]-1000)) ) # # specs[i] = normalize_range(lbds[i],specs[i],a,b) # # legendl += [imfits[0].header['DATE-OBS']] # # figure(2) # for i in range(len(specs)): # plot(lbds[i], specs[i], label=legendl[i]) # # legend(legendl, 'lower right') # legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) #'lower right' # xlim([-1000,1000]) # if Ha: # title('Halpha profile from LNA-Janot for Achernar') # ylim([.65,1.1]) # else: # title('HeI 6678 profile from LNA-Janot for Achernar') # ylim([.9,1.05]) # # legend = _plt.legend(legendl, loc=(0.75, .05), labelspacing=0.1) # _plt.setp(legend.get_texts(), fontsize='small') # # xlabel('vel. (km/s)') print("# Plot done!") return
[docs]def refplotsubdirs(path, star, limits=(6540, 6600)): """ Realiza o plot de espectros da estrela `star` dentre do diretorio `path`. Atualmente, faz o plot entre os valores `limits` (Angstroms). Gera os arquivos `path/star/star.log` e `path/star/star_specs_REFERENCIA.png`. """ ref0, ref1 = limits if not _os.path.exists("{0}/{1}".format(path, star)): _os.system("mkdir {0}/{1}".format(path, star)) f0 = open("{0}/{1}/{1}.log".format(path, star), "w") nights = [o for o in _os.listdir(path) if _os.path.isdir("{0}/{1}".format(path, o))] i = 0 for night in nights: targets = [ o for o in _os.listdir("%s/%s" % (path, night)) if _os.path.isdir("%s/%s/%s" % (path, night, o)) ] for target in targets: if target.find(star) > -1: scal = _glob("%s/%s/%s/*.cal.fits" % (path, night, target)) if len(scal) > 0: srv = _glob("%s/%s/%s/*.rv.fits" % (path, night, target)) if len(srv) != len(scal): srv = scal for cal in scal: imfits = _pyfits.open(cal) spec = imfits[0].data lbda = ( _np.arange(len(spec)) * imfits[0].header["CDELT1"] + imfits[0].header["CRVAL1"] ) # a = _phc.user_input('type to continue: ') if lbda[0] > 5500: # and flag == '1': min_dif = min(abs(lbda - ref0)) a0 = _np.where(abs(lbda - ref0) == min_dif)[0][0] min_dif = min(abs(lbda - ref1)) a1 = _np.where(abs(lbda - ref1) == min_dif)[0][0] spec = normalize_range(lbda, spec, a0, a1) # print (0.1 * i) leg = imfits[0].header["DATE-OBS"] refleg = "2012-11-20T23:51:37.392" refleg = "2008-06-13" if leg == refleg: f0 = open("{0}/{1}/ref.txt".format(path, star), "w") f0.writelines([str(x) + "\t" for x in lbda]) f0.writelines("\n") f0.writelines([str(x) + "\t" for x in spec]) f0.writelines("\n") f0.close() i += 1 else: print("# Data not reduced for %s at %s!" % (star, night)) f0 = open("{0}/{1}/ref.txt".format(path, star)) lines = f0.readlines() f0.close() specref = _np.array(lines[1].split(), dtype=float) lbdaref = _np.array(lines[0].split(), dtype=float) func = _interpolate.interp1d(lbdaref, specref) # , kind='cubic') lbdaref = _np.linspace(ref0, ref1, 5000) specref = func(lbdaref) i = 0 for night in nights: targets = [ o for o in _os.listdir("%s/%s" % (path, night)) if _os.path.isdir("%s/%s/%s" % (path, night, o)) ] for target in targets: if target.find(star) > -1: scal = _glob("%s/%s/%s/*.cal.fits" % (path, night, target)) if len(scal) > 0: srv = _glob("%s/%s/%s/*.rv.fits" % (path, night, target)) if len(srv) != len(scal): print("# Specs without dopcor at %s!" % night) srv = scal # legendl += (night,) for cal in scal: imfits = _pyfits.open(cal) spec = imfits[0].data lbda = ( _np.arange(len(spec)) * imfits[0].header["CDELT1"] + imfits[0].header["CRVAL1"] ) # a = _phc.user_input('type to continue: ') if lbda[0] > 5500: # and flag == '1': min_dif = min(abs(lbda - ref0)) a0 = _np.where(abs(lbda - ref0) == min_dif)[0][0] min_dif = min(abs(lbda - ref1)) a1 = _np.where(abs(lbda - ref1) == min_dif)[0][0] spec = normalize_range(lbda, spec, a0, a1) func = _interpolate.interp1d(lbda, spec) # , kind='cubic') # Tive problemas de 'out-of-bounds'... um espectro # estava desordenado: # print imfits[0].header['CDELT1'], # imfits[0].header['CRVAL1'], cal spec = func(lbdaref) # print (0.1 * i) try: leg = imfits[0].header["DATE-OBS"] except: leg = imfits[0].header["FRAME"] if i < 130: _plt.plot( lbdaref, spec - specref, label=leg, alpha=0.8, color=_phc.colors[i], ) i += 1 else: print("# Data not reduced for %s at %s!" % (star, night)) legend = _plt.legend(loc=(0.75, 0.05), labelspacing=0.1) _plt.setp(legend.get_texts(), fontsize="small") _plt.xlim([ref0, ref1]) _plt.title("Ref.= %s" % refleg) # _plt.xlabel('vel. (km/s)') _plt.savefig("{0}/{1}/{1}_specs_{2}.png".format(path, star, refleg[:10])) # # Ha = False # False do HeI 6678 # # for i in range(len(ifits)): # imfits = _pyfits.open(ifits[i]) # print imfits[0].header[3] # specs[i][:len(imfits[0].data)] = imfits[0].data # lbds[i] = _np.arange(len(specs[i]))*imfits[0].header['CDELT1']+\ # imfits[0].header['CRVAL1'] # if Ha: # if i == 0: # lbds[i] = (lbds[i]-6561.5)/6561.5*3e5 # else: # lbds[i] = (lbds[i]-6562.8)/6562.8*3e5 # else: # if i == 0: # lbds[i] = (lbds[i]-6676.8)/6676.8*3e5 # else: # lbds[i] = (lbds[i]-6678.)/6678.*3e5 # # a = _np.where( abs(lbds[i]+1000) == min(abs(lbds[i]+1000)) ) # b = _np.where( abs(lbds[i]-1000) == min(abs(lbds[i]-1000)) ) # # specs[i] = normalize_range(lbds[i],specs[i],a,b) # # legendl += [imfits[0].header['DATE-OBS']] # # figure(2) # for i in range(len(specs)): # plot(lbds[i], specs[i], label=legendl[i]) # # legend(legendl, 'lower right') # legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) #'lower right' # xlim([-1000,1000]) # if Ha: # title('Halpha profile from LNA-Janot for Achernar') # ylim([.65,1.1]) # else: # title('HeI 6678 profile from LNA-Janot for Achernar') # ylim([.9,1.05]) # # legend = _plt.legend(legendl, loc=(0.75, .05), labelspacing=0.1) # _plt.setp(legend.get_texts(), fontsize='small') # # xlabel('vel. (km/s)') print("# Plot done!") return
[docs]def overplotsubdirs2(path, star, limits=(6540, 6600)): """ Realiza o plot de espectros da estrela `star` dentre do diretorio `path`. Atualmente, faz o plot entre os valores `limits` (Angstroms). Ha' um criterio de escolha de espectros aqui (rudimentar). Gera os arquivos `path/star/star.log` e `path/star/star_specs2.png`. """ ref0, ref1 = limits if not _os.path.exists("{0}/{1}".format(path, star)): _os.system("mkdir {0}/{1}".format(path, star)) f0 = open("{0}/{1}/{1}.log".format(path, star), "w") nights = [o for o in _os.listdir(path) if _os.path.isdir("{0}/{1}".format(path, o))] ax = _plt.figure() i = 0 for night in nights: targets = [ o for o in _os.listdir("%s/%s" % (path, night)) if _os.path.isdir("%s/%s/%s" % (path, night, o)) ] for target in targets: if target.find(star) > -1: scal = _glob("%s/%s/%s/*.cal.fits" % (path, night, target)) if len(scal) > 0: srv = _glob("%s/%s/%s/*.rv.fits" % (path, night, target)) if len(srv) != len(scal): print("# Specs without dopcor at %s!" % night) srv = scal # legendl += (night,) for cal in scal: imfits = _pyfits.open(cal) spec = imfits[0].data lbda = ( _np.arange(len(spec)) * imfits[0].header["CDELT1"] + imfits[0].header["CRVAL1"] ) # a = _phc.user_input('type to continue: ') if lbda[0] > 5500: # and flag == '1': min_dif = min(abs(lbda - ref0)) a0 = _np.where(abs(lbda - ref0) == min_dif)[0][0] min_dif = min(abs(lbda - ref1)) a1 = _np.where(abs(lbda - ref1) == min_dif)[0][0] spec = normalize_range(lbda, spec, a0, a1) # print (0.1 * i), night prtcolor = _phc.colors[i] try: leg = imfits[0].header["DATE-OBS"] except: leg = imfits[0].header["FRAME"] check = False if leg.find("2012-11-20T23:51:37.392") != -1: leg = "2012-11-20" prtcolor = _phc.colors[0] check = True elif leg.find("22/01/2013") != -1: leg = "2013-01-22" check = True # elif leg.find('03/07/2013') != -1: # leg = '2013-07-03' # check = True elif leg.find("28/07/2013") != -1: leg = "2013-07-28" check = True elif leg.find("2013-11-12T01:30:38.938") != -1: leg = "2013-11-12" check = True else: print(leg) if check: print(cal) _plt.plot( lbda, spec, label=leg, alpha=0.7, color=prtcolor ) i += 1 else: msg = "# Data not reduced for %s at %s!" % (star, night) print(msg) f0.writelines(msg) font = { "size": 16, } legend = _plt.legend(loc=(0.75, 0.05), labelspacing=0.1) # _plt.setp(legend.get_texts(), fontsize='small') _plt.xlim([ref0, ref1]) _plt.ylim([0.58, 1.2]) _plt.xlabel(r"wavelength ($\AA$)", fontdict=font) _plt.ylabel("Normalized flux", fontdict=font) # _plt.xlabel('vel. (km/s)') _plt.savefig("{0}/{1}/{1}_specs2.png".format(path, star)) _plt.close() f0.close() # # Ha = False # False do HeI 6678 # # for i in range(len(ifits)): # imfits = _pyfits.open(ifits[i]) # print imfits[0].header[3] # specs[i][:len(imfits[0].data)] = imfits[0].data # lbds[i] = _np.arange(len(specs[i]))*imfits[0].header['CDELT1']+ # imfits[0].header['CRVAL1'] # if Ha: # if i == 0: # lbds[i] = (lbds[i]-6561.5)/6561.5*3e5 # else: # lbds[i] = (lbds[i]-6562.8)/6562.8*3e5 # else: # if i == 0: # lbds[i] = (lbds[i]-6676.8)/6676.8*3e5 # else: # lbds[i] = (lbds[i]-6678.)/6678.*3e5 # # a = _np.where( abs(lbds[i]+1000) == min(abs(lbds[i]+1000)) ) # b = _np.where( abs(lbds[i]-1000) == min(abs(lbds[i]-1000)) ) # # specs[i] = normalize_range(lbds[i],specs[i],a,b) # # legendl += [imfits[0].header['DATE-OBS']] # # figure(2) # for i in range(len(specs)): # plot(lbds[i], specs[i], label=legendl[i]) # # legend(legendl, 'lower right') # legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.) #'lower right' # xlim([-1000,1000]) # if Ha: # title('Halpha profile from LNA-Janot for Achernar') # ylim([.65,1.1]) # else: # title('HeI 6678 profile from LNA-Janot for Achernar') # ylim([.9,1.05]) # # legend = _plt.legend(legendl, loc=(0.75, .05), labelspacing=0.1) # _plt.setp(legend.get_texts(), fontsize='small') # # xlabel('vel. (km/s)') print("# Plot done!") return
[docs]def overPlotLineSeries( fullseds, obsers=[0], lbc=0.6564606, fmt=["png"], convgauss=0.0, frac=0.0, addsuf="", labels=None, hwidth=1000.0, ssize=0.05, outpath="", ylim=[0.7, 2.2], cmapn="gnuplot", ): """Generate overplot spec. line from a HDUST mod list, separated by observers. Observers config. must be the same between models in `fullseds` list. If `convgauss` > 0, do a gaussian convolution. """ if labels is None: labels = [""] * len(fullseds) for obs in obsers: fig, ax = _plt.subplots() fig2, ax2 = _plt.subplots() k = obsers.index(obs) for file in fullseds: i = fullseds.index(file) sed2data = _hdt.readfullsed2(file) obsdegs = (_np.arccos(sed2data[:, 0, 0]) * 180 / _np.pi)[obsers] obsdegs = list(obsdegs) (x, yo) = lineProf( sed2data[obs, :, 2], sed2data[obs, :, 3], lbc=lbc, hwidth=hwidth + 3 * convgauss, ssize=ssize, ) y1 = yo y2 = 0.0 if convgauss > 0: step = _np.min([x[j + 1] - x[j] for j in range(len(x) - 1)]) xn = _np.arange( -hwidth - 3 * convgauss, hwidth + 3 * convgauss + step, step ) cf = _phc.normgauss(convgauss, x=xn) yo = _np.interp(xn, x, yo) x = xn y1 = yo * (1 - frac) y2 = _np.convolve(yo * frac, cf / _np.trapz(cf), "same") ax2.plot(x, y1, color=_phc.colors[_np.mod(i, len(_phc.colors))]) ax2.plot(x, y2, color=_phc.colors[_np.mod(i, len(_phc.colors))]) y = y1 + y2 # y = linfit(x, y1+y2) if file == fullseds[0]: ax.plot( x, y, label="{0:02.1f} deg. {1}".format(obsdegs[k], labels[i]), color=_phc.colors[_np.mod(i, len(_phc.colors))], ) # ew0 = EWcalc(x, y, vw=hwidth) else: ax.plot( x, y, color=_phc.colors[_np.mod(i, len(_phc.colors))], label=labels[i], ) # ewf = EWcalc(x, y, vw=hwidth) plot_line_str(fig, ax, lbc=lbc, ylim=ylim, cmapn=cmapn, xlim=[-hwidth, hwidth]) figname = outpath + "modsover_lbc{1:.4f}_obs{0:02.1f}{2}".format( obsdegs[k], lbc, addsuf ) _phc.savefig(fig, figname, fmt) plot_line_str( fig2, ax2, lbc=lbc, ylim=ylim, cmapn=cmapn, xlim=[-hwidth, hwidth] ) figname = outpath + "modsover_lbc{1:.4f}_obs{0:02.1f}{2}Extra".format( obsdegs[k], lbc, addsuf ) _phc.savefig(fig, figname, fmt) return
[docs]def overPlotLineFits( specs, lbc=0.6564606, fmt=["png"], hwidth=1500.0, ylim=None, yzero=False, addsuf="", dlim=None, cmapn="jet", xlim=None, outpath="", ): """Generate overplot spec. line from a FITS file list.""" fig, ax = _plt.subplots() for spec in specs: i = specs.index(spec) print("# Reading {0}...".format(_phc.trimpathname(spec)[1])) wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(spec) (x, y) = lineProf(wl, flux, lbc=lbc, hwidth=hwidth) if dateobs.find("-") > 0: dateobs = dateobs[:10] elif dateobs.find("/") > 0: dtobs = dateobs.split("/")[::-1] dateobs = "-".join(dtobs) if dlim is None: cor = _phc.colors[_np.mod(i, len(_phc.colors))] else: cor = _phc.gradColor([MJD], min=dlim[0], max=dlim[1], cmapn=cmapn)[0] ax.plot(x, y, label="{0}".format(dateobs), color=cor) ylabel = "Overplotted spectra" fig, ax = plot_line_str( fig, ax, lbc=lbc, ylabel=ylabel, xlim=xlim, dlim=dlim, cmapn=cmapn, ylim=ylim ) figname = outpath + "fitsover_lbc{1:.4f}{0}".format(addsuf, lbc) _phc.savefig(fig, figname, fmt) return
[docs]def incrPlotLineSeries( fullseds, obsers=[0], lbc=0.6564606, fmt=["png"], addsuf="", outpath="" ): """Generate incremented spec. line from a HDUST mod list, separated by observers. The increment is 0.1 for each file in fullseds sequence. Observers config. must be the same between models in `fullseds` list. """ for obs in obsers: fig, ax = _plt.subplots() k = obsers.index(obs) for file in fullseds: i = fullseds.index(file) sed2data = _hdt.readfullsed2(file) obsdegs = (_np.arccos(sed2data[:, 0, 0]) * 180 / _np.pi)[obsers] obsdegs = list(obsdegs) (x, y) = lineProf(sed2data[obs, :, 2], sed2data[obs, :, 3], lbc=lbc) if file == fullseds[0]: ax.plot( x, y + 0.1 * i, label="{0:02.1f} deg.".format(obsdegs[k]), color=_phc.colors[_np.mod(i, len(_phc.colors))], ) else: ax.plot(x, y + 0.1 * i, color=_phc.colors[_np.mod(i, len(_phc.colors))]) ax.set_title("lbc = {0:.5f} $\mu$m".format(lbc)) ax.legend(loc="best", fancybox=True, framealpha=0.5) figname = outpath + "modsincr_lbc{1:.4f}_obs{0:02.1f}{2}".format( obsdegs[k], lbc, addsuf ) for f in fmt: print("# Saved {1}.{0}".format(f, figname)) fig.savefig(figname + ".{0}".format(f), transparent=True) _plt.close() return
[docs]def incrPlotLineFits( specs, lbc=0.6564606, fmt=["png"], hwidth=1500.0, yzero=False, addsuf="", dlim=None, cmapn="jet", xlim=None, outpath="", ylim=None, ): """Generate incremented spec. line from FITS files list. The increment is 0.1 for each file in sequence. """ fig, ax = _plt.subplots() for spec in specs: i = specs.index(spec) print("# Reading {0}...".format(_phc.trimpathname(spec)[1])) wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(spec) (x, y) = lineProf(wl, flux, lbc=lbc, hwidth=hwidth) if dateobs.find("-") > 0: dateobs = dateobs[:10] elif dateobs.find("/") > 0: dtobs = dateobs.split("/")[::-1] dateobs = "-".join(dtobs) if dlim is None: cor = _phc.colors[_np.mod(i, len(_phc.colors))] else: cor = _phc.gradColor([MJD], min=dlim[0], max=dlim[1], cmapn=cmapn)[0] ax.plot(x, y + 0.1 * i, label="{0}".format(dateobs), color=cor) if yzero: ylim = ax.get_ylim() ax.plot([0, 0], ylim, ls="-", color="Gray") ylabel = "Spaced spectra" fig, ax = plot_line_str( fig, ax, lbc=lbc, ylabel=ylabel, xlim=xlim, dlim=dlim, cmapn=cmapn, ylim=ylim ) figname = outpath + "fitsincr_lbc{1:.4f}{0}".format(addsuf, lbc) _phc.savefig(fig, figname, fmt) return
[docs]def diffPlotLineSeries( fullseds, obsers=[0], lbc=0.6564606, fmt=["png"], rvel=None, rflx=None, hwidth=1000.0, outpath="", addsuf="", ): """Generate overplot of DIFFERENCE spec. line from a HDUST mod list. The model will be linearly interpolated with the reference spec. If none is given as reference, then it assumes the first of the list. It is recommend to run first (rvel, rflx) = lineProf(rvel, rflx, lbc=lbc, hwidth=hwidth). Observers config. must be the same between models in `fullseds` list. """ for obs in obsers: fig, ax = _plt.subplots() k = obsers.index(obs) for file in fullseds: i = fullseds.index(file) sed2data = _hdt.readfullsed2(file) obsdegs = (_np.arccos(sed2data[:, 0, 0]) * 180 / _np.pi)[obsers] obsdegs = list(obsdegs) (x, y) = lineProf( sed2data[obs, :, 2], sed2data[obs, :, 3], lbc=lbc, width=hwidth ) if rvel is None or rflx is None: refspec = _hdt.readfullsed2(fullseds[0]) (vel, flx) = lineProf( refspec[obs, :, 2], refspec[obs, :, 3], lbc=lbc, hwidth=hwidth ) else: flx = _np.interp(x, rvel, rflx) if file == fullseds[0]: ax.plot( x, y - flx, label="{0:02.1f} deg.".format(obsdegs[k]), color=_phc.colors[_np.mod(i, len(_phc.colors))], ) else: ax.plot(x, y - flx, color=_phc.colors[_np.mod(i, len(_phc.colors))]) ax.set_title("lbc = {0:.5f} $\mu$m".format(lbc)) ax.set_ylabel("Difference spectra (spec - ref.)") ax.legend(fontsize=8, loc="best", fancybox=True, framealpha=0.5) figname = outpath + "modsdiff_lbc{1:.4f}_obs{0:02.1f}{2}".format( obsdegs[k], lbc, addsuf ) for f in fmt: print("# Saved {1}.{0}".format(f, figname)) _plt.close() return
[docs]def diffPlotLineFits( specs, lbc=0.6564606, fmt=["png"], xlim=None, rvel=None, rflx=None, hwidth=1500.0, addsuf="", cmapn="jet", dlim=None, outpath="", ylim=None, ): """Generate overplot of DIFFERENCE spec. line from a FITS files list. The observations will be linearly interpolated with the reference spec. If none is given as reference, then it assumes the first of the list. It is recommend to run first (rvel, rflx) = lineProf(rvel, rflx, lbc=lbc, hwidth=hwidth). If `cmap` is None or empty, the phc.colors vector is read. """ fig, ax = _plt.subplots() for spec in specs: i = specs.index(spec) print("# Reading {0}...".format(_phc.trimpathname(spec)[1])) wl, flux, MJD, dateobs, datereduc, fitsfile = loadfits(spec) (x, y) = lineProf(wl, flux, lbc=lbc, hwidth=hwidth) if rvel is None or rflx is None: # wl0, flux0, MJD, dateobs0, datereduc, fitsfile = \ # loadfits(specs[0]) # (rvel,flx) = lineProf(wl0, flux0, lbc=lbc, hwidth=hwidth) # flx = _np.interp(x, rvel, rflx) rvel = x rflx = y flx = y[:] else: flx = _np.interp(x, rvel, rflx) # if spec == specs[0]: # ax.plot(x, y-flx, label='{0}'.format(dateobs), \ # color= _phc.colors[_np.mod(i, len(_phc.colors))]) # else: # ax.plot(x, y-flx, color= _phc.colors[_np.mod(i, # len(_phc.colors))]) if dateobs.find("-") > 0: dateobs = dateobs[:10] elif dateobs.find("/") > 0: dtobs = dateobs.split("/")[::-1] dateobs = "-".join(dtobs) if dlim is None: cor = _phc.colors[_np.mod(i, len(_phc.colors))] else: cor = _phc.gradColor([MJD], min=dlim[0], max=dlim[1], cmapn=cmapn)[0] ax.plot(x, y - flx, label="{0}".format(dateobs), color=cor) ylabel = "Difference spectra" fig, ax = plot_line_str( fig, ax, lbc=lbc, ylabel=ylabel, xlim=xlim, dlim=dlim, cmapn=cmapn, ylim=ylim ) figname = outpath + "fitsdiff_lbc{1:.4f}{0}".format(addsuf, lbc) _phc.savefig(fig, figname, fmt) return
[docs]def diffPlotLineObs( fullseds, obsers=[0], lbc=0.6564606, fmt=["png"], rvel=None, rflx=None, hwidth=1000.0, addsuf="", outpath="", ): """Generate overplot of DIFFERENCE spec. line from a HDUST OBSERVERS list. The model will be linearly interpolated with the reference spec. If none is given as reference, then it assumes the first observer of the list. It is recommend to run first (rvel, rflx) = lineProf(rvel, rflx, lbc=lbc, hwidth=hwidth). Observers config. must be the same between models in `fullseds` list. """ for file in fullseds: fig, ax = _plt.subplots() sed2data = _hdt.readfullsed2(file) obsdegs = (_np.arccos(sed2data[:, 0, 0]) * 180 / _np.pi)[obsers] obsdegs = list(obsdegs) for obs in obsers: i = obsers.index(obs) (x, y) = lineProf( sed2data[obs, :, 2], sed2data[obs, :, 3], lbc=lbc, hwidth=hwidth ) if rvel is None or rflx is None: (vel, flx) = lineProf( sed2data[obsers[0], :, 2], sed2data[obsers[0], :, 3], lbc=lbc, hwidth=hwidth, ) else: flx = _np.interp(x, rvel, rflx) ax.plot( x, y - flx, label="{0:02.1f} deg.".format(obsdegs[i]), color=_phc.colors[_np.mod(i, len(_phc.colors))], ) ax.set_title("lbc={0:.5f}$\mu$m, {1}".format(lbc, _phc.trimpathname(file)[1])) ax.set_ylabel("Difference spectra (spec - ref.)") ax.legend(fontsize=8, loc="best", fancybox=True, framealpha=0.5) figname = outpath + "modsdiff_lbc{1:.4f}{0}".format(addsuf, lbc) for f in fmt: print("# Saved {1}.{0}".format(f, figname)) fig.savefig(figname + ".{0}".format(f), transparent=True) _plt.close() return
[docs]def max_func_pts(x, y, ws=0.01, avgbox=3): """`ws` window size where the maximum will be evaluated. Example: `ws=0.02` corresponds to 2% of the length of the input.""" x, y = (_np.array(x), _np.array(y)) N = len(x) parts = _phc.splitequal(N * ws, N) n = len(parts) xout, yout = (_np.zeros(n), _np.zeros(n)) for i in range(n): p = parts[i] Y = y[p[0] : p[1]] X = x[p[0] : p[1]] idx = _np.argsort(Y) xout[i] = _np.average(X[idx][-avgbox:]) yout[i] = _np.average(Y[idx][-avgbox:]) return xout, yout
def sum_ec(fwl, fflx): dmin = _np.inf wlmin = _np.inf wlmax = 0 for f in fwl: if _np.min(_np.diff(f)) < dmin: dmin = _np.min(_np.diff(f)) if _np.min(f) < wlmin: wlmin = _np.min(f) if _np.max(f) > wlmax: wlmax = _np.max(f) swl = _np.arange(wlmin, wlmax, dmin) sflx = _np.zeros(len(swl)) for i in range(len(fwl)): idx = _np.where((swl > _np.min(fwl[i])) & (swl < _np.max(fwl[i]))) sflx[idx] += _np.interp(swl[idx], fwl[i], fflx[i]) return swl, sflx
[docs]def lbdc2range(lbdc): """Function doc""" dl = lbdc[1] - lbdc[0] return _np.linspace(lbdc[0] - dl / 2, lbdc[-1] + dl / 2, len(lbdc) + 1)
[docs]def classify_specs(list_of_specs, starid, instrument, calib, comment=""): """Do useful things with a list of FITS specs to classify them. It will (1) generate figures of the specs, with line info; (2) classify the band of observation; (3) copy the file with a standard name. """ lines = [6562.79, 4861.35, 4340.472, 4101.734, 21655.2488] lnames = ["Ha", "Hb", "Hc", "Hd", "Brg"] list_of_specs = list(list_of_specs) list_of_specs.sort() for s in list_of_specs: print(s) wl, flux, MJD, dateobs, datereduc, fitsfiles = loadfits(s) fig, ax = _plt.subplots() ax.plot(wl, flux, label=dateobs) wlrange = [_np.min(wl), _np.max(wl)] flxrange = [_np.min(flux), _np.max(flux)] band = "unknown" # print(wlrange) for l in lines: if _phc.is_inside_ranges(l, wlrange): ax.plot([l, l], flxrange, "--", color="gray") if wlrange[0] > l * 0.91 and wlrange[1] < l * 1.09: band = lnames[lines.index(l)] if band == "unknown": if wlrange[1] > 9000 and wlrange[1] < 25000: band = "nIR" if wlrange[0] > 5300 and wlrange[1] < 11000: band = "RI" if wlrange[0] < 4100 and wlrange[1] < 6000: band = "BV" if wlrange[0] < 3700 and wlrange[1] < 6000: band = "UV" if wlrange[0] < 4700 and wlrange[1] > 6700: band = "Vis" ax.set_title(s) ax.legend() figname = _os.path.splitext(s) _phc.savefig(fig, figname=list(figname)[0]) expname = "{}_{}_{}".format(starid, instrument, band) if len(comment) > 0: expname += "_" + comment expname += "_{}_{:04d}".format(int(MJD), int(round(1e4 * (MJD % 1)))) expname += ".{}.fits".format(calib) _copyfile(s, expname)
[docs]def automatic_BeSS( RA, DEC, size="0.2", date_lower="1000-01-01", date_upper="3000-01-01", band_lower="6.4e-7", band_upper="6.7e-7", ): """This is a script for downloading BeSS spectra, directly from the database website, using VO Table and pandas dataframes Parameters ---------- RA : str Right ascension [° J200] as string DEC : str Declination [° J2000] as string size: str Radius of the cone search in degree as string date_lower: str Initial date in format yyyy-mm-dd as string date_upper: str Final date in format yyyy-mm-dd as string band lower: str Initial wavelength [meters] in scientific notation as string band_upper: str Final wavelength [meters] in scientific notation as string Returns ------- None, the routine downloads file in the script directory. Example ------- #Halpha for 25 Cyg from 2019-10-01 to 2020-03-27 >>> RA = "299.979" >>> DEC = "37.04" >>> date_lower = "2019-10-01" >>> date_upper = "2020-03-27" >>> automatic_BeSS(RA, DEC, size='0.1', date_lower, date_upper, band_lower='6.4e-7', band_upper='6.7e-7') #Data downloaded in the script directory ------- #Download all Ha data of a star >>> automatic_BeSS(RA="299.979", DEC="37.04") Routine written by Pedro Ticiani dos Santos IMPORTANT NOTE: When using this function, the downloaded files go to the script directory. This is something still undergoing work. """ user_url = "http://basebe.obspm.fr/cgi-bin/ssapBE.pl?POS={0},{1}&SIZE={2}&BAND={3}/{4}&TIME={5}/{6}".format( RA, DEC, size, band_lower, band_upper, date_lower, date_upper ) r = _requests.get(url=user_url) # xml parsed => dict global_dict = _xmltodict.parse(r.text) # Interesting data selection entries_list = global_dict["VOTABLE"]["RESOURCE"]["TABLE"]["DATA"]["TABLEDATA"][ "TR" ] # Dataframe init (eq. Table) df01 = _pd.DataFrame() # Browse through the entries and record it in the dataframe df01 for item in entries_list: # Create a row for the dataframe p01 = { "Fits URL": item["TD"][0], "Target name": item["TD"][45], "Target class": item["TD"][46], "vo_format": item["TD"][1], } # add row in progress in the dataframe df01 = df01.append(p01, ignore_index=True) # Dataframe init df02 = _pd.DataFrame() # Iteration on each row for item in entries_list: vo_url_fits = item["TD"][0] try: # Download of each file in progress with his url file_bess = _wget.download(vo_url_fits) # Opening FITS fits_in_progress = _pyfits.open(file_bess) # Retrieve header information for fits in progress header_fits_ip = fits_in_progress[1].header # catch potential errors except IOError: print("Error downloading fits file.") # Create a row for the dataframe # with VO Table value + Header infos p02 = { "Fits URL": item["TD"][0], "Target name": item["TD"][45], "Target class": item["TD"][46], "Resolution": header_fits_ip["SPEC_RES"], "Creation Date": header_fits_ip["DATE"], } # add row in progress in the dataframe df02 = df02.append(p02, ignore_index=True) # if you want to download only the first file, change : to 1. download = _pyfits.open(_wget.download(df02.iloc[0][:])) # THE FILES DOWNLOADED ARE IN VOTABLE FORMAT. Some scripts must be changed # in the .fits reading part when extracting wavelength and flux values. return
# MAIN ### if __name__ == "__main__": pass