# -*- coding: utf-8 -*-
# Copyright 2016-2023 Flensburg University of Applied Sciences,
# Europa-Universität Flensburg,
# Centre for Sustainable Energy Systems
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# File description for read-the-docs
""" This module contains functions for reducing the complexity of a PyPSA
network in temporal dimension by
a) downsampling to every n-th snapshot
b) clustering to typical periods (eg days, weeks)
c) clustering to segments of variable length
Essentially used is the tsam package
( https://github.com/FZJ-IEK3-VSA/tsam ) developed by Leander Kotzur et al.
"""
import os
import pandas as pd
if "READTHEDOCS" not in os.environ:
import tsam.timeseriesaggregation as tsam
__copyright__ = (
"Flensburg University of Applied Sciences, "
"Europa-Universität Flensburg, "
"Centre for Sustainable Energy Systems"
)
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = """ClaraBuettner, ulfmueller, KathiEsterl, simnh, wheitkoetter,
BartelsJ, AmeliaNadal"""
def snapshot_clustering(self):
    """
    Dispatch to the configured snapshot clustering routine.

    Depending on ``args['snapshot_clustering']['method']`` the network is
    reduced either to segments of variable length ('segmentation') or to a
    number of typical periods ('typical_periods'). Does nothing when
    snapshot clustering is not active.

    Raises
    ------
    ValueError
        When calling a non-available function.

    Returns
    -------
    None.
    """
    settings = self.args["snapshot_clustering"]
    if not settings["active"]:
        return

    # keep an unclustered copy for the optional dispatch disaggregation
    if self.args["temporal_disaggregation"]["active"]:
        self.network_tsa = self.network.copy()

    method = settings["method"]
    if method == "segmentation":
        self.network = run(
            network=self.network.copy(),
            n_clusters=1,
            segmented_to=settings["n_segments"],
            extreme_periods=settings["extreme_periods"],
        )
    elif method == "typical_periods":
        self.network = run(
            network=self.network.copy(),
            n_clusters=settings["n_clusters"],
            how=settings["how"],
            extreme_periods=settings["extreme_periods"],
        )
    else:
        raise ValueError(
            """Type of clustering should be 'typical_periods' or
            'segmentation'"""
        )
