Source code for etrago.tools.utilities

# -*- coding: utf-8 -*-
# Copyright 2016-2023  Flensburg University of Applied Sciences,
# Europa-Universität Flensburg,
# Centre for Sustainable Energy Systems,
# DLR-Institute for Networked Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# File description
"""
Utilities.py includes a wide range of useful functions.
"""

from collections.abc import Mapping
from copy import deepcopy
from pathlib import Path
from urllib.request import urlretrieve
import json
import logging
import math
import os
import zipfile

from shapely.geometry import LineString, Point
import geopandas as gpd
import numpy as np
import pandas as pd
import pypsa
import saio
import sqlalchemy.exc

if "READTHEDOCS" not in os.environ:

    from etrago.tools import db

logger = logging.getLogger(__name__)


__copyright__ = (
    "Flensburg University of Applied Sciences, "
    "Europa-Universität Flensburg, "
    "Centre for Sustainable Energy Systems, "
    "DLR-Institute for Networked Energy Systems"
)
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = """ulfmueller, s3pp, wolfbunke, mariusves, lukasol, ClaraBuettner,
CarlosEpia, gnn, pieterhexen, fwitte, KathiEsterl, MGlauer, birgits,
 AmeliaNadal, MarlonSchlemminger, wheitkoetter, jankaeh"""






