"""
Based on the vehicle mobility time series, the driving electricity consumption (ii) time series is derived.
This requires further input data, such as information on nominal motor power, curb weight, drag coefficient, and dimensions,
which the tool includes for several current BEV models. Ambient temperature is also a significant parameter that affects the
consumption of BEV. For that reason, emobpy ships with a database of hourly temperatures for European countries covering
the last 17 years. Additionally, the vehicle cabin insulation characteristics are required; this data is not widely available
and thus assumed independently of the BEV models database. Driving cycles are also important input parameters that are used to
simulate every individual trip. The model includes two driving cycles, Worldwide Harmonized Light Vehicles Test Cycle (WLTC)
and Environmental Protection Agency (EPA). This input data is already provided within the tool, and the user can select a particular
BEV model, country weather, and driving cycle. Alternatively, emobpy also allows providing user-defined custom data.
For more details see the article and cite:
.. code-block:: python
@article{Gaete-Morales_2021,
author={Gaete-Morales, Carlos and Kramer, Hendrik and Schill, Wolf-Peter and Zerrahn, Alexander},
title={An open tool for creating battery-electric vehicle time series from empirical data, emobpy},
journal={Scientific Data}, year={2021}, month={Jun}, day={11}, volume={8}, number={1}, pages={152},
issn={2052-4463}, doi={10.1038/s41597-021-00932-9}, url={https://doi.org/10.1038/s41597-021-00932-9}}
See also the examples in the documentation https://diw-evu.gitlab.io/emobpy/emobpy
"""
import zenodo_get
from datetime import datetime
import pandas as pd
import numpy as np
import numba
import time
import logging
import wget
import yaml
import pytz
import sys
import os
import uuid
import gzip
import pickle
import json
from .constants import (
TIME_FREQ,
CWD,
DEFAULT_DATA_DIR,
USER_PATH,
DATA_DIR,
MODULE_DATA_PATH,
WEATHER_FILES,
WEATHER_OPTIONS,
EVSPECS_FILE,
MG_EFFICIENCY_FILE,
DC_FILE,
LAYER_NAMES,
ZONE_NAMES,
ZONE_LAYERS,
ZONE_SURFACE,
LAYER_CONDUCTIVITY,
LAYER_THICKNESS,
TARGET_TEMP,
GRAVITY,
VEHICLE_NEEDED_PARAMETERS,
)
from .functions import (
rolling_resistance_coeff,
vehicle_mass,
prollingresistance,
pairdrag,
p_gravity,
pinertia,
p_wheel,
p_motorout,
EFFICIENCYregenerative_braking,
p_generatorin,
p_motorin,
p_generatorout,
qhvac
)
from .tools import (check_for_new_function_name,
Unit
)
from .mobility import add_column_datetime
# Module-level logger. The logger itself accepts DEBUG records, but the file
# handler attached below only writes records of level INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s:%(name)s:%(funcName)s:%(message)s")
# The file name is relative, so the log file is created in the working
# directory of the process at the moment this module is imported.
log_filename = "emobpy.log"
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def bar_progress(*args):
    """
    Print the download progress in the format "Downloading: 80% [8 / 10] kilobyte".

    The function is used as the ``bar`` callback of ``wget.download``. Only the
    first two positional arguments are read; any further ones are ignored.

    Args:
        current (int): Number of bytes downloaded so far (``args[0]``).
        total (int): Total number of bytes to download (``args[1]``).
    """
    current, total = args[0], args[1]
    # A total of 0 (e.g. an empty file) would raise ZeroDivisionError below;
    # report 0 % in that case instead of aborting the download.
    percent = current / total * 100 if total else 0
    progress_message = "Downloading: %d%% [%d / %d] kilobyte" % (
        percent,
        current / 1024,
        total / 1024,
    )
    # "\r" rewinds to the start of the line so the message is overwritten in place.
    sys.stdout.write("\r" + progress_message)
    sys.stdout.flush()
def consumption_progress(current, total):
    """
    Print the consumption progress in the format "Progress: 50% [1 / 2] trips".

    Args:
        current (int): Current index.
        total (int): Total number of loops.
    """
    # With no trips at all (total == 0) the percentage is undefined;
    # report 0 % instead of raising ZeroDivisionError.
    percent = current / total * 100 if total else 0
    progress_message = "Progress: %d%% [%d / %d] trips" % (
        percent,
        current,
        total,
    )
    # "\r" rewinds to the start of the line so the message is overwritten in place.
    sys.stdout.write("\r" + progress_message)
    sys.stdout.flush()
@numba.jit(nopython=True)
def inertial_mass(curb_weight, gear_ratio):
    """
    Return the inertial mass ``curb_weight * (0.04 + 0.0025 * gear_ratio ** 2)``.

    Args:
        curb_weight (float): Curb weight of the car.
        gear_ratio (float): Gear ratio of the car.

    Returns:
        float: Inertial mass of the car, in the same unit as ``curb_weight``.
    """
    mass_factor = 0.04 + 0.0025 * gear_ratio ** 2
    return curb_weight * mass_factor
def include_weather(pf, refdate, temp_arr, pres_arr, dp_arr, H, r_ha):
    """
    Add hourly weather data to a DataFrame by matching on its "datetime" column.

    An hourly weather table is built that starts on 1 January of the year of
    ``refdate``. Every weather array is repeated twice, so the table spans
    twice the length of the input arrays (presumably so that time series
    running past the end of the reference year still find a match - TODO confirm).
    Each row of ``pf`` receives the weather record nearest in time, provided
    it lies within one hour.

    Args:
        pf (pd.DataFrame): DataFrame where weather data should be added. It must
            contain the columns "datetime" (sorted, as required by
            ``pd.merge_asof``) and "hr".
        refdate (str): E.g. '01/01/2020'. Only the year is used.
        temp_arr (ndarray): Temperature in degree Celsius.
        pres_arr (ndarray): Pressure in mbar.
        dp_arr (ndarray): Dewpoint data in degree Celsius.
        H (ndarray): Humidity data.
        r_ha (ndarray): Air density in kg/m3.

    Returns:
        pd.DataFrame: ``pf`` extended by the columns "temp_degC", "pressure_mbar",
        "dewpoint_degC", "humidity", "air_density_kg/m3" and "weather_time"
        (timestamp of the matched weather record), indexed by "hr" while
        keeping "hr" as a column.
    """
    year = pd.to_datetime(refdate).year
    start_date = pd.to_datetime(year, format="%Y")
    # A Timedelta is used instead of the "H" alias, which newer pandas
    # versions deprecate; the resulting hourly index is the same.
    hourly_index = pd.date_range(
        start_date, periods=len(r_ha) * 2, freq=pd.Timedelta(hours=1)
    )
    weather = pd.DataFrame(
        data={
            "temp_degC": np.concatenate([temp_arr, temp_arr]),
            "pressure_mbar": np.concatenate([pres_arr, pres_arr]),
            "dewpoint_degC": np.concatenate([dp_arr, dp_arr]),
            "humidity": np.concatenate([H, H]),
            "air_density_kg/m3": np.concatenate([r_ha, r_ha]),
        },
        index=hourly_index,
    )
    weather = weather.rename_axis("datetime").sort_index().reset_index()
    # Keep the timestamp of the matched weather record; the "datetime" column
    # of the merge result carries the timestamps of pf.
    weather["weather_time"] = weather["datetime"]
    merged = pd.merge_asof(
        pf,
        weather,
        on="datetime",
        tolerance=pd.Timedelta("3600s"),
        direction="nearest",
    )
    return merged.set_index("hr", drop=False)
