#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of the
# Pyedra Project (https://github.com/milicolazo/Pyedra/).
# Copyright (c) 2020, Milagros Colazo
# License: MIT
# Full Text: https://github.com/milicolazo/Pyedra/blob/master/LICENSE
# ============================================================================
# DOCS
# ============================================================================
"""Implementation of phase function for asteroids."""
# =============================================================================
# IMPORTS
# =============================================================================
import attr
import matplotlib.pyplot as plt
from matplotlib import cm
import numpy as np
import pandas as pd
import scipy
import scipy.interpolate
import scipy.optimize as optimization
from . import core, datasets
# ============================================================================
# CLASSES
# ============================================================================
[docs]@attr.s(frozen=True)
class HG1G2Plot(core.BasePlot):
"""Plots for HG1G2 fit."""
default_plot_kind = "curvefit"
[docs] def curvefit(
self,
df,
idc="id",
alphac="alpha",
magc="v",
ax=None,
cmap=None,
fit_kwargs=None,
data_kwargs=None,
):
"""Plot the phase function using the HG1G2 model.
Parameters
----------
df: ``pandas.DataFrame``
The dataframe must with the values
idc : ``str``, optional (default=id)
Column with the mpc number of the asteroids.
alphac : ``str``, optional (default=alpha)
Column with the phase angle of the asteroids.
magc : ``str``, optional (default=v)
Column with the magnitude. The default 'v' value is reference
to the reduced magnitude in Johnson's V filter.
ax : ``matplotlib.pyplot.Axis``, (optional)
Matplotlib axis
cmap : ``None``, ``str`` or calable (optional)
Name of the color map to be used
(https://matplotlib.org/users/colormaps.html).
If is None, the default colors of the matplotlib.pyplot.plot
function is used, and if, and is a callable is used as
colormap generator.
fit_kwargs: ``dict`` or ``None`` (optional)
The parameters to send to the fit curve plot.
Only ``label`` and ``color`` can't be provided.
data_kwargs: ``dict`` or ``None`` (optional)
The parameters to send to the data plot.
Only ``label`` and ``color`` can't be provided.
Return
------
``matplotlib.pyplot.Axis`` :
The axis where the method draws.
"""
def fit_y(d, e, f):
y = d - 2.5 * np.log10(e * fi1 + f * fi2 + (1 - e - f) * fi3)
return y
if ax is None:
ax = plt.gca()
fig = ax.get_figure()
fig.set_size_inches(self.DEFAULT_FIGURE_SIZE)
ax.invert_yaxis()
ax.set_title("HG1G2 - Phase curves")
ax.set_xlabel("Phase angle")
ax.set_ylabel(magc.upper())
fit_kwargs = {} if fit_kwargs is None else fit_kwargs
fit_kwargs.setdefault("ls", "--")
fit_kwargs.setdefault("alpha", 0.5)
data_kwargs = {} if data_kwargs is None else data_kwargs
data_kwargs.setdefault("marker", "o")
data_kwargs.setdefault("ls", "None")
model_size = len(self.pdf.model_df)
if cmap is None:
colors = [None] * model_size
elif callable(cmap):
colors = cmap(np.linspace(0, 1, model_size))
else:
cmap = cm.get_cmap(cmap)
colors = cmap(np.linspace(0, 1, model_size))
for idx, m_row in self.pdf.iterrows():
row_id = int(m_row.id)
data = df[df[idc] == m_row.id]
fi1 = np.array([])
fi2 = np.array([])
fi3 = np.array([])
for alpha_b in data[alphac]:
p1 = self.pdf.metadata.y_interp1(alpha_b)
fi1 = np.append(fi1, p1)
p2 = self.pdf.metadata.y_interp2(alpha_b)
fi2 = np.append(fi2, p2)
p3 = self.pdf.metadata.y_interp3(alpha_b)
fi3 = np.append(fi3, p3)
v_fit = fit_y(m_row.H12, m_row.G1, m_row.G2)
line = ax.plot(
data[alphac],
v_fit,
label=f"Fit #{row_id}",
color=colors[idx],
**fit_kwargs,
)
# data part
ax.plot(
data[alphac],
data[magc],
color=line[0].get_color(),
label=f"Data #{row_id}",
**data_kwargs,
)
# reorder legend for two columns
handles, labels = ax.get_legend_handles_labels()
labels, handles = zip(
*sorted(zip(labels, handles), key=lambda t: t[0])
)
ax.legend(handles, labels, ncol=2, loc="best")
return ax
# ============================================================================
# FUNCTIONS
# ============================================================================
def _HG1G2_model(X, a, b, c):
x, y, z = X
return a * x + b * y + c * z
[docs]def HG1G2_fit(df, idc="id", alphac="alpha", magc="v"):
"""Fit (H-G1-G2) system to data from table.
HG1G2_fit calculates the H,G1 and G2 parameters of the phase
function following the procedure described in [5]_ .
Parameters
----------
df: ``pandas.DataFrame``
The dataframe must with the values
idc : ``str``, optional (default=id)
Column with the mpc number of the asteroids.
alphac : ``str``, optional (default=alpha)
Column with the phase angle of the asteroids.
magc : ``str``, optional (default=v)
Column with the magnitude. The default 'v' value is reference
to the reduced magnitude in Johnson's V filter.
Returns
-------
``PyedraFitDataFrame``
The output contains eight columns: id (mpc number of
the asteroid), H (absolute magnitude returned by the fit),
H error (fit H parameter error), G1 (G1 parameter returned by
the fit), G1 error (fit G1 parameter error), G2 (G2 parameter
returned bythe fit), G2 error (fit G2 parameter error), and R
(fit determination coefficient).
References
----------
.. [5] Muinonen K., Belskaya I. N., Cellino A., Delbò M.,
Levasseur-Regourd A.-C.,Penttilä A., Tedesco E. F., 2010,
Icarus, 209, 542.
"""
lt = core.obs_counter(df, 3, idc, alphac)
if len(lt):
lt_str = " - ".join(str(idx) for idx in lt)
raise ValueError(
f"Some asteroids has less than 3 observations: {lt_str}"
)
noob = df.drop_duplicates(subset=idc, keep="first", inplace=False)
size = len(noob)
id_column = np.empty(size, dtype=int)
H_1_2_column = np.empty(size)
error_H_1_2_column = np.empty(size)
G_1_column = np.empty(size)
error_G_1_column = np.empty(size)
G_2_column = np.empty(size)
error_G_2_column = np.empty(size)
R_column = np.empty(size)
observations = np.empty(size, dtype=int)
penttila2016 = datasets.load_penttila2016()
alpha = penttila2016["alpha"].to_numpy()
phi1 = penttila2016["phi1"].to_numpy()
phi2 = penttila2016["phi2"].to_numpy()
phi3 = penttila2016["phi3"].to_numpy()
y_interp1 = scipy.interpolate.interp1d(alpha, phi1)
y_interp2 = scipy.interpolate.interp1d(alpha, phi2)
y_interp3 = scipy.interpolate.interp1d(alpha, phi3)
for idx, id in enumerate(noob[idc]):
data = df[df[idc] == id]
fi1 = np.array([])
fi2 = np.array([])
fi3 = np.array([])
for alpha_b in data[alphac]:
p1 = y_interp1(alpha_b)
fi1 = np.append(fi1, p1)
p2 = y_interp2(alpha_b)
fi2 = np.append(fi2, p2)
p3 = y_interp3(alpha_b)
fi3 = np.append(fi3, p3)
v = data[magc].to_numpy()
v_fit = 10 ** (-0.4 * v)
op, cov = optimization.curve_fit(_HG1G2_model, (fi1, fi2, fi3), v_fit)
a, b, c = op
error_a, error_b, error_c = np.sqrt(np.diag(cov))
H_1_2 = -2.5 * np.log10(a + b + c)
error_H_1_2 = (
1.0857362
* np.sqrt(error_a ** 2 + error_b ** 2 + error_c ** 2)
/ (a + b + c)
)
G_1 = a / (a + b + c)
error_G_1 = np.sqrt(
((b + c) * error_a) ** 2 + (a * error_b) ** 2 + (a * error_c) ** 2
) / ((a + b + c) ** 2)
G_2 = b / (a + b + c)
error_G_2 = np.sqrt(
(b * error_a) ** 2 + ((a + c) * error_b) ** 2 + (b * error_c) ** 2
) / ((a + b + c) ** 2)
residuals = v_fit - _HG1G2_model((fi1, fi2, fi3), *op)
ss_res = np.sum(residuals ** 2)
ss_tot = np.sum((v_fit - np.mean(v_fit)) ** 2)
r_squared = 1 - (ss_res / ss_tot)
id_column[idx] = id
H_1_2_column[idx] = H_1_2
error_H_1_2_column[idx] = error_H_1_2
G_1_column[idx] = G_1
error_G_1_column[idx] = error_G_1
G_2_column[idx] = G_2
error_G_2_column[idx] = error_G_2
R_column[idx] = r_squared
observations[idx] = len(data)
model_df = pd.DataFrame(
{
"id": id_column,
"H12": H_1_2_column,
"error_H12": error_H_1_2_column,
"G1": G_1_column,
"error_G1": error_G_1_column,
"G2": G_2_column,
"error_G2": error_G_2_column,
"R": R_column,
"observations": observations,
}
)
metadata = {
"y_interp1": y_interp1,
"y_interp2": y_interp2,
"y_interp3": y_interp3,
}
return core.PyedraFitDataFrame(
model_df=model_df, plot_cls=HG1G2Plot, model="HG1G2", metadata=metadata
)