def tsam_cluster(
    timeseries_df,
    typical_periods=10,
    how="daily",
    extremePeriodMethod="None",
    segmentation=False,
    segment_no=10,
    segm_hoursperperiod=24,
):
    """
    Conducts the clustering of the snapshots for temporal aggregation with
    the respecting method.

    Parameters
    ----------
    timeseries_df : pd.DataFrame
        Dataframe with timeseries to cluster; must contain a
        'residual_load' column.
    typical_periods : int, optional
        Number of clusters for typical_periods. The default is 10.
    how : {'daily', 'weekly', 'monthly', 'hourly'}, optional
        Definition of period for typical_periods. The default is 'daily'.
    extremePeriodMethod : {'None','append','new_cluster_center',
        'replace_cluster_center'}, optional
        Method to consider extreme snapshots in reduced timeseries.
        The default is 'None'.
    segmentation : boolean, optional
        Argument to activate segmenation method. The default is False.
    segment_no : int, optional
        Number of segments for segmentation. The default is 10.
    segm_hoursperperiod : int, optional
        Only for segmentation, ensures to cluster to segments considering
        all snapshots. The default is 24.

    Raises
    ------
    ValueError
        If `how` is none of the supported period definitions.

    Returns
    -------
    df_cluster : pd.DataFrame
        Information on cluster after clustering to typical periods.
    cluster_weights : dict
        Weightings per cluster after clustering to typical periods.
    dates : DatetimeIndex
        Dates of clusters after clustering to typical periods.
    hours : int
        Hours per typical period.
    df_i_h : pd.DataFrame
        Information on cluster after clustering to typical periods.
    timeseries : pd.DataFrame
        Information on segments after segmentation.
    """
    # translate the period definition into hours per period
    if how == "daily":
        hours = 24
        period = " days"
    elif how == "weekly":
        hours = 168
        period = " weeks"
    elif how == "monthly":
        hours = 720
        period = " months"
    elif how == "hourly":
        hours = 1
        period = " hours"
    else:
        # fail fast: `hours` would otherwise stay unbound and the code
        # below would crash with an UnboundLocalError
        raise ValueError(
            "'how' must be one of 'daily', 'weekly', 'monthly' or 'hourly'"
        )

    if segmentation:
        hoursPerPeriod = segm_hoursperperiod
        hours = 1
    else:
        hoursPerPeriod = hours

    # define weight for weightDict:
    # residual load should not impact cluster findings,
    # but only be the optional parameter to choose an extreme period
    weight = pd.Series(data=1, index=timeseries_df.columns)
    weight["residual_load"] = 0
    weight = weight.to_dict()

    aggregation = tsam.TimeSeriesAggregation(
        timeseries_df,
        noTypicalPeriods=typical_periods,
        extremePeriodMethod=extremePeriodMethod,
        addPeakMin=["residual_load"],
        addPeakMax=["residual_load"],
        rescaleClusterPeriods=False,
        hoursPerPeriod=hoursPerPeriod,
        clusterMethod="hierarchical",
        segmentation=segmentation,
        noSegments=segment_no,
        weightDict=weight,
    )

    if segmentation:
        print(
            "Snapshot clustering to "
            + str(segment_no)
            + " segments"
            + "\n"
            + "Using extreme period method: "
            + extremePeriodMethod
        )
    else:
        print(
            "Snapshot clustering to "
            + str(typical_periods)
            + period
            + "\n"
            + "Using extreme period method: "
            + extremePeriodMethod
        )

    timeseries_creator = aggregation.createTypicalPeriods()
    timeseries = timeseries_creator.copy()

    # If Segmentation is True, insert 'Dates' and 'SegmentNo' column in
    # timeseries
    if segmentation:
        weights = timeseries.index.get_level_values(2)
        dates_df = timeseries_df.index.get_level_values(0)
        dates = []
        segmentno = []
        wcount = 0
        count = 0
        # map every segment to the date of its first snapshot; the segment
        # duration (weight) advances the cursor into the original index
        for weight in weights:
            dates.append(dates_df[wcount])
            wcount = wcount + weight
            segmentno.append(count)
            count = count + 1
        timeseries.insert(0, "dates", dates, True)
        timeseries.insert(1, "SegmentNo", segmentno, True)
        timeseries.insert(2, "SegmentDuration", weights, True)
        timeseries.set_index(
            ["dates", "SegmentNo", "SegmentDuration"], inplace=True
        )
        # drop artefact columns occasionally produced by tsam / csv round
        # trips
        if "Unnamed: 0" in timeseries.columns:
            del timeseries["Unnamed: 0"]
        if "Segment Step" in timeseries.columns:
            del timeseries["Segment Step"]

    cluster_weights = aggregation.clusterPeriodNoOccur
    clusterOrder = aggregation.clusterOrder
    clusterCenterIndices = aggregation.clusterCenterIndices

    if segmentation:
        if extremePeriodMethod != "None":
            timeseries = segmentation_extreme_periods(
                timeseries_df, timeseries, extremePeriodMethod
            )
    else:
        if extremePeriodMethod == "new_cluster_center":
            for i in aggregation.extremePeriods.keys():
                clusterCenterIndices.insert(
                    aggregation.extremePeriods[i]["newClusterNo"],
                    aggregation.extremePeriods[i]["stepNo"],
                )
        if extremePeriodMethod == "append":
            for i in aggregation.extremePeriods.keys():
                clusterCenterIndices.insert(
                    aggregation.extremePeriods[i]["clusterNo"],
                    aggregation.extremePeriods[i]["stepNo"],
                )

    # get all index for every hour of that day of the clusterCenterIndices
    start = []
    # get the first hour of the clusterCenterIndices (days start with 0)
    for i in clusterCenterIndices:
        start.append(i * hours)

    # get a list with all hours belonging to the clusterCenterIndices
    nrhours = []
    for j in start:
        nrhours.append(j)
        x = 1
        while x < hours:
            j = j + 1
            nrhours.append(j)
            x = x + 1

    # get the original Datetimeindex
    dates = timeseries_df.iloc[nrhours].index

    # get list of representative days
    representative_day = []
    # cluster: medoid of the respective cluster
    dic_clusterCenterIndices = dict(enumerate(clusterCenterIndices))
    for i in clusterOrder:
        representative_day.append(dic_clusterCenterIndices[i])

    # get list of last and first hour of representative days
    last_hour_datetime = []
    for i in representative_day:
        last_hour = i * hours + hours - 1
        last_hour_datetime.append(timeseries_df.index[last_hour])

    # create a dataframe (index=nr. of day in a year/candidate)
    df_cluster = pd.DataFrame(
        {
            "Cluster": clusterOrder,  # Cluster of the day
            "RepresentativeDay": representative_day,  # representative day
            # of the cluster
            "last_hour_RepresentativeDay": last_hour_datetime,
        }
    )  # last hour of the cluster
    df_cluster.index = df_cluster.index + 1
    df_cluster.index.name = "Candidate"

    # create a dataframe each timeseries (h) and its candidate day (i) df_i_h
    nr_day = []
    x = len(timeseries_df.index) / hours + 1
    for i in range(1, int(x)):
        j = 1
        while j <= hours:
            nr_day.append(i)
            j = j + 1
    df_i_h = pd.DataFrame(
        {"Timeseries": timeseries_df.index, "Candidate_day": nr_day}
    )
    df_i_h.set_index("Timeseries", inplace=True)

    return df_cluster, cluster_weights, dates, hours, df_i_h, timeseries
