"""
``pyagar.visual``
=================
Provides the default visualizer.
"""
# pylint: disable=C0103
import asyncio
import ctypes
import math
import os
import time
import warnings
try:
from sdl2 import sdlgfx
from sdl2 import sdlttf
import sdl2
import sdl2.ext
except ImportError:
warnings.warn("Can't import pysdl2. The visualizer is not available.")
from pyagar.log import logger
from pyagar.messages import Camera
from pyagar.messages import Status
from pyagar.messages import ScreenAndCamera
from pyagar.messages import CameraPosition
from pyagar.messages import PlayerCell
from pyagar.messages import Leaderboard
# Starting value (and therefore upper bound) for the refresh rate chosen in
# ``Visualizer.get_screen_size``.
FRAME_RATE = 60
# Directory that contains this module.
HERE = os.path.realpath(os.path.dirname(__file__))
# TrueType font opened at several sizes in ``Visualizer.run``.
FONT_PATH = os.path.join(HERE, 'static', 'Ubuntu-R.ttf')
class SDLError(Exception):
    """Raised by ``asrt`` when an SDL call reports a failure."""
def asrt(code):
    """
    Check the result of an SDL call and hand it back unchanged on success.

    * ``int`` results: any non-zero value is treated as a failure and
      raises ``SDLError`` carrying ``SDL_GetError()``.
    * Results exposing ``contents`` (ctypes pointers): a NULL pointer
      raises ``SDLError``.
    * Anything else (including ``None``) is returned as is.

    :param code: value returned by an SDL function.
    :returns: ``code`` itself.
    :raises SDLError: if the call failed.
    """
    if isinstance(code, int):
        if code != 0:
            raise SDLError(sdl2.SDL_GetError())
        return code

    try:
        # Dereferencing a NULL ctypes pointer raises ValueError.  This must
        # not be probed with ``hasattr``: it only swallows AttributeError,
        # so the ValueError would escape before it could be converted.
        getattr(code, 'contents', None)
    except ValueError as exc:
        raise SDLError(exc) from exc
    return code
[docs]class Visualizer:
"""
SDL based visualizer.
"""
def __init__(self, client, view_only=False, hardware=True):
    """
    Initialise the visualizer state.

    No window, renderer or font is created here; that happens in ``run``.

    :param client: game client used to send player actions.
    :param view_only: when true, gameplay input is ignored in ``run``.
    :param hardware: pick the accelerated renderer flag instead of the
        software one.
    """
    self.names = {}
    self.messages = asyncio.Queue()
    self.client = client
    self.view_only = view_only
    self.players = {}
    self.player_id = None

    self.renderer = None
    self.renderer_flags = (sdl2.SDL_RENDERER_ACCELERATED
                           if hardware
                           else sdl2.SDL_RENDERER_SOFTWARE)

    # Last known mouse position and the pending / last sent move target.
    self.mouse_x = None
    self.mouse_y = None
    self.move = None
    self.last_move = None

    # All three timestamps start from the same instant.
    started = time.monotonic()
    self.now = started
    self.last = started
    self.last_move_send = started

    self.window_w = None
    self.window_h = None
    self.ref_rate = None

    self.stage_w = None
    self.stage_h = None

    #: The game board information sent by the server.
    self._gamescreen = None
    self.gamescreen_w = None
    self.gamescreen_h = None

    #: The texture we draw to.
    self.stage = None

    #: The window where we show the game.
    self.window = None

    self._camera = None
    self.fullscreen = False
    self.user_zoom = 0
    self.pixel_format = None

    # Opened fonts, keyed by point size.
    self.font = {}
    self.renderer_info = sdl2.SDL_RendererInfo()
    self.leaderboard = None
