Source code for rtd.sim.systems.visual.ClientVisualSystem

from rtd.util.mixins import Options
from rtd.sim import ClientSimulationSystem
from rtd.sim.systems.visual import ClientVisualObject
from rtd.functional.sequences import toSequence, arrange
from datetime import datetime
import numpy as np
import time
from rtd.util.mixins.Typings import Bound

# define top level module logger
import logging
logger = logging.getLogger(__name__)



[docs]class ClientVisualSystem(ClientSimulationSystem, Options): ''' Takes in a list of static and dynamic objects and handles their rendering '''
[docs] @staticmethod def defaultoptions() -> dict: """ Returns ------- options : dict default options of visual system """ return { "time_discretization": 0.1, "draw_time": 0.05, "dimension": 3, }
def __init__(self, static_objects: ClientVisualObject | list[ClientVisualObject] = None, dynamic_objects: ClientVisualObject | list[ClientVisualObject] = None, **options): # initialize base classes ClientSimulationSystem.__init__(self) Options.__init__(self) # initialize using given options self.mergeoptions(options) self.reset() self.addObjects(static=static_objects, dynamic=dynamic_objects)
[docs] def reset(self, **options): ClientSimulationSystem.reset(self) options = self.mergeoptions(options) # animation options self.time_discretization = options["time_discretization"] self.draw_time = options["draw_time"] self.dimension = options["dimension"] # reset time and clear all stored objects self.time = [0] # remove any object in static & dynamic from server self.static_objects: list[ClientVisualObject] = [] self.dynamic_objects: list[ClientVisualObject] = []
[docs] def addObjects(self, static: ClientVisualObject | list[ClientVisualObject] = None, dynamic: ClientVisualObject | list[ClientVisualObject] = None): ''' Takes in a visual object or a list of visual objects and adds them to the corresponding list Parameters ---------- static : ClientVisualObject | list[ClientVisualObject] static object(s) to add dynamic : ClientVisualObject | list[ClientVisualObject] dynamic object(s) to add ''' # handle single items if static is not None: static: list[ClientVisualObject] = toSequence(static) self.static_objects.extend(static) for obj in static: self.client.add_mesh_data(obj.plot_data) if dynamic is not None: dynamic: list[ClientVisualObject] = toSequence(dynamic) self.dynamic_objects.extend(dynamic) for obj in static: self.client.add_mesh_data(obj.plot_data) self.client.send_mesh_data_list()
[docs] def remove(self, *objects: ClientVisualObject): ''' Takes in a visual object or a list of visual objects and removes them from static and dynamic objects list Parameters ---------- *objects : ClientVisualObject objects to remove from the system ''' for obj in objects: if obj in self.static_objects: self.static_objects.remove(obj.plot_data) elif obj in self.dynamic_objects: self.dynamic_objects.remove(obj.plot_data) # remove mesh data from server self.client.clear_mesh_data_list() for obj in self.static_objects + self.dynamic_objects: self.client.add_mesh_data(obj.plot_data)
[docs] def updateVisual(self, t_update: float): ''' Appends `t_update` to `time` and updates the dynamic objects on the current figure for `t_update` time Parameters ---------- t_update : float duration to update for ''' start_time = self.time[-1] + self.time_discretization end_time = self.time[-1] + t_update t_vec = np.linspace(start_time, end_time, int(round(t_update/self.time_discretization))).tolist() logger.debug("Running Visualization!") # render if client is connected if self.client.ws is None or not self.client.ws.open: for t in t_vec: # update the dynamic objects on the plotter for obj in self.dynamic_objects: move_msgs = toSequence(obj.plot(t)) for move_msg in move_msgs: self.client.send_move_object_message(*move_msg) time.sleep(self.draw_time) # append the updated time self.time += t_vec
[docs] def animate(self, t_span: Bound = None, time_discretization: float = None, pause_time: float = None): ''' Animates from `time` = `t_span[0]` to `t_span[1]`, dividing it into frames of length `time_discretization` and waiting `pause_time` seconds before rendering the next frame. A lower discretization will result in a smoother animation, while a `pause_time` lower than the discretization will result in a faster animation speed. Defaults to 1 second per t Parameters ---------- t_span : Bound range of time to animate time_discretization : float time difference between frames pause_time : float pause time in seconds between frames ''' if t_span is None: t_span = (0, self.time[-1]) if time_discretization is None: time_discretization = self.time_discretization if pause_time is None: pause_time = time_discretization start_time = t_span[0] end_time = t_span[1] + time_discretization t_vec = arrange(start_time, end_time, time_discretization) # animate the dynamic stuff for next `t_update` for t in t_vec: # get current time start_time = datetime.now() # update the dynamic objects on the figure for obj in self.dynamic_objects: move_msgs = toSequence(obj.plot(t)) for move_msg in move_msgs: self.client.send_move_object_message(*move_msg) # get current time and subtract from previous time # if delta_time > self.draw_time, print lagging message draw_time = pause_time - (datetime.now() - start_time).total_seconds() if draw_time < 0: logger.warning(f"Warning, animation lagging by {-draw_time:.6f}s!") time.sleep(max(draw_time, 0))
[docs] @staticmethod def get_discretization_and_pause( framerate: float = 30, speed: float = 1) -> tuple[float, float]: ''' Returns the time_discretization and pause_time to get the desired framerate and speed. Speed of 2 means 1 second = 2 time Parameters ---------- framerate : float desired framerate speed : float desired animation speed ratio Returns ------- time_discretization : float time difference between frames pause_time : float pause time in seconds between frames ''' pause_time = 1 / framerate time_discretization = speed / framerate return (time_discretization, pause_time)