"""
This module defines all possible functional forms of harmonization methods and
the default decision tree for choosing which method to use.
"""
from bisect import bisect
import numpy as np
import pandas as pd
import pyomo.environ as pyo
from pandas_indexing import assignlevel
from aneris import utils
def _log(msg, *args, **kwargs):
utils.logger().info(msg, *args, **kwargs)
[docs]
def harmonize_factors(df, hist, harmonize_year=2015):
"""
Calculate offset and ratio values between data and history.
Parameters
----------
df : pd.DataFrame
model data
hist : pd.DataFrame
historical data
harmonize_year : string, optional
column name of harmonization year
Returns
-------
offset : pd.Series
offset (history - model)
ratio : pd.Series
ratio (history / model)
"""
c, m = hist[harmonize_year], df[harmonize_year]
offset = (c - m).fillna(0)
offset.name = "offset"
ratios = (c / m).replace(np.inf, np.nan).fillna(0)
ratios.name = "ratio"
return offset, ratios
[docs]
def constant_offset(df, offset, harmonize_year=2015):
"""
Calculate constant offset harmonized trajectory.
Parameters
----------
df : pd.DataFrame
model data
offset : pd.DataFrame
offset data
harmonize_year : string, optional
column name of harmonization year, ignored
Returns
-------
df : pd.DataFrame
harmonized trajectories
"""
df = df.copy()
numcols = utils.numcols(df)
# just add offset to all values
df[numcols] = df[numcols].add(offset, axis=0)
return df
[docs]
def constant_ratio(df, ratios, harmonize_year=2015):
"""
Calculate constant ratio harmonized trajectory.
Parameters
----------
df : pd.DataFrame
model data
ratio : pd.DataFrame
ratio data
harmonize_year : string, optional
column name of harmonization year, ignored
Returns
-------
df : pd.DataFrame
harmonized trajectories
"""
df = df.copy()
numcols = utils.numcols(df)
# just add offset to all values
df[numcols] = df[numcols].multiply(ratios, axis=0)
return df
[docs]
def linear_interpolate(df, offset, final_year=2050, harmonize_year=2015):
"""
Calculate linearly interpolated convergence harmonized trajectory.
Parameters
----------
df : pd.DataFrame
model data
offset : pd.DataFrame
offset data
final_year : string, optional
column name of convergence year
harmonize_year : string, optional
column name of harmonization year
Returns
-------
df : pd.DataFrame
harmonized trajectories
"""
df = df.copy()
x1, x2 = harmonize_year, final_year
y1, y2 = offset + df[x1], df[x2]
m = (y2 - y1) / (float(x2) - float(x1))
b = y1 - m * float(x1)
cols = [x for x in utils.numcols(df) if int(x) < int(final_year)]
for c in cols:
df[c] = m * float(c) + b
return df
[docs]
def reduce_offset(df, offset, final_year=2050, harmonize_year=2015):
"""
Calculate offset convergence harmonized trajectory.
Parameters
----------
df : pd.DataFrame
model data
offset : pd.DataFrame
offset data
final_year : string, optional
column name of convergence year
harmonize_year : string, optional
column name of harmonization year
Returns
-------
df : pd.DataFrame
harmonized trajectories
"""
df = df.copy()
yi, yf = int(harmonize_year), int(final_year)
numcols = utils.numcols(df)
numcols_int = [int(v) for v in numcols]
# get factors that reduce from 1 to 0; factors before base year are > 1
f = lambda year: -(year - yi) / float(yf - yi) + 1
factors = [f(year) if year <= yf else 0.0 for year in numcols_int]
# add existing values to offset time series
offsets = pd.DataFrame(
np.outer(offset, factors), columns=numcols, index=offset.index
)
df[numcols] = df[numcols] + offsets
return df
[docs]
def reduce_ratio(df, ratios, final_year=2050, harmonize_year=2015):
"""
Calculate ratio convergence harmonized trajectory.
Parameters
----------
df : pd.DataFrame
model data
ratio : pd.DataFrame
ratio data
final_year : string, optional
column name of convergence year
harmonize_year : string, optional
column name of harmonization year
Returns
-------
df : pd.DataFrame
harmonized trajectories
"""
df = df.copy()
yi, yf = int(harmonize_year), int(final_year)
numcols = utils.numcols(df)
numcols_int = [int(v) for v in numcols]
# get factors that reduce from 1 to 0, but replace with 1s in years prior
# to harmonization
f = lambda year: -(year - yi) / float(yf - yi) + 1
prefactors = [f(yi) for year in numcols_int if year < yi]
postfactors = [f(year) if year <= yf else 0.0 for year in numcols_int if year >= yi]
factors = prefactors + postfactors
# multiply existing values by ratio time series
ratios = (
pd.DataFrame(np.outer(ratios - 1, factors), columns=numcols, index=ratios.index)
+ 1
)
df[numcols] = df[numcols] * ratios
return df
[docs]
def budget(df, df_hist, harmonize_year=2015):
r"""
Calculate budget harmonized trajectory.
Parameters
----------
df : pd.DataFrame
model data
df_hist : pd.DataFrame
historic data
harmonize_year : string, optional
column name of harmonization year
Returns
-------
df_harm : pd.DataFrame
harmonized trajectories
Notes
-----
Finds an emissions trajectory consistent with a provided historical emissions
timeseries that closely matches a modeled result, while maintaining the overall
carbon budget.
An optimization problem is constructed and solved by IPOPT, which minimizes the
difference between the rate of change of the model and the harmonized model
in each year, while
1. preserving the carbon budget of the model, and
2. being consistent with the historical value.
With years :math:`y_i`, model results :math:`m_i`, harmonized results :math:`x_i`,
historical value :math:`h_0` and a remaining carbon budget :math:`B`, the
optimization problem can be formulated as
.. math::
\min_{x_i} \sum_{i \in |I - 1|}
\big( \frac{m_{i+1} - m_i}{y_{i + 1} - y_{i}} -
\frac{x_{i+1} - x_i}{y_{i + 1} - y_{i}} \big)^2
s.t.
.. math::
\sum_{i} (y_{i + 1} - y_{i}) \big( x_i + 0.5 (x_{i+1} - x_i) \big) = B
\quad \text{(carbon budget preservation)}
and
.. math::
x_0 = h_0 \quad \text{(consistency with historical values)}
"""
harmonize_year = int(harmonize_year)
# df = df.set_axis(df.columns.astype(int), axis="columns")
# df_hist = df_hist.set_axis(df_hist.columns.astype(int), axis="columns")
data_years = df.columns
hist_years = df_hist.columns
years = data_years[data_years >= harmonize_year]
if data_years[0] not in hist_years:
hist_years = hist_years.insert(bisect(hist_years, data_years[0]), data_years[0])
df_hist = df_hist.reindex(columns=hist_years).interpolate(
method="slinear", axis=1
)
def carbon_budget(years, emissions):
# trapezoid rule
dyears = np.diff(years)
demissions = np.diff(emissions)
budget = (dyears * (np.asarray(emissions)[:-1] + demissions / 2)).sum()
return budget
solver = pyo.SolverFactory("ipopt")
if solver.executable() is None:
raise RuntimeError(
"No executable for the solver 'ipopt' found "
"(necessary for the budget harmonization). "
"Install from conda-forge or add to PATH."
)
harmonized = []
for region in df.index:
model = pyo.ConcreteModel()
"""
PARAMETERS
"""
data_vals = df.loc[region, years]
hist_val = df_hist.loc[region, harmonize_year]
budget_val = carbon_budget(data_years, df.loc[region, :])
if data_years[0] < harmonize_year:
hist_in_overlap = df_hist.loc[region, data_years[0] : harmonize_year]
budget_val -= carbon_budget(hist_in_overlap.index, hist_in_overlap)
"""
VARIABLES
"""
model.x = pyo.Var(years, initialize=0, domain=pyo.Reals)
x = np.array(
[model.x[y] for y in years]
) # keeps pyomo VarData objects, ie. modelling vars not numbers
"""
OBJECTIVE FUNCTION
"""
delta_years = np.diff(years)
delta_x = np.diff(x)
delta_m = np.diff(data_vals)
def l2_norm():
return pyo.quicksum((delta_m / delta_years - delta_x / delta_years) ** 2)
model.obj = pyo.Objective(expr=l2_norm(), sense=pyo.minimize)
"""
CONSTRAINTS
"""
model.hist_val = pyo.Constraint(expr=model.x[harmonize_year] == hist_val)
model.budget = pyo.Constraint(expr=carbon_budget(years, x) == budget_val)
"""
RUN
"""
results = solver.solve(model)
assert (results.solver.status == pyo.SolverStatus.ok) and (
results.solver.termination_condition == pyo.TerminationCondition.optimal
), f"ipopt terminated budget optimization with status: {results.solver.status}, {results.solver.termination_condition}"
harmonized.append([pyo.value(model.x[y]) for y in years])
df_harm = pd.DataFrame(
harmonized,
index=df.index,
columns=years,
)
return df_harm
[docs]
def model_zero(df, offset, harmonize_year=2015):
"""
Returns result of aneris.methods.constant_offset()
"""
# current decision is to return a simple offset, this will be a straight
# line for all time periods. previous behavior was to set df[numcols] = 0,
# i.e., report 0 if model reports 0.
return constant_offset(df, offset)
[docs]
def hist_zero(df, *args, **kwargs):
"""
Returns df (no change)
"""
# TODO: should this set values to 0?
df = df.copy()
return df
[docs]
def coeff_of_var(s):
"""
Returns coefficient of variation of a Series.
.. math:: c_v = \\frac{\\sigma(s^{\\prime}(t))}{\\mu(s^{\\prime}(t))}
Parameters
----------
s : pd.Series
timeseries
Returns
-------
c_v : float
coefficient of variation
"""
x = np.diff(s.to_numpy())
with np.errstate(invalid="ignore"):
return np.abs(np.std(x) / np.mean(x))
[docs]
def default_method_choice(
row,
ratio_method="reduce_ratio_2080",
offset_method="reduce_offset_2080",
luc_method="reduce_offset_2150_cov",
luc_cov_threshold=10,
):
"""
Default decision tree as documented at.
Refer to choice flow chart at
https://drive.google.com/drive/folders/0B6_Oqvcg8eP9QXVKX2lFVUJiZHc
for arguments available in row and their definition
Neil Grant: Have made changes which would amend this flow-chart
"""
# special cases
if np.isclose(row.h, 1e-6, atol=1e-3):
return "hist_zero"
if row.zero_m:
return "model_zero"
if np.isinf(row.f) and row.neg_m and row.pos_m:
# model == 0 in base year, and model goes negative
# and positive
return "unicorn" # this shouldn't exist!
# model 0 in base year?
if np.isclose(row.m, 0):
# goes negative?
if row.neg_m:
return offset_method
else:
return "constant_offset"
else:
# variable going to zero (phase-out)? Always use ratio to avoid
# an offset driving values below zero as the variable approaches zero.
if np.isclose(row.variable_min, 0, atol=1e-6):
return ratio_method
# is this co2?
# ZN: This gas dependence isn't documented in the default
# decision tree
if hasattr(row, "gas") and row.gas == "CO2":
return ratio_method
# is cov big?
if np.isfinite(row["cov"]) and row["cov"] > luc_cov_threshold:
return luc_method
else:
# dH small?
# Defined at the relative difference is less than 50% of historical data
# or under the absolute threshold
if (abs(row.dH) > 0.5) & (row.dH_abs < row.dH_abs_thresh):
# If dH is large, but dH_abs is small, this suggests
# The data being harmonised is a small ~near zero component
# (under 10% of the parent variable)
if row.dH < 0 and row.dH_abs > row.variable_min:
# dH < 0: model > hist, so offset would be negative.
# dH_abs > variable_min: the offset magnitude exceeds the
# variable's minimum future value, so applying an offset
# would push values below zero. Use ratio based method instead.
# This also captures the "shrinking variable" case: if the
# variable is growing, variable_min will be large and the
# offset is safe, falling through to offset_method below.
return ratio_method
else:
# Either the offset is positive (model < hist), or the offset
# is negative but small enough not to cause negative values
# (variable is growing or large enough to absorb the offset).
return offset_method
# If dH_abs is large, this suggests that the component being harmonised is not ~near zero
# In this context we can just rely on the ratio method throughout
elif abs(row.dH) < 0.5:
return ratio_method
else:
# goes negative?
if row.neg_m:
return "reduce_ratio_2100"
else:
return "constant_ratio"
[docs]
def calc_dh_abs_threshold(df, hist, base_year):
"""
Calculates a value of dH_abs_threshold, which is a threshold used to help determine methods choice
Parameters
----------
df : pd.DataFrame
DataFrame produced by default_methods function
hist : pd.DataFrame
Historical data
base_year : string, int
harmonization year
Returns
-------
df : pd.DataFrame
Modified dataframe with the dH_abs_threshold added
"""
# Calculate the dH_abs_threshold (for use in methods choice)
# Find parent variable, which is defined as the next level up in the hiearchy.
# E.g. parent variable of FE|Industry|Liquids = FE|Industry
# Or parent variable of Emissions|CO2|Demand|Industry = Emissions|CO2|Demand
try:
df = assignlevel(
df,
parent_var=df.variable.map(
lambda s: "|".join(s.split("|")[:-1])
# if len(s.split('|')[:-1])>1 else s
),
)
except Exception:
_log("Dataframe has no attribute 'variable', dH abs cannot be calculated")
df.loc[:, "dH_abs_thresh"] = np.nan
return df
# Find the index with the same indices, but switch the parent_variable in for the variable
select_ix = df.index.swaplevel("variable", "parent_var").droplevel("variable")
select_ix.names = df.index.droplevel("parent_var").names
for ix, sel_ix in zip(df.index, select_ix):
# Currently, the absolute threshold is set as 10% of the parent variable in the historical data
# Under 10% is counted as "small". We don't use model data here, as the model data could
# also be in need of harmonisation.
try:
df.loc[ix, "dH_abs_thresh"] = hist.loc[sel_ix, base_year] * 0.1
except Exception:
_log(
f"No data in the model dataframe for {sel_ix}, dH_abs_thresh not calculated"
)
df.loc[ix, "dH_abs_thresh"] = np.nan
df = df.droplevel("parent_var")
return df
def default_methods(hist, model, base_year, method_choice=None, **kwargs):
"""
Determine default harmonization or downscaling methods to use.
See http://mattgidden.com/aneris/theory.html#default-decision-tree for a
graphical description of the decision tree.
Parameters
----------
hist : pd.DataFrame
historical data
model : pd.DataFrame
model data
base_year : string, int
harmonization year
method_choice : function, optional
codified decision tree, see `default_method_choice` function
**kwargs :
Additional parameters passed on to the choice functions.
Harmonization functions might depend on the following method names:
ratio_method : string
method to use for ratio harmonization, default: reduce_ratio_2080
offset_method : string
method to use for offset harmonization, default: reduce_offset_2080
luc_method : string
method to use for high coefficient of variation, reduce_offset_2150_cov
luc_cov_threshold : float
cov threshold above which to use `luc_method`
Downscaling functions require the following choices:
intensity_method : string
method to use for intensity convergence, default ipat_gdp_2100
luc_method : string
method to use for agriculture and luc emissions, default base_year_pattern
Returns
-------
methods : pd.Series
default harmonization methods
metadata : pd.DataFrame
metadata regarding why each method was chosen
See also
--------
`default_method_choice`
"""
y = str(base_year)
try:
h = hist[base_year]
m = model[base_year]
except KeyError:
h = hist[y]
m = model[y]
dH = (h - m) / h
dH_abs = (h - m).abs()
f = h / m
dM = (model.max(axis=1) - model.min(axis=1)).abs() / model.max(axis=1)
neg_m = (model < 0).any(axis=1)
pos_m = (model > 0).any(axis=1)
zero_m = (model == 0).all(axis=1)
go_neg = ((model.min(axis=1) - h) < 0).any()
cov = hist.apply(coeff_of_var, axis=1)
future_cols = [c for c in model.columns if int(c) >= int(base_year)]
variable_min = model[future_cols].min(axis=1)
df = pd.DataFrame(
{
"dH": dH,
"dH_abs": dH_abs,
"f": f,
"dM": dM,
"neg_m": neg_m,
"pos_m": pos_m,
"zero_m": zero_m,
"go_neg": go_neg,
"cov": cov,
"h": h,
"m": m,
"variable_min": variable_min,
}
).join(model.index.to_frame())
df = calc_dh_abs_threshold(df, hist, base_year)
if method_choice is None:
method_choice = default_method_choice
ret = df.apply(method_choice, axis=1, **kwargs)
ret.name = "method"
return ret, df