"""
This module presents an example of an implementation of a `grid2op.Backend` when using the powerflow
implementation "pandapower" available at `PandaPower <https://www.pandapower.org/>`_ for more details about
this backend. This file is provided as an example of a proper :class:`grid2op.Backend.Backend` implementation.
This backend currently does not work with 3 winding transformers and other exotic object.
"""
import os # load the python os default module
import sys # laod the python sys default module
import copy
import warnings
import numpy as np
import pandas as pd
import pandapower as pp
import scipy
from grid2op.Backend.Backend import Backend
from grid2op.Action import BaseAction
from grid2op.Exceptions import *
try:
import numba
numba_ = True
except (ImportError, ModuleNotFoundError):
numba_ = False
warnings.warn("Numba cannot be loaded. You will gain possibly massive speed if installing it by "
"\n\t{} -m pip install numba\n".format(sys.executable))
import pdb
[docs]class PandaPowerBackend(Backend):
"""
As explained in the `grid2op.Backend` module, every module must inherit the `grid2op.Backend` class.
This class have more attributes that are used internally for faster information retrieval.
Attributes
----------
prod_pu_to_kv: :class:`numpy.array`, dtype:float
The ratio that allow the conversion from pair-unit to kv for the generators
load_pu_to_kv: :class:`numpy.array`, dtype:float
The ratio that allow the conversion from pair-unit to kv for the loads
lines_or_pu_to_kv: :class:`numpy.array`, dtype:float
The ratio that allow the conversion from pair-unit to kv for the origin end of the powerlines
lines_ex_pu_to_kv: :class:`numpy.array`, dtype:float
The ratio that allow the conversion from pair-unit to kv for the extremity end of the powerlines
p_or: :class:`numpy.array`, dtype:float
The active power flowing at the origin end of each powerline
q_or: :class:`numpy.array`, dtype:float
The reactive power flowing at the origin end of each powerline
v_or: :class:`numpy.array`, dtype:float
The voltage magnitude at the origin bus of the powerline
a_or: :class:`numpy.array`, dtype:float
The current flowing at the origin end of each powerline
p_ex: :class:`numpy.array`, dtype:float
The active power flowing at the extremity end of each powerline
q_ex: :class:`numpy.array`, dtype:float
The reactive power flowing at the extremity end of each powerline
a_ex: :class:`numpy.array`, dtype:float
The current flowing at the extremity end of each powerline
v_ex: :class:`numpy.array`, dtype:float
The voltage magnitude at the extremity bus of the powerline
"""
[docs] def __init__(self, detailed_infos_for_cascading_failures=False):
Backend.__init__(self, detailed_infos_for_cascading_failures=detailed_infos_for_cascading_failures)
self.prod_pu_to_kv = None
self.load_pu_to_kv = None
self.lines_or_pu_to_kv = None
self.lines_ex_pu_to_kv = None
self.p_or = None
self.q_or = None
self.v_or = None
self.a_or = None
self.p_ex = None
self.q_ex = None
self.v_ex = None
self.a_ex = None
self.load_p = None
self.load_q = None
self.load_v = None
self.prod_p = None
self.prod_q = None
self.prod_v = None
self._pf_init = "flat"
self._pf_init = "results"
self._nb_bus_before = 0
self.thermal_limit_a = None
self._iref_slack = None
self._id_bus_added = None
self._fact_mult_gen = -1
self._what_object_where = None
self._number_true_line = -1
self._corresp_name_fun = {}
self._get_vector_inj = {}
self.dim_topo = -1
self._vars_action = BaseAction.vars_action
self._vars_action_set = BaseAction.vars_action_set
# self._time_topo_vect = 0.
[docs] def get_nb_active_bus(self):
"""
Compute the amount of buses "in service" eg with at least a powerline connected to it.
Returns
-------
res: :class:`int`
The total number of active buses.
"""
return np.sum(self._grid.bus["in_service"])
@staticmethod
def _load_grid_load_p_mw(grid):
return grid.load["p_mw"]
@staticmethod
def _load_grid_load_q_mvar(grid):
return grid.load["q_mvar"]
@staticmethod
def _load_grid_gen_p_mw(grid):
return grid.gen["p_mw"]
@staticmethod
def _load_grid_gen_vm_pu(grid):
return grid.gen["vm_pu"]
[docs] def load_grid(self, path=None, filename=None):
"""
Load the _grid, and initialize all the member of the class. Note that in order to perform topological
modification of the substation of the underlying powergrid, some buses are added to the test case loaded. They
are set as "out of service" unless a topological action acts on these specific substations.
"""
# TODO read the name from the file if they are set...
if path is None and filename is None:
raise RuntimeError("You must provide at least one of path or file to laod a powergrid.")
if path is None:
full_path = filename
elif filename is None:
full_path = path
else:
full_path = os.path.join(path, filename)
if not os.path.exists(full_path):
raise RuntimeError("There is no powergrid at \"{}\"".format(full_path))
with warnings.catch_warnings():
# remove deprecationg warnings for old version of pandapower
warnings.filterwarnings("ignore", category=DeprecationWarning)
self._grid = pp.from_json(full_path)
# add the slack bus that is often not modeled as a generator, but i need it for this backend to work
bus_gen_added = None
i_ref = None
self._iref_slack = None
self._id_bus_added = None
pp.runpp(self._grid, numba=numba_)
if np.all(~self._grid.gen["slack"]):
# there are not defined slack bus on the data, i need to hack it up a little bit
pd2ppc = self._grid._pd2ppc_lookups["bus"] # pd2ppc[pd_id] = ppc_id
ppc2pd = np.argsort(pd2ppc) # ppc2pd[ppc_id] = pd_id
for i, el in enumerate(self._grid._ppc['gen'][:, 0]):
if int(el) not in self._grid._pd2ppc_lookups["bus"][self._grid.gen["bus"].values]:
if bus_gen_added is not None:
raise RuntimeError("Impossible to recognize the powergrid")
bus_gen_added = ppc2pd[int(el)]
i_ref = i
self._iref_slack = i_ref
self._id_bus_added = self._grid.gen.shape[0]
# see https://matpower.org/docs/ref/matpower5.0/idx_gen.html for details on the comprehension of self._grid._ppc
pp.create_gen(self._grid, bus_gen_added,
p_mw=self._grid._ppc['gen'][i_ref, 1],
vm_pu=self._grid._ppc['gen'][i_ref, 5],
min_p_mw=self._grid._ppc['gen'][i_ref, 9],
max_p_mw=self._grid._ppc['gen'][i_ref, 8],
max_q_mvar=self._grid._ppc['gen'][i_ref, 3],
min_q_mvar=self._grid._ppc['gen'][i_ref, 4],
slack=True,
controllable=True)
pp.runpp(self._grid, numba=numba_)
# this has the effect to divide by 2 the active power in the added generator, if this generator and the "slack bus"
# one are connected to the same bus.
# if not, it must not be done. So basically, i create a vector for which p and q for generator must be multiply
self._fact_mult_gen = np.ones(self._grid.gen.shape[0])
# self._fact_mult_gen[-1] += 1
# now extract the powergrid
self.n_line = copy.deepcopy(self._grid.line.shape[0]) + copy.deepcopy(self._grid.trafo.shape[0])
self.name_line = ['{from_bus}_{to_bus}_{id_powerline_me}'.format(**row, id_powerline_me=i)
for i, (_, row) in enumerate(self._grid.line.iterrows())]
transfo = [('{hv_bus}'.format(**row), '{lv_bus}'.format(**row))
for i, (_, row) in enumerate(self._grid.trafo.iterrows())]
transfo = [sorted(el) for el in transfo]
self.name_line += ['{}_{}_{}'.format(*el, i + self._grid.line.shape[0]) for i, el in enumerate(transfo)]
self.name_line = np.array(self.name_line)
self.n_gen = copy.deepcopy(self._grid.gen.shape[0])
self.name_gen = ["gen_{bus}_{index_gen}".format(**row, index_gen=i)
for i, (_, row) in enumerate(self._grid.gen.iterrows())]
self.name_gen = np.array(self.name_gen)
self.n_load = copy.deepcopy(self._grid.load.shape[0])
self.name_load = ["load_{bus}_{index_gen}".format(**row, index_gen=i)
for i, (_, row) in enumerate(self._grid.load.iterrows())]
self.name_load = np.array(self.name_load)
self.n_sub = copy.deepcopy(self._grid.bus.shape[0])
self.name_sub = ["sub_{}".format(i) for i, row in self._grid.bus.iterrows()]
self.name_sub = np.array(self.name_sub)
# number of elements per substation
self.sub_info = np.zeros(self.n_sub, dtype=np.int)
self.load_to_subid = np.zeros(self.n_load, dtype=np.int)
self.gen_to_subid = np.zeros(self.n_gen, dtype=np.int)
self.line_or_to_subid = np.zeros(self.n_line, dtype=np.int)
self.line_ex_to_subid = np.zeros(self.n_line, dtype=np.int)
self.load_to_sub_pos = np.zeros(self.n_load, dtype=np.int)
self.gen_to_sub_pos = np.zeros(self.n_gen, dtype=np.int)
self.line_or_to_sub_pos = np.zeros(self.n_line, dtype=np.int)
self.line_ex_to_sub_pos = np.zeros(self.n_line, dtype=np.int)
pos_already_used = np.zeros(self.n_sub, dtype=np.int)
self._what_object_where = [[] for _ in range(self.n_sub)]
# self._grid.line.sort_index(inplace=True)
# self._grid.trafo.sort_index(inplace=True)
# self._grid.gen.sort_index(inplace=True)
# self._grid.load.sort_index(inplace=True)
for i, (_, row) in enumerate(self._grid.line.iterrows()):
sub_or_id = int(row["from_bus"])
sub_ex_id = int(row["to_bus"])
self.sub_info[sub_or_id] += 1
self.sub_info[sub_ex_id] += 1
self.line_or_to_subid[i] = sub_or_id
self.line_ex_to_subid[i] = sub_ex_id
self.line_or_to_sub_pos[i] = pos_already_used[sub_or_id]
pos_already_used[sub_or_id] += 1
self.line_ex_to_sub_pos[i] = pos_already_used[sub_ex_id]
pos_already_used[sub_ex_id] += 1
self._what_object_where[sub_or_id].append(("line", "from_bus", i))
self._what_object_where[sub_ex_id].append(("line", "to_bus", i))
lag_transfo = self._grid.line.shape[0]
self._number_true_line = copy.deepcopy(self._grid.line.shape[0])
for i, (_, row) in enumerate(self._grid.trafo.iterrows()):
sub_or_id = int(row["hv_bus"])
sub_ex_id = int(row["lv_bus"])
self.sub_info[sub_or_id] += 1
self.sub_info[sub_ex_id] += 1
self.line_or_to_subid[i + lag_transfo] = sub_or_id
self.line_ex_to_subid[i + lag_transfo] = sub_ex_id
self.line_or_to_sub_pos[i + lag_transfo] = pos_already_used[sub_or_id]
pos_already_used[sub_or_id] += 1
self.line_ex_to_sub_pos[i + lag_transfo] = pos_already_used[sub_ex_id]
pos_already_used[sub_ex_id] += 1
self._what_object_where[sub_or_id].append(("trafo", "hv_bus", i))
self._what_object_where[sub_ex_id].append(("trafo", "lv_bus", i))
for i, (_, row) in enumerate(self._grid.gen.iterrows()):
sub_id = int(row["bus"])
self.sub_info[sub_id] += 1
self.gen_to_subid[i] = sub_id
self.gen_to_sub_pos[i] = pos_already_used[sub_id]
pos_already_used[sub_id] += 1
self._what_object_where[sub_id].append(("gen", "bus", i))
for i, (_, row) in enumerate(self._grid.load.iterrows()):
sub_id = int(row["bus"])
self.sub_info[sub_id] += 1
self.load_to_subid[i] = sub_id
self.load_to_sub_pos[i] = pos_already_used[sub_id]
pos_already_used[sub_id] += 1
self._what_object_where[sub_id].append(("load", "bus", i))
self._compute_pos_big_topo()
self.dim_topo = np.sum(self.sub_info)
# utilities for imeplementing apply_action
self._corresp_name_fun = {}
self._get_vector_inj = {}
self._get_vector_inj["load_p"] = self._load_grid_load_p_mw #lambda grid: grid.load["p_mw"]
self._get_vector_inj["load_q"] = self._load_grid_load_q_mvar #lambda grid: grid.load["q_mvar"]
self._get_vector_inj["prod_p"] = self._load_grid_gen_p_mw #lambda grid: grid.gen["p_mw"]
self._get_vector_inj["prod_v"] = self._load_grid_gen_vm_pu #lambda grid: grid.gen["vm_pu"]
# "hack" to handle topological changes, for now only 2 buses per substation
add_topo = copy.deepcopy(self._grid.bus)
add_topo.index += add_topo.shape[0]
add_topo["in_service"] = False
self._grid.bus = pd.concat((self._grid.bus, add_topo))
self.load_pu_to_kv = self._grid.bus["vn_kv"][self.load_to_subid].values
self.prod_pu_to_kv = self._grid.bus["vn_kv"][self.gen_to_subid].values
self.lines_or_pu_to_kv = self._grid.bus["vn_kv"][self.line_or_to_subid].values
self.lines_ex_pu_to_kv = self._grid.bus["vn_kv"][self.line_ex_to_subid].values
self._nb_bus_before = self.get_nb_active_bus()
self.thermal_limit_a = 1000 * np.concatenate((self._grid.line["max_i_ka"].values,
self._grid.trafo["sn_mva"].values / (np.sqrt(3) * self._grid.trafo["vn_hv_kv"].values)))
self.p_or = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.q_or = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.v_or = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.a_or = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.p_ex = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.q_ex = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.v_ex = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.a_ex = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self._nb_bus_before = None
# shunts data
self.n_shunt = self._grid.shunt.shape[0]
self.shunt_to_subid = np.zeros(self.n_shunt, dtype=np.int) - 1
name_shunt = []
for i, (_, row) in enumerate(self._grid.shunt.iterrows()):
bus = int(row["bus"])
name_shunt.append("shunt_{bus}_{index_shunt}".format(**row, index_shunt=i))
self.shunt_to_subid[i] = bus
self.name_shunt = np.array(name_shunt)
self.shunts_data_available = True
[docs] def apply_action(self, action: BaseAction):
"""
Specific implementation of the method to apply an action modifying a powergrid in the pandapower format.
"""
if not isinstance(action, BaseAction):
raise UnrecognizedAction("BaseAction given to PandaPowerBackend should be of class BaseAction and not "
"\"{}\"".format(action.__class__))
# change the _injection if needed
dict_injection, set_status, switch_status, set_topo_vect, switcth_topo_vect, redispatching, shunts = action()
for k in dict_injection:
if k in self._vars_action_set:
tmp = self._get_vector_inj[k](self._grid)
val = 1. * dict_injection[k]
ok_ind = np.isfinite(val)
if k == "prod_v":
pass
# convert values back to pu
val /= self.prod_pu_to_kv # self._grid.bus["vn_kv"][self._grid.gen["bus"]].values
if self._id_bus_added is not None:
# in this case the slack bus where not modeled as an independant generator in the
# original data
if np.isfinite(val[self._id_bus_added]):
# handling of the slack bus, where "2" generators are present.
pass
self._grid["ext_grid"]["vm_pu"] = val[self._id_bus_added]
tmp[ok_ind] = val[ok_ind]
else:
warn = "The key {} is not recognized by PandaPowerBackend when setting injections value.".format(k)
warnings.warn(warn)
if np.any(redispatching != 0.):
# print('before tmp[ok_ind]: {}'.format(self._get_vector_inj["prod_p"](self._grid)))
tmp = self._get_vector_inj["prod_p"](self._grid)
ok_ind = np.isfinite(redispatching)
tmp[ok_ind] += redispatching[ok_ind]
# print('after tmp[ok_ind]: {}'.format(self._get_vector_inj["prod_p"](self._grid)))
# shunts
if shunts:
arr_ = shunts["shunt_p"]
is_ok = np.isfinite(arr_)
self._grid.shunt["p_mw"][is_ok] = arr_[is_ok]
arr_ = shunts["shunt_q"]
is_ok = np.isfinite(arr_)
self._grid.shunt["q_mvar"][is_ok] = arr_[is_ok]
arr_ = shunts["shunt_bus"]
## turn off turned off shunt
turned_off = arr_ == -1
self._grid.shunt["in_service"][turned_off] = False
## turn on turned on shunt
turned_on = arr_ >= 1
self._grid.shunt["in_service"][turned_on] = True
## assign proper buses (1 = subid, 2 = subid + n_sub)
is_ok = arr_ > 0
bus_shunt = self.shunt_to_subid[is_ok]
bus_shunt[(arr_ == 2)[is_ok]] += self.n_sub
self._grid.shunt["bus"][is_ok] = bus_shunt
# topology
# run through all substations, find the topology. If it has changed, then update it.
beg_ = 0
end_ = 0
possiblechange = set_topo_vect != 0
if np.any(possiblechange) or np.any(switcth_topo_vect):
actual_topo_full = self.get_topo_vect()
if np.any(set_topo_vect[possiblechange] != actual_topo_full[possiblechange]) or np.any(switcth_topo_vect):
for sub_id, nb_obj in enumerate(self.sub_info):
nb_obj = int(nb_obj)
end_ += nb_obj
# extract all sub information
this_topo_set = set_topo_vect[beg_:end_]
this_topo_switch = switcth_topo_vect[beg_:end_]
actual_topo = copy.deepcopy(actual_topo_full[beg_:end_])
origin_topo = copy.deepcopy(actual_topo_full[beg_:end_])
# compute topology after action
if np.any(this_topo_switch):
# i need to switch some element
st = actual_topo[this_topo_switch] # st is between 1 and 2
st -= 1 # st is between 0 and 1
st *= -1 # st is 0 or -1
st += 2 # st is 2 or 1 (i switched 1 <-> 2 compared to the original values)
actual_topo[this_topo_switch] = st
if np.any(this_topo_set != 0):
# some buses have been set
sel_ = (this_topo_set != 0)
actual_topo[sel_] = this_topo_set[sel_]
# in case the topo vector is 2,2,2 etc. i convert it back to 1,1,1 etc.
actual_topo = actual_topo - np.min(actual_topo[actual_topo > 0.]) + 1
# implement in on the _grid
# change the topology in case it doesn't match the original one
if np.any(actual_topo != origin_topo):
nb_bus_before = len(np.unique(origin_topo[origin_topo > 0.])) # only count activated bus
nb_bus_now = len(np.unique(actual_topo[actual_topo > 0.])) # only count activated bus
if nb_bus_before > nb_bus_now:
# i must deactivate the unused bus
self._grid.bus["in_service"][sub_id + self.n_sub] = False
elif nb_bus_before < nb_bus_now:
# i must activate the new bus
self._grid.bus["in_service"][sub_id + self.n_sub] = True
# now assign the proper bus to each element
for i, (table, col_name, row_id) in enumerate(self._what_object_where[sub_id]):
self._grid[table][col_name].iloc[row_id] = sub_id if actual_topo[i] == 1 else sub_id + self.n_sub
# if actual_topo[i] <0:
# pdb.set_trace()
# self._grid[table][col_name].iloc[i] = sub_id if actual_topo[i] == 1 else sub_id + self.n_sub
beg_ += nb_obj
# change line status if needed
# note that it is a specification that lines status must override buses reconfiguration.
if np.any(set_status != 0.):
for i, el in enumerate(set_status):
# TODO performance optim here, it can be vectorized
if el == -1:
self._disconnect_line(i)
elif el == 1:
self._reconnect_line(i)
# switch line status if needed
if np.any(switch_status):
for i, el in enumerate(switch_status):
# TODO performance optim here, it can be vectorized
df = self._grid.line if i < self._number_true_line else self._grid.trafo
tmp = i if i < self._number_true_line else i - self._number_true_line
if el:
connected = df["in_service"].iloc[tmp]
if connected:
df["in_service"].iloc[tmp] = False
else:
bus_or = set_topo_vect[self.line_or_pos_topo_vect[i]]
bus_ex = set_topo_vect[self.line_ex_pos_topo_vect[i]]
if bus_ex == 0 or bus_or == 0:
raise InvalidLineStatus("Line {} was disconnected. The action switched its status, "
"without providing buses to connect it on both ends.".format(i))
# reconnection has then be handled in the topology
df["in_service"].iloc[tmp] = True
def _aux_get_line_info(self, colname1, colname2):
res = np.concatenate((self._grid.res_line[colname1].values, self._grid.res_trafo[colname2].values))
return res
[docs] def runpf(self, is_dc=False):
"""
Run a power flow on the underlying _grid. This implements an optimization of the powerflow
computation: if the number of
buses has not changed between two calls, the previous results are re used. This speeds up the computation
in case of "do nothing" action applied.
"""
# print("I called runpf")
conv = True
nb_bus = self.get_nb_active_bus()
try:
with warnings.catch_warnings():
# remove the warning if _grid non connex. And it that case load flow as not converged
warnings.filterwarnings("ignore", category=scipy.sparse.linalg.MatrixRankWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
if nb_bus == self._nb_bus_before:
self._pf_init = "results"
init_vm_pu = "results"
init_va_degree = "results"
else:
self._pf_init = "auto"
init_vm_pu = None
init_va_degree = None
if is_dc:
pp.rundcpp(self._grid, check_connectivity=False)
self._nb_bus_before = None # if dc i start normally next time i call an ac powerflow
else:
pp.runpp(self._grid, check_connectivity=False, init=self._pf_init, numba=numba_)
self._nb_bus_before = nb_bus
if self._grid.res_gen.isnull().values.any():
# TODO see if there is a better way here
# sometimes pandapower does not detect divergence and put Nan.
raise pp.powerflow.LoadflowNotConverged
self.load_p, self.load_q, self.load_v = self._loads_info()
if not is_dc:
if not np.all(np.isfinite(self.load_v)):
# TODO see if there is a better way here
# some loads are disconnected: it's a game over case!
raise pp.powerflow.LoadflowNotConverged
# I retrieve the data once for the flows, so has to not re read multiple dataFrame
self.p_or = self._aux_get_line_info("p_from_mw", "p_hv_mw")
self.q_or = self._aux_get_line_info("q_from_mvar", "q_hv_mvar")
self.v_or = self._aux_get_line_info("vm_from_pu", "vm_hv_pu")
self.a_or = self._aux_get_line_info("i_from_ka", "i_hv_ka") * 1000
self.a_or[~np.isfinite(self.a_or)] = 0.
self.v_or[~np.isfinite(self.v_or)] = 0.
self.p_ex = self._aux_get_line_info("p_to_mw", "p_lv_mw")
self.q_ex = self._aux_get_line_info("q_to_mvar", "q_lv_mvar")
self.v_ex = self._aux_get_line_info("vm_to_pu", "vm_lv_pu")
self.a_ex = self._aux_get_line_info("i_to_ka", "i_lv_ka") * 1000
self.a_ex[~np.isfinite(self.a_ex)] = 0.
self.v_ex[~np.isfinite(self.v_ex)] = 0.
self.v_or *= self.lines_or_pu_to_kv
self.v_ex *= self.lines_ex_pu_to_kv
self.prod_p, self.prod_q, self.prod_v = self._gens_info()
return self._grid.converged
except pp.powerflow.LoadflowNotConverged:
# of the powerflow has not converged, results are Nan
self.p_or = np.full(self.n_line, dtype=np.float, fill_value=np.NaN)
self.q_or = self.p_or
self.v_or = self.p_or
self.a_or = self.p_or
self.p_ex = self.p_or
self.q_ex = self.p_or
self.v_ex = self.p_or
self.a_ex = self.p_or
self._nb_bus_before = None
return False
[docs] def copy(self):
"""
Performs a deep copy of the power :attr:`_grid`.
As pandapower is pure python, the deep copy operator is perfectly suited for the task.
"""
res = copy.deepcopy(self)
return res
[docs] def close(self):
"""
Called when the :class:`grid2op;Environment` has terminated, this function only reset the grid to a state
where it has not been loaded.
"""
del self._grid
self._grid = None
[docs] def save_file(self, full_path):
"""
Save the file to json.
:param full_path:
:return:
"""
pp.to_json(self._grid, full_path)
[docs] def get_line_status(self):
"""
As all the functions related to powerline, pandapower split them into multiple dataframe (some for transformers,
some for 3 winding transformers etc.). We make sure to get them all here.
"""
return np.concatenate((self._grid.line["in_service"].values, self._grid.trafo["in_service"].values)).astype(np.bool)
[docs] def get_line_flow(self):
"""
return the powerflow in amps in all powerlines.
:return:
"""
return self.a_or
[docs] def _disconnect_line(self, id):
if id < self._number_true_line:
self._grid.line["in_service"].iloc[id] = False
else:
self._grid.trafo["in_service"].iloc[id - self._number_true_line] = False
def _reconnect_line(self, id):
if id < self._number_true_line:
self._grid.line["in_service"].iloc[id] = True
else:
self._grid.trafo["in_service"].iloc[id - self._number_true_line] = True
[docs] def get_topo_vect(self):
# beg__ = time.time()
# TODO refactor this, this takes a looong time
res = np.full(self.dim_topo, fill_value=np.NaN, dtype=np.int)
line_status = self.get_line_status()
for i, (_, row) in enumerate(self._grid.line.iterrows()):
bus_or_id = int(row["from_bus"])
bus_ex_id = int(row["to_bus"])
if line_status[i]:
res[self.line_or_pos_topo_vect[i]] = 1 if bus_or_id == self.line_or_to_subid[i] else 2
res[self.line_ex_pos_topo_vect[i]] = 1 if bus_ex_id == self.line_ex_to_subid[i] else 2
else:
res[self.line_or_pos_topo_vect[i]] = -1
res[self.line_ex_pos_topo_vect[i]] = -1
nb = self._number_true_line
for i, (_, row) in enumerate(self._grid.trafo.iterrows()):
bus_or_id = int(row["hv_bus"])
bus_ex_id = int(row["lv_bus"])
# res[self.line_or_pos_topo_vect[i + nb]] = 1 if bus_or_id == self.line_or_to_subid[i + nb] else 2
# res[self.line_ex_pos_topo_vect[i + nb]] = 1 if bus_ex_id == self.line_ex_to_subid[i + nb] else 2
j = i + nb
if line_status[j]:
res[self.line_or_pos_topo_vect[j]] = 1 if bus_or_id == self.line_or_to_subid[j] else 2
res[self.line_ex_pos_topo_vect[j]] = 1 if bus_ex_id == self.line_ex_to_subid[j] else 2
else:
res[self.line_or_pos_topo_vect[j]] = -1
res[self.line_ex_pos_topo_vect[j]] = -1
for i, (_, row) in enumerate(self._grid.gen.iterrows()):
bus_id = int(row["bus"])
res[self.gen_pos_topo_vect[i]] = 1 if bus_id == self.gen_to_subid[i] else 2
for i, (_, row) in enumerate(self._grid.load.iterrows()):
bus_id = int(row["bus"])
res[self.load_pos_topo_vect[i]] = 1 if bus_id == self.load_to_subid[i] else 2
# self._time_topo_vect += time.time() - beg__
return res
def _gens_info(self):
prod_p = 1.0 * self._grid.res_gen["p_mw"].values
prod_q = 1.0 * self._grid.res_gen["q_mvar"].values
prod_v = 1.0 * self._grid.res_gen["vm_pu"].values * self.prod_pu_to_kv
if self._iref_slack is not None:
# slack bus and added generator are on same bus. I need to add power of slack bus to this one.
if self._grid.gen["bus"].iloc[self._id_bus_added] == self.gen_to_subid[self._id_bus_added]:
prod_p[self._id_bus_added] += self._grid._ppc["gen"][self._iref_slack, 1]
prod_q[self._id_bus_added] += self._grid._ppc["gen"][self._iref_slack, 2]
return prod_p, prod_q, prod_v
[docs] def generators_info(self):
return self.prod_p, self.prod_q, self.prod_v
def _loads_info(self):
load_p = 1. * self._grid.res_load["p_mw"].values
load_q = 1. * self._grid.res_load["q_mvar"].values
load_v = self._grid.res_bus.loc[self._grid.load["bus"].values]["vm_pu"].values * self.load_pu_to_kv
return load_p, load_q, load_v
[docs] def loads_info(self):
return self.load_p, self.load_q, self.load_v
[docs] def lines_or_info(self):
return self.p_or, self.q_or, self.v_or, self.a_or
[docs] def lines_ex_info(self):
return self.p_ex, self.q_ex, self.v_ex, self.a_ex
[docs] def shunt_info(self):
shunt_p = 1.0 * self._grid.res_shunt["p_mw"].values
shunt_q = 1.0 * self._grid.res_shunt["q_mvar"].values
shunt_v = self._grid.res_bus["vm_pu"].values[self._grid.shunt["bus"].values]
shunt_v *= self._grid.bus["vn_kv"].values[self._grid.shunt["bus"]]
shunt_bus = self._grid.shunt["bus"].values
return shunt_p, shunt_q, shunt_v, shunt_bus
[docs] def sub_from_bus_id(self, bus_id):
if bus_id >= self._number_true_line:
return bus_id - self._number_true_line
return bus_id