Source code for scripts.planner

#! /usr/bin/env python

"""
.. module:: Planner
    :platform: Unix
    :synopsis: Python code that plan a set via_points from a room to another

.. moduleauthor:: Matteo Carlone <matteo.carlone99@gmail.com>


Action:

    /armor_client
    motion/planner


This Node implement the planning action of creating a set of via points between a room to another. 
Those via points will be then passed to the Controlling node to perform the actual movement.

"""
#---Libraries---#

import rospy
from actionlib import SimpleActionServer
from patrol_robot.msg import Point, PlanAction, PlanFeedback, PlanResult
from armor_api.armor_client import ArmorClient
from patrol_robot import environment as env
import numpy as np
import re
from std_msgs.msg import Bool


#--------------#

[docs]class PlaningAction(object): """ Class representing the Planning state of the Smach-State-Machine, which creates a set of via points between the robot position and a target room to be pointed decided by the Reasoner State and passed to this ros node via a motion/planner action-client request in the fsm script. Methods ---------- __init__(self) Initialization of parameters: client:ros_action_client Armor-Client to set-up the Ontology as:ros_action_server the server of the motion/planner action _environment_size:list[] ROS parameter containing the coordinate limits of the environment execute_callback(self,goal) Server Callback of the action motion/planner requested from the fsm module to start up the via points generation action. This Callback-Server simulate the robot's planning by generating a set of n poits equally spacied from a room to another. """
[docs] def __init__(self): self._environment_size = rospy.get_param(env.ENV_SIZE) self.client = ArmorClient('armor_client', 'reference') # Instantiate and start the action server based on the `SimpleActionServer` class. self._as = SimpleActionServer(env.ACTION_PLANNER, PlanAction, execute_cb=self.execute_callback, auto_start=False) # Initialize the starting room self.start_room = env.START_LOC # Define boolean variable for the battery state self._battery_low = False # # ROS message subscriber on the topic /battery_low self.sub_battery = rospy.Subscriber(env.TOPIC_BATTERY_LOW, Bool, self._battery_cb) # start plan action server self._as.start()
def _battery_cb(self,battery_value): """ Callback function for /battery_low topic. Stores the battery state. """ # store battery state from /battery_low topic message self._battery_low = battery_value.data
[docs] def execute_callback(self, goal): print('\n###############\nPLANNING EXECUTION') # Get list of room coordinates from ROS parameters loc_coordinates = rospy.get_param('ids') if self._battery_low: # Wait until robot is in a room during the low battery routine print('BATTERY_LOW') # Ontology Reasoning self.client.utils.apply_buffered_changes() self.client.utils.sync_buffered_reasoner() # Query ontology for current location of robot current_start_room = self.client.query.objectprop_b2_ind('isIn','Robot1') # Format room name from ontology response current_start_room = re.search('#(.+?)>',current_start_room[0]).group(1) # Loop until robot is no longer in the previous target room r = rospy.Rate(1) while(current_start_room == self.start_room): # Ontology Reasoning self.client.utils.apply_buffered_changes() self.client.utils.sync_buffered_reasoner() # Query ontology for current location of robot current_start_room = self.client.query.objectprop_b2_ind('isIn','Robot1') # Format room name from ontology response current_start_room = re.search('#(.+?)>',current_start_room[0]).group(1) r.sleep() # Check if robot is already in target room if current_start_room == goal.target: self._as.set_aborted() return # get the current robot location self.start_room = self.client.query.objectprop_b2_ind('isIn','Robot1') # format information self.start_room = re.search('#(.+?)>',self.start_room[0]).group(1) # mapping start_room location into coordinates start_point = loc_coordinates[self.start_room] # mapping the target location into coordinates target_point = loc_coordinates[goal.target] print('Start Room: '+self.start_room+'\n') print('Target Room: '+goal.target+'\n') log_msg = (f'Starting-Room [{start_point[0]}, {start_point[1]}] , Target-Room [{target_point[0]}, ' f'{target_point[1]}] ') print(log_msg) # check if locations are None if start_point is None or target_point is None: log_msg = 'Cannot have `None` start point nor target_point. This service will be aborted!.' print(log_msg) # Close service by returning an `ABORT` state to the client. self._as.set_aborted() return # check if locations' coordinates are in the environment limit if not(self._is_valid(start_point) and self._is_valid(target_point)): log_msg = (f'Start point ({start_point[0]}, {start_point[1]}) or target point ({target_point[0]}, ' f'{target_point[1]}) point out of the environment. This service will be aborted!.') print(log_msg) # Close service by returning an `ABORT` state to the client. self._as.set_aborted() return # Initialise the `feedback` with the starting point of the plan. feedback = PlanFeedback() feedback.via_points = [] # number of via points n_points = 10 # generate a set of n equally spaced via points points_x = np.linspace(start_point[0],target_point[0],num = n_points) points_y = np.linspace(start_point[1],target_point[1],num = n_points) # format information points = [[a , b] for a, b in zip(points_x, points_y)] print('GENERATING VIA POINTS...') # loop to simulated time in generating via points for i in range(n_points): if self._as.is_preempt_requested(): print('Server has been cancelled by the client!') # Actually cancel this service. self._as.set_preempted() return # create a Point struct with current location's coordinates new_point = Point() new_point.x = points[i][0] new_point.y = points[i][1] print('[' + str("%.2f"% new_point.x)+','+str("%.2f"% new_point.y)+']') # update feedback feedback.via_points.append(new_point) # publish feedback self._as.publish_feedback(feedback) rospy.sleep(0.1) # Publish the results to the client. result = PlanResult() result.via_points = feedback.via_points self._as.set_succeeded(result)
def _is_valid(self, point): # return bool variable, true coordinates are in the env limits, false viceversa return self._environment_size[0] <= point[0] <= self._environment_size[1] and self._environment_size[2] <= point[1] <= self._environment_size[3]
if __name__ == '__main__': # Initialise the node, its action server, and wait. rospy.init_node(env.NODE_PLANNER, log_level=rospy.INFO) # Instantiate the node manager class and wait. server = PlaningAction() rospy.spin()