def update_leaderboard(self, data):
    """
    Rebuild the leaderboard texture from a leaderboard message.

    A "Leaderboard" title line and one numbered line per entry of
    ``data.players`` are rendered to temporary textures, then stacked
    vertically into a new target texture stored in ``self.leaderboard``.
    The render target is left pointing at the leaderboard texture.

    :param data: message whose ``players`` items expose ``name``.
    :raises SDLError: if rendering or copying a line fails.
    """
    lines = []
    max_width = 0
    total_height = 0

    def write_line(msg, size):
        """Render ``msg`` with the ``size`` font and record its extent."""
        nonlocal max_width
        nonlocal total_height

        font = self.font[size]
        encoded = msg.encode('utf-8')

        surface = asrt(sdlttf.TTF_RenderUTF8_Blended(
            font,
            encoded,
            sdl2.SDL_Color(255, 255, 255, 255)))
        try:
            texture = asrt(sdl2.SDL_CreateTextureFromSurface(
                self.renderer,
                surface))
        finally:
            # The surface is only an intermediate; release it even when
            # the texture could not be created.
            sdl2.SDL_FreeSurface(surface)
        lines.append(texture)

        width = ctypes.c_int(0)
        height = ctypes.c_int(0)
        sdlttf.TTF_SizeUTF8(font, encoded, width, height)
        max_width = max(max_width, width.value)
        total_height += height.value

    try:
        write_line("Leaderboard", 64)
        # Ranks are shown to the user, so count from 1.
        for idx, cell in enumerate(data.players, start=1):
            write_line(str(idx) + '. ' + cell.name, 32)

        if self.leaderboard is not None:
            sdl2.SDL_DestroyTexture(self.leaderboard)

        self.leaderboard = sdl2.SDL_CreateTexture(
            self.renderer,
            self.pixel_format,
            sdl2.SDL_TEXTUREACCESS_TARGET,
            max_width,
            total_height)

        sdl2.SDL_SetRenderTarget(self.renderer,
                                 self.leaderboard)
        sdl2.SDL_SetRenderDrawColor(self.renderer, 0, 0, 0, 128)
        sdl2.SDL_RenderClear(self.renderer)

        # Copy all strings to the leaderboard's texture.
        h = ctypes.c_int(0)
        w = ctypes.c_int(0)
        offset_height = 0
        for line in lines:
            asrt(sdl2.SDL_QueryTexture(line, None, None, w, h))
            asrt(sdl2.SDL_RenderCopy(
                self.renderer,
                line,
                None,
                sdl2.SDL_Rect(0, offset_height, w.value, h.value)))
            offset_height += h.value
    finally:
        # Line textures are temporary; free them on success and on error.
        for line in lines:
            sdl2.SDL_DestroyTexture(line)

    sdl2.SDL_SetTextureBlendMode(
        self.leaderboard,
        sdl2.SDL_BLENDMODE_BLEND)
def get_capabilities(self):
    """
    Fill ``self.renderer_info`` from the current renderer and log, at
    debug level, its maximum texture size and capability flags.

    :raises SDLError: if ``SDL_GetRendererInfo`` fails.
    """
    info = self.renderer_info
    asrt(sdl2.SDL_GetRendererInfo(self.renderer, info))

    logger.debug("Renderer max texture size: %dx%d",
                 info.max_texture_width,
                 info.max_texture_height)

    known_flags = (
        ('SDL_RENDERER_SOFTWARE', sdl2.SDL_RENDERER_SOFTWARE),
        ('SDL_RENDERER_ACCELERATED', sdl2.SDL_RENDERER_ACCELERATED),
        ('SDL_RENDERER_PRESENTVSYNC', sdl2.SDL_RENDERER_PRESENTVSYNC),
        ('SDL_RENDERER_TARGETTEXTURE', sdl2.SDL_RENDERER_TARGETTEXTURE),
    )
    # Keep only the names whose bit is set in the renderer's flags.
    flags = "|".join(name
                     for name, bit in known_flags
                     if bit & info.flags)
    logger.debug("Renderer capabilities: %s", flags)
def tr_game2stage_coords(self, x, y):
    """
    Translate from game coords to stage coordinates.

    :returns: ``(x, y)`` as integers, in stage (texture) pixels.
    :raises ValueError: if the game screen has not been set yet.
    """
    screen = self.gamescreen
    if screen is None:
        raise ValueError("Screen is not setted.")

    stage_x = self.remap(x, screen.x1, screen.x2, 0, self.stage_w)
    stage_y = self.remap(y, screen.y1, screen.y2, 0, self.stage_h)
    return int(stage_x), int(stage_y)
def tr_game2stage_size(self, size):
    """
    Translate a size (in pixels) from game to stage.

    The squared size is scaled by the ratio between the stage area and the
    game screen area, and the integer part of its square root is returned.

    :raises ValueError: if the game screen has not been set yet.
    """
    screen = self.gamescreen
    if screen is None:
        raise ValueError("Screen is not setted.")

    squared = size ** 2
    game_area = (screen.x2 - screen.x1) * (screen.y2 - screen.y1)
    stage_area = self.stage_w * self.stage_h
    return int(math.sqrt(stage_area * squared / game_area))