def segmentation_extreme_periods(
    timeseries_df, timeseries, extremePeriodMethod
):
    """
    Function to consider extreme snapshots while using segmentation.

    The snapshots with the maximum and the minimum residual load are
    inserted into the segmented timeseries if they are not already
    segment starts: either as additional one-hour segments splitting the
    segment they fall into ('append'), or by replacing the segment they
    fall into ('replace_cluster_center').

    Parameters
    ----------
    timeseries_df : pd.DataFrame
        Dataframe with timeseries to cluster; must contain a
        'residual_load' column.
    timeseries : pd.DataFrame
        Information on segments after segmentation, indexed by
        ('dates', 'SegmentNo', 'SegmentDuration').
    extremePeriodMethod : {'None','append','new_cluster_center',
        'replace_cluster_center'}, optional method to consider extreme
        snapshots in reduced timeseries. The default is 'None'.

    Raises
    ------
    ValueError
        When calling wrong method to consider extreme values.

    Returns
    -------
    timeseries : pd.DataFrame
        Information on segments including extreme snapshots after segmentation.
    """
    # NOTE(review): this function relies on `DataFrame.append`, which was
    # removed in pandas 2.0 — confirm the project pins pandas < 2.0 or
    # migrate these calls to `pd.concat`.

    # find maximum / minimum value in residual load
    maxi = timeseries_df["residual_load"].idxmax()
    mini = timeseries_df["residual_load"].idxmin()

    # add timestep if it is not already calculated
    if maxi not in timeseries.index.get_level_values("dates"):
        # identifiy timestep, adapt it to timeseries-df and add it
        # (as a one-hour segment numbered after the existing ones)
        max_val = timeseries_df.loc[maxi].copy()
        max_val["SegmentNo"] = len(timeseries)
        max_val["SegmentDuration"] = 1
        max_val["dates"] = max_val.name
        max_val = pd.DataFrame(max_val).transpose()

        if extremePeriodMethod == "append":
            max_val.set_index(
                ["dates", "SegmentNo", "SegmentDuration"], inplace=True
            )
            timeseries = timeseries.append(max_val)
            timeseries = timeseries.sort_values(by="dates")

            # split up segment in which the extreme timestep was added;
            # `i` ends up as the position of the segment preceding it
            i = -1
            for date in timeseries.index.get_level_values("dates"):
                if date < maxi:
                    i = i + 1
                else:
                    # copy the durations into a regular column so they can
                    # be modified (the index itself is immutable)
                    timeseries["SegmentDuration_Extreme"] = (
                        timeseries.index.get_level_values("SegmentDuration")
                    )
                    old_row = timeseries.iloc[i].copy()
                    old_row = pd.DataFrame(old_row).transpose()
                    # shorten the split segment to the hours before the
                    # extreme timestep
                    delta_t = (
                        timeseries.index.get_level_values("dates")[i + 1]
                        - timeseries.index.get_level_values("dates")[i]
                    )
                    delta_t = delta_t.total_seconds() / 3600
                    # NOTE(review): chained assignment below may not write
                    # through on newer pandas (SettingWithCopy) — verify
                    timeseries["SegmentDuration_Extreme"].iloc[i] = delta_t
                    timeseries_df["row_no"] = range(0, len(timeseries_df))
                    # position of the timestep right after the extreme one
                    new_row = int(timeseries_df.loc[maxi]["row_no"]) + 1
                    new_date = timeseries_df[
                        timeseries_df.row_no == new_row
                    ].index
                    if new_date.isin(
                        timeseries.index.get_level_values("dates")
                    ):
                        # the next timestep already starts a segment: only
                        # rebuild the index with the shortened duration
                        timeseries["dates"] = (
                            timeseries.index.get_level_values("dates")
                        )
                        timeseries["SegmentNo"] = (
                            timeseries.index.get_level_values("SegmentNo")
                        )
                        timeseries["SegmentDuration"] = timeseries[
                            "SegmentDuration_Extreme"
                        ]
                        timeseries.drop(
                            "SegmentDuration_Extreme", axis=1, inplace=True
                        )
                        timeseries.set_index(
                            ["dates", "SegmentNo", "SegmentDuration"],
                            inplace=True,
                        )
                        break
                    else:
                        # otherwise open a new segment right after the
                        # extreme timestep carrying the remaining duration
                        new_row = timeseries_df.iloc[new_row].copy()
                        new_row.drop("row_no", inplace=True)
                        new_row["SegmentNo"] = len(timeseries)
                        new_row["SegmentDuration"] = (
                            old_row["SegmentDuration_Extreme"][0] - delta_t - 1
                        )
                        new_row["dates"] = new_row.name
                        new_row = pd.DataFrame(new_row).transpose()
                        new_row.set_index(
                            ["dates", "SegmentNo", "SegmentDuration"],
                            inplace=True,
                        )
                        # the new segment keeps the values of the segment
                        # that was split
                        for col in new_row.columns:
                            new_row[col][0] = old_row[col][0]
                        timeseries["dates"] = (
                            timeseries.index.get_level_values("dates")
                        )
                        timeseries["SegmentNo"] = (
                            timeseries.index.get_level_values("SegmentNo")
                        )
                        timeseries["SegmentDuration"] = timeseries[
                            "SegmentDuration_Extreme"
                        ]
                        timeseries.drop(
                            "SegmentDuration_Extreme", axis=1, inplace=True
                        )
                        timeseries.set_index(
                            ["dates", "SegmentNo", "SegmentDuration"],
                            inplace=True,
                        )
                        timeseries = timeseries.append(new_row)
                        timeseries = timeseries.sort_values(by="dates")
                        break
        elif extremePeriodMethod == "replace_cluster_center":
            # replace segment in which the extreme timestep was added
            i = -1
            for date in timeseries.index.get_level_values("dates"):
                if date < maxi:
                    i = i + 1
                else:
                    if i == -1:
                        i = 0
                    # the replacement inherits the replaced segment's
                    # duration
                    max_val["SegmentDuration"] = (
                        timeseries.index.get_level_values("SegmentDuration")[i]
                    )
                    max_val.set_index(
                        ["dates", "SegmentNo", "SegmentDuration"], inplace=True
                    )
                    timeseries.drop(timeseries.index[i], inplace=True)
                    timeseries = timeseries.append(max_val)
                    timeseries = timeseries.sort_values(by="dates")
                    break
        else:
            raise ValueError(
                """Choose 'append' or 'replace_cluster_center' for
                consideration of extreme periods with segmentation method"""
            )

    # add timestep if it is not already calculated
    # (same procedure as above, for the minimum residual load)
    if mini not in timeseries.index.get_level_values("dates"):
        # identifiy timestep, adapt it to timeseries-df and add it
        min_val = timeseries_df.loc[mini].copy()
        min_val["SegmentNo"] = len(timeseries) + 1
        min_val["SegmentDuration"] = 1
        min_val["dates"] = min_val.name
        min_val = pd.DataFrame(min_val).transpose()

        if extremePeriodMethod == "append":
            min_val.set_index(
                ["dates", "SegmentNo", "SegmentDuration"], inplace=True
            )
            timeseries = timeseries.append(min_val)
            timeseries = timeseries.sort_values(by="dates")

            # split up segment in which the extreme timestep was added
            i = -1
            for date in timeseries.index.get_level_values("dates"):
                if date < mini:
                    i = i + 1
                else:
                    timeseries["SegmentDuration_Extreme"] = (
                        timeseries.index.get_level_values("SegmentDuration")
                    )
                    old_row = timeseries.iloc[i].copy()
                    old_row = pd.DataFrame(old_row).transpose()
                    delta_t = (
                        timeseries.index.get_level_values("dates")[i + 1]
                        - timeseries.index.get_level_values("dates")[i]
                    )
                    delta_t = delta_t.total_seconds() / 3600
                    # NOTE(review): chained assignment, see note above
                    timeseries["SegmentDuration_Extreme"].iloc[i] = delta_t
                    timeseries_df["row_no"] = range(0, len(timeseries_df))
                    new_row = int(timeseries_df.loc[mini]["row_no"]) + 1
                    new_date = timeseries_df[
                        timeseries_df.row_no == new_row
                    ].index
                    if new_date.isin(
                        timeseries.index.get_level_values("dates")
                    ):
                        timeseries["dates"] = (
                            timeseries.index.get_level_values("dates")
                        )
                        timeseries["SegmentNo"] = (
                            timeseries.index.get_level_values("SegmentNo")
                        )
                        timeseries["SegmentDuration"] = timeseries[
                            "SegmentDuration_Extreme"
                        ]
                        timeseries.drop(
                            "SegmentDuration_Extreme", axis=1, inplace=True
                        )
                        timeseries.set_index(
                            ["dates", "SegmentNo", "SegmentDuration"],
                            inplace=True,
                        )
                        break
                    else:
                        new_row = timeseries_df.iloc[new_row].copy()
                        new_row.drop("row_no", inplace=True)
                        new_row["SegmentNo"] = len(timeseries) + 1
                        new_row["SegmentDuration"] = (
                            old_row["SegmentDuration_Extreme"][0] - delta_t - 1
                        )
                        new_row["dates"] = new_row.name
                        new_row = pd.DataFrame(new_row).transpose()
                        new_row.set_index(
                            ["dates", "SegmentNo", "SegmentDuration"],
                            inplace=True,
                        )
                        for col in new_row.columns:
                            new_row[col][0] = old_row[col][0]
                        timeseries["dates"] = (
                            timeseries.index.get_level_values("dates")
                        )
                        timeseries["SegmentNo"] = (
                            timeseries.index.get_level_values("SegmentNo")
                        )
                        timeseries["SegmentDuration"] = timeseries[
                            "SegmentDuration_Extreme"
                        ]
                        timeseries.drop(
                            "SegmentDuration_Extreme", axis=1, inplace=True
                        )
                        timeseries.set_index(
                            ["dates", "SegmentNo", "SegmentDuration"],
                            inplace=True,
                        )
                        timeseries = timeseries.append(new_row)
                        timeseries = timeseries.sort_values(by="dates")
                        break
        elif extremePeriodMethod == "replace_cluster_center":
            # replace segment in which the extreme timestep was added
            i = -1
            for date in timeseries.index.get_level_values("dates"):
                if date < mini:
                    i = i + 1
                else:
                    if i == -1:
                        i = 0
                    min_val["SegmentDuration"] = (
                        timeseries.index.get_level_values("SegmentDuration")[i]
                    )
                    min_val.set_index(
                        ["dates", "SegmentNo", "SegmentDuration"], inplace=True
                    )
                    timeseries.drop(timeseries.index[i], inplace=True)
                    timeseries = timeseries.append(min_val)
                    timeseries = timeseries.sort_values(by="dates")
                    break
        else:
            raise ValueError(
                """Choose 'append' or 'replace_cluster_center' for
                consideration of extreme periods with segmentation method"""
            )

    # drop the helper column introduced on timeseries_df-derived rows
    if "row_no" in timeseries.columns:
        timeseries.drop("row_no", axis=1, inplace=True)

    return timeseries
