Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix split tunneling packet loss #6640

Merged
merged 8 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Line wrap the file at 100 chars. Th
#### macOS
- Fix intermittent failures to connect with PQ enabled.
- Exclude programs when executed using a relative path from a shell.
- Reduce packet loss when using split tunneling.


## [2024.4] - 2024-07-23
Expand Down
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"talpid-core",
"talpid-dbus",
"talpid-future",
"talpid-net",
"talpid-openvpn",
"talpid-openvpn-plugin",
"talpid-platform-metadata",
Expand Down
1 change: 1 addition & 0 deletions talpid-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tun = { version = "0.5.5", features = ["async"] }
nix = { version = "0.28", features = ["socket"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
talpid-net = { path = "../talpid-net" }

[target.'cfg(windows)'.dependencies]
bitflags = "1.2"
Expand Down
1 change: 1 addition & 0 deletions talpid-core/src/split_tunnel/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ impl State {
let result = tun::create_split_tunnel(
default_interface,
new_vpn_interface.clone(),
route_manager.clone(),
Box::new(move |packet| {
match states.get_process_status(packet.header.pth_pid as u32) {
ExclusionStatus::Excluded => tun::RoutingDecision::DefaultInterface,
Expand Down
141 changes: 132 additions & 9 deletions talpid-core/src/split_tunnel/macos/tun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{
};
use futures::{Stream, StreamExt};
use libc::{AF_INET, AF_INET6};
use nix::net::if_::if_nametoindex;
use pcap::PacketCodec;
use pnet_packet::{
ethernet::{EtherTypes, MutableEthernetPacket},
Expand All @@ -26,6 +27,7 @@ use std::{
net::{Ipv4Addr, Ipv6Addr},
ptr::NonNull,
};
use talpid_routing::RouteManagerHandle;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::broadcast,
Expand Down Expand Up @@ -114,11 +116,15 @@ pub struct SplitTunnelHandle {
/// Task that handles outgoing packets. On completion, it returns a handle for the pktap, as
/// well as the function used to classify packets
egress_task: tokio::task::JoinHandle<Result<EgressResult, Error>>,
/// Task that synchronizes the ST tunnel MTU with the VPN tunnel MTU
mtu_listener: Option<tokio::task::JoinHandle<()>>,
route_manager: RouteManagerHandle,
}

impl SplitTunnelHandle {
pub async fn shutdown(self) -> Result<(), Error> {
pub async fn shutdown(mut self) -> Result<(), Error> {
log::debug!("Shutting down split tunnel");
self.abort_mtu_listener().await;
let _ = self.abort_tx.send(());
let _ = self.ingress_task.await.map_err(|_| Error::StopRedirect)?;
let _ = self.egress_task.await.map_err(|_| Error::StopRedirect)??;
Expand All @@ -131,12 +137,14 @@ impl SplitTunnelHandle {
}

pub async fn set_interfaces(
self,
mut self,
default_interface: DefaultInterface,
vpn_interface: Option<VpnInterface>,
) -> Result<Self, Error> {
let _ = self.abort_tx.send(());

self.abort_mtu_listener().await;

let st_utun = self.ingress_task.await.map_err(|_| Error::StopRedirect)?;

let egress_completion = self.egress_task.await.map_err(|_| Error::StopRedirect)??;
Expand All @@ -146,9 +154,17 @@ impl SplitTunnelHandle {
egress_completion.pktap_stream,
default_interface,
vpn_interface,
self.route_manager,
egress_completion.classify,
)
}

async fn abort_mtu_listener(&mut self) {
if let Some(mtu_listener) = self.mtu_listener.take() {
mtu_listener.abort();
let _ = mtu_listener.await;
}
}
}

/// Create split tunnel device and handle all packets using `classify`. Handle any changes to the
Expand All @@ -161,10 +177,17 @@ impl SplitTunnelHandle {
pub async fn create_split_tunnel(
default_interface: DefaultInterface,
vpn_interface: Option<VpnInterface>,
route_manager: RouteManagerHandle,
classify: ClassifyFn,
) -> Result<SplitTunnelHandle, Error> {
let tun_device = create_utun().await?;
redirect_packets(tun_device, default_interface, vpn_interface, classify)
redirect_packets(
tun_device,
default_interface,
vpn_interface,
route_manager,
classify,
)
}

/// Create a utun device for split tunneling, and configure its IP addresses.
Expand Down Expand Up @@ -207,6 +230,7 @@ fn redirect_packets(
st_tun_device: tun::AsyncDevice,
default_interface: DefaultInterface,
vpn_interface: Option<VpnInterface>,
route_manager: RouteManagerHandle,
classify: ClassifyFn,
) -> Result<SplitTunnelHandle, Error> {
let pktap_stream = capture_outbound_packets(st_tun_device.get_ref().name())?;
Expand All @@ -215,6 +239,7 @@ fn redirect_packets(
Box::pin(pktap_stream),
default_interface,
vpn_interface,
route_manager,
Box::new(classify),
)
}
Expand All @@ -232,8 +257,17 @@ fn redirect_packets_for_pktap_stream(
pktap_stream: PktapStream,
default_interface: DefaultInterface,
vpn_interface: Option<VpnInterface>,
route_manager: RouteManagerHandle,
classify: ClassifyFn,
) -> Result<SplitTunnelHandle, Error> {
let mtu_listener = vpn_interface.as_ref().map(|vpn_interface| {
tokio::spawn(mtu_updater(
st_tun_device.get_ref().name().to_owned(),
vpn_interface.name.clone(),
route_manager.clone(),
))
});

let (default_stream, default_write, read_buffer_size) = open_default_bpf(&default_interface)?;

let st_utun_name = st_tun_device.get_ref().name().to_owned();
Expand Down Expand Up @@ -265,9 +299,62 @@ fn redirect_packets_for_pktap_stream(
abort_tx,
ingress_task,
egress_task,
mtu_listener,
route_manager,
})
}

/// Listen for changes to VPN interface MTU and apply them to the ST utun accordingly
async fn mtu_updater(
st_interface_name: String,
vpn_interface_name: String,
route_manager: RouteManagerHandle,
) {
let vpn_tun_index = match if_nametoindex(vpn_interface_name.as_str()) {
Ok(index) => u16::try_from(index).unwrap(),
Err(error) => {
log::error!("Failed to obtain VPN utun index: {error}");
return;
}
};
let mut current_mtu = match talpid_net::unix::get_mtu(&vpn_interface_name) {
Ok(mtu) => mtu,
Err(error) => {
log::error!("Failed to fetch current VPN tunnel MTU: {error}");
return;
}
};

try_update_mtu(&st_interface_name, current_mtu);

let mut listener = match route_manager.interface_change_listener().await {
Ok(listener) => listener,
Err(error) => {
log::warn!("Failed to start interface listener: {error}");
return;
}
};
while let Some(details) = listener.next().await {
if details.interface_index != vpn_tun_index || details.mtu == current_mtu {
continue;
}
current_mtu = details.mtu;
try_update_mtu(&st_interface_name, current_mtu);
}
}

/// Try to update the MTU of `st_iface_name`, and log if this fails
fn try_update_mtu(st_iface_name: &str, mtu: u16) {
match talpid_net::unix::set_mtu(st_iface_name, mtu) {
Ok(()) => {
log::debug!("ST interface MTU: {mtu}");
}
Err(error) => {
log::error!("Failed to set MTU of {st_iface_name} to {mtu}: {error}");
}
}
}

/// Open a BPF device for the specified default interface. Return a read and write half, and the
/// buffer size.
fn open_default_bpf(
Expand Down Expand Up @@ -440,6 +527,16 @@ fn classify_and_send(
log::error!("dropping invalid IPv4 packet");
return;
};
if let Some(vpn_v4) = vpn_interface.and_then(|iface| iface.0.v4_address) {
let src_ip = ip.get_source();
if src_ip != vpn_v4 && src_ip != addrs.source_ip {
// Drop packet from invalid source
return;
}
} else if ip.get_source() != addrs.source_ip {
// Drop packet from invalid source
return;
}
fix_ipv4_checksums(&mut ip, Some(addrs.source_ip), None);
if let Err(error) = default_write.write(packet.frame.packet()) {
log::error!("Failed to forward to default device: {error}");
Expand All @@ -457,6 +554,16 @@ fn classify_and_send(
log::error!("dropping invalid IPv6 packet");
return;
};
if let Some(vpn_v6) = vpn_interface.and_then(|iface| iface.0.v6_address) {
let src_ip = ip.get_source();
if src_ip != vpn_v6 && src_ip != addrs.source_ip {
// Drop packet from invalid source
return;
}
} else if ip.get_source() != addrs.source_ip {
// Drop packet from invalid source
return;
}
fix_ipv6_checksums(&mut ip, Some(addrs.source_ip), None);
if let Err(error) = default_write.write(packet.frame.packet()) {
log::error!("Failed to forward to default device: {error}");
Expand All @@ -480,9 +587,16 @@ fn classify_and_send(
log::error!("dropping invalid IPv4 packet");
return;
};
if ip.get_source() != addr {
// Drop packet from invalid source
return;
}
fix_ipv4_checksums(&mut ip, Some(addr), None);
if let Err(error) = vpn_write.write(packet.frame.payload()) {
log::error!("Failed to forward to tun device: {error}");
log::trace!(
"Failed to forward to VPN tunnel: {error}, size: {}",
packet.frame.payload().len()
);
}
}
EtherTypes::Ipv6 => {
Expand All @@ -494,9 +608,16 @@ fn classify_and_send(
log::error!("dropping invalid IPv6 packet");
return;
};
if ip.get_source() != addr {
// Drop packet from invalid source
return;
}
fix_ipv6_checksums(&mut ip, Some(addr), None);
if let Err(error) = vpn_write.write(packet.frame.payload()) {
log::error!("Failed to forward to tun device: {error}");
log::trace!(
"Failed to forward to VPN tunnel: {error}, size: {}",
packet.frame.payload().len()
);
}
}
other => log::error!("unknown ethertype: {other}"),
Expand Down Expand Up @@ -690,10 +811,12 @@ fn capture_outbound_packets(
.open()
.map_err(Error::CaptureSplitTunnelDevice)?;

// TODO: This is unsupported on macOS 13 and lower, so we determine the direction using the
// pktap header flags. Once macOS 13 is no longer supported, this can be uncommented.
// cap.direction(pcap::Direction::Out)
// .map_err(Error::SetDirection)?;
// TODO: `Capture::direction` is unsupported on macOS 13 and lower, so we determine the
// direction using the pktap header as well. Once macOS 13 is no longer supported,
// this can be assumed to work. Filtering here appears to be a lot faster.
if let Err(error) = cap.direction(pcap::Direction::Out) {
log::debug!("Failed to set capture direction. Might be on macOS 13: {error}");
}

let cap = cap.setnonblock().map_err(Error::EnableNonblock)?;
let stream = cap
Expand Down
17 changes: 17 additions & 0 deletions talpid-net/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "talpid-net"
description = "Networking helpers"
authors.workspace = true
repository.workspace = true
license.workspace = true
edition.workspace = true
rust-version.workspace = true

[lints]
workspace = true

[target.'cfg(unix)'.dependencies]
libc = "0.2"
talpid-types = { path = "../talpid-types" }
socket2 = { version = "0.5.3", features = ["all"] }
log = { workspace = true }
2 changes: 2 additions & 0 deletions talpid-net/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(unix)]
pub mod unix;
Loading
Loading