"""
This module defines the :class:`PlotPyGame` renderer, which displays the state
(:class:`grid2op.BaseObservation.BaseObservation`) of the powergrid in a dedicated window.
It is also able to output this display as a 3d (RGB) array, to be further used by other libraries to
output gifs for example.
"""
import numpy as np
import cmath
import math
import os
from grid2op.Plot.PlotGraph import BasePlot
# pygame is an optional dependency: remember whether it could be imported so that
# the renderer can raise a clear error later instead of failing at import time.
try:
    # silence the banner pygame prints when it is imported
    os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
    import pygame
except Exception:
    # any failure while importing pygame simply disables plotting
    can_plot = False
else:
    can_plot = True
class Point:
    """
    Minimal 2D point / vector with float coordinates and a few arithmetic operators.

    Adapted from
    https://codereview.stackexchange.com/questions/70143/drawing-a-dashed-line-with-pygame

    Parameters
    ----------
    point_t: ``tuple``
        Indexable holding the x coordinate at position 0 and the y coordinate at position 1.
    """

    def __init__(self, point_t = (0,0)):
        # both coordinates are always stored as floats
        self.x = float(point_t[0])
        self.y = float(point_t[1])

    # ---- arithmetic operators -------------------------------------------------
    def __add__(self, other):
        """Component-wise sum of two points."""
        summed = (self.x + other.x, self.y + other.y)
        return Point(summed)

    def __sub__(self, other):
        """Component-wise difference of two points."""
        diff = (self.x - other.x, self.y - other.y)
        return Point(diff)

    def __mul__(self, scalar):
        """Scale both coordinates by ``scalar``."""
        scaled = (self.x * scalar, self.y * scalar)
        return Point(scaled)

    def __truediv__(self, scalar):
        """Divide both coordinates by ``scalar`` (true division)."""
        shrunk = (self.x / scalar, self.y / scalar)
        return Point(shrunk)

    # NOTE(review): "//" and the python 2 "/" operator both perform a *true* division here,
    # exactly like "__truediv__" - kept as is, confirm this is intended.
    __div__ = __truediv__
    __floordiv__ = __truediv__

    def __len__(self):
        """Euclidean norm of the point, truncated to an integer."""
        squared_norm = self.x**2 + self.y**2
        return int(math.sqrt(squared_norm))

    # ---- conversions ----------------------------------------------------------
    def get(self):
        """Return the coordinates in the original ``(x, y)`` tuple format."""
        return (self.x, self.y)

    def to_cplx(self):
        """Return the point as the complex number ``x + 1j * y``."""
        return self.x + 1j * self.y

    @staticmethod
    def from_cplx(cplx):
        """Build a point from the real and imaginary parts of a complex number."""
        return Point((cplx.real, cplx.imag))
