Skip to content

Commit

Permalink
Rework pipelining to avoid kernel buffering
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Saveau <[email protected]>
  • Loading branch information
SUPERCILEX committed Jul 31, 2024
1 parent c1d5566 commit 33435e2
Showing 1 changed file with 18 additions and 22 deletions.
40 changes: 18 additions & 22 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ fn garbage_collect(
}
}

drain_requests(recv, true, &mut pending_requests)?;
drain_requests(recv, 0, &mut pending_requests)?;
println!("Removed {num_duplicates} duplicate entries.");
}

Expand Down Expand Up @@ -900,12 +900,7 @@ fn migrate_from_gch(
let gch_id = gch_id!();
if translation.len() <= gch_id {
unsafe {
drain_add_requests(
&server,
true,
Some(&mut translation),
&mut pending_adds,
)?;
drain_add_requests(&server, Some(&mut translation), &mut pending_adds)?;
}
}
translation[gch_id]
Expand Down Expand Up @@ -984,7 +979,7 @@ fn migrate_from_gch(
}
}

unsafe { drain_add_requests(server, true, None, &mut pending_adds) }
unsafe { drain_add_requests(server, None, &mut pending_adds) }
}

fn migrate_from_clipboard_indicator(
Expand Down Expand Up @@ -1087,7 +1082,7 @@ fn migrate_from_clipboard_indicator(
}
}

unsafe { drain_add_requests(server, true, None, &mut pending_adds) }
unsafe { drain_add_requests(server, None, &mut pending_adds) }
}

fn migrate_from_gpaste(
Expand Down Expand Up @@ -1236,7 +1231,7 @@ fn migrate_from_gpaste(
}
}

unsafe { drain_add_requests(server, true, None, &mut pending_adds) }
unsafe { drain_add_requests(server, None, &mut pending_adds) }
}

#[allow(clippy::cast_precision_loss)]
Expand Down Expand Up @@ -1568,7 +1563,7 @@ fn migrate_from_ringboard_export(
}
};

unsafe { drain_add_requests(server, true, None, &mut pending_adds) }
unsafe { drain_add_requests(server, None, &mut pending_adds) }
}

fn generate(
Expand Down Expand Up @@ -1610,7 +1605,7 @@ fn generate(
}
}

unsafe { drain_add_requests(server, true, None, &mut pending_adds) }
unsafe { drain_add_requests(server, None, &mut pending_adds) }
}

fn fuzz(
Expand Down Expand Up @@ -1887,7 +1882,7 @@ fn fuzz(
verbose,
)
},
true,
0,
&mut pending_requests[idx],
)?;

Expand Down Expand Up @@ -2022,7 +2017,7 @@ fn fuzz(
verbose,
)
},
true,
0,
pending_requests,
)?;
debug_assert!(pending_ops.is_empty());
Expand All @@ -2047,7 +2042,10 @@ fn pipeline_request(
mut recv: impl FnMut(RecvFlags) -> Result<(), ClientError>,
pending_requests: &mut u32,
) -> Result<(), CliError> {
let mut retry = false;
if *pending_requests >= 8 {
drain_requests(&mut recv, *pending_requests / 2, pending_requests)?;
}

loop {
match send(if *pending_requests == 0 {
SendFlags::empty()
Expand All @@ -2058,8 +2056,7 @@ fn pipeline_request(
if e.kind() == ErrorKind::WouldBlock =>
{
debug_assert!(*pending_requests > 0);
drain_requests(&mut recv, retry, pending_requests)?;
retry = true;
drain_requests(&mut recv, *pending_requests / 2, pending_requests)?;
}
r => {
r?;
Expand All @@ -2073,19 +2070,19 @@ fn pipeline_request(

fn drain_requests(
mut recv: impl FnMut(RecvFlags) -> Result<(), ClientError>,
all: bool,
remaining: u32,
pending_requests: &mut u32,
) -> Result<(), CliError> {
while *pending_requests > 0 {
match recv(if all {
match recv(if *pending_requests > remaining {
RecvFlags::empty()
} else {
RecvFlags::DONTWAIT
}) {
Err(ClientError::Core(CoreError::Io { error: e, .. }))
if e.kind() == ErrorKind::WouldBlock =>
{
debug_assert!(!all);
debug_assert!(*pending_requests <= remaining);
break;
}
r => r?,
Expand Down Expand Up @@ -2131,11 +2128,10 @@ unsafe fn pipeline_add_request(

unsafe fn drain_add_requests(
server: impl AsFd,
all: bool,
translation: Option<&mut Vec<u64>>,
pending_adds: &mut u32,
) -> Result<(), CliError> {
drain_requests(pipelined_add_recv(server, translation), all, pending_adds)
drain_requests(pipelined_add_recv(server, translation), 0, pending_adds)
}

fn generate_random_entry_file(
Expand Down

0 comments on commit 33435e2

Please sign in to comment.