RADU: Processing & Interpreting ROS Movement Messages with Python
When using the Robot Operating System, nodes are started, topics published, messages send. Internally, the ROS nodes use these messages to communicate with each other. Now, if you build a robot that uses ROS as the communication middleware, you need to write custom code which will parse these messages and transform them to meaningful commands for your robot.
This article is a step-by-step tutorial about making your robot move based on ROS messages with the Twist
message format. First, we will start ROS nodes and investigate the Twist
messages that they exchange. Second, we need to decide the best approach and format to handle convert this data into a form that the robot understands. Finally, we implement the logic in the robot.
The technical context of this article is Ubuntu 20.04 LTS
with latest ros-noetic
and Python 3.8.10
Starting ROS
For the start, we need three different terminals.
In the first terminal, start roscore
.
$> roscore
... logging to /home/devcon/.ros/log/6c0f6dc0-e33f-11eb-9a51-737b11e35b15/roslaunch-giga-442540.log
Checking log directory for disk usage. This may take a while.
Press Ctrl-C to interrupt
Done checking log file disk usage. Usage is <1GB.
started roslaunch server http://giga:45125/
ros_comm version 1.15.11
SUMMARY
========
PARAMETERS
* /rosdistro: noetic
* /rosversion: 1.15.11
NODES
auto-starting new master
process[master]: started with pid [442548]
ROS_MASTER_URI=http://giga:11311/
setting /run_id to 6c0f6dc0-e33f-11eb-9a51-737b11e35b15
process[rosout-1]: started with pid [442558]
started core service [/rosout]
Then in the second, start the teleop-twist-keyboard
. With this node, you can use the keyboard to generate messages.
$> rosrun teleop_twist_keyboard teleop_twist_keyboard.py
Reading from the keyboard and Publishing to Twist!
---------------------------
Moving around:
u i o
j k l
m , .
For Holonomic mode (strafing), hold down the shift key:
---------------------------
U I O
J K L
M < >
t : up (+z)
b : down (-z)
anything else : stop
q/z : increase/decrease max speeds by 10%
w/x : increase/decrease only linear speed by 10%
e/c : increase/decrease only angular speed by 10%
CTRL-C to quit
currently: speed 0.5 turn 1.0
...
currently: speed 0.5 turn 0.47351393100000017
But where are these messages generated? Let’s explore the available ROS topics.
$> rostopic list
/rosout
/rosout_agg
/cmd_vel
/color_sensor
/pose
The messages are published in the topic /cmd_vel
.
Twist Message Format
We can check the data format of the topic /cmd_vel
.
$> rostopic type /cmd_vel
geometry_msgs/Twist
$> rosmsg show geometry_msgs/Twist
geometry_msgs/Vector3 linear
float64 x
float64 y
float64 z
geometry_msgs/Vector3 angular
float64 x
float64 y
float64 z
In addition to the command line, we can also use the graphical tool rqt
.
But what is the meaning of this message?
TWIST Message Format Explained
The twist
message format consists of two parts. In its linear
component, the planar directions for the three dimensions x, y, and z are defines as float values. Think of this as the speed your robot should drive in each direction. For example, the following command shows that a robot should move with the speed of 2 in the direction of x.
linear:
x: 2.0
y: 0.0
z: 0.0
The angular
component defines the change in orientation of a movement, for the three coordinates x, y, and z, expressed as radians per second. The coordinates represent the yaw, pitch and roll axis. This means that the z value describes a change in the 2d plane.
angular:
x: 0.0
y: 0.0
z: 1.5
The unit radians
means: Given a circle with radius r, a radian is the distance r along the circles border.
ROS: Twist Message Provider
Understanding the data format, lets define a ROS node that subscribes to this message.
First, we create a new ROS package.
catkin_create_pkg teleop_listener twist rospy
Then, create the file scripts/listener.py
. This script will create a ROS node that subscribes to the topic /cmd_vel
. For simplicity, we will just print the received messages.
# scripts/listener.py
import rospy
import time
from geometry_msgs.msg import Twist
def callback(twist_msg):
rospy.loginfo(rospy.get_caller_id() + "Received Data")
rospy.loginfo('Sending ' + repr(twist_msg))
def listener():
rospy.init_node('twist_listener')
rospy.Subscriber('/cmd_vel', Twist, callback)
rospy.loginfo('Twist Listener started')
rospy.spin()
if __name__ == '__main__':
listener()
Starting this program, we can see that the twist
messages are successfully parsed.
[INFO] [1627208023.888022]: Twist Listener started
[INFO] [1627208028.010496]: /twist_listenerReceived Data
[INFO] [1627208028.014720]: Sending linear:
x: 2.0
y: 0.0
z: 0.0
angular:
x: 0.0
y: 0.0
z: 0.0
Now we need to think about how this format will be interpreted by our robot.
Twist Message Format Wrapper on the Host Computer
The received messages have only two values that are meaningful for a planar, wheeled robot. The linear x
value determines the forward or backward direction, and the angular z
determines the amount of turns that the robot needs to make.
As documented in my earlier articles best practices for handling serial data, the most performant data formats that the motor controller receives is either bit fields or texts. Texts have the additional benefit for being more extensive than bit fields, and therefore we will use them.
The scripts/listener.py
will extract the linear and angular value, and create a text message in the form TWIST_MSG:(MOVE=2.0, TURN=0.0)
. Here is the code:
# scripts/listener_refined.py
import rospy
import time
from geometry_msgs.msg import Twist
def callback(twist_msg):
rospy.loginfo(rospy.get_caller_id() + "Received Data")
radu_msg = repr(f'TWIST_MSG:(MOVE={twist_msg.linear.x}, TURN={twist_msg.angular.z})')
rospy.loginfo('Sending <<' + radu_msg + '>>')
ser.write((radu_msg + "\n").encode('utf-8'))
def listener():
rospy.init_node('radu_mk1')
rospy.Subscriber('/cmd_vel', Twist, callback)
rospy.spin()
if __name__ == '__main__':
listener()
Twist Message Format Gateway on the Microcontroller
The received message seems simple, but we cannot parse it straight ahead. The values for MOVE
and TURN
need to translated to concrete commands of your robot. This involved consideration such as the speed values, the wheel diameter, the turn radius, and much more. I highly recommend this great blog article about motor controller that summarizes the essential math. In essence: Movement and turn commands are speed values, and these needs to be converted to duty cycles of PWM signals send to the motors.
Handling messages and transforming them to the internal format is responsibility of a class that I termed MessageGateway
. This class is instantiated with a concrete serial connection interface, then actively polls the interface for new messages, and of available, parses and translates them
Let’s develop the gateway step-by-step.
The main.py
class create an instance of MessageGateway
with a serial connection object.
#robot/radu/message_gateway.python
class MessageGateway():
def __init__(self, serial_obj):
self._serial = serial_obj
self.speed_limits= limits['speed']
self.twist_matcher = ure.compile('TWIST_MSG')
self.move_matcher = ure.compile('MOVE=(-?\d+.\d+)')
self.turn_matcher = ure.compile('TURN=(-?\d+.\d+)')
Once created, main.py
will regularly call gateway.poll()
while True:
obj = gateway.poll()
if obj:
robot.process(obj)
sleep_ms(5)
The poll()
method checks if any data from its serial connection exists...
def poll(self):
msg = self._serial.read()
if msg:
commands = self.parse(msg)
return commands
return None
... and if yes, will call the parse()
method. In this method, the following steps happen:
- Check if the message is a TWIST message (by using regular expressions)
- For the
MOVE
andTURN
commands, extract thefloat
value (by using regular expressions - return a list of tuples with the commands
def parse2(self, msg):
try:
if self.twist_matcher.search(msg):
move_string = self.move_matcher.search(msg).groups()
move_value = float(move_string[0])
turn_string = self.turn_matcher.search(msg).groups()
turn_value = float(turn_string[0])
result = [('MOVE', move_value), ('TURN', turn_value)]
print('PARSE2', result)
return({'TWIST_MSG': result})
except Exception as e:
print('MessageGateway error', msg, e)
return None
Motor Controller on the Microcontroller
The next step is to process the movements commands inside main.py
. The following method parses the command tuple and saves the movement values which are float
values. From this, it calculates the duty cycle for both wheels, and clips
the values so they confirm with the motor (in my case: a minimum of 49000
, and a maximum of 65000
). Then it sets the duty cycle on the servo motors.
def process_twist_msg(self, commands):
linear = angular = 0
for tuple in commands:
key, value = tuple
if key == 'MOVE':
linear = value
if key == 'TURN':
angular = value
m1_duty_percent = (linear - (angular * WHEEL_DISTANCE))*2
m2_duty_percent = (linear + (angular * WHEEL_DISTANCE))*2
duty_cycle1 = self.clip(m1_duty_percent)
duty_cycle2 = self.clip(m2_duty_percent)
self.logger.status("M1: {} dty".format(duty_cycle1))
self.m1.move2(duty_cycle1)
self.m2.move2(duty_cycle2)
Conclusion
When connecting your robot to ROS, you need to parse the ROS specific messages to your robot. In this article, I showed how to translate the ROS Twist
message format, which are movement information, to a format that the Robot can understand. The design space for this problem is huge. First, you need to decide how much interpretation of the format you are doing on the receiving ROS node, and how much the microcontroller to which the message is passed needs to do. Second, you decide the data format. The solution shown here evolved: While I was first passing complete Python objects, performance tests showed that working with texts is way faster. Third, you need to divide how to process the message inside your robot. For this, I adopted the Gateway
pattern: A central entity reads, parses, transforms the messages, and applies conversion (e.g. set appropriates speed limits). If your application gets complex, and when you need to send even more messages, then having a central Gateway class that encapsulate all of these steps is essential. Fourth, you need to transform the obtained values into concrete motor controller commands. In my case, this boils down to duty cycles of PWM, clipped into the possible minimum and maximum values.