Skip to content

Commit

Permalink
making a mess to clean up for IAT durations and lengths - interim commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwample committed May 16, 2024
1 parent a6662b8 commit 9361776
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 93 deletions.
4 changes: 2 additions & 2 deletions crates/obfs4/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Client {

/// On a failed handshake the client will read for the remainder of the
/// handshake timeout and then close the connection.
pub async fn wrap<'a, T>(self, mut stream: T) -> Result<Obfs4Stream<T>>
pub async fn wrap<'a, T>(self, mut stream: T) -> Result<Obfs4Stream>
where
T: AsyncRead + AsyncWrite + Unpin + 'a,
{
Expand All @@ -156,7 +156,7 @@ impl Client {
pub async fn establish<'a, T, E>(
self,
mut stream_fut: Pin<ptrs::FutureResult<T, E>>,
) -> Result<Obfs4Stream<T>>
) -> Result<Obfs4Stream>
where
T: AsyncRead + AsyncWrite + Unpin + 'a,
E: std::error::Error + Send + Sync + 'static,
Expand Down
1 change: 1 addition & 0 deletions crates/obfs4/src/common/drbg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl RngCore for Drbg {
}
}


#[cfg(test)]
mod test {
use super::*;
Expand Down
297 changes: 213 additions & 84 deletions crates/obfs4/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::{
common::{
drbg,
probdist::{self, WeightedDist},
delay, drbg, probdist::{self, WeightedDist}
},
constants::*,
framing,
framing::{self, Message},
sessions::Session,
Error, Result,
};

use bytes::{Buf, BytesMut};
use futures::{Sink, Stream};
use futures::{sink::Sink, stream::Stream};
use pin_project::pin_project;
use ptrs::trace;
use sha2::{Digest, Sha256};
Expand Down Expand Up @@ -85,20 +84,14 @@ impl MaybeTimeout {
}

#[pin_project]
pub struct Obfs4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub struct Obfs4Stream {
// s: Arc<Mutex<O4Stream<'a, T>>>,
#[pin]
s: O4Stream<T>,
s: O4Stream,
}

impl<T> Obfs4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn from_o4(o4: O4Stream<T>) -> Self {
impl Obfs4Stream {
pub(crate) fn from_o4(o4: O4Stream) -> Self {
Obfs4Stream {
// s: Arc::new(Mutex::new(o4)),
s: o4,
Expand All @@ -107,30 +100,34 @@ where
}

#[pin_project]
pub(crate) struct O4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) struct O4Stream {
#[pin]
pub stream: Framed<T, framing::Obfs4Codec>,
// pub stream: Framed<T, framing::Obfs4Codec>,
pub stream: Box<dyn Sink<Messages, Error=()>>,

pub length_dist: probdist::WeightedDist,
pub iat_dist: probdist::WeightedDist,

pub session: Session,
}

impl<T> O4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new(
impl O4Stream {
pub(crate) fn new<T>(
// inner: &'a mut dyn Stream<'a>,
inner: T,
codec: framing::Obfs4Codec,
session: Session,
) -> O4Stream<T> {
let stream = Framed::new(inner, codec);
) -> O4Stream
where
T: AsyncRead + AsyncWrite + Unpin,
{
let stream: Box<dyn Sink<Messages, Error=()>> = match session.get_iat_mode() {
IAT::Off => Box::new(Framed::new(inner, codec)),
IAT::Enabled | IAT::Paranoid => {
let f = Framed::new(inner, codec);
Box::new(delay::DelayedSink::new(f, session.iat_duration_sampler()))
}
};
let len_seed = session.len_seed();

let mut hasher = Sha256::new();
Expand Down Expand Up @@ -169,54 +166,10 @@ where
_ => Ok(()),
}
}

/*// TODO Apply pad_burst logic and IAT policy to packet assembly (probably as part of AsyncRead / AsyncWrite impl)
/// Attempts to pad a burst of data so that the last packet is of the length
/// `to_pad_to`. This can involve creating multiple packets, making this
/// slightly complex.
///
/// TODO: document logic more clearly
pub(crate) fn pad_burst(&self, buf: &mut BytesMut, to_pad_to: usize) -> Result<()> {
let tail_len = buf.len() % framing::MAX_SEGMENT_LENGTH;
let pad_len: usize = if to_pad_to >= tail_len {
to_pad_to - tail_len
} else {
(framing::MAX_SEGMENT_LENGTH - tail_len) + to_pad_to
};
if pad_len > HEADER_LENGTH {
// pad_len > 19
Ok(framing::build_and_marshall(
buf,
MessageTypes::Payload.into(),
vec![],
pad_len - HEADER_LENGTH,
)?)
} else if pad_len > 0 {
framing::build_and_marshall(
buf,
MessageTypes::Payload.into(),
vec![],
framing::MAX_MESSAGE_PAYLOAD_LENGTH,
)?;
// } else {
Ok(framing::build_and_marshall(
buf,
MessageTypes::Payload.into(),
vec![],
pad_len,
)?)
} else {
Ok(())
}
} */
}

impl<T> AsyncWrite for O4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{

impl AsyncWrite for O4Stream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -284,10 +237,7 @@ where
}
}

impl<T> AsyncRead for O4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
impl AsyncRead for O4Stream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -334,10 +284,7 @@ where
}
}

impl<T> AsyncWrite for Obfs4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
impl AsyncWrite for Obfs4Stream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -358,10 +305,7 @@ where
}
}

impl<T> AsyncRead for Obfs4Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
impl AsyncRead for Obfs4Stream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -371,3 +315,188 @@ where
this.s.poll_read(cx, buf)
}
}