class Weather:
    """
    Access to the weather data files and helper formulas for humid-air properties.

    The loader methods read CSV files whose names come from ``WEATHER_OPTIONS``;
    the ``calc_*`` helpers and the density functions work on floats or numpy
    arrays.
    """

    def __init__(self):
        pass

    def __getattr__(self, item):
        # Only reached for attributes that are not found the normal way. The
        # lookup is delegated to check_for_new_function_name (from .tools).
        # Nothing is returned here, so unless that helper raises, the attribute
        # evaluates to None and calling it fails with TypeError.
        check_for_new_function_name(item)

    def temp(self, country_code, year):
        """
        Load the temperature series of a country and year, converted from Kelvin to degree Celsius.

        Args:
            country_code (str): E.g. 'DE'.
            year (int): E.g. 2016.

        Returns:
            ndarray: Temperature data in degree Celsius.
        """
        return self._load_data(country_code, year) - 273.15

    def pressure(self, country_code, year):
        """
        Load the pressure series of a country and year, converted from Pascal to mbar.

        Args:
            country_code (str): E.g. 'DE'.
            year (int): E.g. 2016.

        Returns:
            ndarray: Pressure data in mbar (Pascal divided by 100).
        """
        return self._load_data(country_code, year, option="pressure Pascal", location=None) / 100

    def dewpoint(self, country_code, year):
        """
        Load the dew point series of a country and year, converted from Kelvin to degree Celsius.

        Args:
            country_code (str): E.g. 'DE'.
            year (int): E.g. 2016.

        Returns:
            ndarray: Dew point data in degree Celsius.
        """
        return self._load_data(country_code, year, option="dew_point Kelvin") - 273.15

    @staticmethod
    def download_weather_data(location=None):
        """
        Download the weather data files from zenodo (record 10.5281/zenodo.1489915).

        The list of file URLs is first written to "urls.txt" in the target
        directory, then every URL that contains a key of ``WEATHER_FILES`` is
        downloaded to the corresponding file name, replacing an existing file.

        Args:
            location (str, optional): Target directory. Defaults to None, in which
                case USER_PATH or DEFAULT_DATA_DIR + "/user_files" is used.

        Returns:
            list: Values returned by ``wget.download`` for every downloaded file.
        """
        user_dir = location or USER_PATH or DEFAULT_DATA_DIR + "/user_files"
        os.makedirs(user_dir, exist_ok=True)
        # zenodo_get writes "urls.txt" into the current working directory.
        os.chdir(user_dir)
        try:
            zenodo_get.zenodo_get(["10.5281/zenodo.1489915", "-wurls.txt"])
        finally:
            # Go back even if the request fails, so that the process is not
            # left inside the data directory.
            os.chdir(CWD)
        time.sleep(2)
        with open(os.path.join(user_dir, "urls.txt")) as fh:
            urls = [line.strip() for line in fh]
        dest_list = []
        for url in urls:
            for file, local_name in WEATHER_FILES.items():
                if file not in url:
                    continue
                filename = os.path.join(user_dir, local_name)
                if os.path.exists(filename):
                    os.remove(filename)
                print(f"Downloading file... {url}")
                dest = wget.download(url, filename, bar=bar_progress)
                print("")
                dest_list.append(dest)
        for dfp in dest_list:
            print(dfp)
        return dest_list

    @staticmethod
    def _load_data(country_code="DE", year=2016, option="temp Kelvin", location=None):
        """
        Load one weather variable of one country and year from its CSV file.

        The "date" column is parsed with the format "%Y-%m-%d %H:%M:%S %Z",
        interpreted as GMT and converted to the first time zone that pytz lists
        for the country. Rows are then selected by their year in that local
        time, and at most the first 8760 values are returned.

        Args:
            country_code (str, optional): Name of the CSV column to read. Defaults to "DE".
                "UK" is looked up as "GB" in pytz only; the CSV column is still "UK".
            year (int, optional): Defaults to 2016.
            option (str, optional): Key of WEATHER_OPTIONS. Defaults to "temp Kelvin".
            location (str, optional): Path where data file is stored. Defaults to None,
                in which case USER_PATH or DEFAULT_DATA_DIR is used.

        Returns:
            ndarray: Values of the selected column, in the unit stored in the file.
        """
        user_dir = location or USER_PATH or DEFAULT_DATA_DIR
        filename = os.path.join(user_dir, WEATHER_OPTIONS[option])
        code = "GB" if country_code == "UK" else country_code
        df = pd.read_csv(filename, usecols=["date", country_code])
        # Parse the dates after reading instead of using the deprecated
        # ``date_parser`` argument of read_csv; the per-value parser is the same.
        df["date"] = pd.to_datetime(
            df["date"].map(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S %Z"))
        )
        timezones = pytz.country_timezones[code]
        logger.info(
            " ".join([option, country_code, str(year), "Timezone:", timezones[0]])
        )
        df.date = pd.DatetimeIndex(df.date).tz_localize("GMT").tz_convert(timezones[0])
        data = df[df.date.dt.year.isin([year])]
        return data.set_index("date").reset_index(drop=True)[country_code][0:8760].values

    @staticmethod
    def calc_vapor_pressure(t):
        """
        Calculate vapor pressure as ``6.11 * 10 ** (7.5 * t / (237.3 + t))``.

        Args:
            t (array): Dew point or air temperature in degree Celsius.

        Returns:
            array: Vapor pressure in mbar. With an air temperature the result is
            the saturation value; with a dew point it is the actual vapor pressure.
        """
        exponent = (7.5 * t) / (237.3 + t)
        return 6.11 * np.power(10, exponent)

    def calc_rel_humidity(self, Dp, T):
        """
        Calculate relative humidity from dew point and air temperature.

        Args:
            Dp (array): Dew point temperature in degree Celsius.
            T (array): Temperature in degree Celsius.

        Returns:
            array: Relative humidity in percent.
        """
        return 100 * self.calc_vapor_pressure(Dp) / self.calc_vapor_pressure(T)

    @staticmethod
    def calc_dew_point(t, h):
        """
        Calculate dew point from air temperature and relative humidity.

        Args:
            t (float): air temperature in degree Celsius.
            h (float): Relative humidity in percent.

        Returns:
            array: Dew point temperature in degree Celsius.
        """
        gamma = np.log(h / 100) + ((17.625 * t) / (243.04 + t))
        return 243.04 * gamma / (17.625 - gamma)

    @staticmethod
    def calc_dry_air_partial_pressure(P, pv):
        """
        Calculate dry air partial pressure as total pressure minus vapor pressure.

        Args:
            P (float): Total air pressure.
            pv (float): Partial pressure of water vapor, in the same unit as P.

        Returns:
            array: Dry air partial pressure, in the same unit as the inputs.
        """
        return P - pv

    @staticmethod
    def air_density_from_ideal_gas_law(t, p):
        """
        Calculate the density of dry air from the ideal gas law.

        Args:
            t (float): Temperature in degree Celsius.
            p (float): Pressure in mbar.

        Returns:
            array: Air density in kg/m3.
        """
        kelvin = t + 273.15  # convert degC => degK
        pascal = 100 * p  # convert mb => Pascals
        gas_constant = 287.05  # specific gas constant , J/(kg*degK) = 287.05 for dry air
        return pascal / (gas_constant * kelvin)

    def humidair_density(self, t, p, dp=None, h=None):
        """
        Calculate humid air density from the partial pressures of dry air and water vapor.

        Args:
            t (array): Temperature in degree Celsius.
            p (array): Pressure in mbar.
            dp (array, optional): Dew point temperature in degree Celsius. Defaults to None.
            h (array, optional): Humidity in percent. Defaults to None. If both
                ``dp`` and ``h`` are given, ``h`` is used.

        Raises:
            ValueError: Neither dp nor h was given.

        Returns:
            array: Humid air density in kg/m3.
        """
        if dp is None and h is None:
            raise ValueError("One value is required, either dp or h")
        if h is not None:
            pv = self.calc_vapor_pressure(self.calc_dew_point(t, h))  # mbar
        else:
            pv = self.calc_vapor_pressure(dp)  # mbar
        # Named so that it does not shadow the pandas alias ``pd``.
        p_dry = self.calc_dry_air_partial_pressure(p, pv)  # mbar
        Pv = 100 * pv  # convert mb => Pascals
        Pd = 100 * p_dry  # convert mb => Pascals
        Rd = 287.05  # specific gas constant for dry air [J/(kgK)]
        Rv = 461.495  # specific gas constant for water vapour [J/(kgK)]
        T = t + 273.15  # convert degC => degK
        return Pd / (Rd * T) + Pv / (Rv * T)  # density [kg/m3]
class BEVspecs:
    """
    Vehicle specification database loaded from a JSON file.

    ``self.data`` is a nested dict ``brand -> model -> year -> parameter`` whose
    leaves are dicts with the keys "value", "unit" and "description". The
    top-level key "fallback_parameters" maps parameter names directly to such
    leaf dicts and is not a brand.
    """

    def __init__(self, filename=None):
        self.filename = filename
        self.data = []
        # Names of the parameters that ``get`` is willing to look up.
        self.parameters = [
            "acc_0_100_kmh",
            "axle_ratio",
            "battery_cap",
            "curb_weight",
            "drag_coeff",
            "motor_type",
            "height",
            "length",
            "market",
            "num_cells",
            "num_modules",
            "power",
            "reg_braking",
            "top_speed",
            "torque",
            "trunk_volume",
            "battery_type",
            "voltage",
            "weight",
            "width",
        ]
        self._load_specs(filename=self.filename)

    def __getattr__(self, item):
        # Only reached for attributes that are not found the normal way. The
        # lookup is delegated to check_for_new_function_name (from .tools).
        # Nothing is returned here, so unless that helper raises, the attribute
        # evaluates to None and calling it fails with TypeError.
        check_for_new_function_name(item)

    def _load_specs(self, filename=None):
        """
        Load the specification file (JSON) into self.data.

        Args:
            filename (str, optional): Name of specification file to be loaded. Defaults to None,
                in which case EVSPECS_FILE inside USER_PATH or DEFAULT_DATA_DIR is used.
        """
        if filename is None:
            user_dir = USER_PATH or DEFAULT_DATA_DIR
            self.filename = os.path.join(user_dir, EVSPECS_FILE)
        else:
            self.filename = filename
        with open(self.filename) as file:
            self.data = json.load(file)

    def _model_years(self):
        """Yield (brand, model, year, year_values) for every vehicle entry, skipping "fallback_parameters"."""
        for brand, models in self.data.items():
            if brand == 'fallback_parameters':
                continue
            for model, years in models.items():
                for year, year_values in years.items():
                    yield brand, model, year, year_values

    @staticmethod
    def _stored_value(entry):
        """Return the raw value of a parameter entry: entry["value"] for a dict, otherwise the entry itself."""
        if isinstance(entry, dict):
            return entry.get("value")
        return entry

    def _numeric_values(self, parameter):
        """Collect the stored int/float values of a parameter over all vehicle entries (no unit conversion)."""
        values = []
        for _, _, _, year_values in self._model_years():
            value = self._stored_value(year_values.get(parameter))
            # bool is a subclass of int but is not a measurement.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                values.append(value)
        return values

    def search_by_parameter(self, parameter='power', first_x=10, brand_filter=None, model_filter=None,
                            year_filter=None):
        """
        Search for vehicles sorted in descending order of the given parameter.

        The first ``first_x`` rows are printed; the complete table is returned.

        Args:
            parameter (str): Vehicle parameter to compare. Defaults to 'power'
            first_x (int): Number of vehicles to show. Defaults to 10.
            brand_filter (list, optional): Brands to keep. Defaults to None (no filter).
            model_filter (list, optional): Models to keep. Defaults to None (no filter).
            year_filter (list, optional): Years to keep, as they appear as keys in
                the data. Defaults to None (no filter).

        Returns:
            data (pd.DataFrame): Columns 'brand', 'model', 'year', 'value', 'unit' and 'parameter'.
        """
        # Rows are collected in a list: DataFrame.append no longer exists in pandas 2.
        rows = []
        for brand_name, model_name, year_name, year_values in self._model_years():
            if brand_filter and brand_name not in brand_filter:
                continue
            if model_filter and model_name not in model_filter:
                continue
            if year_filter and year_name not in year_filter:
                continue
            if parameter in year_values:
                para_value = year_values[parameter]
                rows.append(
                    {'brand': brand_name, 'model': model_name, 'year': year_name,
                     'value': para_value["value"], 'unit': para_value['unit']}
                )
        df = pd.DataFrame(rows, columns=['brand', 'model', 'year', 'value', 'unit'])
        print('Parameter:', parameter)
        data = df.sort_values(by='value', ascending=False).reset_index(drop=True)
        print(data.head(first_x))
        data['parameter'] = parameter
        return data

    def show_models(self, brand='', model='', year=''):
        """
        Print a tree of all cars from the database. Can be filtered by brand, model and year.

        Args:
            brand (str, optional): Show only cars that match the brand. Defaults to ''.
            model (str, optional): Show only cars that match the model. Defaults to ''.
            year (str or int, optional): Show only cars that match the year. Defaults to ''.
        """

        def pretty(d, indent=0):
            # Print keys with non-empty values; descend brand -> model -> year only.
            for key, value in d.items():
                if value:
                    print('\t' * indent + str(key))
                    if isinstance(value, dict) and indent < 2:
                        pretty(value, indent + 1)

        # Years are string keys in the data, so an int year has to be converted to match.
        year_key = str(year) if year else ''
        # Build the filtered tree directly. Entries that are filtered out are
        # never indexed again, which previously raised KeyError.
        print_dict = {}
        for brand_name, brand_values in self.data.items():
            if brand_name == 'fallback_parameters':
                continue
            if brand and brand != brand_name:
                continue
            kept_models = print_dict.setdefault(brand_name, {})
            for model_name, model_values in brand_values.items():
                if model and model != model_name:
                    continue
                kept_years = kept_models.setdefault(model_name, {})
                for year_name, year_values in model_values.items():
                    if year_key and year_key != year_name:
                        continue
                    kept_years[year_name] = year_values
        pretty(print_dict)

    def get(self, brand, model, year, parameter):
        """
        Search for specific information in the vehicle database and return it.

        'Brand', 'EV Model' and 'Model year' are answered directly from the
        arguments. Other parameters are read from the database and converted
        with ``Unit.convert_to_default_value``.

        Args:
            brand (str): E.g 'Volkswagen'
            model (str): E.g. 'ID.3'
            year (int): E.g. 2020
            parameter (str): E.g. 'acc_0_100_kmh'

        Returns:
            [float]: Requested value. None if nothing was found.
        """
        if parameter == 'Brand':
            return brand
        if parameter == 'EV Model':
            return model
        if parameter == 'Model year':
            return year
        if parameter not in self.parameters:
            print("No value found!")
            return None
        try:
            unit_data = self.data[brand][model][str(int(year))][parameter]
            param = Unit(val=unit_data['value'], unit=unit_data['unit'], description=unit_data['description'])
            param.convert_to_default_value()
            return param.val
        except KeyError:
            print("No value found! verify there is no typo")
            return None

    def get_fallback_parameter(self, parameter):
        """
        Get the fallback value of a parameter from the "fallback_parameters" section.

        Args:
            parameter (str): Parameter to get data value for.

        Returns:
            float: Fallback data value for given parameter, None if there is none.
        """
        try:
            unit_data = self.data['fallback_parameters'][parameter]
            param = Unit(val=unit_data['value'], unit=unit_data['unit'], description=unit_data['description'])
            param.convert_to_default_value()
            return param.val
        except KeyError:
            return None

    def dropna_model(self, parameter):
        """
        Delete every model year from self.data whose value for the given parameter is missing or None.

        Args:
            parameter (str): Parameter from which to delete.
        """
        # Collect first, delete afterwards: a dict must not change size while
        # it is being iterated.
        to_drop = [
            (brand, model, year)
            for brand, model, year, year_values in self._model_years()
            if self._stored_value(year_values.get(parameter)) is None
        ]
        for brand, model, year in to_drop:
            print(f"Delete {brand} {model} {year}")
            self.data[brand][model].pop(year)

    def replacena_model(self, parameter, default):
        """
        Replace every missing or None value of the given parameter in self.data with a default value.

        Args:
            parameter (str): Parameter whose missing values are replaced.
            default (float): Value to be used instead.
        """
        for brand, model, year, year_values in self._model_years():
            entry = year_values.get(parameter)
            if self._stored_value(entry) is not None:
                continue
            if isinstance(entry, dict):
                # Keep the stored unit and description, fill in the value only.
                entry["value"] = default
            else:
                year_values[parameter] = default
            print(f"Added {default} to {brand} {model} {year}")

    def maximum(self, parameter):
        """
        Return the maximum of a given parameter over all vehicles in self.data.

        Values are compared as stored, without unit conversion.

        Args:
            parameter (str): Parameter of which the maximum is required.

        Raises:
            ValueError: No numeric value is stored for the parameter.

        Returns:
            float: Maximum of the parameter.
        """
        values = self._numeric_values(parameter)
        if not values:
            raise ValueError(f"No numeric value found for parameter {parameter}")
        return max(values)

    def average(self, parameter):
        """
        Return the average of a given parameter over all vehicles in self.data.

        Values are averaged as stored, without unit conversion.

        Args:
            parameter (str): Parameter of which the average is required.

        Raises:
            ValueError: No numeric value is stored for the parameter.

        Returns:
            float: Average of the parameter, rounded to 3 decimals.
        """
        values = self._numeric_values(parameter)
        if not values:
            raise ValueError(f"No numeric value found for parameter {parameter}")
        return round(sum(values) / len(values), 3)

    def model(self, model, use_fallback=True, msg=True):
        """
        Initialize a ModelSpecs object, add its parameters and check them.

        Args:
            model (tuple): Data of model. E.g ('Volkswagen', 'ID.3', 2020)
            use_fallback (bool, optional): Whether ``add_fallback_data`` is called
                on the new object. Defaults to True.
            msg (bool, optional): Flag, whether to inform about missing parameters. Defaults to True.

        Returns:
            ModelSpecs object: Initialized object.
        """
        M = ModelSpecs(model, self)
        M.add_parameters()
        M.add_calculated_param()
        if use_fallback:
            M.add_fallback_data()
        self._ev_par_test(M, msg=msg)
        return M

    def _ev_par_test(self, Model, msg=True):
        """
        Check whether all parameters in VEHICLE_NEEDED_PARAMETERS hold an int or float value.

        Args:
            Model (Model object): Vehicle model to be checked.
            msg (bool, optional): Flag, whether missing messages should be printed. Defaults to True.
        """
        param_missing = [
            wanted
            for wanted in VEHICLE_NEEDED_PARAMETERS
            if not isinstance(Model.parameters.get(wanted), (int, float))
        ]
        if param_missing and msg:
            print("Missing relevant parameters:")
            for name in param_missing:
                print("   ", name)
            print(
                'Please, add these parameters to the model instance < model_instance.add({"parameter":value}) >'
            )

    def save(self):
        """
        Save self.data as JSON into self.filename.
        """
        with open(self.filename, "w") as file:
            json.dump(self.data, file, indent=4)
        print(f"File saved: {self.filename}")
        logger.info(f"File saved: {self.filename}")
class ModelSpecs:
    """
    Parameter set of a single vehicle model, backed by a BEVspecs database.

    ``self.parameters`` starts with every database parameter set to None plus
    the entries "Brand", "EV Model" and "Model year" taken from the model tuple.
    """

    def __init__(self, model, BEVspecs_instance):
        self.name = model
        self.db = BEVspecs_instance
        # Every known parameter is present from the start, with None as "no value yet".
        self.parameters = dict.fromkeys(self.db.parameters)
        self.parameters["Brand"] = model[0]
        self.parameters["EV Model"] = model[1]
        self.parameters["Model year"] = model[2]

    def __getattr__(self, item):
        # Only reached for attributes that are not found the normal way. The
        # lookup is delegated to check_for_new_function_name (from .tools).
        # Nothing is returned here, so unless that helper raises, the attribute
        # evaluates to None and calling it fails with TypeError.
        check_for_new_function_name(item)

    def add_parameters(self):
        """
        Look up every parameter in self.db.parameters in the database and store the value.
        """
        for parameter in self.db.parameters:
            self.parameters[parameter] = self.db.get(*self.name, parameter)

    def add_calculated_param(self):
        """
        Calculate "pmr", "inertial_mass" and "front_area" from existing parameters.

        A derived parameter is set to None when one of its inputs is absent or
        None. The keys always exist after ``__init__``, so checking only for
        the presence of a key would pass None on to the arithmetic.
        """
        power = self.parameters.get("power")
        curb_weight = self.parameters.get("curb_weight")
        axle_ratio = self.parameters.get("axle_ratio")
        height = self.parameters.get("height")
        width = self.parameters.get("width")

        if power is not None and curb_weight is not None:
            self.parameters["pmr"] = self._pmr(power, curb_weight)
        else:
            self.parameters["pmr"] = None

        if curb_weight is not None and axle_ratio is not None:
            self.parameters["inertial_mass"] = inertial_mass(curb_weight, axle_ratio)
        else:
            self.parameters["inertial_mass"] = None

        if width is not None and height is not None:
            self.parameters["front_area"] = self._frontal_area(height, width)
        else:
            self.parameters["front_area"] = None

    def add_fallback_data(self):
        """
        Add the database fallback value for every needed parameter that has no key in self.parameters.
        """
        for parameter in VEHICLE_NEEDED_PARAMETERS:
            if parameter not in self.parameters:
                fallback = self.db.get_fallback_parameter(parameter)
                self.parameters[parameter] = fallback
                print(f"Fallback value {fallback} added for missing {parameter} parameter.")

    def _pmr(self, power, curb_weight):
        """Calculate PMR (power-to-mass ratio).

        Args:
            power (int): Power of the vehicle; multiplied by 1000 (presumably kW to W - TODO confirm).
            curb_weight (int): Empty weight of the vehicle.

        Returns:
            float: PMR value for the vehicle
        """
        return power * 1000 / curb_weight  # W/kg

    def _frontal_area(self, height, width):
        """
        Calculate and return the frontal area as height times width.

        Args:
            height (float): Height of the vehicle.
            width (float): Width of the vehicle.

        Returns:
            float: Frontal area of the vehicle, in the square of the input unit.
        """
        return height * width

    def addtodb(self):
        """
        Add the parameters to the database, provided none of them is None.

        Returns:
            None, List of None parameters: Returns list of None parameters, which can not be added to database.
        """
        nones = [param for param, value in self.parameters.items() if value is None]
        if nones:
            print(
                "The model can not be added to the database. It has parameters with None as value"
            )
            return nones
        self.add_calculated_param()
        # NOTE(review): BEVspecs treats its data as a nested dict (it iterates
        # with .items() and indexes by brand/model/year); .append only exists
        # on a list - confirm the intended storage format before relying on this.
        self.db.data.append(self.parameters)
        print("Model added to the BEVspecs instance")
        return None

    def add(self, parameters_dict, msg=True):
        """
        Add parameters and their values to the object.

        Args:
            parameters_dict (dict): Contains the name of the parameters and the corresponding value.
            msg (bool, optional): Flag, whether to inform about added parameters. Defaults to True.
        """
        for k, v in parameters_dict.items():
            self.parameters[k] = v
            if msg and k in VEHICLE_NEEDED_PARAMETERS:
                print(f"{k} has been added to the model object. Required OK")
class MGefficiency:
    """
    Motor and generator efficiency curves read from a CSV file.

    The file provides the columns "load_fraction", "motor" and "generator".
    """

    def __init__(self, filename=None):
        self.data = None
        self.filename = filename
        self._load_file(filename=self.filename)
        self._get_codes()

    def __getattr__(self, item):
        # Only reached for attributes that are not found the normal way. The
        # lookup is delegated to check_for_new_function_name (from .tools).
        # Nothing is returned here, so unless that helper raises, the attribute
        # evaluates to None and calling it fails with TypeError.
        check_for_new_function_name(item)

    def _load_file(self, filename=None):
        """
        Read the efficiency CSV file into self.data.

        Args:
            filename (str, optional): Csv file with bev efficiency data. Defaults to None,
                in which case MG_EFFICIENCY_FILE inside USER_PATH or DEFAULT_DATA_DIR is used.
        """
        if filename is not None:
            self.filename = filename
        else:
            user_dir = USER_PATH or DEFAULT_DATA_DIR
            self.filename = os.path.join(user_dir, MG_EFFICIENCY_FILE)
        with open(self.filename) as file:
            self.data = pd.read_csv(file)

    def _get_codes(self):
        """
        Copy the columns of self.data into self.load_fraction, self.motor and self.generator as arrays.
        """
        table = self.data
        self.load_fraction = table.load_fraction.values
        self.motor = table.motor.values
        self.generator = table.generator.values

    def get_efficiency(self, load_fraction, g_m_code):
        """
        Interpolate the motor or generator efficiency at the given load fraction.

        Args:
            load_fraction (float or ndarray): Load fraction at which the curve is evaluated.
                For the generator its absolute value is used.
            g_m_code (-1, 1): 1 for motor, -1 for generator

        Raises:
            Exception: Raised if g_m_code is not 1 or -1.

        Returns:
            float or ndarray: Efficiency interpolated from the selected curve.
        """
        if g_m_code == 1:
            return self._get_efficiency(load_fraction, self.load_fraction, self.motor)
        if g_m_code == -1:
            return self._get_efficiency(
                np.abs(load_fraction), self.load_fraction, self.generator
            )
        raise Exception(f"g_m_code is {g_m_code}. It should be 1 or -1")

    @staticmethod
    @numba.jit(nopython=True)
    def _get_efficiency(load_fraction, load_fraction_values, column_values):
        """
        Return the one-dimensional linear interpolation (``np.interp``) of the given curve.

        Args:
            load_fraction (float or ndarray): Point(s) at which to interpolate.
            load_fraction_values (ndarray): x-coordinates of the curve.
            column_values (ndarray): y-coordinates of the curve.

        Returns:
            float: Efficiency.
        """
        return np.interp(load_fraction, load_fraction_values, column_values)
@numba.jit(nopython=True)
def acceleration(V0, V2):  # V0, V2 km/h
    """
    Calculate the acceleration between two speed values given in km/h.

    The speed difference is divided by 2 (presumably the two-second distance
    between the samples of a central difference - TODO confirm) and by 3.6 to
    convert km/h to m/s.

    Args:
        V0 (float): Old speed.
        V2 (float): New speed.

    Returns:
        float: Acceleration.
    """
    speed_change = V2 - V0
    return speed_change / 2 / 3.6  # acc m/s**2
@numba.jit(nopython=True)
def acceleration_array(speed_array):
    """
    Calculate the acceleration array that belongs to a speed array.

    Element ``i`` is the acceleration between the neighbouring speed values
    ``i - 1`` and ``i + 1``. The first element uses 0 as the previous speed,
    and the last element is left at 0.

    Args:
        speed_array (ndarray): Array with speed values.

    Returns:
        ndarray: Array with acceleration values, same length as ``speed_array``.
    """
    n = speed_array.shape[0]
    acc = np.zeros((n,))
    # With fewer than two samples there is no neighbour to read. Compiled code
    # does not check array bounds by default, so speed_array[1] must not be
    # touched in that case.
    if n < 2:
        return acc
    acc[0] = acceleration(0, speed_array[1])
    for i in range(1, n - 1):
        acc[i] = acceleration(speed_array[i - 1], speed_array[i + 1])
    return acc
class DrivingCycle:
    """
    Collection of driving cycles and the logic to map one of them onto a trip.

    ``self.data`` is a list of dicts, one per driving cycle, with the keys
    "name", "type", "id" and several ``{"value", "unit"}`` entries ("speed",
    "mean_speed", "time", "distance", "normalized", "acceleration",
    "max_acceleration"). ``self.index_speed`` maps
    ``type -> class -> "partial"/"full" -> id`` to the mean speed of the cycle.
    """

    def __init__(self):
        self.data = []
        self.index_speed = None
        user_dir = USER_PATH or DEFAULT_DATA_DIR
        self.datafile = os.path.join(user_dir, DC_FILE)
        # self.load_data()

    def __getattr__(self, item):
        # Only reached for attributes that are not found the normal way. The
        # lookup is delegated to check_for_new_function_name (from .tools).
        # Nothing is returned here, so unless that helper raises, the attribute
        # evaluates to None and calling it fails with TypeError.
        check_for_new_function_name(item)

    def get_csv(self, csv_path=os.path.join(MODULE_DATA_PATH, "driving_cycles.csv")):
        """
        Load csv as dataframe into self.dc_df, indexed by the "Seconds" column.

        Args:
            csv_path (str, optional): Path of file. Defaults to os.path.join(MODULE_DATA_PATH, "driving_cycles.csv").
        """
        self.csv_path = csv_path
        self.dc_df = pd.read_csv(self.csv_path, index_col="Seconds")

    def create_data(self):
        """
        Create self.data from self.dc_df, one entry per column, and rebuild self.index_speed.
        """
        self.data = []
        for i, dc_name in enumerate(self.dc_df.columns):
            speed = self.dc_df[dc_name].dropna().values
            mean_speed = round(float(np.mean(speed)), 1)
            accel = np.round(acceleration_array(speed), 4)
            dc = {
                "name": dc_name,
                # The part of the column name before the first "_" is the cycle type.
                "type": dc_name.split("_")[0],
                "id": i,
                "speed": {"value": speed, "unit": "km/h"},
                "mean_speed": {"value": mean_speed, "unit": "km/h"},
                # One sample per row of the "Seconds" index.
                "time": {"value": len(speed), "unit": "s"},
                "distance": {"value": round(float(sum(speed / 3600)), 1), "unit": "km"},
                # Speed profile relative to the (rounded) mean speed.
                "normalized": {"value": np.round(speed / mean_speed, 4), "unit": ""},
                "acceleration": {"value": accel, "unit": "m/s**2"},
                "max_acceleration": {"value": float(max(accel)), "unit": "m/s**2"},
            }
            self.data.append(dc)
        self._get_index_speed()

    def save_data(self):
        """
        Save self.data as YAML to self.datafile.

        Arrays are converted to lists in a copy (kept in self.tmpdata), so
        self.data itself keeps its numpy arrays.
        """
        self.tmpdata = []
        for dc in self.data:
            plain = {}
            for key, item in dc.items():
                if isinstance(item, dict):
                    # Copy before converting: the same dict object is still
                    # referenced by self.data.
                    item = dict(item)
                    if isinstance(item["value"], np.ndarray):
                        item["value"] = item["value"].tolist()
                plain[key] = item
            self.tmpdata.append(plain)
        with open(self.datafile, "w") as file:
            yaml.dump(self.tmpdata, file)
        print(f"File saved: {self.datafile}")

    def load_data(self):
        """
        Load data from self.datafile to self.data, replacing what was there, and rebuild self.index_speed.

        Prints a message and leaves the object unchanged if the file does not exist.
        """
        if not os.path.isfile(self.datafile):
            print(
                f'File "{self.datafile}" does not exist!. You can create it from .csv file'
            )
            return
        with open(self.datafile) as file:
            # NOTE(review): FullLoader can construct more than plain data. The
            # file written by save_data holds only lists, dicts, numbers and
            # strings, so yaml.safe_load looks sufficient - confirm for old files.
            self.tmpdata = yaml.load(file, Loader=yaml.FullLoader)
        # Start from an empty list so that loading twice does not duplicate cycles.
        self.data = []
        for dc in self.tmpdata:
            self.data.append(dc)
            for key in dc.keys():
                if isinstance(dc[key], dict) and isinstance(dc[key]["value"], list):
                    dc[key]["value"] = np.array(dc[key]["value"])
        self._get_index_speed()

    def _get_index_speed(self):
        """
        Build self.index_speed from self.data.

        A cycle belongs to the first class of "WLTC_3b", "WLTC_2" contained in
        its name, otherwise to class "none". It counts as "full" if its name
        contains "full", otherwise as "partial".
        """
        self.index_speed = {}
        classes = ["WLTC_3b", "WLTC_2"]
        for type_ in {t["type"] for t in self.data}:
            if type_ == "WLTC":
                self.index_speed[type_] = {
                    cl: {"partial": {}, "full": {}} for cl in classes
                }
            else:
                self.index_speed[type_] = {"none": {"partial": {}, "full": {}}}
        for cycle in self.data:
            name = cycle["name"]
            cycle_class = next((cl for cl in classes if cl in name), "none")
            coverage = "full" if "full" in name else "partial"
            self.index_speed[cycle["type"]][cycle_class][coverage][cycle["id"]] = {
                "value": cycle["mean_speed"]["value"],
                "unit": cycle["mean_speed"]["unit"],
            }

    def _select_driving_cycle_index(self, driving_cycle_type, speed, PMR, full_driving_cycle):
        """
        Select the id of the driving cycle to use for a trip.

        For type "WLTC" the class is chosen by PMR: "WLTC_3b" above 34,
        "WLTC_2" above 22 up to 34. In every other case the class is "none".
        NOTE(review): self.index_speed has no "none" class for type "WLTC", so
        a WLTC trip with PMR of None or <= 22 ends in a KeyError - confirm
        whether a fallback class is wanted.

        Args:
            driving_cycle_type (str): Type of driving cycle. E.g. 'WLTC'.
            speed (dict): Speed value. E.g. {'value': 11.11, 'unit': 'm/s'}
            PMR (float): Value compared against the WLTC class limits 22 and 34.
            full_driving_cycle (bool): If True, the first "full" cycle is returned;
                otherwise the "partial" cycle whose mean speed is closest to ``speed``.

        Returns:
            int: Id of the selected driving cycle.
        """
        WLTC_class = "none"
        if driving_cycle_type == "WLTC" and PMR is not None:
            if PMR > 34:
                WLTC_class = "WLTC_3b"
            elif 22 < PMR <= 34:
                WLTC_class = "WLTC_2"
        candidates = self.index_speed[driving_cycle_type][WLTC_class]
        if full_driving_cycle:
            return list(candidates["full"].keys())[0]
        idx = []
        spd = []
        if candidates["partial"]:
            # The trip speed is the same for every candidate; convert it once.
            trip_average_speed = Unit(speed["value"], speed["unit"]).convert_to("m/s").val
            for k, v in candidates["partial"].items():
                idx.append(k)
                driving_cycle_type_average_speed = Unit(v["value"], v["unit"]).convert_to("m/s").val
                spd.append(-abs(driving_cycle_type_average_speed - trip_average_speed))
        # argmax of the negated distances = cycle with the closest mean speed.
        return idx[np.argmax(spd)]

    def driving_cycle(self, trip, model, full_driving_cycle=False):
        """
        Select a driving cycle for the trip and derive the speed and acceleration profile.

        The normalized cycle is repeated, and finally cut, to the trip duration
        in seconds, scaled by the mean speed of the trip, and slowed down over
        its last samples. The results are written to ``trip.index``,
        ``trip.time``, ``trip.speed``, ``trip.acceleration`` and
        ``trip.driving_cycle_name``.

        Args:
            trip (Trip): Trip for the driving cycle.
            model (ModelSpecs): Vehicle Model for the driving cycle.
            full_driving_cycle (bool, optional): If True, duration and distance of
                the trip are overwritten with those of the complete cycle before
                the profile is built. Defaults to False.
        """
        trip.index = self._select_driving_cycle_index(
            trip.driving_cycle_type,
            trip._mean_speed,
            model.parameters["pmr"],
            full_driving_cycle,
        )
        cycle = self.data[trip.index]
        # Chained ``convert_to(...).val`` is used throughout, as in the rest of
        # this class, so the result does not depend on convert_to working in place.
        cycle_time_s = Unit(cycle["time"]["value"], cycle["time"]["unit"]).convert_to("s").val
        if full_driving_cycle:
            # NOTE(review): only the values are overwritten; the "unit" entries
            # of trip.duration / trip.distance are left as they were - confirm.
            trip.duration["value"] = cycle_time_s
            trip.distance["value"] = Unit(
                cycle["distance"]["value"], cycle["distance"]["unit"]
            ).convert_to("m").val
            trip.get_mean_speed()
        # Todo use Unit class for vector calculations
        trip.time["value"] = np.ceil(Unit(trip._duration["value"], trip._duration["unit"]).convert_to("s").val)
        trip.time["unit"] = "s"
        # Number of complete cycle repetitions and length of the remaining part.
        scale, slide = divmod(trip.time["value"], cycle_time_s)
        normalized = cycle["normalized"]["value"]
        normalized_array = np.array(list(normalized) * int(scale) + list(normalized)[0: int(slide)])
        speed_array = (
            normalized_array
            * Unit(trip._mean_speed["value"], trip._mean_speed["unit"]).convert_to("m/s").val
        )
        # Slow the vehicle down over the last 20 samples: every sample is the
        # previous one reduced by 2 %, 4 %, ... 40 %. Shorter profiles use as
        # many steps as they have predecessors, instead of raising IndexError.
        taper = min(20, len(speed_array) - 1)
        for step, last_secs in enumerate(range(-taper, 0), start=1):
            calc = (
                speed_array[last_secs - 1] - speed_array[last_secs - 1] * (step / 100) * 2
            )
            speed_array[last_secs] = max(0, calc)
        trip.speed["value"] = speed_array
        trip.speed["unit"] = "m/s"
        # NOTE(review): acceleration_array is documented for km/h input and
        # divides by 3.6, but speed_array is in m/s here - confirm the scaling.
        if len(speed_array) < 2:
            # No neighbouring samples to take a difference from.
            trip.acceleration["value"] = np.zeros(len(speed_array))
        else:
            trip.acceleration["value"] = acceleration_array(speed_array)
        trip.acceleration["unit"] = "m/s**2"
        trip.driving_cycle_name = cycle["name"]
class Trips:
    """
    Container of Trip objects that also hands out consecutive trip codes.
    """

    def __init__(self):
        self.quantity = 0
        self.trips = []

    def __getattr__(self, item):
        # Only reached for attributes that are not found the normal way. The
        # lookup is delegated to check_for_new_function_name (from .tools).
        # Nothing is returned here, so unless that helper raises, the attribute
        # evaluates to None and calling it fails with TypeError.
        check_for_new_function_name(item)

    def get_code(self):
        """
        Return the next free trip code and advance the counter.

        The code is the number of codes handed out so far, so the first call returns 0.

        Returns:
            int: Trip code.
        """
        assigned = self.quantity
        self.quantity = assigned + 1
        return assigned

    def add_trip(self, trip):
        """
        Append a single trip to the collection.

        Args:
            trip (Trip): Object to be added to the trips collection.
        """
        self.trips.append(trip)

    def get_trip(self, code):
        """
        Return the trip stored at position ``code`` of the collection.

        Args:
            code (int): Code of the requested trip.

        Returns:
            Trip: Requested trip object.
        """
        return self.trips[code]
class Trip:
    """
    Single driving event with its input data and the calculated results.

    On creation the trip requests its code from the given collection and registers
    itself there. Attributes with a leading underscore (``_distance``, ``_duration``,
    ``_mean_speed``) hold the values converted to m, s and m/s; the ones without
    underscore keep the units that were provided.
    """

    def __init__(self, trips):
        """
        Args:
            trips (Trips): Collection that provides ``get_code()`` and ``add_trip()``.
        """
        self.code = trips.get_code()
        self.origin = None
        self.destination = None
        self.distance = {"value": None, "unit": None}
        self._distance = {"value": None, "unit": None}
        self.duration = {"value": None, "unit": None}
        self._duration = {"value": None, "unit": None}
        self.start_trip_time = {"value": None, "unit": None}
        self.end_trip_time = {"value": None, "unit": None}
        self.mean_speed = {"value": None, "unit": None}
        self._mean_speed = {"value": None, "unit": None}
        self.driving_cycle_type = None
        self.driving_cycle_name = None
        self.acceleration = {"value": None, "unit": None}
        self.speed = {"value": None, "unit": None}
        self.time = {"value": None, "unit": None}
        self.index = None
        self.balance = None
        self.results = {}
        self.rate = {"value": None, "unit": None}
        self.consumption = {"value": None, "unit": None}
        trips.add_trip(self)

    def __getattr__(self, item):
        """
        Fallback that is only called when normal attribute lookup fails.

        Args:
            item (str): Name of the requested attribute.

        Raises:
            AttributeError: Always, unless ``check_for_new_function_name`` raises first.
        """
        # presumably reports methods that were renamed - defined elsewhere, TODO confirm
        check_for_new_function_name(item)
        # Returning None implicitly would make every misspelled attribute "exist" and
        # would hand an uncallable None to protocol look-ups such as __getstate__
        # (TypeError: 'NoneType' object is not callable).
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")

    def add_distance_duration(self, distance, duration):
        """
        Sets distance and duration of the trip and updates the mean speed.

        Args:
            distance (dict): Dictionary containing value and unit. E.g. {'value': 10.0, 'unit': 'km'}
            duration (dict): Dictionary containing value and unit. E.g. {'value': 15.0, 'unit': 'min'}
        """
        self.distance["value"] = distance["value"]
        self.distance["unit"] = distance["unit"]
        self.duration["value"] = duration["value"]
        self.duration["unit"] = duration["unit"]
        self.get_mean_speed()

    def get_mean_speed(self):
        """
        Calculates the mean speed and saves it in the object attributes.

        ``self._distance`` and ``self._duration`` are refreshed with the values converted
        to m and s. ``self.mean_speed`` is expressed in the provided units (e.g. km/min),
        ``self._mean_speed`` in m/s.
        """
        self._distance["value"] = Unit(self.distance["value"], self.distance["unit"]).convert_to("m").val
        self._distance["unit"] = "m"
        self._duration["value"] = Unit(self.duration["value"], self.duration["unit"]).convert_to("s").val
        self._duration["unit"] = "s"
        self.mean_speed = {
            "value": self.distance["value"] / self.duration["value"],
            "unit": self.distance["unit"] + "/" + self.duration["unit"],
        }
        self._mean_speed = {
            "value": self._distance["value"] / self._duration["value"],
            "unit": self._distance["unit"] + "/" + self._duration["unit"],
        }
class HeatInsulation:
    """
    Description of the cabin insulation: zones, their surfaces and the layers they are made of.

    ``flag`` is True while the description is incomplete. The numpy versions of the
    dictionaries (attributes with a trailing underscore) only exist after
    ``_makearrays()`` has been executed with ``flag`` set to False.

    Args:
        default (bool, optional): If True, the object is filled with the module constants
            (LAYER_NAMES, ZONE_NAMES, ZONE_LAYERS, ZONE_SURFACE, LAYER_CONDUCTIVITY,
            LAYER_THICKNESS) and the arrays are created right away. Defaults to False.
    """

    def __init__(self, default=False):
        self.flag = True
        self.zone_layers = {}
        self.zone_surface = {}
        self.layer_conductivity = {}
        self.layer_thickness = {}
        self.layer_names = None
        self.zone_names = None
        if default:
            self._layers_name(LAYER_NAMES)
            self._zones_name(ZONE_NAMES)
            # NOTE(review): the constants are assigned by reference; if they are mutable
            # dictionaries, in-place edits on this object also change the constants.
            self.zone_layers = ZONE_LAYERS
            self.zone_surface = ZONE_SURFACE
            self.layer_conductivity = LAYER_CONDUCTIVITY
            self.layer_thickness = LAYER_THICKNESS
            self.flag = False
            self._makearrays()

    def __getattr__(self, item):
        """
        Fallback that is only called when normal attribute lookup fails.

        Args:
            item (str): Name of the requested attribute.

        Raises:
            AttributeError: Always, unless ``check_for_new_function_name`` raises first.
        """
        # presumably reports methods that were renamed - defined elsewhere, TODO confirm
        check_for_new_function_name(item)
        # Returning None implicitly would make every misspelled attribute "exist" (for
        # instance zone_layers_ before _makearrays() ran) and would hand an uncallable
        # None to protocol look-ups such as __getstate__
        # (TypeError: 'NoneType' object is not callable).
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")

    def _layers_name(self, name_list):
        """
        Sets the heat insulation layers and a None placeholder for each material in name_list.

        Args:
            name_list (list): Contains names of heat insulation materials.
        """
        self.layer_names = name_list
        for nm in name_list:
            self.layer_conductivity[nm] = None
            self.layer_thickness[nm] = None

    def _zones_name(self, name_list):
        """
        Sets the vehicle zones and a None placeholder for each of them.
        E.g. 'lateral_windows', 'windshields' ..

        Args:
            name_list (list): Contains names of zones.
        """
        self.zone_names = name_list
        for nm in name_list:
            self.zone_layers[nm] = None
            self.zone_surface[nm] = None

    def compile(self):
        """
        Creates for every zone in self.zone_names an entry in self.zone_layers that holds
        a None placeholder per layer, then reports the fields that are still empty.
        """
        for zone in self.zone_names:
            # a separate dictionary per zone, so that zones can be edited independently
            self.zone_layers[zone] = {lyr: None for lyr in self.layer_names}
        self._check()

    def show(self):
        """
        Prints the heat insulation attributes and reports the fields that are still empty.
        """
        print("zone_layers:")
        print(self.zone_layers)
        print("zone_surface:")
        print(self.zone_surface)
        print("layer_conductivity:")
        print(self.layer_conductivity)
        print("layer_thickness:")
        print(self.layer_thickness)
        self._check()

    def _check(self):
        """
        Looks for None values in all zone and layer dictionaries.

        Every empty field is printed. ``self.flag`` is set to True if at least one
        field is empty, otherwise to False.
        """
        owner = self.__class__.__name__
        incomplete = False
        for zone in self.zone_names:
            for lyr in self.layer_names:
                if self.zone_layers[zone][lyr] is None:
                    print(
                        f"{owner}.zone_layers['{zone}']['{lyr}'] = None"
                    )
                    incomplete = True
        for zone in self.zone_names:
            if self.zone_surface[zone] is None:
                print(f"{owner}.zone_surface['{zone}'] = None")
                incomplete = True
        for lyr in self.layer_names:
            if self.layer_conductivity[lyr] is None:
                print(f"{owner}.layer_conductivity['{lyr}'] = None")
                incomplete = True
        for lyr in self.layer_names:
            if self.layer_thickness[lyr] is None:
                print(f"{owner}.layer_thickness['{lyr}'] = None")
                incomplete = True
        self.flag = incomplete
        if incomplete:
            print('There are still "None" fields. All fields must contain values.')

    def _makearrays(self):
        """
        Creates np.array versions of the dictionaries, but only if the object is complete
        (self.flag is False).

        Attributes created: zone_layers_, zone_surface_, layer_conductivity_, layer_thickness_
        """
        if not self.flag:
            z_l = []
            for zone in self.zone_layers.keys():
                z_l.append(list(self.zone_layers[zone].values()))
            self.zone_layers_ = np.array(z_l)
            self.zone_surface_ = np.array(list(self.zone_surface.values()))
            self.layer_conductivity_ = np.array(list(self.layer_conductivity.values()))
            self.layer_thickness_ = np.array(list(self.layer_thickness.values()))
[docs]class Consumption:
def __init__(self, inpt, ev_model):
    """
    Prepares a consumption profile for one driving profile and one vehicle model.

    Args:
        inpt (str): Name of the driving profile; it is also the first part of self.name.
        ev_model (ModelSpecs): Vehicle model whose ``parameters`` are validated with
            ``self._ev_par_test()`` and then read.
    """
    self.profile = None
    self.kind = "consumption"
    self.input = inpt
    self.vehicle = ev_model
    self._ev_par_test()
    specs = self.vehicle.parameters
    # blanks are replaced by underscores to get a name without spaces
    self.brand = specs["Brand"].replace(" ", "_")
    self.model = specs["EV Model"].replace(" ", "_")
    self.year = str(int(specs["Model year"]))
    # five random hex characters keep the names of repeated runs apart
    suffix = uuid.uuid4().hex[0:5]
    self.name = "_".join([self.input, self.brand, self.model, self.year, suffix])
    self.COP = {mode: specs["hvac_cop_" + mode] for mode in ("heating", "cooling")}
    self.TARGET_TEMP = TARGET_TEMP
def __getattr__(self, item):
    """
    Fallback that is only called when normal attribute lookup fails.

    Args:
        item (str): Name of the requested attribute.

    Raises:
        AttributeError: Always, unless ``check_for_new_function_name`` raises first.
    """
    # presumably reports methods that were renamed - defined elsewhere, TODO confirm
    check_for_new_function_name(item)
    # Returning None implicitly would make every misspelled or not yet created attribute
    # "exist" and would hand an uncallable None to protocol look-ups such as
    # __getstate__ (TypeError: 'NoneType' object is not callable).
    raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")
def _ev_par_test(self):
    """
    Verifies that the vehicle holds a numeric value for every required parameter.

    A parameter listed in VEHICLE_NEEDED_PARAMETERS counts as missing if it is absent
    from ``self.vehicle.parameters`` or if its value is neither int nor float.
    Assumes ``self.vehicle.parameters`` is a dict-like mapping keyed by parameter name.

    Raises:
        Exception: Raised if at least one parameter is missing; the names are printed before.
    """
    available = self.vehicle.parameters
    param_missing = [
        wanted
        for wanted in VEHICLE_NEEDED_PARAMETERS
        if wanted not in available or not isinstance(available[wanted], (int, float))
    ]
    if not param_missing:
        return
    print("Missing parameters in vehicle object:")
    for name in param_missing:
        print(name)
    raise Exception('Parameters missing, add these parameters to the ev_model object < '
                    'ev_model.add({"parameter_name":value}) >')
def _cop_and_target_temp(self, T_out):
    """
    Selects the HVAC operation mode for a given outside temperature.

    Args:
        T_out (float): Outside temperature, compared against ``self.TARGET_TEMP``.

    Returns:
        tuple: (target temperature, coefficient of performance, mode flag).
            Below the heating target: (heating target, heating COP, 1).
            Above the cooling target: (cooling target, cooling COP, -1).
            Otherwise, limits included: (None, 1, 0).
    """
    heating_target = self.TARGET_TEMP["heating"]
    if T_out < heating_target:
        return heating_target, self.COP["heating"], 1
    cooling_target = self.TARGET_TEMP["cooling"]
    if T_out > cooling_target:
        return cooling_target, self.COP["cooling"], -1
    # inside the comfort band no target temperature is needed
    return None, 1, 0
def load_setting_mobility(self, DataBase):
    """
    Loads the driving profile named ``self.input`` from the transferred database.

    Attributes set:
        - self.profile (copy of the stored profile)
        - self.t
        - self.totalrows
        - self.hours
        - self.freq
        - self.refdate
        - self.states

    Args:
        DataBase (DataBase()): Database from which the data is to be loaded; its ``db``
            mapping is read.

    Raises:
        ValueError: The name is unknown to the database, its entry is empty or the entry
            is not of kind "driving".
    """
    try:
        entry = DataBase.db[self.input]
    except KeyError:
        # an unknown name is reported in the same way as an empty or non-driving entry
        entry = None
    if not entry or entry["kind"] != "driving":
        raise ValueError(
            "The driving profile {} can not be found in the database".format(
                self.input
            )
        )
    self.profile = entry["profile"].copy()
    self.t = entry["t"]
    self.totalrows = entry["totalrows"]
    self.hours = entry["hours"]
    self.freq = TIME_FREQ[self.t]["f"]
    self.refdate = entry["refdate"]
    self.states = entry["states"]
[docs] def run(
self,
heat_insulation,
weather_country="DE",
weather_year=2016,
passenger_mass=75,
passenger_sensible_heat=70,
passenger_nr=1.5,
air_cabin_heat_transfer_coef=10,
air_flow=0.01,
driving_cycle_type="WLTC",
road_type=0,
wind_speed=0,
road_slope=0,
):
"""
Calculates the electricity consumption of every driving event of the loaded profile.
The arguments are stored as attributes and weather data of the selected country and year is added to
self.profile. For each row with state "driving" a Trip is created, a driving cycle is generated and the
power terms are evaluated with the helper functions (rolling_resistance_coeff, prollingresistance,
pairdrag, p_gravity, pinertia, p_wheel, p_motorout, p_generatorin, p_motorin, p_generatorout, qhvac).
Trip energy (kWh), specific consumption (kWh/100 km), the intermediate results and the energy balance
(labelled links with values) are stored in the Trip object and in columns of self.profile.
Finally self._fill_rows() creates self.timeseries.
self.profile and self.refdate are read here, so load_setting_mobility() has to be executed before.
Args:
    heat_insulation (HeatInsulation): Cabin insulation; its flag must be False and its array
        attributes (zone_layers_, zone_surface_, layer_conductivity_, layer_thickness_) are passed
        to qhvac.
    weather_country (str, optional): Country passed to the Weather methods. Defaults to "DE".
    weather_year (int, optional): Year passed to the Weather methods. Defaults to 2016.
    passenger_mass (int, optional): Passenger mass in kg. Defaults to 75.
    passenger_sensible_heat (int, optional): Passenger sensible heat in W. Defaults to 70.
    passenger_nr (float, optional): Number of passengers; multiplied by passenger_mass for the
        vehicle mass and passed to qhvac as persons. Defaults to 1.5.
    air_cabin_heat_transfer_coef (int, optional): Coefficient in W/(m2K). Defaults to 10.
    air_flow (float, optional): Ranges from 0.02 (high ventilation) to 0.001 (minimum ventilation) in m3/s.
        Defaults to 0.01.
    driving_cycle_type (str, optional): Driving cycle type assigned to every trip. Defaults to "WLTC".
    road_type (int, optional): See function rolling_resistance_coef(method='M1') if an integer then all trips
        have the same value, if list must fit the length of the profile. Defaults to 0.
    wind_speed (int, optional): m/s if an integer then all trips have the same value, if list must fit the
        length of the profile. Defaults to 0.
    road_slope (int, optional): Radians if an integer then all trips have the same value, if list must fit the
        length of the profile. Defaults to 0.
Raises:
    Exception: heat_insulation.flag is True, i.e. the insulation object is not complete.
"""
self.heat_insulation = heat_insulation
self.weather_country = weather_country
self.weather_year = weather_year
self.passenger_mass = passenger_mass
self.passenger_sensible_heat = passenger_sensible_heat
self.passenger_nr = passenger_nr
self.air_flow = air_flow
self.driving_cycle_type = driving_cycle_type
self.road_type = road_type
self.wind_speed = wind_speed
self.road_slope = road_slope
self.transmission_eff = self.vehicle.parameters["transmission_eff"]
self.battery_discharge_eff = self.vehicle.parameters[
"battery_discharging_eff"
]
self.battery_charge_eff = self.vehicle.parameters["battery_charging_eff"]
self.air_cabin_heat_transfer_coef = air_cabin_heat_transfer_coef
# presumably kW to W, like P_max further below - TODO confirm
self.auxiliary_power = self.vehicle.parameters["auxiliary_power"] * 1000
self.cabin_volume = self.vehicle.parameters["cabin_volume"]
# NOTE(review): the message mentions check(); the HeatInsulation class of this file names the
# method _check() (it is also executed by show() and compile()).
if heat_insulation.flag:
raise Exception("heat_insulation object is not complete. Test the object with the method check() before "
"including it as argument")
print("New profile running: " + self.name)
logger.info("###################################################")
logger.info("===================================================")
logger.info("New profile running: " + self.name)
logger.info("===================================================")
logger.info("###################################################")
self.profile = self.profile[["hr", "state", "distance", "trip_duration"]].copy()
# hr (hours since refdate) is turned into minutes to build the timestamp of every event
self.profile.loc[:, "datetime"] = pd.to_datetime(self.refdate) + (
self.profile.hr * 60
).astype("timedelta64[m]")
self.profile = self.profile.set_index("datetime")
self.profile = self.profile.sort_index().reset_index()
# distance / trip_duration * 60: the trip below is created with km and min, hence km/h
self.profile.loc[:, "speed km/h"] = (
self.profile["distance"] / self.profile["trip_duration"] * 60
)
self.profile.loc[:, "wind_m/s"] = wind_speed
self.profile.loc[:, "slope_rad"] = road_slope
self.profile.loc[:, "road_type"] = road_type
wt = Weather()
# the method itself (not a value) is kept; it is handed to qhvac for every trip
D = wt.humidair_density
temp_arr = wt.temp(weather_country, weather_year)
pres_arr = wt.pressure(weather_country, weather_year)
dp_arr = wt.dewpoint(weather_country, weather_year)
hum_arr = wt.calc_rel_humidity(dp_arr, temp_arr)
r_ha = wt.humidair_density(temp_arr, pres_arr, h=hum_arr)
self.profile = include_weather(
self.profile, self.refdate, temp_arr, pres_arr, dp_arr, hum_arr, r_ha
)
self.η = MGefficiency()
self.Trips = Trips()
dc = DrivingCycle()
dc.load_data()
total = len(self.profile[self.profile["state"] == "driving"])
current = 1
for i, row in self.profile.iterrows():
if row["state"] == "driving":
consumption_progress(current, total)
current += 1
trip = Trip(self.Trips)
trip.driving_cycle_type = driving_cycle_type
trip.add_distance_duration(
distance={"value": row["distance"], "unit": "km"},
duration={"value": row["trip_duration"], "unit": "min"},
)
dc.driving_cycle(trip, self.vehicle, full_driving_cycle=False)
v = trip.speed["value"] # m/s
acc = trip.acceleration["value"] # m/s2
targ_temp, cop, ret = self._cop_and_target_temp(row["temp_degC"])
frontal_area = self.vehicle.parameters["front_area"]
P_max = (
self.vehicle.parameters["power"] * 1000
) # kW to W
f_d = self.vehicle.parameters["drag_coeff"]
f_r = rolling_resistance_coeff(
method="M1",
temp=row["temp_degC"],
v=v * 3.6,
road_type=row["road_type"],
)
# f_r = rolling_resistance_coeff(method='M2', v=v*3.6, tire_type=0, road_type=4)
m_i = self.vehicle.parameters["inertial_mass"]
m_c = self.vehicle.parameters["curb_weight"]
m_v = vehicle_mass(m_c, passenger_mass * passenger_nr)
P_rol = prollingresistance(f_r, m_v, GRAVITY, v)
P_air = pairdrag(
row["air_density_kg/m3"], frontal_area, f_d, v, row["wind_m/s"]
) # last arg is wind speed
P_g = p_gravity(
m_v, GRAVITY, v, row["slope_rad"]
) # last arg is road slope
P_ine = pinertia(m_i, m_v, acc, v)
P_wheel = p_wheel(P_rol, P_air, P_g, P_ine)
P_m_o = p_motorout(P_wheel, self.transmission_eff)
n_rb = EFFICIENCYregenerative_braking(acc)
P_gen_in = p_generatorin(P_wheel, self.transmission_eff, n_rb)
Load_p_m = P_m_o / P_max
Load_p_g = P_gen_in / P_max
n_mot = self.η.get_efficiency(Load_p_m, 1)
n_gen = self.η.get_efficiency(Load_p_g, -1)
P_m_in = p_motorin(P_m_o, n_mot)
P_g_out = p_generatorout(P_gen_in, n_gen)
P_aux = np.array([self.auxiliary_power] * len(v))
Q_hvac, Tcabin = qhvac(
D,
row["temp_degC"],
targ_temp,
self.cabin_volume,
air_flow,
heat_insulation.zone_layers_,
heat_insulation.zone_surface_,
heat_insulation.layer_conductivity_,
heat_insulation.layer_thickness_,
v,
Q_sensible=passenger_sensible_heat,
persons=passenger_nr,
air_cabin_heat_transfer_coef=air_cabin_heat_transfer_coef,
)
P_hvac = np.abs(Q_hvac[:, 0]) / cop
P_gen_bat_charg = P_g_out * self.battery_charge_eff * -1
P_bat = (P_m_in + P_aux + P_hvac) / self.battery_discharge_eff
# section to calculate consumption
# positive net load is divided by the discharge efficiency, negative net load (generation)
# is multiplied by the charge efficiency
P_all = P_m_in + P_aux + P_hvac + P_g_out
P_all_negative = P_all.copy()
P_all_negative[P_all_negative > 0.0] = 0.0
P_all_positive = P_all.copy()
P_all_positive[P_all_positive < 0.0] = 0.0
P_bat_chg = P_all_negative * self.battery_charge_eff
P_bat_dischg = P_all_positive / self.battery_discharge_eff
P_bat_actual = np.add(P_bat_dischg, P_bat_chg) # W
# summing W samples as Ws presumably relies on one sample per second - TODO confirm
consumption = P_bat_actual.sum() / 1000 / 3600 # kWh
rate = consumption / v.sum() * 100000 # kWh/100 km
# Add variables to trip object: International units (power in W)
trip.results["targ_temp"] = targ_temp
trip.results["cop"] = cop
trip.results["ret"] = ret
trip.results["frontal_area"] = frontal_area
trip.results["P_max"] = P_max
trip.results["Drag_coeff"] = f_d
trip.results["roll_res_coeff"] = f_r
trip.results["m_i"] = m_i
trip.results["m_c"] = m_c
trip.results["m_v"] = m_v
trip.results["P_rol"] = P_rol
trip.results["P_air"] = P_air
trip.results["P_g"] = P_g
trip.results["P_ine"] = P_ine
trip.results["P_wheel"] = P_wheel
trip.results["P_gen_in"] = P_gen_in
trip.results["Load_p_m"] = Load_p_m
trip.results["Load_p_g"] = Load_p_g
trip.results["n_mot"] = n_mot
trip.results["n_gen"] = n_gen
trip.results["P_m_in"] = P_m_in
trip.results["P_g_out"] = P_g_out
trip.results["P_aux"] = P_aux
trip.results["Q_hvac"] = Q_hvac
trip.results["Tcabin"] = Tcabin
trip.results["Tout"] = row["temp_degC"]
trip.results["P_hvac"] = P_hvac
trip.results["P_gen_bat_charg"] = P_gen_bat_charg
trip.results["P_bat"] = P_bat # only all positive loads
trip.results[
"P_bat_actual"
] = P_bat_actual # positive load after generation subtraction and negative load (generation) after
# positive loads subtraction
# Variable for the balance
P_wheel_pos = P_wheel[P_wheel > 0].sum() # Ws
P_wheel_neg = P_wheel[P_wheel < 0].sum() * -1 # Ws
P_m_o_t = P_m_o.sum() # Ws
P_gen_in_t = P_gen_in.sum() * -1 # Ws
P_m_in_t = P_m_in.sum() # Ws
P_g_out_t = P_g_out.sum() * -1 # Ws
P_aux_t = P_aux.sum() # Ws
P_hvac_t = P_hvac.sum() # Ws
heat_source = np.abs(Q_hvac[:, 0]).sum() - P_hvac_t # Ws
P_gen_bat_charg_t = P_gen_bat_charg.sum() # Ws
P_gen_bat_dischg_t = (
P_gen_bat_charg_t * self.battery_discharge_eff
) # Ws
P_bat_t = P_bat.sum() # Ws
trip.consumption[
"value"
] = consumption # the only option this value be to small or negative is the ev goes downhill most of
# the trip
trip.consumption["unit"] = "kWh"
trip.rate["value"] = rate
trip.rate["unit"] = "kWh/100 km"
loss_gen = P_gen_in_t - P_g_out_t
loss_trans_m = P_m_o_t - P_wheel_pos
loss_trans_g = P_wheel_neg - P_gen_in_t
loss_motor = P_m_in_t - P_m_o_t
loss_gen_bat_charg = P_gen_bat_charg_t * (1 - self.battery_charge_eff)
loss_gen_bat_dischg = P_gen_bat_charg_t * (
1 - self.battery_discharge_eff
)
loss_bat = P_bat_t * (1 - self.battery_discharge_eff)
# ret is the mode flag of _cop_and_target_temp: 1 heating, -1 cooling, 0 none
if ret == 1:
cooling = 0
heating = P_hvac_t + heat_source
elif ret == -1:
cooling = P_hvac_t + heat_source
heating = 0
elif ret == 0:
cooling = 0
heating = 0
# data for sankey diagram
# columns of j: 0 rolling, 1 air, 2 gravity, 3 inertia, 4 their sum,
# 5 negative part of the sum, 6 positive part of the sum
j = np.zeros((v.shape[0], 7))
j[:, 0] = P_rol
j[:, 1] = P_air
j[:, 2] = P_g
j[:, 3] = P_ine
j[:, 4] = np.sum(j[:, 0:4], axis=1)
j[:, 5] = j[:, 4]
j[np.where(j[:, 5] > 0.0), 5] = 0
j[:, 6] = j[:, 4]
j[np.where(j[:, 6] < 0.0), 6] = 0
# ig/xg/yg: for samples with a negative sum, the negative terms share that sum
# proportionally; zg holds the totals per term (sign inverted)
ig = np.zeros((v.shape[0], 4))
ig[np.where(j[:, 5] < 0.0), 0:4] = j[np.where(j[:, 5] < 0.0), 0:4]
ig[np.where(ig[:, 0] > 0.0), 0] = 0
ig[np.where(ig[:, 1] > 0.0), 1] = 0
ig[np.where(ig[:, 2] > 0.0), 2] = 0
ig[np.where(ig[:, 3] > 0.0), 3] = 0
xg = np.true_divide(
ig,
ig.sum(axis=1, keepdims=True),
out=np.zeros_like(ig),
where=ig.sum(axis=1, keepdims=True) != 0,
)
yg = (xg.T * j[:, 5]).T * -1
zg = yg.sum(axis=0)
# ip/xp/yp: the same split for samples with a positive sum and the positive terms
ip = np.zeros((v.shape[0], 4))
ip[np.where(j[:, 6] > 0.0), 0:4] = j[np.where(j[:, 6] > 0.0), 0:4]
ip[np.where(ip[:, 0] < 0.0), 0] = 0
ip[np.where(ip[:, 1] < 0.0), 1] = 0
ip[np.where(ip[:, 2] < 0.0), 2] = 0
ip[np.where(ip[:, 3] < 0.0), 3] = 0
xp = np.true_divide(
ip,
ip.sum(axis=1, keepdims=True),
out=np.zeros_like(ip),
where=ip.sum(axis=1, keepdims=True) != 0,
)
yp = (xp.T * j[:, 6]).T
zp = yp.sum(axis=0)
gra_neg = zg[2]
acc_neg = zg[3]
rol_pos = zp[0]
air_pos = zp[1]
gra_pos = zp[2]
acc_pos = zp[3]
# totals are divided by 3600 and 1000 (Ws to kWh) before they are stored
self.profile.loc[i, "consumption kWh/100 km"] = rate
self.profile.loc[i, "consumption kWh"] = consumption
self.profile.loc[i, "battery discharge kWh"] = P_bat_t / 3600 / 1000
self.profile.loc[i, "regeneration kWh"] = (
P_gen_bat_dischg_t / 3600 / 1000
)
self.profile.loc[i, "auxiliary kWh"] = P_aux_t / 3600 / 1000
self.profile.loc[i, "hvac kWh"] = P_hvac_t / 3600 / 1000
self.profile.loc[i, "motor in kWh"] = P_m_in_t / 3600 / 1000
self.profile.loc[i, "transmission in kWh"] = P_m_o_t / 3600 / 1000
self.profile.loc[i, "wheel kWh"] = P_wheel_pos / 3600 / 1000
self.profile.loc[i, "rolling res kWh"] = rol_pos / 3600 / 1000
self.profile.loc[i, "air res kWh"] = air_pos / 3600 / 1000
self.profile.loc[i, "gravity kWh"] = gra_pos / 3600 / 1000
self.profile.loc[i, "acceleration kWh"] = acc_pos / 3600 / 1000
self.profile.loc[i, "trip code"] = trip.code
# every entry is a link of the balance: [source label, target label, value]
stv = [
["Heat source", "HVAC", heat_source / 3600 / 1000],
["Potential energy", "Gravity force", gra_neg / 3600 / 1000],
[
"Battery",
"Discharge",
(P_bat_t - P_gen_bat_charg_t) / 3600 / 1000,
],
[
"Discharge",
"Losses",
(loss_bat + loss_gen_bat_dischg) / 3600 / 1000,
],
["Discharge", "HVAC", P_hvac_t / 3600 / 1000],
["Discharge", "Auxiliary", P_aux_t / 3600 / 1000],
["Discharge", "Motor", P_m_in_t / 3600 / 1000],
[
"reg_braking",
"Discharge",
P_gen_bat_dischg_t / 3600 / 1000,
],
[
"reg_braking",
"Losses",
loss_gen_bat_charg / 3600 / 1000,
],
["HVAC", "Cooling", cooling / 3600 / 1000],
["HVAC", "Heating", heating / 3600 / 1000],
["Motor", "Transmission of traction", P_m_o_t / 3600 / 1000],
["Motor", "Losses", loss_motor / 3600 / 1000],
["Transmission of traction", "Wheel", P_wheel_pos / 3600 / 1000],
["Transmission of traction", "Losses", loss_trans_m / 3600 / 1000],
["Wheel", "Rolling resistance", rol_pos / 3600 / 1000],
["Wheel", "Air resistance", air_pos / 3600 / 1000],
["Wheel", "Gravity force", gra_pos / 3600 / 1000],
["Wheel", "Acceleration force", acc_pos / 3600 / 1000],
["Rolling resistance", "Losses", rol_pos / 3600 / 1000],
["Air resistance", "Losses", air_pos / 3600 / 1000],
["Gravity force", "Kinetic energy", gra_neg / 3600 / 1000],
["Gravity force", "Losses", (gra_pos - gra_neg) / 3600 / 1000],
["Acceleration force", "Kinetic energy", acc_neg / 3600 / 1000],
["Acceleration force", "Losses", (acc_pos - acc_neg) / 3600 / 1000],
[
"Kinetic energy",
"Transmission of regenerative",
(acc_neg + gra_neg) / 3600 / 1000,
],
[
"Transmission of regenerative",
"Generator",
P_gen_in_t / 3600 / 1000,
],
[
"Transmission of regenerative",
"Losses",
loss_trans_g / 3600 / 1000,
],
["Generator", "reg_braking", P_g_out_t / 3600 / 1000],
["Generator", "Losses", loss_gen / 3600 / 1000],
["Cooling", "Losses", cooling / 3600 / 1000],
["Heating", "Losses", heating / 3600 / 1000],
["Auxiliary", "Losses", P_aux_t / 3600 / 1000],
]
link_label = []
for lk in stv:
llk = [lk[0], lk[1], str(round(lk[2], 1))]
link_label.append("->".join(llk))
sort = np.array(stv, dtype=object)
s = sort.T[0].tolist()
t = sort.T[1].tolist()
# NOTE(review): v (the speed array until here) is rebound to the link values; it is assigned
# again from trip.speed at the next driving row
v = sort.T[2]
balance = {}
balance["label"] = [
"Heat source",
"Potential energy",
"Battery",
"Discharge",
"reg_braking",
"HVAC",
"Motor",
"Generator",
"Transmission of traction",
"Wheel",
"Kinetic energy",
"Cooling",
"Heating",
"Auxiliary",
"Gravity force",
"Acceleration force",
"Rolling resistance",
"Air resistance",
"Losses",
"Transmission of regenerative",
]
# source and target are stored as positions within balance["label"]
balance["source"] = [balance["label"].index(i) for i in s]
balance["target"] = [balance["label"].index(i) for i in t]
balance["value"] = v
balance["link_label"] = link_label
balance["data"] = stv
trip.balance = balance
print("")
self._fill_rows()
def _fill_rows(self):
    """
    Expands the event-based self.profile into the regular time series self.timeseries.

    It is executed at the end of self.run. A grid with one row every self.t hours from 0
    up to self.hours is created ("hh"). Rows whose "hh" matches the "hr" of an event
    receive the event values; the remaining rows take the values of the next event
    (forward fill on the reversed table). The "count" column holds the number of rows
    that share the same "hr" and state, and distance and consumption are divided by it.
    Consumption is set to zero where the distance is zero.

    The result keeps the columns "hh", "state", "distance" and "consumption".
    """
    repeats = [
        "hr",
        "state",
        "distance",
        "consumption",
        "trip_duration",
        # "trip code",
    ]
    fixed = ["consumption kWh"]
    calcu = ["dayhrs"]
    mixed = repeats + fixed
    df = self.profile.copy(deep=True)
    df['consumption'] = df["consumption kWh"]
    df = df[mixed]
    self.timeseries = pd.DataFrame(columns=mixed)
    self.timeseries.loc[:, "hh"] = np.arange(0, self.hours, self.t)
    idxx = self.timeseries[self.timeseries["hh"].isin(df["hr"].tolist())].index.tolist()
    for r in mixed:
        vall = df[r].values.tolist()
        self.timeseries.loc[idxx, r] = vall
    # the last row always needs a state and an hour, otherwise the fill below has no
    # starting value for the tail of the series
    self.timeseries.loc[self.totalrows - 1, "state"] = df["state"].iloc[-1]
    self.timeseries.loc[self.totalrows - 1, "hr"] = self.timeseries["hh"][self.totalrows - 1]
    rp = self.timeseries[::-1].reset_index(drop=True)
    # .ffill() is the direct equivalent of fillna(method="ffill"), whose "method"
    # argument is deprecated in recent pandas versions
    rp.loc[:, repeats] = rp[repeats].ffill()
    rp.loc[:, fixed] = rp[fixed].fillna(0)
    self.timeseries = rp[::-1].reset_index(drop=True)
    for cal in calcu:
        self.timeseries.loc[:, cal] = self.timeseries["hh"].apply(lambda x: x % 24)
    self.timeseries = add_column_datetime(
        self.timeseries, self.totalrows, self.refdate, self.t
    )
    self.timeseries.loc[:, "count"] = self.timeseries.groupby(["hr", "state"])["state"].transform("count")
    self.timeseries['consumption'] = (self.timeseries['consumption'] * self.timeseries['distance']).fillna(0) / (
        self.timeseries['distance'].replace(0,
                                            1))  # this is an artifact to make zero the consumption when distance is zero
    self.timeseries.loc[:, "consumption"] = (
        self.timeseries.loc[:, "consumption"] / self.timeseries.loc[:, "count"]
    )
    self.timeseries.loc[:, "distance"] = (
        self.timeseries.loc[:, "distance"] / self.timeseries.loc[:, "count"]
    )
    self.timeseries = self.timeseries[["hh", "state", "distance", "consumption"]]
def save_profile(self, folder, description=" "):
    """
    Stores the attributes of the object in a gzip-compressed pickle file.

    The file is named ``<self.name>.pickle``; the content is ``self.__dict__``, which
    already includes the description given here.

    Args:
        folder (str): Where the file will be stored. Folder is created in case it does not exist.
        description (str, optional): Description which is saved as object attribute. Defaults to " ".
    """
    self.description = description
    os.makedirs(folder, exist_ok=True)
    filepath = os.path.join(folder, self.name + ".pickle")
    with gzip.open(filepath, "wb") as compressed:
        pickle.dump(self.__dict__, compressed)
    message = "=== profile saved === : " + filepath
    print(message)
    logger.info(message)