"""
BootstrapChainLadder implementation.
"""
import functools
import warnings
import numpy as np
import pandas as pd
from numpy.random import RandomState
from scipy import stats
from .base import BaseRangeEstimator, BaseRangeEstimatorResult
class BootstrapChainLadder(BaseRangeEstimator):
"""
    The purpose of the bootstrap technique is to estimate the prediction
    error of the total reserve estimate and to approximate the predictive
    distribution. It is often impractical to obtain the prediction error
    using an analytical approach due to the complexity of reserve estimators.
    Prediction error comprises two components: process error and estimation
    error (Prediction Error = Estimation Error + Process Error).
    The estimation error (parameter error) represents the uncertainty in the
    parameter estimates given that the model is correctly specified. The
    process error is analogous to the variance of a random variable,
    representing the uncertainty in future outcomes.
    The procedure used to generate the predictive distribution of reserve
    estimates is based on Leong et al. [3], Appendix A, assuming the starting
    point is a triangle of cumulative losses:
1. Calculate the all-year volume-weighted age-to-age factors.
2. Estimate the fitted historical cumulative paid loss and ALAE
using the latest diagonal of the original triangle and the
age-to-age factors from [1] to un-develop the losses.
3. Calculate the unscaled Pearson residuals, degrees of freedom
and scale parameter.
4. Calculate the adjusted Pearson residuals.
5. Sample with replacement from the adjusted Pearson residuals.
6. Calculate the triangle of sampled incremental losses
(I^ = m + r_adj * sqrt(m)), where I^ = Resampled incremental loss,
m = Incremental fitted loss (from [2]) and r_adj = Adjusted Pearson
residuals.
7. Using the triangle from [6], project future losses using the
Chain Ladder method.
    8. Include process variance by simulating each incremental future
loss from a Gamma distribution with mean = I^ and
variance = I^ * scale parameter.
9. Estimate unpaid losses using the Chain Ladder technique.
10. Repeat for the number of cycles specified.
    The collection of projected ultimates for each origin year over all
    bootstrap cycles comprises the predictive distribution of reserve
    estimates.
Note that the estimate of the distribution of losses assumes
development is complete by the final development period. This is
to avoid the complication associated with modeling a tail factor.
References
----------
1. England, P., and R. Verrall, (2002), *Stochastic Claims Reserving in General
Insurance*, British Actuarial Journal 8(3): 443-518.
2. CAS Working Party on Quantifying Variability in Reserve Estimates,
*The Analysis and Estimation of Loss & ALAE Variability: A Summary Report*,
Casualty Actuarial Society Forum, Fall 2005.
3. Leong et al., (2012), *Back-Testing the ODP Bootstrap of the Paid
Chain-Ladder Model with Actual Historical Claims Data*, Casualty Actuarial
Society E-Forum.
    4. Kirschner et al., *Two Approaches to Calculating Correlated Reserve
    Indications Across Multiple Lines of Business*, Appendix III, Variance
    Journal, Volume 2, Issue 1.
    5. Shapland, Mark R., (2016), *Using the ODP Bootstrap Model: A
    Practitioner's Guide*, CAS Monograph Series Number 4, Casualty Actuarial
    Society.
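    Examples
    --------
    A minimal usage sketch. The sample-data loader and triangle constructor
    shown here (``trikit.load`` and ``trikit.totri``) are assumptions for
    illustration only; any ``triangle.CumTriangle`` instance can be passed
    to ``BootstrapChainLadder``:
    >>> import trikit
    >>> tri = trikit.totri(trikit.load("raa"))
    >>> bcl = trikit.BootstrapChainLadder(tri)
    >>> bcl_result = bcl(sims=1000, q=[.75, .95], random_state=516)
    >>> bcl_result.summary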
"""
def __init__(self, cumtri):
"""
The BootstrapChainLadder class definition.
Parameters
----------
cumtri: triangle.CumTriangle
A cumulative triangle instance.
"""
super().__init__(cumtri=cumtri)
self._dfrlvi = None
self._dof = None
def __call__(self, sims=1000, q=[.75, .95], procdist="gamma", parametric=False,
two_sided=False, interpolation="linear", random_state=None):
"""
``BootstrapChainLadder`` simulation initializer. Generates predictive
distribution of reserve outcomes by origin and in total.
The estimated distribution of losses assumes development is complete
by the final development period in order to avoid the complication of
modeling a tail factor.
Parameters
----------
sims: int
The number of bootstrap simulations to perform. Defaults to 1000.
q: array_like of float or float
Quantile or sequence of quantiles to compute, which must be
between 0 and 1 inclusive.
procdist: str
The distribution used to incorporate process variance. Currently,
this can only be set to "gamma".
two_sided: bool
Whether the two_sided interval should be included in summary
output. For example, if ``two_sided==True`` and ``q=.95``, then
the 2.5th and 97.5th quantiles of the bootstrapped reserve
distribution will be returned [(1 - .95) / 2, (1 + .95) / 2]. When
False, only the specified quantile(s) will be computed. Defaults
to False.
parametric: bool
If True, fit standardized residuals to a normal distribution, and
sample from this parameterized distribution. Otherwise, bootstrap
procedure samples with replacement from the collection of
standardized residuals. Defaults to False.
interpolation: {"linear", "lower", "higher", "midpoint", "nearest"}
This optional parameter specifies the interpolation method to use
when the desired quantile lies between two data points i < j. See
``numpy.quantile`` for more information. Default value is "linear".
random_state: np.random.RandomState
If int, random_state is the seed used by the random number
generator; If RandomState instance, random_state is the random
number generator; If None, the random number generator is the
RandomState instance used by np.random.
Returns
-------
BootstrapChainLadderResult
"""
ldfs = self._ldfs(sel="all-weighted")
cldfs = self._cldfs(ldfs=ldfs)
maturity = self.tri.maturity.astype(str)
latest = self.tri.latest_by_origin
trisqrd = self._trisqrd(ldfs=ldfs)
# Obtain reference to BootstrapChainLadder estimates.
tri_fit_cum = self._tri_fit_cum(ldfs=ldfs)
tri_fit_incr = self._tri_fit_incr(fitted_tri_cum=tri_fit_cum)
unscld_residuals = self._resid_us(fitted_tri_incr=tri_fit_incr)
adjust_residuals = self._resid_adj(resid_us=unscld_residuals)
scale_param = self._scale_param(resid_us=unscld_residuals)
sampling_dist = self._sampling_dist(resid_adj=adjust_residuals)
dfsamples = self._bs_samples(
sampling_dist=sampling_dist, fitted_tri_incr=tri_fit_incr,
sims=sims, parametric=parametric,
random_state=random_state
)
dfldfs = self._bs_ldfs(dfsamples=dfsamples)
dfforecasts = self._bs_forecasts(dfsamples, dfldfs, scale_param)
dfprocerror = self._bs_process_error(
dfforecasts=dfforecasts, scale_param=scale_param, procdist=procdist,
random_state=random_state
)
dfreserves = self._bs_reserves(dfprocerror=dfprocerror)
ultimates = dfreserves.groupby(["origin"])["ultimate"].mean()
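        # The earliest origin period is treated as fully developed, so its
        # ultimate is set equal to its latest observed cumulative loss,
        # implying a reserve of zero.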
ultimates[latest.index.min()] = latest[latest.index.min()]
reserves = pd.Series(ultimates - latest, name="reserve")
std_error = self._bs_std_error(dfreserves)
cv = pd.Series(std_error / reserves, name="cv")
qtls, qtlhdrs = self._qtls_formatter(q=q, two_sided=two_sided)
# Compile Chain Ladder point estimate summary.
dfmatur = maturity.to_frame().reset_index(drop=False).rename({"index": "origin"}, axis=1)
dfcldfs = cldfs.to_frame().reset_index(drop=False).rename({"index": "maturity"}, axis=1)
dfcldfs["maturity"] = dfcldfs["maturity"].astype(str)
dfcldfs["emergence"] = 1 / dfcldfs["cldf"]
dfsumm = dfmatur.merge(dfcldfs, on=["maturity"], how="left").set_index("origin")
dfsumm.index.name = None
dflatest = latest.to_frame().rename({"latest_by_origin": "latest"}, axis=1)
dfsumm = functools.reduce(
lambda df1, df2: df1.join(df2),
(dflatest, ultimates.to_frame(), reserves.to_frame(), std_error.to_frame(), cv.to_frame()),
dfsumm
)
# Add "Total" index and set to NaN fields that shouldn't be aggregated.
dfsumm.loc["total"] = dfsumm.sum()
dfsumm.loc["total", "maturity"] = ""
dfsumm.loc["total", ["cldf", "emergence"]] = np.NaN
dfsumm.loc["total", "std_error"] = std_error["total"]
dfsumm.loc["total", "cv"] = std_error["total"] / dfsumm.loc["total", "reserve"]
# Attach quantiles.
dftotal_res = dfreserves.groupby(["sim"], as_index=False).sum()
dftotal_res["origin"] = "total"
dfreserves = pd.concat([dfreserves, dftotal_res])
for ii, jj in zip(qtls, qtlhdrs):
dfsumm[jj] = dfsumm.index.map(
lambda v: np.percentile(
dfreserves[dfreserves.origin == v]["reserve"].values,
100 * ii, interpolation=interpolation
)
)
bcl_result = BootstrapChainLadderResult(
summary=dfsumm, tri=self.tri, ldfs=ldfs, tail=1.0, trisqrd=trisqrd,
reserve_dist=dfreserves, sims_data=dfprocerror, scale_param=scale_param,
dof=self.dof, unscaled_residuals=unscld_residuals,
adjusted_residuals=adjust_residuals,
sampling_dist=None if parametric else sampling_dist,
fitted_tri_cum=tri_fit_cum, fitted_tri_incr=tri_fit_incr, sims=sims,
procdist=procdist, parametric=parametric, q=q, interpolation=interpolation
)
return(bcl_result)
@property
def dfrlvi(self):
"""
Transform triangle's last valid origin index into DataFrame format.
Returns
-------
pd.DataFrame
"""
if self._dfrlvi is None:
df = self.tri.rlvi.reset_index(drop=False)
df = df.rename({"index": "origin", "dev": "l_act_dev"}, axis=1)
self._dfrlvi = df.drop("col_offset", axis=1)
return(self._dfrlvi)
    def _get_dfcombined(self, dfsamples, dfldfs):
"""
Merge output of ``self._bs_samples`` and ``self._bs_ldfs``.
Parameters
----------
dfsamples: pd.DataFrame
Output from ``self._bs_samples``.
dfldfs: pd.DataFrame
Output from ``self._bs_ldfs``.
Returns
-------
pd.DataFrame
"""
dfcombined = dfsamples.merge(dfldfs, on=["sim", "dev"], how="left")
dfcombined = dfcombined.merge(self.dfrlvi, on=["origin"], how="left")
return(dfcombined.reset_index(drop=True).sort_values(by=["sim", "origin", "dev"]))
@property
def dof(self):
"""
        Return the degrees of freedom.
Returns
-------
int
"""
if self._dof is None:
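            # Degrees of freedom used to compute the scale parameter in
            # _scale_param and the residual adjustment factor
            # sqrt(nbr_cells / dof) in _resid_adj.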
            self._dof = self.tri.nbr_cells - (self.tri.columns.size - 1) - self.tri.index.size
return(self._dof)
    def _scale_param(self, resid_us):
"""
Return the scale parameter, which is the sum of the squared unscaled
Pearson residuals over the degrees of freedom. This method is intended
for internal use only.
Parameters
----------
resid_us: pd.DataFrame
Unscaled Pearson residuals, typically output by
``self._resid_us``.
Returns
-------
float
"""
return((resid_us**2).sum().sum() / self.dof)
    def _tri_fit_cum(self, ldfs):
"""
Return the cumulative fitted triangle using backwards recursion,
starting with the observed cumulative paid/incurred-to-date along the
latest diagonal.
Parameters
----------
ldfs: pd.Series
Selected ldfs, typically the output of calling ``self._ldfs``.
Returns
-------
pd.DataFrame
"""
fitted_tri_cum = self.tri.copy(deep=True)
for ii in range(fitted_tri_cum.shape[0]):
iterrow = fitted_tri_cum.iloc[ii, :]
if iterrow.isnull().any():
# Find first NaN element in iterrow.
nan_hdr = iterrow.isnull()[iterrow.isnull() == True].index[0]
nan_idx = fitted_tri_cum.columns.tolist().index(nan_hdr)
init_idx = nan_idx - 1
else:
# If here, iterrow is the most mature exposure period.
init_idx = fitted_tri_cum.shape[1] - 1
# Set to NaN any development periods earlier than init_idx.
fitted_tri_cum.iloc[ii, :init_idx] = np.NaN
# Iterate over rows, undeveloping triangle from latest diagonal.
for jj in range(fitted_tri_cum.iloc[ii, :init_idx].size, 0, -1):
prev_col_idx, curr_col_idx, curr_ldf_idx = jj, jj - 1, jj - 1
prev_col_val = fitted_tri_cum.iloc[ii, prev_col_idx]
curr_ldf_val = ldfs.iloc[curr_ldf_idx]
fitted_tri_cum.iloc[ii, curr_col_idx] = (prev_col_val / curr_ldf_val)
return(fitted_tri_cum)
    @staticmethod
def _tri_fit_incr(fitted_tri_cum):
"""
Return a fitted incremental triangle.
Parameters
----------
fitted_tri_cum: pd.DataFrame
Typically the output from ``self._tri_fit_cum``.
Returns
-------
pd.DataFrame
"""
tri = fitted_tri_cum.diff(axis=1)
tri.iloc[:, 0] = fitted_tri_cum.iloc[:, 0]
return(tri)
    def _resid_us(self, fitted_tri_incr):
"""
Return unscaled Pearson residuals, given by
:math:`r_{us} = \\frac{I - m}{\\sqrt{|m|}}`, where :math:`r_{us}` represents the
unscaled Pearson residuals, :math:`I` the actual incremental losses and :math:`m`
fitted incremental losses.
Parameters
----------
fitted_tri_incr: pd.DataFrame
Typically the output from ``self._tri_fit_incr``.
Returns
-------
pd.DataFrame
"""
# I represents actual incremental losses, m fitted incremental losses.
I = pd.DataFrame(self.tri.to_incr())
m = fitted_tri_incr
return((I - m) / np.sqrt(m.abs()))
    def _resid_adj(self, resid_us):
"""
Compute and return the adjusted Pearson residuals, given by
:math:`r_{adj} = \\sqrt{\\frac{N}{dof}} * r_{us}`, where *r_adj*
represents the adjusted Pearson residuals, *N* the number of triangle cells,
        *dof* the degrees of freedom and *r_us* the unscaled Pearson residuals.
Parameters
----------
resid_us: pd.DataFrame
Unscaled Pearson residuals, typically output by ``self._resid_us``.
Returns
-------
pd.DataFrame
"""
return(np.sqrt(self.tri.nbr_cells / self.dof) * resid_us)
    @staticmethod
def _sampling_dist(resid_adj):
"""
Return ``resid_adj`` as a 1-dimensional array, which will be sampled
from with replacement in order to produce synthetic triangles for
bootstrapping. Any NaN's and 0's present in ``resid_adj`` will not be
present in the returned array.
Parameters
----------
resid_adj: pd.DataFrame
Adjusted Pearson residuals, typically output by ``self._resid_adj``.
Returns
-------
np.ndarray
"""
resid_ = resid_adj.iloc[:-1, :-1].values.ravel()
return(resid_[np.logical_and(~np.isnan(resid_), resid_ != 0)])
    def _bs_samples(self, sampling_dist, fitted_tri_incr, sims=1000, parametric=False,
random_state=None):
"""
        Return a DataFrame containing ``sims`` resampled-with-replacement
        incremental loss triangles if ``parametric=False``, otherwise
        random variates from a normal distribution with mean zero and
        variance derived from ``resid_adj``. Randomly generated incremental
        data are cumulated in preparation for the ldf calculation in the
        next step.
Parameters
----------
sampling_dist: np.ndarray
The residuals from the fitted incremental triangle coerced
into a one-dimensional numpy array.
fitted_tri_incr: pd.DataFrame
The incremental triangle fitted using backwards recursion.
Typically the output of ``self._tri_fit_incr``.
sims: int
The number of bootstrap simulations to run. Defaults to 1000.
parametric: bool
If True, fit standardized residuals to a normal distribution, and
sample from the parameterized distribution. Otherwise, bootstrap
procedure proceeds by sampling with replacement from the array
of standardized residuals. Defaults to False.
random_state: np.random.RandomState
If int, random_state is the seed used by the random number
generator; If RandomState instance, random_state is the random
number generator; If None, the random number generator is the
RandomState instance used by np.random.
Returns
-------
pd.DataFrame
"""
if random_state is not None:
if isinstance(random_state, int):
prng = RandomState(random_state)
elif isinstance(random_state, RandomState):
prng = random_state
else:
prng = RandomState()
sampling_dist = sampling_dist.flatten()
fti = fitted_tri_incr.reset_index(drop=False).rename({"index": "origin"}, axis=1)
dfm = pd.melt(fti, id_vars=["origin"], var_name="dev", value_name="value")
dfm = dfm[~np.isnan(dfm["value"])].astype({"origin": int, "dev": int, "value": float})
# Make positive any first development period negative values.
min_devp = dfm["dev"].min()
dfm["value"] = np.where(
np.logical_and(dfm["dev"].values == min_devp, dfm["value"].values < 0),
1., dfm["value"].values
)
dfi = self.tri.to_tbl(dropna=False).drop("value", axis=1)
dfp = dfi.merge(dfm, how="outer", on=["origin", "dev"])
dfp["rectype"] = np.where(np.isnan(dfp["value"].values), "forecast", "actual")
dfp = dfp.rename({"value": "incr"}, axis=1)
dfp["incr_sqrt"] = np.sqrt(dfp["incr"].values)
dfrtypes = {"origin": int, "dev": int, "incr": float,
"incr_sqrt": float, "rectype": str}
dfrcols = ["origin", "dev", "incr", "rectype", "incr_sqrt"]
# Replicate dfp sims times then redefine datatypes.
dfr = pd.DataFrame(np.tile(dfp, (sims, 1)), columns=dfrcols).astype(dfrtypes)
# Assign simulation identifier to each record in dfr.
dfr["sim"] = np.divmod(dfr.index, self.tri.shape[0] * self.tri.shape[1])[0]
sample_size = dfr.shape[0]
if parametric:
# Sample random standard normal residuals.
dfr["resid"] = prng.normal(loc=0, scale=sampling_dist.std(ddof=1), size=sample_size)
else:
# Randomly sample residuals from sampling_dist.
dfr["resid"] = prng.choice(sampling_dist, sample_size, replace=True)
        # Calculate resampled incremental and cumulative losses.
dfr["resid"] = np.where(dfr["rectype"].values == "forecast", np.NaN, dfr["resid"].values)
dfr = dfr.sort_values(by=["sim", "origin", "dev"]).reset_index(drop=True)
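        # Resampled incremental loss: I* = m + r_adj * sqrt(m), where m is the
        # fitted incremental loss and r_adj the sampled adjusted residual
        # (step [6] of the procedure outlined in the class docstring).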
dfr["samp_incr"] = dfr["incr"].values + dfr["resid"].values * dfr["incr_sqrt"].values
dfr["samp_cum"] = dfr.groupby(["sim", "origin"], as_index=False)["samp_incr"].cumsum()
return(dfr.reset_index(drop=True))
    def _bs_ldfs(self, dfsamples):
"""
Compute and return loss development factors for each set of synthetic
loss data.
Parameters
----------
dfsamples: pd.DataFrame
Output from ``self._bs_samples``.
Returns
-------
pd.DataFrame
"""
keepcols = ["sim", "origin", "dev", "samp_cum", "last_origin"]
new_col_names = {"index": "dev", "origin": "last_origin", "row_offset": "origin_offset"}
dflvi = self.tri.clvi.reset_index(drop=False).rename(new_col_names, axis=1)
dfinit = dfsamples.merge(dflvi, how="left", on=["dev"])
dfinit = dfinit[keepcols].sort_values(by=["sim", "dev", "origin"])
df = dfinit[~np.isnan(dfinit["samp_cum"])].reset_index(drop=True)
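        # Volume-weighted ldfs by sim and development period: _aggdev1 sums
        # cumulative losses over all origins with data at a given period, while
        # _aggdev2 zeroes out the most recent origin at that period (which has
        # no observation at the next period). Shifting _aggdev2 and dev down
        # one period aligns numerator and denominator so that
        # ldf = _aggdev1 / _aggdev2 is the all-year weighted age-to-age factor.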
df["_aggdev2"] = np.where(df["origin"].values == df["last_origin"].values, 0, df["samp_cum"].values)
df2 = df.groupby(["sim", "dev"], as_index=False)[["samp_cum", "_aggdev2"]].sum().rename(
{"samp_cum": "_aggdev1"}, axis=1)
df2["_aggdev2"] = df2["_aggdev2"].shift(periods=1)
df2["dev"] = df2["dev"].shift(periods=1)
dfldfs = df2[df2["_aggdev2"] != 0].dropna(how="any")
dfldfs["dev"] = dfldfs["dev"].astype(int)
dfldfs["ldf"] = dfldfs["_aggdev1"] / dfldfs["_aggdev2"]
return(dfldfs[["sim", "dev", "ldf"]].reset_index(drop=True))
    def _bs_forecasts(self, dfsamples, dfldfs, scale_param):
"""
Populate lower-right of each simulated triangle using values from
``self._bs_samples`` and development factors from ``self._bs_ldfs``.
Parameters
----------
dfsamples: pd.DataFrame
Output from ``self._bs_samples``.
dfldfs: pd.DataFrame
Output from ``self._bs_ldfs``.
scale_param: float
            The sum of the squared unscaled Pearson residuals over the
degrees of freedom. Output from ``self._scale_param``.
Returns
-------
pd.DataFrame
"""
dfcombined = self._get_dfcombined(dfsamples, dfldfs)
min_origin_year = dfcombined["origin"].min()
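        # Split each simulated triangle into cells that remain actual and cells
        # to be forecast: _l_init_indx flags cells at or beyond each origin's
        # last actual development period. Forecast cumulative losses are the
        # last actual cumulative amount (l_act_cum) carried forward by the
        # cumulative product of the simulated ldfs.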
dfcombined["_l_init_indx"] = np.where(
dfcombined["dev"].values >= dfcombined["l_act_dev"].values,
dfcombined.index.values, -1
)
dfacts = dfcombined[(dfcombined["origin"].values == min_origin_year) |
(dfcombined["_l_init_indx"].values == -1)]
dffcst = dfcombined[~dfcombined.index.isin(dfacts.index)].sort_values(
by=["sim", "origin", "dev"])
dffcst["_l_act_indx"] = dffcst.groupby(["sim", "origin"])["_l_init_indx"].transform("min")
l_act_cum = dffcst.loc[dffcst["_l_act_indx"], "samp_cum"].values
dffcst["l_act_cum"] = l_act_cum
dffcst["_cum_ldf"] = dffcst.groupby(["sim", "origin"])["ldf"].transform(
"cumprod").shift(periods=1)
dffcst["_samp_cum2"] = dffcst["l_act_cum"].values * dffcst["_cum_ldf"].values
dffcst["_samp_cum2"] = np.where(
np.isnan(dffcst["_samp_cum2"].values), 0, dffcst["_samp_cum2"].values
)
dffcst["cum_final"] = np.where(
np.isnan(dffcst["samp_cum"].values), 0,
dffcst["samp_cum"].values) + dffcst["_samp_cum2"].values
# Combine forecasts with actuals then compute incremental losses by sim and origin.
dffcst = dffcst.drop(labels=["samp_cum", "samp_incr"], axis=1).rename(
columns={"cum_final": "samp_cum"})
dfsqrd = pd.concat([dffcst, dfacts], sort=True).sort_values(
by=["sim", "origin", "dev"])
dfsqrd["_dev1_ind"] = (dfsqrd["dev"].values == 1) * 1
dfsqrd["_incr_dev1"] = dfsqrd["_dev1_ind"].values * dfsqrd["samp_cum"].values
dfsqrd["_incr_dev2"] = dfsqrd.groupby(["sim", "origin"])["samp_cum"].diff(periods=1)
dfsqrd["_incr_dev2"] = np.where(
np.isnan(dfsqrd["_incr_dev2"].values), 0, dfsqrd["_incr_dev2"].values
)
dfsqrd["samp_incr"] = dfsqrd["_incr_dev1"].values + dfsqrd["_incr_dev2"].values
dfsqrd["var"] = np.abs(dfsqrd["samp_incr"].values * scale_param)
dfsqrd["sign"] = np.where(dfsqrd["samp_incr"].values > 0, 1, -1)
dfsqrd = dfsqrd.drop(
labels=[ii for ii in dfsqrd.columns if ii.startswith("_")], axis=1)
return(dfsqrd.sort_values(by=["sim", "origin", "dev"]).reset_index(drop=True))
    @staticmethod
def _bs_process_error(dfforecasts, scale_param, procdist="gamma", random_state=None):
"""
Incorporate process error by simulating each incremental future
        loss from ``procdist``. The mean is set to the forecast incremental
        loss amount and the variance to ``mean * scale_param``.
The parameters for ``procdist`` must be positive. Since the mean
and variance used to parameterize ``procdist`` depend on the
resampled incremental losses, it is necessary to incorporate logic
to address the possibility of negative incremental losses arising
in the resampling stage. The approach used to handle negative
        incremental values is described in Shapland [5], and replaces the
distribution mean with the absolute value of the mean, and the
variance with the absolute value of the mean multiplied by ``scale_param``.
Parameters
----------
dfforecasts: pd.DataFrame
            DataFrame of bootstrap forecasts generated within
``self._bs_forecasts``.
scale_param: float
            The sum of the squared unscaled Pearson residuals over the
degrees of freedom. Available in ``self._scale_param``.
procdist: str
Specifies the distribution used to incorporate process error.
Currently, can only be set to "gamma". Any other distribution
will result in an error.
random_state: np.random.RandomState
If int, random_state is the seed used by the random number
generator; If RandomState instance, random_state is the random
number generator; If None, the random number generator is the
RandomState instance used by np.random.
Returns
-------
pd.DataFrame
"""
# Initialize pseudo random number generator.
if random_state is not None:
if isinstance(random_state, int):
prng = RandomState(random_state)
elif isinstance(random_state, RandomState):
prng = random_state
else:
prng = RandomState()
# Parameterize distribution for the incorporation of process variance.
if procdist.strip().lower() == "gamma":
dfforecasts["param2"] = scale_param
dfforecasts["param1"] = np.abs(dfforecasts["samp_incr"].values / dfforecasts["param2"].values)
def fdist(param1, param2):
"""
                Sample from a gamma distribution with shape ``param1`` and scale ``param2``.
"""
return(prng.gamma(param1, param2))
else:
raise ValueError("Invalid procdist specification: `{}`".format(procdist))
dfforecasts["final_incr"] = np.where(
dfforecasts["rectype"].values == "forecast",
fdist(dfforecasts["param1"].values, dfforecasts["param2"].values) * dfforecasts["sign"].values,
dfforecasts["samp_incr"].values
)
dfforecasts["final_cum"] = dfforecasts.groupby(["sim", "origin"])["final_incr"].cumsum()
dfforecasts = dfforecasts.rename({"final_cum": "ultimate", "l_act_cum": "latest"}, axis=1)
return(dfforecasts.sort_values(by=["sim", "origin", "dev"]).reset_index(drop=True))
    @staticmethod
def _bs_reserves(dfprocerror):
"""
Compute unpaid loss reserve estimate using output from
``self._bs_process_error``.
Parameters
----------
dfprocerror: pd.DataFrame
Output from ``self._bs_process_error``.
Returns
-------
pd.DataFrame
"""
keepcols = ["sim", "origin", "latest", "ultimate", "reserve"]
max_devp = dfprocerror["dev"].values.max()
dfprocerror["reserve"] = dfprocerror["ultimate"] - dfprocerror["latest"]
dfreserves = dfprocerror[dfprocerror["dev"].values == max_devp][keepcols].drop_duplicates()
dfreserves["latest"] = np.where(
np.isnan(dfreserves["latest"].values),
dfreserves["ultimate"].values, dfreserves["latest"].values
)
dfreserves["reserve"] = np.nan_to_num(dfreserves["reserve"].values, 0)
return(dfreserves.sort_values(by=["origin", "sim"]).reset_index(drop=True))
    @staticmethod
def _bs_std_error(dfreserves):
"""
Compute standard error of bootstrapped reserves by origin and in aggregate.
Parameters
----------
dfreserves: pd.DataFrame
Output from ``self._bs_reserves``.
Returns
-------
pd.Series
"""
# Compute standard deviation of bootstrap samples by origin.
dforigin_std = dfreserves.groupby(["origin"], as_index=False)["reserve"].std(ddof=1)
origin_se = pd.Series(
data=dforigin_std["reserve"].values, index=dforigin_std["origin"].values,
name="std_error")
dftotal = dfreserves.groupby(["sim"], as_index=False)["reserve"].sum()
total_se = pd.Series(
data=dftotal["reserve"].std(ddof=1), index=["total"], name="std_error"
)
return(origin_se.append(total_se))
class BootstrapChainLadderResult(BaseRangeEstimatorResult):
"""
Container class for ``BootstrapChainLadder`` output.
Parameters
----------
summary: pd.DataFrame
Chain Ladder summary compilation.
reserve_dist: pd.DataFrame
        The predictive distribution of reserve estimates generated via
bootstrapping. ``reserve_dist`` is a five column DataFrame
consisting of the simulation number, origin period, the latest
loss amount for the associated origin period, and the predictive
distribution of ultimates and reserves.
sims_data: pd.DataFrame
        A DataFrame consisting of all simulated values and intermediate
        fields. When a large number of bootstrap iterations is run,
``sims_data`` will be correspondingly large. The fields include:
- dev:
The simulated development period.
- incr:
            The actual incremental loss amount obtained from the fitted triangle.
- incr_sqrt:
The square root of incr.
- l_act_cum:
The latest actual cumulative loss amount for dev/origin.
- l_act_dev:
The latest dev period with actual losses for a given origin period.
- ldf:
            Loss development factors computed on synthetic triangle data.
- origin:
The simulated origin period.
- rectype:
            Whether the dev/origin combination represents actual or forecast data
in the squared triangle.
- resid:
The resampled adjusted residuals if ``parametric=False``, otherwise a
random sampling from a normal distribution with mean zero and variance
based on the variance of the adjusted residuals.
- samp_cum:
            A synthetic cumulative loss amount.
- samp_incr:
A synthetic incremental loss amount.
- sim:
Bootstrap iteration.
- var:
The variance, computed as scale_param x samp_incr.
- sign:
The sign of samp_incr.
- param2/param1:
Parameters for the process error distribution.
- final_incr:
            Final simulated incremental loss amount after the incorporation of
process error.
- final_cum:
Final simulated cumulative loss amount after the incorporation of
process error.
tri: trikit.triangle.CumTriangle
A cumulative triangle instance.
ldfs: pd.Series
Loss development factors.
scale_param: float
        The sum of the squared unscaled Pearson residuals over the triangle's
degrees of freedom.
dof: int
Triangle degrees of freedom.
unscaled_residuals: pd.DataFrame
The unscaled residuals.
adjusted_residuals: pd.DataFrame
The adjusted residuals.
sampling_dist: np.ndarray
Same as ``adjusted_residuals`` but as a numpy array with NaN's and 0's
removed. None if ``parametric=True``.
fitted_tri_cum: pd.DataFrame
Cumulative triangle fit using backwards recursion.
fitted_tri_incr: pd.DataFrame
Incremental triangle fit using backwards recursion.
sims: int
Number of bootstrap iterations performed.
procdist: str
Distribution used to incorporate process variance. Currently "gamma" is
the only option.
parametric: bool
Whether parametric or non-parametric bootstrap was performed.
q: float or array_like of float
Quantiles over which to evaluate reserve distribution in summary output.
interpolation: {"linear", "lower", "higher", "midpoint", "nearest"}
Optional parameter which specifies the interpolation method to use
when the desired quantile lies between two data points i < j. See
``numpy.quantile`` for more information. Default value is "linear".
kwargs: dict
Additional keyword arguments passed into ``BootstrapChainLadder``'s
``__call__`` method.
"""
def __init__(self, summary, tri, ldfs, tail, trisqrd, reserve_dist, sims_data,
scale_param, dof, unscaled_residuals, adjusted_residuals,
sampling_dist, fitted_tri_cum, fitted_tri_incr, sims, procdist,
parametric, q, interpolation, **kwargs):
super().__init__(summary=summary, tri=tri, ldfs=ldfs, tail=tail,
trisqrd=trisqrd, process_error=None, parameter_error=None)
self.unscaled_residuals = unscaled_residuals
self.adjusted_residuals = adjusted_residuals
self.fitted_tri_incr = fitted_tri_incr
self.fitted_tri_cum = fitted_tri_cum
self.sampling_dist = sampling_dist
self.interpolation = interpolation
self.reserve_dist = reserve_dist
self.scale_param = scale_param
self.parametric = parametric
self.sims_data = sims_data
self.procdist = procdist
self.sims = sims
self.dof = dof
self.q = q
if kwargs is not None:
for kk in kwargs:
setattr(self, kk, kwargs[kk])
qtlsfields = [ii for ii in self.summary.columns if ii.endswith("%")]
self.qtlhdrs = {ii: "{:,.0f}".format for ii in qtlsfields}
self._summspecs.update(self.qtlhdrs)
# Properties.
self._residuals_detail = None
self._fit_assessment = None
self._origin_dist = None
self._agg_dist = None
@property
def origin_dist(self):
"""
Return distribution of bootstrapped ultimates/reserves by origin period.
Returns
-------
pd.DataFrame
"""
if self._origin_dist is None:
dist_columns = ["latest", "ultimate", "reserve"]
self._origin_dist = self.reserve_dist.groupby(
["sim", "origin"], as_index=False)[dist_columns].sum()
return(self._origin_dist)
@property
def residuals_detail(self):
"""
Summary statistics based on triangle residuals.
Returns
-------
pd.DataFrame
"""
if self._residuals_detail is None:
if not self.parametric:
unscaled = self.unscaled_residuals.values.ravel()
adjusted = self.adjusted_residuals.values.ravel()
unscaled = unscaled[~np.isnan(unscaled)]
adjusted = adjusted[~np.isnan(adjusted)]
unscaled = unscaled[unscaled != 0]
adjusted = adjusted[adjusted != 0]
unscaled_size = unscaled.size
unscaled_sum = unscaled.sum(axis=0)
unscaled_ssqr = np.sum(unscaled**2, axis=0)
unscaled_min = unscaled.min(axis=0)
unscaled_max = unscaled.max(axis=0)
unscaled_mean = unscaled.mean(axis=0)
unscaled_skew = stats.skew(unscaled, axis=0, nan_policy="omit")
unscaled_mode = stats.mode(unscaled, axis=0, nan_policy="omit").mode[0]
unscaled_cvar = stats.variation(unscaled, axis=0, nan_policy="omit")
unscaled_kurt = stats.kurtosis(unscaled, axis=0, nan_policy="omit")
unscaled_var = unscaled.var(ddof=1, axis=0)
unscaled_std = unscaled.std(ddof=1, axis=0)
unscaled_med = np.median(unscaled, axis=0)
adjusted_size = adjusted.size
adjusted_sum = adjusted.sum(axis=0)
adjusted_ssqr = np.sum(adjusted**2, axis=0)
adjusted_min = adjusted.min(axis=0)
adjusted_max = adjusted.max(axis=0)
adjusted_mean = adjusted.mean(axis=0)
adjusted_skew = stats.skew(adjusted, axis=0, nan_policy="omit")
adjusted_mode = stats.mode(adjusted, axis=0, nan_policy="omit").mode[0]
adjusted_cvar = stats.variation(adjusted, axis=0, nan_policy="omit")
adjusted_kurt = stats.kurtosis(adjusted, axis=0, nan_policy="omit")
adjusted_var = adjusted.var(ddof=1, axis=0)
adjusted_std = adjusted.std(ddof=1, axis=0)
adjusted_med = np.median(adjusted, axis=0)
self._residuals_detail = pd.DataFrame({
"unscaled": [
unscaled_size, unscaled_sum , unscaled_ssqr, unscaled_min,
unscaled_max, unscaled_mean, unscaled_skew, unscaled_mode,
unscaled_cvar, unscaled_kurt, unscaled_var , unscaled_std,
unscaled_med
],
"adjusted": [
adjusted_size, adjusted_sum , adjusted_ssqr, adjusted_min,
adjusted_max, adjusted_mean, adjusted_skew, adjusted_mode,
adjusted_cvar, adjusted_kurt, adjusted_var , adjusted_std,
adjusted_med
],
},
index=[
"size", "sum", "sum_of_squares", "minimum", "maximum", "mean",
"skew", "mode", "cov", "kurtosis", "variance",
"standard_deviation", "median"
]
)
return(self._residuals_detail)
    def _get_quantiles_by_devp(self, qtls, qtlhdrs):
"""
        Get quantiles of the bootstrapped reserve distribution for individual
        origin periods and in total.
Parameters
----------
        qtls: array_like
            Sequence of quantiles of the estimated reserve distribution to
            compute, typically the lower and upper bounds of a two-sided interval.
        qtlhdrs: array_like
            Formatted column headers corresponding to each quantile in ``qtls``.
Returns
-------
pd.DataFrame
"""
dfsims = self.sims_data[["origin", "dev", "ultimate"]]
dfults = dfsims[dfsims.dev == dfsims.dev.max()].reset_index(drop=True)
dev_increment = np.unique(self.ldfs.index[1:] - self.ldfs.index[:-1])[0]
dfults["dev"] = self.ldfs.index.max() + dev_increment
dfsims = pd.concat([dfsims, dfults])
dftotal_keys = dfsims[dfsims.origin == dfsims.origin.min()][["origin", "dev"]].drop_duplicates()
dftotal_keys["origin"] = "total"
        dfqtls_keys = pd.concat(
            [dfsims[["origin", "dev"]].drop_duplicates(), dftotal_keys]
        ).reset_index(drop=True)
# Get total reserve across all origin periods.
dftotal = dfsims.copy(deep=True)
dftotal["origin"] = "total"
dftotal = dftotal.groupby(["origin", "dev"], as_index=False)
dflist = []
for ii, jj in zip(qtls, qtlhdrs):
dfqtl = dfsims.groupby(["origin", "dev"], as_index=False).aggregate(
"quantile", q=ii, interpolation="linear").rename(
{"ultimate": jj}, axis=1
)
dftotal_qtl = dftotal.aggregate(
"quantile", q=ii, interpolation="linear").rename({"ultimate": jj},
axis=1
)
dflist.append(pd.concat([dfqtl, dftotal_qtl]))
# Combine DataFrames in dflist into single table.
dfqtls = functools.reduce(
lambda df1, df2: df1.merge(df2, on=["origin", "dev"], how="left"),
dflist, dfqtls_keys).reset_index(drop=True)
return(dfqtls)
    def get_quantiles(self, q, interpolation="linear", lb=None):
"""
        Get quantiles of the bootstrapped reserve distribution for individual
        origin periods and in total. Returns a DataFrame with columns
        representing the percentiles of interest.
Parameters
----------
q: array_like of float or float
Quantile or sequence of quantiles to compute, which must be between 0 and 1
inclusive.
interpolation: {"linear", "lower", "higher", "midpoint", "nearest"}
Optional parameter which specifies the interpolation method to use
when the desired quantile lies between two data points i < j. See
``numpy.quantile`` for more information. Default value is "linear".
lb: float
Lower bound of simulated values. If ``lb`` is not None, quantiles less
than ``lb`` will be set to ``lb``. To eliminate negative quantiles,
set ``lb=0``.
Returns
-------
pd.DataFrame
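        Examples
        --------
        A sketch, where ``bcl_result`` stands for an existing
        ``BootstrapChainLadderResult`` instance:
        >>> bcl_result.get_quantiles(q=[.75, .95], lb=0)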
"""
qarr = np.asarray(q, dtype=float)
        if np.any(np.logical_or(qarr > 1, qarr < 0)):
raise ValueError("q values must fall within [0, 1].")
else:
qtls, qtlhdrs = self._qtls_formatter(q=q)
qtl_pairs = [(qtlhdrs[ii], qtls[ii]) for ii in range(len(qtls))]
dqq = {
str(ii[0]): [
np.percentile(
self.reserve_dist[self.reserve_dist.origin == origin]["reserve"].values,
100 * ii[-1], interpolation=interpolation
) for origin in self.summary.index] for ii in qtl_pairs
}
dfqq = pd.DataFrame().from_dict(dqq).set_index(self.summary.index)
if lb is not None:
dfqq = dfqq.applymap(lambda v: lb if v < lb else v)
return(dfqq)
    def plot(self, q=.90, actuals_color="#334488", forecasts_color="#FFFFFF",
fill_color="#FCFCB1", fill_alpha=.75, axes_style="darkgrid",
context="notebook", col_wrap=4, hue_kws=None, exhibit_path=None,
**kwargs):
"""
Generate exhibit representing the distribution of reserve estimates
resulting from bootstrap resampling, along with percentiles from the
distribution given by ``q``, the percentile(s) of interest.
Parameters
----------
        q: float in range of [0,1]
            Two-sided percentile interval to highlight, which must be between
            0 and 1 inclusive. For example, when ``q=.90``, the 5th and
            95th percentiles of the ultimate/reserve distribution will be
            highlighted in the exhibit :math:`(\\frac{1 - q}{2}, \\frac{1 + q}{2})`.
        actuals_color: str
            Color or hexadecimal color code used to represent actuals.
            Defaults to "#334488".
        forecasts_color: str
            Color or hexadecimal color code used to represent forecasts.
            Defaults to "#FFFFFF".
        fill_color: str
            Color or hexadecimal color code used to represent the fill color
            between reserve distribution quantiles associated with ``q``.
            Defaults to "#FCFCB1".
fill_alpha: float
Control transparency of ``fill_color`` between upper and lower
percentile bounds of the ultimate/reserve distribution. Defaults
to .75.
axes_style: {"darkgrid", "whitegrid", "dark", "white", "ticks"}
            Aesthetic style of seaborn plots. Default value is "darkgrid".
context: {"notebook", "paper", "talk", "poster"}.
Set the plotting context parameters. According to the seaborn
            documentation, this affects things like the size of the labels,
lines, and other elements of the plot, but not the overall style.
Default value is "notebook".
col_wrap: int
The maximum number of origin period axes to have on a single row
            of the resulting FacetGrid. Defaults to 4.
hue_kws: dictionary of param:list of values mapping
Other keyword arguments to insert into the plotting call to let
other plot attributes vary across levels of the hue variable
(e.g. the markers in a scatterplot). Each list of values should
have length 4, with each index representing aesthetic
overrides for forecasts, actuals, lower percentile and upper
percentile renderings respectively. Defaults to ``None``.
exhibit_path: str
Path to which exhibit should be written. If None, exhibit will be
rendered via ``plt.show()``.
kwargs: dict
Additional styling options for scatter points. This should include
additional options accepted by ``plt.plot``.
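        Examples
        --------
        A sketch, where ``bcl_result`` stands for an existing
        ``BootstrapChainLadderResult`` instance:
        >>> bcl_result.plot(q=.90, col_wrap=4)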
"""
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_context(context)
qtls, qtlhdrs = self._qtls_formatter(q=q, two_sided=True)
data = self._bs_data_transform(qtls, qtlhdrs).dropna(how="any", subset=["loss"])
with sns.axes_style(axes_style):
huekwargs = dict(
marker=["o", "o", None, None], markersize=[6, 6, None, None],
color=["#000000", "#000000", "#000000", "#000000"],
fillstyle=["full", "full", "none", "none"],
markerfacecolor=[forecasts_color, actuals_color, None, None],
markeredgecolor=["#000000", "#000000", None, None],
markeredgewidth=[.50, .50, None, None],
linestyle=["-", "-", "-.", "--"], linewidth=[.475, .475, .625, .625],
)
if hue_kws is not None:
# Determine whether the length of each element of hue_kws is 4.
if all(len(hue_kws[i]) == 4 for i in hue_kws):
huekwargs.update(hue_kws)
else:
warnings.warn("hue_kws overrides not correct length - Ignoring.")
grid = sns.FacetGrid(
data, col="origin", hue="rectype", hue_kws=huekwargs,
col_wrap=col_wrap, margin_titles=False, despine=True,
sharex=False, sharey=False,
hue_order=["forecast", "actual", qtlhdrs[0], qtlhdrs[-1]]
)
ult_vals = grid.map(plt.plot, "dev", "loss",)
devp_xticks = np.sort(data.dev.unique())
devp_xticks_str = [
str(ii) if ii != devp_xticks.max() else "ult" for ii in devp_xticks
]
grid.set(xticks=devp_xticks)
grid.set_xticklabels(devp_xticks_str, size=7)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Change ticklabel font size and place legend on each facet.
for origin, ax_ii in zip(np.sort(data.origin.unique()), grid.axes):
legend = ax_ii.legend(
loc="lower right", fontsize="small", frameon=True,
fancybox=True, shadow=False, edgecolor="#909090",
framealpha=1, markerfirst=True,)
legend.get_frame().set_facecolor("#FFFFFF")
# For given origin, determine optimal 5-point tick labels.
origin_max_val = data[data.origin == origin].loss.max()
y_ticks, y_ticklabels = self._get_yticks(origin_max_val)
ax_ii.set_yticks(y_ticks)
ax_ii.set_yticklabels(y_ticklabels, size=7)
ax_ii.annotate(
origin, xy=(.075, .90), xytext=(.075, .90), xycoords='axes fraction',
textcoords='axes fraction', fontsize=9, rotation=0, color="#000000",
)
ax_ii.set_title("")
ax_ii.set_xlabel("")
ax_ii.set_ylabel("")
# Fill between upper and lower range bounds.
axc = ax_ii.get_children()
lines_ = [jj for jj in axc if isinstance(jj, matplotlib.lines.Line2D)]
xx = [jj._x for jj in lines_ if len(jj._x) > 0]
yy = [jj._y for jj in lines_ if len(jj._y) > 0]
x_, lb_, ub_ = xx[0], yy[-2], yy[-1]
ax_ii.fill_between(x_, lb_, ub_, color=fill_color, alpha=fill_alpha)
# Draw border around each facet.
for _, spine_ in ax_ii.spines.items():
spine_.set(visible=True, color="#000000", linewidth=.50)
if exhibit_path is not None:
plt.savefig(exhibit_path)
else:
plt.show()
    def hist(self, color="#FFFFFF", axes_style="darkgrid", context="notebook",
col_wrap=4, exhibit_path=None, **kwargs):
"""
Generate histogram of estimated reserve distribution by accident
year and in total.
Parameters
----------
color: str
Determines histogram color in each facet. Can also be specified as
a key-value pair in ``kwargs``.
axes_style: str
Aesthetic style of plots. Defaults to "darkgrid". Other options
include {whitegrid, dark, white, ticks}.
context: str
Set the plotting context parameters. According to the seaborn
            documentation, this affects things like the size of the labels,
lines, and other elements of the plot, but not the overall style.
Defaults to ``"notebook"``. Additional options include
{paper, talk, poster}.
col_wrap: int
The maximum number of origin period axes to have on a single row
            of the resulting FacetGrid. Defaults to 4.
exhibit_path: str
Path to which exhibit should be written. If None, exhibit will be
rendered via ``plt.show()``.
kwargs: dict
Dictionary of optional matplotlib styling parameters.
"""
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_context(context)
# data0 = self.sims_data[["sim", "origin", "dev", "rectype", "latest", "reserve",]]
# data0 = data0[(data0["dev"]==data0["dev"].max()) & (data0["rectype"]=="forecast")].reset_index(drop=True)
# data0 = data0.drop(["dev", "rectype", "latest"], axis=1)
#
# # Include additional origin representing aggregate distribution.
# data1 = data0.groupby("sim", as_index=False)[["reserve"]].sum()
# data1["origin"] ="total"
# data = pd.concat([data0, data1])
data = self.reserve_dist
        # Get median, minimum and maximum reserve by origin.
med_data = data.groupby("origin", as_index=False)[["reserve"]].median().rename(
{"reserve": "med_res"}, axis=1).set_index("origin")
min_data = data.groupby("origin", as_index=False)[["reserve"]].min().rename(
{"reserve": "min_res"}, axis=1).set_index("origin")
max_data = data.groupby("origin", as_index=False)[["reserve"]].max().rename(
{"reserve": "max_res"}, axis=1).set_index("origin")
dfmetrics = functools.reduce(lambda df1, df2: df1.join(df2), (med_data, min_data, max_data))
dfmetrics = dfmetrics.applymap(lambda v: 0 if v < 0 else v).reset_index(drop=False)
with sns.axes_style(axes_style):
pltkwargs = {"color": color, "bins": 20, "edgecolor": "#484848",
"alpha": 1., "linewidth": .45}
if kwargs is not None:
pltkwargs.update(kwargs)
grid = sns.FacetGrid(
data, col="origin", col_wrap=col_wrap, margin_titles=False,
despine=True, sharex=False, sharey=False,
)
hists = grid.map(plt.hist, "reserve", **pltkwargs)
grid.set_axis_labels("", "")
grid.set_titles("", size=6)
# Change ticklabel font size and place legend on each facet.
origin_vals = sorted([int(ii) for ii in data["origin"].unique() if ii != "total"])
dindex = {jj: ii for ii, jj in enumerate(origin_vals)}
dindex.update({"total": max(dindex.values()) + 1})
data["origin_index"] = data["origin"].map(dindex)
origin_order = data[["origin_index", "origin"]].drop_duplicates().sort_values(
"origin_index"
).origin.values
with warnings.catch_warnings():
warnings.simplefilter("ignore")
for origin, ax_ii in zip(origin_order, grid.axes):
# xmin = np.max([0, dfmetrics[dfmetrics.origin == origin]["min_res"].item()])
xmax = dfmetrics[dfmetrics.origin == origin]["max_res"].item() * 1.025
xmed = dfmetrics[dfmetrics.origin == origin]["med_res"].item()
origin_str = "{}".format(origin)
ax_ii.set_xlim([0, xmax])
ax_ii.axvline(xmed, color="#E02C70", linestyle="--", linewidth=1.5)
ax_ii.grid(False)
ymedloc = max(rect.get_height() for rect in ax_ii.patches) * .30
ax_ii.set_yticks([])
ax_ii.set_yticklabels([])
ax_ii.tick_params(
axis="x", which="both", bottom=True, top=False, labelbottom=True
)
ax_ii.set_xticklabels(
["{:,.0f}".format(jj) for jj in ax_ii.get_xticks()], size=7
)
ax_ii.annotate(
origin_str, xy=(.85, .925), xycoords='axes fraction',
textcoords='axes fraction', fontsize=9, rotation=0, color="#000000",
)
ax_ii.annotate(
"median = {:,.0f}".format(xmed), (xmed, ymedloc), xytext=(7.5, 0),
textcoords="offset points", ha="center", va="bottom", fontsize=7,
rotation=90, color="#000000"
)
# Draw border around each facet.
for _, spine in ax_ii.spines.items():
spine.set(visible=True, color="#000000", linewidth=.50)
if exhibit_path is not None:
plt.savefig(exhibit_path)
else:
plt.show()