Source code for aneris.methods

"""
This module defines all possible functional forms of harmonization methods and
the default decision tree for choosing which method to use.
"""

from bisect import bisect

import numpy as np
import pandas as pd
import pyomo.environ as pyo

from aneris import utils


[docs] def harmonize_factors(df, hist, harmonize_year=2015): """ Calculate offset and ratio values between data and history. Parameters ---------- df : pd.DataFrame model data hist : pd.DataFrame historical data harmonize_year : string, optional column name of harmonization year Returns ------- offset : pd.Series offset (history - model) ratio : pd.Series ratio (history / model) """ c, m = hist[harmonize_year], df[harmonize_year] offset = (c - m).fillna(0) offset.name = "offset" ratios = (c / m).replace(np.inf, np.nan).fillna(0) ratios.name = "ratio" return offset, ratios
[docs] def constant_offset(df, offset, harmonize_year=2015): """ Calculate constant offset harmonized trajectory. Parameters ---------- df : pd.DataFrame model data offset : pd.DataFrame offset data harmonize_year : string, optional column name of harmonization year, ignored Returns ------- df : pd.DataFrame harmonized trajectories """ df = df.copy() numcols = utils.numcols(df) # just add offset to all values df[numcols] = df[numcols].add(offset, axis=0) return df
[docs] def constant_ratio(df, ratios, harmonize_year=2015): """ Calculate constant ratio harmonized trajectory. Parameters ---------- df : pd.DataFrame model data ratio : pd.DataFrame ratio data harmonize_year : string, optional column name of harmonization year, ignored Returns ------- df : pd.DataFrame harmonized trajectories """ df = df.copy() numcols = utils.numcols(df) # just add offset to all values df[numcols] = df[numcols].multiply(ratios, axis=0) return df
[docs] def linear_interpolate(df, offset, final_year=2050, harmonize_year=2015): """ Calculate linearly interpolated convergence harmonized trajectory. Parameters ---------- df : pd.DataFrame model data offset : pd.DataFrame offset data final_year : string, optional column name of convergence year harmonize_year : string, optional column name of harmonization year Returns ------- df : pd.DataFrame harmonized trajectories """ df = df.copy() x1, x2 = harmonize_year, final_year y1, y2 = offset + df[x1], df[x2] m = (y2 - y1) / (float(x2) - float(x1)) b = y1 - m * float(x1) cols = [x for x in utils.numcols(df) if int(x) < int(final_year)] for c in cols: df[c] = m * float(c) + b return df
[docs] def reduce_offset(df, offset, final_year=2050, harmonize_year=2015): """ Calculate offset convergence harmonized trajectory. Parameters ---------- df : pd.DataFrame model data offset : pd.DataFrame offset data final_year : string, optional column name of convergence year harmonize_year : string, optional column name of harmonization year Returns ------- df : pd.DataFrame harmonized trajectories """ df = df.copy() yi, yf = int(harmonize_year), int(final_year) numcols = utils.numcols(df) numcols_int = [int(v) for v in numcols] # get factors that reduce from 1 to 0; factors before base year are > 1 f = lambda year: -(year - yi) / float(yf - yi) + 1 factors = [f(year) if year <= yf else 0.0 for year in numcols_int] # add existing values to offset time series offsets = pd.DataFrame( np.outer(offset, factors), columns=numcols, index=offset.index ) df[numcols] = df[numcols] + offsets return df
[docs] def reduce_ratio(df, ratios, final_year=2050, harmonize_year=2015): """ Calculate ratio convergence harmonized trajectory. Parameters ---------- df : pd.DataFrame model data ratio : pd.DataFrame ratio data final_year : string, optional column name of convergence year harmonize_year : string, optional column name of harmonization year Returns ------- df : pd.DataFrame harmonized trajectories """ df = df.copy() yi, yf = int(harmonize_year), int(final_year) numcols = utils.numcols(df) numcols_int = [int(v) for v in numcols] # get factors that reduce from 1 to 0, but replace with 1s in years prior # to harmonization f = lambda year: -(year - yi) / float(yf - yi) + 1 prefactors = [f(yi) for year in numcols_int if year < yi] postfactors = [f(year) if year <= yf else 0.0 for year in numcols_int if year >= yi] factors = prefactors + postfactors # multiply existing values by ratio time series ratios = ( pd.DataFrame(np.outer(ratios - 1, factors), columns=numcols, index=ratios.index) + 1 ) df[numcols] = df[numcols] * ratios return df
[docs] def budget(df, df_hist, harmonize_year=2015): r""" Calculate budget harmonized trajectory. Parameters ---------- df : pd.DataFrame model data df_hist : pd.DataFrame historic data harmonize_year : string, optional column name of harmonization year Returns ------- df_harm : pd.DataFrame harmonized trajectories Notes ----- Finds an emissions trajectory consistent with a provided historical emissions timeseries that closely matches a modeled result, while maintaining the overall carbon budget. An optimization problem is constructed and solved by IPOPT, which minimizes the difference between the rate of change of the model and the harmonized model in each year, while 1. preserving the carbon budget of the model, and 2. being consistent with the historical value. With years :math:`y_i`, model results :math:`m_i`, harmonized results :math:`x_i`, historical value :math:`h_0` and a remaining carbon budget :math:`B`, the optimization problem can be formulated as .. math:: \min_{x_i} \sum_{i \in |I - 1|} \big( \frac{m_{i+1} - m_i}{y_{i + 1} - y_{i}} - \frac{x_{i+1} - x_i}{y_{i + 1} - y_{i}} \big)^2 s.t. .. math:: \sum_{i} (y_{i + 1} - y_{i}) \big( x_i + 0.5 (x_{i+1} - x_i) \big) = B \quad \text{(carbon budget preservation)} and .. math:: x_0 = h_0 \quad \text{(consistency with historical values)} """ harmonize_year = int(harmonize_year) # df = df.set_axis(df.columns.astype(int), axis="columns") # df_hist = df_hist.set_axis(df_hist.columns.astype(int), axis="columns") data_years = df.columns hist_years = df_hist.columns years = data_years[data_years >= harmonize_year] if data_years[0] not in hist_years: hist_years = hist_years.insert(bisect(hist_years, data_years[0]), data_years[0]) df_hist = df_hist.reindex(columns=hist_years).interpolate( method="slinear", axis=1 ) def carbon_budget(years, emissions): # trapezoid rule dyears = np.diff(years) demissions = np.diff(emissions) budget = (dyears * (np.asarray(emissions)[:-1] + demissions / 2)).sum() return budget solver = pyo.SolverFactory("ipopt") if solver.executable() is None: raise RuntimeError( "No executable for the solver 'ipopt' found " "(necessary for the budget harmonization). " "Install from conda-forge or add to PATH." ) harmonized = [] for region in df.index: model = pyo.ConcreteModel() """ PARAMETERS """ data_vals = df.loc[region, years] hist_val = df_hist.loc[region, harmonize_year] budget_val = carbon_budget(data_years, df.loc[region, :]) if data_years[0] < harmonize_year: hist_in_overlap = df_hist.loc[region, data_years[0] : harmonize_year] budget_val -= carbon_budget(hist_in_overlap.index, hist_in_overlap) """ VARIABLES """ model.x = pyo.Var(years, initialize=0, domain=pyo.Reals) x = np.array( [model.x[y] for y in years] ) # keeps pyomo VarData objects, ie. modelling vars not numbers """ OBJECTIVE FUNCTION """ delta_years = np.diff(years) delta_x = np.diff(x) delta_m = np.diff(data_vals) def l2_norm(): return pyo.quicksum((delta_m / delta_years - delta_x / delta_years) ** 2) model.obj = pyo.Objective(expr=l2_norm(), sense=pyo.minimize) """ CONSTRAINTS """ model.hist_val = pyo.Constraint(expr=model.x[harmonize_year] == hist_val) model.budget = pyo.Constraint(expr=carbon_budget(years, x) == budget_val) """ RUN """ results = solver.solve(model) assert (results.solver.status == pyo.SolverStatus.ok) and ( results.solver.termination_condition == pyo.TerminationCondition.optimal ), ( f"ipopt terminated budget optimization with status: " f"{results.solver.status}, {results.solver.termination_condition}" ) harmonized.append([pyo.value(model.x[y]) for y in years]) df_harm = pd.DataFrame( harmonized, index=df.index, columns=years, ) return df_harm
[docs] def model_zero(df, offset, harmonize_year=2015): """ Returns result of aneris.methods.constant_offset() """ # current decision is to return a simple offset, this will be a straight # line for all time periods. previous behavior was to set df[numcols] = 0, # i.e., report 0 if model reports 0. return constant_offset(df, offset)
[docs] def hist_zero(df, *args, **kwargs): """ Returns df (no change) """ # TODO: should this set values to 0? df = df.copy() return df
[docs] def coeff_of_var(s): """ Returns coefficient of variation of a Series. .. math:: c_v = \\frac{\\sigma(s^{\\prime}(t))}{\\mu(s^{\\prime}(t))} Parameters ---------- s : pd.Series timeseries Returns ------- c_v : float coefficient of variation """ x = np.diff(s.to_numpy()) with np.errstate(invalid="ignore"): return np.abs(np.std(x) / np.mean(x))
[docs] def default_method_choice( row, ratio_method="reduce_ratio_2080", offset_method="reduce_offset_2080", luc_method="reduce_offset_2150_cov", luc_cov_threshold=10, ): """ Default decision tree as documented at. Refer to choice flow chart at https://drive.google.com/drive/folders/0B6_Oqvcg8eP9QXVKX2lFVUJiZHc for arguments available in row and their definition """ # special cases if row.h == 0: return "hist_zero" if row.zero_m: return "model_zero" if np.isinf(row.f) and row.neg_m and row.pos_m: # model == 0 in base year, and model goes negative # and positive return "unicorn" # this shouldn't exist! # model 0 in base year? if np.isclose(row.m, 0): # goes negative? if row.neg_m: return offset_method else: return "constant_offset" else: # is this co2? # ZN: This gas dependence isn't documented in the default # decision tree if hasattr(row, "gas") and row.gas == "CO2": return ratio_method # is cov big? if np.isfinite(row["cov"]) and row["cov"] > luc_cov_threshold: return luc_method else: # dH small? if row.dH < 0.5: return ratio_method else: # goes negative? if row.neg_m: return "reduce_ratio_2100" else: return "constant_ratio"
def default_methods(hist, model, base_year, method_choice=None, **kwargs): """ Determine default harmonization or downscaling methods to use. See http://mattgidden.com/aneris/theory.html#default-decision-tree for a graphical description of the decision tree. Parameters ---------- hist : pd.DataFrame historical data model : pd.DataFrame model data base_year : string, int harmonization year method_choice : function, optional codified decision tree, see `default_method_choice` function **kwargs : Additional parameters passed on to the choice functions. Harmonization functions might depend on the following method names: ratio_method : string method to use for ratio harmonization, default: reduce_ratio_2080 offset_method : string method to use for offset harmonization, default: reduce_offset_2080 luc_method : string method to use for high coefficient of variation, reduce_offset_2150_cov luc_cov_threshold : float cov threshold above which to use `luc_method` Downscaling functions require the following choices: intensity_method : string method to use for intensity convergence, default ipat_gdp_2100 luc_method : string method to use for agriculture and luc emissions, default base_year_pattern Returns ------- methods : pd.Series default harmonization methods metadata : pd.DataFrame metadata regarding why each method was chosen See also -------- `default_method_choice` """ y = str(base_year) try: h = hist[base_year] m = model[base_year] except KeyError: h = hist[y] m = model[y] dH = (h - m).abs() / h f = h / m dM = (model.max(axis=1) - model.min(axis=1)).abs() / model.max(axis=1) neg_m = (model < 0).any(axis=1) pos_m = (model > 0).any(axis=1) zero_m = (model == 0).all(axis=1) go_neg = ((model.min(axis=1) - h) < 0).any() cov = hist.apply(coeff_of_var, axis=1) df = pd.DataFrame( { "dH": dH, "f": f, "dM": dM, "neg_m": neg_m, "pos_m": pos_m, "zero_m": zero_m, "go_neg": go_neg, "cov": cov, "h": h, "m": m, } ).join(model.index.to_frame()) if method_choice is None: method_choice = default_method_choice ret = df.apply(method_choice, axis=1, **kwargs) ret.name = "method" return ret, df