# -*- coding: utf-8 -*-
# Copyright 2016-2023 Flensburg University of Applied Sciences,
# Europa-Universität Flensburg,
# Centre for Sustainable Energy Systems,
# DLR-Institute for Networked Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# File description
"""
Constraints.py includes additional constraints for eTraGo-optimizations
"""
import logging
import os
from pyomo.environ import Constraint
from pypsa.descriptors import (
expand_series,
get_switchable_as_dense as get_as_dense,
)
from pypsa.linopt import define_constraints, define_variables, get_var, linexpr
import numpy as np
import pandas as pd
import pyomo.environ as po
if "READTHEDOCS" not in os.environ:
from etrago.tools import db
logger = logging.getLogger(__name__)
__copyright__ = (
"Flensburg University of Applied Sciences, "
"Europa-Universität Flensburg, "
"Centre for Sustainable Energy Systems, "
"DLR-Institute for Networked Energy Systems"
)
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = """ulfmueller, s3pp, wolfbunke, mariusves, lukasol, AmeliaNadal,
CarlosEpia, ClaraBuettner, KathiEsterl"""
def _get_crossborder_components(network, cntr="all"):
"""
Identifies foreign buses and crossborder ac- and dc-lines for all foreign
countries or only one specific.
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
cntr : str, optional
Country code of the returned buses and lines. The default is 'all'.
Returns
-------
buses_de : pandas.base.Index
Index of buses located in Germany
buses_for : pandas.base.Index
Index of buses located in selected country
cb0 : pandas.base.Index
Index of ac-lines from foreign country to Germany
cb1 : pandas.base.Index
Index of ac-lines from Germany to foreign country
cb0_link : pandas.base.Index
Index of dc-lines from foreign country to Germany
cb1_link : pandas.base.Index
Index of dc-lines from Germany to foreign country
"""
buses_de = network.buses.index[network.buses.country == "DE"]
if cntr == "all":
buses_for = network.buses.index[network.buses.country != "DE"]
else:
buses_for = network.buses.index[network.buses.country == cntr]
cb0 = network.lines.index[
(network.lines.bus0.isin(buses_for))
& (network.lines.bus1.isin(buses_de))
]
cb1 = network.lines.index[
(network.lines.bus1.isin(buses_for))
& (network.lines.bus0.isin(buses_de))
]
cb0_link = network.links.index[
(network.links.bus0.isin(buses_for))
& (network.links.bus1.isin(buses_de))
& (network.links.carrier == "DC")
]
cb1_link = network.links.index[
(network.links.bus0.isin(buses_de))
& (network.links.bus1.isin(buses_for))
& (network.links.carrier == "DC")
]
return buses_de, buses_for, cb0, cb1, cb0_link, cb1_link
def _max_line_ext(self, network, snapshots):
"""
Extra-functionality that limits overall line expansion to a multiple
of existing capacity.
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None
"""
lines_snom = network.lines.s_nom_min.sum()
links_elec = network.links[network.links.carrier == "DC"]
links_index = links_elec.index
links_pnom = links_elec.p_nom_min.sum()
def _rule(m):
lines_opt = sum(
m.passive_branch_s_nom[index]
for index in m.passive_branch_s_nom_index
)
links_opt = sum(m.link_p_nom[index] for index in links_index)
return (lines_opt + links_opt) <= (
lines_snom + links_pnom
) * self.args["extra_functionality"]["max_line_ext"]
network.model.max_line_ext = Constraint(rule=_rule)
def _max_line_ext_nmp(self, network, snapshots):
"""
Extra-functionality that limits overall line expansion to a multiple
of existing capacity without pyomo.
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
lines_snom = network.lines.s_nom.sum()
links_elec = network.links[network.links.carrier == "DC"]
links_index = links_elec.index
links_pnom = links_elec.p_nom_min.sum()
get_var(network, "Line", "s_nom")
def _rule(m):
lines_opt = sum(
m.passive_branch_s_nom[index]
for index in m.passive_branch_s_nom_index
)
links_opt = sum(m.link_p_nom[index] for index in links_index)
return (lines_opt + links_opt) <= (
lines_snom + links_pnom
) * self.args["extra_functionality"]["max_line_ext"]
network.model.max_line_ext = Constraint(rule=_rule)
def _min_renewable_share_nmp(self, network, snapshots):
"""
Extra-functionality that limits the minimum share of renewable generation.
Add key 'min_renewable_share' and minimal share in p.u. as float
to args.extra_functionality.
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
renewables = [
"biomass",
"central_biomass_CHP",
"industrial_biomass_CHP",
"solar",
"solar_rooftop",
"wind_offshore",
"wind_onshore",
"run_of_river",
"other_renewable",
"central_biomass_CHP_heat",
"solar_thermal_collector",
"geo_thermal",
]
res = network.generators.index[network.generators.carrier.isin(renewables)]
renew = (
get_var(network, "Generator", "p")
.loc[network.snapshots, res]
.mul(network.snapshot_weightings.generators, axis=0)
)
total = get_var(network, "Generator", "p").mul(
network.snapshot_weightings.generators, axis=0
)
renew_production = linexpr((1, renew)).sum().sum()
total_production = (
linexpr(
(-self.args["extra_functionality"]["min_renewable_share"], total)
)
.sum()
.sum()
)
expr = renew_production + total_production
define_constraints(network, expr, ">=", 0, "Generator", "min_renew_share")
def _min_renewable_share(self, network, snapshots):
"""
Extra-functionality that limits the minimum share of renewable generation.
Add key 'min_renewable_share' and minimal share in p.u. as float
to args.extra_functionality.
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
renewables = [
"biomass",
"central_biomass_CHP",
"industrial_biomass_CHP",
"solar",
"solar_rooftop",
"wind_offshore",
"wind_onshore",
"run_of_river",
"other_renewable",
"CH4_biogas",
"central_biomass_CHP_heat",
"solar_thermal_collector",
"geo_thermal",
]
res = list(
network.generators.index[network.generators.carrier.isin(renewables)]
)
total = list(network.generators.index)
def _rule(m):
renewable_production = sum(
m.generator_p[gen, sn] * network.snapshot_weightings.generators[sn]
for gen in res
for sn in snapshots
)
total_production = sum(
m.generator_p[gen, sn] * network.snapshot_weightings.generators[sn]
for gen in total
for sn in snapshots
)
return (
renewable_production
>= total_production
* self.args["extra_functionality"]["min_renewable_share"]
)
network.model.min_renewable_share = Constraint(rule=_rule)
def _cross_border_flow(self, network, snapshots):
"""
Extra_functionality that limits overall AC crossborder flows from/to
Germany. Add key 'cross_border_flow' and array with minimal and maximal
import/export
Example: {'cross_border_flow': [-x, y]} (with x Import, y Export)
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
(
buses_de,
buses_for,
cb0,
cb1,
cb0_link,
cb1_link,
) = _get_crossborder_components(network)
export = pd.Series(
data=self.args["extra_functionality"]["cross_border_flow"]
)
def _rule_min(m):
cb_flow = (
-sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb0
for sn in snapshots
)
+ sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb1
for sn in snapshots
)
- sum(
m.link_p[link, sn] * network.snapshot_weightings.objective[sn]
for link in cb0_link
for sn in snapshots
)
+ sum(
m.link_p[link, sn] * network.snapshot_weightings.objective[sn]
for link in cb1_link
for sn in snapshots
)
)
return cb_flow >= export[0]
def _rule_max(m):
cb_flow = (
-sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb0
for sn in snapshots
)
+ sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb1
for sn in snapshots
)
- sum(
m.link_p[link, sn] * network.snapshot_weightings.objective[sn]
for link in cb0_link
for sn in snapshots
)
+ sum(
m.link_p[link, sn] * network.snapshot_weightings.objective[sn]
for link in cb1_link
for sn in snapshots
)
)
return cb_flow <= export[1]
network.model.cross_border_flows_min = Constraint(rule=_rule_min)
network.model.cross_border_flows_max = Constraint(rule=_rule_max)
def _cross_border_flow_nmp(self, network, snapshots):
"""
Extra_functionality that limits overall crossborder flows from/to Germany.
Add key 'cross_border_flow' and array with minimal and maximal
import/export
Example: {'cross_border_flow': [-x, y]} (with x Import, y Export)
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
(
buses_de,
buses_for,
cb0,
cb1,
cb0_link,
cb1_link,
) = _get_crossborder_components(network)
export = pd.Series(
data=self.args["extra_functionality"]["cross_border_flow"]
)
cb0_flow = (
get_var(network, "Line", "s")
.loc[snapshots, cb0]
.mul(network.snapshot_weightings.objective, axis=0)
)
cb1_flow = (
get_var(network, "Line", "s")
.loc[snapshots, cb1]
.mul(network.snapshot_weightings.objective, axis=0)
)
cb0_link_flow = (
get_var(network, "Link", "p")
.loc[snapshots, cb0_link]
.mul(network.snapshot_weightings.objective, axis=0)
)
cb1_link_flow = (
get_var(network, "Link", "p")
.loc[snapshots, cb1_link]
.mul(network.snapshot_weightings.objective, axis=0)
)
expr = (
linexpr((-1, cb0_flow)).sum().sum()
+ linexpr((1, cb1_flow)).sum().sum()
+ linexpr((-1, cb0_link_flow)).sum().sum()
+ linexpr((1, cb1_link_flow)).sum().sum()
)
define_constraints(network, expr, ">=", export[0], "Line", "min_cb_flow")
define_constraints(network, expr, "<=", export[1], "Line", "max_cb_flow")
def _cross_border_flow_per_country_nmp(self, network, snapshots):
"""
Extra_functionality that limits AC crossborder flows for each given
foreign country from/to Germany.
Add key 'cross_border_flow_per_country' to args.extra_functionality and
define dictionary of country keys and desired limitations of im/exports
in MWh
Example: {'cross_border_flow_per_country': {'DK':[-X, Y], 'FR':[0,0]}}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
buses_de = network.buses.index[network.buses.country == "DE"]
countries = network.buses.country.unique()
export_per_country = pd.DataFrame(
data=self.args["extra_functionality"]["cross_border_flow_per_country"]
).transpose()
for cntr in export_per_country.index:
if cntr in countries:
(
buses_de,
buses_for,
cb0,
cb1,
cb0_link,
cb1_link,
) = _get_crossborder_components(network, cntr)
cb0_flow = (
get_var(network, "Line", "s")
.loc[snapshots, cb0]
.mul(network.snapshot_weightings.objective, axis=0)
)
cb1_flow = (
get_var(network, "Line", "s")
.loc[snapshots, cb1]
.mul(network.snapshot_weightings.objective, axis=0)
)
cb0_link_flow = (
get_var(network, "Link", "p")
.loc[snapshots, cb0_link]
.mul(network.snapshot_weightings.objective, axis=0)
)
cb1_link_flow = (
get_var(network, "Link", "p")
.loc[snapshots, cb1_link]
.mul(network.snapshot_weightings.objective, axis=0)
)
expr = (
linexpr((-1, cb0_flow)).sum().sum()
+ linexpr((1, cb1_flow)).sum().sum()
+ linexpr((-1, cb0_link_flow)).sum().sum()
+ linexpr((1, cb1_link_flow)).sum().sum()
)
define_constraints(
network,
expr,
">=",
export_per_country[cntr][0],
"Line",
"min_cb_flow_" + cntr,
)
define_constraints(
network,
expr,
"<=",
export_per_country[cntr][1],
"Line",
"max_cb_flow_" + cntr,
)
def _cross_border_flow_per_country(self, network, snapshots):
"""
Extra_functionality that limits AC crossborder flows for each given
foreign country from/to Germany.
Add key 'cross_border_flow_per_country' to args.extra_functionality and
define dictionary of country keys and desired limitations of im/exports
in MWh
Example: {'cross_border_flow_per_country': {'DK':[-X, Y], 'FR':[0,0]}}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
buses_de = network.buses.index[network.buses.country == "DE"]
countries = network.buses.country.unique()
export_per_country = pd.DataFrame(
data=self.args["extra_functionality"]["cross_border_flow_per_country"]
).transpose()
for cntr in export_per_country.index:
if cntr in countries:
(
buses_de,
buses_for,
cb0,
cb1,
cb0_link,
cb1_link,
) = _get_crossborder_components(network, cntr)
def _rule_min(m):
cb_flow = (
-sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb0
for sn in snapshots
)
+ sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb1
for sn in snapshots
)
- sum(
m.link_p[link, sn]
* network.snapshot_weightings.objective[sn]
for link in cb0_link
for sn in snapshots
)
+ sum(
m.link_p[link, sn]
* network.snapshot_weightings.objective[sn]
for link in cb1_link
for sn in snapshots
)
)
return cb_flow >= export_per_country[0][cntr]
setattr(
network.model,
"min_cross_border-" + cntr,
Constraint(rule=_rule_min),
)
def _rule_max(m):
cb_flow = (
-sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb0
for sn in snapshots
)
+ sum(
m.passive_branch_p["Line", line, sn]
* network.snapshot_weightings.objective[sn]
for line in cb1
for sn in snapshots
)
- sum(
m.link_p[link, sn]
* network.snapshot_weightings.objective[sn]
for link in cb0_link
for sn in snapshots
)
+ sum(
m.link_p[link, sn]
* network.snapshot_weightings.objective[sn]
for link in cb1_link
for sn in snapshots
)
)
return cb_flow <= export_per_country[1][cntr]
setattr(
network.model,
"max_cross_border-" + cntr,
Constraint(rule=_rule_max),
)
def _generation_potential(network, carrier, cntr="all"):
"""
Function that calculates the generation potential for chosen carriers and
countries.
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
carrier : str
Energy carrier of generator
cntr : str, optional
Country code or 'all'. The default is 'all'.
Returns
-------
gens : pandas.base.Index
Index of generators with given carrier in chosen country
potential : float
Gerneration potential in MW
"""
if cntr == "all":
gens = network.generators.index[network.generators.carrier == carrier]
else:
gens = network.generators.index[
(network.generators.carrier == carrier)
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country == cntr]
)
)
]
if carrier in ["wind_onshore", "wind_offshore", "solar"]:
potential = (
(
network.generators.p_nom[gens]
* network.generators_t.p_max_pu[gens].mul(
network.snapshot_weightings.generators, axis=0
)
)
.sum()
.sum()
)
else:
potential = (
network.snapshot_weightings.generators.sum()
* network.generators.p_nom[gens].sum()
)
return gens, potential
def _capacity_factor(self, network, snapshots):
"""
Extra-functionality that limits overall dispatch of generators with chosen
energy carrier.
Add key 'capacity_factor' to args.extra_functionality and set limits in
a dictonary as a fraction of generation potential.
Example: 'capacity_factor': {'run_of_river': [0, 0.5], 'solar': [0.1, 1]}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor"]
carrier = arg.keys()
for c in carrier:
factor = arg[c]
gens, potential = _generation_potential(network, c, cntr="all")
def _rule_max(m):
dispatch = sum(
m.generator_p[gen, sn]
* network.snapshot_weightings.generators[sn]
for gen in gens
for sn in snapshots
)
return dispatch <= factor[1] * potential
setattr(network.model, "max_flh_" + c, Constraint(rule=_rule_max))
def _rule_min(m):
dispatch = sum(
m.generator_p[gen, sn]
* network.snapshot_weightings.generators[sn]
for gen in gens
for sn in snapshots
)
return dispatch >= factor[0] * potential
setattr(network.model, "min_flh_" + c, Constraint(rule=_rule_min))
def _capacity_factor_nmp(self, network, snapshots):
"""
Extra-functionality that limits overall dispatch of generators with chosen
energy carrier.
Add key 'capacity_factor' to args.extra_functionality and set limits in
a dictonary as a fraction of generation potential.
Example: 'capacity_factor': {'run_of_river': [0, 0.5], 'solar': [0.1, 1]}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor"]
carrier = arg.keys()
for c in carrier:
gens, potential = _generation_potential(network, c, cntr="all")
generation = (
get_var(network, "Generator", "p")
.loc[snapshots, gens]
.mul(network.snapshot_weightings.generators, axis=0)
)
define_constraints(
network,
linexpr((1, generation)).sum().sum(),
">=",
arg[c][0] * potential,
"Generator",
"min_flh_" + c,
)
define_constraints(
network,
linexpr((1, generation)).sum().sum(),
"<=",
arg[c][1] * potential,
"Generator",
"max_flh_" + c,
)
def _capacity_factor_per_cntr(self, network, snapshots):
"""
Extra-functionality that limits dispatch of generators with chosen
energy carrier located in the chosen country.
Add key 'capacity_factor_per_cntr' to args.extra_functionality and set
limits per carrier in a dictonary with country codes as keys.
Example:
'capacity_factor_per_cntr': {'DE':{'run_of_river': [0, 0.5],
'wind_onshore': [0.1, 1]},
'DK':{'wind_onshore':[0, 0.7]}}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor_per_cntr"]
for cntr in arg.keys():
carrier = arg[cntr].keys()
for c in carrier:
factor = arg[cntr][c]
gens, potential = _generation_potential(network, c, cntr)
if len(gens) > 0:
def _rule_max(m):
dispatch = sum(
m.generator_p[gen, sn]
* network.snapshot_weightings.generators[sn]
for gen in gens
for sn in snapshots
)
return dispatch <= factor[1] * potential
setattr(
network.model,
"max_flh_" + cntr + "_" + c,
Constraint(rule=_rule_max),
)
def _rule_min(m):
dispatch = sum(
m.generator_p[gen, sn]
* network.snapshot_weightings.generators[sn]
for gen in gens
for sn in snapshots
)
return dispatch >= factor[0] * potential
setattr(
network.model,
"min_flh_" + cntr + "_" + c,
Constraint(rule=_rule_min),
)
else:
print(
"Carrier "
+ c
+ " is not available in "
+ cntr
+ ". Skipping this constraint."
)
def _capacity_factor_per_cntr_nmp(self, network, snapshots):
"""
Extra-functionality that limits dispatch of generators with chosen
energy carrier located in the chosen country.
Add key 'capacity_factor_per_cntr' to args.extra_functionality and set
limits per carrier in a dictonary with country codes as keys.
Example:
'capacity_factor_per_cntr': {'DE':{'run_of_river': [0, 0.5],
'wind_onshore': [0.1, 1]},
'DK':{'wind_onshore':[0, 0.7]}}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor_per_cntr"]
for cntr in arg.keys():
carrier = arg[cntr].keys()
for c in carrier:
gens, potential = _generation_potential(network, c, cntr)
if len(gens) > 0:
generation = (
get_var(network, "Generator", "p")
.loc[snapshots, gens]
.mul(network.snapshot_weightings.generators, axis=0)
)
define_constraints(
network,
linexpr((1, generation)).sum().sum(),
">=",
arg[cntr][c][0] * potential,
"Generator",
"min_flh_" + c + "_" + cntr,
)
define_constraints(
network,
linexpr((1, generation)).sum().sum(),
"<=",
arg[cntr][c][1] * potential,
"Generator",
"max_flh_" + c + "_" + cntr,
)
else:
print(
"Carrier "
+ c
+ " is not available in "
+ cntr
+ ". Skipping this constraint."
)
def _capacity_factor_per_gen(self, network, snapshots):
"""
Extra-functionality that limits dispatch for each generator with chosen
energy carrier.
Add key 'capacity_factor_per_gen' to args.extra_functionality and set
limits in a dictonary as a fraction of generation potential.
Example:
'capacity_factor_per_gen': {'run_of_river': [0, 0.5], 'solar': [0.1, 1]}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor_per_gen"]
carrier = arg.keys()
snapshots = network.snapshots
for c in carrier:
factor = arg[c]
gens = network.generators.index[network.generators.carrier == c]
for g in gens:
if c in ["wind_onshore", "wind_offshore", "solar"]:
potential = (
(
network.generators.p_nom[g]
* network.generators_t.p_max_pu[g].mul(
network.snapshot_weightings.generators, axis=0
)
)
.sum()
.sum()
)
else:
potential = (
network.snapshot_weightings.generators.sum()
* network.generators.p_nom[g].sum()
)
def _rule_max(m):
dispatch = sum(
m.generator_p[g, sn]
* network.snapshot_weightings.generators[sn]
for sn in snapshots
)
return dispatch <= factor[1] * potential
setattr(network.model, "max_flh_" + g, Constraint(rule=_rule_max))
def _rule_min(m):
dispatch = sum(
m.generator_p[g, sn]
* network.snapshot_weightings.generators[sn]
for sn in snapshots
)
return dispatch >= factor[0] * potential
setattr(network.model, "min_flh_" + g, Constraint(rule=_rule_min))
def _capacity_factor_per_gen_nmp(self, network, snapshots):
"""
Extra-functionality that limits dispatch for each generator with chosen
energy carrier.
Add key 'capacity_factor_per_gen' to args.extra_functionality and set
limits in a dictonary as a fraction of generation potential.
Example:
'capacity_factor_per_gen': {'run_of_river': [0, 0.5], 'solar': [0.1, 1]}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor_per_gen"]
carrier = arg.keys()
snapshots = network.snapshots
for c in carrier:
gens = network.generators.index[network.generators.carrier == c]
for g in gens:
if c in ["wind_onshore", "wind_offshore", "solar"]:
potential = (
(
network.generators.p_nom[g]
* network.generators_t.p_max_pu[g].mul(
network.snapshot_weightings.generators, axis=0
)
)
.sum()
.sum()
)
else:
potential = (
network.snapshot_weightings.generators.sum()
* network.generators.p_nom[g].sum()
)
generation = (
get_var(network, "Generator", "p")
.loc[snapshots, g]
.mul(network.snapshot_weightings.generators, axis=0)
)
define_constraints(
network,
linexpr((1, generation)).sum(),
">=",
arg[c][0] * potential,
"Generator",
"min_flh_" + g,
)
define_constraints(
network,
linexpr((1, generation)).sum(),
"<=",
arg[c][1] * potential,
"Generator",
"max_flh_" + g,
)
def _capacity_factor_per_gen_cntr(self, network, snapshots):
"""
Extra-functionality that limits dispatch of each generator with chosen
energy carrier located in the chosen country.
Add key 'capacity_factor_per_gen_cntr' to args.extra_functionality and set
limits per carrier in a dictonary with country codes as keys.
Example:
'capacity_factor_per_gen_cntr': {'DE':{'run_of_river': [0, 0.5],
'wind_onshore': [0.1, 1]},
'DK':{'wind_onshore':[0, 0.7]}}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor_per_gen_cntr"]
for cntr in arg.keys():
carrier = arg[cntr].keys()
snapshots = network.snapshots
for c in carrier:
factor = arg[cntr][c]
gens = network.generators.index[
(network.generators.carrier == c)
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country == cntr]
)
)
]
if len(gens) > 0:
for g in gens:
if c in ["wind_onshore", "wind_offshore", "solar"]:
potential = (
(
network.generators.p_nom[g]
* network.generators_t.p_max_pu[g].mul(
network.snapshot_weightings.generators,
axis=0,
)
)
.sum()
.sum()
)
else:
potential = (
network.snapshot_weightings.generators.sum()
* network.generators.p_nom[g].sum()
)
def _rule_max(m):
dispatch = sum(
m.generator_p[g, sn]
* network.snapshot_weightings.generators[sn]
for sn in snapshots
)
return dispatch <= factor[1] * potential
setattr(
network.model,
"max_flh_" + cntr + "_" + g,
Constraint(rule=_rule_max),
)
def _rule_min(m):
dispatch = sum(
m.generator_p[g, sn]
* network.snapshot_weightings.generators[sn]
for sn in snapshots
)
return dispatch >= factor[0] * potential
setattr(
network.model,
"min_flh_" + cntr + "_" + g,
Constraint(rule=_rule_min),
)
else:
print(
"Carrier "
+ c
+ " is not available in "
+ cntr
+ ". Skipping this constraint."
)
def _capacity_factor_per_gen_cntr_nmp(self, network, snapshots):
"""
Extra-functionality that limits dispatch of each generator with chosen
energy carrier located in the chosen country.
Add key 'capacity_factor_per_gen_cntr' to args.extra_functionality and set
limits per carrier in a dictonary with country codes as keys.
Example:
'capacity_factor_per_gen_cntr': {'DE':{'run_of_river': [0, 0.5],
'wind_onshore': [0.1, 1]},
'DK':{'wind_onshore':[0, 0.7]}}
Parameters
----------
network : :class:`pypsa.Network
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
arg = self.args["extra_functionality"]["capacity_factor_per_gen_cntr"]
for cntr in arg.keys():
carrier = arg[cntr].keys()
for c in carrier:
gens = network.generators.index[
(network.generators.carrier == c)
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country == cntr]
)
)
]
if len(gens) > 0:
for g in gens:
if c in ["wind_onshore", "wind_offshore", "solar"]:
potential = (
(
network.generators.p_nom[g]
* network.generators_t.p_max_pu[g].mul(
network.snapshot_weightings.generators,
axis=0,
)
)
.sum()
.sum()
)
else:
potential = (
network.snapshot_weightings.generators.sum()
* network.generators.p_nom[g].sum()
)
generation = (
get_var(network, "Generator", "p")
.loc[snapshots, g]
.mul(network.snapshot_weightings.generators, axis=0)
)
define_constraints(
network,
linexpr((1, generation)).sum(),
">=",
arg[cntr][c][0] * potential,
"Generator",
"min_flh_" + g,
)
define_constraints(
network,
linexpr((1, generation)).sum(),
"<=",
arg[cntr][c][1] * potential,
"Generator",
"max_flh_" + g,
)
else:
print(
"Carrier "
+ c
+ " is not available in "
+ cntr
+ ". Skipping this constraint."
)
[docs]def read_max_gas_generation(self):
"""Return the values limiting the gas production in Germany
Read max_gas_generation_overtheyear from
scenario.egon_scenario_parameters if the table is available in the
database and return the dictionnary containing the values needed
for the constraints to limit the gas production in Germany,
depending of the scenario.
Returns
-------
arg: dict
"""
scn_name = self.args["scn_name"]
arg_def = {
"eGon2035": {
"CH4": 36000000,
"biogas": 10000000,
}, # [MWh] Netzentwicklungsplan Gas 2020–2030
"eGon2035_lowflex": {
"CH4": 36000000,
"biogas": 10000000,
}, # [MWh] Netzentwicklungsplan Gas 2020–2030
"eGon100RE": {
"biogas": 14450103
}, # [MWh] Value from reference p-e-s run used in eGon-data
}
engine = db.connection(section=self.args["db"])
try:
sql = f"""
SELECT gas_parameters
FROM scenario.egon_scenario_parameters
WHERE name = '{scn_name}';"""
df = pd.read_sql(sql, engine)
arg = df["max_gas_generation_overtheyear"]
except:
arg = arg_def[scn_name]
return arg
[docs]def add_ch4_constraints(self, network, snapshots):
"""
Add CH4 constraints for optimization with pyomo
Functionality that limits the dispatch of CH4 generators. In
Germany, there is one limitation specific for biogas and one
limitation specific for natural gas (natural gas only in eGon2035).
Abroad, each generator has its own limitation contains in the
column e_nom_max.
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
scn_name = self.args["scn_name"]
n_snapshots = self.args["end_snapshot"] - self.args["start_snapshot"] + 1
# Add constraint for Germany
arg = read_max_gas_generation(self)
gas_carrier = arg.keys()
carrier_names = {
"eGon2035": {"CH4": "CH4_NG", "biogas": "CH4_biogas"},
"eGon2035_lowflex": {"CH4": "CH4_NG", "biogas": "CH4_biogas"},
"eGon100RE": {"biogas": "CH4"},
}
for c in gas_carrier:
gens = network.generators.index[
(network.generators.carrier == carrier_names[scn_name][c])
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country == "DE"]
)
)
]
if not gens.empty:
factor = arg[c]
def _rule_max(m):
dispatch = sum(
m.generator_p[gen, sn]
* network.snapshot_weightings.generators[sn]
for gen in gens
for sn in snapshots
)
return dispatch <= factor * (n_snapshots / 8760)
setattr(
network.model, "max_flh_DE_" + c, Constraint(rule=_rule_max)
)
# Add contraints for neigbouring countries
gen_abroad = network.generators[
(network.generators.carrier == "CH4")
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country != "DE"]
)
)
& (network.generators.e_nom_max != np.inf)
]
for g in gen_abroad.index:
factor = network.generators.e_nom_max[g]
def _rule_max(m):
dispatch = sum(
m.generator_p[g, sn]
* network.snapshot_weightings.generators[sn]
for sn in snapshots
)
return dispatch <= factor * (n_snapshots / 8760)
setattr(
network.model,
"max_flh_abroad_" + str(g).replace(" ", "_"),
Constraint(rule=_rule_max),
)
[docs]def add_ch4_constraints_nmp(self, network, snapshots):
"""
Add CH4 constraints for optimization without pyomo
Functionality that limits the dispatch of CH4 generators. In
Germany, there is one limitation specific for biogas and one
limitation specific for natural gas (natural gas only in eGon2035).
Abroad, each generator has its own limitation contains in the
column e_nom_max.
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
scn_name = self.args["scn_name"]
n_snapshots = self.args["end_snapshot"] - self.args["start_snapshot"] + 1
# Add constraint for Germany
arg = read_max_gas_generation(self)
gas_carrier = arg.keys()
carrier_names = {
"eGon2035": {"CH4": "CH4_NG", "biogas": "CH4_biogas"},
"eGon100RE": {"biogas": "CH4"},
}
for c in gas_carrier:
gens = network.generators.index[
(network.generators.carrier == carrier_names[scn_name][c])
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country == "DE"]
)
)
]
if not gens.empty:
factor = arg[c]
generation = (
get_var(network, "Generator", "p")
.loc[snapshots, gens]
.mul(network.snapshot_weightings.generators, axis=0)
)
define_constraints(
network,
linexpr((1, generation)).sum().sum(),
"<=",
factor * (n_snapshots / 8760),
"Generator",
"max_flh_DE_" + c,
)
# Add contraints for neigbouring countries
gen_abroad = network.generators[
(network.generators.carrier == "CH4")
& (
network.generators.bus.astype(str).isin(
network.buses.index[network.buses.country != "DE"]
)
)
& (network.generators.e_nom_max != np.inf)
]
for g in gen_abroad.index:
factor = network.generators.e_nom_max[g]
generation = (
get_var(network, "Generator", "p")
.loc[snapshots, g]
.mul(network.snapshot_weightings.generators, axis=0)
)
define_constraints(
network,
linexpr((1, generation)).sum(),
"<=",
factor * (n_snapshots / 8760),
"Generator",
"max_flh_DE_" + str(g).replace(" ", "_"),
)
[docs]def snapshot_clustering_daily_bounds(self, network, snapshots):
"""
Bound the storage level to 0.5 max_level every 24th hour.
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps that will be constrained
Returns
-------
None
"""
sus = network.storage_units
# take every first hour of the clustered days
network.model.period_starts = network.snapshot_weightings.index[0::24]
network.model.storages = sus.index
print("Setting daily_bounds constraint")
def day_rule(m, s, p):
"""
Sets the soc of the every first hour to the
soc of the last hour of the day (i.e. + 23 hours)
"""
return (
m.state_of_charge[s, p]
== m.state_of_charge[s, p + pd.Timedelta(hours=23)]
)
network.model.period_bound = Constraint(
network.model.storages, network.model.period_starts, rule=day_rule
)
[docs]def snapshot_clustering_daily_bounds_nmp(self, network, snapshots):
"""
Bound the storage level to 0.5 max_level every 24th hour.
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps that will be constrained
Returns
-------
None
"""
c = "StorageUnit"
period_starts = snapshots[0::24]
period_ends = period_starts + pd.Timedelta(hours=23)
eh = expand_series(
network.snapshot_weightings.objective[period_ends],
network.storage_units.index,
) # elapsed hours
eff_stand = expand_series(1 - network.df(c).standing_loss, period_ends).T
eff_dispatch = expand_series(
network.df(c).efficiency_dispatch, period_ends
).T
eff_store = expand_series(network.df(c).efficiency_store, period_ends).T
soc = get_var(network, c, "state_of_charge").loc[period_ends, :]
soc_peroid_start = get_var(network, c, "state_of_charge").loc[
period_starts
]
coeff_var = [
(-1, soc),
(
-1 / eff_dispatch * eh,
get_var(network, c, "p_dispatch").loc[period_ends, :],
),
(eff_store * eh, get_var(network, c, "p_store").loc[period_ends, :]),
]
lhs, *axes = linexpr(*coeff_var, return_axes=True)
def masked_term(coeff, var, cols):
return (
linexpr((coeff[cols], var[cols]))
.reindex(index=axes[0], columns=axes[1], fill_value="")
.values
)
lhs += masked_term(
eff_stand, soc_peroid_start, network.storage_units.index
)
rhs = -get_as_dense(network, c, "inflow", period_ends).mul(eh)
define_constraints(network, lhs, "==", rhs, "daily_bounds")
[docs]def snapshot_clustering_seasonal_storage(
self, network, snapshots, simplified=False
):
"""
Depicts intertemporal dependencies of storage units and stores when using
snapshot clustering to typical periods for temporal complexity reduction.
According to:
L. Kotzur et al: 'Time series aggregation for energy
system design:
Modeling seasonal storage', 2018
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : list
A list of datetime objects representing the timestamps of the snapshots
to be clustered.
simplified : bool, optional
A flag indicating whether to use a simplified version of the model that
does not include intra-temporal constraints and variables.
Returns
-------
None
"""
sus = network.storage_units
sto = network.stores
if self.args["snapshot_clustering"]["how"] == "weekly":
network.model.period_starts = network.snapshot_weightings.index[0::168]
elif self.args["snapshot_clustering"]["how"] == "monthly":
network.model.period_starts = network.snapshot_weightings.index[0::720]
else:
network.model.period_starts = network.snapshot_weightings.index[0::24]
network.model.storages = sus.index
network.model.stores = sto.index
candidates = network.cluster.index.get_level_values(0).unique()
# create set for inter-temp constraints and variables
network.model.candidates = po.Set(initialize=candidates, ordered=True)
if not simplified:
# create intra soc variable for each storage/store and each hour
network.model.state_of_charge_intra = po.Var(
sus.index, network.snapshots
)
network.model.state_of_charge_intra_store = po.Var(
sto.index, network.snapshots
)
else:
network.model.state_of_charge_intra_max = po.Var(
sus.index, network.model.candidates
)
network.model.state_of_charge_intra_min = po.Var(
sus.index, network.model.candidates
)
network.model.state_of_charge_intra_store_max = po.Var(
sto.index, network.model.candidates
)
network.model.state_of_charge_intra_store_min = po.Var(
sto.index, network.model.candidates
)
# create intra soc variable for each storage and each hour
network.model.state_of_charge_intra = po.Var(
sus.index, network.snapshots
)
network.model.state_of_charge_intra_store = po.Var(
sto.index, network.snapshots
)
def intra_max(model, st, h):
cand = network.cluster_ts["Candidate_day"][h]
return (
model.state_of_charge_intra_max[st, cand]
>= model.state_of_charge_intra[st, h]
)
network.model.soc_intra_max = Constraint(
network.model.storages, network.snapshots, rule=intra_max
)
def intra_min(model, st, h):
cand = network.cluster_ts["Candidate_day"][h]
return (
model.state_of_charge_intra_min[st, cand]
<= model.state_of_charge_intra[st, h]
)
network.model.soc_intra_min = Constraint(
network.model.storages, network.snapshots, rule=intra_min
)
def intra_max_store(model, st, h):
cand = network.cluster_ts["Candidate_day"][h]
return (
model.state_of_charge_intra_store_max[st, cand]
>= model.state_of_charge_intra_store[st, h]
)
network.model.soc_intra_store_max = Constraint(
network.model.stores, network.snapshots, rule=intra_max_store
)
def intra_min_store(model, st, h):
cand = network.cluster_ts["Candidate_day"][h]
return (
model.state_of_charge_intra_store_min[st, cand]
<= model.state_of_charge_intra_store[st, h]
)
network.model.soc_intra_store_min = Constraint(
network.model.stores, network.snapshots, rule=intra_min_store
)
def intra_soc_rule(m, s, h):
"""
Sets soc_inter of first hour of every day to 0. Other hours
are set by technical coherences of storage units
According to:
L. Kotzur et al: 'Time series aggregation for energy
system design:
Modeling seasonal storage', 2018, equation no. 18
"""
if (
self.args["snapshot_clustering"]["how"] == "weekly"
and h in network.snapshot_weightings[0::168].index
):
expr = m.state_of_charge_intra[s, h] == 0
elif (
self.args["snapshot_clustering"]["how"] == "monthly"
and h in network.snapshot_weightings[0::720].index
):
expr = m.state_of_charge_intra[s, h] == 0
elif (
self.args["snapshot_clustering"]["how"] == "daily" and h.hour == 0
):
expr = m.state_of_charge_intra[s, h] == 0
else:
expr = m.state_of_charge_intra[s, h] == m.state_of_charge_intra[
s, h - pd.DateOffset(hours=1)
] * (1 - network.storage_units.at[s, "standing_loss"]) - (
m.storage_p_dispatch[s, h - pd.DateOffset(hours=1)]
/ network.storage_units.at[s, "efficiency_dispatch"]
- network.storage_units.at[s, "efficiency_store"]
* m.storage_p_store[s, h - pd.DateOffset(hours=1)]
)
return expr
def intra_soc_rule_store(m, s, h):
if (
self.args["snapshot_clustering"]["how"] == "weekly"
and h in network.snapshot_weightings[0::168].index
):
expr = m.state_of_charge_intra_store[s, h] == 0
elif (
self.args["snapshot_clustering"]["how"] == "monthly"
and h in network.snapshot_weightings[0::720].index
):
expr = m.state_of_charge_intra_store[s, h] == 0
elif (
self.args["snapshot_clustering"]["how"] == "daily" and h.hour == 0
):
expr = m.state_of_charge_intra_store[s, h] == 0
else:
expr = (
m.state_of_charge_intra_store[s, h]
== m.state_of_charge_intra_store[s, h - pd.DateOffset(hours=1)]
* (1 - network.stores.at[s, "standing_loss"])
+ m.store_p[s, h - pd.DateOffset(hours=1)]
)
return expr
network.model.soc_intra = po.Constraint(
network.model.storages, network.snapshots, rule=intra_soc_rule
)
network.model.soc_intra_store = po.Constraint(
network.model.stores, network.snapshots, rule=intra_soc_rule_store
)
# create inter soc variable for each storage/store and each candidate
network.model.state_of_charge_inter = po.Var(
sus.index, network.model.candidates, within=po.NonNegativeReals
)
network.model.state_of_charge_inter_store = po.Var(
sto.index, network.model.candidates, within=po.NonNegativeReals
)
def inter_storage_soc_rule(m, s, i):
"""
Define the state_of_charge_inter as the state_of_charge_inter of
the day before minus the storage losses plus the state_of_charge_intra
of one hour after the last hour of the representative day.
For the last reperesentive day, the soc_inter is the same as
the first day due to cyclic soc condition
According to:
L. Kotzur et al: 'Time series aggregation for energy system design:
Modeling seasonal storage', 2018, equation no. 19
"""
if i == network.model.candidates.at(-1):
last_hour = network.cluster["last_hour_RepresentativeDay"][i]
expr = po.Constraint.Skip
else:
last_hour = network.cluster["last_hour_RepresentativeDay"][i]
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
else:
hrs = 24
expr = m.state_of_charge_inter[
s, i + 1
] == m.state_of_charge_inter[s, i] * (
1 - network.storage_units.at[s, "standing_loss"]
) ** hrs + m.state_of_charge_intra[
s, last_hour
] * (
1 - network.storage_units.at[s, "standing_loss"]
) - (
m.storage_p_dispatch[s, last_hour]
/ network.storage_units.at[s, "efficiency_dispatch"]
- network.storage_units.at[s, "efficiency_store"]
* m.storage_p_store[s, last_hour]
)
return expr
def inter_store_soc_rule(m, s, i):
if i == network.model.candidates.at(-1):
last_hour = network.cluster["last_hour_RepresentativeDay"][i]
expr = po.Constraint.Skip
else:
last_hour = network.cluster["last_hour_RepresentativeDay"][i]
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
else:
hrs = 24
expr = (
m.state_of_charge_inter_store[s, i + 1]
== m.state_of_charge_inter_store[s, i]
* (1 - network.stores.at[s, "standing_loss"]) ** hrs
+ m.state_of_charge_intra_store[s, last_hour]
* (1 - network.stores.at[s, "standing_loss"])
+ m.store_p[s, last_hour]
)
return expr
network.model.inter_storage_soc_constraint = po.Constraint(
sus.index, network.model.candidates, rule=inter_storage_soc_rule
)
network.model.inter_store_soc_constraint = po.Constraint(
sto.index, network.model.candidates, rule=inter_store_soc_rule
)
# new definition of the state_of_charge used in pypsa
network.model.del_component("state_of_charge_constraint")
network.model.del_component("state_of_charge_constraint_index")
network.model.del_component("state_of_charge_constraint_index_0")
network.model.del_component("state_of_charge_constraint_index_1")
network.model.del_component("store_constraint")
network.model.del_component("store_constraint_index")
network.model.del_component("store_constraint_index_0")
network.model.del_component("store_constraint_index_1")
def total_state_of_charge(m, s, h):
"""
Define the state_of_charge as the sum of state_of_charge_inter
and state_of_charge_intra
According to:
L. Kotzur et al: 'Time series aggregation for energy system design:
Modeling seasonal storage', 2018
"""
return (
m.state_of_charge[s, h]
== m.state_of_charge_intra[s, h]
+ m.state_of_charge_inter[
s, network.cluster_ts["Candidate_day"][h]
]
)
def total_state_of_charge_store(m, s, h):
return (
m.store_e[s, h]
== m.state_of_charge_intra_store[s, h]
+ m.state_of_charge_inter_store[
s, network.cluster_ts["Candidate_day"][h]
]
)
network.model.total_storage_constraint = po.Constraint(
sus.index, network.snapshots, rule=total_state_of_charge
)
network.model.total_store_constraint = po.Constraint(
sto.index, network.snapshots, rule=total_state_of_charge_store
)
network.model.del_component("state_of_charge_lower")
network.model.del_component("state_of_charge_lower_index")
network.model.del_component("state_of_charge_lower_index_0")
network.model.del_component("state_of_charge_lower_index_1")
network.model.del_component("store_e_lower")
network.model.del_component("store_e_lower_index")
network.model.del_component("store_e_lower_index_0")
network.model.del_component("store_e_lower_index_1")
def state_of_charge_lower(m, s, h):
"""
Define the state_of_charge as the sum of state_of_charge_inter
and state_of_charge_intra
According to:
L. Kotzur et al: 'Time series aggregation for energy system design:
Modeling seasonal storage', 2018
"""
# Choose datetime of representive day
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=167)
period_start = network.cluster_ts.index[0::168][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=719)
period_start = network.cluster_ts.index[0::720][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
else:
hrs = 24
date = str(
network.snapshots[
network.snapshots.dayofyear - 1
== network.cluster["RepresentativeDay"][h.dayofyear]
][0]
).split(" ")[0]
hour = str(h).split(" ")[1]
intra_hour = pd.to_datetime(date + " " + hour)
return (
m.state_of_charge_intra[s, intra_hour]
+ m.state_of_charge_inter[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.storage_units.at[s, "standing_loss"]) ** hrs
>= 0
)
def state_of_charge_lower_store(m, s, h):
# Choose datetime of representive day
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=167)
period_start = network.cluster_ts.index[0::168][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=719)
period_start = network.cluster_ts.index[0::720][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
else:
hrs = 24
date = str(
network.snapshots[
network.snapshots.dayofyear - 1
== network.cluster["RepresentativeDay"][h.dayofyear]
][0]
).split(" ")[0]
hour = str(h).split(" ")[1]
intra_hour = pd.to_datetime(date + " " + hour)
if "DSM" in s:
low = (
network.stores.e_nom[s]
* network.stores_t.e_min_pu.at[intra_hour, s]
)
else:
low = 0
return (
m.state_of_charge_intra_store[s, intra_hour]
+ m.state_of_charge_inter_store[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.stores.at[s, "standing_loss"]) ** hrs
>= low
)
def state_of_charge_lower_simplified(m, s, h):
"""
Define the state_of_charge as the sum of state_of_charge_inter
and state_of_charge_intra
According to:
L. Kotzur et al: 'Time series aggregation for energy system design:
Modeling seasonal storage', 2018
"""
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
else:
hrs = 24
return (
m.state_of_charge_intra_min[
s, network.cluster_ts["Candidate_day"][h]
]
+ m.state_of_charge_inter[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.storage_units.at[s, "standing_loss"]) ** hrs
>= 0
)
def state_of_charge_lower_store_simplified(m, s, h):
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
else:
hrs = 24
if "DSM" in s:
if self.args["snapshot_clustering"]["how"] == "weekly":
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=167)
period_start = network.cluster_ts.index[0::168][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
elif self.args["snapshot_clustering"]["how"] == "monthly":
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=719)
period_start = network.cluster_ts.index[0::720][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
else:
date = str(
network.snapshots[
network.snapshots.dayofyear - 1
== network.cluster["RepresentativeDay"][h.dayofyear]
][0]
).split(" ")[0]
hour = str(h).split(" ")[1]
intra_hour = pd.to_datetime(date + " " + hour)
low = (
network.stores.e_nom[s]
* network.stores_t.e_min_pu.at[intra_hour, s]
)
else:
low = 0
return (
m.state_of_charge_intra_store_min[
s, network.cluster_ts["Candidate_day"][h]
]
+ m.state_of_charge_inter_store[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.stores.at[s, "standing_loss"]) ** hrs
>= low
)
if simplified:
network.model.state_of_charge_lower = po.Constraint(
sus.index,
network.cluster_ts.index,
rule=state_of_charge_lower_simplified,
)
network.model.state_of_charge_lower_store = po.Constraint(
sto.index,
network.cluster_ts.index,
rule=state_of_charge_lower_store_simplified,
)
else:
network.model.state_of_charge_lower = po.Constraint(
sus.index, network.cluster_ts.index, rule=state_of_charge_lower
)
network.model.state_of_charge_lower_store = po.Constraint(
sto.index,
network.cluster_ts.index,
rule=state_of_charge_lower_store,
)
network.model.del_component("state_of_charge_upper")
network.model.del_component("state_of_charge_upper_index")
network.model.del_component("state_of_charge_upper_index_0")
network.model.del_component("state_of_charge_upper_index_1")
network.model.del_component("store_e_upper")
network.model.del_component("store_e_upper_index")
network.model.del_component("store_e_upper_index_0")
network.model.del_component("store_e_upper_index_1")
def state_of_charge_upper(m, s, h):
# Choose datetime of representive day
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=167)
period_start = network.cluster_ts.index[0::168][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=719)
period_start = network.cluster_ts.index[0::720][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
else:
hrs = 24 # 0
date = str(
network.snapshots[
network.snapshots.dayofyear - 1
== network.cluster["RepresentativeDay"][h.dayofyear]
][0]
).split(" ")[0]
hour = str(h).split(" ")[1]
intra_hour = pd.to_datetime(date + " " + hour)
if network.storage_units.p_nom_extendable[s]:
p_nom = m.storage_p_nom[s]
else:
p_nom = network.storage_units.p_nom[s]
return (
m.state_of_charge_intra[s, intra_hour]
+ m.state_of_charge_inter[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.storage_units.at[s, "standing_loss"]) ** hrs
<= p_nom * network.storage_units.at[s, "max_hours"]
)
def state_of_charge_upper_store(m, s, h):
# Choose datetime of representive day
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=167)
period_start = network.cluster_ts.index[0::168][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=719)
period_start = network.cluster_ts.index[0::720][candidate - 1]
delta_t = h - period_start
intra_hour = first_hour + delta_t
else:
hrs = 24 # 0
date = str(
network.snapshots[
network.snapshots.dayofyear - 1
== network.cluster["RepresentativeDay"][h.dayofyear]
][0]
).split(" ")[0]
hour = str(h).split(" ")[1]
intra_hour = pd.to_datetime(date + " " + hour)
if network.stores.e_nom_extendable[s]:
e_nom = m.store_e_nom[s]
else:
if "DSM" in s:
e_nom = (
network.stores.e_nom[s]
* network.stores_t.e_max_pu.at[intra_hour, s]
)
else:
e_nom = network.stores.e_nom[s]
return (
m.state_of_charge_intra_store[s, intra_hour]
+ m.state_of_charge_inter_store[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.stores.at[s, "standing_loss"]) ** hrs
<= e_nom
)
def state_of_charge_upper_simplified(m, s, h):
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
else:
hrs = 24 # 0
if network.storage_units.p_nom_extendable[s]:
p_nom = m.storage_p_nom[s]
else:
p_nom = network.storage_units.p_nom[s]
return (
m.state_of_charge_intra_max[
s, network.cluster_ts["Candidate_day"][h]
]
+ m.state_of_charge_inter[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.storage_units.at[s, "standing_loss"]) ** hrs
<= p_nom * network.storage_units.at[s, "max_hours"]
)
def state_of_charge_upper_store_simplified(m, s, h):
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 168
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 720
else:
hrs = 24 # 0
if network.stores.e_nom_extendable[s]:
e_nom = m.store_e_nom[s]
else:
if "DSM" in s:
if self.args["snapshot_clustering"]["how"] == "weekly":
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=167)
period_start = network.cluster_ts.index[0::168][
candidate - 1
]
delta_t = h - period_start
intra_hour = first_hour + delta_t
elif self.args["snapshot_clustering"]["how"] == "monthly":
candidate = network.cluster_ts["Candidate_day"][h]
last_hour = network.cluster.loc[candidate][
"last_hour_RepresentativeDay"
]
first_hour = last_hour - pd.DateOffset(hours=719)
period_start = network.cluster_ts.index[0::720][
candidate - 1
]
delta_t = h - period_start
intra_hour = first_hour + delta_t
else:
date = str(
network.snapshots[
network.snapshots.dayofyear - 1
== network.cluster["RepresentativeDay"][
h.dayofyear
]
][0]
).split(" ")[0]
hour = str(h).split(" ")[1]
intra_hour = pd.to_datetime(date + " " + hour)
e_nom = (
network.stores.e_nom[s]
* network.stores_t.e_max_pu.at[intra_hour, s]
)
else:
e_nom = network.stores.e_nom[s]
return (
m.state_of_charge_intra_store_max[
s, network.cluster_ts["Candidate_day"][h]
]
+ m.state_of_charge_inter_store[
s, network.cluster_ts["Candidate_day"][h]
]
* (1 - network.stores.at[s, "standing_loss"]) ** hrs
<= e_nom
)
if simplified:
network.model.state_of_charge_upper = po.Constraint(
sus.index,
network.cluster_ts.index,
rule=state_of_charge_upper_simplified,
)
network.model.state_of_charge_upper_store = po.Constraint(
sto.index,
network.cluster_ts.index,
rule=state_of_charge_upper_store_simplified,
)
else:
network.model.state_of_charge_upper = po.Constraint(
sus.index, network.cluster_ts.index, rule=state_of_charge_upper
)
network.model.state_of_charge_upper_store = po.Constraint(
sto.index,
network.cluster_ts.index,
rule=state_of_charge_upper_store,
)
def cyclic_state_of_charge(m, s):
"""
Defines cyclic condition like pypsas 'state_of_charge_contraint'.
There are small differences to original results.
"""
last_day = network.cluster.index[-1]
last_calc_hour = network.cluster["last_hour_RepresentativeDay"][
last_day
]
last_inter = m.state_of_charge_inter[s, last_day]
last_intra = m.state_of_charge_intra[s, last_calc_hour]
first_day = network.cluster.index[0]
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 167
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 719
else:
hrs = 23
first_calc_hour = network.cluster["last_hour_RepresentativeDay"][
first_day
] - pd.DateOffset(hours=hrs)
first_inter = m.state_of_charge_inter[s, first_day]
first_intra = m.state_of_charge_intra[s, first_calc_hour]
return first_intra + first_inter == (
(last_intra + last_inter)
* (1 - network.storage_units.at[s, "standing_loss"])
- (
m.storage_p_dispatch[s, last_calc_hour]
/ network.storage_units.at[s, "efficiency_dispatch"]
- m.storage_p_store[s, last_calc_hour]
* network.storage_units.at[s, "efficiency_store"]
)
)
def cyclic_state_of_charge_store(m, s):
last_day = network.cluster.index[-1]
last_calc_hour = network.cluster["last_hour_RepresentativeDay"][
last_day
]
last_inter = m.state_of_charge_inter_store[s, last_day]
last_intra = m.state_of_charge_intra_store[s, last_calc_hour]
first_day = network.cluster.index[0]
if self.args["snapshot_clustering"]["how"] == "weekly":
hrs = 167
elif self.args["snapshot_clustering"]["how"] == "monthly":
hrs = 719
else:
hrs = 23
first_calc_hour = network.cluster["last_hour_RepresentativeDay"][
first_day
] - pd.DateOffset(hours=hrs)
first_inter = m.state_of_charge_inter_store[s, first_day]
first_intra = m.state_of_charge_intra_store[s, first_calc_hour]
expr = first_intra + first_inter == (
(last_intra + last_inter)
* (1 - network.stores.at[s, "standing_loss"])
+ m.store_p[s, last_calc_hour]
)
return expr
network.model.cyclic_storage_constraint = po.Constraint(
sus.index, rule=cyclic_state_of_charge
)
network.model.cyclic_store_constraint = po.Constraint(
sto.index, rule=cyclic_state_of_charge_store
)
[docs]def snapshot_clustering_seasonal_storage_hourly(self, network, snapshots):
"""
Depicts intertemporal dependencies of storage units and stores when using
snapshot clustering to typical periods for temporal complexity reduction.
According to:
L. Kotzur et al: 'Time series aggregation for energy
system design:
Modeling seasonal storage', 2018
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : list
A list of datetime objects representing the timestamps of the snapshots
to be clustered.
Returns
-------
None
"""
# TODO: updaten mit stores (Sektorkopplung)
network.model.del_component("state_of_charge_all")
network.model.del_component("state_of_charge_all_index")
network.model.del_component("state_of_charge_all_index_0")
network.model.del_component("state_of_charge_all_index_1")
network.model.del_component("state_of_charge_constraint")
network.model.del_component("state_of_charge_constraint_index")
network.model.del_component("state_of_charge_constraint_index_0")
network.model.del_component("state_of_charge_constraint_index_1")
candidates = network.cluster.index.get_level_values(0).unique()
network.model.state_of_charge_all = po.Var(
network.storage_units.index,
candidates - 1 + self.args["start_snapshot"],
within=po.NonNegativeReals,
)
network.model.storages = network.storage_units.index
def set_soc_all(m, s, h):
if h == self.args["start_snapshot"]:
prev = (
network.cluster.index.get_level_values(0)[-1]
- 1
+ self.args["start_snapshot"]
)
else:
prev = h - 1
cluster_hour = network.cluster["Hour"][
h + 1 - self.args["start_snapshot"]
]
expr = m.state_of_charge_all[s, h] == m.state_of_charge_all[
s, prev
] * (1 - network.storage_units.at[s, "standing_loss"]) - (
m.storage_p_dispatch[s, cluster_hour]
/ network.storage_units.at[s, "efficiency_dispatch"]
- network.storage_units.at[s, "efficiency_store"]
* m.storage_p_store[s, cluster_hour]
)
return expr
network.model.soc_all = po.Constraint(
network.model.storages,
candidates - 1 + self.args["start_snapshot"],
rule=set_soc_all,
)
def soc_equals_soc_all(m, s, h):
hour = (h.dayofyear - 1) * 24 + h.hour
return m.state_of_charge_all[s, hour] == m.state_of_charge[s, h]
network.model.soc_equals_soc_all = po.Constraint(
network.model.storages, network.snapshots, rule=soc_equals_soc_all
)
network.model.del_component("state_of_charge_upper")
network.model.del_component("state_of_charge_upper_index")
network.model.del_component("state_of_charge_upper_index_0")
network.model.del_component("state_of_charge_upper_index_1")
def state_of_charge_upper(m, s, h):
if network.storage_units.p_nom_extendable[s]:
p_nom = m.storage_p_nom[s]
else:
p_nom = network.storage_units.p_nom[s]
return (
m.state_of_charge_all[s, h]
<= p_nom * network.storage_units.at[s, "max_hours"]
)
network.model.state_of_charge_upper = po.Constraint(
network.storage_units.index,
candidates - 1 + self.args["start_snapshot"],
rule=state_of_charge_upper,
)
[docs]def snapshot_clustering_seasonal_storage_nmp(self, n, sns, simplified=False):
"""
Depicts intertemporal dependencies of storage units and stores when using
snapshot clustering to typical periods for temporal complexity reduction.
According to:
L. Kotzur et al: 'Time series aggregation for energy
system design:
Modeling seasonal storage', 2018
Parameters
----------
n : :class:`pypsa.Network`
Overall container of PyPSA
sns : list
A list of datetime objects representing the timestamps of the snapshots
to be clustered.
simplified : bool, optional
A flag indicating whether to use a simplified version of the model that
does not include intra-temporal constraints and variables.
Returns
-------
None
"""
# TODO: so noch nicht korrekt...
# TODO: updaten mit stores (Sektorkopplung)
# TODO: simplified ergänzen
sus = n.storage_units
c = "StorageUnit"
period_starts = sns[0::24]
candidates = n.cluster.index.get_level_values(0).unique()
soc_total = get_var(n, c, "state_of_charge")
# inter_soc
# Set lower and upper bound for soc_inter
lb = pd.DataFrame(index=candidates, columns=sus.index, data=0)
ub = pd.DataFrame(index=candidates, columns=sus.index, data=np.inf)
# Create soc_inter variable for each storage and each day
define_variables(n, lb, ub, "StorageUnit", "soc_inter")
# Define soc_intra
# Set lower and upper bound for soc_intra
lb = pd.DataFrame(index=sns, columns=sus.index, data=-np.inf)
ub = pd.DataFrame(index=sns, columns=sus.index, data=np.inf)
# Set soc_intra to 0 at first hour of every day
lb.loc[period_starts, :] = 0
ub.loc[period_starts, :] = 0
# Create intra soc variable for each storage and each hour
define_variables(n, lb, ub, "StorageUnit", "soc_intra")
soc_intra = get_var(n, c, "soc_intra")
last_hour = n.cluster["last_hour_RepresentativeDay"].values
soc_inter = get_var(n, c, "soc_inter")
next_soc_inter = soc_inter.shift(-1).fillna(soc_inter.loc[candidates[0]])
last_soc_intra = soc_intra.loc[last_hour].set_index(candidates)
eff_stand = expand_series(1 - n.df(c).standing_loss, candidates).T
eff_dispatch = expand_series(n.df(c).efficiency_dispatch, candidates).T
eff_store = expand_series(n.df(c).efficiency_store, candidates).T
dispatch = get_var(n, c, "p_dispatch").loc[last_hour].set_index(candidates)
store = get_var(n, c, "p_store").loc[last_hour].set_index(candidates)
coeff_var = [
(-1, next_soc_inter),
(eff_stand.pow(24), soc_inter),
(eff_stand, last_soc_intra),
(-1 / eff_dispatch, dispatch),
(eff_store, store),
]
lhs, *axes = linexpr(*coeff_var, return_axes=True)
define_constraints(n, lhs, "==", 0, c, "soc_inter_constraints")
coeff_var = [
(-1, soc_total),
(1, soc_intra),
(
1,
soc_inter.loc[n.cluster_ts.loc[sns, "Candidate_day"]].set_index(
sns
),
),
]
lhs, *axes = linexpr(*coeff_var, return_axes=True)
define_constraints(n, lhs, "==", 0, c, "soc_intra_constraints")
[docs]def snapshot_clustering_seasonal_storage_hourly_nmp(self, n, sns):
"""
Depicts intertemporal dependencies of storage units and stores when using
snapshot clustering to typical periods for temporal complexity reduction.
According to:
L. Kotzur et al: 'Time series aggregation for energy
system design:
Modeling seasonal storage', 2018
Parameters
----------
n : :class:`pypsa.Network`
Overall container of PyPSA
sns : list
A list of datetime objects representing the timestamps of the snapshots
to be clustered.
Returns
-------
None
"""
print("TODO")
# TODO: implementieren
[docs]def split_dispatch_disaggregation_constraints(self, n, sns):
"""
Add constraints for state of charge of storage units and stores
when separating the optimization into smaller subproblems
while conducting thedispatch_disaggregation in temporally fully resolved
network
The state of charge at the end of each slice is set to the value
calculated in the optimization with the temporally reduced network
to account to ensure compatibility and to reproduce saisonality
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
Returns
-------
None.
"""
tsa_hour = sns[sns.isin(self.conduct_dispatch_disaggregation.index)]
if len(tsa_hour) > 1:
tsa_hour = tsa_hour[-1]
else:
tsa_hour = tsa_hour[0]
n.model.soc_values = self.conduct_dispatch_disaggregation.loc[tsa_hour]
sus = n.storage_units.index
# for stores, exclude emob and dsm because of their special constraints
sto = n.stores[
~n.stores.carrier.isin(["battery storage", "battery_storage", "dsm"])
].index
def disaggregation_sus_soc(m, s, h):
"""
Sets soc at the end of the time slice in disptach_disaggregation
to value calculated in temporally reduced lopf without slices.
"""
return m.state_of_charge[s, h] == m.soc_values[s]
n.model.split_dispatch_sus_soc = po.Constraint(
sus, sns[-1:], rule=disaggregation_sus_soc
)
def disaggregation_sto_soc(m, s, h):
"""
Sets soc at the end of the time slice in disptach_disaggregation
to value calculated in temporally reduced lopf without slices.
"""
return m.store_e[s, h] == m.soc_values[s]
n.model.split_dispatch_sto_soc = po.Constraint(
sto, sns[-1:], rule=disaggregation_sto_soc
)
[docs]def split_dispatch_disaggregation_constraints_nmp(self, n, sns):
print("TODO")
# TODO: implementieren
[docs]class Constraints:
def __init__(self, args, conduct_dispatch_disaggregation):
self.args = args
self.conduct_dispatch_disaggregation = conduct_dispatch_disaggregation
[docs] def functionality(self, network, snapshots):
"""Add constraints to pypsa-model using extra-functionality.
Serveral constraints can be choosen at once. Possible constraints are
set and described in the above functions.
Parameters
----------
network : :class:`pypsa.Network`
Overall container of PyPSA
snapshots : pandas.DatetimeIndex
List of timesteps considered in the optimization
"""
if "CH4" in network.buses.carrier.values:
if self.args["method"]["pyomo"]:
add_chp_constraints(network, snapshots)
if (self.args["scn_name"] != "status2019") & (
len(network.snapshots) > 1500
):
add_ch4_constraints(self, network, snapshots)
else:
add_chp_constraints_nmp(network)
if self.args["scn_name"] != "status2019":
add_ch4_constraints_nmp(self, network, snapshots)
for constraint in self.args["extra_functionality"].keys():
try:
type(network.model)
try:
eval("_" + constraint + "(self, network, snapshots)")
logger.info(
"Added extra_functionality {}".format(constraint)
)
except:
logger.warning(
"Constraint {} not defined".format(constraint)
+ ". New constraints can be defined in"
+ " etrago/tools/constraint.py."
)
except:
try:
eval("_" + constraint + "_nmp(self, network, snapshots)")
logger.info(
"Added extra_functionality {} without pyomo".format(
constraint
)
)
except:
logger.warning(
"Constraint {} not defined".format(constraint)
)
if (
self.args["snapshot_clustering"]["active"]
and self.args["snapshot_clustering"]["method"] == "typical_periods"
):
if (
self.args["snapshot_clustering"]["storage_constraints"]
== "daily_bounds"
):
if self.args["method"]["pyomo"]:
snapshot_clustering_daily_bounds(self, network, snapshots)
else:
snapshot_clustering_daily_bounds_nmp(
self, network, snapshots
)
elif (
self.args["snapshot_clustering"]["storage_constraints"]
== "soc_constraints"
):
if self.args["snapshot_clustering"]["how"] == "hourly":
if self.args["method"]["pyomo"]:
snapshot_clustering_seasonal_storage_hourly(
self, network, snapshots
)
else:
snapshot_clustering_seasonal_storage_hourly_nmp(
self, network, snapshots
)
else:
if self.args["method"]["pyomo"]:
snapshot_clustering_seasonal_storage(
self, network, snapshots
)
else:
snapshot_clustering_seasonal_storage_nmp(
self, network, snapshots
)
elif (
self.args["snapshot_clustering"]["storage_constraints"]
== "soc_constraints_simplified"
):
if self.args["snapshot_clustering"]["how"] == "hourly":
logger.info(
"""soc_constraints_simplified not possible while hourly
clustering -> changed to soc_constraints"""
)
if self.args["method"]["pyomo"]:
snapshot_clustering_seasonal_storage_hourly(
self, network, snapshots
)
else:
snapshot_clustering_seasonal_storage_hourly_nmp(
self, network, snapshots
)
if self.args["method"]["pyomo"]:
snapshot_clustering_seasonal_storage(
self, network, snapshots, simplified=True
)
else:
snapshot_clustering_seasonal_storage_nmp(
self, network, snapshots, simplified=True
)
else:
logger.error(
"""If you want to use constraints considering the storage
behaviour, snapshot clustering constraints must be in
[daily_bounds, soc_constraints,
soc_constraints_simplified]"""
)
if self.conduct_dispatch_disaggregation is not False:
if self.args["method"]["pyomo"]:
split_dispatch_disaggregation_constraints(
self, network, snapshots
)
else:
split_dispatch_disaggregation_constraints_nmp(
self, network, snapshots
)
[docs]def add_chp_constraints_nmp(n):
"""
Limits the dispatch of combined heat and power links based on
T.Brown et. al : Synergies of sector coupling and transmission
reinforcement in a cost-optimised, highly renewable European energy system,
2018
Parameters
----------
n : pypsa.Network
Network container
Returns
-------
None.
"""
# backpressure limit
c_m = 0.75
# marginal loss for each additional generation of heat
c_v = 0.15
electric_bool = n.links.carrier == "central_gas_CHP"
heat_bool = n.links.carrier == "central_gas_CHP_heat"
electric = n.links.index[electric_bool]
heat = n.links.index[heat_bool]
n.links.loc[heat, "efficiency"] = (
n.links.loc[electric, "efficiency"] / c_v
).values.mean()
ch4_nodes_with_chp = n.buses.loc[
n.links.loc[electric, "bus0"].values
].index.unique()
for i in ch4_nodes_with_chp:
elec_chp = n.links[
(n.links.carrier == "central_gas_CHP") & (n.links.bus0 == i)
].index
heat_chp = n.links[
(n.links.carrier == "central_gas_CHP_heat") & (n.links.bus0 == i)
].index
link_p = get_var(n, "Link", "p")
# backpressure
lhs_1 = sum(
c_m * n.links.at[h_chp, "efficiency"] * link_p[h_chp]
for h_chp in heat_chp
)
lhs_2 = sum(
n.links.at[e_chp, "efficiency"] * link_p[e_chp]
for e_chp in elec_chp
)
lhs = linexpr((1, lhs_1), (-1, lhs_2))
define_constraints(
n, lhs, "<=", 0, "chplink_" + str(i), "backpressure"
)
# top_iso_fuel_line
lhs, *ax = linexpr(
(1, sum(link_p[h_chp] for h_chp in heat_chp)),
(1, sum(link_p[h_e] for h_e in elec_chp)),
return_axes=True,
)
define_constraints(
n,
lhs,
"<=",
n.links.loc[elec_chp].p_nom.sum(),
"chplink_" + str(i),
"top_iso_fuel_line_fix",
axes=ax,
)
[docs]def add_chp_constraints(network, snapshots):
"""
Limits the dispatch of combined heat and power links based on
T.Brown et. al : Synergies of sector coupling and transmission
reinforcement in a cost-optimised, highly renewable European energy system,
2018
Parameters
----------
network : pypsa.Network
Network container
snapshots : pandas.DataFrame
Timesteps to optimize
Returns
-------
None.
"""
# backpressure limit
c_m = 0.75
# marginal loss for each additional generation of heat
c_v = 0.15
electric_bool = network.links.carrier == "central_gas_CHP"
heat_bool = network.links.carrier == "central_gas_CHP_heat"
electric = network.links.index[electric_bool]
heat = network.links.index[heat_bool]
network.links.loc[heat, "efficiency"] = (
network.links.loc[electric, "efficiency"] / c_v
).values.mean()
ch4_nodes_with_chp = network.buses.loc[
network.links.loc[electric, "bus0"].values
].index.unique()
for i in ch4_nodes_with_chp:
elec_chp = network.links[
(network.links.carrier == "central_gas_CHP")
& (network.links.bus0 == i)
].index
heat_chp = network.links[
(network.links.carrier == "central_gas_CHP_heat")
& (network.links.bus0 == i)
].index
# Guarantees c_m p_b1 \leq p_g1
def backpressure(model, snapshot):
lhs = sum(
c_m
* network.links.at[h_chp, "efficiency"]
* model.link_p[h_chp, snapshot]
for h_chp in heat_chp
)
rhs = sum(
network.links.at[e_chp, "efficiency"]
* model.link_p[e_chp, snapshot]
for e_chp in elec_chp
)
return lhs <= rhs
setattr(
network.model,
"backpressure_" + str(i),
Constraint(list(snapshots), rule=backpressure),
)
# Guarantees p_g1 +c_v p_b1 \leq p_g1_nom
def top_iso_fuel_line(model, snapshot):
lhs = sum(
model.link_p[h_chp, snapshot] for h_chp in heat_chp
) + sum(model.link_p[e_chp, snapshot] for e_chp in elec_chp)
rhs = network.links[
(network.links.carrier == "central_gas_CHP")
& (network.links.bus0 == i)
].p_nom.sum()
return lhs <= rhs
setattr(
network.model,
"top_iso_fuel_line_" + str(i),
Constraint(list(snapshots), rule=top_iso_fuel_line),
)