def run(
    network,
    n_clusters=None,
    how="daily",
    segmented_to=False,
    extreme_periods="None",
):
    """
    Function to call the respecting snapshot clustering function and export
    the result to a csv-file.

    Parameters
    ----------
    network : pypsa.Network object
        Container for all network components.
    n_clusters : int, optional
        Number of clusters for typical_periods. The default is None.
    how : {'daily', 'weekly', 'monthly', 'hourly'}, optional
        Definition of period for typical_periods. The default is 'daily'.
    segmented_to : int, optional
        Number of segments for segmentation. The default is False.
    extreme_periods : {'None','append','new_cluster_center',
        'replace_cluster_center'}, optional
        Method to consider extreme snapshots in reduced timeseries.
        The default is 'None'.

    Returns
    -------
    network : pypsa.Network object
        Container for all network components.
    """
    if segmented_to is not False:
        segment_no = segmented_to
        segmentation = True
    else:
        segment_no = 24
        segmentation = False

    # treat falsy settings (None, False, '') as 'no extreme period method'
    if not extreme_periods:
        extreme_periods = "None"

    # calculate clusters
    (
        df_cluster,
        cluster_weights,
        dates,
        hours,
        df_i_h,
        timeseries,
    ) = tsam_cluster(
        prepare_pypsa_timeseries(network),
        typical_periods=n_clusters,
        # was hard-coded to "daily" before, silently ignoring `how`
        how=how,
        extremePeriodMethod=extreme_periods,
        segmentation=segmentation,
        segment_no=segment_no,
        segm_hoursperperiod=network.snapshots.size,
    )

    # export the clustering result to csv
    if segmentation:
        pd.DataFrame(
            timeseries.reset_index(),
            columns=["dates", "SegmentNo", "SegmentDuration"],
        ).set_index("SegmentNo").to_csv(
            "timeseries_segmentation=" + str(segment_no) + ".csv"
        )
    else:
        # `how` was already validated by tsam_cluster at this point
        howie = {
            "daily": "days",
            "weekly": "weeks",
            "monthly": "months",
            "hourly": "hours",
        }[how]
        df_cluster.to_csv(
            "cluster_typical-periods=" + str(n_clusters) + howie + ".csv"
        )

    network.cluster = df_cluster
    network.cluster_ts = df_i_h

    update_data_frames(
        network, cluster_weights, dates, hours, timeseries, segmentation
    )

    return network