@staticmethod
def remap(o_val, o_min, o_max, n_min, n_max):
    """
    Map a value from one range to another.

    ``o_val`` is located relative to ``[o_min, o_max]`` and projected
    linearly onto ``[n_min, n_max]``; values outside the old range are
    extrapolated, not clamped.
    """
    offset = o_val - o_min
    return ((offset * (n_max - n_min)) / (o_max - o_min)) + n_min
def tr_win2game_coords(self, x, y):
    """
    Translate from window coords to game coordinates.

    :returns: ``(x, y)`` as integers in game coordinates, or ``None`` when
        no cell is stored for ``self.player_id``.
    """
    if self.players.get(self.player_id) is None:
        return None

    view = self.camera_rect

    # Window pixel -> point inside the camera rectangle (stage coords).
    on_stage_x = self.remap(x, 0, self.window_w, view.x, view.x + view.w)
    on_stage_y = self.remap(y, 0, self.window_h, view.y, view.y + view.h)

    # Stage coords -> game coords.
    game_x = self.remap(on_stage_x,
                        0, self.stage_w,
                        self.gamescreen.x1, self.gamescreen.x2)
    game_y = self.remap(on_stage_y,
                        0, self.stage_h,
                        self.gamescreen.y1, self.gamescreen.y2)
    return int(game_x), int(game_y)
@property
def gamescreen(self):
    """The game board information last assigned (``None`` until set)."""
    return self._gamescreen

@gamescreen.setter
def gamescreen(self, value):
    # Store the board, derive its integer size, cap the stage size to the
    # renderer's maximum texture size and (re)create the stage texture.
    self._gamescreen = value
    limits = self.renderer_info

    self.gamescreen_w = int(value.x2 - value.x1)
    self.stage_w = min(self.gamescreen_w, limits.max_texture_width)

    self.gamescreen_h = int(value.y2 - value.y1)
    self.stage_h = min(self.gamescreen_h, limits.max_texture_height)

    previous = self.stage
    if previous is not None:
        sdl2.SDL_DestroyTexture(previous)

    self.stage = sdl2.SDL_CreateTexture(self.renderer,
                                        self.pixel_format,
                                        sdl2.SDL_TEXTUREACCESS_TARGET,
                                        self.stage_w,
                                        self.stage_h)
@property
def camera(self):
    """The current camera object (``None`` until one is assigned)."""
    return self._camera
@camera.setter
def camera(self, value):
    # Plain storage; no validation or derived state is updated here.
    self._camera = value
@property
def camera_rect(self):
    """
    Rectangle of the stage texture that the camera currently shows.

    The rectangle is centred on the camera position, sized from the camera
    zoom plus the user's zoom adjustment, widened by the window aspect
    ratio and finally shifted so that it does not start before the stage
    origin or (when it fits) run past the stage's far edges.

    :returns: ``sdl2.SDL_Rect`` in stage coordinates.
    """
    centre_x, centre_y = self.tr_game2stage_coords(self.camera.x,
                                                   self.camera.y)
    zoom = self.camera.zoom + self.user_zoom / 1000

    height = int(self.stage_h * zoom)
    width = int(self.stage_w * zoom)
    # Stretch the width to the window's aspect ratio.
    width = int(width * self.window_w / self.window_h)

    left = int(centre_x - width / 2)
    top = int(centre_y - height / 2)

    # Pull the rectangle back inside the far edges first, then make sure it
    # never starts at a negative coordinate.
    left = max(min(left, self.stage_w - width), 0)
    top = max(min(top, self.stage_h - height), 0)

    return sdl2.SDL_Rect(left, top, width, height)
@staticmethod
def hex2color(h):
    """
    Build a fully opaque ``sdl2.SDL_Color`` from a hexadecimal RGB string.

    :param h: hexadecimal digits, e.g. ``'ff8000'``.
    """
    rgb = int(h, base=16)
    red = (rgb >> 16) & 0xff
    green = (rgb >> 8) & 0xff
    blue = rgb & 0xff
    return sdl2.SDL_Color(red, green, blue, 255)
