diff --git a/notebooks/mc_phase_fit_testing.ipynb b/notebooks/mc_phase_fit_testing.ipynb new file mode 100644 index 0000000..cb309e1 --- /dev/null +++ b/notebooks/mc_phase_fit_testing.ipynb @@ -0,0 +1,266 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "3f265be4", + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib notebook" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4eb75bb", + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e2154f51", + "metadata": {}, + "outputs": [], + "source": [ + "from adler.objectdata.AdlerPlanetoid import AdlerPlanetoid\n", + "from adler.objectdata.AdlerData import AdlerData\n", + "from adler.science.PhaseCurve import PhaseCurve\n", + "from adler.utilities.plotting_utilities import plot_errorbar\n", + "\n", + "# from adler.utilities.science_utilities import get_df_obs_filt\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "import matplotlib.gridspec as gridspec\n", + "import astropy.units as u\n", + "from astropy.modeling.fitting import SLSQPLSQFitter" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fdd1f0f2", + "metadata": {}, + "outputs": [], + "source": [ + "# ssObjectId of object to analyse\n", + "# ssoid = \"8268570668335894776\" # NEO\n", + "ssoid = \"6098332225018\" # good MBA test object\n", + "# ssoid = \"5334524274243260416\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "472609a0", + "metadata": {}, + "outputs": [], + "source": [ + "# fname = \"../tests/data/testing_database.db\"\n", + "fname = \"/Users/jrobinson/lsst-adler/notebooks/gen_test_data/adler_demo_testing_database.db\"\n", + "planetoid = AdlerPlanetoid.construct_from_SQL(ssoid, sql_filename=fname)\n", + "# planetoid = AdlerPlanetoid.construct_from_RSP(ssoid)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e2ae7c0b", + "metadata": {}, + "outputs": [], + "source": [ + "resample = 100\n", + "np.random.seed(0) # set the seed to ensure reproducibility" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5b0c1412", + "metadata": {}, + "outputs": [], + "source": [ + "fig = plt.figure()\n", + "gs = gridspec.GridSpec(1, 1)\n", + "ax1 = plt.subplot(gs[0, 0])\n", + "\n", + "adler_cols = AdlerData(ssoid, planetoid.filter_list)\n", + "\n", + "for filt in [\"r\", \"g\"]:\n", + " # get observations and define a phase curve model to fit\n", + "\n", + " sso = planetoid.SSObject_in_filter(filt)\n", + " obs = planetoid.observations_in_filter(filt)\n", + "\n", + " H = sso.H\n", + " G12 = sso.G12\n", + " pc = PhaseCurve(H=H * u.mag, phase_parameter_1=G12, model_name=\"HG12_Pen16\")\n", + "\n", + " alpha = np.array(getattr(obs, \"phaseAngle\")) * u.deg\n", + " mag_err = np.array(getattr(obs, \"magErr\")) * u.mag\n", + " red_mag = np.array(getattr(obs, \"reduced_mag\")) * u.mag\n", + "\n", + " pc_fit = pc.FitModel(\n", + " np.array(getattr(obs, \"phaseAngle\")) * u.deg,\n", + " red_mag,\n", + " mag_err,\n", + " )\n", + " pc = pc.InitModelSbpy(pc_fit)\n", + "\n", + " # adler_cols.populate_phase_parameters(filt,**pc.ReturnModelDict())\n", + "\n", + " # add this phase curve to the figure\n", + " fig = plot_errorbar(planetoid, filt_list=[filt], fig=fig)\n", + " ax1 = fig.axes[0]\n", + " alpha_fit = np.linspace(0, np.amax(obs.phaseAngle)) * u.deg\n", + " ax1.plot(\n", + " alpha_fit.value,\n", + " pc.ReducedMag(alpha_fit).value,\n", + " label=\"{}: H={:.2f}, G12={:.2f}\".format(filt, pc.H, pc.phase_parameter_1),\n", + " )\n", + " ax1.legend()\n", + "\n", + " # Resample the data within the uncertainties and fit a new model\n", + " mc_models = []\n", + " for i in range(resample):\n", + " _red_mag = np.random.normal(loc=np.array(red_mag), scale=np.array(mag_err)) * u.mag\n", + " ax1.scatter(alpha.value, _red_mag.value, s=1, c=\"k\", alpha=0.1, zorder=5)\n", + "\n", + " _pc_fit = pc.FitModel(\n", + " np.array(getattr(obs, \"phaseAngle\")) * u.deg,\n", + " _red_mag,\n", + " )\n", + " _pc = pc.InitModelSbpy(_pc_fit)\n", + " ax1.plot(\n", + " alpha_fit.value,\n", + " _pc.ReducedMag(alpha_fit).value,\n", + " c=\"k\",\n", + " alpha=0.1,\n", + " zorder=0,\n", + " label=\"{}: H={:.2f}, G12={:.2f}\".format(filt, _pc.H, _pc.phase_parameter_1),\n", + " )\n", + "\n", + " mc_models.append(_pc)\n", + "\n", + " break\n", + "\n", + "ax1.invert_yaxis()\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dc96d3d6", + "metadata": {}, + "outputs": [], + "source": [ + "# extract all phase parameters from the resampling test\n", + "abs_mag = np.array([x.H.value for x in mc_models])\n", + "phase_param = np.array([x.phase_parameter_1 for x in mc_models])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f43eb646-cf4e-4103-a9d4-7f8b6c8d2738", + "metadata": {}, + "outputs": [], + "source": [ + "# use the resample function within FitModel\n", + "np.random.seed(0) # set the seed to ensure reproducibility\n", + "pc_fit_resamp = pc.FitModel(np.array(getattr(obs, \"phaseAngle\")) * u.deg, red_mag, mag_err, resample=resample)\n", + "pc_fit_resamp = PhaseCurve().InitModelSbpy(pc_fit_resamp)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0ecaff7e", + "metadata": {}, + "outputs": [], + "source": [ + "# plot the distributions of all FitModel methods\n", + "\n", + "fig = plt.figure()\n", + "gs = gridspec.GridSpec(1, 2)\n", + "ax1 = plt.subplot(gs[0, 0])\n", + "ax2 = plt.subplot(gs[0, 1])\n", + "\n", + "# plot the results of resampling the data and storing the fits\n", + "label = \"resample test\"\n", + "for x, ax, ax_lab in zip([abs_mag, phase_param], [ax1, ax2], [\"H\", \"phase_parameter_1\"]):\n", + " ax.hist(x, bins=\"auto\", histtype=\"step\")\n", + " mean = np.mean(x)\n", + " std = np.std(x)\n", + " print(label, ax_lab, mean, std)\n", + " ax.axvline(mean, label=label)\n", + " ax.axvspan(mean - std, mean + std, alpha=0.3)\n", + " ax.set_xlabel(ax_lab)\n", + "\n", + "# first plot results for the regular FitModel result (no resampling)\n", + "# then plot the resulting FitModel with resample results (should be similar to the resample test)\n", + "for _pc, col, label in zip([pc, pc_fit_resamp], [\"k\", \"r\"], [\"FitModel single\", \"FitModel resample\"]):\n", + " for x_name, x_err_name, ax in zip(\n", + " [\"H\", \"phase_parameter_1\"], [\"H_err\", \"phase_parameter_1_err\"], [ax1, ax2]\n", + " ):\n", + " x = getattr(_pc, x_name)\n", + " x_err = getattr(_pc, x_err_name)\n", + "\n", + " if hasattr(x, \"unit\"):\n", + " x = x.value\n", + " if hasattr(x_err, \"unit\"):\n", + " x_err = x_err.value\n", + "\n", + " print(label, x_name, x, x_err)\n", + "\n", + " ax.axvline(x, color=col, label=label)\n", + " ax.axvspan(x - x_err, x + x_err, alpha=0.3, color=col)\n", + "\n", + "ax1.legend()\n", + "ax2.legend()\n", + "\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6a0cbce", + "metadata": {}, + "outputs": [], + "source": [ + "# resampling leads to a slight shift in fitted values\n", + "# the resampled distribution has a slightly larger \"uncertainty\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "adler-dev", + "language": "python", + "name": "adler-dev" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/adler/science/PhaseCurve.py b/src/adler/science/PhaseCurve.py index f0b0463..1187634 100644 --- a/src/adler/science/PhaseCurve.py +++ b/src/adler/science/PhaseCurve.py @@ -160,6 +160,10 @@ def InitModelDict(self, model_dict): # clean the input dictionary del_keys = [] + # make sure that there is not an existing model_function parameter + if "model_function" in model_dict: + del_keys.append("model_function") + # remove keys that aren't PhaseCurve parameters for key, value in model_dict.items(): if not hasattr(self, key): del_keys.append(key) @@ -208,7 +212,9 @@ def InitModelSbpy(self, model_sbpy): x.unit == "" ): # if there are no units (or weird blank units?) get just the value x = x.value - else: + if hasattr( + x, "quantity" + ): # catch any sbpy parameters returned as astropy.modeling.parameters.Parameter x = x.quantity # look up the correct adler parameter name (accounting for additional uncertainty, "_err", parameters) if p.endswith("_err"): # assumes the uncertainty parameter always ends in "_err" @@ -296,7 +302,7 @@ def AbsMag(self, phase_angle, reduced_mag): return abs_mag - def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None): + def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None, resample=None): """Fit the phasecurve model parameters to observations. starts with a phase curve model as an initial guess for parameters. fits model to phase angle and reduced magnitude. @@ -321,6 +327,10 @@ def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None): Select a fitting function from astropy.modeling.fitting, defaults to astropy.modeling.fitting.LevMarLSQFitter. N.B. that LevMarLSQFitter cannot handle inequality constraints for the HG1G2 model, use something like SLSQPLSQFitter from astropy.modeling.fitting (does not return covariance matrix!). + resample : int + Optional - if passed this forces a monte carlo resampling of data points within their uncertainties. + This the number of times to resample and fit the phase curve. + The phase curve parameter value and uncertainties are determined from the mean and std of the fitted values respectively. Returns ---------- @@ -339,7 +349,7 @@ def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None): model_fit = fitter(self.model_function, phase_angle, reduced_mag) # Add fitted uncertainties as an additional attribute within the sbpy object - if "param_cov" in fitter.fit_info: + if ("param_cov" in fitter.fit_info) and (resample is None): # get the covariance matrix from the fit covariance = fitter.fit_info["param_cov"] if covariance is not None: @@ -348,15 +358,52 @@ def FitModel(self, phase_angle, reduced_mag, mag_err=None, fitter=None): # update only the uncertainties for parameters used in the fit param_names = np.array(model_fit.param_names) fit_mask = ~np.array([getattr(model_fit, x).fixed for x in param_names]) + # for i, x in enumerate(param_names[fit_mask]): + # p = getattr(model_fit, x) + # if hasattr(p, "unit") and (p.unit is not None): + # setattr(model_fit, "{}_err".format(x), fit_errs[i] * p.unit) + # else: + # setattr(model_fit, "{}_err".format(x), fit_errs[i]) for i, x in enumerate(param_names[fit_mask]): - setattr(model_fit, "{}_err".format(x), fit_errs[i]) + # setattr(model_fit, "{}_err".format(x), fit_errs[i]) + # TODO: return uncertainties with units if units are passed - see MC resample code below + p = getattr(model_fit, x) + if hasattr(p, "unit") and (p.unit is not None): + setattr(model_fit, "{}_err".format(x), fit_errs[i] * p.unit) + else: + setattr(model_fit, "{}_err".format(x), fit_errs[i]) # else: - ### TODO log covariance is None error here + ### TODO log covariance is None error here - no uncertainties + + # run an MC reasmple fit to estimate parameter value and uncertainty + elif (resample is not None) and (mag_err is not None): + mc_models = [] # list to store MC model fits + for i in range(resample): # TODO: try optimise/parallelise this loop? + _reduced_mag = np.random.normal(loc=np.array(reduced_mag), scale=np.array(mag_err)) * u.mag + _model_fit = fitter(self.model_function, phase_angle, _reduced_mag) + mc_models.append(_model_fit) + + # Update the model_fit parameters with the MC values + param_names = np.array(model_fit.param_names) + # update only the uncertainties for parameters used in the fit + fit_mask = ~np.array([getattr(model_fit, x).fixed for x in param_names]) + for i, x in enumerate(param_names[fit_mask]): + # check if the parameter has units and then get array of the MC model values + m = mc_models[0] + p = getattr(m, x) + if hasattr(p, "unit") and (p.unit is not None): + fit_vals = np.array([getattr(m, x).value for m in mc_models]) * p.unit + else: + fit_vals = np.array([getattr(m, x) for m in mc_models]) - ### TODO - # else: - # log lack of uncertainties for fitter - # run an MCMC estimate of uncertainty? + # set the parameter value as the mean of the MC values + setattr(model_fit, "{}".format(x), np.mean(fit_vals)) + # set the parameter uncertainty as the std of the MC values + setattr(model_fit, "{}_err".format(x), np.std(fit_vals)) + + else: + # log lack of uncertainties for fitter + print("no phase curve parameter uncertainties calculated") ### if overwrite_model: # add an overwrite option? # redo __init__ with the new fitted parameters diff --git a/tests/adler/science/test_PhaseCurve.py b/tests/adler/science/test_PhaseCurve.py index b3d8fd7..a667a48 100644 --- a/tests/adler/science/test_PhaseCurve.py +++ b/tests/adler/science/test_PhaseCurve.py @@ -1,8 +1,11 @@ -from adler.science.PhaseCurve import PhaseCurve -from numpy.testing import assert_array_equal -import pytest import numpy as np import astropy.units as u +from numpy.testing import assert_array_equal, assert_almost_equal +import pytest + +from adler.science.PhaseCurve import PhaseCurve, ADLER_SBPY_DICT +from adler.utilities.tests_utilities import get_test_data_filepath +from adler.objectdata.AdlerPlanetoid import AdlerPlanetoid def test_PhaseCurve_init(): @@ -145,4 +148,125 @@ def test_PhaseCurve_FitModel_HG_bounds(): assert pc2.phase_parameter_1_err is not None -# TODO: test absmag +def test_PhaseCurve_FitModel_resample(): + + np.random.seed(0) # set the seed to ensure reproducibility + resample = 100 # number of resamples + + # these are the exact values that should be recovered with 100 resampled fits, for the random seed of 0 + resample_compare_vals = { + "H": 16.29648544, + "phase_parameter_1": 0.6225681900053228, + "H_err": 0.00774547, + "phase_parameter_1_err": 0.051412070779357485, + } + + # load a test object + ssoid = "6098332225018" # good MBA test object + filt = "r" + test_db_path = get_test_data_filepath("testing_database.db") + planetoid = AdlerPlanetoid.construct_from_SQL(ssoid, test_db_path, filter_list=[filt]) + sso = planetoid.SSObject_in_filter(filt) + obs = planetoid.observations_in_filter(filt) + + # set up the initial phasecurve model + H = sso.H + G12 = sso.G12 + pc = PhaseCurve(H=H * u.mag, phase_parameter_1=G12, model_name="HG12_Pen16") + + # get the observations + alpha = np.array(getattr(obs, "phaseAngle")) * u.deg + mag_err = np.array(getattr(obs, "magErr")) * u.mag + red_mag = np.array(getattr(obs, "reduced_mag")) * u.mag + + # do a single fit + pc_fit = pc.FitModel(alpha, red_mag, mag_err) + pc_fit = PhaseCurve().InitModelSbpy(pc_fit) + print(pc_fit.__dict__) + + # use the resample function within FitModel + pc_fit_resamp = pc.FitModel(alpha, red_mag, mag_err, resample=resample) + pc_fit_resamp = PhaseCurve().InitModelSbpy(pc_fit_resamp) + print(pc_fit_resamp.__dict__) + + # check that the fits are different + tolerance = 0.001 + for x in ["H", "H_err", "phase_parameter_1", "phase_parameter_1_err"]: + x1 = getattr(pc_fit_resamp, x) + x2 = getattr(pc_fit, x) + + if hasattr(x1, "unit") and (x1.unit is not None): + x1 = x1.value + if hasattr(x2, "unit") and (x2.unit is not None): + x2 = x2.value + + print(x, np.abs(x1 - x2)) + assert np.abs(x1 - x2) > tolerance + + # the resampled fit should have larger uncertainties + if "err" in x: + assert x1 > x2 + + # check the exact values of each fit + for x in ["H", "H_err", "phase_parameter_1", "phase_parameter_1_err"]: + x1 = getattr(pc_fit_resamp, x) + x2 = resample_compare_vals[x] + + if hasattr(x1, "unit") and (x1.unit is not None): + x1 = x1.value + + print(x, x1, x2) + assert_almost_equal(x1, x2) + + +def test_set_models(): + + for model in ["HG", "HG1G2", "HG12", "HG12_Pen16", "LinearPhaseFunc"]: + + # create a model + pc = PhaseCurve(model_name=model) + + # check which phase parameters the model should have + phase_params = ADLER_SBPY_DICT[pc.model_name] + for p in phase_params: + assert getattr(pc.model_function, phase_params[p]) + + +def test_ReturnInit(): + + # define a PhaseCurve object + pc = PhaseCurve(H=18.0, phase_parameter_1=0.15, model_name="HG") + + # convert the PhaseCurve object to dict + pc_dict = pc.ReturnModelDict() + print(pc_dict) + + assert pc_dict["H"] == 18.0 + assert pc_dict["phase_parameter_1"] == 0.15 + assert pc_dict["model_name"] == "HG" + + # create PhaseCurve from dict + print(type(pc_dict)) + pc2 = PhaseCurve().InitModelDict(pc_dict) + + assert pc2.H == 18.0 + assert pc2.phase_parameter_1 == 0.15 + assert pc2.model_name == "HG" + + # create sbpy model from PhaseCurve + pc_sbpy = pc.model_function + print(pc_sbpy) + + assert pc_sbpy.H == 18.0 + assert pc_sbpy.G == 0.15 + + # create PhaseCurve from sbpy model + pc3 = PhaseCurve().InitModelSbpy(pc_sbpy) + + assert pc3.H == 18.0 + assert pc3.phase_parameter_1 == 0.15 + assert pc3.model_name == "HG" + + +# TODO: test absmag and units +# TODO: test FitModel with weighted data