Source code for pyhdust.singscat

# -*- coding:utf-8 -*-

r"""PyHdust *singscat* module: Single Scatering Dumbbell+disk model.

This release contains a simplified version of the one applied to Sigma Ori E in
Carciofi+2013.

Conditions:
- Calculations for a given observer position: observer at z = +infinity
- Single scattering by electrons (Thomson)
- Star is a point-source (*or at least, a spherical emitting source)
- :math:`\theta` is latitude. 0 = North pole; 180 = South pole.
- :math:`\phi` is longitude
- Spherical coordinates seguence (:math:`r`, :math:`\phi`, :math:`\theta`)

Remeber:
- phi=0 is x direction (perpendicular to the observer)
- (x, y) are not the same for (N, -E) astronomical images

:license: GNU GPL v3.0  https://github.com/danmoser/pyhdust/blob/master/LICENSE
"""
from __future__ import print_function

# import os as _os
import numpy as _np

# import time as _time
# import pyhdust as _hdt
# import pyhdust.jdcal as _jdcal
# import pyhdust.poltools as _polt
import pyhdust.phc as _phc

# import pyhdust.triangle as _triangle
import warnings as _warn

try:
    import matplotlib.pyplot as _plt
    import matplotlib.gridspec as _gridspec
    from mpl_toolkits.mplot3d import Axes3D as _Axes3D

    # import emcee as _emcee
    # from scipy.stats import percentileofscore as _perct
    # from scipy import optimize as _optimize
    from scipy import interpolate as _interpolate

except ImportError:
    _warn.warn("matplotlib, scipy and/or emcee module not installed!!!")

__author__ = "Daniel Moser"
__email__ = "dmfaes@gmail.com"


