Source code for grid2op.Plot.PlotGraph

"""
This module is the base module for all graphical representation of the :class:`grid2op.BaseObservation.BaseObservation`.

It allows, from the layout of the graph of the powergrid (*eg* the coordinates of each substation) the position of
each objects (powerline ends, loads and generators) and the position of the buses in case of "node splitting" (when
a substations is split into independent electrical buses).

"""
import cmath
import math
import numpy as np
import pdb

from grid2op.Space import GridObjects
from grid2op.Exceptions import PlotError


[docs]class BasePlot(GridObjects): """ Utility class that allows to compute the position of the objects of the powergrid. Deriving from this class allows to perform the display of the powergrid. **NB** this class only performs the computation of the position, but does not display anything. Attributes ----------- observation_space: :class:`grid2op.Observation.HelperObservation` The observation space used. """
[docs] def __init__(self, observation_space, substation_layout=None, radius_sub=20., load_prod_dist=70., bus_radius=6.): if substation_layout is None: if observation_space.grid_layout is None: # if no layout is provided, and observation_space has no layout, then it fails raise PlotError("Impossible to use plotting abilities without specifying a layout (coordinates) " "of the substations.") # if no layout is provided, use the one in the observation_space substation_layout = [] for el in observation_space.name_sub: substation_layout.append(observation_space.grid_layout[el]) if len(substation_layout) != observation_space.n_sub: raise PlotError("You provided a layout with {} elements while there are {} substations on the powergrid. " "Your layout is invalid".format(len(substation_layout), observation_space.n_sub)) GridObjects.__init__(self) self.init_grid(observation_space) self.observation_space = observation_space self._layout = {} self._layout["substations"] = self._get_sub_layout(substation_layout) self.radius_sub = radius_sub self.load_prod_dist = load_prod_dist # distance between load and generator to the center of the substation self.bus_radius = bus_radius self.subs_elements = [None for _ in self.observation_space.sub_info] # get the element in each substation for sub_id in range(self.observation_space.sub_info.shape[0]): this_sub = {} objs = self.observation_space.get_obj_connect_to(substation_id=sub_id) for c_id in objs["loads_id"]: c_nm = self._get_load_name(sub_id, c_id) this_load = {} this_load["type"] = "load" this_load["sub_pos"] = self.observation_space.load_to_sub_pos[c_id] this_sub[c_nm] = this_load for g_id in objs["generators_id"]: g_nm = self._get_gen_name(sub_id, g_id) this_gen = {} this_gen["type"] = "gen" this_gen["sub_pos"] = self.observation_space.gen_to_sub_pos[g_id] this_sub[g_nm] = this_gen for lor_id in objs["lines_or_id"]: ext_id = self.observation_space.line_ex_to_subid[lor_id] l_nm = self._get_line_name(sub_id, ext_id, lor_id) this_line = {} this_line["type"] = "line" this_line["sub_pos"] = self.observation_space.line_or_to_sub_pos[lor_id] this_sub[l_nm] = this_line for lex_id in objs["lines_ex_id"]: or_id = self.observation_space.line_or_to_subid[lex_id] l_nm = self._get_line_name(or_id, sub_id, lex_id) this_line = {} this_line["type"] = "line" this_line["sub_pos"] = self.observation_space.line_ex_to_sub_pos[lex_id] this_sub[l_nm] = this_line self.subs_elements[sub_id] = this_sub self._compute_layout()
def _get_sub_layout(self, init_layout): return init_layout def _get_line_name(self, subor_id, sub_ex_id, line_id): l_nm = 'l_{}_{}_{}'.format(subor_id, sub_ex_id, line_id) return l_nm def _get_load_name(self, sub_id, c_id): c_nm = "load_{}_{}".format(sub_id, c_id) return c_nm def _get_gen_name(self, sub_id, g_id): p_nm = 'gen_{}_{}'.format(sub_id, g_id) return p_nm def _compute_layout(self): """ Compute the position of each of the objects. Parameters ---------- observation: :class:`grid2op.Observation.Observation` The observation used to know which object belong where. Returns ------- """ self._layout["line"] = {} # assign powerline coordinates for line_id in range(self.n_line): if line_id not in self._layout["line"]: # state = observation.state_of(line_id=line_id) sub_or_id = self.line_or_to_subid[line_id] # state["origin"]["sub_id"] sub_ex_id = self.line_ex_to_subid[line_id] # state["extremity"]["sub_id"] pos_or = self._layout["substations"][sub_or_id] pos_ex = self._layout["substations"][sub_ex_id] # make sure the powerline are connected to the circle of the substation and not to the center of it z_or_tmp = pos_or[0] + 1j * pos_or[1] z_ex_tmp = pos_ex[0] + 1j * pos_ex[1] module_or = cmath.phase(z_ex_tmp - z_or_tmp) module_ex = cmath.phase(- (z_ex_tmp - z_or_tmp)) # check parrallel lines: # for now it works only if there are 2 parrallel lines. The idea is to add / withdraw # 10° for each module in this case. # TODO draw line but not straight line in this case, this looks ugly for now :-/ deg_parrallel = 25 tmp_parrallel = self.observation_space.get_lines_id(from_=sub_or_id, to_=sub_ex_id) if len(tmp_parrallel) > 1: if line_id == tmp_parrallel[0]: module_or += deg_parrallel / 360 * 2 * cmath.pi module_ex -= deg_parrallel / 360 * 2 * cmath.pi else: module_or -= deg_parrallel / 360 * 2 * cmath.pi module_ex += deg_parrallel / 360 * 2 * cmath.pi z_or = z_or_tmp + self.radius_sub * cmath.exp(module_or * 1j) z_ex = z_ex_tmp + self.radius_sub * cmath.exp(module_ex * 1j) pos_or = z_or.real, z_or.imag pos_ex = z_ex.real, z_ex.imag self._layout["line"][line_id] = sub_or_id, sub_ex_id # TODO here get proper name l_nm = self._get_line_name(sub_or_id, sub_ex_id, line_id) self.subs_elements[sub_or_id][l_nm]["pos"] = pos_or self.subs_elements[sub_or_id][l_nm]["z"] = z_or self.subs_elements[sub_ex_id][l_nm]["pos"] = pos_ex self.subs_elements[sub_ex_id][l_nm]["z"] = z_ex # assign loads and generators coordinates # this is done by first computing the "optimal" placement if there were only substation (so splitting equally # the objects around the circle) and then remove the closest position that are taken by the powerlines. for sub_id, elements in enumerate(self.subs_elements): nb_el = len(elements) # equally split pos_sub = self._layout["substations"][sub_id] z_sub = pos_sub[0] + 1j * pos_sub[1] pos_possible = [self.radius_sub * cmath.exp(1j * 2 * cmath.pi * i / nb_el) + z_sub for i in range(nb_el)] # remove powerlines (already assigned) for el_nm, dict_el in elements.items(): if dict_el["type"] == "line": z = dict_el["z"] closest = np.argmin([abs(pos - z)**2 for pos in pos_possible]) pos_possible = [el for i, el in enumerate(pos_possible) if i != closest] i = 0 # now assign load and generator for el_nm, dict_el in elements.items(): if dict_el["type"] != "line": dict_el["pos"] = (pos_possible[i].real, pos_possible[i].imag) dict_el["z"] = pos_possible[i] i += 1 self._layout["load"] = {} for c_id in range(self.n_load): # state = observation.state_of(load_id=c_id) # sub_id = state["sub_id"] sub_id = self.load_to_subid[c_id] self._layout["load"][c_id] = sub_id self._layout["gen"] = {} for g_id in range(self.n_gen): # state = observation.state_of(gen_id=g_id) # sub_id = state["sub_id"] sub_id = self.gen_to_subid[g_id] self._layout["gen"][g_id] = sub_id def _draw_subs(self, observation): res = [] for i, el in enumerate(self._layout["substations"]): res.append(self._draw_sub(center=el)) return res def _draw_sub(self, center): pass def _get_line_coord(self, line_id): sub_or_id, sub_ex_id = self._layout["line"][line_id] l_nm = self._get_line_name(sub_or_id, sub_ex_id, line_id) pos_or = self.subs_elements[sub_or_id][l_nm]["pos"] pos_ex = self.subs_elements[sub_ex_id][l_nm]["pos"] return pos_or, pos_ex def _get_load_coord(self, load_id): sub_id = self._layout["load"][load_id] c_nm = self._get_load_name(sub_id, load_id) if not "elements_display" in self.subs_elements[sub_id][c_nm]: pos_load_sub = self.subs_elements[sub_id][c_nm]["pos"] pos_center_sub = self._layout["substations"][sub_id] z_sub = (pos_center_sub[0] + 1j * pos_center_sub[1]) theta = cmath.phase((self.subs_elements[sub_id][c_nm]["z"] - z_sub)) pos_load = z_sub + cmath.exp(1j * theta) * self.load_prod_dist # position of the end of the line connecting the object to the substation pos_end_line = pos_load - cmath.exp(1j * theta) * 20 how_center = self._get_position(theta) tmp_dict = {"pos_end_line": pos_end_line, "pos_load_sub": pos_load_sub, "pos_load": pos_load, "how_center": how_center} self.subs_elements[sub_id][c_nm]["elements_display"] = tmp_dict else: dict_element = self.subs_elements[sub_id][c_nm]["elements_display"] pos_end_line = dict_element["pos_end_line"] pos_load_sub = dict_element["pos_load_sub"] pos_load = dict_element["pos_load"] how_center = dict_element["how_center"] return pos_end_line, pos_load_sub, pos_load, how_center def _get_gen_coord(self, gen_id): sub_id = self._layout["gen"][gen_id] c_nm = self._get_gen_name(sub_id, gen_id) if not "elements_display" in self.subs_elements[sub_id][c_nm]: pos_gen_sub = self.subs_elements[sub_id][c_nm]["pos"] pos_center_sub = self._layout["substations"][sub_id] z_sub = (pos_center_sub[0] + 1j * pos_center_sub[1]) theta = cmath.phase((self.subs_elements[sub_id][c_nm]["z"] - z_sub)) pos_gen = z_sub + cmath.exp(1j * theta) * self.load_prod_dist # position of the end of the line connecting the object to the substation pos_end_line = pos_gen - cmath.exp(1j * theta) * 20 how_center = self._get_position(theta) tmp_dict = {"pos_end_line": pos_end_line, "pos_gen_sub": pos_gen_sub, "pos_gen": pos_gen, "how_center": how_center} self.subs_elements[sub_id][c_nm]["elements_display"] = tmp_dict else: dict_element = self.subs_elements[sub_id][c_nm]["elements_display"] pos_end_line = dict_element["pos_end_line"] pos_gen_sub = dict_element["pos_gen_sub"] pos_gen = dict_element["pos_gen"] how_center = dict_element["how_center"] return pos_end_line, pos_gen_sub, pos_gen, how_center def _get_topo_coord(self, sub_id, observation, elements): pos_center_sub = self._layout["substations"][sub_id] z_sub = (pos_center_sub[0] + 1j * pos_center_sub[1]) tmp = observation.state_of(substation_id=sub_id) if tmp["nb_bus"] == 1: # not to overload the plot, if everything is at the same bus, i don't plot it return [], [] # I have at least 2 buses # I compute the position of each elements bus_vect = tmp["topo_vect"] # i am not supposed to have more than 2 buses buses_z = [None, None] # center of the different buses nb_co = [0, 0] # center of the different buses # the position of a bus is for now the average of all the elements in there for el_nm, dict_el in elements.items(): this_el_bus = bus_vect[dict_el["sub_pos"]] - 1 if this_el_bus >= 0: nb_co[this_el_bus] += 1 if buses_z[this_el_bus] is None: buses_z[this_el_bus] = dict_el["z"] else: buses_z[this_el_bus] += dict_el["z"] # buses_z = [el / nb for el, nb in zip(buses_z, nb_co)] theta_z = [cmath.phase((el - z_sub)) for el in buses_z] # try to have nodes "in opposition" to one another NN = np.array(nb_co) / np.sum(nb_co) diff_theta = theta_z[0] - theta_z[1] # alpha = cmath.pi + diff_theta alpha = -cmath.pi + diff_theta alpha = math.fmod(alpha, 2*cmath.pi) theta_z = [theta_z[0] - alpha * NN[1], theta_z[1] + alpha * NN[0]] # buses_z = [z_sub + (self.radius_sub - self.bus_radius) * 0.75 * cmath.exp(1j * theta) for theta in theta_z] buses_z = [z_sub + (self.radius_sub - self.bus_radius) * 0.6 * cmath.exp(1j * theta) for theta in theta_z] return buses_z, bus_vect @staticmethod def _get_position(theta): quarter_pi = cmath.pi / 4 half_pi = cmath.pi / 2. if theta >= -quarter_pi and theta < quarter_pi: res = "center|left" elif theta >= quarter_pi and theta < quarter_pi + half_pi: res = "up|center" elif theta >= quarter_pi + half_pi and theta < quarter_pi + 2. * half_pi: res = "center|right" else: res = "down|center" return res