"""
This module contains a class that can be used to visualise the data. There are different visualisation functions.
"""
import pandas as pd
# import numpy as np
from .functions import balance
try:
import plotly.graph_objects as go
from plotly.offline import iplot
import cufflinks as cf
from plotly.subplots import make_subplots
cf.go_offline()
except ImportError:
raise Exception("This plotly code only works within a jupyter notebook")
from IPython.display import display, HTML
from emobpy import DataBase
[docs]class NBplot:
"""
Work in Jupyter notebooks only.
Set of plots for a single time series and groups.
Three kind of plots:
- self.sgplot_dp(tscode) for driving profiles
- self.sgplot_ga(tscode) for grid availability profiles
- self.sgplot_ged(tscode) for grid electricity demand profiles
- tscode: time series code (string of profile name)
self.__init__(db)
db is an instance of a DataBase class that contains the time series.
"""
def __init__(self, db):
self.db = db
[docs] def sgplot_dp(self, tscode, rng=None, to_html=False, path=None):
"""
Plot of a single driving profile.
Args:
tscode (str): Time series code. Eg. 'ev_user_W3_85e59_avai_65t2p'
rng (tuple): (a,b) index if only part of timeseries should be copied. Defaults to None.
to_html (bool): Save as a html file. Defaults to False.
path (str): Path if plot should be saved to file. Defaults to None.
Returns:
plotly.plot: Plot object.
"""
self.db.update()
if rng is None:
df = self.db.db[tscode]["timeseries"].copy()
else:
df = self.db.db[tscode]["timeseries"].iloc[rng[0]: rng[1]].copy()
if self.db.db[tscode]["kind"] != "driving":
raise Exception(
"code '{}' does not correspond to a driving profile".format(tscode)
)
cnt = df.groupby([df.index, "state"])["state"].count()
cn = (
pd.DataFrame(cnt)
.rename(columns={"state": "count"})
.unstack(level=-1)
.fillna(0)
)
cn.columns = cn.columns.droplevel()
rr = (cn.T / cn.T.sum(axis=0)).T
figa = rr.iplot(kind="area", fill=True, asFigure=True)
figb = df["distance"].iplot(asFigure=True)
fig = cf.subplots([figa, figb], shape=(2, 1), shared_xaxes=True)
fig["layout"]["yaxis"].update(
{"title": "Location", "rangemode": "tozero", "domain": [0.7, 1.0]}
)
fig["layout"]["yaxis2"].update(
{"title": "Distance (km)", "rangemode": "tozero", "domain": [0.0, 0.65]}
)
fig = go.Figure(data=fig["data"], layout=fig["layout"])
if to_html:
if path is None:
raise Exception(
"""when to_html is True then path must be given with .html extension"""
)
else:
fig.write_html(file=path)
return fig
[docs] def sgplot_ga(self, tscode, rng=None, to_html=False, path=None):
"""
Plot of a single grid availability profile.
Args:
tscode (str): Time series code. Eg. 'ev_user_W3_85e59_avai_65t2p'
rng (tuple): (a,b) index if only part of timeseries should be copied. Defaults to None.
to_html (bool): Save as a html file. Defaults to False.
path (str): Path if plot should be saved to file. Defaults to None.
Returns:
plotly.plot: Plot object.
"""
self.db.update()
if rng is None:
df = self.db.db[tscode]["timeseries"].copy()
else:
df = self.db.db[tscode]["timeseries"].iloc[rng[0]: rng[1]].copy()
if self.db.db[tscode]["kind"] != "availability":
raise Exception(
"code '{}' does not correspond to a grid availability profile".format(
tscode
)
)
dt = df[["state", "consumption", "charging_point", "charging_cap", "soc"]]
cnt = dt.groupby([dt.index, "state"])["state"].count()
cn = (
pd.DataFrame(cnt)
.rename(columns={"state": "count"})
.unstack(level=-1)
.fillna(0)
)
cn.columns = cn.columns.droplevel()
rr = (cn.T / cn.T.sum(axis=0)).T
figa = rr.iplot(kind="area", fill=True, asFigure=True)
dk = dt[["consumption", "charging_cap"]]
figb = dk.iplot(asFigure=True)
dd = dt["soc"]
figc = dd.iplot(asFigure=True)
fig = cf.subplots([figa, figb, figc], shape=(3, 1), shared_xaxes=True)
fig["layout"]["xaxis"].update(
{"tickfont": {"family": "Arial, sans-serif", "size": 13, "color": "black"}}
)
fig["layout"]["yaxis"].update(
{
"title": "Location",
"titlefont": {"size": 12},
"showgrid": False,
"showline": True,
"rangemode": "tozero",
"zeroline": True,
"domain": [0.75, 1.0],
"tickformat": "%",
"tickfont": {
"family": "Arial, sans-serif",
"size": 12,
"color": "black",
},
"linewidth": 2,
}
)
fig["layout"]["yaxis2"].update(
{
"title": "Grid Availability (kW)",
"titlefont": {"size": 12},
"showgrid": True,
"showline": True,
"rangemode": "tozero",
"domain": [0.4, 0.7],
"tickfont": {
"family": "Arial, sans-serif",
"size": 12,
"color": "black",
},
"linewidth": 2,
}
)
fig["layout"]["yaxis3"].update(
{
"title": "SOC",
"titlefont": {"size": 12},
"showgrid": True,
"showline": True,
"rangemode": "tozero",
"domain": [0.0, 0.35],
"tickformat": "%",
"tickfont": {
"family": "Arial, sans-serif",
"size": 12,
"color": "black",
},
"linewidth": 2,
}
)
fig["layout"].update(
{
"paper_bgcolor": "white",
"plot_bgcolor": "white",
"margin": dict(l=10, r=10, t=20, b=10, pad=0),
}
) # ,'width': 800,'height': 450,'showlegend': True
fig = go.Figure(data=fig["data"], layout=fig["layout"])
if to_html:
if path is None:
raise Exception(
"""when to_html is True then path must be given with .html extension"""
)
else:
fig.write_html(file=path)
return fig
[docs] def sgplot_ged(self, tscode, rng=None, to_html=False, path=None):
"""
Plot of grid electricity demand profiles associated with the same grid availability profile.
Args:
tscode (str): Time series code. Eg. 'ev_user_W3_85e59_avai_65t2p'
rng (tuple): (a,b) index if only part of timeseries should be copied. Defaults to None.
to_html (bool): Save as a html file. Defaults to False.
path (str): Path if plot should be saved to file. Defaults to None.
Returns:
plotly.plot: Plot object.
"""
self.db.update()
if self.db.db[tscode]["kind"] != "charging":
raise Exception(
"code '{}' does not correspond to a grid electricity demand profile".format(
tscode
)
)
df = pd.DataFrame()
availcode = self.db.db[tscode]["input"]
for k in self.db.db.keys():
if self.db.db[k]["kind"] == "charging":
if self.db.db[k]["input"] == availcode:
tmp = self.db.db[k]["timeseries"].copy()
tmp.loc[:, "option"] = self.db.db[k]["option"]
df = df.append(tmp, sort=False)
if rng is None:
pass
else:
df = df.iloc[rng[0] : rng[1]].copy()
dt = df[["state", "actual_soc", "charge_grid", "option"]]
dt = dt.astype(dtype={'actual_soc':float, 'charge_grid':float})
cnt = dt.groupby([dt.index, "state"])["state"].count()
cn = (
pd.DataFrame(cnt)
.rename(columns={"state": "count"})
.unstack(level=-1)
.fillna(0)
)
cn.columns = cn.columns.droplevel()
rr = (cn.T / cn.T.sum(axis=0)).T
figc = rr.iplot(kind="area", fill=True, asFigure=True)
dff = dt.pivot_table(
index=dt.index, columns="option", values="actual_soc", aggfunc="sum"
)
figa = dff.iplot(asFigure=True)
dg = dt.pivot_table(
index=dt.index, columns="option", values="charge_grid", aggfunc="sum"
)
figb = dg.iplot(asFigure=True)
fig = cf.subplots([figa, figb, figc], shape=(3, 1), shared_xaxes=True)
fig["layout"]["xaxis"].update(
{"tickfont": {"family": "Arial, sans-serif", "size": 14, "color": "black"}}
)
fig["layout"]["yaxis"].update(
{
"title": "SOC",
"titlefont": {"size": 14},
"showgrid": False,
"showline": True,
"rangemode": "tozero",
"zeroline": True,
"domain": [0.7, 1.0],
"tickformat": "%",
"tickfont": {
"family": "Arial, sans-serif",
"size": 14,
"color": "black",
},
"linewidth": 2,
}
)
fig["layout"]["yaxis2"].update(
{
"title": "Actual charge (kW)",
"titlefont": {"size": 14},
"showgrid": True,
"showline": True,
"rangemode": "tozero",
"domain": [0.25, 0.65],
"tickfont": {
"family": "Arial, sans-serif",
"size": 12,
"color": "black",
},
"linewidth": 2,
}
)
fig["layout"]["yaxis3"].update(
{
"title": "Location",
"titlefont": {"size": 14},
"showgrid": True,
"showline": True,
"rangemode": "tozero",
"domain": [0.0, 0.2],
"tickformat": "%",
"tickfont": {
"family": "Arial, sans-serif",
"size": 12,
"color": "black",
},
"linewidth": 2,
}
)
fig["layout"].update(
{
"paper_bgcolor": "white",
"plot_bgcolor": "white",
"margin": dict(l=10, r=10, t=20, b=10, pad=0),
"showlegend": True,
}
) # 'width': 800,'height': 450
FIG = go.Figure(data=fig["data"], layout=fig["layout"])
if to_html:
if path is None:
raise Exception(
"""when to_html is True then path must be given with .html extension"""
)
else:
FIG.write_html(file=path)
return FIG
[docs] def sankey(self, tscode, include=None, to_html=False, path=None):
"""
Plot of sankey diagram which shows the energy consumption flows.
Args:
tscode (str): Time series code. Eg. 'ev_user_W3_85e59_avai_65t2p'
include (int): Index which part to include. Defaults to None.
to_html (bool): Save as a html file. Defaults to False.
path (str): Path if plot should be saved to file. Defaults to None.
Returns:
plotly.plot: Plot object.
"""
self.db.update()
distance, consumption, rate, label, source, target, value = balance(
self.db, tscode, include=include
)
link = dict(source=source, target=target, value=value)
node = dict(label=label, pad=50, thickness=10)
data = go.Sankey(link=link, node=node)
fig = go.Figure(data)
if to_html:
if path is None:
raise Exception(
"""when to_html is True then path must be given with .html extension"""
)
else:
fig.write_html(file=path)
print(f"Consumption [kWh]: {round(consumption,3)}")
print(f"Distance [km]: {round(distance,3)}")
print(f"Specific consumption [kWh/100 km]: {round(rate,3)}")
return fig
[docs] def overview(self, tscode, date_range=None, to_html=False, path=None):
"""
Still in Development.
Do not use!
date_range=['01/01/2020 00:00:00','01/06/2020 23:00:00']
"""
self.db.update()
gavailability_name = self.db.db[tscode]['input']
# print(gavailability_name)
consumption_name = self.db.db[gavailability_name]['input']
ts = self.db.db[gavailability_name]['timeseries'].reset_index(drop=False).rename(
columns={'date': 'datetime', 'hh': 'hr'})
print(f"Actual time-series date range = [{ts.datetime.min()},{ts.datetime.max()}]")
if date_range is None:
start = ts.datetime.min()
end = ts.datetime.max()
else:
start = date_range[0]
end = date_range[1]
cons = self.db.db[consumption_name]
from emobpy.consumption import include_weather, Weather
wt = Weather()
D = wt.humidair_density
temp_arr = wt.temp(cons['weather_country'], cons['weather_year'])
pres_arr = wt.pressure(cons['weather_country'], cons['weather_year'])
dp_arr = wt.dewpoint(cons['weather_country'], cons['weather_year'])
hum_arr = wt.calc_rel_humidity(dp_arr, temp_arr)
r_ha = wt.humidair_density(temp_arr, pres_arr, h=hum_arr)
dfs = include_weather(ts, cons['refdate'], temp_arr, pres_arr, dp_arr, hum_arr, r_ha)
cdf = self.db.db[consumption_name]['profile'].copy()
dfg = pd.merge_asof(dfs, cdf[['datetime', 'speed km/h']], on="datetime", tolerance=pd.Timedelta("900s"),
direction="nearest").fillna(0.0).set_index('datetime')
df = pd.DataFrame()
availcode = self.db.db[tscode]["input"]
for k in self.db.db.keys():
if self.db.db[k]["kind"] == "charging":
if self.db.db[k]["input"] == availcode:
tmp = self.db.db[k]["timeseries"].copy()
tmp.loc[:, "option"] = self.db.db[k]["option"]
df = df.append(tmp, sort=False)
dt = df[["state", "actual_soc", "charge_grid", "option"]]
dt = dt.astype(dtype={'actual_soc':float, 'charge_grid':float})
dff = dt.pivot_table(index=dt.index, columns="option", values="actual_soc", aggfunc="sum")
dg = dt.pivot_table(index=dt.index, columns="option", values="charge_grid", aggfunc="sum")
dg.loc[:, 'Grid Availability'] = dfg["charging_cap"]
cnt = dfg.groupby([dfg.index, "state"])["state"].count()
cn = (
pd.DataFrame(cnt)
.rename(columns={"state": "count"})
.unstack(level=-1)
.fillna(0)
)
cn.columns = cn.columns.droplevel()
rr = (cn.T / cn.T.sum(axis=0)).T
# imput is the name of grid demand time series (charging class) and database 'db'
# rr, dfg, dff, dg are dataframes resulting from a preprocessing step
fig1 = rr[start:end].iplot(kind="area", fill=True, asFigure=True)
fig2 = dfg[start:end][["distance", "consumption"]].iplot(colors=['green', 'pink'], asFigure=True)
fig3 = dfg[start:end][["temp_degC", "speed km/h"]].iplot(colors=['purple', '#9c8830'], asFigure=True)
fig4 = dg[start:end].iplot(yTitle='Power rating (kW)', asFigure=True)
fig5 = dff[start:end].iplot(yTitle='SOC', asFigure=True)
for trace in fig5['data']:
trace['showlegend'] = True
fig = make_subplots(rows=5, cols=1,
specs=[[{"secondary_y": True}], [{'secondary_y': True}], [{'secondary_y': True}],
[{'secondary_y': True}], [{'secondary_y': True}]])
[fig.add_trace(trace, secondary_y=False, row=1, col=1) for trace in fig1['data']]
fig.add_trace(fig2['data'][0], secondary_y=False, row=2, col=1)
fig.add_trace(fig2['data'][1], secondary_y=True, row=2, col=1)
fig.add_trace(fig3['data'][0], secondary_y=False, row=3, col=1)
fig.add_trace(fig3['data'][1], secondary_y=True, row=3, col=1)
[fig.add_trace(trace, secondary_y=False, row=4, col=1) for trace in fig4['data']]
[fig.add_trace(trace, secondary_y=False, row=5, col=1) for trace in fig5['data']]
renames = {'distance': ('Distance', 2.2),
'consumption': ('Consumption', 1.2),
'temp_degC': ('Temperature', 2),
'speed km/h': ('Average speed', 2),
# 'from_23_to_8_at_any': ('Charge at night', 2),
# 'immediate': ('Charge immediate', 1.5),
# 'home': ('Home', 0.6), 'driving': ('Driving', 0.6), 'workplace': ('Workplace', 0.6),
# 'errands': ('Errands', 0.6), 'leisure': ('Leisure', 0.6), 'shopping': ('Shopping', 0.6),
}
for trace in fig['data']:
if trace['name'] in renames:
name = trace['name']
trace['name'] = renames[name][0]
trace['line']['width'] = renames[name][1]
else:
trace['line']['width'] = 0.6
fig["layout"].update({'yaxis': dict(title="Location",
title_font=dict(color='black',
# size=18,
),
showgrid=False,
zeroline=True, linecolor='black', gridcolor='#bdbdbd', tickformat="%",
# tickfont={"size": 12},
),
'yaxis3': dict(title='Distance (km)',
title_font=dict(color='green',
# size=18,
),
showgrid=True, zeroline=True, linecolor='black', gridcolor='#bdbdbd',
# tickfont={"size": 12},
),
'yaxis4': dict(title='Consumption (kWh)',
title_font=dict(color='pink'
# size=18,
),
showgrid=False, zeroline=True, linecolor='black', gridcolor='#bdbdbd',
# tickfont={"size": 12},
),
'yaxis5': dict(title='Temp (C)',
title_font=dict(color='purple',
# size=18,
),
showgrid=True, zeroline=True, linecolor='black', gridcolor='#bdbdbd',
zerolinecolor='black',
# tickfont={"size": 12},
),
'yaxis6': dict(title='Speed (km/h)', title_font=dict(color='#9c8830'
# size=18,
),
showgrid=False, zeroline=True, linecolor='black', gridcolor='#bdbdbd',
# tickfont={"size": 12},
),
'yaxis7': dict(title='Power rating (kW)', title_font=dict(color='black',
# size=18,
),
showgrid=True, zeroline=True, linecolor='black', gridcolor='#bdbdbd',
# tickfont={"size": 12},
),
'yaxis9': dict(title='SOC', title_font=dict(color='black',
# size=18,
),
showgrid=True, zeroline=True, linecolor='black', gridcolor='#bdbdbd', tickformat="%",
# tickfont={"size": 12},
),
})
fig.update_xaxes(showgrid=True, zeroline=True, linecolor='black', gridcolor='#bdbdbd')
fig.update_yaxes(rangemode='tozero')
fig["layout"].update(
{
"paper_bgcolor": "white",
"plot_bgcolor": "white",
"margin": dict(l=10, r=10, t=20, b=10, pad=0),
# 'width': 1300, 'height': 1200,
'showlegend': True
}
)
if to_html:
if path is None:
raise Exception("""when to_html is True then path must be given with .html extension""")
else:
fig.write_html(file=path)
return fig