[docs] def buses_of_vlvl(network, voltage_level): """Get bus-ids of given voltage level(s). Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA voltage_level: list Returns ------- list List containing bus-ids. """ mask = network.buses.v_nom.isin(voltage_level) df = network.buses[mask] return df.index
[docs] def buses_grid_linked(network, voltage_level): """Get bus-ids of a given voltage level connected to the grid. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA voltage_level: list Returns ------- list List containing bus-ids. """ mask = ( network.buses.index.isin(network.lines.bus0) | (network.buses.index.isin(network.lines.bus1)) | ( network.buses.index.isin( network.links.loc[network.links.carrier == "DC", "bus0"] ) ) | ( network.buses.index.isin( network.links.loc[network.links.carrier == "DC", "bus1"] ) ) ) & (network.buses.v_nom.isin(voltage_level)) df = network.buses[mask] return df.index
[docs] def geolocation_buses(self, apply_on="grid_model"): """ If geopandas is installed: Use geometries of buses x/y(lon/lat) and polygons of countries from RenpassGisParameterRegion in order to locate the buses Else: Use coordinats of buses to locate foreign buses, which is less accurate. TODO: Why not alway use geopandas?? Parameters ---------- etrago : :class:`etrago.Etrago` Transmission grid object apply_on: str State if this function is applied on the grid_model or the market_model. The market_model options can only be used if the method type is "market_grid". """ if apply_on == "grid_model": network = self.network elif apply_on == "market_model": network = self.market_model elif apply_on == "pre_market_model": network = self.pre_market_model else: logger.warning( """Parameter apply_on must be either 'grid_model' or 'market_model' or 'pre_market_model'.""" ) transborder_lines_0 = network.lines[ network.lines["bus0"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index transborder_lines_1 = network.lines[ network.lines["bus1"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index # set country tag for lines network.lines.loc[transborder_lines_0, "country"] = network.buses.loc[ network.lines.loc[transborder_lines_0, "bus0"].values, "country" ].values network.lines.loc[transborder_lines_1, "country"] = network.buses.loc[ network.lines.loc[transborder_lines_1, "bus1"].values, "country" ].values network.lines["country"].fillna("DE", inplace=True) doubles = list(set(transborder_lines_0.intersection(transborder_lines_1))) for line in doubles: c_bus0 = network.buses.loc[network.lines.loc[line, "bus0"], "country"] c_bus1 = network.buses.loc[network.lines.loc[line, "bus1"], "country"] network.lines.loc[line, "country"] = "{}{}".format(c_bus0, c_bus1) transborder_links_0 = network.links[ network.links["bus0"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index transborder_links_1 = network.links[ network.links["bus1"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index # set country tag for links network.links.loc[transborder_links_0, "country"] = network.buses.loc[ network.links.loc[transborder_links_0, "bus0"].values, "country" ].values network.links.loc[transborder_links_1, "country"] = network.buses.loc[ network.links.loc[transborder_links_1, "bus1"].values, "country" ].values network.links["country"].fillna("DE", inplace=True) doubles = list(set(transborder_links_0.intersection(transborder_links_1))) for link in doubles: c_bus0 = network.buses.loc[network.links.loc[link, "bus0"], "country"] c_bus1 = network.buses.loc[network.links.loc[link, "bus1"], "country"] network.links.loc[link, "country"] = "{}{}".format(c_bus0, c_bus1) return network
[docs] def buses_by_country(self, apply_on="grid_model"): """ Find buses of foreign countries using coordinates and return them as Pandas Series Parameters ---------- self : Etrago object Overall container of PyPSA apply_on: str State if this function is applied on the grid_model or the market_model. The market_model options can only be used if the method type is "market_grid". Returns ------- None """ if apply_on == "grid_model": network = self.network elif apply_on == "market_model": network = self.market_model elif apply_on == "pre_market_model": network = self.pre_market_model else: logger.warning( """Parameter apply_on must be either 'grid_model' or 'market_model' or 'pre_market_model'.""" ) countries = { "Poland": "PL", "Czechia": "CZ", "Denmark": "DK", "Sweden": "SE", "Austria": "AT", "Switzerland": "CH", "Netherlands": "NL", "Luxembourg": "LU", "France": "FR", "Belgium": "BE", "United Kingdom": "GB", "Norway": "NO", "Finland": "FI", "Germany": "DE", "Russia": "RU", } if "oep.iks.cs.ovgu.de" in str(self.engine.url): saio.register_schema("tables", self.engine) from saio.tables import edut_00_013 as vg250_lan else: saio.register_schema("boundaries", self.engine) from saio.boundaries import vg250_lan # read Germany borders from egon-data query = self.session.query(vg250_lan) germany_sh = saio.as_pandas(query, geometry="geometry") # read Europe borders. Original data downloaded from naturalearthdata.com/ # under Public Domain license path_countries = Path(".") / "data" / "shapes_europe" if not os.path.exists(path_countries): path_countries.mkdir(exist_ok=True, parents=True) url_countries = ( "https://naciscdn.org/naturalearth/110m/cultural/" + "ne_110m_admin_0_countries.zip" ) urlretrieve(url_countries, path_countries / "shape_countries.zip") with zipfile.ZipFile( path_countries / "shape_countries.zip", "r" ) as zip_ref: zip_ref.extractall(path_countries) shapes = ( gpd.read_file(path_countries) .rename(columns={"NAME": "name"}) .set_index("name") ) # Use Germany borders from egon-data if not using the SH test case if len(germany_sh.gen.unique()) > 1: shapes.at["Germany", "geometry"] = germany_sh.geometry.unary_union geobuses = network.buses.copy() geobuses["geom"] = geobuses.apply( lambda x: Point([x["x"], x["y"]]), axis=1 ) geobuses = gpd.GeoDataFrame( data=geobuses, geometry="geom", crs="EPSG:4326" ) geobuses["country"] = np.nan for country in countries: geobuses["country"][ network.buses.index.isin( geobuses.clip(shapes[shapes.index == country]).index ) ] = countries[country] shapes = shapes.to_crs(3035) geobuses = geobuses.to_crs(3035) for bus in geobuses[geobuses["country"].isna()].index: distances = shapes.distance(geobuses.loc[bus, "geom"]) closest = distances.idxmin() geobuses.loc[bus, "country"] = countries[closest] network.buses = geobuses.drop(columns="geom") return
[docs] def clip_foreign(network): """ Delete all components and timelines located outside of Germany. If applied after optimization, transborder flows divided by country of origin are added as network.foreign_trade. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA Returns ------- network : :class:`pypsa.Network Overall container of PyPSA """ # get foreign buses by country foreign_buses = network.buses[network.buses.country != "DE"] network.buses = network.buses.drop( network.buses.loc[foreign_buses.index].index ) if not network.lines_t.p0.empty: # identify transborder lines # TODO: Add links! transborder_lines = network.lines.query("country != 'DE'") transborder_lines["bus0"] = network.lines["bus0"] transborder_lines["bus1"] = network.lines["bus1"] transborder_lines["country"] = network.lines.country # identify amount of flows per line and group to get flow per country transborder_flows = network.lines_t.p0[transborder_lines.index] for i in transborder_flows.columns: if network.lines.loc[str(i)]["bus1"] in foreign_buses.index: transborder_flows.loc[:, str(i)] = ( transborder_flows.loc[:, str(i)] * -1 ) network.foreign_trade = transborder_flows.groupby( transborder_lines["country"], axis=1 ).sum() # drop foreign components network.lines = network.lines.drop( network.lines[ ~(network.lines["bus0"].isin(network.buses.index)) | ~(network.lines["bus1"].isin(network.buses.index)) ].index ) network.links = network.links.drop( network.links[ ~(network.links["bus0"].isin(network.buses.index)) | ~(network.links["bus1"].isin(network.buses.index)) ].index ) network.transformers = network.transformers.drop( network.transformers[ ~(network.transformers["bus0"].isin(network.buses.index)) | ~(network.transformers["bus1"].isin(network.buses.index)) ].index ) network.generators = network.generators.drop( network.generators[ ~(network.generators["bus"].isin(network.buses.index)) ].index ) network.loads = network.loads.drop( network.loads[~(network.loads["bus"].isin(network.buses.index))].index ) network.storage_units = network.storage_units.drop( network.storage_units[ ~(network.storage_units["bus"].isin(network.buses.index)) ].index ) components = [ "loads", "generators", "lines", "buses", "transformers", "links", ] for g in components: # loads_t h = g + "_t" nw = getattr(network, h) # network.loads_t for i in nw.keys(): # network.loads_t.p cols = [ j for j in getattr(nw, i).columns if j not in getattr(network, g).index ] for k in cols: del getattr(nw, i)[k] return network
[docs] def set_q_national_loads(self, cos_phi): """ Set q component of national loads based on the p component and cos_phi Parameters ---------- network : :class:`pypsa.Network` Overall container of PyPSA cos_phi : float Choose ration of active and reactive power of foreign loads Returns ------- network : :class:`pypsa.Network` Overall container of PyPSA """ network = self.network national_buses = network.buses[ (network.buses.country == "DE") & (network.buses.carrier == "AC") ] # Calculate q national loads based on p and cos_phi new_q_loads = network.loads_t["p_set"].loc[ :, network.loads.index[ (network.loads.bus.astype(str).isin(national_buses.index)) & (network.loads.carrier.astype(str) == "AC") ], ] * math.tan(math.acos(cos_phi)) # insert the calculated q in loads_t. Only loads without previous # assignment are affected network.loads_t.q_set = pd.merge( network.loads_t.q_set, new_q_loads, how="inner", right_index=True, left_index=True, suffixes=("", "delete_"), ) network.loads_t.q_set.drop( [i for i in network.loads_t.q_set.columns if "delete" in i], axis=1, inplace=True, )
[docs] def set_q_foreign_loads(self, cos_phi): """Set reative power timeseries of loads in neighbouring countries Parameters ---------- etrago : :class:`etrago.Etrago Transmission grid object cos_phi: float Choose ration of active and reactive power of foreign loads Returns ------- None """ network = self.network foreign_buses = network.buses[ (network.buses.country != "DE") & (network.buses.carrier == "AC") ] network.loads_t["q_set"].loc[ :, network.loads.index[ (network.loads.bus.astype(str).isin(foreign_buses.index)) & (network.loads.carrier != "H2_for_industry") ].astype(int), ] = network.loads_t["p_set"].loc[ :, network.loads.index[ (network.loads.bus.astype(str).isin(foreign_buses.index)) & (network.loads.carrier != "H2_for_industry") ], ].values * math.tan( math.acos(cos_phi) ) # To avoid a problem when the index of the load is the weather year, # the column names were temporarily set to `int` and changed back to # `str`. network.loads_t["q_set"].columns = network.loads_t["q_set"].columns.astype( str )
[docs] def connected_grid_lines(network, busids): """Get grid lines connected to given buses. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA busids : list List containing bus-ids. Returns ------- :class:`pandas.DataFrame PyPSA lines. """ mask = network.lines.bus1.isin(busids) | network.lines.bus0.isin(busids) return network.lines[mask]
[docs] def connected_transformer(network, busids): """Get transformer connected to given buses. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA busids : list List containing bus-ids. Returns ------- :class:`pandas.DataFrame PyPSA transformer. """ mask = network.transformers.bus0.isin(busids) return network.transformers[mask]
[docs] def load_shedding( self, temporal_disaggregation=False, negative_load_shedding=["Li_ion"], **kwargs, ): """Implement load shedding in existing network to identify feasibility problems Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA temporal_disaggregation : bool It is set to False by default. negative_load_shedding : False, list Define if generators for negative load shedding will be created. It is set to False by default. When activated supply a list of carriers e.g. ["AC", "Li_ion"] Returns ------- None. """ logger.debug("Shedding the load.") if self.args["load_shedding"]: if temporal_disaggregation: network = self.network_tsa else: network = self.network marginal_cost_def = 10000 # network.generators.marginal_cost.max()*2 p_nom_def = network.loads_t.p_set.max().max() marginal_cost = kwargs.get("marginal_cost", marginal_cost_def) p_nom = kwargs.get("p_nom", p_nom_def) network.add("Carrier", "load") start = ( network.generators.index.to_series() .str.extract("(\d+)") .astype(int) .max() + 1 )[0] if start != start: start = 0 index = list(range(start, start + len(network.buses.index))) network.import_components_from_dataframe( pd.DataFrame( dict( marginal_cost=marginal_cost, p_nom=p_nom, carrier="load shedding", bus=network.buses.index, control="PQ", ), index=index, ), "Generator", ) if negative_load_shedding: logger.debug("Shedding the generation.") # Add negative load shedding generators for provided carriers neg_shedding_buses = network.buses[ network.buses.carrier.isin(negative_load_shedding) ].index index_neg_shedding = list( range( start + len(network.buses.index), start + len(network.buses.index) + len(neg_shedding_buses), ) ) network.import_components_from_dataframe( pd.DataFrame( dict( marginal_cost=-marginal_cost, p_nom=p_nom, p_min_pu=-1, p_max_pu=0, carrier="negative load shedding", bus=neg_shedding_buses, control="PQ", ), index=index_neg_shedding, ), "Generator", )
[docs] def set_control_strategies(network): """Sets control strategies for AC generators and storage units Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA Returns ------- None. """ # Assign generators control strategy network.generators.loc[:, "control"] = "PV" network.generators.loc[ network.generators.carrier.isin( [ "load shedding", "negative load shedding", "CH4", "CH4_biogas", "CH4_NG", "central_biomass_CHP_heat", "geo_thermal", "solar_thermal_collector", ] ), "control", ] = "PQ" # Assign storage units control strategy network.storage_units.loc[:, "control"] = "PV"
def _enumerate_row(row): row["name"] = row.name return row
[docs] def export_to_csv(self, path): """Write calculation results to csv-files in `path`. Parameters ---------- network : :class:`pypsa.Network` Overall container of PyPSA args: dict Contains calculation settings of appl.py path: str or False or None Choose path for csv-files. Specify `""`, `False` or `None` to not do anything. Returns ------- None """ if not path: pass if not os.path.exists(path): os.makedirs(path, exist_ok=True) self.network.export_to_csv_folder(path) data = pd.read_csv(os.path.join(path, "network.csv")) # data['time'] = network.results['Solver'].Time data = data.apply(_enumerate_row, axis=1) data.to_csv(os.path.join(path, "network.csv"), index=False) with open(os.path.join(path, "args.json"), "w") as fp: json.dump(self.args, fp, indent=4) if hasattr(self.network, "Z"): file = [ i for i in os.listdir(path.strip("0123456789")) if i == "Z.csv" ] if file: print("Z already calculated") else: self.network.Z.to_csv( path.strip("0123456789") + "/Z.csv", index=False ) if bool(self.busmap): path_clus = os.path.join(path, "clustering") if not os.path.exists(path_clus): os.makedirs(path_clus, exist_ok=True) with open(os.path.join(path_clus, "busmap.json"), "w") as d: json.dump(self.busmap["busmap"], d, indent=4) self.busmap["orig_network"].export_to_csv_folder(path_clus) data = pd.read_csv(os.path.join(path_clus, "network.csv")) data = data.apply(_enumerate_row, axis=1) data.to_csv(os.path.join(path_clus, "network.csv"), index=False) if isinstance(self.ch4_h2_mapping, pd.Series): path_clus = os.path.join(path, "clustering") if not os.path.exists(path_clus): os.makedirs(path_clus, exist_ok=True) with open(os.path.join(path_clus, "ch4_h2_mapping.json"), "w") as d: self.ch4_h2_mapping.to_json(d, indent=4) return
def _make_consense(component, attr): """ Returns a function `consense` that will be used to generate a consensus value for the attribute `attr` of the given `component`. This consensus value is derived from the input DataFrame `x`. If all values in the DataFrame are equal, the consensus value will be that common value. If all values are missing (NaN), the consensus value will be NaN. Otherwise, an assertion error will be raised. Parameters ---------- component : str specify the name of the component being clustered. attr : str specify the name of the attribute of the commponent being considered. Returns ------- function A function that takes a DataFrame as input and returns a single value as output when all the elements of the commponent attribute are the same. """ def consense(x): v = x.iat[0] assert (x == v).all() or x.isnull().all(), ( f"In {component} cluster {x.name} the values" f" of attribute {attr} do not agree:\n{x}" ) return v return consense def _normed(s): """ Given a pandas Series `s`, normalizes the series by dividing each element by the sum of the series. If the sum of the series is zero, returns 1.0 to avoid division by zero errors. Parameters ---------- s : pandas.Series A pandas Series. Returns ------- pandas.Series A normalized pandas Series. """ tot = s.sum() if tot == 0: return 1.0 else: return s / tot
[docs] def agg_series_lines(l0, network): """ Given a pandas DataFrame `l0` containing information about lines in a network and a network object, aggregates the data in `l0` for all its attributes. Returns a pandas Series containing the aggregated data. Parameters ---------- l0: pandas.DataFrame contain information about lines in a network. network : :class:`pypsa.Network Overall container of PyPSA Returns ------- pandas.Series A pandas Series containing aggregated data for the lines in the network. """ attrs = network.components["Line"]["attrs"] columns = set( attrs.index[attrs.static & attrs.status.str.startswith("Input")] ).difference(("name", "bus0", "bus1")) consense = { attr: _make_consense("Bus", attr) for attr in ( columns # | {"sub_network"} - { "r", "x", "g", "b", "terrain_factor", "s_nom", "s_nom_min", "s_nom_max", "s_nom_extendable", "length", "v_ang_min", "v_ang_max", } ) } Line = l0.index[0] data = dict( r=l0["r"].sum(), x=l0["x"].sum(), g=1.0 / (1.0 / l0["g"].apply(float)).sum(), b=1.0 / (1.0 / l0["b"]).sum(), terrain_factor=l0["terrain_factor"].mean(), s_max_pu=(l0["s_max_pu"] * _normed(l0["s_nom"])).sum(), s_nom=l0["s_nom"].iloc[0], s_nom_min=l0["s_nom_min"].max(), s_nom_max=l0["s_nom_max"].min(), s_nom_extendable=l0["s_nom_extendable"].any(), num_parallel=l0["num_parallel"].max(), capital_cost=(_normed(l0["s_nom"]) * l0["capital_cost"]).sum(), length=l0["length"].sum(), v_ang_min=l0["v_ang_min"].max(), v_ang_max=l0["v_ang_max"].min(), ) data.update((f, consense[f](l0[f])) for f in columns.difference(data)) return pd.Series( data, index=[f for f in l0.columns if f in columns], name=Line )
[docs] def group_parallel_lines(network): """ Function that groups parallel lines of the same voltage level to one line component representing all parallel lines Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA Returns ------- None. """ def agg_parallel_lines(l0): attrs = network.components["Line"]["attrs"] columns = set( attrs.index[attrs.static & attrs.status.str.startswith("Input")] ).difference(("name", "bus0", "bus1")) columns.add("Line") columns.add("geom") consense = { attr: _make_consense("Bus", attr) for attr in ( columns | {"sub_network"} - { "Line", "r", "x", "g", "b", "terrain_factor", "s_nom", "s_nom_min", "s_nom_max", "s_nom_extendable", "length", "v_ang_min", "v_ang_max", "geom", } ) } data = dict( Line=l0["Line"].iloc[0], r=1.0 / (1.0 / l0["r"]).sum(), x=1.0 / (1.0 / l0["x"]).sum(), g=l0["g"].sum(), b=l0["b"].sum(), terrain_factor=l0["terrain_factor"].mean(), s_max_pu=(l0["s_max_pu"] * _normed(l0["s_nom"])).sum(), s_nom=l0["s_nom"].sum(), s_nom_min=l0["s_nom_min"].sum(), s_nom_max=l0["s_nom_max"].sum(), s_nom_extendable=l0["s_nom_extendable"].any(), num_parallel=l0["num_parallel"].sum(), capital_cost=(_normed(l0["s_nom"]) * l0["capital_cost"]).sum(), length=l0["length"].mean(), sub_network=consense["sub_network"](l0["sub_network"]), v_ang_min=l0["v_ang_min"].max(), v_ang_max=l0["v_ang_max"].min(), geom=l0["geom"].iloc[0], ) data.update((f, consense[f](l0[f])) for f in columns.difference(data)) return pd.Series(data, index=[f for f in l0.columns if f in columns]) # Make bus0 always the greattest to identify repeated lines lines_2 = network.lines.copy() bus_max = lines_2.apply(lambda x: max(x.bus0, x.bus1), axis=1) bus_min = lines_2.apply(lambda x: min(x.bus0, x.bus1), axis=1) lines_2["bus0"] = bus_max lines_2["bus1"] = bus_min lines_2.reset_index(inplace=True) lines_2["geom"] = lines_2.apply( lambda x: None if x.geom is None else x.geom.wkt, axis=1 ) network.lines = ( lines_2.groupby(["bus0", "bus1"]) .apply(agg_parallel_lines) .reset_index() .set_index("Line", drop=True) ) # network.lines["geom"] = gpd.GeoSeries.from_wkt(network.lines["geom"]) return
[docs] def delete_dispensable_ac_buses(etrago): """ Function that identifies and delete AC buses without links, transformers, generators, loads, stores or storage_units, which also are connected to just one or two other buses Parameters ---------- etrago : etrago object Returns ------- None. """ if etrago.args["delete_dispensable_ac_buses"] is False: return def delete_buses(delete_buses, network): drop_buses = delete_buses.index.to_list() network.buses.drop(labels=drop_buses, inplace=True) drop_lines = network.lines.index[ (network.lines.bus0.isin(drop_buses)) | (network.lines.bus1.isin(drop_buses)) ].to_list() network.lines.drop(labels=drop_lines, inplace=True) drop_storage_units = network.storage_units.index[ (network.storage_units.bus.isin(drop_buses)) ].to_list() network.storage_units.drop(drop_storage_units, inplace=True) drop_generators = network.generators.index[ (network.generators.bus.isin(drop_buses)) ].to_list() network.generators.drop(drop_generators, inplace=True) return ( network.buses, network.lines, network.storage_units, network.generators, ) def count_lines(lines): buses_in_lines = lines[["bus0", "bus1"]].drop_duplicates() def count(bus): total = ( (buses_in_lines["bus0"] == bus.name) | (buses_in_lines["bus1"] == bus.name) ).sum() return total return count network = etrago.network # Group the parallel transmission lines to reduce the complexity group_parallel_lines(etrago.network) initial_ac = etrago.network.buses.carrier.value_counts()["AC"] diff = len(etrago.network.buses) while diff != 0: bus_initial = len(etrago.network.buses) # ordering of buses bus0_new = network.lines.apply(lambda x: max(x.bus0, x.bus1), axis=1) bus1_new = network.lines.apply(lambda x: min(x.bus0, x.bus1), axis=1) network.lines["bus0"] = bus0_new network.lines["bus1"] = bus1_new # Find the buses without any other kind of elements attached to them # more than transmission lines. ac_buses = network.buses[network.buses.carrier == "AC"][ ["geom", "country"] ] b_links = pd.concat([network.links.bus0, network.links.bus1]).unique() b_trafo = pd.concat( [network.transformers.bus0, network.transformers.bus1] ).unique() b_gen = network.generators[ (network.generators.carrier != "load shedding") & (network.generators.carrier != "negative load shedding") ].bus.unique() b_load = network.loads.bus.unique() b_store = network.stores[network.stores.e_nom > 0].bus.unique() b_store_unit = network.storage_units[ network.storage_units.p_nom > 0 ].bus.unique() ac_buses["links"] = ac_buses.index.isin(b_links) ac_buses["trafo"] = ac_buses.index.isin(b_trafo) ac_buses["gen"] = ac_buses.index.isin(b_gen) ac_buses["load"] = ac_buses.index.isin(b_load) ac_buses["store"] = ac_buses.index.isin(b_store) ac_buses["storage_unit"] = ac_buses.index.isin(b_store_unit) ac_buses = ac_buses[ ~(ac_buses.links) & ~(ac_buses.trafo) & ~(ac_buses.gen) & ~(ac_buses.load) & ~(ac_buses.store) & ~(ac_buses.storage_unit) ][[]] # count how many lines are connected to each bus number_of_lines = count_lines(network.lines) ac_buses["n_lines"] = 0 ac_buses["n_lines"] = ac_buses.apply(number_of_lines, axis=1) # Keep the buses with two or less transmission lines ac_buses = ac_buses[ac_buses["n_lines"] <= 2] # Keep only the buses connecting 2 lines with the same capacity lines_cap = network.lines[ (network.lines.bus0.isin(ac_buses.index)) | (network.lines.bus1.isin(ac_buses.index)) ][["bus0", "bus1", "s_nom"]] delete_bus = [] for bus in ac_buses[ac_buses["n_lines"] == 2].index: l0 = lines_cap[(lines_cap.bus0 == bus) | (lines_cap.bus1 == bus)][ "s_nom" ].unique() if len(l0) != 1: delete_bus.append(bus) ac_buses.drop(delete_bus, inplace=True) # create groups of lines to join buses_2 = ac_buses[ac_buses["n_lines"] == 2] lines = network.lines[ (network.lines.bus0.isin(buses_2.index)) | (network.lines.bus1.isin(buses_2.index)) ][["bus0", "bus1"]].copy() lines_index = lines.index new_lines = pd.DataFrame(columns=["bus0", "bus1", "lines"]) group = 0 for line in lines_index: if line not in lines.index: continue bus0 = lines.at[line, "bus0"] bus1 = lines.at[line, "bus1"] lines_group = [line] lines.drop(line, inplace=True) # Determine bus0 new group end_search = False while not end_search: if bus0 not in ac_buses.index: end_search = True continue lines_b = lines[(lines.bus0 == bus0) | (lines.bus1 == bus0)] if len(lines_b) > 0: lines_group.append(lines_b.index[0]) if lines_b.iat[0, 0] == bus0: bus0 = lines_b.iat[0, 1] else: bus0 = lines_b.iat[0, 0] lines.drop(lines_b.index[0], inplace=True) else: end_search = True # Determine bus1 new group end_search = False while not end_search: if bus1 not in ac_buses.index: end_search = True continue lines_b = lines[(lines.bus0 == bus1) | (lines.bus1 == bus1)] if len(lines_b) > 0: lines_group.append(lines_b.index[0]) if lines_b.iat[0, 0] == bus1: bus1 = lines_b.iat[0, 1] else: bus1 = lines_b.iat[0, 0] lines.drop(lines_b.index[0], inplace=True) else: end_search = True # Define the parameters of the new lines to be inserted into # `network.lines`. new_lines.loc[group] = [bus0, bus1, lines_group] group = group + 1 # Create the new lines as result of aggregating series lines lines = network.lines[ (network.lines.bus0.isin(buses_2.index)) | (network.lines.bus1.isin(buses_2.index)) ] new_lines_df = pd.DataFrame(columns=lines.columns).rename_axis("Lines") for l0 in new_lines.index: lines_group = lines[ lines.index.isin(new_lines.at[l0, "lines"]) ].copy() l_new = agg_series_lines(lines_group, network) l_new["bus0"] = new_lines.at[l0, "bus0"] l_new["bus1"] = new_lines.at[l0, "bus1"] new_lines_df["s_nom_extendable"] = new_lines_df[ "s_nom_extendable" ].astype(bool) new_lines_df.loc[l_new.name] = l_new # Delete all the dispensable buses ( network.buses, network.lines, network.storage_units, network.generators, ) = delete_buses(ac_buses, network) # exclude from the new lines the ones connected to deleted buses new_lines_df = new_lines_df[ (~new_lines_df.bus0.isin(ac_buses.index)) & (~new_lines_df.bus1.isin(ac_buses.index)) ] etrago.network.lines = pd.concat([etrago.network.lines, new_lines_df]) # Drop s_max_pu timeseries for deleted lines etrago.network.lines_t.s_max_pu = ( etrago.network.lines_t.s_max_pu.transpose()[ etrago.network.lines_t.s_max_pu.columns.isin( etrago.network.lines.index ) ].transpose() ) diff = bus_initial - len(etrago.network.buses) final_ac = etrago.network.buses.carrier.value_counts()["AC"] logger.info(f"{initial_ac - final_ac} dispensable AC buses were removed") return
[docs] def delete_irrelevant_oneports(etrago): network = etrago.network network.generators.drop( network.generators[ (network.generators.p_nom == 0) & (network.generators.p_nom_extendable is False) ].index, inplace=True, ) network.storage_units.drop( network.storage_units[ (network.storage_units.p_nom == 0) & (network.storage_units.p_nom_extendable is False) ].index, inplace=True, ) components = ["generators", "storage_units"] for g in components: # loads_t h = g + "_t" nw = getattr(network, h) # network.loads_t for i in nw.keys(): # network.loads_t.p cols = [ j for j in getattr(nw, i).columns if j not in getattr(network, g).index ] for k in cols: del getattr(nw, i)[k] return
[docs] def set_line_costs(self, cost110=230, cost220=290, cost380=85, costDC=375): """Set capital costs for extendable lines in respect to PyPSA [€/MVA] Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA args: dict containing settings from appl.py cost110 : capital costs per km for 110kV lines and cables default: 230€/MVA/km, source: costs for extra circuit in dena Verteilnetzstudie, p. 146) cost220 : capital costs per km for 220kV lines and cables default: 280€/MVA/km, source: costs for extra circuit in NEP 2025, capactity from most used 220 kV lines in model cost380 : capital costs per km for 380kV lines and cables default: 85€/MVA/km, source: costs for extra circuit in NEP 2025, capactity from most used 380 kV lines in NEP costDC : capital costs per km for DC-lines default: 375€/MVA/km, source: costs for DC transmission line in NEP 2035 """ network = self.network network.lines.loc[(network.lines.v_nom == 110), "capital_cost"] = ( cost110 * network.lines.length ) network.lines.loc[(network.lines.v_nom == 220), "capital_cost"] = ( cost220 * network.lines.length ) network.lines.loc[(network.lines.v_nom == 380), "capital_cost"] = ( cost380 * network.lines.length ) network.links.loc[ (network.links.p_nom_extendable) & (network.links.index.isin(self.dc_lines().index)), "capital_cost", ] = ( costDC * network.links.length ) return network
[docs] def set_trafo_costs( self, cost110_220=7500, cost110_380=17333, cost220_380=14166 ): """Set capital costs for extendable transformers in respect to PyPSA [€/MVA] Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA cost110_220 : capital costs for 110/220kV transformer default: 7500€/MVA, source: costs for extra trafo in dena Verteilnetzstudie, p. 146; S of trafo used in osmTGmod cost110_380 : capital costs for 110/380kV transformer default: 17333€/MVA, source: NEP 2025 cost220_380 : capital costs for 220/380kV transformer default: 14166€/MVA, source: NEP 2025 """ network = self.network network.transformers["v_nom0"] = network.transformers.bus0.map( network.buses.v_nom ) network.transformers["v_nom1"] = network.transformers.bus1.map( network.buses.v_nom ) network.transformers.loc[ (network.transformers.v_nom0 == 110) & (network.transformers.v_nom1 == 220), "capital_cost", ] = cost110_220 network.transformers.loc[ (network.transformers.v_nom0 == 110) & (network.transformers.v_nom1 == 380), "capital_cost", ] = cost110_380 network.transformers.loc[ (network.transformers.v_nom0 == 220) & (network.transformers.v_nom1 == 380), "capital_cost", ] = cost220_380 return network
[docs] def convert_capital_costs(self): """Convert capital_costs to fit to considered timesteps Parameters ---------- etrago : :class:`etrago.Etrago Transmission grid object """ network = self.network n_snapshots = self.args["end_snapshot"] - self.args["start_snapshot"] + 1 # Costs are already annuized yearly in the datamodel # adjust to number of considered snapshots network.lines.loc[network.lines.s_nom_extendable, "capital_cost"] *= ( n_snapshots / 8760 ) network.links.loc[network.links.p_nom_extendable, "capital_cost"] *= ( n_snapshots / 8760 ) network.transformers.loc[ network.transformers.s_nom_extendable, "capital_cost" ] *= (n_snapshots / 8760) network.storage_units.loc[ network.storage_units.p_nom_extendable, "capital_cost" ] *= (n_snapshots / 8760) network.stores.loc[network.stores.e_nom_extendable, "capital_cost"] *= ( n_snapshots / 8760 )
[docs] def find_snapshots(network, carrier, maximum=True, minimum=True, n=3): """ Function that returns snapshots with maximum and/or minimum feed-in of selected carrier. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA carrier: str Selected carrier of generators maximum: bool Choose if timestep of maximal feed-in is returned. minimum: bool Choose if timestep of minimal feed-in is returned. n: int Number of maximal/minimal snapshots Returns ------- calc_snapshots : 'pandas.core.indexes.datetimes.DatetimeIndex' List containing snapshots """ if carrier == "residual load": power_plants = network.generators[ network.generators.carrier.isin(["solar", "wind", "wind_onshore"]) ] power_plants_t = ( network.generators.p_nom[power_plants.index] * network.generators_t.p_max_pu[power_plants.index] ) load = network.loads_t.p_set.sum(axis=1) all_renew = power_plants_t.sum(axis=1) all_carrier = load - all_renew if carrier in ( "solar", "wind", "wind_onshore", "wind_offshore", "run_of_river", ): power_plants = network.generators[ network.generators.carrier == carrier ] power_plants_t = ( network.generators.p_nom[power_plants.index] * network.generators_t.p_max_pu[power_plants.index] ) all_carrier = power_plants_t.sum(axis=1) if maximum and not minimum: times = all_carrier.sort_values().head(n=n) if minimum and not maximum: times = all_carrier.sort_values().tail(n=n) if maximum and minimum: times = all_carrier.sort_values().head(n=n) times = pd.concat([times, all_carrier.sort_values().tail(n=n)]) calc_snapshots = all_carrier.index[all_carrier.index.isin(times.index)] return calc_snapshots
[docs] def ramp_limits(network): """Add ramping constraints to thermal power plants. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA Returns ------- """ carrier = [ "coal", "biomass", "gas", "oil", "waste", "lignite", "uranium", "geothermal", ] data = { "start_up_cost": [77, 57, 42, 57, 57, 77, 50, 57], # €/MW "start_up_fuel": [4.3, 2.8, 1.45, 2.8, 2.8, 4.3, 16.7, 2.8], # MWh/MW "min_up_time": [5, 2, 3, 2, 2, 5, 12, 2], "min_down_time": [7, 2, 2, 2, 2, 7, 17, 2], # =================================================================== # 'ramp_limit_start_up':[0.4, 0.4, 0.4, 0.4, 0.4, 0.6, 0.5, 0.4], # 'ramp_limit_shut_down':[0.4, 0.4, 0.4, 0.4, 0.4, 0.6, 0.5, 0.4] # =================================================================== "p_min_pu": [0.33, 0.38, 0.4, 0.38, 0.38, 0.5, 0.45, 0.38], } df = pd.DataFrame(data, index=carrier) fuel_costs = network.generators.marginal_cost.groupby( network.generators.carrier ).mean()[carrier] df["start_up_fuel"] = df["start_up_fuel"] * fuel_costs df["start_up_cost"] = df["start_up_cost"] + df["start_up_fuel"] df.drop("start_up_fuel", axis=1, inplace=True) for tech in df.index: for limit in df.columns: network.generators.loc[ network.generators.carrier == tech, limit ] = df.loc[tech, limit] network.generators.start_up_cost = ( network.generators.start_up_cost * network.generators.p_nom ) network.generators.committable = True
[docs] def get_args_setting(self, jsonpath="scenario_setting.json"): """ Get and open json file with scenaio settings of eTraGo ``args``. The settings includes all eTraGo specific settings of arguments and parameters for a reproducible calculation. Parameters ---------- json_file : str Default: ``scenario_setting.json`` Name of scenario setting json file Returns ------- args : dict Dictionary of json file """ if jsonpath is not None: if os.path.exists(jsonpath): with open(jsonpath) as f: if "args" in locals(): self.args = merge_dicts(self.args, json.load(f)) else: self.args = json.load(f) else: self.args = None
[docs] def merge_dicts(dict1, dict2): """ Return a new dictionary by merging two dictionaries recursively. Parameters ---------- dict1 : dict dictionary 1. dict2 : dict dictionary 2. Returns ------- result : dict Union of dict1 and dict2 """ result = deepcopy(dict1) for key, value in dict2.items(): if isinstance(value, Mapping): result[key] = merge_dicts(result.get(key, {}), value) else: result[key] = deepcopy(dict2[key]) return result
[docs] def get_clustering_data(self, path): """ Import the final busmap and the initial buses, lines and links Parameters ---------- path : str Name of folder from which to import CSVs of network data. Returns ------- None """ try: path_clus = os.path.join(path, "clustering") if os.path.exists(path_clus): ch4_h2_mapping_path = os.path.join( path_clus, "ch4_h2_mapping.json" ) if os.path.exists(ch4_h2_mapping_path): with open(ch4_h2_mapping_path) as f: self.ch4_h2_mapping = pd.read_json(f, typ="series").astype( str ) self.ch4_h2_mapping.index.name = "CH4_bus" self.ch4_h2_mapping.index = ( self.ch4_h2_mapping.index.astype(str) ) else: logger.info("""There is no CH4 to H2 bus mapping data available in the loaded object.""") busmap_path = os.path.join(path_clus, "busmap.json") if os.path.exists(busmap_path): with open(busmap_path) as f: self.busmap["busmap"] = json.load(f) self.busmap["orig_network"] = pypsa.Network( path_clus, name="orig" ) else: logger.info( "There is no busmap data available in the loaded object." ) except ValueError: logger.info( "There is no clustering data available in the loaded object." )
[docs] def set_random_noise(self, sigma=0.01): """ Sets random noise to marginal cost of each generator. Parameters ---------- etrago : :class:`etrago.Etrago Transmission grid object seed: int seed number, needed to reproduce results sigma: float Default: 0.01 standard deviation, small values reduce impact on dispatch but might lead to numerical instability """ if self.args["generator_noise"]: network = self.network seed = self.args["generator_noise"] s = np.random.RandomState(seed) network.generators.marginal_cost[ network.generators.bus.isin( network.buses.index[network.buses.country == "DE"] ) ] += abs( s.normal( 0, sigma, len( network.generators.marginal_cost[ network.generators.bus.isin( network.buses.index[network.buses.country == "DE"] ) ] ), ) ) network.generators.marginal_cost[ network.generators.bus.isin( network.buses.index[network.buses.country != "DE"] ) ] += abs( s.normal( 0, sigma, len( network.generators.marginal_cost[ network.generators.bus.isin( network.buses.index[network.buses.country == "DE"] ) ] ), ) ).max()
[docs] def set_line_country_tags(network): """ Set country tag for AC- and DC-lines. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA """ transborder_lines_0 = network.lines[ network.lines["bus0"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index transborder_lines_1 = network.lines[ network.lines["bus1"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index # set country tag for lines network.lines.loc[transborder_lines_0, "country"] = network.buses.loc[ network.lines.loc[transborder_lines_0, "bus0"].values, "country" ].values network.lines.loc[transborder_lines_1, "country"] = network.buses.loc[ network.lines.loc[transborder_lines_1, "bus1"].values, "country" ].values network.lines["country"].fillna("DE", inplace=True) doubles = list(set(transborder_lines_0.intersection(transborder_lines_1))) for line in doubles: c_bus0 = network.buses.loc[network.lines.loc[line, "bus0"], "country"] c_bus1 = network.buses.loc[network.lines.loc[line, "bus1"], "country"] network.lines.loc[line, "country"] = "{}{}".format(c_bus0, c_bus1) transborder_links_0 = network.links[ network.links["bus0"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index transborder_links_1 = network.links[ network.links["bus1"].isin( network.buses.index[network.buses["country"] != "DE"] ) ].index # set country tag for links network.links.loc[transborder_links_0, "country"] = network.buses.loc[ network.links.loc[transborder_links_0, "bus0"].values, "country" ].values network.links.loc[transborder_links_1, "country"] = network.buses.loc[ network.links.loc[transborder_links_1, "bus1"].values, "country" ].values network.links["country"].fillna("DE", inplace=True) doubles = list(set(transborder_links_0.intersection(transborder_links_1))) for link in doubles: c_bus0 = network.buses.loc[network.links.loc[link, "bus0"], "country"] c_bus1 = network.buses.loc[network.links.loc[link, "bus1"], "country"] network.links.loc[link, "country"] = "{}{}".format(c_bus0, c_bus1)
[docs] def crossborder_capacity_tyndp2020(): """ This function downloads and extracts a scenario datafile for the TYNDP 2020 (Ten-Year Network Development Plan), reads a specific sheet from the file, filters it based on certain criteria, and then calculates the minimum cross-border capacities for a list of European countries. The minimum cross-border capacity is the minimum of the export and import capacities between two countries. Returns ------- dict Dictionary with cossborder capacities. """ from urllib.request import urlretrieve import zipfile path = "TYNDP-2020-Scenario-Datafile.xlsx" urlretrieve( "https://www.entsos-tyndp2020-scenarios.eu/wp-content/uploads" "/2020/06/TYNDP-2020-Scenario-Datafile.xlsx.zip", path, ) file = zipfile.ZipFile(path) df = pd.read_excel( file.open("TYNDP-2020-Scenario-Datafile.xlsx").read(), sheet_name="Line", ) df = df[ (df.Scenario == "Distributed Energy") & (df.Case == "Reference Grid") & (df.Year == 2040) & (df["Climate Year"] == 1984) & ( (df.Parameter == "Import Capacity") | (df.Parameter == "Export Capacity") ) ] df["country0"] = df["Node/Line"].str[:2] df["country1"] = df["Node/Line"].str[5:7] c_export = ( df[df.Parameter == "Export Capacity"] .groupby(["country0", "country1"]) .Value.sum() ) c_import = ( df[df.Parameter == "Import Capacity"] .groupby(["country0", "country1"]) .Value.sum() ) capacities = pd.DataFrame( index=c_export.index, data={"export": c_export.abs(), "import": c_import.abs()}, ).reset_index() with_de = capacities[ (capacities.country0 == "DE") & (capacities.country1 != "DE") ].set_index("country1")[["export", "import"]] with_de = pd.concat( [ with_de, capacities[ (capacities.country0 != "DE") & (capacities.country1 == "DE") ].set_index("country0")[["export", "import"]], ] ) countries = [ "DE", "DK", "NL", "CZ", "PL", "AT", "CH", "FR", "LU", "BE", "GB", "NO", "SE", ] without_de = capacities[ (capacities.country0 != "DE") & (capacities.country1 != "DE") & (capacities.country0.isin(countries)) & (capacities.country1.isin(countries)) & (capacities.country1 != capacities.country0) ] without_de["country"] = without_de.country0 + without_de.country1 without_de.set_index("country", inplace=True) without_de = without_de[["export", "import"]].fillna(0.0) return { **without_de.min(axis=1).to_dict(), **with_de.min(axis=1).to_dict(), }
[docs] def crossborder_capacity(self): """ Adjust interconnector capacties. Parameters ---------- network : :class:`pypsa.Network Overall container of PyPSA method : string Method of correction. Options are 'ntc_acer' and 'thermal_acer'. 'ntc_acer' corrects all capacities according to values published by the ACER in 2016. 'thermal_acer' corrects certain capacities where our dataset most likely overestimates the thermal capacity. """ if self.args["foreign_lines"]["capacity"] != "osmTGmod": network = self.network if self.args["foreign_lines"]["capacity"] == "ntc_acer": cap_per_country = { "AT": 4900, "CH": 2695, "CZ": 1301, "DK": 913, "FR": 3593, "LU": 2912, "NL": 2811, "PL": 280, "SE": 217, "CZAT": 574, "ATCZ": 574, "CZPL": 312, "PLCZ": 312, "ATCH": 979, "CHAT": 979, "CHFR": 2087, "FRCH": 2087, "FRLU": 364, "LUFR": 364, "SEDK": 1928, "DKSE": 1928, } elif self.args["foreign_lines"]["capacity"] == "thermal_acer": cap_per_country = { "CH": 12000, "DK": 4000, "SEDK": 3500, "DKSE": 3500, } elif self.args["foreign_lines"]["capacity"] == "tyndp2020": cap_per_country = crossborder_capacity_tyndp2020() else: logger.info( "args['foreign_lines']['capacity'] has to be " "in ['osmTGmod', 'ntc_acer', 'thermal_acer', 'tyndp2020']" ) if not network.lines[network.lines.country != "DE"].empty: weighting = ( network.lines.loc[network.lines.country != "DE", "s_nom"] .groupby(network.lines.country) .transform(lambda x: x / x.sum()) ) dc_lines = self.dc_lines() weighting_links = ( dc_lines.loc[dc_lines.country != "DE", "p_nom"] .groupby(dc_lines.country) .transform(lambda x: x / x.sum()) .fillna(0.0) ) for country in cap_per_country: index_HV = network.lines[ (network.lines.country == country) & (network.lines.v_nom == 110) ].index index_eHV = network.lines[ (network.lines.country == country) & (network.lines.v_nom > 110) ].index index_links = dc_lines[dc_lines.country == country].index if not network.lines[network.lines.country == country].empty: network.lines.loc[index_HV, "s_nom"] = ( weighting[index_HV] * cap_per_country[country] ) network.lines.loc[index_eHV, "s_nom"] = ( weighting[index_eHV] * cap_per_country[country] ) if not dc_lines[dc_lines.country == country].empty: network.links.loc[index_links, "p_nom"] = ( weighting_links[index_links] * cap_per_country[country] ) if country == "SE": network.links.loc[ dc_lines[dc_lines.country == country].index, "p_nom" ] = cap_per_country[country] if not network.lines[ network.lines.country == (country + country) ].empty: i_HV = network.lines[ (network.lines.v_nom == 110) & (network.lines.country == country + country) ].index i_eHV = network.lines[ (network.lines.v_nom == 110) & (network.lines.country == country + country) ].index network.lines.loc[i_HV, "s_nom"] = ( weighting[i_HV] * cap_per_country[country] ) network.lines.loc[i_eHV, "s_nom"] = ( weighting[i_eHV] * cap_per_country[country] ) if not dc_lines[dc_lines.country == (country + country)].empty: i_links = dc_lines[ dc_lines.country == (country + country) ].index network.links.loc[i_links, "p_nom"] = ( weighting_links[i_links] * cap_per_country[country] )
[docs] def set_branch_capacity(etrago): """ Set branch capacity factor of lines and transformers, different factors for HV (110kV) and eHV (220kV, 380kV). Parameters ---------- etrago : :class:`etrago.Etrago Transmission grid object """ network = etrago.network args = etrago.args network.transformers["v_nom0"] = network.transformers.bus0.map( network.buses.v_nom ) # If any line has a time dependend s_max_pu, use the time dependend # factor for all lines, to avoid problems in the clustering if not network.lines_t.s_max_pu.empty: # Set time dependend s_max_pu for # lines without dynamic line rating to 1.0 network.lines_t.s_max_pu[ network.lines[ ~network.lines.index.isin(network.lines_t.s_max_pu.columns) ].index ] = 1.0 # Multiply time dependend s_max_pu with static branch capacitiy fator network.lines_t.s_max_pu[ network.lines[network.lines.v_nom == 110].index ] *= args["branch_capacity_factor"]["HV"] network.lines_t.s_max_pu[ network.lines[network.lines.v_nom > 110].index ] *= args["branch_capacity_factor"]["eHV"] else: network.lines.s_max_pu[network.lines.v_nom == 110] = args[ "branch_capacity_factor" ]["HV"] network.lines.s_max_pu[network.lines.v_nom > 110] = args[ "branch_capacity_factor" ]["eHV"] network.transformers.s_max_pu[network.transformers.v_nom0 == 110] = args[ "branch_capacity_factor" ]["HV"] network.transformers.s_max_pu[network.transformers.v_nom0 > 110] = args[ "branch_capacity_factor" ]["eHV"]
[docs] def check_args(etrago): """ Function that checks the consistency of etragos input parameters. Parameters ---------- etrago : :class:`etrago.Etrago Overall container of eTraGo Returns ------- None. """ if "oep.iks.cs.ovgu.de" in str(etrago.engine.url): saio.register_schema("tables", etrago.engine) from saio.tables import edut_00_056 as egon_etrago_bus else: saio.register_schema("grid", etrago.engine) from saio.grid import egon_etrago_bus query = etrago.session.query(egon_etrago_bus).filter( egon_etrago_bus.scn_name == etrago.args["scn_name"] ) df_scenario = saio.as_pandas(query, crs=4326, geometry=None) assert len(df_scenario) > 0, ( f"Selected scenario {etrago.args['scn_name']} " "not available in selected database." ) if etrago.args["scn_extension"]: assert ( type(etrago.args["scn_extension"]) is not str ), "scn_extension should be defined as a list but is a string." if "toep.iks.cs.ovgu.de" in str(etrago.engine.url): print( "Extension scenarios are not available in selected database." ) else: try: saio.register_schema("grid", etrago.engine) from saio.grid import egon_etrago_extension_bus except sqlalchemy.exc.NoSuchTableError: print( "Extension scenarios are not available in selected" " database." ) raise for scenario_extension in etrago.args["scn_extension"]: query = etrago.session.query(egon_etrago_bus).filter( egon_etrago_extension_bus.scn_name == scenario_extension ) df_scenario = saio.as_pandas(query, crs=4326, geometry=None) assert len(df_scenario) > 0, ( f"Selected extension scenario {scenario_extension} not " "available in selected database." ) assert ( etrago.args["start_snapshot"] <= etrago.args["end_snapshot"] ), "start_snapshot after end_snapshot" if etrago.args["gridversion"] is not None: from saio.grid import egon_etrago_bus assert ( etrago.args["gridversion"] in pd.read_sql( etrago.session.query(egon_etrago_bus).statement, etrago.session.bind, ).version.unique() ), "gridversion does not exist" if etrago.args["snapshot_clustering"]["active"]: # Assert that skip_snapshots and snapshot_clustering are not combined # more information: https://github.com/openego/eTraGo/issues/691 assert etrago.args["skip_snapshots"] is False, ( "eTraGo does not support combining snapshot_clustering and" " skip_snapshots. Please update your settings and choose either" " snapshot_clustering or skip_snapshots." ) # typical periods if etrago.args["snapshot_clustering"]["method"] == "typical_periods": # typical days if etrago.args["snapshot_clustering"]["how"] == "daily": assert ( etrago.args["end_snapshot"] / etrago.args["start_snapshot"] % 24 == 0 ), ( "Please select snapshots covering whole days when" " choosing clustering to typical days." ) if ( etrago.args["snapshot_clustering"]["method"] == "typical_periods" ): assert etrago.args["end_snapshot"] - etrago.args[ "start_snapshot" ] + 1 >= ( 24 * etrago.args["snapshot_clustering"]["n_clusters"] ), ( "The umber of selected snapshots is is too small" " for the chosen number of typical days." ) # typical weeks if etrago.args["snapshot_clustering"]["how"] == "weekly": assert ( etrago.args["end_snapshot"] / etrago.args["start_snapshot"] % 168 == 0 ), ( "Please select snapshots covering whole weeks when" " choosing clustering to typical weeks." ) if ( etrago.args["snapshot_clustering"]["method"] == "typical_periods" ): assert etrago.args["end_snapshot"] - etrago.args[ "start_snapshot" ] + 1 >= ( 168 * etrago.args["snapshot_clustering"]["n_clusters"] ), ( "The number of selected snapshots is too small" " for the chosen number of typical weeks." ) # typical months if etrago.args["snapshot_clustering"]["how"] == "monthly": assert ( etrago.args["end_snapshot"] / etrago.args["start_snapshot"] % 720 == 0 ), ( "Please select snapshots covering whole months when" " choosing clustering to typical months." ) if ( etrago.args["snapshot_clustering"]["method"] == "typical_periods" ): assert etrago.args["end_snapshot"] - etrago.args[ "start_snapshot" ] + 1 >= ( 720 * etrago.args["snapshot_clustering"]["n_clusters"] ), ( "The number of selected snapshots is too small" " for the chosen number of typical months." ) # segmentation elif etrago.args["snapshot_clustering"]["method"] == "segmentation": assert etrago.args["end_snapshot"] - etrago.args[ "start_snapshot" ] + 1 >= ( etrago.args["snapshot_clustering"]["n_segments"] ), "Number of segments is higher than number of snapshots" if etrago.args["method"]["formulation"] != "pyomo": logger.warning( "Snapshot clustering constraints are" " not yet correctly implemented without pyomo." " Setting `args['method']['formulation']` to `pyomo`." ) etrago.args["method"]["formulation"] = "pyomo"
[docs] def drop_sectors(self, drop_carriers): """ Manually drop secors from network. Makes sure the network can be calculated without the dropped sectors. Parameters ---------- drop_carriers : array List of sectors that will be dropped. e.g. ['dsm', 'CH4', 'H2_saltcavern', 'H2_grid', 'central_heat', 'rural_heat', 'central_heat_store', 'rural_heat_store', 'Li ion'] means everything but AC Returns ------- None. """ if self.args["scn_name"] == "eGon2035": if "CH4" in drop_carriers: # create gas generators from links # in order to not lose them when dropping non-electric carriers gas_to_add = ["central_gas_CHP", "industrial_gas_CHP", "OCGT"] gen = self.network.generators for i in gas_to_add: gen_empty = gen.drop(gen.index) gen_empty.bus = self.network.links[ self.network.links.carrier == i ].bus1 gen_empty.p_nom = ( self.network.links[self.network.links.carrier == i].p_nom * self.network.links[ self.network.links.carrier == i ].efficiency ) gen_empty.marginal_cost = ( self.network.links[ self.network.links.carrier == i ].marginal_cost + 35.851 ) # add fuel costs (source: NEP) gen_empty.efficiency = 1 gen_empty.carrier = i gen_empty.scn_name = "eGon2035" gen_empty.p_nom_extendable = False gen_empty.sign = 1 gen_empty.p_min_pu = 0 gen_empty.p_max_pu = 1 gen_empty.control = "PV" gen_empty.fillna(0, inplace=True) self.network.import_components_from_dataframe( gen_empty, "Generator" ) self.network.mremove( "Bus", self.network.buses[ self.network.buses.carrier.isin(drop_carriers) ].index, ) for one_port in self.network.iterate_components( ["Load", "Generator", "Store", "StorageUnit"] ): self.network.mremove( one_port.name, one_port.df[~one_port.df.bus.isin(self.network.buses.index)].index, ) for two_port in self.network.iterate_components( ["Line", "Link", "Transformer"] ): self.network.mremove( two_port.name, two_port.df[ ~two_port.df.bus0.isin(self.network.buses.index) ].index, ) self.network.mremove( two_port.name, two_port.df[ ~two_port.df.bus1.isin(self.network.buses.index) ].index, ) logger.info("The following sectors are dropped: " + str(drop_carriers))
[docs] def update_busmap(self, new_busmap): """ Update busmap after any clustering process Parameters ---------- new_busmap : dictionary busmap used to clusted the network. Returns ------- None. """ if "busmap" not in self.busmap.keys(): self.busmap["busmap"] = new_busmap self.busmap["orig_network"] = pypsa.Network() pypsa.io.import_components_from_dataframe( self.busmap["orig_network"], self.network.buses, "Bus" ) pypsa.io.import_components_from_dataframe( self.busmap["orig_network"], self.network.lines, "Line" ) pypsa.io.import_components_from_dataframe( self.busmap["orig_network"], self.network.links, "Link" ) else: self.busmap["busmap"] = ( pd.Series(self.busmap["busmap"]).map(new_busmap).to_dict() )
[docs] def adjust_CH4_gen_carriers(self): """Precise the carrier for the generators with CH4 carrier For the eGon2035 scenario, the generators with carrier CH4 represent the prodution od biogas and methan. In the data model, these two differents types are differenciated only by the marginal cost of the generator. This function introduces a carrier distion (CH4_biogas and CH4_NG) in order to avoid the clustering of these two types of generator together and facilitate the contraint applying differently to each of them. """ if "eGon2035" in self.args["scn_name"]: # Define marginal cost marginal_cost_def = {"CH4": 40.9765, "biogas": 25.6} engine = db.connection(section=self.args["db"]) try: if "oep.iks.cs.ovgu.de" in str(engine.url): saio.register_schema("data", engine) from saio.data import edut_00_137 as egon_scenario_parameters else: saio.register_schema("grid", engine) from saio.grid import egon_scenario_parameters df = saio.as_pandas( self.session.query(egon_scenario_parameters).filter( egon_scenario_parameters.name == self.args["scn_name"].split("_")[0] ) ) marginal_cost = df["gas_parameters"][0]["marginal_cost"] except sqlalchemy.exc.NoSuchTableError as e: logging.warning(f""" The database query failed for 'scenario.egon_scenario_parameters'. Fallback values are being used. Error message: {e} """) marginal_cost = marginal_cost_def self.network.generators.loc[ self.network.generators[ (self.network.generators.carrier == "CH4") & ( self.network.generators.marginal_cost == marginal_cost["CH4"] ) & ( self.network.generators.bus.astype(str).isin( self.network.buses.index[ self.network.buses.country == "DE" ] ) ) ].index, "carrier", ] = "CH4_NG" self.network.generators.loc[ self.network.generators[ (self.network.generators.carrier == "CH4") & ( self.network.generators.marginal_cost == marginal_cost["biogas"] ) & ( self.network.generators.bus.astype(str).isin( self.network.buses.index[ self.network.buses.country == "DE" ] ) ) ].index, "carrier", ] = "CH4_biogas"
[docs] def residual_load(network, sector="electricity"): """ Calculate the residual load for the specified sector. In case of the electricity sector residual load is calculated using all AC loads and all renewable generators with carriers 'wind_onshore', 'wind_offshore', 'solar', 'solar_rooftop', 'biomass', 'run_of_river', and 'reservoir'. In case of the central heat sector residual load is calculated using all central heat loads and all renewable generators with carriers 'solar_thermal_collector' and 'geo_thermal'. Parameters ----------- network : PyPSA network Network to retrieve load and generation time series from, needed to determine residual load. sector : str Sector to determine residual load for. Possible options are 'electricity' and 'central_heat'. Default: 'electricity'. Returns -------- pd.DataFrame Dataframe with residual load for each bus in the network. Columns of the dataframe contain the corresponding bus name and index of the dataframe is a datetime index with the corresponding time step. """ if sector == "electricity": carrier_gen = [ "wind_onshore", "wind_offshore", "solar", "solar_rooftop", "biomass", "run_of_river", "reservoir", ] carrier_load = ["AC"] elif sector == "central_heat": carrier_gen = ["solar_thermal_collector", "geo_thermal"] carrier_load = ["central_heat"] else: raise ValueError( f"Specified sector {sector} is not a valid option." " Valid options are 'electricity' and 'central_heat'." ) # Calculate loads per bus and timestep loads = network.loads[network.loads.carrier.isin(carrier_load)] loads_per_bus = ( network.loads_t.p_set[loads.index].groupby(loads.bus, axis=1).sum() ) # Calculate dispatch of renewable generators per bus of loads and timesteps renewable_dispatch = pd.DataFrame( index=loads_per_bus.index, columns=loads_per_bus.columns, data=0 ) renewable_generators = network.generators[ network.generators.carrier.isin(carrier_gen) ] renewable_dispatch[renewable_generators.bus.unique()] = ( network.generators_t.p[renewable_generators.index] .groupby(renewable_generators.bus, axis=1) .sum() ) return loads_per_bus - renewable_dispatch
[docs] def manual_fixes_datamodel(etrago): """Apply temporal fixes to the data model until a new egon-data run is there Parameters ---------- etrago : :class:`Etrago Overall container of Etrago Returns ------- None. """ # Set line type etrago.network.lines.type = "" # Set life time of storage_units, transformers and lines etrago.network.storage_units.lifetime = 27.5 etrago.network.transformers.lifetime = 40 etrago.network.lines.lifetime = 40 # Set cyclic state of charge of storage units and stores if etrago.args["scn_name"] in [ "status2019", "eGon2035", "eGon2035_lowflex", ]: etrago.network.storage_units.cyclic_state_of_charge = True etrago.network.stores.loc[ (etrago.network.stores.carrier != "dsm") & (etrago.network.stores.carrier != "battery_storage"), "e_cyclic", ] = True # Set build years to 0 to avoid problems in the clustering etrago.network.lines.build_year = 0 etrago.network.links.build_year = 0 # Set foreign links not-extendable etrago.network.links.loc[ etrago.network.links.carrier.isin( [ "electricity_distribution_grid", "rural_ground_heat_pump", "rural_resistive_heater", "urban_central_air_heat_pump", "urban_central_resistive_heater", "urban_decentral_resistive_heater", ] ), "p_nom_extendable", ] = False etrago.network.links.loc[ etrago.network.links.carrier.isin( [ "DC", "power_to_H2", "H2_to_CH4", "CH4_to_H2", ] ), "p_nom_extendable", ] = True # Set efficiences of CHP etrago.network.links.loc[ etrago.network.links[ etrago.network.links.carrier.str.contains("CHP") ].index, "efficiency", ] = 0.43 # Enlarge gas boilers as backup heat supply etrago.network.links.loc[ etrago.network.links[ etrago.network.links.carrier.str.contains("gas_boiler") ].index, "p_nom", ] *= 1000 # Set p_max_pu for run of river and reservoir etrago.network.generators.loc[ etrago.network.generators[ etrago.network.generators.carrier.isin( ["run_of_river", "reservoir"] ) ].index, "p_max_pu", ] = 0.65 # Set costs for CO2 from DAC for needed for methanation etrago.network.links.loc[ etrago.network.links.carrier == "H2_to_CH4", "marginal_cost" ] = 25 # Set r value if missing if not etrago.network.lines.loc[etrago.network.lines.r == 0, "r"].empty: logger.info(f""" There are {len( etrago.network.lines.loc[etrago.network.lines.r == 0, "r"] )} lines without a resistance (r) in the data model. The resistance of these lines will be automatically set to 0.0001. """) etrago.network.lines.loc[etrago.network.lines.r == 0, "r"] = 0.0001 if not etrago.network.transformers.loc[ etrago.network.transformers.r == 0, "r" ].empty: logger.info(f"""There are {len(etrago.network.transformers.loc[ etrago.network.transformers.r == 0, "r"] )} trafos without a resistance (r) in the data model. The resistance of these trafos will be automatically set to 0.0001. """) etrago.network.transformers.loc[ etrago.network.transformers.r == 0, "r" ] = 0.0001 # Set vnom of transformers etrago.network.transformers["v_nom"] = etrago.network.buses.loc[ etrago.network.transformers.bus0.values, "v_nom" ].values # Drop methanation option in lowflex sceanrio if etrago.args["scn_name"] == "eGon2035_lowflex": etrago.network.links.drop( etrago.network.links[ etrago.network.links.carrier == "H2_to_CH4" ].index, inplace=True, ) # Temporary drop DLR as it is currently not working with sclopf if (etrago.args["method"]["type"] == "sclopf") & ( not etrago.network.lines_t.s_max_pu.empty ): print(""" Dynamic line rating is not implemented for the sclopf yet. Setting s_max_pu timeseries to 1 """) etrago.network.lines_t.s_max_pu = pd.DataFrame( index=etrago.network.snapshots, ) if etrago.args["scn_name"] == "eGon100RE": # Drop H2 load for transport etrago.network.mremove( "Load", etrago.network.loads[ etrago.network.loads.carrier == "H2_hgv_load" ].index, ) # Fix starting capacity of foreign DC-lines etrago.network.links.loc[ (etrago.network.links.carrier == "DC"), "p_nom_min" ] = etrago.network.links.loc[ (etrago.network.links.carrier == "DC"), "p_nom" ] # Model foreign batteries as StorageUnits for carrier in ["battery", "home_battery"]: foreign_stores = etrago.network.stores[ (etrago.network.stores.carrier == carrier) & ( etrago.network.stores.bus.isin( etrago.network.buses[ etrago.network.buses.country != "DE" ].index ) ) ].copy() foreign_stores_grouped = ( foreign_stores.groupby("bus").e_nom.sum().reset_index() ) for i, row in foreign_stores_grouped.iterrows(): charger = etrago.network.links[ etrago.network.links.bus1 == row.bus ] discharger = etrago.network.links[ etrago.network.links.bus0 == row.bus ] max_hours = 6 etrago.network.add( "StorageUnit", name=str( etrago.network.storage_units.index.astype(int).max() + 1 ), bus=charger.bus0.values[0], p_nom=row.e_nom / max_hours, max_hours=max_hours, carrier=carrier, p_nom_min=0, p_nom_extendable=True, cyclic_state_of_charge=True, ) etrago.network.remove("Bus", row.bus) etrago.network.remove("Link", charger.index[0]) etrago.network.remove("Link", discharger.index[0]) etrago.network.mremove("Store", foreign_stores.index) # Set foreign store components extendable etrago.network.stores.carrier.unique() ext_foreign_stores = etrago.network.stores[ ( etrago.network.stores.carrier.isin( [ "urban_central_water_tanks", "rural_water_tanks", "urban_decentral_water_tanks", "H2_overground", "H2_underground", "H2_Store", "central_heat_store", "rural_heat_store", ] ) ) & ( etrago.network.stores.bus.isin( etrago.network.buses[ etrago.network.buses.country != "DE" ].index ) ) ].copy() etrago.network.stores.loc[ ext_foreign_stores.index, "e_nom_extendable" ] = True # Temporary drop low_voltage buses in foreign countries and combine # them with AC low_voltage_buses = etrago.network.buses.loc[ etrago.network.buses.carrier == "low_voltage" ].copy() for i, row in low_voltage_buses.iterrows(): link = etrago.network.links[ (etrago.network.links.bus1 == i) & ( etrago.network.links.carrier == "electricity_distribution_grid" ) ].copy() new_bus = link.bus0.unique()[0] for comp in etrago.network.iterate_components(): if "bus" in comp.df.columns: comp.df.loc[comp.df.bus == i, "bus"] = new_bus elif "bus0" in comp.df.columns: comp.df.loc[comp.df.bus0 == i, "bus0"] = new_bus comp.df.loc[comp.df.bus1 == i, "bus1"] = new_bus etrago.network.remove("Bus", i) etrago.network.mremove("Link", link.index) # Drop CH4_for_industry bus and link and add the load to the # corresponding CH4 bus ch4_ind_buses = etrago.network.buses.loc[ etrago.network.buses.carrier == "CH4_for_industry" ].copy() for i, row in ch4_ind_buses.iterrows(): link = etrago.network.links[ (etrago.network.links.bus1 == i) & (etrago.network.links.carrier == "CH4_for_industry") ].copy() new_bus = link.bus0.unique()[0] for comp in etrago.network.iterate_components(): if "bus" in comp.df.columns: comp.df.loc[comp.df.bus == i, "bus"] = new_bus elif "bus0" in comp.df.columns: comp.df.loc[comp.df.bus0 == i, "bus0"] = new_bus comp.df.loc[comp.df.bus1 == i, "bus1"] = new_bus etrago.network.remove("Bus", i) etrago.network.mremove("Link", link.index) # Drop very small generators etrago.network.mremove( "Generator", etrago.network.generators[ (etrago.network.generators.p_nom_extendable is False) & (etrago.network.generators.p_nom < 10) ].index, ) # Set ramps to nan etrago.network.links.ramp_limit_up = np.nan etrago.network.links.ramp_limit_down = np.nan etrago.network.generators.ramp_limit_up = np.nan etrago.network.generators.ramp_limit_down = np.nan # Fix cyclic conditions of stores etrago.network.stores.e_cyclic_per_period = False etrago.network.stores.loc[ etrago.network.stores.carrier == "battery_storage", "e_cyclic" ] = False # Avoid nurical problems with large values in gas stores # etrago.network.stores.e_nom_max = np.inf # etrago.network.stores.loc[ # etrago.network.stores.carrier=="CH4", "capital_cost"] = 0.0 # etrago.network.stores.loc[ # etrago.network.stores.carrier=="CH4", "e_nom_min"] = 0.0 # etrago.network.stores.loc[ # etrago.network.stores.carrier=="CH4", "e_nom_extendable"] = True # Add static p-set to other AC load in foreign countries static_ac_loads = etrago.network.loads[ (etrago.network.loads.carrier == "AC") & (etrago.network.loads.p_set > 0) ] if not static_ac_loads.empty: for i, row in static_ac_loads.iterrows(): etrago.network.loads_t.p_set[ etrago.network.loads[ (etrago.network.loads.bus == row.bus) & (etrago.network.loads.p_set == 0) ].index[0] ] += row.p_set etrago.network.remove("Load", i) # Standardize naming of H2 infrastructure if "H2_pipeline" in etrago.network.links.carrier.unique(): etrago.network.links.loc[ etrago.network.links.carrier == "H2_pipeline", "carrier" ] = "H2_grid" etrago.network.links.loc[ etrago.network.links.carrier == "H2_retrofit", "carrier" ] = "H2_grid" etrago.network.buses.loc[ (etrago.network.buses.carrier == "H2") & (etrago.network.buses.country != "DE"), "carrier", ] = "H2_grid" # Standardize "type" attribute in O2 buses if "O2" in etrago.network.buses.carrier.unique(): o2_buses = etrago.network.buses[etrago.network.buses.carrier == "O2"] etrago.network.buses.loc[o2_buses.index, "type"] = 1 etrago.network.links.loc[ etrago.network.links.carrier.isin( ["DC", "CH4", "H2_grid", "H2_saltcavern"] ), "p_min_pu", ] = -1.0 etrago.network.links.loc[ etrago.network.links.carrier.isin(["H2_to_CH4"]), "p_min_pu" ] = 0.0
[docs] def export_to_shapefile(pypsa_network, shape_files_path=None, srid=4326): """ Translates all component DataFrames within the pypsa network to GeoDataFrames and saves them to shape files. Shape files can be used to plot the network in QGIS. Currently, only the AC network is exported. Parameters ---------- pypsa_network : PyPSA network PyPSA network as in etrago.network. shape_files_path : str or None If provided, geodataframes are saved as shapefiles to given directory. Default: None. srid : int SRID bus coordinates are given in. Per default WGS84 is assumed. Default: 4326. Returns ------- dict Dictionary with geodataframes. """ os.makedirs(shape_files_path, exist_ok=True) # convert buses_df buses_df = pypsa_network.buses[pypsa_network.buses.carrier == "AC"] buses_df = buses_df.assign( geometry=gpd.points_from_xy(buses_df.x, buses_df.y, crs=f"EPSG:{srid}") ).drop(columns=["x", "y", "geom"]) buses_gdf = gpd.GeoDataFrame(buses_df, crs=f"EPSG:{srid}") # convert component DataFrames components = [ "generators", "loads", "storage_units", "stores", "transformers", ] components_dict = {"buses_gdf": buses_gdf} for component in components: left_on = "bus1" if component == "transformers" else "bus" attr = getattr(pypsa_network, component) components_dict[f"{component}_gdf"] = gpd.GeoDataFrame( attr.merge( buses_gdf[["geometry", "v_nom"]], left_on=left_on, right_index=True, ), crs=f"EPSG:{srid}", ) if components_dict[f"{component}_gdf"].empty: components_dict[f"{component}_gdf"].index = components_dict[ f"{component}_gdf" ].index.astype(object) # convert lines lines_df = pypsa_network.lines lines_df = lines_df.drop(columns=["geom"]) lines_gdf = lines_df.merge( buses_gdf[["geometry", "v_nom"]].rename( columns={"geometry": "geom_0"} ), left_on="bus0", right_index=True, ) lines_gdf = lines_gdf.merge( buses_gdf[["geometry"]].rename(columns={"geometry": "geom_1"}), left_on="bus1", right_index=True, ) lines_gdf["geometry"] = lines_gdf.apply( lambda _: LineString([_["geom_0"], _["geom_1"]]), axis=1 ) lines_gdf = gpd.GeoDataFrame(lines_gdf, crs=f"EPSG:{srid}") components_dict["lines_gdf"] = lines_gdf save_cols = { "buses_gdf": ["scn_name", "v_nom", "carrier", "country", "geometry"], "generators_gdf": [ "scn_name", "bus", "carrier", "p_nom", "p_nom_extendable", "v_nom", "geometry", ], "loads_gdf": ["scn_name", "bus", "carrier", "v_nom", "geometry"], "storage_units_gdf": [ "scn_name", "bus", "carrier", "p_nom", "p_nom_extendable", "v_nom", "geometry", ], "stores_gdf": [ "scn_name", "bus", "carrier", "e_nom", "e_nom_extendable", "v_nom", "geometry", ], "transformers_gdf": [ "scn_name", "bus0", "bus1", "x", "r", "s_nom", "s_nom_extendable", "geometry", ], "lines_gdf": [ "bus0", "bus1", "x", "r", "s_nom", "s_nom_extendable", "length", "num_parallel", "geometry", "v_nom", ], } if shape_files_path: for k, v in components_dict.items(): shp_filename = os.path.join(shape_files_path, f"{k}.shp") v.loc[:, save_cols[k]].to_file(shp_filename) return components_dict
[docs] def adjust_PtH2_model(self, apply_on="pre_market_model"): """ Adjust the modelling of electrolyzer with waste-heat and O2-utilisation. The method creates a multiple-link-model out of the single links from the dataset (power_to_H2-, PtH2_waste_heat-, PtH2_O2-links) for each location where coupling-product usage is possible. The resulting model consists of a multiple link, additional buses and stores for waste_heat and O2-utilisation, and the connection link to the final heat- and O2-Bus. Parameters ---------- etrago : :class:`Etrago Overall container of Etrago apply_on: str Returns ------- network : PyPSA network """ if apply_on == "grid_model": network = self.network elif apply_on == "market_model": network = self.market_model elif apply_on == "pre_market_model": network = self.pre_market_model PtH2_links = network.links[network.links.carrier == "power_to_H2"].index PtH2_AC_buses = network.buses.loc[ network.links.loc[PtH2_links, "bus0"] ].index.unique() from itertools import count store_ids = network.stores.index numeric_parts = [int(idx.split()[0]) for idx in store_ids] next_bus_id = max(network.buses.index.astype(int)) + 1 next_store_id = max(numeric_parts) + 1 bus_id_counter = count(start=next_bus_id) store_id_counter = count(start=next_store_id) for bus in PtH2_AC_buses: power_to_h2_links = network.links[ (network.links.carrier == "power_to_H2") & (network.links.bus0 == bus) ].index power_to_heat_links = network.links[ (network.links.carrier == "PtH2_waste_heat") & (network.links.bus0 == bus) ].index power_to_o2_links = network.links[ (network.links.carrier == "PtH2_O2") & (network.links.bus0 == bus) ].index link_data = [] if len(power_to_heat_links) > 0: link_data.append( { "links": power_to_heat_links, "efficiency": 0.2, "carrier": "waste_heat", } ) if len(power_to_o2_links) > 0: link_data.append( { "links": power_to_o2_links, "efficiency": 0.015, "carrier": "O2", } ) for idx, data in enumerate(link_data, start=2): links = data["links"] efficiency = data["efficiency"] carrier = data["carrier"] new_bus_id = next(bus_id_counter) new_store_id = next(store_id_counter) store_name = f"{new_store_id} PtH2_{carrier}" # Add new heat/o2-bus and -store network.add( "Bus", name=new_bus_id, carrier=f"PtH2_extra_bus_{carrier}" ) network.add( "Store", name=store_name, bus=new_bus_id, carrier=f"PtH2_{carrier}", e_nom_extendable=True, e_nom=100000, marginal_cost=0, capital_cost=0, e_cyclic=False, standing_loss=1, ) network.stores.loc[store_name, "scn_name"] = self.args["scn_name"] network.buses.loc[str(new_bus_id), "scn_name"] = self.args[ "scn_name" ] network.buses.loc[str(new_bus_id), "country"] = "DE" # set coordinates for new bus for the clustering method bus0_index = network.links.loc[power_to_h2_links, "bus0"] network.buses.loc[str(new_bus_id), "x"] = network.buses.loc[ bus0_index, "x" ].values[0] network.buses.loc[str(new_bus_id), "y"] = network.buses.loc[ bus0_index, "y" ].values[0] # Connect multiple link with new heat/o2-bus if self.args["method"]["formulation"] == "linopy": network.links.loc[power_to_h2_links, f"bus{idx}"] = str( new_bus_id ) network.links.loc[power_to_h2_links, f"efficiency{idx}"] = ( efficiency ) else: network.madd( "Generator", names=power_to_h2_links + f"_{carrier}", carrier=f"PtH2_{carrier}", p_nom_extendable=True, bus=str(new_bus_id), ) # Adjust originals waste_heat- and oxygen-links with new bus0 for link in links: network.links.loc[link, "bus0"] = str(new_bus_id) return network
[docs] def adjust_chp_model(self, apply_on="pre_market_model"): """ Adjust the modelling of chp plants in foreign countries for eGon100RE Parameters ---------- etrago : :class:`Etrago Overall container of Etrago apply_on: str Returns ------- network : PyPSA network """ if apply_on == "grid_model": network = self.network elif apply_on == "market_model": network = self.market_model elif apply_on == "pre_market_model": network = self.pre_market_model efficiency_heat = 0.42500 chp_links = network.links[ (network.links.carrier == "central_gas_CHP") & ( network.links.bus1.isin( network.buses[network.buses.country != "DE"].index ) ) ].index if self.args["method"]["formulation"] == "pyomo": for i in chp_links: print(i) # find central_heat bus country = network.buses.loc[ network.links.loc[i, "bus1"], "country" ] central_heat_bus = network.buses[ (network.buses.carrier == "central_heat") & (network.buses.country == country) ].index[0] network.add( "Generator", name=i + "_heat", carrier="central_gas_CHP_heat", p_nom=network.links.loc[i, "p_nom"], bus=central_heat_bus, ) else: for i in chp_links: print(i) # find central_heat bus country = network.buses.loc[ network.links.loc[i, "bus1"], "country" ] central_heat_bus = network.buses[ (network.buses.carrier == "central_heat") & (network.buses.country == country) ].index[0] network.links.loc[i, "bus2"] = central_heat_bus network.links.loc[i, "efficiency2"] = efficiency_heat return network
[docs] def levelize_abroad_inland_parameters(self): """ The method levelize the techno-economic parameters of inland and abroad components of the same carrier. Thus, the favoring of one component is avoided just based on the parameters. The differences in the parameters of the input dataset are caused by a updated source used for the pypsa_eur network and different assumed interest_rates. All necessary parameters of the foreign components are adjusted to the values of the inland components. Parameters ---------- etrago : :class:`Etrago Overall container of Etrago Returns ------- None """ # define carrier and regarding parameters for adjustment carriers_to_adjust = { "links": { "power_to_H2": ["efficiency", "capital_cost", "marginal_cost"], "H2_to_power": ["capital_cost", "marginal_cost"], "CH4_to_H2": ["efficiency", "capital_cost", "marginal_cost"], "H2_to_CH4": ["capital_cost", "marginal_cost"], "OCGT": ["efficiency", "marginal_cost"], "central_resistive_heater": ["efficiency", "marginal_cost"], }, "stores": { "central_heat_store": ["capital_cost", "marginal_cost"], "H2_overground": ["capital_cost", "marginal_cost"], "H2_underground": ["capital_cost", "marginal_cost"], "rural_heat_store": ["capital_cost", "marginal_cost"], }, "storage_units": { "battery": [ "capital_cost", "marginal_cost", "efficiency_dispatch", "efficiency_store", ] }, "generators": { "run_of_river": ["efficiency", "marginal_cost"], "rural_solar_thermal": ["marginal_cost"], "solar": ["marginal_cost"], "solar_rooftop": ["marginal_cost"], "wind_offshore": ["marginal_cost"], "wind_onshore": ["marginal_cost"], "rural_oil_boiler": ["marginal_cost"], "oil": ["marginal_cost"], "rural_biomass_boiler": ["marginal_cost"], }, } german_buses = self.network.buses[self.network.buses.country == "DE"].index def filter_german_components(df, component, german_buses): if component == "links": return df[df.bus0.isin(german_buses) & df.bus1.isin(german_buses)] else: return df[df.bus.isin(german_buses)] for component_type, carriers in carriers_to_adjust.items(): component_table = getattr(self.network, component_type) german_components = filter_german_components( component_table, component_type, german_buses ) foreign_components = component_table[ ~component_table.index.isin(german_components.index) ] for carrier, parameters in carriers.items(): # test if carrier exists in both german and abroad components carrier_in_germany = carrier in german_components.carrier.values carrier_in_foreign = carrier in foreign_components.carrier.values if not carrier_in_germany: logger.warning( f"⚠️ Carrier '{carrier}' does NOT exist in " f"the German components ({component_type})." ) if not carrier_in_foreign: logger.warning( f"⚠️ Carrier '{carrier}' does NOT exist in " f"the foreign components ({component_type})." ) if not carrier_in_germany or not carrier_in_foreign: logger.warning( f"⚠️ Skipping carrier '{carrier}' for {component_type}." ) continue german_carrier_components = german_components[ german_components.carrier == carrier ] for parameter in parameters: if parameter in german_carrier_components.columns: component_table.loc[ (component_table.index.isin(foreign_components.index)) & (component_table.carrier == carrier), parameter, ] = german_carrier_components[parameter].mean() else: logger.warning( f"⚠️ {parameter} doesn't exist for " f"Carrier '{carrier}' in {component_type}." ) logger.info( "✅ All required parameters for inland and abroad " f"{component_type} are levelized." )