def get_font(self, size):
    """
    Return the loaded font whose point size is closest to ``size / 4``.

    :param size: size value to pick a font for (``refresh`` passes a
        cell's stage size).
    """
    target = size / 4
    nearest = min(self.font, key=lambda candidate: abs(target - candidate))
    return self.font[nearest]
def refresh(self):
    """
    Draw the current status of the game in ``window``.

    The overall process is:

    1. The server sends information about the board size and status.

       1.1. We keep the information about the board in ``gamescreen``.

    2. We draw the game in the texture ``stage``. This texture can
       be smaller than ``gamescreen``.

    3. The rectangle ``camera`` (in game coordinates) is copied from
       ``stage`` to ``window``.

    A failure while labeling a cell is logged and does not abort the
    frame; the temporary surface and texture are released either way.
    """
    # Follow the player's own cell when it is known.
    main = self.players.get(self.player_id)
    if main:
        self.camera = Camera(main.x, main.y, 0.085)
    camera = self.camera_rect

    # Set background
    sdl2.SDL_SetRenderTarget(self.renderer,
                             self.stage)
    sdl2.SDL_SetRenderDrawColor(self.renderer, 0, 0, 0, 255)
    sdl2.SDL_RenderClear(self.renderer)

    # Draw the cells (Viruses last)
    cells = sorted(self.players.values(),
                   key=lambda c: (int(c.is_virus), c.size))
    for cell in cells:
        if cell.id == self.player_id:
            if self.client is not None:
                label = self.client.nick
            else:
                label = "PLAYER"
        else:
            label = self.names.get(cell.id)

        x, y = self.tr_game2stage_coords(cell.x, cell.y)
        size = self.tr_game2stage_size(cell.size)

        # The border is the fill color with each channel darkened by 0x10
        # (floored at 0).
        fill_color = int('ff' + cell.color, base=16)
        r = int(cell.color[:2], base=16)
        g = int(cell.color[2:4], base=16)
        b = int(cell.color[4:], base=16)
        border_color = int('ff%0.2x%0.2x%0.2x' %
                           (r - 0x10 if r > 0x10 else 0,
                            g - 0x10 if g > 0x10 else 0,
                            b - 0x10 if b > 0x10 else 0),
                           base=16)

        if cell.is_virus:
            border_size = int(size / 5)
        else:
            border_size = int(size / 25)

        # Cell border
        sdlgfx.filledCircleColor(self.renderer, x, y,
                                 size,
                                 border_color)
        # Cell fill
        sdlgfx.filledCircleColor(self.renderer, x, y,
                                 size - border_size,
                                 fill_color)

        if label:
            text = None
            text_texture = None
            try:
                text = asrt(sdlttf.TTF_RenderUTF8_Blended(
                    self.get_font(size),
                    label.encode('utf-8', errors='ignore'),
                    sdl2.SDL_Color(255, 255, 255, 255),
                ))
                text_texture = asrt(sdl2.SDL_CreateTextureFromSurface(
                    self.renderer,
                    text))
                asrt(sdl2.SDL_RenderCopy(
                    self.renderer,
                    text_texture,
                    None,
                    sdl2.SDL_Rect(int(x-size*0.75),
                                  int(y-size*0.50),
                                  int(size*1.5),
                                  int(size))))
            except SDLError:
                logger.exception("Error labeling cell.")
            finally:
                # Release whatever was created, even when a later step
                # failed; refresh runs every frame so leaks add up fast.
                if text_texture is not None:
                    sdl2.SDL_DestroyTexture(text_texture)
                if text is not None:
                    sdl2.SDL_FreeSurface(text)

    # Set background in window
    sdl2.SDL_SetRenderTarget(self.renderer, None)
    sdl2.SDL_SetRenderDrawColor(self.renderer, 0, 0, 0, 255)
    sdl2.SDL_RenderClear(self.renderer)

    # Copy the stage
    sdl2.SDL_RenderCopy(self.renderer,
                        self.stage,
                        camera,
                        sdl2.SDL_Rect(0, 0, self.window_w, self.window_h))

    # The leaderboard takes the right sixth of the window, top two thirds.
    if self.leaderboard is not None:
        sdl2.SDL_RenderCopy(
            self.renderer,
            self.leaderboard,
            None,
            sdl2.SDL_Rect(
                int(self.window_w * 5 / 6),
                0,
                int(self.window_w / 6),
                int(self.window_h * 2 / 3)))

    # Refresh
    sdl2.SDL_RenderPresent(self.renderer)
