From 33435e288deb7903d770578267b793f56ad6c46b Mon Sep 17 00:00:00 2001 From: Alex Saveau Date: Tue, 30 Jul 2024 23:22:12 -0700 Subject: [PATCH] Rework pipelining to avoid kernel buffering Signed-off-by: Alex Saveau --- cli/src/main.rs | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 1059514..8206eaf 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -813,7 +813,7 @@ fn garbage_collect( } } - drain_requests(recv, true, &mut pending_requests)?; + drain_requests(recv, 0, &mut pending_requests)?; println!("Removed {num_duplicates} duplicate entries."); } @@ -900,12 +900,7 @@ fn migrate_from_gch( let gch_id = gch_id!(); if translation.len() <= gch_id { unsafe { - drain_add_requests( - &server, - true, - Some(&mut translation), - &mut pending_adds, - )?; + drain_add_requests(&server, Some(&mut translation), &mut pending_adds)?; } } translation[gch_id] @@ -984,7 +979,7 @@ fn migrate_from_gch( } } - unsafe { drain_add_requests(server, true, None, &mut pending_adds) } + unsafe { drain_add_requests(server, None, &mut pending_adds) } } fn migrate_from_clipboard_indicator( @@ -1087,7 +1082,7 @@ fn migrate_from_clipboard_indicator( } } - unsafe { drain_add_requests(server, true, None, &mut pending_adds) } + unsafe { drain_add_requests(server, None, &mut pending_adds) } } fn migrate_from_gpaste( @@ -1236,7 +1231,7 @@ fn migrate_from_gpaste( } } - unsafe { drain_add_requests(server, true, None, &mut pending_adds) } + unsafe { drain_add_requests(server, None, &mut pending_adds) } } #[allow(clippy::cast_precision_loss)] @@ -1568,7 +1563,7 @@ fn migrate_from_ringboard_export( } }; - unsafe { drain_add_requests(server, true, None, &mut pending_adds) } + unsafe { drain_add_requests(server, None, &mut pending_adds) } } fn generate( @@ -1610,7 +1605,7 @@ fn generate( } } - unsafe { drain_add_requests(server, true, None, &mut pending_adds) } + unsafe { drain_add_requests(server, None, &mut pending_adds) } } fn fuzz( @@ -1887,7 +1882,7 @@ fn fuzz( verbose, ) }, - true, + 0, &mut pending_requests[idx], )?; @@ -2022,7 +2017,7 @@ fn fuzz( verbose, ) }, - true, + 0, pending_requests, )?; debug_assert!(pending_ops.is_empty()); @@ -2047,7 +2042,10 @@ fn pipeline_request( mut recv: impl FnMut(RecvFlags) -> Result<(), ClientError>, pending_requests: &mut u32, ) -> Result<(), CliError> { - let mut retry = false; + if *pending_requests >= 8 { + drain_requests(&mut recv, *pending_requests / 2, pending_requests)?; + } + loop { match send(if *pending_requests == 0 { SendFlags::empty() @@ -2058,8 +2056,7 @@ fn pipeline_request( if e.kind() == ErrorKind::WouldBlock => { debug_assert!(*pending_requests > 0); - drain_requests(&mut recv, retry, pending_requests)?; - retry = true; + drain_requests(&mut recv, *pending_requests / 2, pending_requests)?; } r => { r?; @@ -2073,11 +2070,11 @@ fn pipeline_request( fn drain_requests( mut recv: impl FnMut(RecvFlags) -> Result<(), ClientError>, - all: bool, + remaining: u32, pending_requests: &mut u32, ) -> Result<(), CliError> { while *pending_requests > 0 { - match recv(if all { + match recv(if *pending_requests > remaining { RecvFlags::empty() } else { RecvFlags::DONTWAIT @@ -2085,7 +2082,7 @@ fn drain_requests( Err(ClientError::Core(CoreError::Io { error: e, .. })) if e.kind() == ErrorKind::WouldBlock => { - debug_assert!(!all); + debug_assert!(*pending_requests <= remaining); break; } r => r?, @@ -2131,11 +2128,10 @@ unsafe fn pipeline_add_request( unsafe fn drain_add_requests( server: impl AsFd, - all: bool, translation: Option<&mut Vec>, pending_adds: &mut u32, ) -> Result<(), CliError> { - drain_requests(pipelined_add_recv(server, translation), all, pending_adds) + drain_requests(pipelined_add_recv(server, translation), 0, pending_adds) } fn generate_random_entry_file(