# sing scat mod base
[docs]def sph_hyper_rej(res): """Hypercube rejection method""" x0 = _np.linspace(-1, 1, res) y0 = _np.linspace(-1, 1, res) z0 = _np.linspace(-1, 1, res) xx0, yy0, zz0 = _np.meshgrid(x0, y0, z0) rr = _np.sqrt(xx0**2 + yy0**2 + zz0**2) idx = _np.where(rr <= 1) return xx0[idx], yy0[idx], zz0[idx]
[docs]def blobs_coords(db=2, rb=1.0 / 3, res=3, Req=1.0, phi0=0, beta=0.0): """ :param db: distance from stellar center to blob center (Req) :param rb: radius of the blob (sphere; Req) :param res: **linear** resolution used in the *Hypercube rejection* method. :param Req: stellar equatorial radius (in cm) :param phi0: central position of the blob (in rad) """ x, y, z = sph_hyper_rej(res) x, y, z = _phc.cart_rot(x, y, z, ang_xy=0.0, ang_yz=beta, ang_zx=0.0) r, phi, th = _phc.cart2sph((x * rb + db), y * rb, z * rb) phi += phi0 return r * Req, phi, th
[docs]def cil_slice_rej(res, dd, rd): """NO *z* dimension""" x0 = _np.linspace(-1, 1, res) * (rd + dd) y0 = _np.linspace(-1, 1, res) * (rd + dd) xx0, yy0 = _np.meshgrid(x0, y0) rr = _np.sqrt(xx0**2 + yy0**2) idx = _np.where((rr <= dd + rd) & (rr >= dd - rd)) return xx0[idx], yy0[idx], _np.zeros(_np.shape(xx0[idx]))
[docs]def disk_coords(dd=2, rd=1.0 / 3, res=11.0, Req=1.0, beta=0.0, phi0=0.0): """Cilinder slice rejection method :param beta: obliquity, in rad :param phi0: **only** effective when beta != 0. """ x, y, z = cil_slice_rej(res, dd, rd) # The disk is invariant with :math:`\delta\phi`. So, instead of appling # phi0=_np.pi/2 and ang_yz, one could directly use ang_zx. # x, y, z = _phc.cart_rot(x, y, z, ang_xy=0., ang_yz=0., ang_zx=beta) x, y, z = _phc.cart_rot(x, y, z, ang_xy=0.0, ang_yz=beta, ang_zx=0.0) r, phi, th = _phc.cart2sph(x, y, z) phi += phi0 return r * Req, phi, th
def idx_phi_blobdisk_coords(phib, rd, phid, thd): # idx = _np.where( (_np.cos(phid) < _np.min(_np.cos(phib))) | # (_np.cos(phid) > _np.max(_np.cos(phib))) ) # return rd[idx], phid[idx], thd[idx] # Function standard: always between 0 to 2pi # if len(radcut)!= 2: # raise ValueError("len(`radcut`) != 2: {}".format(radcut)) # elif _np.min(radcut) == _np.max(radcut): # raise ValueError("delta-radcut > 0: {}".format(radcut)) tpi = 2 * _np.pi phib = _np.array([_np.min(phib), _np.max(phib)]) phib = _np.where(phib < 0, phib + tpi, phib) phib = _np.where(phib > tpi, phib - tpi, phib) if _np.diff(phib) > _np.pi: phib = phib[::-1] phid = _np.where(phid < 0, phid + tpi, phid) phid = _np.where(phid > tpi, phid - tpi, phid) if phib[0] < phib[1]: # 350, 10 idx = _np.where((phid < phib[0]) | (phid > phib[1])) else: idx = _np.where((phid < phib[0]) & (phid > phib[1])) return rd[idx], phid[idx], thd[idx]
[docs]def idx_phi_disklimit_coords(radcut, rd, phid, thd): """Enter the coordinates of a 2*pi disk and cuts it with the limits of radcut :param radcut: vector as [-np.pi/6, np.pi/6] """ # Function standard: always between 0 to 2pi if len(radcut) != 2: raise ValueError("len(`radcut`) != 2: {}".format(radcut)) elif _np.min(radcut) == _np.max(radcut): raise ValueError("delta-radcut > 0: {}".format(radcut)) tpi = 2 * _np.pi radcut = _np.array(radcut) radcut = _np.where(radcut < 0, radcut + tpi, radcut) radcut = _np.where(radcut > tpi, radcut - tpi, radcut) phid = _np.where(phid < 0, phid + tpi, phid) phid = _np.where(phid > tpi, phid - tpi, phid) if radcut[0] < radcut[1]: # 350, 10 idx = _np.where((phid < radcut[0]) | (phid > radcut[1])) else: idx = _np.where((phid < radcut[0]) & (phid > radcut[1])) return rd[idx], phid[idx], thd[idx]
[docs]def stokes(r, phi, th, irad, dV, ne, Req, occult=True): """ :param irad: in rad :param dV: in cm3 TODO: intensity calc. output in fraction (multiple by 100 to have percentage """ x, y, z = _phc.sph2cart(r, phi, th) x, y, z = _phc.cart_rot(x, y, z, 0.0, irad, 0.0) norm = ( 3 * _phc.sigT.cgs / 8 / _np.pi * (1.0 / r) ** 2 * _np.cos(_np.arctan(Req / r)) * dV * ne ) cos_chi = _np.cos(th) * _np.cos(irad) + _np.cos(phi - 0) * _np.sin(th) * _np.sin( irad ) sI = norm / 2.0 * (1 + cos_chi**2) sQ = norm / 2.0 * ((x / r) ** 2 - (y / r) ** 2) sU = norm / 2.0 * (2.0 * x * y / r**2) if occult is True: ind = _np.where((x**2 + y**2 < Req**2) & (z < 0)) sI[ind] = 0.0 sQ[ind] = 0.0 sU[ind] = 0.0 # ind = (x**2 + y**2 < Req**2) & (z > 0) # sI[ind] = sI[ind] return sI, sQ, sU, _np.zeros(len(r))
# single scat higher level
[docs]def blobsdiskmodel_geo( phi0, Req=_phc.Rsun.cgs, rb=1 / 3.0, db=2.0, bres=3, nb=8e11, rd=1 / 3.0, dd=2.0, dres=11, H=0.1, beta=0.0, nd=8e11, overlap=False, ): r"""Doc :param rb: in Req units :param overlap: disk and blob overlap each other? Default is `False`. Warning: only :math:`\phi` direction is checked (ie., coplanar overlap). :param beta: obliquity, in rad """ r1, phi1, th1 = blobs_coords(phi0=phi0, rb=rb, Req=Req, db=db, res=bres, beta=beta) xb1, yb1, zb1 = _phc.sph2cart(r1, phi1, th1) r2, phi2, th2 = blobs_coords( phi0=phi0 + _np.pi, rb=rb, Req=Req, db=db, res=bres, beta=beta ) xb2, yb2, zb2 = _phc.sph2cart(r2, phi2, th2) Vb = 4.0 / 3 * _np.pi * (rb * Req) ** 3 dVb = _np.zeros(len(r1)) + Vb / len(r1) nb = _np.zeros(len(r1)) + nb rD, phid, thd = disk_coords(rd=rd, Req=Req, dd=dd, res=dres, beta=beta, phi0=phi0) area_d = _np.pi * ((dd + rd) ** 2 - (dd - rd) ** 2) Vd = area_d * H * Req**3 if not overlap: # Remove the overlap of the disk over the blobs rD, phid, thd = idx_phi_blobdisk_coords(phi1, rD, phid, thd) rD, phid, thd = idx_phi_blobdisk_coords(phi2, rD, phid, thd) area_bd = _np.pi * rd**2 Vd = (area_d - 2 * area_bd) * H * Req**3 dVd = _np.zeros(len(rD)) + Vd / len(rD) nd = _np.zeros(len(rD)) + nd xd, yd, zd = _phc.sph2cart(rD, phid, thd) return ( _np.concatenate((xb1, xb2, xd)), _np.concatenate((yb1, yb2, yd)), _np.concatenate((zb1, zb2, zd)), _np.concatenate((r1, r2, rD)), _np.concatenate((phi1, phi2, phid)), _np.concatenate((th1, th2, thd)), _np.concatenate((dVb, dVb, dVd)), _np.concatenate((nb, nb, nd)), )
[docs]def blobsmodel_geo( phi0, Req=_phc.Rsun.cgs, rb=1 / 3.0, db=2.0, bres=3, nb=8e11, beta=0.0 ): """Doc :param overlap: disk and blob overlap each other? Default is `False`. Warning: only :math:`\phi` direction is checked (ie., coplanar overlap). :param beta: obliquity, in rad """ r1, phi1, th1 = blobs_coords(phi0=phi0, rb=rb, Req=Req, db=db, res=bres, beta=beta) xb1, yb1, zb1 = _phc.sph2cart(r1, phi1, th1) r2, phi2, th2 = blobs_coords( phi0=phi0 + _np.pi, rb=rb, Req=Req, db=db, res=bres, beta=beta ) xb2, yb2, zb2 = _phc.sph2cart(r2, phi2, th2) Vb = 4.0 / 3 * _np.pi * (rb * Req) ** 3 dVb = _np.zeros(len(r1)) + Vb / len(r1) nb = _np.zeros(len(r1)) + nb return ( _np.concatenate((xb1, xb2)), _np.concatenate((yb1, yb2)), _np.concatenate((zb1, zb2)), _np.concatenate((r1, r2)), _np.concatenate((phi1, phi2)), _np.concatenate((th1, th2)), _np.concatenate((dVb, dVb)), _np.concatenate((nb, nb)), )
[docs]def diskmodel_geo( phi0, Req=_phc.Rsun.cgs, rd=1 / 3.0, dd=2.0, dres=11, H=0.1, beta=0.0, nd=8e11, radcut=[], ): """Doc :param rd: in Req units :param beta: obliquity, in rad :param radcut: vector as [[1*np.pi/4, 3*np.pi/4], [5*np.pi/4, 7*np.pi/4]]. It must be around pi/2 and 3pi/2 for phase consistency. ``[]`` returns a full (2*pi) disk (Radian units). """ rD, phid, thd = disk_coords(rd=rd, Req=Req, dd=dd, res=dres, beta=beta, phi0=phi0) total_phi_disk = 2 * _np.pi for philim in radcut: # Remove an interval from the disk model philim = _np.array(philim) + phi0 rD, phid, thd = idx_phi_disklimit_coords(philim, rD, phid, thd) total_phi_disk -= _np.max(philim) - _np.min(philim) area_d = (total_phi_disk / 2.0) * ((dd + rd) ** 2 - (dd - rd) ** 2) Vd = area_d * H * Req**3 dVd = _np.zeros(len(rD)) + Vd / len(rD) nd = _np.zeros(len(rD)) + nd xd, yd, zd = _phc.sph2cart(rD, phid, thd) return xd, yd, zd, rD, phid, thd, dVd, nd
[docs]def blobs_cicle(phi_ar, irad, Req=_phc.Rsun.cgs, rb=1 / 3.0, db=2.0, bres=3, nb=8e11): """Calculates the stokes parameters of blobsmodel over a full cycle :param phi_ar: phi array (in rad; [0, 2pi)) """ stk = _np.zeros((len(phi_ar), 4)) for i in range(len(phi_ar)): phi0 = phi_ar[i] x, y, z, r, phi, th, dV, ne = blobsmodel_geo( phi0, Req=Req, rb=rb, db=db, bres=bres ) istk = stokes(r, phi, th, dV=dV, ne=ne, irad=irad, Req=Req) stk[i] = _np.array(istk).sum(axis=1) return stk.T
[docs]def blobsdisk_cicle( phi_ar, irad, Req=_phc.Rsun.cgs, rb=1 / 3.0, db=2.0, bres=3, nb=8e11, rd=1 / 3.0, dd=2.0, dres=11, H=0.1, beta=0.0, nd=8e11, overlap=False, ): """Calculates the stokes parameters of blobs+disk model over a full cycle :param phi_ar: phi array (in rad; [0, 2pi)) """ stk = _np.zeros((len(phi_ar), 4)) for i in range(len(phi_ar)): phi0 = phi_ar[i] x, y, z, r, phi, th, dV, ne = blobsdiskmodel_geo( phi0, Req=Req, rb=rb, db=db, bres=bres, nb=nb, rd=rd, dd=dd, dres=dres, H=H, beta=beta, nd=nd, overlap=overlap, ) istk = stokes(r, phi, th, dV=dV, ne=ne, irad=irad, Req=Req) stk[i] = _np.array(istk).sum(axis=1) return stk.T
[docs]def disk_cicle( phi_ar, irad, Req=_phc.Rsun.cgs, rd=1 / 3.0, dd=2.0, dres=11, H=0.1, beta=0.0, nd=8e11, radcut=[], ): """Calculates the stokes parameters of disk model over a full cycle :param phi_ar: phi array (in rad; [0, 2pi)) :param radcut: vector as [[-np.pi/4, np.pi/4], [-np.pi/4+np.pi, np.pi/4+np.pi]]. ``[]`` returns a full (2*pi) disk (Radian units). """ stk = _np.zeros((len(phi_ar), 4)) for i in range(len(phi_ar)): phi0 = phi_ar[i] x, y, z, r, phi, th, dV, ne = diskmodel_geo( phi0, Req=Req, rd=rd, dd=dd, dres=dres, H=H, beta=beta, nd=nd, radcut=radcut ) istk = stokes(r, phi, th, dV=dV, ne=ne, irad=irad, Req=Req) stk[i] = _np.array(istk).sum(axis=1) return stk.T
# general
[docs]def angQU(Q, U, filter=True): """Calculate [Q, U] angles with filters* (avg-180 < degs > avg+180)""" Q = _np.array(Q) U = _np.array(U) ind = _np.where(Q == 0) Q[ind] = 1e-34 ang = _np.arctan(U / Q) # ind = _np.where(Q <= 0.0) ang[ind] = ang[ind] + _np.pi ind = _np.where((Q > 0) & (U < 0)) ang[ind] = ang[ind] + 2 * _np.pi ang = ang / 2.0 # ind = _np.where(ang >= _np.pi) # ang[ind] = ang[ind] - _np.pi if filter: avg = _np.median(ang) avg = _phc.find_nearest([0, _np.pi / 4, _np.pi / 2, _np.pi * 3.0 / 4], avg) ind = _np.where((ang - avg) > 1.0 / 2 * _np.pi) ang[ind] = ang[ind] - _np.pi ind = _np.where((ang - avg) < -1.0 / 2 * _np.pi) ang[ind] = ang[ind] + _np.pi return ang
[docs]def mod2obs(Qmod, Umod, Qis, Uis, ths): """MODEL TO OBSERV :param ths: theta sky in rad :param Qis: IS Q :param Uis: IS U """ ang = angQU(Qmod, Umod) Qobc = _np.sqrt(Qmod**2 + Umod**2) * _np.cos(2 * (ang + ths)) Uobc = _np.sqrt(Qmod**2 + Umod**2) * _np.sin(2 * (ang + ths)) Qobc = Qobc + Qis Uobc = Uobc + Uis return Qobc, Uobc
[docs]def obs2mod(Qobs, Uobs, Qis, Uis, ths): """### MODEL TO OBSERV ### :param ths: theta sky""" Qcob = Qobs - Qis Ucob = Uobs - Uis ang = angQU(Qcob, Ucob) Qcob = _np.sqrt(Qcob**2 + Ucob**2) * _np.cos(2 * (ang - ths)) Ucob = _np.sqrt(Qcob**2 + Ucob**2) * _np.sin(2 * (ang - ths)) return Qcob, Ucob
[docs]def calc_phase(MJDar, period, MJD0=0.0): """Calc phase [0,1)""" phase = _np.modf((_np.array(MJDar) - MJD0) / period)[0] return _np.where(phase > 0, phase, phase + 1)
# beacon
[docs]def loadpol(txt, old=False, p_sigP=0): """Load polarization txt file. :param p_sigP: if > 0, filter values P/sigP """ dtb = _np.genfromtxt(txt, dtype=str) if not old: dtb = _np.core.records.fromarrays( dtb.transpose(), names="MJD,night,ccd,filt,calc,stdstars,dth,sigdth,P,Q,U,th,sigP," "sigth,outfile,star,flag,tags", formats="f8,{0},{0},{0},f8,{0},f8,f8,f8,f8,f8,f8,f8,f8,{0},{0},{0}" ",{0}".format(dtb.dtype), ) else: dtb = _np.core.records.fromarrays( dtb.transpose(), names="MJD,night," "filt,calc,stdstars,dth,devdth,P,Q,U,th,sigP,sigth", formats="f8,{0},{0},f8,{0},f8,f8,f8,f8,f8,f8,f8,f8".format(dtb.dtype), ) if p_sigP > 0: idx = _np.where(dtb["P"] / dtb["sigP"] >= p_sigP) return dtb[idx] return dtb
# plotting
[docs]def plot_coords(xb1, yb1, zb1, Req, slim=3): """:param slim: Scale limit [-slim, slim]""" fig = _plt.figure() lins, cols = (2, 2) gs = _gridspec.GridSpec(lins, cols) ax = _plt.subplot(gs[0, 0], projection="3d") # first line, first col ax10 = _plt.subplot(gs[1, 0]) # second line, first col ax01 = _plt.subplot(gs[0, 1]) ax11 = _plt.subplot(gs[1, 1]) ax.scatter(xb1 / Req, yb1 / Req, zb1 / Req) ax.set_xlim3d(-slim, slim) ax.set_ylim3d(-slim, slim) ax.set_zlim3d(-slim, slim) ax10.scatter(yb1 / Req, zb1 / Req) ax10.set_xlabel("y") ax10.set_ylabel("z") ax10.set_xlim(-slim, slim) ax10.set_ylim(-slim, slim) ax01.scatter(xb1 / Req, yb1 / Req) ax01.set_xlabel("x") ax01.set_ylabel("y") ax01.set_xlim(-slim, slim) ax01.set_ylim(-slim, slim) ax11.scatter(xb1 / Req, zb1 / Req) ax11.set_xlabel("x") ax11.set_ylabel("z") ax11.set_xlim(-slim, slim) ax11.set_ylim(-slim, slim) return fig, [ax, ax01, ax10, ax11]
[docs]def plot_QU_gd(x, y, Q, U, irad, Req): """using griddata""" fig = _plt.figure() lins, cols = (1, 2) gs = _gridspec.GridSpec(lins, cols) axq = _plt.subplot(gs[0, 0]) axu = _plt.subplot(gs[0, 1]) xmin = _np.min(x) / Req xmax = _np.max(x) / Req ymin = _np.min(y) / Req ymax = _np.max(y) / Req xx, yy = _np.meshgrid( _np.linspace(xmin, xmax, 32), _np.linspace(ymin, ymax, 32)[::-1] ) yo = y * _np.cos(irad) q = _interpolate.griddata( _np.array([x, yo]).T / Req, Q, _np.array([xx.flatten(), yy.flatten()]).T ) u = _interpolate.griddata( _np.array([x, yo]).T / Req, U, _np.array([xx.flatten(), yy.flatten()]).T ) axq.imshow(q.reshape(32, 32), origin="lower", extent=[xmin, xmax, ymin, ymax]) axu.imshow(u.reshape(32, 32), origin="lower", extent=[xmin, xmax, ymin, ymax]) return fig, [axq, axu]
[docs]def plot_QU_bc(x, y, Q, U, irad, Req): """using baricenter TODO: discover why it is not working """ fig2 = _plt.figure() lins, cols = (1, 2) gs = _gridspec.GridSpec(lins, cols) axq2 = _plt.subplot(gs[0, 0]) axu2 = _plt.subplot(gs[0, 1]) xmin = _np.min(x) / Req xmax = _np.max(x) / Req ymin = _np.min(y) / Req ymax = _np.max(y) / Req axq2.imshow( _phc.baricent_map(x, y, Q, res=2 * 32, yfact=_np.cos(irad)), origin="lower", extent=[xmin, xmax, ymin, ymax], ) axu2.imshow( _phc.baricent_map(x, y, U, res=2 * 32, yfact=_np.cos(irad)), origin="lower", extent=[xmin, xmax, ymin, ymax], ) return fig2, [axq2, axu2]
# MAIN ### if __name__ == "__main__": pass