def get_screen_size(self):
    """
    Choose the window size and refresh rate from the attached displays.

    The window is a square whose side is 80% of the smallest dimension
    found among the displays whose current mode can be read.  The refresh
    rate is the lowest one reported, never above ``FRAME_RATE``.

    When no display mode can be read an error is logged and the window
    size and refresh rate are left untouched.
    """
    display = sdl2.SDL_DisplayMode()
    ref_rate = FRAME_RATE
    size = None

    for idx in range(sdl2.SDL_GetNumVideoDisplays()):
        if sdl2.SDL_GetCurrentDisplayMode(idx, display) != 0:
            continue
        if size:
            size = min(display.w, display.h, size)
        else:
            size = min(display.w, display.h)
        # SDL reports 0 when the refresh rate is unspecified; such a
        # display must not drag the rate to 0 and disable the window.
        if display.refresh_rate > 0:
            ref_rate = min(ref_rate, display.refresh_rate)

    if size and ref_rate:
        self.window_w = self.window_h = int(size * 0.8)
        self.ref_rate = ref_rate
    else:
        logger.error("Error getting display mode.")
def create_window(self):
    """
    (Re)create the SDL window and its renderer.

    Any existing renderer and window are destroyed first.  After the new
    renderer exists its capabilities are read, the window's pixel format
    is stored, and, if a game screen was already known, the stage texture
    is rebuilt for the new renderer.

    :raises SDLError: if the renderer information cannot be read.
    """
    if self.renderer is not None:
        sdl2.SDL_DestroyRenderer(self.renderer)
        # Destroying a renderer frees the textures created from it, so
        # the old handles must not be destroyed or drawn again.
        self.stage = None
        self.leaderboard = None

    if self.window is not None:
        sdl2.SDL_DestroyWindow(self.window.window)

    self.window = sdl2.ext.Window(
        "pyagar",
        size=(self.window_w, self.window_h),
        flags=sdl2.SDL_WINDOW_RESIZABLE)
    self.window.show()

    self.renderer = sdl2.SDL_CreateRenderer(
        self.window.window,
        -1,
        self.renderer_flags)
    self.get_capabilities()

    display = sdl2.SDL_DisplayMode()
    sdl2.SDL_GetWindowDisplayMode(self.window.window,
                                  display)
    self.pixel_format = display.format

    # Re-assigning triggers the setter, which builds a fresh stage texture.
    if self._gamescreen is not None:
        self.gamescreen = self._gamescreen
def toggle_fullscreen(self):
    """
    Switch between windowed and fullscreen mode.

    Entering fullscreen flags the existing window; leaving it rebuilds the
    window through ``create_window`` and resizes it to the stored window
    size.
    """
    if not self.fullscreen:
        logger.debug("Fullscreen ON")
        sdl2.SDL_SetWindowFullscreen(self.window.window,
                                     sdl2.SDL_WINDOW_FULLSCREEN)
        self.fullscreen = True
        return

    logger.debug("Fullscreen OFF")
    self.create_window()
    sdl2.SDL_SetWindowSize(self.window.window,
                           self.window_w,
                           self.window_h)
    self.fullscreen = False
