Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

155 mc phase fit #156

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
266 changes: 266 additions & 0 deletions notebooks/mc_phase_fit_testing.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "3f265be4",
"metadata": {},
"outputs": [],
"source": [
"%matplotlib notebook"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e4eb75bb",
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2154f51",
"metadata": {},
"outputs": [],
"source": [
"from adler.objectdata.AdlerPlanetoid import AdlerPlanetoid\n",
"from adler.objectdata.AdlerData import AdlerData\n",
"from adler.science.PhaseCurve import PhaseCurve\n",
"from adler.utilities.plotting_utilities import plot_errorbar\n",
"\n",
"# from adler.utilities.science_utilities import get_df_obs_filt\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"import matplotlib.gridspec as gridspec\n",
"import astropy.units as u\n",
"from astropy.modeling.fitting import SLSQPLSQFitter"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fdd1f0f2",
"metadata": {},
"outputs": [],
"source": [
"# ssObjectId of object to analyse\n",
"# ssoid = \"8268570668335894776\" # NEO\n",
"ssoid = \"6098332225018\" # good MBA test object\n",
"# ssoid = \"5334524274243260416\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "472609a0",
"metadata": {},
"outputs": [],
"source": [
"# fname = \"../tests/data/testing_database.db\"\n",
"fname = \"/Users/jrobinson/lsst-adler/notebooks/gen_test_data/adler_demo_testing_database.db\"\n",
"planetoid = AdlerPlanetoid.construct_from_SQL(ssoid, sql_filename=fname)\n",
"# planetoid = AdlerPlanetoid.construct_from_RSP(ssoid)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2ae7c0b",
"metadata": {},
"outputs": [],
"source": [
"resample = 100\n",
"np.random.seed(0) # set the seed to ensure reproducibility"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5b0c1412",
"metadata": {},
"outputs": [],
"source": [
"fig = plt.figure()\n",
"gs = gridspec.GridSpec(1, 1)\n",
"ax1 = plt.subplot(gs[0, 0])\n",
"\n",
"adler_cols = AdlerData(ssoid, planetoid.filter_list)\n",
"\n",
"for filt in [\"r\", \"g\"]:\n",
" # get observations and define a phase curve model to fit\n",
"\n",
" sso = planetoid.SSObject_in_filter(filt)\n",
" obs = planetoid.observations_in_filter(filt)\n",
"\n",
" H = sso.H\n",
" G12 = sso.G12\n",
" pc = PhaseCurve(H=H * u.mag, phase_parameter_1=G12, model_name=\"HG12_Pen16\")\n",
"\n",
" alpha = np.array(getattr(obs, \"phaseAngle\")) * u.deg\n",
" mag_err = np.array(getattr(obs, \"magErr\")) * u.mag\n",
" red_mag = np.array(getattr(obs, \"reduced_mag\")) * u.mag\n",
"\n",
" pc_fit = pc.FitModel(\n",
" np.array(getattr(obs, \"phaseAngle\")) * u.deg,\n",
" red_mag,\n",
" mag_err,\n",
" )\n",
" pc = pc.InitModelSbpy(pc_fit)\n",
"\n",
" # adler_cols.populate_phase_parameters(filt,**pc.ReturnModelDict())\n",
"\n",
" # add this phase curve to the figure\n",
" fig = plot_errorbar(planetoid, filt_list=[filt], fig=fig)\n",
" ax1 = fig.axes[0]\n",
" alpha_fit = np.linspace(0, np.amax(obs.phaseAngle)) * u.deg\n",
" ax1.plot(\n",
" alpha_fit.value,\n",
" pc.ReducedMag(alpha_fit).value,\n",
" label=\"{}: H={:.2f}, G12={:.2f}\".format(filt, pc.H, pc.phase_parameter_1),\n",
" )\n",
" ax1.legend()\n",
"\n",
" # Resample the data within the uncertainties and fit a new model\n",
" mc_models = []\n",
" for i in range(resample):\n",
" _red_mag = np.random.normal(loc=np.array(red_mag), scale=np.array(mag_err)) * u.mag\n",
" ax1.scatter(alpha.value, _red_mag.value, s=1, c=\"k\", alpha=0.1, zorder=5)\n",
"\n",
" _pc_fit = pc.FitModel(\n",
" np.array(getattr(obs, \"phaseAngle\")) * u.deg,\n",
" _red_mag,\n",
" )\n",
" _pc = pc.InitModelSbpy(_pc_fit)\n",
" ax1.plot(\n",
" alpha_fit.value,\n",
" _pc.ReducedMag(alpha_fit).value,\n",
" c=\"k\",\n",
" alpha=0.1,\n",
" zorder=0,\n",
" label=\"{}: H={:.2f}, G12={:.2f}\".format(filt, _pc.H, _pc.phase_parameter_1),\n",
" )\n",
"\n",
" mc_models.append(_pc)\n",
"\n",
" break\n",
"\n",
"ax1.invert_yaxis()\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc96d3d6",
"metadata": {},
"outputs": [],
"source": [
"# extract all phase parameters from the resampling test\n",
"abs_mag = np.array([x.H.value for x in mc_models])\n",
"phase_param = np.array([x.phase_parameter_1 for x in mc_models])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f43eb646-cf4e-4103-a9d4-7f8b6c8d2738",
"metadata": {},
"outputs": [],
"source": [
"# use the resample function within FitModel\n",
"np.random.seed(0) # set the seed to ensure reproducibility\n",
"pc_fit_resamp = pc.FitModel(np.array(getattr(obs, \"phaseAngle\")) * u.deg, red_mag, mag_err, resample=resample)\n",
"pc_fit_resamp = PhaseCurve().InitModelSbpy(pc_fit_resamp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0ecaff7e",
"metadata": {},
"outputs": [],
"source": [
"# plot the distributions of all FitModel methods\n",
"\n",
"fig = plt.figure()\n",
"gs = gridspec.GridSpec(1, 2)\n",
"ax1 = plt.subplot(gs[0, 0])\n",
"ax2 = plt.subplot(gs[0, 1])\n",
"\n",
"# plot the results of resampling the data and storing the fits\n",
"label = \"resample test\"\n",
"for x, ax, ax_lab in zip([abs_mag, phase_param], [ax1, ax2], [\"H\", \"phase_parameter_1\"]):\n",
" ax.hist(x, bins=\"auto\", histtype=\"step\")\n",
" mean = np.mean(x)\n",
" std = np.std(x)\n",
" print(label, ax_lab, mean, std)\n",
" ax.axvline(mean, label=label)\n",
" ax.axvspan(mean - std, mean + std, alpha=0.3)\n",
" ax.set_xlabel(ax_lab)\n",
"\n",
"# first plot results for the regular FitModel result (no resampling)\n",
"# then plot the resulting FitModel with resample results (should be similar to the resample test)\n",
"for _pc, col, label in zip([pc, pc_fit_resamp], [\"k\", \"r\"], [\"FitModel single\", \"FitModel resample\"]):\n",
" for x_name, x_err_name, ax in zip(\n",
" [\"H\", \"phase_parameter_1\"], [\"H_err\", \"phase_parameter_1_err\"], [ax1, ax2]\n",
" ):\n",
" x = getattr(_pc, x_name)\n",
" x_err = getattr(_pc, x_err_name)\n",
"\n",
" if hasattr(x, \"unit\"):\n",
" x = x.value\n",
" if hasattr(x_err, \"unit\"):\n",
" x_err = x_err.value\n",
"\n",
" print(label, x_name, x, x_err)\n",
"\n",
" ax.axvline(x, color=col, label=label)\n",
" ax.axvspan(x - x_err, x + x_err, alpha=0.3, color=col)\n",
"\n",
"ax1.legend()\n",
"ax2.legend()\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6a0cbce",
"metadata": {},
"outputs": [],
"source": [
"# resampling leads to a slight shift in fitted values\n",
"# the resampled distribution has a slightly larger \"uncertainty\""
]
}
],
"metadata": {
"kernelspec": {
"display_name": "adler-dev",
"language": "python",
"name": "adler-dev"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
65 changes: 56 additions & 9 deletions src/adler/science/PhaseCurve.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def InitModelDict(self, model_dict):

# clean the input dictionary
del_keys = []
# make sure that there is not an existing model_function parameter
if "model_function" in model_dict:
del_keys.append("model_function")
# remove keys that aren't PhaseCurve parameters
for key, value in model_dict.items():
if not hasattr(self, key):
del_keys.append(key)
Expand Down Expand Up @@ -208,7 +212,9 @@ def InitModelSbpy(self, model_sbpy):
x.unit == ""
): # if there are no units (or weird blank units?) get just the value
x = x.value
else:
if hasattr(
x, "quantity"
): # catch any sbpy parameters returned as astropy.modeling.parameters.Parameter
x = x.quantity
# look up the correct adler parameter name (accounting for additional uncertainty, "_err", parameters)
if p.endswith("_err"): # assumes the uncertainty parameter always ends in "_err"
Expand Down Expand Up @@ -296,7 +302,7 @@ def AbsMag(self, phase_angle, reduced_mag):

return abs_mag

def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None):
def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None, resample=None):
"""Fit the phasecurve model parameters to observations.
starts with a phase curve model as an initial guess for parameters.
fits model to phase angle and reduced magnitude.
Expand All @@ -321,6 +327,10 @@ def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None):
Select a fitting function from astropy.modeling.fitting, defaults to astropy.modeling.fitting.LevMarLSQFitter.
N.B. that LevMarLSQFitter cannot handle inequality constraints for the HG1G2 model, use something like SLSQPLSQFitter from astropy.modeling.fitting (does not return covariance matrix!).

resample : int
Optional - if passed this forces a monte carlo resampling of data points within their uncertainties.
This the number of times to resample and fit the phase curve.
The phase curve parameter value and uncertainties are determined from the mean and std of the fitted values respectively.
Returns
----------

Expand All @@ -339,7 +349,7 @@ def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None):
model_fit = fitter(self.model_function, phase_angle, reduced_mag)

# Add fitted uncertainties as an additional attribute within the sbpy object
if "param_cov" in fitter.fit_info:
if ("param_cov" in fitter.fit_info) and (resample is None):
# get the covariance matrix from the fit
covariance = fitter.fit_info["param_cov"]
if covariance is not None:
Expand All @@ -348,15 +358,52 @@ def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None):
# update only the uncertainties for parameters used in the fit
param_names = np.array(model_fit.param_names)
fit_mask = ~np.array([getattr(model_fit, x).fixed for x in param_names])
# for i, x in enumerate(param_names[fit_mask]):
# p = getattr(model_fit, x)
# if hasattr(p, "unit") and (p.unit is not None):
# setattr(model_fit, "{}_err".format(x), fit_errs[i] * p.unit)
# else:
# setattr(model_fit, "{}_err".format(x), fit_errs[i])
for i, x in enumerate(param_names[fit_mask]):
setattr(model_fit, "{}_err".format(x), fit_errs[i])
# setattr(model_fit, "{}_err".format(x), fit_errs[i])
# TODO: return uncertainties with units if units are passed - see MC resample code below
p = getattr(model_fit, x)
if hasattr(p, "unit") and (p.unit is not None):
setattr(model_fit, "{}_err".format(x), fit_errs[i] * p.unit)
else:
setattr(model_fit, "{}_err".format(x), fit_errs[i])
# else:
### TODO log covariance is None error here
### TODO log covariance is None error here - no uncertainties

# run an MC reasmple fit to estimate parameter value and uncertainty
elif (resample is not None) and (mag_err is not None):
mc_models = [] # list to store MC model fits
for i in range(resample): # TODO: try optimise/parallelise this loop?
_reduced_mag = np.random.normal(loc=np.array(reduced_mag), scale=np.array(mag_err)) * u.mag
_model_fit = fitter(self.model_function, phase_angle, _reduced_mag)
mc_models.append(_model_fit)

# Update the model_fit parameters with the MC values
param_names = np.array(model_fit.param_names)
# update only the uncertainties for parameters used in the fit
fit_mask = ~np.array([getattr(model_fit, x).fixed for x in param_names])
for i, x in enumerate(param_names[fit_mask]):
# check if the parameter has units and then get array of the MC model values
m = mc_models[0]
p = getattr(m, x)
if hasattr(p, "unit") and (p.unit is not None):
fit_vals = np.array([getattr(m, x).value for m in mc_models]) * p.unit
else:
fit_vals = np.array([getattr(m, x) for m in mc_models])

### TODO
# else:
# log lack of uncertainties for fitter
# run an MCMC estimate of uncertainty?
# set the parameter value as the mean of the MC values
setattr(model_fit, "{}".format(x), np.mean(fit_vals))
# set the parameter uncertainty as the std of the MC values
setattr(model_fit, "{}_err".format(x), np.std(fit_vals))

else:
# log lack of uncertainties for fitter
print("no phase curve parameter uncertainties calculated")

### if overwrite_model: # add an overwrite option?
# redo __init__ with the new fitted parameters
Expand Down
Loading
Loading