def prepare_pypsa_timeseries(network):
    """
    Prepares timeseries and residual load timeseries for clustering.

    Load timeseries (columns prefixed with 'L'), the potential feed-in of
    renewable generators (columns prefixed with 'G') and the resulting
    residual load are merged into one dataframe.

    Parameters
    ----------
    network : pypsa.Network object
        Container for all network components.

    Returns
    -------
    df : pd.DataFrame
        Timeseries to be considered when clustering.
    """
    load_ts = network.loads_t.p_set.add_prefix("L")

    # scale the per-unit availability with the installed capacity
    availability = network.generators_t.p_max_pu
    feed_in = availability.mul(
        network.generators.p_nom[availability.columns], axis=1
    ).add_prefix("G")

    residual = (load_ts.sum(axis=1) - feed_in.sum(axis=1)).to_frame(
        "residual_load"
    )

    return pd.concat([feed_in, load_ts, residual], axis=1)
def update_data_frames(
    network, cluster_weights, dates, hours, timeseries, segmentation
):
    """
    Updates the snapshots, snapshot weightings and the dataframes based on
    the original data in the network and the medoids created by clustering
    these original data.

    Parameters
    ----------
    network : pypsa.Network object
        Container for all network components.
    cluster_weights : dict
        Weightings per cluster after clustering to typical periods.
    dates : DatetimeIndex
        Dates of clusters after clustering to typical periods.
    hours : int
        Hours per typical period.
    timeseries : pd.DataFrame
        Information on segments after segmentation.
    segmentation : boolean
        Checks if segmentation of clustering to typical periods has been
        used.

    Returns
    -------
    network : pypsa.Network object
        Container for all network components.
    """
    if segmentation:
        network.snapshots = timeseries.index.get_level_values(0)
        # every segment is weighted with its duration in hours
        segment_weights = pd.Series(
            data=timeseries.index.get_level_values(2).values,
            index=timeseries.index.get_level_values(0),
        )
        network.snapshot_weightings["objective"] = segment_weights
        network.snapshot_weightings["stores"] = segment_weights
        network.snapshot_weightings["generators"] = segment_weights
    else:
        network.snapshots = dates
        network.snapshot_weightings = network.snapshot_weightings.loc[dates]

        # every snapshot of a typical period is weighted with the number
        # of occurrences of its cluster
        snapshot_weightings = []
        for weight in cluster_weights.values():
            snapshot_weightings.extend([weight] * hours)
        # positional whole-column assignment instead of the former
        # (deprecated) per-element chained indexing
        network.snapshot_weightings["objective"] = snapshot_weightings
        network.snapshot_weightings["stores"] = snapshot_weightings
        network.snapshot_weightings["generators"] = snapshot_weightings

    # put the snapshots in the right order; the sort results were
    # previously discarded, leaving the network unsorted
    network.snapshots = network.snapshots.sort_values()
    network.snapshot_weightings = network.snapshot_weightings.sort_index()

    print(network.snapshots)

    return network
