Source code for scripts.fsm

#!/usr/bin/env python

"""
.. module:: Finite_State_Machine
    :platform: Unix
    :synopsis: Python code that manage the SMACH finite state machine 

.. moduleauthor:: Matteo Carlone <matteo.carlone99@gmail.com>

Service:
    /start 
    /reason
    /recharge
Action:
    /armor_client
    motion/controller
    motion/planner

This Node implement the SMACH Finite-State-Machine that guide all the surveillance routine.
The communication with other nodes is managed using ros-services and actions, the actual structure of the service and 
action client is implemented in the script helper.py in the utilities folder.
two sub state machines are implemented to exploit the planning and control states while performing the movement and the recharge routine
composed by a movement to the DOC-Station (Starting Room) and the actual recharging of the battery.

"""

#---Libraries---#

import roslib
import rospy
import smach
import smach_ros
import time
import string

from patrol_robot.helper import InterfaceHelper
from patrol_robot import environment as env

#--------------#
 
#global variable describing the battery state
low = False

[docs]class Initial(smach.State): """ Class representing the initial state directly connected with the script initial_state.py . When called from the SMACH state machine a client makes a request to the server /start, and then it busy waits the response of the finished loading of the Ontology from the server. Every acces to the flags to be checked is protected by mutexes in order to avoid simultaneus operations on the same varible, ROS automatically protect access to variables with mutexes. First State runned when main state machine is called. * Returns: start:(str) transition flag for the SMACH fsm to the reason state. """ def __init__(self,interfacehelper): # State Initialization smach.State.__init__(self, outcomes=['start'], input_keys=['initial_counter_in'], output_keys=['initial_counter_out']) # InterfaceHelper Object declaration self._helper = interfacehelper print('init_fsm')
[docs] def execute(self, userdata): """ Initial State execution function, called by the smach state machine """ # /start service client request self._helper.start_client() print('req srv') # looping until transition while not rospy.is_shutdown(): print('initialization') # acquire mutex self._helper.mutex.acquire() try: # look for transition flags: # transition to reasoning state if self._helper.should_reasoning_start(): # resetting states when transition occur self._helper.reset_states() # returning the transition associated with checked flag return 'start' finally: # release mutex self._helper.mutex.release() rospy.sleep(0.3)
[docs]class Reasoning(smach.State): """ Class representing the reasoning state directly connected with the script reason.py . When called from the SMACH state machine a client makes a request to the server /reason, and then it busy waits the response of the finished decision of which room will be pointed by the robot. Every acces to the flags to be checked is protected by mutexes in order to avoid simultaneus operations on the same varible, ROS automatically protect access to variables with mutexes. * Input Transitions: start:(str) transition flag for the SMACH fsm from the initial state. * Returns: reasoned:(str) transition flag for the SMACH fsm to the moving sub-state machine. battery_low:(str) transition flag for the SMACH fsm to the recharge sub-state machine. """ def __init__(self,interfacehelper): # State Initialization smach.State.__init__(self, outcomes=['start','reasoned','battery_low'], input_keys=['reasoning_counter_in'], output_keys=['reasoning_counter_out']) # InterfaceHelper Object declaration self._helper = interfacehelper
[docs] def execute(self, userdata): """ Reasoning State execution function, called by the smach state machine. """ # recall the low global variable global low # /reason service client request self._helper.reason_client() # looping until transition while not rospy.is_shutdown(): # acquire mutex self._helper.mutex.acquire() try: # look for transition flags: # transition to recharge sub-state machine if self._helper.is_battery_low(): # resetting states when transition occur self._helper.reset_states() # change global battery state low = True # returning the transition associated with checked flag return 'battery_low' # transition to moving sub-state machine if self._helper.should_pointing_start(): # resetting states when transition occur self._helper.reset_states() # returning the transition associated with checked flag return 'reasoned' finally: # release mutex self._helper.mutex.release() rospy.sleep(0.3)
[docs]class Pointing(smach.State): """ Class representing the pointing state directly connected with the script planner.py . When the sub-state machine moving is called from the SMACH state machine a action-client makes a request to the action-server motion/planner, and then it busy waits the response of the finished planning of the via-poits the robot will follow in the Controlling State. Every acces to the flags to be checked is protected by mutexes in order to avoid simultaneus operations on the same varible, ROS automatically protect access to variables with mutexes. First State runned when the Moving sub-state machine is called. * Returns: pointed:(str) transition flag for the SMACH fsm to the controlling state of the moving sub-state machine. battery_low:(str) transition flag for the SMACH fsm to the recharge sub-state machine. """ def __init__(self,interfacehelper): # State Initialization smach.State.__init__(self, outcomes=['pointed','battery_low'], input_keys=['pointing_counter_in'], output_keys=['pointing_counter_out']) # InterfaceHelper Object declaration self._helper = interfacehelper
[docs] def execute(self, userdata): """ Pointing State execution function, called by the smach state machine. """ # recall the low global variable global low # motion/planner action client request, low variable is passed to force the robot # to the DOC location self._helper.send_planner_goal(low) # looping until transition while not rospy.is_shutdown(): # acquire mutex self._helper.mutex.acquire() try: # look for transition flags: # transition to recharge sub-state machine if self._helper.is_battery_low(): # resetting states when transition occur self._helper.reset_states() # canceling the goal, the transition may arrive while the action is on going self._helper.planner_client.cancel_goals() # change global battery state low = True # returning the transition associated with the checked flag return 'battery_low' # transition to the controlling state inside the moving sub state machine if self._helper.planner_client.is_done(): # resetting states when transition occur self._helper.reset_states() # returning the transition associated with the checked flag return 'pointed' finally: # release mutex self._helper.mutex.release() rospy.sleep(0.3)
[docs]class Controlling(smach.State): """ Class representing the controlling state directly connected with the script controller.py . When called from the SMACH state machine a action-client makes a request to the action-server motion/controller, and then it busy waits the response of the finished movement of the robot trough the via points passed by the client and taken from the Pointing State. Every acces to the flags to be checked is protected by mutexes in order to avoid simultaneus operations on the same varible, ROS automatically protect access to variables with mutexes. * Input Transitions: pointed:(str) transition flag for the SMACH fsm from the Pointing state. * Returns: reached:(str) transition flag for the SMACH fsm that ends the Moving sub-state machine battery_low:(str) transition flag for the SMACH fsm to the recharge sub-state machine. """ def __init__(self,interfacehelper): # State Initialization smach.State.__init__(self, outcomes=['reached','battery_low'], input_keys=['controlling_counter_in'], output_keys=['controlling_counter_out']) # InterfaceHelper Object declaration self._helper = interfacehelper
[docs] def execute(self, userdata): """ Controlling State execution function, called by the smach state machine """ # recall the low global variable global low # motion/controller action client request self._helper.send_controller_goal() # looping until transition while not rospy.is_shutdown(): # acquire mutex self._helper.mutex.acquire() try: # look for transition flags: # transition to recharge sub-state machine if self._helper.is_battery_low(): # resetting states when transition occur self._helper.reset_states() # canceling the goal, the transition mt arrive while the action in on going self._helper.controller_client.cancel_goals() # change global battery state low = True # returning the transition associated with checked flag return 'battery_low' # transition that ends the moving sub-state machine if self._helper.controller_client.is_done(): # resetting states when transition occur self._helper.reset_states() # returning the transition associated with the checked flag return 'reached' finally: # release mutex self._helper.mutex.release() rospy.sleep(0.3)
[docs]class Recharge(smach.State): """ Class representing the Recharge state directly connected with the script battery.py . When the sub-state machine recharge is called from the SMACH state machine a client makes a request to the server /recharge, and then it busy waits the response of the finished recharging of the robot's battery. Since the robot can recharge the battery only in the DOC-Statetion, if the robot is not there the fsm redirect the the state to the moving sub-state machine nested in the recharge sub-state machine in order to force the robot to DOC before recharging. Every acces to the flags to be checked is protected by mutexes in order to avoid simultaneus operations on the same varible, ROS automatically protect access to variables with mutexes. First State runned when the Recharge sub-state machine is called. * Returns: battery_full:(str) transition flag for the SMACH fsm that ends the recharge sub-state machine. not_at_dock:(str) transition flag for the SMACH fsm to Moving sub-state machine in order to get to the DOC-station. """ def __init__(self,interfacehelper): # State Initialization smach.State.__init__(self, outcomes=['battery_full','not_at_dock'], input_keys=['recharge_counter_in'], output_keys=['recharge_counter_out']) # Interface Object declaration self._helper = interfacehelper
[docs] def execute(self, userdata): """ Recharging State execution function, called by the smach state machine """ # recall the low global variable global low # reasoning on the ontology self._helper.reason() # getting the actual robot's location isin = self._helper.client.query.objectprop_b2_ind('isIn','Robot1') isin = self._helper.list_formatter(isin,'#','>') # check weather the robot is not in the starting location if env.START_LOC not in isin: print('\nROBOT in: ',isin) print('Not in START LOCATION, Cannot DOC\n') # retunrning transition to the moving sub-state machine nested in the recharge sub-state machine return 'not_at_dock' # /recharge service client request self._helper.recharge_client() # looping until transition while not rospy.is_shutdown(): # acquire mutex self._helper.mutex.acquire() try: # look for transition flags: # transition that ends the recharge sub-state machine if self._helper.is_battery_full(): # resetting states when transition occur self._helper.reset_states() # change global battery state low = False # returning the transition associated with the checked flag return 'battery_full' finally: # release mutex self._helper.mutex.release() rospy.sleep(0.3)
def main(): print('fsm') # Initialize ros-node rospy.init_node(env.NODE_FSM) print('fsm2') # InterfaceHelper Object Initialization interfacehelper = InterfaceHelper() # Create a SMACH state machine sm_top = smach.StateMachine(outcomes=['fsm']) sm_top.userdata.sm_counter = 0 # ------ Main State Machine ------ # with sm_top: smach.StateMachine.add('INITIAL', Initial(interfacehelper), transitions={'start':'REASONING'}, remapping={'initial_counter_in':'sm_counter', 'initial_counter_out':'sm_counter'}) smach.StateMachine.add('REASONING', Reasoning(interfacehelper), transitions={'start':'REASONING', 'reasoned':'MOVING', 'battery_low':'RECHARGE'}, remapping={'reasoning_counter_in':'sm_counter', 'reasoning_counter_out':'sm_counter'}) # Create moving sub-state machine sm_sub = smach.StateMachine(outcomes=['exit','low']) sm_sub.userdata.sm_counter = 0 # ------ Moving Sub-State Machine ------ # with sm_sub: smach.StateMachine.add('POINTING', Pointing(interfacehelper), transitions={'pointed':'CONTROLLING', 'battery_low':'low'}, remapping={'pointing_counter_in':'sm_counter', 'pointing_counter_out':'sm_counter'}) smach.StateMachine.add('CONTROLLING', Controlling(interfacehelper), transitions={'reached':'exit', 'battery_low':'low'}, remapping={'controlling_counter_in':'sm_counter', 'controlling_counter_out':'sm_counter'}) # --------------------------------------- # smach.StateMachine.add('MOVING', sm_sub, transitions={'exit':'REASONING', 'low':'RECHARGE'}, remapping={'moving_counter_in':'sm_counter', 'moving_counter_out':'sm_counter'}) # Create recharge sub-state machine sm_recharge = smach.StateMachine(outcomes=['full']) sm_recharge.userdata.sm_counter = 0 # ------ Recharge Sub-State Machine ------ # with sm_recharge: smach.StateMachine.add('RECH_BAR', Recharge(interfacehelper), transitions={'battery_full':'full', 'not_at_dock':'MOVE_TO_DOCK'}, remapping={'bar_counter_in':'sm_counter', 'bar_counter_out':'sm_counter'}) smach.StateMachine.add('MOVE_TO_DOCK', sm_sub, transitions={'exit':'RECH_BAR', 'low':'MOVE_TO_DOCK'}, remapping={'moving_counter_in':'sm_counter', 'moving_counter_out':'sm_counter'}) # --------------------------------------- # smach.StateMachine.add('RECHARGE', sm_recharge, transitions={'full':'REASONING'}, remapping={'recharge_counter_in':'sm_counter', 'recharge_counter_out':'sm_counter'}) # --------------------------------------- # # Create and start the introspection server for visualization sis = smach_ros.IntrospectionServer('server_name', sm_top, '/SM_ROOT') sis.start() # Execute the state machine outcome = sm_top.execute() # Wait for ctrl-c to stop the application rospy.spin() sis.stop() if __name__ == '__main__': main()