def _draw_dashed_line(surf, color, start_pos, end_pos, width=1, dash_length=10):
# https://codereview.stackexchange.com/questions/70143/drawing-a-dashed-line-with-pygame
origin = Point(start_pos)
target = Point(end_pos)
displacement = target - origin
length = len(displacement)
slope = displacement/length
for index in range(0, int(length/dash_length), 2):
start = origin + (slope * index * dash_length)
end = origin + (slope * (index + 1) * dash_length)
pygame.draw.line(surf, color, start.get(), end.get(), width)
def _draw_arrow(surf, color, start_pos, end_pos, positive_flow, width=1, num_arrows=10,
length_arrow=10, angle_arrow=30):
if positive_flow:
origin = Point(start_pos)
target = Point(end_pos)
else:
target = Point(start_pos)
origin = Point(end_pos)
displacement = target - origin
length = len(displacement)
slope = displacement/length
# phi = cmath.phase(slope.to_cplx()) * 360 / 2*cmath.pi
phi = cmath.phase(displacement.to_cplx()) * 360 / (2*cmath.pi)
cste_ = 2*cmath.pi / 360 * 1j
rotatedown = cmath.exp(cste_ * (180 + phi + angle_arrow) )
rotateup = cmath.exp(cste_ * (180 + phi - angle_arrow) )
first_arrow_part = length_arrow*rotateup
second_arrow_part = length_arrow*rotatedown
per_displ = displacement / (num_arrows+1)
for index in range(0, int(num_arrows)):
mid = origin + (per_displ * (index + 1) )
start_arrow = Point.from_cplx(mid.to_cplx() + first_arrow_part)
end_arrow = Point.from_cplx(mid.to_cplx() + second_arrow_part)
# , end_arrow.get()
pygame.draw.lines(surf, color, False,
[start_arrow.get(), mid.get(), end_arrow.get()],
width)
class PlotPyGame(BasePlot):
    """
    This renderer should be used only for "online" representation of a powergrid.

    The powergrid is drawn with pygame on the left part of the window, and textual information (rewards,
    number of timesteps, date, pause indicator) is displayed on the right part of the window.
    """

    def __init__(self,
                 observation_space,
                 substation_layout=None,
                 radius_sub=20.,
                 load_prod_dist=70.,
                 bus_radius=5.,
                 timestep_duration_seconds=1.,
                 fontsize=20):
        """

        Parameters
        ----------
        substation_layout: ``list``
            List of tuple giving the position of each of the substation of the powergrid.

        observation_space: :class:`grid2op.Observation.ObservationSpace`
            BaseObservation space used for the display

        radius_sub: ``float``
            radius (in pixel) of the substations representation.

        load_prod_dist: ``float``
            distance (in pixels) between the substation and the load or the generator.

        bus_radius: ``float``
            The buses are represented by small circles. This is the radius (in pixel) for the circles representing
            the buses.

        timestep_duration_seconds: ``float``
            Currently not implemented.

        fontsize: ``int``
            size of the font used to display the texts.

        Raises
        ------
        RuntimeError
            If pygame could not be imported.

        """
        if not can_plot:
            raise RuntimeError("Impossible to plot as pygame cannot be imported.")

        # size (in pixels) of the area where the grid is drawn, and margins kept around it.
        # NOTE(review): these are set before BasePlot.__init__, presumably because the base constructor
        # calls "_get_sub_layout" which reads them - confirm before reordering.
        self.window_grid = (1000, 700)
        self.lag_x = 150
        self.lag_y = 100

        BasePlot.__init__(self,
                          substation_layout=substation_layout,
                          observation_space=observation_space,
                          radius_sub=radius_sub,
                          load_prod_dist=load_prod_dist,
                          bus_radius=bus_radius)

        # pygame
        pygame.init()
        self.video_width, self.video_height = 1300, 700
        self.timestep_duration_seconds = timestep_duration_seconds
        self.display_called = False
        self.screen = pygame.display.set_mode((self.video_width, self.video_height), pygame.RESIZABLE)
        self.background_color = [70, 70, 73]
        self.font = pygame.font.Font(None, fontsize)

        # pause button
        self.font_pause = pygame.font.Font(None, 30)
        self.color_text = pygame.Color(255, 255, 255)
        self.text_paused = self.font_pause.render("Game Paused", True, self.color_text)

        # maximum overflow possible
        self.rho_max = 2.

        # utilities
        self.cum_reward = 0.
        self.nb_timestep = 0

    def reset(self, env):
        """
        Reset the renderer in a consistent state, equivalent to a state where it has not run at all.

        The cumulated reward and the number of timesteps are set back to 0, and the maximum overflow
        is read from the parameters of the environment.

        Parameters
        ----------
        env: :class:`grid2op.Environment.Environment`
            The used environment.

        """
        self.cum_reward = 0.
        self.nb_timestep = 0
        self.rho_max = env.parameters.HARD_OVERFLOW_THRESHOLD

    def _get_sub_layout(self, init_layout):
        """
        Convert the substation layout into pixel coordinates that fit in the drawing area.

        The y axis is flipped, then the coordinates are rescaled so that the grid fills the drawing area
        (``self.window_grid``) minus the margins (``self.lag_x`` and ``self.lag_y``).

        Parameters
        ----------
        init_layout: ``list``
            Iterable of (x, y) positions, one per substation.

        Returns
        -------
        res: ``list``
            List of (x, y) tuples of ``int``, in pixels, one per substation. It is empty if ``init_layout``
            is empty.

        """
        # flip the y axis
        flipped = [(el1, -el2) for el1, el2 in init_layout]
        if not flipped:
            # nothing to scale (and "min" / "max" are not defined on an empty array)
            return []

        # then scale the grid to be on the window, with the proper margin (careful, margins are applied both
        # left and right, so count twice)
        coords = np.array(flipped)
        lowest = coords.min(axis=0)
        extent = coords.max(axis=0) - lowest
        usable_width = self.window_grid[0] - 2 * self.lag_x
        usable_height = self.window_grid[1] - 2 * self.lag_y

        def _to_pixel(value, low, span, usable, margin):
            # when all the substations share the same coordinate, the span is 0: put them in the middle
            # of the drawing area instead of dividing by 0
            ratio = (value - low) / span if span != 0 else 0.5
            return int(ratio * usable) + margin

        return [(_to_pixel(el1, lowest[0], extent[0], usable_width, self.lag_x),
                 _to_pixel(el2, lowest[1], extent[1], usable_height, self.lag_y))
                for el1, el2 in flipped]

    def _event_looper(self, force=False):
        """
        Process the pending pygame events (quit and pause).

        Parameters
        ----------
        force: ``bool``
            Whether the game is currently paused.

        Returns
        -------
        force: ``bool``
            The new pause status: it is toggled when the space bar has been pressed, unchanged otherwise.

        has_quit: ``bool``
            ``True`` if the window has been closed or if the "escape" key has been pressed.

        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                return force, True
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_ESCAPE:
                    return force, True
                if event.key == pygame.K_SPACE:
                    if not force:
                        # the game is being paused: show it (no need to draw it when resuming)
                        self._get_plot_pause()
                    return not force, False
        return force, False

    def _press_key_to_quit(self):
        """
        This utility function checks whether the player pressed a key to exit the renderer (called repeatedly
        when the episode is done).

        Returns
        -------
        key_pressed: ``bool``
            ``True`` if a key has been pressed or if the window has been closed, ``False`` otherwise.

        has_quit: ``bool``
            ``True`` if the human player closed the window or pressed the "escape" key, in this case it will
            stop the computation: no other episode will be computed. ``False`` otherwise.

        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                return True, True
            if event.type == pygame.KEYDOWN:
                # the message displayed at the end of an episode asks for "any key": every key is
                # accepted, "escape" additionally quits
                return True, event.key == pygame.K_ESCAPE
        return False, False

    def close(self):
        """
        This method is called when the renderer should be closed.
        """
        self.display_called = False
        pygame.quit()

    def _get_plot_pause(self):
        """
        Draw the "Game Paused" text and a pause symbol (two vertical bars) on the right part of the window,
        and update the display.
        """
        text_top = 300
        text_left = self.window_grid[0] + 100
        bars_top = text_top + self.text_paused.get_height()
        bars_bottom = bars_top + 50
        # the two bars are centered below the text
        bars_center = text_left + self.text_paused.get_width() // 2

        self.screen.blit(self.text_paused, (text_left, text_top))
        for offset in (-10, 10):
            pygame.draw.line(self.screen,
                             self.color_text,
                             (bars_center + offset, bars_top),
                             (bars_center + offset, bars_bottom),
                             10)
        pygame.display.flip()

    def _make_screen(self, obs, reward=None, done=None, timestamp=None):
        """
        Draw the observation and the textual information on ``self.screen`` (the display is not updated here).

        It also updates the cumulated reward and the number of timesteps.

        Parameters
        ----------
        obs: :class:`grid2op.Observation.Observation`
            The observation to draw

        reward: ``float``
            The current reward, ignored in the cumulated reward if ``None``

        done: ``bool``
            Whether this is the last frame of the episode.

        timestamp: ``datetime.datetime``
            The current datetime corresponding to the observation

        """
        if reward is not None:
            # "reward" is optional in "render" and "get_rgb": None cannot be added to the cumulated reward
            self.cum_reward += reward
        self.nb_timestep += 1

        # The game is not paused anymore (or never has been), I can render the next surface
        self.screen.fill(self.background_color)

        if not done:
            # draw the generic information on the right part
            self._draw_generic_info(reward, done, timestamp)
        else:
            # inform user that it's over
            self._draw_final_information(reward, done, timestamp)

        # draw the state now
        self._draw_subs(observation=obs)
        self._draw_powerlines(observation=obs)
        self._draw_loads(observation=obs)
        self._draw_gens(observation=obs)
        self._draw_topos(observation=obs)

    def _blit_side_text(self, text_label, pos_y):
        """Render ``text_label`` on the right part of the window, at the vertical position ``pos_y`` (pixels)."""
        text_graphic = self.font.render(text_label, True, self.color_text)
        self.screen.blit(text_graphic, (self.window_grid[0]+100, pos_y))

    def _draw_final_information(self, reward, done, timestamp):
        """
        Display the end of episode information (cumulated reward and number of timesteps) on the right part of
        the window. The arguments are not used.
        """
        self._blit_side_text("GAME OVER, press any key to continue to next episode.", 100)
        self._blit_side_text("Total cumulated reward: {:.1f}".format(self.cum_reward), 130)
        self._blit_side_text("Total number timesteps: {:.1f}".format(self.nb_timestep), 160)

    def get_rgb(self, obs, reward=None, done=None, timestamp=None):
        """
        Computes and returns the rgb 3d array from an observation, and potentially other information.

        Parameters
        ----------
        obs: :class:`grid2op.Observation.Observation`
            The observation to convert into a 3d array

        reward: ``float``
            The current reward

        done: ``bool``
            Whether this is the last frame of the episode.

        timestamp: ``datetime.datetime``
            The current datetime corresponding to the observation

        Returns
        -------
        res: ``numpy.ndarray``
            The 3d representation of the observation that can then be converted to a gif, or an image using appropriate
            software.

        """
        self._make_screen(obs, reward, done, timestamp)
        return pygame.surfarray.array3d(self.screen)

    def render(self, obs, reward=None, done=None, timestamp=None):
        """
        This function is called when the human renderer mode is called. It displays the observation on the screen,
        and allows for basic interactions, such as pausing (space bar) or exiting.

        **NB** pressing "escape" key or the "exit" screen button will quit the game. It will end the current episode,
        and won't start any other episode.

        Parameters
        ----------
        obs: :class:`grid2op.Observation.Observation`
            The observation to display

        reward: ``float``
            The current reward

        done: ``bool``
            Whether this is the last frame of the episode.

        timestamp: ``datetime.datetime``
            The current datetime corresponding to the observation

        Returns
        -------
        res: ``bool``
            Whether the human decided to quit the window. If ``True`` then it will completely quit the game, ending all
            steps of this episode and all episodes afterwards.

        """
        if not self.display_called:
            self.display_called = True
            self.screen.fill(self.background_color)
            pygame.display.set_caption('Grid2Op Renderer')  # Window title

        force, has_quit = self._event_looper(force=False)
        # while the game is paused, poll the events until it is resumed or the player quits
        while force and not has_quit:
            force, has_quit = self._event_looper(force=force)
            pygame.time.wait(250)  # it's in ms
        if has_quit:
            return has_quit

        self._make_screen(obs, reward, done, timestamp)
        pygame.display.flip()

        if done:
            # the episode is over: wait for the player to acknowledge it
            key_pressed = False
            while not key_pressed:
                key_pressed, has_quit = self._press_key_to_quit()
                pygame.time.wait(250)  # it's in ms
        return has_quit

    def _draw_generic_info(self, reward=None, done=None, timestamp=None):
        """
        Display the rewards, the number of timesteps and the date on the right part of the window.

        Each piece of information that depends on ``reward`` (resp. ``timestamp``) is skipped when ``reward``
        (resp. ``timestamp``) is ``None``. ``done`` is not used.
        """
        if reward is not None:
            self._blit_side_text("Instantaneous reward: {:.1f}".format(reward), 100)
            self._blit_side_text("Cumulated reward: {:.1f}".format(self.cum_reward), 130)
            self._blit_side_text("Number timesteps: {:.1f}".format(self.nb_timestep), 160)

        if timestamp is not None:
            self._blit_side_text("Date : {:%Y-%m-%d %H:%M}".format(timestamp), 200)

    def _draw_sub(self, center):
        """Draw a substation: a circle outline of radius ``self.radius_sub`` centered on ``center`` (x, y)."""
        pygame.draw.circle(self.screen,
                           self.color_text,
                           [int(el) for el in center],
                           int(self.radius_sub),
                           2)

    def _draw_powerlines(self, observation):
        """
        Draw every powerline of the observation.

        Disconnected powerlines are drawn as black dashed lines. The color and the thickness of the connected
        ones depend on their flow relative to their thermal limit (``observation.rho``), and arrows indicate the
        direction of the flow (sign of ``observation.p_or``).
        """
        for line_id, (rho, status, p_or) in enumerate(zip(observation.rho,
                                                          observation.line_status,
                                                          observation.p_or)):
            # NOTE(review): the coordinates are always the same for a given powerline, it would make sense
            # to compute them once and then reuse them
            pos_or, pos_ex, *_ = self._get_line_coord(line_id)

            if not status:
                # line is disconnected
                _draw_dashed_line(self.screen, pygame.Color(0, 0, 0), pos_or, pos_ex)
                continue

            # line is connected
            # step 0: compute thickness and color
            if rho < (self.rho_max / 1.5):
                amount_green = 255 - int(255. * 1.5 * rho / self.rho_max)
            else:
                amount_green = 0
            amount_red = int(255 - (50 + int(205. * rho / self.rho_max)))
            # a flow above "rho_max" would give a negative value, which pygame.Color rejects
            amount_green = min(max(amount_green, 0), 255)
            amount_red = min(max(amount_red, 0), 255)
            color = pygame.Color(amount_red, amount_green, 20)

            if rho > self.rho_max:
                width = 4
            elif rho > 1.:
                width = 3
            elif rho > 0.9:
                width = 2
            else:
                width = 1
            width += 3

            # step 1: draw the powerline with right color and thickness
            pygame.draw.line(self.screen, color, pos_or, pos_ex, width)

            # step 2: draw arrows indicating current flows
            _draw_arrow(self.screen, color, pos_or, pos_ex,
                        p_or >= 0.,
                        num_arrows=width,
                        width=width)

    def _aligned_text(self, pos, text_graphic, pos_text):
        """
        Blit a rendered text on the screen, aligned with respect to an anchor point.

        Parameters
        ----------
        pos: ``str``
            Alignment, one of "center|left", "up|center", "center|right" or "down|center". For any other value
            the top left corner of the text is put on the anchor.

        text_graphic:
            The rendered text (it must provide ``get_width`` and ``get_height``).

        pos_text: ``complex``
            Anchor of the text, as x + 1j * y (in pixels).

        """
        pos_x = pos_text.real
        pos_y = pos_text.imag
        width = text_graphic.get_width()
        height = text_graphic.get_height()

        if pos == "center|left":
            pos_y -= height // 2
        elif pos == "up|center":
            pos_x -= width // 2
            pos_y -= height
        elif pos == "center|right":
            pos_x -= width
            pos_y -= height // 2
        elif pos == "down|center":
            pos_x -= width // 2
        self.screen.blit(text_graphic, (pos_x, pos_y))

    def _draw_injection(self, pos_end_line, pos_sub, pos_text, how_center, text_label):
        """
        Draw a load or a generator: a black line between ``pos_sub`` and ``pos_end_line`` (a complex number),
        and the text ``text_label`` aligned on ``pos_text`` according to ``how_center``.
        """
        color = pygame.Color(0, 0, 0)
        width = 2
        pygame.draw.line(self.screen, color, pos_sub, (pos_end_line.real, pos_end_line.imag), width)
        text_graphic = self.font.render(text_label, True, color)
        self._aligned_text(how_center, text_graphic, pos_text)

    def _draw_loads(self, observation):
        """Draw every load of the observation, with its active consumption (``observation.load_p``)."""
        for c_id, por in enumerate(observation.load_p):
            pos_end_line, pos_load_sub, pos_load, how_center = self._get_load_coord(c_id)
            self._draw_injection(pos_end_line, pos_load_sub, pos_load, how_center,
                                 "- {:.1f} MW".format(por))

    def _draw_gens(self, observation):
        """Draw every generator of the observation, with its active production (``observation.prod_p``)."""
        for g_id, por in enumerate(observation.prod_p):
            pos_end_line, pos_gen_sub, pos_gen, how_center = self._get_gen_coord(g_id)
            self._draw_injection(pos_end_line, pos_gen_sub, pos_gen, how_center,
                                 "+ {:.1f} MW".format(por))

    def _draw_topos(self, observation):
        """
        Draw the topology of the substations: one colored circle per bus, and one line between each element and
        the bus it is connected to.

        Substations for which ``_get_topo_coord`` returns no bus coordinates are skipped.
        """
        # one color per bus (only two colors are defined)
        colors = [pygame.Color(255, 127, 14), pygame.Color(31, 119, 180)]

        for sub_id, elements in enumerate(self.subs_elements):
            buses_z, bus_vect = self._get_topo_coord(sub_id, observation, elements)

            if not buses_z:
                # I don't plot details of substations with 1 bus for better quality
                continue

            # I plot the buses
            for bus_id, z_bus in enumerate(buses_z):
                pygame.draw.circle(self.screen,
                                   colors[bus_id],
                                   [int(z_bus.real), int(z_bus.imag)],
                                   int(self.bus_radius),
                                   0)

            # i connect every element to the proper bus with the proper color
            for dict_el in elements.values():
                this_el_bus = bus_vect[dict_el["sub_pos"]] - 1
                if this_el_bus >= 0:
                    # elements with a bus id lower than 1 (presumably disconnected - TODO confirm) are skipped
                    pygame.draw.line(self.screen,
                                     colors[this_el_bus],
                                     [int(dict_el["z"].real), int(dict_el["z"].imag)],
                                     [int(buses_z[this_el_bus].real), int(buses_z[this_el_bus].imag)],
                                     2)