Skip to content

Commit

Permalink
Merge pull request #12 from CLeARoboticsLab/general_connection
Browse files Browse the repository at this point in the history
Add general TCP connection
  • Loading branch information
jake-levy authored Mar 13, 2023
2 parents 6fbb159 + a0bdbdc commit 5a17709
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 113 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "RosSockets"
uuid = "f2b1035b-1fed-4502-a057-be66ed18c291"
authors = ["Jacob Levy"]
version = "0.3.0"
version = "0.4.0"

[deps]
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Expand Down
48 changes: 47 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![RosSockets](https://github.com/CLeARoboticsLab/RosSockets.jl/actions/workflows/test.yml/badge.svg)](https://github.com/CLeARoboticsLab/RosSockets.jl/actions/workflows/test.yml)

Tools for sending and receiving information from ROS via TCP that can be used to control robots.
This package is meant to communicate with the ROS nodes from [ros_sockets](https://github.com/CLeARoboticsLab/ros_sockets).
This package is meant to communicate with the ROS nodes from [ros_sockets](https://github.com/CLeARoboticsLab/ros_sockets), but also provides a framework for communication with TCP server in general.

## Installation

Expand Down Expand Up @@ -83,6 +83,52 @@ When complete with tasks, be sure to close the connection:
close_feedback_connection(feedback_connection)
```

### General TCP Communication

First, open a connection to the TCP server, setting setting `ip` and `port` to match that of the server:

```jl
ip = "192.168.1.135"
port = 42423
connection = open_connection(ip, port)
```

Send messages with `send`. Note: some TCP servers may be require the message be formatted a certain way (such as JSON), and may also require an end of line character, such as `\n`, to terminate the message. Here is an example of sending a JSON formatted message:

```jl
import JSON

# create a JSON formatted message with property name "action" and value "start_experiment"
start_cmd = JSON.json(Dict("action" => "start_experiment")) * "\n"

# send the message
send(connection, start_cmd)
```

Send a message and wait for a response with `send_receive`. Note: some TCP servers may be require the message be formatted a certain way (such as JSON), and may also require an end of line character, such as `\n`, to terminate the message. This function blocks execution while waiting, up to the timeout duration provided. If the timeout duration elapses without the arrival of data, throws a `TimeoutError` exception. Note: the payload which is returned will be in a raw format. To convert to a string, use `String(payload)`. This string may further converted to other formats such as JSON. Here is an example of sending a JSON formatted message, receiving a response, and parsing the response.

```jl
import JSON

# create a JSON formatted message with property name "action" and value "get_time_elapsed"
get_time_cmd = JSON.json(Dict("action" => "get_time_elapsed")) * "\n"

# send the message and wait for a response
payload = send_receive(connection, get_time_cmd)

# convert the payload to a String, parse the String as a JSON, extract the data,
# and print it
data = JSON.parse(String(payload))
elapsed_time = data["elapsed_time"]
println("Elapsed time: $(elapsed_time)")
```

When complete with tasks, be sure to close the connection:

```jl
close_connection(connection)
```

## Acknowledgments

The velocity control is heavily based on infrastructure from [@schmidma](https://github.com/schmidma) and [@lassepe](https://github.com/lassepe).
37 changes: 37 additions & 0 deletions examples/general_connection_example.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using RosSockets
import JSON

function test_time_server()
ip = "192.168.1.135" # ip address of the host of the ROS node
port = 42423 # port to connect on

# open a connection to a TCP server
connection = open_connection(ip, port)

# create command strings that are in the form of JSONs. These strings should
# be formatted as required by the receiving ROS node. Note that some nodes
# may require an end of line character, such as "\n", to terminate the command.
start_cmd = JSON.json(Dict("action" => "start_experiment")) * "\n"
stop_cmd = JSON.json(Dict("action" => "stop_experiment")) * "\n"
get_time_cmd = JSON.json(Dict("action" => "get_time_elapsed")) * "\n"

# send a command to the TCP server
send(connection, start_cmd)

# send commands and wait to receive responses from the server
for _ in 1:10
payload = send_receive(connection, get_time_cmd)
data = JSON.parse(String(payload))
elapsed_time = data["elapsed_time"]
println("Elapsed time: $(elapsed_time)")
sleep(0.5)
end

# send another command to the TCP server
send(connection, stop_cmd)
sleep(1.0)

# Close the connection to the TCP server. This should always be called when
# complete with tasks to ensure graceful shutdown.
close_connection(connection)
end
25 changes: 25 additions & 0 deletions examples/velocity_control_simple_receding_horizon.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
complete_control_sequence = zeros(500)

ip = "192.168.88.128" # ip address of the host of the ROS node
port = 42421 # port to connect on

max_window_duration = 5.0 # maximum length of control sequence (sec)
update_time = 2.0 # duration between each sending of the control sequence (sec)
timestep = 0.1 # duration of each timestep (sec)

max_window_length = Integer(round(max_window_duration/timestep))
sleep_length = Integer(round(update_time/timestep))
complete_control_sequence_length = size(complete_control_sequence)[1]

robot_connection = open_robot_connection(ip, port)

idx = 1
while idx < complete_control_sequence_length
window_length = min(max_window_length, complete_control_sequence_length - idx)
control_sequence = complete_control_sequence[idx:idx+window_length]
send_control_commands(robot_connection, control_sequence)
idx += sleep_length
sleep(update_time)
end

close_robot_connection(robot_connection)
4 changes: 4 additions & 0 deletions src/RosSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ module RosSockets
using Sockets
import JSON

include("connection.jl")
export Connection, open_connection, close_connection,
send, send_receive

include("velocity_control.jl")
export open_robot_connection,
send_control_commands,
Expand Down
117 changes: 117 additions & 0 deletions src/connection.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
struct Connection
task::Task
command_channel::Channel{Any}
data_channel::Channel{Any}

function Connection(ip::String, port::Integer)
command_channel = Channel(1)
data_channel = Channel(1)
@info "Connecting to server at $(ip):$(port) ..."
socket = Sockets.connect(ip, port)
task = errormonitor(Threads.@spawn connection_task(socket,
command_channel, data_channel))
connection = new(task, command_channel, data_channel)
return connection
end
end

struct TimeoutError <: Exception end

"""
open_connection(ip::String, port::Integer)
Open a connection to a TCP server and return the `Connection`.
The `ip` must be a string formatted as `"123.123.123.123"`
"""
function open_connection(ip::String, port::Integer)
return Connection(ip, port)
end

function connection_task(socket, command_channel, data_channel)
@info "Connection task spawned"
while true
command, msg = take!(command_channel)
if command === :close
@info "Closing connection ..."
break
elseif command === :send
write(socket, msg)
elseif command === :send_receive
write(socket, msg)
data = readline(socket)
put!(data_channel, data)
end
end
close(socket)
@info "Connection task completed"
end

"""
send(connection::Connection, msg::String)
Send a message.
# Arguments
- `connection`: the `Connection` obtained from `open_connection`.
- `msg`: the message to be sent. Note: some TCP servers may be require the
message be formatted a certain way (such as JSON), and may also require an
end of line character, such as `\\n`, to terminate the message.
"""
function send(connection::Connection, msg::String)
put!(connection.command_channel, (:send, msg))
end

"""
send_receive(connection::Connection, msg::String, timeout::Real = 10.0)
Sends a message and waits for a response, which is then returned. This function
blocks execution while waiting, up to the timeout duration provided. If the
timeout duration elapses without the arrival of data, throws a TimeoutError
exception. Note: the payload which is returned will be in a raw format. To
convert to a string, use `String(payload)`. This string may further converted to
other formats such as JSON.
# Arguments
- `connection`: the `Connection` obtained from `open_connection`.
- `msg`: the message to be sent. Note: some TCP servers may be require the
message be formatted a certain way (such as JSON), and may also require an
end of line character, such as `\\n`, to terminate the message.
- `timeout`: maximum time in seconds to wait for data.
"""
function send_receive(connection::Connection, msg::String, timeout::Real = 10.0)
t = Timer(_ -> timeout_callback(connection), timeout)
payload = nothing
try
put!(connection.command_channel, (:send_receive, msg))
payload = take!(connection.data_channel)
catch e
if typeof(e) != InvalidStateException
close_connection(connection)
rethrow(e)
end
finally
close(t)
end
return payload
end

function timeout_callback(connection::Connection)
close_connection(connection)
@error "Server timed out waiting for data."
throw(TimeoutError())
end

"""
close_connection(connection::Connection)
Close the connection to the TCP server.
"""
function close_connection(connection::Connection)
@info "Stopping server ..."
put!(connection.command_channel, (:close, ""))
wait(connection.task)
close(connection.data_channel)
close(connection.command_channel)
@info "Server stopped"
end
82 changes: 11 additions & 71 deletions src/state_feedback.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mutable struct FeedbackData
struct FeedbackData
position::Vector{Float64}
orientation::Vector{Float64}
linear_vel::Vector{Float64}
Expand All @@ -16,52 +16,15 @@ mutable struct FeedbackData
end
end

mutable struct FeedbackConnection
task::Task
command_channel::Channel{Any}
data_channel::Channel{Any}

function FeedbackConnection(ip::String, port::Integer)
command_channel = Channel(1)
data_channel = Channel(1)
@info "Connecting to feedback server at $(ip):$(port) ..."
socket = Sockets.connect(ip, port)
task = errormonitor(Threads.@spawn feedback_connection_task(socket,
command_channel, data_channel))
feedback_connection = new(task, command_channel, data_channel)
return feedback_connection
end
end

mutable struct TimeoutError <: Exception
end

"""
open_feedback_connection(ip::String, port::Integer)
Open a connection to the ROS node and return the `FeedbackConnection`.
The `ip` must be a string formated as `"123.123.123.123"`
The `ip` must be a string formatted as `"123.123.123.123"`
"""
function open_feedback_connection(ip::String, port::Integer)
return FeedbackConnection(ip, port)
end

function feedback_connection_task(socket, command_channel, data_channel)
@info "Feedback connection task spawned"
while true
command = take!(command_channel)
if command === :close
@info "Closing feedback connection ..."
break
end
msg = """{ "action": "get_feedback_data" }\n"""
write(socket, msg)
data = readline(socket)
put!(data_channel, data)
end
close(socket)
@info "Feedback connection task completed"
return open_connection(ip, port)
end

"""
Expand All @@ -70,7 +33,7 @@ end
Waits for data to arrive from the ROS node and returns a struct of the data with
the following fields: position, orientation, linear_vel, angular_vel. This
function blocks execution while waiting, up to the timout duration provided. If
function blocks execution while waiting, up to the timeout duration provided. If
the timeout duration elapses without the arrival of data, throws a TimeoutError
exception.
Expand All @@ -79,42 +42,19 @@ exception.
`open_feedback_connection`.
- `timeout`: maximum time in seconds to wait for data.
"""
function receive_feedback_data(feedback_connection::FeedbackConnection,
timeout::Real = 10.0)
t = Timer(_ -> timeout_callback(feedback_connection), timeout)
feedback_data = nothing
try
put!(feedback_connection.command_channel, :read)
payload = take!(feedback_connection.data_channel)
data = JSON.parse(String(payload))
feedback_data = FeedbackData(data)
catch e
if typeof(e) != InvalidStateException
close_feedback_connection(feedback_connection)
rethrow(e)
end
finally
close(t)
end
function receive_feedback_data(feedback_connection::Connection, timeout::Real = 10.0)
msg = """{ "action": "get_feedback_data" }\n"""
payload = send_receive(feedback_connection, msg, timeout)
data = JSON.parse(String(payload))
feedback_data = FeedbackData(data)
return feedback_data
end

function timeout_callback(feedback_connection::FeedbackConnection)
close_feedback_connection(feedback_connection)
@error "Feedback server timed out waiting for data."
throw(TimeoutError())
end

"""
close_feedback_connection(feedback_connection::FeedbackConnection)
Close the connection with the ROS node.
"""
function close_feedback_connection(feedback_connection::FeedbackConnection)
@info "Stopping feedback server ..."
put!(feedback_connection.command_channel, :close)
wait(feedback_connection.task)
close(feedback_connection.data_channel)
close(feedback_connection.command_channel)
@info "Feedback server stopped"
function close_feedback_connection(feedback_connection::Connection)
close_connection(feedback_connection)
end
Loading

2 comments on commit 5a17709

@jake-levy
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/85962

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.4.0 -m "<description of version>" 5a17709fd4b744e2906c68c807d02f5ebb92f799
git push origin v0.4.0

Also, note the warning: This looks like a new registration that registers version 0.4.0.
Ideally, you should register an initial release with 0.0.1, 0.1.0 or 1.0.0 version numbers
This can be safely ignored. However, if you want to fix this you can do so. Call register() again after making the fix. This will update the Pull request.

Please sign in to comment.