@asyncio.coroutine
def run(self):
    """
    Main loop of the visualizer (generator-based coroutine).

    Initialises SDL and the fonts, creates the window, waits for the first
    ``ScreenAndCamera`` message and then, until the user quits:

    1. waits at most one frame for a message and applies it to the state,
    2. processes pending SDL events (window, keyboard, mouse),
    3. sends the pending move to the client when it changed, or again
       after 50 ms,
    4. redraws when no message is queued and a frame period has elapsed.

    Returns when an ``SDL_QUIT`` event arrives or ESC is pressed while not
    in fullscreen.
    """
    sdl2.ext.init()
    sdlttf.TTF_Init()

    self.get_screen_size()

    # Open the font at sizes 32, 64, 128, 256 and 512.
    for i in range(5, 10):
        size = 2**i
        self.font[size] = sdlttf.TTF_OpenFont(
            FONT_PATH.encode('ascii'),
            size)

    self.last = time.monotonic()
    self.create_window()

    # Window creation, we wait for a ScreenAndCamera message.
    while True:
        data = yield from self.messages.get()
        if isinstance(data, ScreenAndCamera):
            self.gamescreen = data.screen
            self.camera = Camera(data.camera.x, data.camera.y, 0.085)
            break

    # Play
    while True:
        # Wait for a message, but never longer than one frame so that
        # events and redraws keep flowing.
        try:
            data = yield from asyncio.wait_for(self.messages.get(),
                                               1 / self.ref_rate)
        except asyncio.TimeoutError:
            data = None

        if isinstance(data, PlayerCell):
            self.player_id = data.cell.id
        elif isinstance(data, CameraPosition):
            self.camera = data.camera
        elif isinstance(data, Leaderboard):
            self.update_leaderboard(data)
        elif isinstance(data, Status):
            for cell in data.cells:
                self.players[cell.id] = cell
                if cell.name:
                    self.names[cell.id] = cell.name
            for cell in data.dissapears:
                if cell.id in self.players:
                    del self.players[cell.id]
            for eats in data.eat:
                if eats.eatee == self.player_id:
                    self.player_id = None
                if eats.eatee in self.players:
                    del self.players[eats.eatee]

        # Read sdl events
        for event in sdl2.ext.get_events():
            if event.type == sdl2.SDL_QUIT:
                logger.debug("QUIT event received.")
                return
            elif event.type == sdl2.SDL_WINDOWEVENT:
                if event.window.event == sdl2.SDL_WINDOWEVENT_RESIZED:
                    self.window_w = event.window.data1
                    self.window_h = event.window.data2
                    logger.debug("Window resized %dx%d",
                                 event.window.data1,
                                 event.window.data2)
            elif event.type == sdl2.SDL_KEYDOWN:
                if event.key.keysym.sym == sdl2.SDLK_f:
                    self.toggle_fullscreen()
                elif event.key.keysym.sym == sdl2.SDLK_ESCAPE:
                    if self.fullscreen:
                        self.toggle_fullscreen()
                    else:
                        logger.debug("User pressed ESC, exiting.")
                        return
            elif event.type == sdl2.SDL_MOUSEWHEEL:
                # The user's zoom adjustment is kept within [-50, 50].
                self.user_zoom += event.wheel.y
                if self.user_zoom > 50:
                    self.user_zoom = 50
                elif self.user_zoom < -50:
                    self.user_zoom = -50
                else:
                    logger.debug("UserZoom: %r", self.user_zoom)

            # Gameplay input.  ``ensure_future`` replaces the old
            # ``asyncio.async`` alias, whose name is a reserved word (and
            # therefore a syntax error) on Python 3.7+.
            if not self.view_only:
                if event.type == sdl2.SDL_KEYDOWN:
                    if event.key.keysym.sym == sdl2.SDLK_SPACE:
                        logger.debug("SPACE key pressed.")
                        asyncio.ensure_future(self.client.split())
                    elif event.key.keysym.sym == sdl2.SDLK_w:
                        logger.debug("W key pressed.")
                        asyncio.ensure_future(self.client.eject())
                elif event.type == sdl2.SDL_MOUSEMOTION:
                    self.mouse_x = event.motion.x
                    self.mouse_y = event.motion.y
                    self.move = self.tr_win2game_coords(self.mouse_x,
                                                        self.mouse_y)
                elif (event.type == sdl2.SDL_MOUSEBUTTONDOWN and
                      event.button.button == sdl2.SDL_BUTTON_LEFT):
                    logger.debug("Mouse button pressed.")
                    asyncio.ensure_future(self.client.spawn())

        self.now = time.monotonic()

        if self.move is not None:
            if self.move != self.last_move:
                asyncio.ensure_future(self.client.move(*self.move))
                self.last_move = self.move
                self.last_move_send = self.now
            elif self.now - self.last_move_send > 0.05:
                # Same target as last time: recompute it from the mouse
                # position and resend at most every 50 ms.
                self.move = self.tr_win2game_coords(self.mouse_x,
                                                    self.mouse_y)
                if self.move:
                    asyncio.ensure_future(self.client.move(*self.move))
                    self.last_move = self.move
                    self.last_move_send = self.now

        # Redraw only when idle and at least one frame period has passed.
        delay = abs(self.last - self.now)
        if self.messages.empty() and delay > 1 / self.ref_rate:
            self.refresh()
            self.last = self.now