Source code for armour.agent.ArmourAgentState

from rtd.entity.components import BaseStateComponent
from rtd.entity.states import ArmRobotState
from rtd.util.mixins import Options
from armour.agent import ArmourAgentInfo
import numpy as np
from rtd.util.mixins.Typings import Vecnp, Matnp, Boundsnp

# define top level module logger
import logging
logger = logging.getLogger(__name__)



[docs]class ArmourAgentState(BaseStateComponent, Options):
[docs] @staticmethod def defaultoptions() -> dict: return { "initial_position": None, "initial_velocity": None, }
def __init__(self, arm_info: ArmourAgentInfo, **options): # initialize base classes BaseStateComponent.__init__(self) Options.__init__(self) # initialize using given options self.mergeoptions(options) self.entityinfo = arm_info # self.reset() @property def position(self): return self.state[self.position_indices,:] @property def velocity(self): return self.state[self.velocity_indices,:]
[docs] def reset(self, **options): options = self.mergeoptions(options) # Set component dependent properties self.n_states = 2 * self.entityinfo.n_q self.position_indices = np.arange(0, self.n_states, 2) self.velocity_indices = np.arange(1, self.n_states, 2) logger.info("Resetting time and states") self.state: Matnp = np.zeros((self.n_states, 1)) self.time = np.zeros(1) self.step_start_idxs = np.zeros(1) # add position if options["initial_position"] is not None: logger.debug("Using provided joint positions") self.state[self.position_indices] = options["initial_position"] # add velocity if options["initial_velocity"] is not None: logger.debug("Using provided joint velocities") self.state[self.velocity_indices] = options["initial_velocity"] # take these initials and merge them in again options["initial_position"] = self.position options["initial_velocity"] = self.velocity self.mergeoptions(options)
[docs] def random_init(self, pos_range: Boundsnp = None, vel_range: Boundsnp = None, random_position: bool = True, random_velocity: bool = False, save_to_options: bool = False): if pos_range is None: pos_range = self.entityinfo.joints[:self.entityinfo.n_q].position_limits if vel_range is None: vel_range = self.entityinfo.joints[:self.entityinfo.n_q].velocity_limits # set any joint limits that are +Inf to pi and -Inf to -pi pos_range_infs = np.isinf(pos_range) pos_range[0, pos_range_infs[0,:]] = -np.pi pos_range[1, pos_range_infs[1,:]] = np.pi # reset self.state: Matnp = np.zeros((self.n_states, 1)) self.time = np.zeros(1) self.step_start_idxs = np.zeros(1) # make the random configuration if random_position: self.state[self.position_indices] = np.random.uniform(pos_range[0,:], pos_range[1,:]) if random_velocity: self.state[self.velocity_indices] = np.random.uniform(vel_range[0,:], vel_range[1,:]) if save_to_options: self.mergeoptions({"initial_position": self.position, "initial_velocity": self.velocity})
[docs] def get_state(self, time: Vecnp = None) -> ArmRobotState: ''' Returns the state of the ArmourAgent at a specific time. ''' # default to the last time and state if time is None: time = self.time[-1:] state = ArmRobotState(self.position_indices, self.velocity_indices) state.time = time # cannot interpolate if self.time.size == 1: state.state = self.state[:,0:][..., None] return state # otherwise, interpolate timesize = time.size if isinstance(time, Vecnp) else 1 state.state = np.zeros((self.n_states, timesize)) # if we can and need to interpolate the state, do it for i in range(self.n_states): state.state[i,:] = np.interp(time, self.time, self.state[i,:]) return state
[docs] def commit_state_data(self, T_state: Vecnp, Z_state: Matnp): ''' method: commit_move_data(T_state,Z_state) After moving the agent, commit the new state and input trajectories, and associated time vectors, to the agent's state, time, input, and input_time properties. Parameters ---------- T_state: NpFvec times to commit states to Z_state : NpFmat corresponding states for the times ''' # update the time and state self.step_start_idxs = np.append(self.time, self.time.size+1) self.time = np.concatenate((self.time, self.time[-1] + T_state[1:])) self.state = np.concatenate((self.state, Z_state[:,1:]), 1)
[docs] def joint_limit_check(self, t_check_step: float) -> bool: ''' Checks if joint limits did not exceed at any time. Parameters ---------- t_check_step : float step size to check with Returns ------- check : bool whether limit was exceeded or not ''' # create time vector for checking start_idx = int(self.step_start_idxs[-1]) t_check = np.arange(self.time[start_idx], self.time[-1], t_check_step) # get agent state trajectories interpolated to time pos_check = np.array([np.interp(t_check, self.time[start_idx:], self.position[n,start_idx:]) for n in range(self.position.shape[0])]) vel_check = np.array([np.interp(t_check, self.time[start_idx:], self.velocity[n,start_idx:]) for n in range(self.velocity.shape[0])]) logger.info("Running joint limits check!") # check position & velocity pos_exceeded = np.zeros(pos_check.shape, dtype=bool) vel_exceeded = np.zeros(vel_check.shape, dtype=bool) for i in range(self.entityinfo.n_q): # pos lb = np.any(pos_check[i,:] < float(self.entityinfo.params.pos_lim[0,i])) ub = np.any(pos_check[i,:] > float(self.entityinfo.params.pos_lim[1,i])) pos_exceeded[i,:] = lb | ub # vel vel_exceeded[i,:] = np.any(np.abs(vel_check[i,:]) > float(self.entityinfo.params.vel_lim[i])) # get out results out = np.any(pos_exceeded) | np.any(vel_exceeded) if out: # position limits exceeded in these positions idx_list = np.argwhere(pos_exceeded) for joint, t in idx_list: lb = self.entity_info.joints[joint].position_limits[0] ub = self.entity_info.joints[joint].position_limits[1] logger.error(f"t={t_check[t]:.2f}, {joint}-position limit exceeded: {pos_check[joint,t]:.5f}, [{lb:.5f},{ub:.5f}]") # velocity limits exceeded in these positions idx_list = np.argwhere(vel_exceeded) for joint, t in idx_list: lb = self.entity_info.joints[joint].velocity_limits[0] ub = self.entity_info.joints[joint].velocity_limits[1] logger.error(f"t={t_check[t]:.2f}, {joint}-velocity limit exceeded: {vel_check[joint,t]:.5f}, [{lb:.5f},{ub:.5f}]") else: logger.info("No joint limits exceeded") return out
def __str__(self): return (f"State component {repr(self)} with properties:\n" + f" arm_info: {repr(self.entityinfo)}\n" + f" state dim: {self.state.shape}\n")