Source code for scripts.initial_state

#!/usr/bin/env python

"""
.. module:: initial_state
    :platform: Unix
    :synopsis: Python code to initialize and load the topology

.. moduleauthor:: Matteo Carlone <matteo.carlone99@gmail.com>

Service:
    /start 
Action:
    /armor_client

This Node start the whole program by

    - Looking for the aruco marker
        - orient the camera using the robot arm
    - decode information about the environment
    - initializing and loading the topology.

"""

#---Libraries---#

import sys
import roslib
import rospy
import actionlib
import time
from os.path import dirname, realpath

from std_srvs.srv import Empty , EmptyResponse
from armor_api.armor_client import ArmorClient

from patrol_robot import environment as env

from patrol_robot.srv import MarkerRoutine
from patrol_robot.srv import RoomInformation
from std_msgs.msg import Int32


#--------------#

[docs]class InitialState: """ Class representing the Initial-State of the Smach-State-Machine, which load the Ontology initialized with the environment description, and starting point of the Robot. ... Methods ---------- __init__(self) Initialization of parameters: client:ros_action_client Armor-Client to set-up the Ontology path:str The Ontology folder-path to load server:ros_server the server of the Empty service /start with Callback execute() execute(self,request) Server Callback of the /start service requested from the fsm module to start up the whole program. This Callback-Server load the ontology, set all the room of the environment with the respective doors, the starting position of the robot, already initialized in the starting ontology, the disjoint function that let the reasoner understanding all the individuals and I assumed all the rooms visited at time zero in order to yet start the counter for the URGENT rooms. All the informations are retrived from aruco markers detection. Therefore the camera mounted on the robot's arm scan the surroundings by achiving different orientations. The movement of the arm is performed by a MoveIt control node. """
[docs] def __init__(self): # armor - client self.client = ArmorClient("armor_client", "reference") # absolute ontology path self.path = dirname(realpath(__file__)) self.path = self.path + "/../topology/" # /start Empty Server , execute callback self.server = rospy.Service(env.SERVER_START , Empty , self.execute) # ROS message subscriber on the topic /aruco_detector/id rospy.Subscriber("/aruco_detector/id", Int32, self.id_callback) # define empty lists and dictionaries self.id_list = [] self.loc = [] self.loc_dict = {} self.loc_coordinates = {} self.coordinates_loc = {}
[docs] def execute(self,request): time.sleep(5) print('\n###############\nTOPOLOGY LOADING EXECUTION') # get the current time instant curr_time = int(time.time()) # wait for move_arm srv to be online rospy.wait_for_service('move_arm') # Create a client for the 'move_arm' service MR_client = rospy.ServiceProxy('move_arm',MarkerRoutine) # Call the 'MarkerRoutine' method of the 'move_arm' service with an argument of 1 (MR_left position) resp = MR_client(1) print(resp.message) time.sleep(5) # Call the 'MarkerRoutine' method of the 'move_arm' service with an argument of 2 (home position) resp = MR_client(2) print(resp.message) time.sleep(5) # Call the 'MarkerRoutine' method of the 'move_arm' service with an argument of 1 (MR_right position) resp = MR_client(3) print(resp.message) time.sleep(5) # Call the 'MarkerRoutine' method of the 'move_arm' service with an argument of 1 (MR_back position) resp = MR_client(4) print(resp.message) time.sleep(5) # wait for room_info srv to be online rospy.wait_for_service('/room_info') # Create a client for the 'room_info' service MS_client = rospy.ServiceProxy('/room_info',RoomInformation) # construct lists and dictionaries from infos retrived from the room_info srv for i in self.id_list: resp = MS_client(i) self.loc.append(resp.room) self.loc_dict[resp.room] = resp.connections self.loc_coordinates[resp.room] = [resp.x,resp.y] self.coordinates_loc[str(resp.x) + ',' + str(resp.y)] = resp.room time.sleep(0.5) # load dictionaries on the ROS parameter server rospy.set_param('ids',self.loc_coordinates) rospy.set_param('coord',self.coordinates_loc) # Call the 'MarkerRoutine' method of the 'move_arm' service with an argument of 5 (stop position) resp = MR_client(5) print(resp.message) time.sleep(3) # load ontology from the absolute path self.client.utils.load_ref_from_file(self.path + "topological_map.owl", "http://bnc/exp-rob-lab/2022-23", True, "PELLET", True, False) # ------- Set up all the rooms with respective doors ------- # self.client.utils.mount_on_ref() self.client.utils.set_log_to_terminal(True) for i in self.loc: connections = self.loc_dict[i] print(i) for j in connections: print(j.through_door) self.client.manipulation.add_objectprop_to_ind("hasDoor", i, j.through_door) # ----------------------------------------------------------- # # Robot starting room self.client.manipulation.add_objectprop_to_ind("isIn", 'Robot1', 'E') # Set all rooms visited at curr_time time instant for room in self.loc: self.client.manipulation.add_dataprop_to_ind('visitedAt',room, 'Long', str(curr_time)) # Disjoint for Individuals understanding self.client.call('DISJOINT','IND','',self.loc) # First Reasoning self.client.utils.apply_buffered_changes() self.client.utils.sync_buffered_reasoner() print('LOADING COMPLETED') # returning an empty response to notify the completed load of the ontology return EmptyResponse()
[docs] def id_callback(self,msg): """ Callback of the ROS message on the topic /aruco_detector/id to retrive codes from aruco markers and stuck them in a list """ if(msg.data not in self.id_list): self.id_list.append(msg.data)
if __name__ == "__main__": # Initialize the ROS-Node rospy.init_node(env.NODE_INIT_STATE, log_level=rospy.INFO) # Instantiate the node manager class and wait. InitialState() rospy.spin()