Skip to content

Commit

Permalink
Merge pull request #11 from CLeARoboticsLab/bug/tcp-feedback
Browse files Browse the repository at this point in the history
Change state feedback to TCP connection
  • Loading branch information
jake-levy authored Jan 6, 2023
2 parents ef4ae2f + 2f748b5 commit 6fbb159
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "RosSockets"
uuid = "f2b1035b-1fed-4502-a057-be66ed18c291"
authors = ["Jacob Levy"]
version = "0.2.2"
version = "0.3.0"

[deps]
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ close_robot_connection(robot_connection)

First, ensure the `/state_feedback` node from from [ros_sockets](https://github.com/CLeARoboticsLab/ros_sockets) is running on the target.

Open a listener for feedback data, setting a `port` to listen on which matches that of the node:
Open a connection to the ROS node, setting `ip` and `port` to match that of the node:

```jl
ip = "192.168.88.128"
port = 42422
feedback_connection = open_feedback_connection(port)
feedback_connection = open_feedback_connection(ip, port)
```

With an open listener, wait for data to arrive from the ROS node.
With the connection open, wait for data to arrive from the ROS node.
Execution is blocked while waiting, up to the timeout duration (seconds) provided.
If the timeout duration elapses without the arrival of data, a TimeoutError exception is thrown.

Expand All @@ -76,7 +77,7 @@ state = receive_feedback_data(feedback_connection, timeout)

`receive_feedback_data` returns a struct with the following fields: `position`, `orientation`, `linear_vel`, `angular_vel`.

When complete with tasks, be sure to close the listener:
When complete with tasks, be sure to close the connection:

```jl
close_feedback_connection(feedback_connection)
Expand Down
2 changes: 1 addition & 1 deletion examples/multi_agent_example.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function run_example()

# open feedback and velocity control connections for each agent
for agent in agents
agent.feedback_connection = open_feedback_connection(agent.feedback_port)
agent.feedback_connection = open_feedback_connection(IP, agent.feedback_port)
agent.robot_connection = open_robot_connection(IP, agent.control_port)
end

Expand Down
8 changes: 4 additions & 4 deletions examples/optimal_feedback_control_example.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ function run_example()
# initialize a plotter to display trajectory solution real-time
display(plot(xlims=(-1,4),ylims=(-1,4),size=(600,600)))

# Open listener for feedback data from the ROS node
feedback_port = 42422 # port to listen for ROS data on
# Open a connection to the ROS feedback node
ip = "192.168.1.135" # ip address of the host of the ROS node
feedback_port = 42422 # port to connect on
timeout = 10.0 # maximum seconds to wait for data with receive_feedback_data
feedback_connection = open_feedback_connection(feedback_port)
feedback_connection = open_feedback_connection(ip, feedback_port)

# open a connection to the ROS velocity control node
ip = "192.168.1.135" # ip address of the host of the ROS node
control_port = 42421 # port to connect on
robot_connection = open_robot_connection(ip, control_port)

Expand Down
5 changes: 3 additions & 2 deletions examples/state_feedback_barebones_example.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ using RosSockets

function run_example()

port = 42422 # port to listen for ROS data on
ip = "192.168.1.135" # ip address of the host of the ROS node
port = 42422 # port to connect on
timeout = 10.0 # maximum seconds to wait for data with receive_feedback_data

# Open listener for feedback data from the ROS node
feedback_connection = open_feedback_connection(port)
feedback_connection = open_feedback_connection(ip, port)

# Wait for data to arrive from the ROS node. Execution is blocked while
# waiting, up to the timout duration provided. If the timeout duration
Expand Down
3 changes: 1 addition & 2 deletions examples/utils/nlp.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ function initialize_model(goal_state, timestep)
Q = 1
R = 2

T = 400
dt = timestep
times = [k*dt for k in 0:T-1]
T = Integer(round(40/dt))

v_max = 0.22
a_max = v_max
Expand Down
67 changes: 28 additions & 39 deletions src/state_feedback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@ mutable struct FeedbackData
end

mutable struct FeedbackConnection
socket::Sockets.UDPSocket
task::Task
command_channel::Channel{Any}
data_channel::Channel{Any}
ready_channel::Channel{Bool}

function FeedbackConnection(port::Integer)
@info "Starting feedback server on port $(port) ..."
socket = UDPSocket()
bind(socket,IPv4(0),port)
function FeedbackConnection(ip::String, port::Integer)
command_channel = Channel(1)
data_channel = Channel(1)
ready_channel = Channel{Bool}(1)
put!(ready_channel, false)
@info "Connecting to feedback server at $(ip):$(port) ..."
socket = Sockets.connect(ip, port)
task = errormonitor(Threads.@spawn feedback_connection_task(socket,
data_channel, ready_channel))
feedback_connection = new(socket, task, data_channel, ready_channel)
@info "Feedback server started"
command_channel, data_channel))
feedback_connection = new(task, command_channel, data_channel)
return feedback_connection
end
end
Expand All @@ -41,37 +37,31 @@ mutable struct TimeoutError <: Exception
end

"""
open_feedback_connection(port::Integer)
open_feedback_connection(ip::String, port::Integer)
Open listener for feedback data from the ROS node on the specified port and
return the `FeedbackConnection`.
Open a connection to the ROS node and return the `FeedbackConnection`.
The `ip` must be a string formated as `"123.123.123.123"`
"""
function open_feedback_connection(port::Integer)
return FeedbackConnection(port)
function open_feedback_connection(ip::String, port::Integer)
return FeedbackConnection(ip, port)
end

# This task continuously reads from the UDP port in order to keep the buffer
# clear. That way, when receive_feedback_data is called, stale data will not be
# returned.
function feedback_connection_task(socket, data_channel, ready_channel)
function feedback_connection_task(socket, command_channel, data_channel)
@info "Feedback connection task spawned"
while true
payload = nothing
try
payload = recv(socket)
catch e
# socket was closed, end the task
command = take!(command_channel)
if command === :close
@info "Closing feedback connection ..."
break
end

if fetch(ready_channel)
_ = take!(ready_channel)
put!(ready_channel, false)
put!(data_channel, payload)
else
@debug "Feedback data discarded"
end
msg = """{ "action": "get_feedback_data" }\n"""
write(socket, msg)
data = readline(socket)
put!(data_channel, data)
end
close(socket)
@info "Feedback connection task completed"
end

"""
Expand All @@ -91,11 +81,10 @@ exception.
"""
function receive_feedback_data(feedback_connection::FeedbackConnection,
timeout::Real = 10.0)
_ = take!(feedback_connection.ready_channel)
put!(feedback_connection.ready_channel, true)
t = Timer(_ -> timeout_callback(feedback_connection), timeout)
feedback_data = nothing
try
put!(feedback_connection.command_channel, :read)
payload = take!(feedback_connection.data_channel)
data = JSON.parse(String(payload))
feedback_data = FeedbackData(data)
Expand All @@ -117,15 +106,15 @@ function timeout_callback(feedback_connection::FeedbackConnection)
end

"""
close_feedback_connection(feedback_connection::FeedbackConnection)
close_feedback_connection(feedback_connection::FeedbackConnection)
Close the listener.
Close the connection with the ROS node.
"""
function close_feedback_connection(feedback_connection::FeedbackConnection)
@info "Stopping feedback server ..."
close(feedback_connection.socket)
put!(feedback_connection.command_channel, :close)
wait(feedback_connection.task)
close(feedback_connection.data_channel)
close(feedback_connection.ready_channel)
close(feedback_connection.command_channel)
@info "Feedback server stopped"
end
3 changes: 1 addition & 2 deletions src/velocity_control.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Open a connection to the ROS node and return the `RobotConnection`.
The `ip` must be a string formated as `"123.123.123.123"`
"""
function open_robot_connection(ip::String, port::Integer=42421)
function open_robot_connection(ip::String, port::Integer)
robot_connection = RobotConnection(ip, port)
return robot_connection
end
Expand All @@ -34,7 +34,6 @@ function robot_connection_task(channel, socket)
end
msg = """{ "controls": $(controls) }\n"""
write(socket, msg)
@info "Control commands sent"
end
close(socket)
@info "Robot connection task completed"
Expand Down
71 changes: 49 additions & 22 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,31 @@ function test_velocity_control()
port = 42450
channel = Channel(1)
server = listen(port)
errormonitor(@async begin
t = errormonitor(@async begin
sock = accept(server)
payload = readline(sock)
put!(channel, payload)
close(sock)
end)
@test !istaskfailed(t)

# connect to localhost, send data, then close the connection
robot_connection = open_robot_connection(ip, port)
@test !istaskfailed(robot_connection.task)
@test !istaskdone(robot_connection.task)

send_control_commands(robot_connection, commands)

close_robot_connection(robot_connection, stop_robot=false)
@test istaskdone(robot_connection.task)

# take the data from the channel and compare it to the sent data
received_payload = take!(channel)
data = JSON.parse(received_payload)
@test haskey(data, "controls")
@test data["controls"] == commands

close(server)
end

function test_state_feedback()
Expand All @@ -36,28 +45,41 @@ function test_state_feedback()
data["angular_vel"] = [.4,.5,.6]
feedback_data = FeedbackData(data)

ip = ip"127.0.0.1"
# create a server on localhost that listens for a connection, reads a
# command, writes feedback data, and puts the received command on a channel
ip = "127.0.0.1"
port = 42450
timeout = 10.0
channel = Channel(1)
timeout = 5.0

# create a task that opens a feedback connection, recieves data, and then
# closes the connection
feedback_connection = open_feedback_connection(port)
task = errormonitor(@async begin
received_feedback_data = receive_feedback_data(feedback_connection, timeout)
put!(channel, received_feedback_data)
close_feedback_connection(feedback_connection)
channel = Channel(1)
server = listen(port)
t = errormonitor(@async begin
sock = accept(server)
payload = readline(sock)
put!(channel, payload)
write(sock,JSON.json(data)*"\n")
close(sock)
end)
@test !istaskfailed(t)

# open a feedback connection, recieve data, and then close the connection
feedback_connection = open_feedback_connection(ip, port)
@test !istaskfailed(feedback_connection.task)
@test !istaskdone(feedback_connection.task)

# send data to the feedback connection and compare the received data to the
# sent data
sleep(1.0)
sock = UDPSocket()
send(sock, ip, port, JSON.json(data))
received_feedback_data = take!(channel)
wait(task)
received_feedback_data = receive_feedback_data(feedback_connection, timeout)
@test received_feedback_data == feedback_data

close_feedback_connection(feedback_connection)
@test istaskdone(feedback_connection.task)

# check that the command sent to the feedback server is correct
received_payload = take!(channel)
json_cmd = JSON.parse(received_payload)
@test haskey(json_cmd, "action")
@test json_cmd["action"] == "get_feedback_data"

close(server)
end

function Base.:(==)(x::FeedbackData, y::FeedbackData)
Expand All @@ -67,7 +89,12 @@ function Base.:(==)(x::FeedbackData, y::FeedbackData)
x.angular_vel == y.angular_vel
end

@testset "RosSockets.jl" begin
test_velocity_control()
test_state_feedback()
end
@testset verbose = true "RosSockets.jl" begin
@testset "Velocity Control" begin
test_velocity_control()
end

@testset "State Feedback" begin
test_state_feedback()
end
end

0 comments on commit 6fbb159

Please sign in to comment.