Source code for etrago.disaggregate.temporal

# -*- coding: utf-8 -*-
# Copyright 2016-2023  Flensburg University of Applied Sciences,
# Europa-Universität Flensburg,
# Centre for Sustainable Energy Systems,
# DLR-Institute for Networked Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# File description
"""
temporal.py defines the methods to run temporal disaggregation on networks.
"""

from datetime import timedelta
import logging
import math
import os

import pandas as pd

logger = logging.getLogger(__name__)

if "READTHEDOCS" not in os.environ:

    from etrago.execute import optimize_with_rolling_horizon
    from etrago.tools.constraints import Constraints


# Module metadata: copyright holders, license and contributing authors.
__copyright__ = (
    "Flensburg University of Applied Sciences, "
    "Europa-Universität Flensburg, "
    "Centre for Sustainable Energy Systems, "
    "DLR-Institute for Networked Energy Systems"
)
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = (
    "ulfmueller, s3pp, wolfbunke, mariusves, lukasol, KathiEsterl, "
    "ClaraBuettner, CarlosEpia, AmeliaNadal"
)


def temp_disagg_soc(n, n_tsa):
    """Interpolate state of charges for not-optimized snapshots

    The optimized values of ``n_tsa`` are copied to the matching snapshots
    of the full-resolution time index. The snapshots in between are filled
    by linear interpolation towards the next optimized snapshot. If no next
    optimized snapshot exists (end of the time span), the first optimized
    snapshot is used as target, i.e. the series is treated as cyclic.

    Parameters
    ----------
    n : pypsa.Network
        Network in full temporal resolution without optimization results
    n_tsa : pypsa.Network
        Network in reduced temporal resolution with optimization results

    Returns
    -------
    stores_e_full_ts : pd.DataFrame
        Interpolated store-e timeseries for each snapshot
    su_soc_full_ts : pd.DataFrame
        Interpolated storage units-soc timeseries for each snapshot

    Notes
    -----
    The step between optimized snapshots is read from the first entry of
    ``n_tsa.snapshot_weightings`` and is interpreted in hours, so hourly
    full-resolution snapshots are assumed.
    """
    # Create the frames with an explicit float dtype: without it pandas
    # builds object-typed columns and every later arithmetic operation
    # would run on (and return) Python objects.
    stores_e_full_ts = pd.DataFrame(
        index=n.snapshots, columns=n.stores.index, dtype=float
    )
    stores_e_full_ts.loc[n_tsa.snapshots] = n_tsa.stores_t.e

    su_soc_full_ts = pd.DataFrame(
        index=n.snapshots, columns=n.storage_units.index, dtype=float
    )
    su_soc_full_ts.loc[n_tsa.snapshots] = n_tsa.storage_units_t.state_of_charge

    n_skip = int(n_tsa.snapshot_weightings.iloc[0, 0])

    # Loop-invariant time offsets
    one_hour = timedelta(hours=1)
    step = timedelta(hours=n_skip)

    for calc_sns in n_tsa.snapshots:
        next_calc = calc_sns + step
        if next_calc not in n_tsa.snapshots:
            # No later optimized snapshot: wrap around to the first one
            next_calc = n_tsa.snapshots[0]

        # Constant hourly increment between two optimized snapshots
        diff_e = (
            stores_e_full_ts.loc[next_calc] - stores_e_full_ts.loc[calc_sns]
        ) / n_skip
        diff_soc = (
            su_soc_full_ts.loc[next_calc] - su_soc_full_ts.loc[calc_sns]
        ) / n_skip

        for offset in range(1, n_skip):
            hour = calc_sns + offset * one_hour
            # Hours beyond the end of the full time index are skipped
            if hour in stores_e_full_ts.index:
                previous_hour = hour - one_hour
                stores_e_full_ts.loc[hour] = (
                    stores_e_full_ts.loc[previous_hour] + diff_e
                )
                su_soc_full_ts.loc[hour] = (
                    su_soc_full_ts.loc[previous_hour] + diff_soc
                )

    return stores_e_full_ts, su_soc_full_ts