def skip_snapshots(self):
    """
    Conducts the downsampling to every n-th snapshot.

    Every remaining snapshot is weighted with ``n_skip`` hours; the last
    one may represent fewer hours when the series length is not a
    multiple of ``n_skip``.

    Returns
    -------
    None.
    """
    # save second network for optional dispatch disaggregation
    if (
        self.args["temporal_disaggregation"]["active"]
        and not self.args["snapshot_clustering"]["active"]
    ) or self.args["method"]["type"] == "market_grid":
        self.network_tsa = self.network.copy()

    n_skip = self.args["skip_snapshots"]

    if n_skip:
        # hours represented by the last kept snapshot; use total_seconds()
        # so that gaps of one day or longer are counted correctly
        # (timedelta.seconds only holds the sub-day remainder)
        last_weight = (
            int(
                (
                    self.network.snapshots[-1]
                    - self.network.snapshots[::n_skip][-1]
                ).total_seconds()
                / 3600
            )
            + 1
        )

        self.network.snapshots = self.network.snapshots[::n_skip]
        self.network.snapshot_weightings["objective"] = n_skip
        self.network.snapshot_weightings["stores"] = n_skip
        self.network.snapshot_weightings["generators"] = n_skip

        if last_weight < n_skip:
            # single-step .loc assignment; the former chained
            # .loc[label][column] = ... wrote into a temporary row copy
            # and silently dropped the correction
            last_snapshot = self.network.snapshot_weightings.index[-1]
            self.network.snapshot_weightings.loc[
                last_snapshot, ["objective", "stores", "generators"]
            ] = last_weight