impl Sink<Messages> for O4Stream {
type Error = ();
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
todo!();
}

fn start_send(self: Pin<&mut Self>, item: Messages) -> StdResult<(), Self::Error> {
todo!();
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
todo!();
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
todo!();
}
}


// TODO Apply pad_burst logic and IAT policy to Message assembly (probably as part of AsyncRead / AsyncWrite impl)
/// Attempts to pad a burst of data so that the last [`Message`] is of the length
/// `to_pad_to`. This can involve creating multiple packets, making this
/// slightly complex.
///
/// TODO: document logic more clearly
pub(crate) fn pad_burst(buf: &mut BytesMut, to_pad_to: usize) -> Result<()> {
let tail_len = buf.len() % framing::MAX_SEGMENT_LENGTH;

let pad_len: usize = if to_pad_to >= tail_len {
to_pad_to - tail_len
} else {
(framing::MAX_SEGMENT_LENGTH - tail_len) + to_pad_to
};

if pad_len > HEADER_LENGTH {
// pad_len > 19
Ok(framing::build_and_marshall(
buf,
framing::MessageTypes::Payload.into(),
vec![],
pad_len - HEADER_LENGTH,
)?)
} else if pad_len > 0 {
framing::build_and_marshall(
buf,
framing::MessageTypes::Payload.into(),
vec![],
framing::MAX_MESSAGE_PAYLOAD_LENGTH,
)?;
// } else {
Ok(framing::build_and_marshall(
buf,
framing::MessageTypes::Payload.into(),
vec![],
pad_len,
)?)
} else {
Ok(())
}
}

/*
///
/// Off:
/// pad burst = send max-frame-length frames while available, pad the last with
/// send with no delay
/// [ msg ]
/// [ max-pkt ][ max-pkt ][ max-pkt ][ max-pkt ][pkt]{pad}
/// Enabled:
/// pad burst = send max-frame-length frames while available, pad the last with
/// send with sampled delay
/// [ msg ]
/// [ max-pkt ]... [ max-pkt ]. [ max-pkt ].. [ max-pkt ].... [pkt]{pad}
/// Paranoid:
/// ??
/// send with sampled delay
/// [ msg ]
/// [ max-pkt ]... [ max-pkt ]. [ max-pkt ].. [ max-pkt ].... [pkt]{pad}
fn split_and_pad(iat: IAT) {
// Send maximum sized frames. while they are available
let payload_chunks = b.chunks(MAX_MESSAGE_PAYLOAD_LENGTH);
match iat {
IAT::Off => {}
IAT::Enabled => {}
IAT::Paranoid => {}
}
}
if conn.iatMode != iatParanoid {
// For non-paranoid IAT, pad once per burst. Paranoid IAT handles
// things differently.
if err = conn.padBurst(&frameBuf, conn.lenDist.Sample()); err != nil {
return 0, err
}
}
// Write the pending data onto the network. Partial writes are fatal,
// because the frame encoder state is advanced, and the code doesn't keep
// frameBuf around. In theory, write timeouts and whatnot could be
// supported if this wasn't the case, but that complicates the code.
if conn.iatMode != iatNone {
var iatFrame [framing.MaximumSegmentLength]byte
for frameBuf.Len() > 0 {
iatWrLen := 0
switch conn.iatMode {
case iatEnabled:
// Standard (ScrambleSuit-style) IAT obfuscation optimizes for
// bulk transport and will write ~MTU sized frames when
// possible.
iatWrLen, err = frameBuf.Read(iatFrame[:])
case iatParanoid:
// Paranoid IAT obfuscation throws performance out of the
// window and will sample the length distribution every time a
// write is scheduled.
targetLen := conn.lenDist.Sample()
if frameBuf.Len() < targetLen {
// There's not enough data buffered for the target write,
// so padding must be inserted.
if err = conn.padBurst(&frameBuf, targetLen); err != nil {
return 0, err
}
if frameBuf.Len() != targetLen {
// Ugh, padding came out to a value that required more
// than one frame, this is relatively unlikely so just
// resample since there's enough data to ensure that
// the next sample will be written.
continue
}
}
iatWrLen, err = frameBuf.Read(iatFrame[:targetLen])
}
if err != nil {
return 0, err
} else if iatWrLen == 0 {
panic(fmt.Sprintf("BUG: Write(), iat length was 0"))
}
// Calculate the delay. The delay resolution is 100 usec, leading
// to a maximum delay of 10 msec.
iatDelta := time.Duration(conn.iatDist.Sample() * 100)
// Write then sleep.
_, err = conn.Conn.Write(iatFrame[:iatWrLen])
if err != nil {
return 0, err
}
time.Sleep(iatDelta * time.Microsecond)
}
} else {
_, err = conn.Conn.Write(frameBuf.Bytes())
}
return
}
/*
chopBuf := bytes.NewBuffer(b)
var payload [maxPacketPayloadLength]byte
var frameBuf bytes.Buffer
// Chop the pending data into payload frames.
for chopBuf.Len() > 0 {
rdLen := 0
rdLen, err = chopBuf.Read(payload[:])
if err != nil {
return 0, err
} else if rdLen == 0 {
panic(fmt.Sprintf("BUG: Write(), chopping length was 0"))
}
n += rdLen
err = conn.makePacket(&frameBuf, packetTypePayload, payload[:rdLen], 0)
if err != nil {
return 0, err
}
}
*/
*/
Loading

0 comments on commit 9361776

Please sign in to comment.