def dispatch_disaggregation(self):
    """
    Run the temporal disaggregation, i.e. the dispatch optimization of the
    temporally fully resolved network.

    The problem is reduced to smaller subproblems by slicing the whole
    considered time span, while information on the state of charge of
    storage units and stores is kept to ensure compatibility and to
    reproduce seasonality.

    Nothing happens unless
    ``self.args["temporal_disaggregation"]["active"]`` is truthy. Otherwise
    ``self.network`` is replaced by the optimized full-resolution network
    and the former ``self.network`` is kept as ``self.network_tsa``.

    Returns
    -------
    None.

    """
    settings = self.args["temporal_disaggregation"]
    if not settings["active"]:
        return

    # Number of snapshots per slice of the rolling horizon
    horizon = math.ceil(len(self.network_tsa.snapshots) / settings["no_slices"])

    # At this point network_tsa holds the network that is optimized in
    # full resolution, network holds the one with the existing results.
    full_net = self.network_tsa.copy()
    reduced_net = self.network.copy()

    if self.args["skip_snapshots"]:
        # Fill the snapshots that were skipped in the reduced optimization
        stores_e, units_soc = temp_disagg_soc(full_net, reduced_net)
        reduced_net.stores_t.e = stores_e
        reduced_net.storage_units_t.state_of_charge = units_soc

    # Avoid infeasibilities in e-Mobility stores: cap the initial energy
    # at the maximum energy allowed in the first snapshot
    is_e_mob = full_net.stores.carrier == "battery_storage"
    e_mob_stores = full_net.stores[is_e_mob]
    first_hour_max_pu = full_net.stores_t.e_max_pu.iloc[0]
    max_e_first_hour = first_hour_max_pu.loc[e_mob_stores.index].mul(
        e_mob_stores.e_nom
    )
    exceeds_limit = e_mob_stores.e_initial > max_e_first_hour
    index = e_mob_stores[exceeds_limit].index
    full_net.stores.loc[index, "e_initial"] = max_e_first_hour.loc[index]

    # Copy extension results and fix the capacities
    full_net.lines["s_nom"] = reduced_net.lines["s_nom_opt"]
    full_net.lines["s_nom_extendable"] = False

    full_net.links["p_nom"] = reduced_net.links["p_nom_opt"]
    full_net.links["p_nom_extendable"] = False

    full_net.transformers["s_nom"] = reduced_net.transformers["s_nom_opt"]
    full_net.transformers.s_nom_extendable = False

    full_net.storage_units["p_nom"] = reduced_net.storage_units["p_nom_opt"]
    full_net.storage_units["p_nom_extendable"] = False

    full_net.stores["e_nom"] = reduced_net.stores["e_nom_opt"]
    full_net.stores["e_nom_extendable"] = False

    # Cyclic constraints are switched off for the sliced optimization
    full_net.storage_units.cyclic_state_of_charge = False
    full_net.stores.e_cyclic = False

    solver = self.args["solver"]
    constraints = Constraints(self.args, False, apply_on="market_model")
    full_net = optimize_with_rolling_horizon(
        full_net,
        reduced_net,
        snapshots=None,
        horizon=horizon,
        overlap=2,
        solver_name=solver,
        extra_functionality=constraints.functionality,
        args=self.args,
        temporal_disaggregation=True,
    )

    # Switch to the temporally fully resolved network as standard network;
    # the temporally reduced network is stored in network_tsa
    self.network_tsa = self.network.copy()
    self.network = full_net

    # Keep original settings
    original = self.network_tsa
    self.network.lines["s_nom_extendable"] = original.lines["s_nom_extendable"]
    self.network.links["p_nom_extendable"] = original.links["p_nom_extendable"]
    self.network.transformers.s_nom_extendable = (
        original.transformers.s_nom_extendable
    )
    self.network.storage_units["p_nom_extendable"] = original.storage_units[
        "p_nom_extendable"
    ]
    self.network.stores["e_nom_extendable"] = original.stores[
        "e_nom_extendable"
    ]
    self.network.storage_units.cyclic_state_of_charge = (
        original.storage_units.cyclic_state_of_charge
    )
    self.network.stores.e_cyclic = original.stores.e_cyclic

    export_path = self.args["csv_export"]
    if export_path:
        # Exported twice: to the configured path and to a subfolder of it
        self.export_to_csv(export_path)
        self.export_to_csv(export_path + "/temporal_disaggregaton")